Study notes based on https://github.com/xufengtt/recom_teach_code.
Environment setup (MaxCompute + DataWorks)
- Download the Tmall recommendation dataset;
- Enable Alibaba Cloud MaxCompute, DataWorks, and PAI;
- Use odpscmd to upload local data: configure the odps_config.ini file in the conf folder, filling in the project_name (recom_maxcompute), access_id, access_key, and end_point parameters; then go to the bin directory and run odpscmd;
Building the data features
Uploading data: the operational data store (ODS) layer
- Create the table in DataWorks
CREATE TABLE IF NOT EXISTS item_dim (
item_id STRING,
title STRING,
pict_url STRING,
category STRING,
brand_id STRING,
seller_id STRING
) LIFECYCLE 90;
- Upload the data
odpscmd -e "tunnel upload tianchi_2014001_rec_tmall_product.txt item_dim -fd '\u0001';"
- Check the upload result
SELECT * FROM item_dim LIMIT 100;
- Upload the remaining data the same way
CREATE TABLE IF NOT EXISTS user_item_beh_log (
item_id STRING ,
user_id STRING ,
action STRING ,
vtime STRING
) LIFECYCLE 90;
odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_parta.txt user_item_beh_log -fd '\u0001';"
odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_partb.txt user_item_beh_log -fd '\u0001';"
odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_partc.txt user_item_beh_log -fd '\u0001';"
The data warehouse (DW) layer
- First inspect a few metrics; the dataset's scale is reasonably close to industrial data volumes
SELECT max(vtime), min(vtime), COUNT(DISTINCT item_id), COUNT(DISTINCT user_id), COUNT(*)
FROM user_item_beh_log;
Common layered models for a data warehouse:
- Three layers: operational data store (ODS, Operational Data Store); data warehouse layer (DW, Data Warehouse Layer); application data service layer (ADS, Application Data Service)
- Four layers: adds a detailed layer (DWS, Detailed Warehouse Store)
The warehouse design for this project:
- ODS layer: the uploaded raw tables
- DW layer: the granularities to build: 1) time granularity: stored by day; 2) behavior type: click/collect/cart/pay stored in separate tables; 3) key granularity: user-side features on user+brand, user+item, user+cate1_id, user+cate2_id, user; brand-side features on brand+user, brand+item; user-brand interaction features on user+brand, user+brand+item, user+brand+cate1_id, user+brand+cate2_id. The logs are therefore split by behavior type and time into four tables (click/collect/cart/pay), each partitioned by the day of the action. To support all of the feature computations above, every table carries the fields user, brand, item, cate1_id, cate2_id
- ADS layer: brand statistical features, user statistical features, and user-brand cross statistical features
- Dimension (attribute) tables: item brand/category/title
CREATE TABLE IF NOT EXISTS dw_user_item_click_log (
user_id STRING COMMENT 'user id',
item_id STRING COMMENT 'item id',
brand_id STRING COMMENT 'brand id',
seller_id STRING COMMENT 'seller id',
cate1 STRING COMMENT 'level-1 category id',
cate2 STRING COMMENT 'level-2 category id',
op_time STRING COMMENT 'click time'
) PARTITIONED BY (ds STRING COMMENT 'date partition') LIFECYCLE 90;
ds (date string) is the partition column declared when creating the partitioned table, not an ordinary column, so it does not need to be declared explicitly in the column list. Create the four behavior-type tables in this style, partitioned by day, and populate them with dynamic-partition inserts as below:
INSERT OVERWRITE TABLE dw_user_item_click_log PARTITION (ds)
SELECT user_id, t2.item_id, brand_id, seller_id, cate1, cate2, vtime, ds
FROM (
SELECT user_id, item_id, vtime, to_char(TO_DATE(vtime, 'yyyy-mm-dd hh:mi:ss'), 'yyyymmdd') as ds
FROM user_item_beh_log
WHERE action = 'click'
) t1 join (
SELECT item_id, brand_id, seller_id, SPLIT_PART(category, '-', 1) AS cate1, SPLIT_PART(category, '-', 2) AS cate2
FROM item_dim
) t2 on t1.item_id = t2.item_id;
To verify: SELECT * FROM dw_user_item_click_log WHERE ds = ${bizdate} LIMIT 10;
The data service (ADS) layer
- Brand-level features: clicks/collects/carts/pays. Create and populate a table of brand-level features based on users' interactions with each brand within a time window. The query below counts, over the past 60 days (including ${bizdate} itself), the number of users who clicked, collected, carted, and paid, using DISTINCT so each user is counted only once per brand.
CREATE TABLE IF NOT EXISTS brand_stat_feature_ads (
brand_id STRING,
click_num BIGINT,
collect_num BIGINT,
cart_num BIGINT,
alipay_num BIGINT
) PARTITIONED BY (ds STRING) LIFECYCLE 60;
INSERT OVERWRITE TABLE brand_stat_feature_ads PARTITION(ds=${bizdate})
SELECT t1.brand_id, click_num
,if(collect_num is null, 0, collect_num)
,if(cart_num is null, 0, cart_num)
,if(alipay_num is null, 0, alipay_num)
FROM (
SELECT brand_id, COUNT(DISTINCT user_id) AS click_num
FROM dw_user_item_click_log
where ds <= ${bizdate} and ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
GROUP BY brand_id
) t1 left join (
SELECT brand_id, COUNT(DISTINCT user_id) AS collect_num
FROM dw_user_item_collect_log
where ds <= ${bizdate} and ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
GROUP BY brand_id
) t2 on t1.brand_id=t2.brand_id
left join (
SELECT brand_id, COUNT(DISTINCT user_id) AS cart_num
FROM dw_user_item_cart_log
where ds <= ${bizdate} and ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
GROUP BY brand_id
) t3 on t1.brand_id=t3.brand_id
left join (
SELECT brand_id, COUNT(DISTINCT user_id) AS alipay_num
FROM dw_user_item_alipay_log
where ds <= ${bizdate} and ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
GROUP BY brand_id
) t4 on t1.brand_id=t4.brand_id;
- User-level features: tables of each user's click/collect/cart/pay behavior. The query below uses COUNT(DISTINCT ...) to compute, within a 3-day window, the number of distinct items, brands, sellers, level-1 and level-2 categories the user clicked, plus the number of distinct click days; other time windows and behaviors follow the same pattern.
CREATE TABLE IF NOT EXISTS user_click_beh_feature_ads (
user_id STRING,
item_num_3d BIGINT,
brand_num_3d BIGINT,
seller_num_3d BIGINT,
cate1_num_3d BIGINT,
cate2_num_3d BIGINT,
cnt_days_3d BIGINT,
... --3d,7d,15d,60d,90d
)PARTITIONED BY (ds string) LIFECYCLE 60;
INSERT OVERWRITE TABLE user_click_beh_feature_ads PARTITION (ds=${bizdate}) -- user click-behavior features
SELECT user_id
,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), item_id, null))
,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), brand_id, null))
,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), seller_id, null))
,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), cate1, null))
,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), cate2, null))
,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), ds, null))
...--3d,7d,15d,60d,90d
FROM dw_user_item_click_log
WHERE ds<=${bizdate} AND ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
GROUP BY user_id;
- User-brand cross features
In real scenarios with, say, 200k brands, perhaps only a few thousand head brands can afford a recommendation service, so selecting head brands cuts computation while still producing good experimental results. Build the dimension table brand_top500_alipay_dim: over the most recent 30 days, select the top 500 brands by number of paying users:
create table if not exists brand_top500_alipay_dim (
brand_id string,
alipay_num bigint
) partitioned by (ds string) LIFECYCLE 60;
insert overwrite table brand_top500_alipay_dim partition (ds=${bizdate})
select brand_id, alipay_num
from (
select brand_id, count(DISTINCT user_id) as alipay_num
from dw_user_item_alipay_log
where ds<=${bizdate} and ds>to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
and brand_id is not null
group by brand_id
)t1 order by alipay_num desc limit 500
;
Then compute, per user and brand: click item diversity (clk_item), click frequency (clk_num), and number of click days (clk_day), over 90/60/30/15/7/3/1-day windows. The 1-day window is added as a strong feature because very recent behavior has a critical influence. For collects, only collected-item diversity (clt_item) is kept, since the total collect count and days carry little signal; carts and purchases get item diversity (cart_item), cart frequency (cart_num), pay diversity (pay_item), and pay frequency (pay_num). Everything is restricted to the top-500 brands to cut computation.
create table if not exists user_brand_cross_beh_feature_ads (
user_id string,
brand_id string,
clk_item_90d bigint,
...--60/30/15/7/3/1d
clk_num_90d bigint,
...
clk_day_90d bigint,
...
clt_item_90d bigint,
...
cart_item_90d bigint,
...
cart_num_90d bigint,
...
pay_item_90d bigint,
...
pay_num_90d bigint,
...
)PARTITIONED BY (ds STRING) LIFECYCLE 60;
insert overwrite table user_brand_cross_beh_feature_ads partition (ds=${bizdate})
select t1.user_id, t1.brand_id,
clk_item_90d,
...
clk_num_90d,
...
clk_day_90d,
...
if(clt_item_90d is null, 0, clt_item_90d) as clt_item_90d,
...
if(cart_item_90d is null, 0, cart_item_90d) as cart_item_90d,
...
if(cart_num_90d is null, 0, cart_num_90d) as cart_num_90d,
...
if(pay_item_90d is null, 0, pay_item_90d) as pay_item_90d,
...
if(pay_num_90d is null, 0, pay_num_90d) as pay_num_90d,
...
from (
select user_id, brand_id
,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clk_item_90d
...
,count(if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clk_num_90d
...
,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), ds, null)) as clk_day_90d
...
from (
select t1.user_id, t2.brand_id, t1.item_id, t1.ds
from (
select user_id, brand_id, item_id, ds
from dw_user_item_click_log
where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
)t1 join (
select brand_id
from brand_top500_alipay_dim where ds=${bizdate}
)t2 on t1.brand_id=t2.brand_id
)t1
group by user_id, brand_id
)t1 left join (
select user_id, brand_id
,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clt_item_90d
...
from (
select t1.user_id, t2.brand_id, t1.item_id, t1.ds
from (
select user_id, brand_id, item_id, ds
from dw_user_item_collect_log
where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
)t1 join (
select brand_id
from brand_top500_alipay_dim where ds=${bizdate}
)t2 on t1.brand_id=t2.brand_id
)t1
group by user_id, brand_id
)t2 on t1.user_id=t2.user_id and t1.brand_id=t2.brand_id
...
The user-brand cross features also include the user's interactions with the brand's level-2 categories. First build a brand level-2-category dimension table (level-1 categories are set aside for now): for each of the top-500 brands, take the top 2 level-2 categories by user count. ROW_NUMBER is a window function that assigns a unique sequence number to each row of the result set; the OVER clause defines the window's partitioning and ordering, here partitioning by brand_id so that each brand gets its own independent numbering.
create table if not exists brand_cate2_dim (
brand_id string,
cate2 string
)lifecycle 60;
insert overwrite table brand_cate2_dim
select brand_id, cate2
from (
select brand_id, cate2, ROW_NUMBER() OVER(PARTITION BY brand_id ORDER BY num desc) AS number
from (
select t1.brand_id, t1.cate2, count(distinct user_id) as num
from (
select user_id, ds, brand_id, cate2
from dw_user_item_alipay_log
where ds<${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
)t1 join (
select brand_id
from brand_top500_alipay_dim where ds=${bizdate}
)t2 on t1.brand_id=t2.brand_id
group by t1.cate2, t1.brand_id
)t1
)t1 where number <=2
;
With the level-2 category dimension table in hand, computing the user-to-category cross data is straightforward. The logs are joined to the dimension table on cate2 (the level-2 category) to link user logs with the brands' category information. Although the join key is cate2, the goal is to attribute category-level click behavior back to brands, so the query still selects t2.brand_id, tying user behavior to brand for more targeted analysis. The /*+mapjoin(t2)*/
hint tells the query optimizer to load t2 into memory and complete the join with t1 during the map phase; provided t2 is small enough, this can significantly improve query performance.
create table if not exists user_brand_cate2_cross_beh_feature_ads (
user_id string,
brand_id string,
clk_item_90d bigint,
...--60/30/15/7/3/1d
...
)PARTITIONED BY (ds STRING) LIFECYCLE 60;
insert overwrite table user_brand_cate2_cross_beh_feature_ads partition (ds=${bizdate})
select t1.user_id, t1.brand_id,
clk_item_90d,
...
if(clt_item_90d is null, 0, clt_item_90d) as clt_item_90d,
...
from (
select user_id, brand_id
,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clk_item_90d
...
from (
select /*+mapjoin(t2)*/
t1.user_id, t2.brand_id, item_id, t1.ds
from (
select user_id, brand_id, item_id, ds
from dw_user_item_click_log
where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
)t1 join (
select brand_id
from brand_top500_alipay_dim where ds=${bizdate}
)t2 on t1.brand_id=t2.brand_id
)t1
group by user_id, brand_id
)t1 left join (
select user_id, brand_id
,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clt_item_90d
...
from (
select t1.user_id, t2.brand_id, item_id, t1.ds
from (
select user_id, brand_id, item_id, ds
from dw_user_item_collect_log
where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
)t1 join (
select brand_id
from brand_top500_alipay_dim where ds=${bizdate}
)t2 on t1.brand_id=t2.brand_id
)t1
group by user_id, brand_id
)t2 on t1.user_id=t2.user_id and t1.brand_id=t2.brand_id
...
With that, the application-layer feature development is done. It all uses ODPS SQL; other platforms' SQL differs in syntax but the approach is the same.
Building samples
The purchase model predicts whether a user buys from a given brand or not, a binary classification problem: label 0 (negative) means no purchase, 1 (positive) means purchase. The user pool is all users who visited the platform within the past year. Samples are built per brand: taking the experiment date as the starting point, users who purchase the brand within the following 7 days are positives; pool users who do not purchase are negatives.
For example, if the experiment date is 20130701, users who buy the brand during 20130701-20130707 are positives, and pool users who do not buy are negatives.
The user pool is defined by business requirements; the target recall should capture as many of the intended users as possible.
Positive samples:
create table if not exists user_pay_sample_pos (
user_id string,
brand_id string
) partitioned by (ds string) LIFECYCLE 60;
INSERT OVERWRITE TABLE user_pay_sample_pos PARTITION (ds='${bizdate}')
select t1.user_id, t1.brand_id
from (
select distinct user_id , brand_id
from dw_user_item_alipay_log
where ds>${bizdate} and ds<=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),7, 'dd'),'yyyymmdd')
) t1 join (
select brand_id
from brand_top500_alipay_dim where ds=${bizdate}
) t2 on t1.brand_id=t2.brand_id;
Negative samples:
Positive and negative counts are extremely imbalanced, so the following query builds a user-id dimension table that serves as the basis for random sampling: it generates a random number rnd for each distinct user_id, orders the users by it, and assigns each a sequence number. Sampling then joins against these numbers, and pairs that already appear as positives are excluded:
CREATE TABLE if NOT EXISTS user_id_number (
user_id string,
number bigint
) LIFECYCLE 60;
INSERT OVERWRITE TABLE user_id_number
select user_id, ROW_NUMBER() OVER(ORDER BY rnd DESC) AS number
from (
select user_id,RAND() AS rnd
from (
select DISTINCT user_id
from user_item_beh_log -- the raw log is unpartitioned; the full log serves as the user pool
) t1
) t1;
The negative samples:
INSERT OVERWRITE TABLE user_pay_sample PARTITION(ds=${bizdate})
select t1.neg_user_id as user_id, t1.brand_id, 0 as label
from (
--dedupe, to guard against duplicated random numbers
select DISTINCT t1.brand_id, t2.user_id as neg_user_id
from (
--TRANS_ARRAY explodes the comma-joined array into one (user_id, brand_id, rand_neg) row per element
select TRANS_ARRAY(2, ',', user_id, brand_id, rand_neg) as (user_id, brand_id, rand_neg)
from (
--generate 10 random numbers, comma-joined into an array
select user_id ,brand_id, concat(
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint),',',
cast(rand()*10000000 as bigint)
) as rand_neg
from user_pay_sample_pos
where ds = '${bizdate}'
)t1
)t1 join (
select user_id, number
from user_id_number
)t2 on t1.rand_neg=t2.number
)t1 left anti join (--anti join removes pairs that also appear as positives, so a negative can never duplicate a positive sample
select user_id, brand_id
from user_pay_sample_pos
where ds = '${bizdate}'
)t2 on t1.neg_user_id=t2.user_id and t1.brand_id=t2.brand_id
union all --merge the negatives with the positives
select user_id, brand_id, 1 as label
from user_pay_sample_pos
where ds = '${bizdate}'
;
Joining samples with features
The query below joins the samples with the features, anchored on a single reference date.
create table if not exists user_pay_sample_feature_join (
user_id string,
brand_id string,
label bigint,
click_item_num_3d BIGINT,
click_brand_num_3d BIGINT,
click_seller_num_3d BIGINT,
...
)PARTITIONED by (ds string) LIFECYCLE 60;
INSERT OVERWRITE TABLE user_pay_sample_feature_join PARTITION (ds=${bizdate})
select t1.user_id, t1.brand_id, t1.label,
if(t2.item_num_3d is null, 0, t2.item_num_3d),
if(t2.brand_num_3d is null, 0, t2.brand_num_3d),
if(t2.seller_num_3d is null, 0, t2.seller_num_3d),
...
from (
select user_id, brand_id, label
from user_pay_sample
where ds=${bizdate}
) t1 left join (
select *
from user_click_beh_feature_ads
where ds=${bizdate}
) t2 on t1.user_id=t2.user_id
left join (
select *
from user_collect_beh_feature_ads
where ds=${bizdate}
) t3 on t1.user_id=t3.user_id
left join (
select *
from user_cart_beh_feature_ads
where ds=${bizdate}
) t4 on t1.user_id=t4.user_id
left join (
select *
from user_alipay_beh_feature_ads
where ds=${bizdate}
) t5 on t1.user_id=t5.user_id
left join (
select *
from brand_stat_feature_ads
where ds=${bizdate}
) t6 on t1.brand_id=t6.brand_id
left join (
select *
from user_brand_cross_beh_feature_ads
where ds=${bizdate}
) t7 on t1.user_id=t7.user_id and t1.brand_id=t7.brand_id
left join (
select *
from user_brand_cate2_cross_beh_feature_ads
where ds=${bizdate}
) t8 on t1.user_id=t8.user_id and t1.brand_id=t8.brand_id
where (t2.cnt_days_90d is not null or t2.cate1_num_90d is not null or t3.item_num_90d is not null
or t4.item_num_90d is not null or t5.item_num_90d is not null or t7.clk_item_90d is not null or t8.clk_item_90d is not null)
;
A single day's samples are still relatively few. The data covers 20130401-20131001; to guarantee 90 days of history, feature extraction starts at 20130701, with a 7-day cycle (purchases in the following 7 days define the positive/negative labels). That yields 12 training dates (20130701, 0708, 0715, 0722, 0729 ... 0916), with 0923 held out as the test date, for roughly 10M+ samples in total.
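For reference, a minimal Python sketch (an illustration, not part of the original pipeline) that enumerates these weekly bizdate values plus the held-out test date:
import datetime as dt

start, end = dt.date(2013, 7, 1), dt.date(2013, 9, 16)
train_dates = []
d = start
while d <= end:
    train_dates.append(d.strftime('%Y%m%d'))
    d += dt.timedelta(days=7)
print(train_dates)                                   # 20130701, 20130708, ..., 20130916 (12 dates)
print((end + dt.timedelta(days=7)).strftime('%Y%m%d'))  # 20130923, the test date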
To backfill the data following the approach above, you could run each script 12 times, setting the ds=${bizdate} parameter by hand each time; but selecting every file 12 times is inefficient, so the Alibaba Cloud platform's scheduling configuration is used instead.
First, map out the dependencies between tables. The project structure so far:
- Raw table: user_item_beh_log
- The four behavior tables split from the raw table: dw_user_item_click_log, dw_user_item_collect_log, dw_user_item_cart_log, dw_user_item_alipay_log
- ODS layer: user_item_beh_log.sql
- DW layer: user_beh_log_dw.sql
- ADS layer (the four SQL jobs below have no dependencies among themselves, so they can run concurrently):
- Brand-level data: brand_stat_feature_ads
- User-level data: user_beh_feature_ads
- User-brand interaction data: user_brand_cate2_cross_beh_feature_ads, user_brand_cross_beh_feature_ads
- Dimension tables: brand_top500_alipay_dim and brand_cate2_dim (the top-500 paying brands and each brand's top-2 level-2 categories), plus user_id_number (the table that numbers user_ids)
- GBDT samples:
- user_pay_sample: the positive/negative sample split; this table can also run concurrently with the four jobs above, so it too depends on the virtual node
- user_pay_sample_feature_join: the sample-feature join table; it depends on the feature tables and the sample table above, so it runs last
After mapping the dependencies, create a virtual node virtual_depend containing a no-cost SQL statement such as select 1, edit its scheduling configuration, and submit it.
Make the five parallel SQL nodes depend on this node. Taking brand_stat_feature_ads.sql as an example: edit the scheduling configuration, set the scheduling parameter bizdate with value $[yyyymmdd-1], and set virtual_depend as the upstream node. Be sure to delete the auto-parsed input dependencies here, otherwise parsing from the code introduces redundant dependencies; then save and submit.
All tables used in user_pay_sample_feature_join have input dependencies, so nothing needs deleting there; just parse the dependencies from the code.
The final periodic tasks look like this:
With that, the training-set feature data is fully backfilled. A look at what the data looks like:
SELECT * FROM user_pay_sample_feature_join where ds='20130909' limit 100;
Building evaluation samples and joining with features
- Build the samples (user_pay_sample_eval)
Take the 9.23-10.1 window as the evaluation set. The negative-sample user pool is about 9M users; to save resources, 3M users are randomly sampled as the test set. Four brands are chosen (b47686 HSTYLE/Handu Yishe, b56508 Samsung phones, b62063 Nokia, b78739 LILY) to examine how different cross-brand mixes behave. mapjoin
takes the Cartesian product of the users and the brands, then the negatives (the randomly sampled users) are unioned with the positives (the paying users from dw_user_item_alipay_log):
create table if not exists user_pay_sample_eval (
user_id string,
brand_id string,
label bigint
)PARTITIONED BY (ds STRING) LIFECYCLE 60;
insert OVERWRITE TABLE user_pay_sample_eval partition (ds=${bizdate})
select user_id, brand_id, max(label) as label
from (
select /*+mapjoin(t2)*/
t1.user_id, t2.brand_id, 0 as label
from (
select user_id
from user_id_number
where number<=3000000
)t1 join (
--b47686 HSTYLE (Handu Yishe)
--b56508 Samsung phones
--b62063 Nokia
--b78739 LILY
select 'b47686' as brand_id
union all
select 'b56508' as brand_id
union all
select 'b62063' as brand_id
union all
select 'b78739' as brand_id
)t2
union all
select user_id, brand_id, 1 as label
from dw_user_item_alipay_log
where ds > '${bizdate}' and ds <= to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),7, 'dd'),'yyyymmdd') and brand_id in ('b47686','b56508','b62063','b78739')
group by user_id, brand_id
)t1 group by user_id, brand_id
;
- Join samples with features (user_pay_sample_feature_join_eval)
create table if not exists user_pay_sample_feature_join_eval (
user_id string,
brand_id string,
label bigint,
click_item_num_3d BIGINT,
click_brand_num_3d BIGINT,
click_seller_num_3d BIGINT,
...
)PARTITIONED by (ds string) LIFECYCLE 60;
INSERT OVERWRITE TABLE user_pay_sample_feature_join_eval PARTITION (ds=${bizdate})
select t1.user_id, t1.brand_id, t1.label,
if(t2.item_num_3d is null, 0, t2.item_num_3d),
if(t2.brand_num_3d is null, 0, t2.brand_num_3d),
if(t2.seller_num_3d is null, 0, t2.seller_num_3d),
...
from (
select user_id, brand_id, label
from user_pay_sample_eval
where ds=${bizdate}
) t1 left join (
select *
from user_click_beh_feature_ads
where ds=${bizdate}
) t2 on t1.user_id=t2.user_id
left join (
select *
from user_collect_beh_feature_ads
where ds=${bizdate}
) t3 on t1.user_id=t3.user_id
left join (
select *
from user_cart_beh_feature_ads
where ds=${bizdate}
) t4 on t1.user_id=t4.user_id
left join (
select *
from user_alipay_beh_feature_ads
where ds=${bizdate}
) t5 on t1.user_id=t5.user_id
left join (
select *
from brand_stat_feature_ads
where ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t6 on t1.brand_id=t6.brand_id
left join (
select *
from user_brand_cross_beh_feature_ads
where ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t7 on t1.user_id=t7.user_id and t1.brand_id=t7.brand_id
left join (
select *
from user_brand_cate2_cross_beh_feature_ads
where ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t8 on t1.user_id=t8.user_id and t1.brand_id=t8.brand_id
where (t2.cnt_days_90d is not null or t2.cate1_num_90d is not null or t3.item_num_90d is not null
or t4.item_num_90d is not null or t5.item_num_90d is not null or t7.clk_item_90d is not null or t8.clk_item_90d is not null)
Brand features and user-brand cross features are restricted with brand_id in ('b47686','b56508','b62063','b78739')
to limit the number of brands. The test data:
SELECT * FROM user_pay_sample_feature_join_eval where ds='20130923' limit 100
Now look at the training-set sample counts for these four brands:
select t1.brand_id, count(*)
from (
select *
FROM user_pay_sample_feature_join
where ds>='20130701' and ds <= '20130916'
) t1 join (
--b47686 HSTYLE (Handu Yishe)
--b56508 Samsung phones
--b62063 Nokia
--b78739 LILY
select 'b47686' as brand_id
union all
select 'b56508' as brand_id
union all
select 'b62063' as brand_id
union all
select 'b78739' as brand_id
) t2 on t1.brand_id = t2.brand_id
group by t1.brand_id
The overall sample size is modest, and HSTYLE and Samsung have far more samples than the other two brands.
Training the model: GBDT
Approach
- Baseline: train a separate model per brand on its own data and evaluate it.
- Mixed training: HSTYLE + Samsung (brands from different industries mixed); HSTYLE + LILY (brands from the same industry mixed).
- All brands mixed together.
Reading data
PyODPS is used here to read and write MaxCompute table data: docs
Training
- Install the required libraries
pip install lightgbm pandas scikit-learn pyodps
- Train the models; a GBDT model is trained separately for each of the four brands
import lightgbm as lgb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import numpy as np
import os
from odps import ODPS
from odps.df import DataFrame
# Set up the connection.
access_id = ''    # fill in your MaxCompute AccessKey ID (left blank here)
access_key = ''   # fill in your MaxCompute AccessKey secret
project = 'recom_maxcompute_dev'
endpoint = 'http://service.cn-shanghai.maxcompute.aliyun.com/api'
o = ODPS(
    access_id,
    access_key,
    project,
    endpoint,
)
# Read the data.
brand_id='b78739'
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916' and brand_id='{brand}'
;
'''.format(brand=brand_id)
print(sql)
query_job = o.execute_sql(sql)
result = query_job.open_reader(tunnel=True)
df = result.to_pandas(n_process=4)  # n_process can be sized to the machine; values > 1 enable parallel reads.
print('read data finish')
# The data is assumed to be loaded into a DataFrame named df
# df = pd.read_csv('your_data.csv')  # if the data came from a CSV file
# Separate features and label
X = df.drop(['label', 'user_id', 'brand_id', 'ds'], axis=1)
y = df['label']
# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Build the LightGBM data structures
lgb_train = lgb.Dataset(X_train, label=y_train)
lgb_eval = lgb.Dataset(X_test, label=y_test, reference=lgb_train)
print('post process data finish')
# Set the parameters
params = {
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 31,
    'learning_rate': 0.08,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1  # -1 reduces the log output
}
# Lists to store the AUC values
train_aucs = []
test_aucs = []
# Train models with 20 to 69 trees, one tree at a time
os.makedirs(f'./models_{brand_id}', exist_ok=True)  # make sure the output directory exists
with open(f'./models_{brand_id}/auc_scores.txt', 'w') as f:
    for i in range(20, 70, 1):
        print(f"Training model with {i} trees...")
        # Train the model
        gbm = lgb.train(params,
                        lgb_train,
                        num_boost_round=i,
                        valid_sets=[lgb_train, lgb_eval],
                        valid_names=['train', 'valid'])
        # Predict with the trained model and compute the AUC
        y_train_pred = gbm.predict(X_train)
        y_test_pred = gbm.predict(X_test)
        train_auc = roc_auc_score(y_train, y_train_pred)
        test_auc = roc_auc_score(y_test, y_test_pred)
        # Print the AUC
        print(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")
        # Save the model
        gbm.save_model(f'./models_{brand_id}/model_{i}.txt')
        # Save the AUC
        f.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")
- Training logs
- Train the mixed models. With too small a learning rate the model did not converge (test AUC kept increasing without turning down), so it is raised to 0.13 to train the HSTYLE+Samsung and HSTYLE+LILY mixed models
# Read the data.
brand_id='b47686'
brand_id2='b56508'
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916' and brand_id in ('{brand}', '{brand2}')
;
'''.format(brand=brand_id, brand2=brand_id2)
os.makedirs(f'./models_{brand_id}_{brand_id2}', exist_ok=True)  # make sure the output directory exists
with open(f'./models_{brand_id}_{brand_id2}/auc_scores.txt', 'w') as f:
    for i in range(20, 70, 1):
        print(f"Training model with {i} trees...")
        # Train the model
        gbm = lgb.train(params,
                        lgb_train,
                        num_boost_round=i,
                        valid_sets=[lgb_train, lgb_eval],
                        valid_names=['train', 'valid'])
        # Predict with the trained model and compute the AUC
        y_train_pred = gbm.predict(X_train)
        y_test_pred = gbm.predict(X_test)
        train_auc = roc_auc_score(y_train, y_train_pred)
        test_auc = roc_auc_score(y_test, y_test_pred)
        # Print the AUC
        print(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")
        # Save the model
        gbm.save_model(f'./models_{brand_id}_{brand_id2}/model_{i}.txt')
        # Save the AUC
        f.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")
- Mixed training across all brands
o = ODPS(
access_id,
access_key,
project,
endpoint,
)
# Read the data.
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916'
;
'''
print(sql)
query_job = o.execute_sql(sql)
result = query_job.open_reader(tunnel=True)
df = result.to_pandas(n_process=52)  # n_process can be sized to the machine; values > 1 enable parallel reads.
print('read data finish')
# Drop non-feature columns
df = df.drop(columns=['user_id', 'brand_id', 'ds'])
# Separate features and label
X = df.drop(['label'], axis=1)
y = df['label']
# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Build the LightGBM data structures
lgb_train = lgb.Dataset(X_train, label=y_train)
lgb_eval = lgb.Dataset(X_test, label=y_test, reference=lgb_train)
print('post process data finish')
# Set the parameters
params = {
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 31,
    'learning_rate': 0.1,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1  # -1 reduces the log output
}
# Lists to store the AUC values
train_aucs = []
test_aucs = []
# Train models from 5 to 95 trees in steps of 5
os.makedirs('./models_all', exist_ok=True)  # make sure the output directory exists
with open(f'./models_all/auc_scores.txt', 'w') as f:
    for i in range(5, 100, 5):
        print(f"Training model with {i} trees...")
        # Train the model
        gbm = lgb.train(params,
                        lgb_train,
                        num_boost_round=i,
                        valid_sets=[lgb_train, lgb_eval],
                        valid_names=['train', 'valid'])
        # Predict with the trained model and compute the AUC
        y_train_pred = gbm.predict(X_train)
        y_test_pred = gbm.predict(X_test)
        train_auc = roc_auc_score(y_train, y_train_pred)
        test_auc = roc_auc_score(y_test, y_test_pred)
        # Print the AUC
        print(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")
        # Save the model
        gbm.save_model(f'./models_all/model_{i}.txt')
        # Save the AUC
        f.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")
Evaluation: GBDT
- Training-process metric
The metric tracked during training is AUC: comparing the training-set and validation-set AUC shows whether the model is overfitting. For an all-round assessment of a classifier, the ROC curve is the essential tool: it shows the trade-off between the true positive rate (TPR) and the false positive rate (FPR) across every possible classification threshold, giving a comprehensive, threshold-independent view of model performance.
- Offline business metric
The offline business metric is top-N recall. Suppose a campaign targets n users, M users in the evaluation set actually buy the brand, and m of the model's top-n predicted users actually buy; then:

$recall_{topN} = m / M$

In recommendation and retrieval scenarios, top-N recall reflects the business goal more directly: the ability to capture the items of interest within the first N recommendations. It simulates the first N entries of the list a user would actually see, whereas AUC summarizes overall classification performance and may not directly reflect the experience at a specific recommendation cutoff.
There are 3M users in total, so recall is measured at top 3k, 5k, 10k, 50k, and 100k.
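A tiny worked example of the metric (illustrative numbers only, not from the experiments):
import pandas as pd

df = pd.DataFrame({
    'score': [0.9, 0.8, 0.7, 0.6, 0.5, 0.4],
    'label': [1,   0,   1,   0,   0,   1  ],   # M = 3 actual buyers
})
n = 3
top_n = df.sort_values('score', ascending=False).head(n)
m = int((top_n['label'] == 1).sum())            # m = 2 buyers land in the top 3
print(f"recall@{n} = {m}/{df['label'].sum()} = {m / df['label'].sum():.2f}")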
To save compute, inference is done in batches: a random number column (rnd) is added to the user_pay_sample_feature_join_eval
table, and inference runs in ten batches.
Batched inference
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import numpy as np
import os
from odps import ODPS
from odps.df import DataFrame
access_id = ''    # fill in your MaxCompute AccessKey ID (left blank here)
access_key = ''   # fill in your MaxCompute AccessKey secret
project = 'recom_maxcompute_dev'
endpoint = 'http://service.cn-shanghai.maxcompute.aliyun.com/api'
o = ODPS(
    access_id,
    access_key,
    project,
    endpoint,
)
# Pick the model by brand and by number of trees
brand_id='b78739'
trees = '28'
# Read the data in batches and predict.
def load_and_predict(model_path):
    # Initialize the prediction and label lists
    predictions = []
    labels = []
    # Load the model
    gbm = lgb.Booster(model_file=model_path)
    # Read the data in batches
    for num in range(10):
        # Read one batch
        sql = '''
        SELECT *
        FROM recom_maxcompute_dev.user_pay_sample_feature_join_eval
        WHERE ds='20130923' and brand_id='{brand}' and rnd>{start} and rnd<={end}
        ;
        '''.format(brand=brand_id, start=num/10.0, end=(num+1)/10.0)
        print(sql)
        query_job = o.execute_sql(sql)
        result = query_job.open_reader(tunnel=True)
        print('read data finish')
        df = result.to_pandas(n_process=4)  # n_process can be sized to the machine; values > 1 enable parallel reads.
        # Drop non-feature columns
        chunk = df.drop(columns=['user_id', 'brand_id', 'ds', 'rnd'])
        # Drop the 'label' column from the features
        X_test = chunk.drop('label', axis=1)
        y_test = chunk['label']
        y_pred = gbm.predict(X_test)
        # Collect the predictions for the current batch
        predictions.extend(y_pred)
        labels.extend(y_test)
    return predictions, labels
def calculate_top_k_ratio(predictions, labels, top_k_list):
    # Put predictions and labels into a DataFrame
    results_df = pd.DataFrame({'prediction': predictions, 'label': labels})
    # Sort by prediction score, descending
    results_df = results_df.sort_values(by='prediction', ascending=False)
    # Total number of positives
    total_positives = (results_df['label'] == 1).sum()
    # Positive ratio at each top-k cutoff
    ratios = {}
    for k in top_k_list:
        top_k_df = results_df.head(k)
        top_k_positives = (top_k_df['label'] == 1).sum()
        ratio = top_k_positives / total_positives
        ratios[k] = ratio
    return ratios
# Model file path
model_path = f'models_{brand_id}/model_{trees}.txt'
# Top-k cutoffs to compute
top_k_list = [1000, 3000, 5000, 10000, 50000]
# Load the data in batches and predict
predictions, labels = load_and_predict(model_path)
# Compute the top-k positive ratios
ratios = calculate_top_k_ratio(predictions, labels, top_k_list)
# Print the results
for k, ratio in ratios.items():
    print(f"Top {k} ratio of positive labels: {ratio:.4f}")
# Optionally save the results to a file
with open(f'models_{brand_id}/top_results.txt', 'w') as f:
    for k, ratio in ratios.items():
        f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
Results: single-brand and pairwise-mixed models
- b78739 LILY model predicting itself: 28 trees, 10,877 samples
- b47686 HSTYLE model predicting itself: 50 trees, 90,550 samples
- b62063 Nokia model predicting itself: 29 trees, 10,430 samples
- b56508 Samsung model predicting itself: 32 trees, 71,389 samples
Models trained on the phone brands underperform those trained on the apparel brands, possibly because apparel preferences show up more clearly in the features while phone-buying signals are harder to mine.
- Predictions from the mixed models:
- HSTYLE+Samsung model, 52 trees, predicting HSTYLE:
brand_id='b47686'
brand_model='b47686_b56508'
trees='52'
# Model file path
model_path = f'models_{brand_model}/model_{trees}.txt'
Raw results
With brands from different industries mixed at similar data proportions, recall rises at every top-N cutoff, possibly because the features are exploited more fully.
- HSTYLE+LILY model, 22 trees, predicting HSTYLE:
brand_id='b47686'
brand_model='b47686_b78739'
trees='22'
Raw results
Top 10000 and 5000 improve while the rest decline. LILY's sample size is far smaller than HSTYLE's, so the change is muted; and although the brands are in the same industry, the feature crossing helps only partially, hence the mixed result.
- HSTYLE+LILY model, 22 trees, predicting LILY:
brand_id='b78739'
brand_model='b47686_b78739'
trees='22'
Raw results
Mixing HSTYLE's large sample set into LILY's training data clearly improves predictions for LILY itself.
Results: the all-sample model
o = ODPS(
access_id,
access_key,
project,
endpoint,
)
# Pick the brand to evaluate; the number of trees is set below
brand_id='b47686'
# other brands: 78739, 62063, 56508
# Read the data and predict.
def load_and_predict(model_path):
    # Initialize the prediction and label lists
    predictions = []
    labels = []
    # Load the model
    gbm = lgb.Booster(model_file=model_path)
    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_eval
    WHERE ds='20130923' and brand_id='{brand}'
    ;
    '''.format(brand=brand_id)
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    print('read data finish')
    df = result.to_pandas(n_process=52)  # n_process can be sized to the machine; values > 1 enable parallel reads.
    # Drop non-feature columns
    chunk = df.drop(columns=['user_id', 'brand_id', 'ds', 'rnd'])
    # Drop the 'label' column from the features
    X_test = chunk.drop('label', axis=1)
    y_test = chunk['label']
    y_pred = gbm.predict(X_test)
    # Collect the predictions
    predictions.extend(y_pred)
    labels.extend(y_test)
    return predictions, labels
def calculate_top_k_ratio(predictions, labels, top_k_list):
    # Put predictions and labels into a DataFrame
    results_df = pd.DataFrame({'prediction': predictions, 'label': labels})
    # Sort by prediction score, descending
    results_df = results_df.sort_values(by='prediction', ascending=False)
    # Total number of positives
    total_positives = (results_df['label'] == 1).sum()
    # Positive ratio at each top-k cutoff
    ratios = {}
    for k in top_k_list:
        top_k_df = results_df.head(k)
        top_k_positives = (top_k_df['label'] == 1).sum()
        ratio = top_k_positives / total_positives
        ratios[k] = ratio
    return ratios
trees='95'
model_path=f'./models_all/model_{trees}.txt'
# Top-k cutoffs to compute
top_k_list = [1000, 3000, 5000, 10000, 50000]
# Load the data and predict
predictions, labels = load_and_predict(model_path)
# Compute the top-k positive ratios
ratios = calculate_top_k_ratio(predictions, labels, top_k_list)
# Print the results
for k, ratio in ratios.items():
    print(f"Top {k} ratio of positive labels: {ratio:.4f}")
# Optionally save the results to a file
with open(f'top_results_{brand_id}.txt', 'w') as f:
    for k, ratio in ratios.items():
        f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
The all-sample model shows a clear improvement. Each cell below lists recall at top 1000 / 3000 / 5000 / 10000 / 50000; the first row is the per-brand baseline, the second the all-sample model:

| model | b47686 HSTYLE | b56508 Samsung | b62063 Nokia | b78739 LILY |
|---|---|---|---|---|
| per-brand | 0.114842, 0.209537, 0.265950, 0.349899, 0.533915 | 0.068056, 0.104861, 0.129861, 0.164583, 0.284722 | 0.102041, 0.190476, 0.217687, 0.258503, 0.353741 | 0.257225, 0.361272, 0.427746, 0.488439, 0.624277 |
| all-sample | 0.141706, 0.230356, 0.288113, 0.366017, 0.537273 | 0.073611, 0.103472, 0.125000, 0.177778, 0.289583 | 0.149660, 0.217687, 0.231293, 0.278912, 0.394558 | 0.300578, 0.416185, 0.462428, 0.528902, 0.627168 |
However, GBDT's limitations with large sparse features leave a sizable share of users impossible to recall, so the next experiments use a DNN deep learning model.
DNN basics
From linear regression to binary classification
Loss function
Given features $X = [x_1, x_2, \ldots, x_n]$, linear regression is:

$$\hat{y} = w_1 x_1 + w_2 x_2 + \ldots + w_n x_n + b_0$$

For binary classification the target is 0 or 1, so $\hat{y}$ must be constrained to $[0, 1]$, which the sigmoid function does:

$$h_W(X) = \text{sigmoid}(x) = \frac{1}{1 + e^{-(w_1 x_1 + w_2 x_2 + \ldots + w_n x_n + b_0)}}$$

$h_W(X)$ has a concrete interpretation: the probability of the label being 1, so:

$$P(Y=1 \mid X; W) = h_W(X), \qquad P(Y=0 \mid X; W) = 1 - h_W(X)$$

For target value $y$, the probability of predicting $y$ (binary classification assumes the target follows a Bernoulli distribution) is:

$$P(Y=y \mid X; W) = \left(h_W(X)\right)^y \left(1 - h_W(X)\right)^{1-y}$$

With $N$ samples, the likelihood is:

$$J(W) = \prod_{i=1}^N \left(h_W(X_i)\right)^{y_i} \left(1 - h_W(X_i)\right)^{1-y_i}$$

The algorithm's core goal is to find parameters $W$ that maximize $J(W)$: a large $J(W)$ means most samples are predicted as their true value $y$ with high probability. Taking the log, negating, and averaging turns the maximization into a minimization, yielding the cross-entropy loss:

$$L(W) = -\frac{1}{N}\log(J(W)) = -\frac{1}{N}\sum_{i=1}^N\left[y_i \log(h_W(X_i)) + (1-y_i)\log(1 - h_W(X_i))\right]$$

It is solved with gradient descent.
Derivation of the gradient of $L(W)$: with

$$h_W(X_i) = \frac{1}{1+e^{-W^T X_i}}$$

we have

$$\begin{aligned} \log(h_W(X_i)) &= \log\left(\frac{1}{1+e^{-W^T X_i}}\right) = -\log(1+e^{-W^T X_i}) \\ \log(1-h_W(X_i)) &= \log\left(\frac{e^{-W^T X_i}}{1+e^{-W^T X_i}}\right) = -W^T X_i - \log(1+e^{-W^T X_i}) \end{aligned}$$

so

$$\begin{aligned} L(W) &= -\frac{1}{N}\sum_{i=1}^N\left[-y_i\log(1+e^{-W^T X_i}) + (1-y_i)\left(-W^T X_i - \log(1+e^{-W^T X_i})\right)\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i W^T X_i - W^T X_i - \log(1+e^{-W^T X_i})\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i W^T X_i - \left(\log(e^{W^T X_i}) + \log(1+e^{-W^T X_i})\right)\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i W^T X_i - \log(1+e^{W^T X_i})\right] \end{aligned}$$

Then:

$$\begin{aligned} \frac{\partial}{\partial w_j}L(W) &= -\frac{1}{N}\sum_{i=1}^N\left[y_i\frac{\partial}{\partial w_j}W^T X_i - \frac{\partial}{\partial w_j}\log(1+e^{W^T X_i})\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i x_i^j - \frac{x_i^j e^{W^T X_i}}{1+e^{W^T X_i}}\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i - \frac{1}{1+e^{-W^T X_i}}\right] x_i^j \\ &= \frac{1}{N}\sum_{i=1}^N\left(h_W(X_i) - y_i\right) x_i^j \end{aligned}$$

Note: when the prediction is close to the true value $y_i$, the gradient approaches 0; when it deviates, the gradient is proportional to the error and to the feature component $x_i^j$.
Gradient update schemes
Given $N$ samples, we have already derived the derivative of $L(W)$ with respect to $w_j$:

$$\frac{\partial}{\partial w_j}L(W) = \frac{1}{N}\sum_{i=1}^N\left(h_W(X_i) - y_i\right) x_i^j$$

How should $w_j$ be updated? Starting from an initial value $w_j^0$:

$$w_j^1 = w_j^0 - lr \cdot \frac{\partial}{\partial w_j}L(W)$$

where $lr$ is the learning rate, the step size of each update, and the subtraction moves along the negative gradient direction. In real training, however, there may be millions or even hundreds of millions of samples; how should the parameters then be updated?
1) Full-batch updates
Compute the partial derivatives over all samples at once and evaluate the gradient directly from the formula above.
Pros: the gradient is fairly stable.
Cons: tends to get stuck in local optima and is extremely memory-hungry.
2) Per-sample updates
Compute the gradient from one sample at a time and update immediately.
Cons: because individual samples are noisy, the gradient is very unstable and training may not converge.
3) Mini-batch updates
Suppose there are 100k samples; randomly sample 512 each step, feed them to the model, compute the gradient on that mini-batch, and update.
Pros: balances the stability of full-batch descent against the efficiency of per-sample updates.
Sometimes the model has already converged before consuming all the samples, in which case training can simply stop early based on the situation. Without random sampling, many consecutive steps might train on samples of the same brand, which is unacceptable, so the samples must be shuffled before training. A minimal mini-batch sketch follows.
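A minimal sketch (an illustration on synthetic data, not part of the original code) of mini-batch SGD for logistic regression, implementing the update $w \leftarrow w - lr \cdot \frac{1}{n}\sum (h - y)\,x$ with per-epoch shuffling:
import numpy as np

rng = np.random.default_rng(0)
N, D = 100_000, 8
X = rng.normal(size=(N, D))
true_w = rng.normal(size=D)
y = (1 / (1 + np.exp(-X @ true_w)) > rng.random(N)).astype(float)

w = np.zeros(D)
lr, batch_size = 0.1, 512
for epoch in range(3):
    idx = rng.permutation(N)                      # shuffle every epoch
    for start in range(0, N, batch_size):
        b = idx[start:start + batch_size]
        h = 1 / (1 + np.exp(-X[b] @ w))           # sigmoid predictions
        grad = X[b].T @ (h - y[b]) / len(b)       # (1/n) * sum((h - y) * x)
        w -= lr * grad
print("learned w:", np.round(w, 2))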
Why the loss is not mean squared error

$$L(W) = \frac{1}{2}\sum_{i=1}^N\left(y_i - h_W(X_i)\right)^2$$

For the sigmoid $h(x) = \frac{1}{1+e^{-x}}$, first rewrite it:

$$h(x) = \frac{1}{1+e^{-x}} = \frac{e^x}{1+e^x} = 1 - (e^x+1)^{-1}$$

then differentiate:

$$\begin{aligned} h'(x) &= (e^x+1)^{-2}e^x = (1+e^{-x})^{-2}e^{-x} \\ &= \frac{1}{1+e^{-x}}\cdot\frac{e^{-x}}{1+e^{-x}} = h(x)\left(1-h(x)\right) \end{aligned}$$

The derivative peaks at 0.25 and tends to 0 as $x$ grows large in either direction, so the gradient can vanish outright.
The partial derivative of the MSE loss above is:

$$\frac{\partial}{\partial w_j}L(W) = -\sum_{i=1}^N\left(y_i - h_W(X_i)\right)h_W'(X_i)\, x_i^j$$

while the cross-entropy gradient is:

$$\frac{\partial}{\partial w_j}L(W) = \frac{1}{N}\sum_{i=1}^N\left(h_W(X_i) - y_i\right) x_i^j$$

which carries no factor of $h'$ and thus avoids MSE's severe vanishing-gradient problem. A small numeric comparison follows.
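A minimal numeric sketch (an illustration, with made-up values) comparing the two gradients when a sigmoid prediction is badly wrong:
import numpy as np

x, y, w = 1.0, 0.0, 8.0            # true label 0, but w*x = 8 so h is near 1
h = 1 / (1 + np.exp(-w * x))
# MSE: dL/dw = -(y - h) * h * (1 - h) * x  -> scaled by h(1-h), nearly zero
grad_mse = -(y - h) * h * (1 - h) * x
# Cross-entropy: dL/dw = (h - y) * x      -> stays large, so learning proceeds
grad_ce = (h - y) * x
print(f"h = {h:.4f}, MSE grad = {grad_mse:.6f}, CE grad = {grad_ce:.4f}")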
Neurons
Multi-layer networks
Let the input be $X = [x_1, x_2, x_3, \ldots, x_n]$, the first hidden layer (Hidden 1) $H^1 = [h_1^1, h_1^2, \ldots, h_1^{n_1}]$, and the second hidden layer (Hidden 2) $H^2 = [h_2^1, h_2^2, \ldots, h_2^{n_2}]$. With ReLU as the activation function:

$$h_1^1 = \text{Relu}(w_1^1 x_1 + w_1^2 x_2 + \ldots + w_1^n x_n + b_0) = \text{Relu}(w_1 X), \quad \text{where } w_1 \in \mathbb{R}^{1 \times n},\ X \in \mathbb{R}^{n \times 1}$$

Stacking $w_1, w_2, \ldots, w_{n_1}$ into a matrix:

$$W_1 = [w_1, w_2, \ldots, w_{n_1}], \quad W_1 \in \mathbb{R}^{n_1 \times n}$$

gives:

$$H_1 = \text{Relu}(W_1 X)$$

$$H_2 = \text{Relu}(W_2\, \text{Relu}(W_1 X))$$

Activation functions
Without an activation function, the model degenerates into linear regression:

$$\hat{H}_2 = W_2 (W_1 X) = W_2 W_1 X = W X, \quad W = W_2 W_1$$

With activation functions, the model gains non-linearity, i.e. the ability to select and combine features.
As the figure illustrates: with an activation function, the model can select whichever features it needs and combine them arbitrarily. Taking a single node as an example, the activation lets the model build a tree through feature selection and combination (effectively choosing which weights to keep and which to drop); by the last layer, it has built as many trees as needed. A tiny numeric check of the no-activation collapse is sketched below.
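A minimal sketch (an illustration, not from the original code) showing that without a non-linearity, two stacked linear layers collapse into one linear map, $W_2(W_1 X) = (W_2 W_1) X$:
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(size=(4, 1))
W1 = rng.normal(size=(8, 4))
W2 = rng.normal(size=(3, 8))

two_layers = W2 @ (W1 @ x)
one_layer = (W2 @ W1) @ x                          # a single equivalent weight matrix
print(np.allclose(two_layers, one_layer))          # True: no extra capacity gained

relu = lambda z: np.maximum(z, 0)
print(np.allclose(W2 @ relu(W1 @ x), one_layer))   # False in general: ReLU breaks the collapse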
Output layer
For linear regression or binary classification the model usually outputs a single value; for multi-class problems it outputs several values as needed, and the final fully connected layer generally has no activation and outputs directly. For binary classification with a single output:

$$\text{logits} = W_3\, \text{Relu}(W_2\, \text{Relu}(W_1 X))$$

Input-layer embedding
Suppose the input features have been discretized and numbered, e.g. ['male', '5 purchases in the last 30 days', 'Chongqing'] is encoded as [1, 2, 3]: the input is an array of integer ids from 1 to N.
How does an array of int ids become embeddings?
Suppose there are N feature ids in total (N is the largest id) and each is represented by a real-valued vector of dimension M, $d_1 = [d_1^1, d_1^2, \ldots, d_1^M] \in \mathbb{R}^{1 \times M}$; the N vectors form the matrix $D = [d_1, d_2, \ldots, d_N] \in \mathbb{R}^{N \times M}$.
For the input [1, 2, 3], the model selects rows 1, 2, and 3 of $D$ to form $D' = [d_1, d_2, d_3] \in \mathbb{R}^{3 \times M}$, which is then fed into the model. The lookup is sketched below.
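A minimal PyTorch sketch (an illustration; the N, M values are made up) showing that an embedding is just a row lookup into a learnable $N \times M$ matrix, matching the $D$ / $D'$ description above:
import torch
import torch.nn as nn

N, M = 10, 4                      # vocabulary size and embedding dimension
emb = nn.Embedding(num_embeddings=N + 1, embedding_dim=M, padding_idx=0)

ids = torch.tensor([[1, 2, 3]])   # e.g. ['male', '5 purchases...', 'Chongqing']
D_prime = emb(ids)                # shape [1, 3, M]: rows 1, 2, 3 of the matrix
print(D_prime.shape)
print(torch.equal(D_prime[0, 0], emb.weight[1]))  # True: plain row selection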
Loss
The cross-entropy loss:

$$L(W) = -\frac{1}{N}\sum_{i=1}^N\left[y_i \log(h_W(X_i)) + (1-y_i)\log(1 - h_W(X_i))\right]$$
Optimizers
- Stochastic gradient descent
An optimizer is the algorithm that turns gradients into parameter updates across mini-batch iterations.
Let $g_t = \frac{\partial}{\partial w_j}L(W)$. The simplest optimizer is stochastic gradient descent:

$$w_j^t = w_j^{t-1} - lr \cdot \frac{\partial}{\partial w_j}L(W) = w_j^{t-1} - lr \cdot g_t$$

Its drawbacks:
- Because of sample randomness, the gradient $g_t$ is uncertain in both magnitude and, especially, direction, which causes large fluctuations in the weight updates.
- $lr$ is fixed:
the step size is the same for frequently-updated parameters and for sparse, rarely-updated ones;
and the step size near the optimum is the same as at the start of training. Set $lr$ too large and the weights end up oscillating around, or even skipping past, the optimum; set it too small and convergence slows and may settle in a local optimum.
- The Adam algorithm
A key component of Adam is its use of exponentially weighted moving averages to estimate the gradient's momentum and second moment, via the state variables:

$$v_t \leftarrow \beta_1 v_{t-1} + (1-\beta_1) g_t$$

$$s_t \leftarrow \beta_2 s_{t-1} + (1-\beta_2) g_t^2$$

with the common settings $\beta_1 = 0.9$, $\beta_2 = 0.999$, so that $s_t$ moves far more slowly than $v_t$. Bias correction gives:

$$\hat{v}_t = \frac{v_t}{1-\beta_1^t}, \qquad \hat{s}_t = \frac{s_t}{1-\beta_2^t}$$

The effective gradient becomes:

$$g_t' = \frac{lr \cdot \hat{v}_t}{\sqrt{\hat{s}_t} + \epsilon}$$

and the update rule is:

$$w_j^t = w_j^{t-1} - \frac{lr \cdot \hat{v}_t}{\sqrt{\hat{s}_t} + \epsilon}$$

- Momentum estimate $v_t$: a moving average of historical gradients that preserves the prevailing direction and its magnitude, damping to some extent the step-to-step fluctuations caused by gradient noise.
- Second-moment estimate $s_t$: accumulates the squares of historical gradients.
For frequently-updated parameters, e.g. embeddings of common characters such as "我" or "的", $s_t$ is large; for rare ones such as "舴", which occurs seldom, $s_t$ is small. Since it sits in the denominator, frequently-updated parameters receive an effectively smaller $lr$ and rarely-updated ones a larger $lr$.
- The effective $lr$ also decreases as the iterations accumulate, preventing non-converging oscillation around the minimum. A compact sketch of the update rule follows.
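A minimal sketch (an illustration, directly transcribing the $v_t$ / $s_t$ / bias-correction formulas above; the toy gradient is made up) of one Adam step per parameter vector:
import numpy as np

def adam_step(w, g, state, lr=0.001, b1=0.9, b2=0.999, eps=1e-8):
    state["t"] += 1
    state["v"] = b1 * state["v"] + (1 - b1) * g        # first moment (momentum)
    state["s"] = b2 * state["s"] + (1 - b2) * g * g    # second moment
    v_hat = state["v"] / (1 - b1 ** state["t"])        # bias correction
    s_hat = state["s"] / (1 - b2 ** state["t"])
    return w - lr * v_hat / (np.sqrt(s_hat) + eps)

w = np.zeros(3)
state = {"t": 0, "v": np.zeros(3), "s": np.zeros(3)}
for _ in range(5):
    g = 2 * (w - np.array([1.0, -2.0, 0.5]))           # toy quadratic gradient
    w = adam_step(w, g, state)
print(w)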
DNN modeling
The GBDT-based brand purchase model has the following problems:
- It only uses statistical features, whose granularity is too coarse; it cannot learn relationships between brands from users' behavior sequences.
Example: the target is predicting buyers of the "Li-Ning" brand. User A's behavior sequence is [Anta, Guirenniao, Xtep]; user B's is [Adidas, Nike, Golden Goose]. Which of A and B is more likely to buy Li-Ning?
To a GBDT model, users A and B look identical in the features: both have clicks in the brand's level-2 category, with no distinguishing signal at all. The model therefore needs users' behavior sequences to tell their purchase interests apart.
- GBDT cannot model large-scale id features; there are 80k+ brand ids, which GBDT cannot handle.
GBDT can only encode an id feature as yes/no splits: a user's gender ['male', 'female'] must become two features, is-male and is-female. With 80k id values, 80k yes/no features would be needed, which is infeasible for GBDT.
Given these problems, this solution introduces a deep learning algorithm. Deep learning models id features well and learns the relationships among them from users' behavior sequences.
For comparability with GBDT, the samples stay the same, the features add the user behavior sequences and the target brand id, and the evaluation scheme is unchanged.
Feature development
There are 80k brands in total and most have sparse behavior; to limit noise, behavior features are built only for the top 10k brands:
create table if not exists brand_top1w_alipay_dim (
brand_id string,
alipay_num bigint
)LIFECYCLE 60;
insert OVERwrite table brand_top1w_alipay_dim
select brand_id, alipay_num
from (
select brand_id, count(DISTINCT user_id) as alipay_num
from dw_user_item_alipay_log
where ds<=${bizdate} and ds>to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
and brand_id is not null
group by brand_id
)t1 order by alipay_num desc limit 10000
;
user_click_brand_seq_feature
:
The inner subquery selects user ids and brand ids from dw_user_item_click_log for the 30 days up to ${bizdate} (a variable representing the business date). The ROW_NUMBER() window function then ranks each user's brand clicks by most recent action time and keeps only the 50 most recent brand clicks. These brand ids are joined against brand_top1w_alipay_dim so that only head brands are considered. Finally, WM_CONCAT joins the brand ids into a single string, grouped by user id, and the result is inserted into the corresponding partition of user_click_brand_seq_feature.
Collect and pay tables are built the same way; since add-to-cart behaves like purchase, no cart table is built.
create TABLE if not exists user_click_brand_seq_feature (
user_id string,
brand_id_seq string
)PARTITIONED BY (ds STRING ) LIFECYCLE 90;
insert OVERWRITE TABLE user_click_brand_seq_feature PARTITION (ds=${bizdate})
select user_id, WM_CONCAT(',', concat('b_',brand_id)) as brand_id_seq
from (
select t2.user_id, t2.brand_id
from (
select brand_id
from brand_top1w_alipay_dim
)t1 join (
SELECT user_id,brand_id
, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY ds desc) AS number
from (
select user_id, brand_id, max(ds) as ds
from dw_user_item_click_log
where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
and brand_id is not null
group by user_id, brand_id
)t1
)t2 on t1.brand_id=t2.brand_id
where number<=50
)t1 group by user_id
;
user_click_cate_seq_feature
Stores users' clicked-category sequence features, inserting each user's clicked-category sequence over the past 30 days into the table. Its design mirrors user_click_brand_seq_feature, but it tracks the item's category (cate) rather than the brand.
create TABLE if not exists user_click_cate_seq_feature (
user_id string,
cate_seq string
)PARTITIONED BY (ds STRING ) LIFECYCLE 90;
insert OVERWRITE TABLE user_click_cate_seq_feature PARTITION (ds=${bizdate})
select user_id, WM_CONCAT(',', concat('c_',cate2)) as cate_seq
from (
select user_id, cate2
, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY ds DESC) AS number
from (
select user_id, cate2, max(ds) as ds
from dw_user_item_click_log
where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
and cate2 is not null
group by user_id, cate2
)t1
)t1
where number<=50
group by user_id
;
Feature joining, discretization, and serialization
- Following the GBDT sample-feature join, add the newly built brand behavior-sequence and category behavior-sequence features
- Discretize the raw real-valued features: fields such as item_num_3d or brand_num_3d get a log transform (log(2, value + 1)), which smooths the counts and tames extreme values in the data
- Use concat to build string tokens, e.g. joining 'b_' with the brand_id field to form a new feature representing the brand's encoding
create table if not exists user_pay_sample_feature_join_dnn(
user_id string,
brand_id string,
label bigint,
target_brand_id string,
clk_brand_seq string,
clt_brand_seq string,
pay_brand_seq string,
clk_cate_seq string,
clt_cate_seq string,
pay_cate_seq string,
user_click_feature string,
user_clt_feature string,
user_cart_feature string,
user_pay_feature string,
brand_stat_feature string,
user_cate2_cross_feature string,
user_brand_cross_feature string
)PARTITIONED BY (ds STRING ) LIFECYCLE 90;
insert OVERWRITE TABLE user_pay_sample_feature_join_dnn partition (ds=${bizdate})
select t1.user_id, t1.brand_id, t1.label, concat('b_',t1.brand_id)
,if(t2.brand_id_seq is null, 'clkb_seq_null', t2.brand_id_seq)
,if(t3.brand_id_seq is null, 'cltb_seq_null', t3.brand_id_seq)
,if(t4.brand_id_seq is null, 'payb_seq_null', t4.brand_id_seq)
,if(t5.cate_seq is null, 'clkc_seq_null', t5.cate_seq)
,if(t6.cate_seq is null, 'cltc_seq_null', t6.cate_seq)
,if(t7.cate_seq is null, 'payc_seq_null', t7.cate_seq)
,if(t8.user_id is null, 'user_click_null', concat(
concat('uclk_item_num_3d','_',if(t8.item_num_3d is null, 'null', cast(log(2,t8.item_num_3d+1) as bigint))),',',
concat('uclk_brand_num_3d','_',if(t8.brand_num_3d is null, 'null', cast(log(2,t8.brand_num_3d+1) as bigint))),',',
concat('uclk_seller_num_3d','_',if(t8.seller_num_3d is null, 'null', cast(log(2,t8.seller_num_3d+1) as bigint))),',',
concat('uclk_cate1_num_3d','_',if(t8.cate1_num_3d is null, 'null', cast(log(2,t8.cate1_num_3d+1) as bigint))),',',
concat('uclk_cate2_num_3d','_',if(t8.cate2_num_3d is null, 'null', cast(log(2,t8.cate2_num_3d+1) as bigint))),',',
concat('uclk_cnt_days_3d','_',if(t8.cnt_days_3d is null, 'null', cast(log(2,t8.cnt_days_3d+1) as bigint))),',',
...
)) as user_click_beh_feature
...
from (
select *
from user_pay_sample
where ds=${bizdate}
)t1 left join (
select user_id, brand_id_seq
from user_click_brand_seq_feature
where ds=${bizdate}
)t2 on t1.user_id=t2.user_id
...
where (t2.brand_id_seq is not null or t3.brand_id_seq is not null or t4.brand_id_seq is not null or
t5.cate_seq is not null or t6.cate_seq is not null or t7.cate_seq is not null or
t8.cnt_days_90d is not null or t9.cate1_num_90d is not null or t10.item_num_90d is not null
or t11.item_num_90d is not null or t12.click_num is not null or t13.clk_item_90d is not null or
t14.clk_item_90d is not null)
;
- Feature serialization
Serialize all the features, assigning a numeric id to every feature token such as b_alipay_num_6
:
create table if not exists user_pay_sample_feature_seq (
feature string ,
number bigint
)LIFECYCLE 90;
insert OVERWRITE TABLE user_pay_sample_feature_seq
select feature, ROW_NUMBER() OVER(ORDER BY feature) AS number
from (
select DISTINCT feature
from (
select target_brand_id as feature
from user_pay_sample_feature_join_dnn
where ds>='20130701' and ds<='20130916'
union all
select trans_array(0,',',clk_brand_seq) as (feature)
from user_pay_sample_feature_join_dnn
where ds>='20130701' and ds<='20130916'
union all
select trans_array(0,',',clt_brand_seq) as (feature)
from user_pay_sample_feature_join_dnn
where ds>='20130701' and ds<='20130916'
...
)t1
)t1
;
Use multiple JOINs to attach each serialized feature sequence back onto the original user_pay_sample_feature_join_dnn table:
create table if not exists user_pay_sample_feature_join_dnn_seq(
user_id string,
brand_id string,
label bigint,
bizdate string,
target_brand_id string,
clk_brand_seq string,
clt_brand_seq string,
pay_brand_seq string,
clk_cate_seq string,
clt_cate_seq string,
pay_cate_seq string,
user_click_feature string,
user_clt_feature string,
user_cart_feature string,
user_pay_feature string,
brand_stat_feature string,
user_cate2_cross_feature string,
user_brand_cross_feature string
)LIFECYCLE 90;
insert OVERWRITE TABLE user_pay_sample_feature_join_dnn_seq
select t1.user_id, t1.brand_id, t1.label, t1.ds, t1.number,
t2.feature, t3.feature, t5.feature, t6.feature, t7.feature, t8.feature
,t9.feature, t10.feature, t11.feature, t12.feature, t13.feature, t14.feature, t15.feature
from (
select t1.user_id, t1.brand_id, t1.label, t1.ds, t2.number
from (
select user_id, brand_id, label, ds, target_brand_id
from user_pay_sample_feature_join_dnn
where ds>='20130701' and ds<='20130916'
)t1 join (
select feature, number
from user_pay_sample_feature_seq
)t2 on t1.target_brand_id=t2.feature
)t1 join (
select user_id, brand_id, label, ds, WM_CONCAT(',', number) as feature
from (
select user_id, brand_id, label, ds, number
from (
select trans_array(4, ',', user_id, brand_id, label, ds, clk_brand_seq) as (user_id, brand_id, label, ds, feature)
from user_pay_sample_feature_join_dnn
where ds>='20130701' and ds<='20130916'
)t1 join (
select feature, number
from user_pay_sample_feature_seq
)t2 on t1.feature=t2.feature
)t1
group by user_id, brand_id, label, ds
)t2 on t1.user_id=t2.user_id and t1.brand_id=t2.brand_id and t1.label=t2.label and t1.ds=t2.ds
...
;
Select all columns from user_pay_sample_feature_join_dnn_seq and generate a unique key_all per row by concatenating two random numbers with several original columns (user_id, brand_id, label, bizdate). Grouping by key_all and taking MAX() within each group then passes each record through intact, since the key is unique per row; and because key_all begins with random numbers, the rows come out in random order, which shuffles the samples ahead of model training.
create table if not exists user_pay_sample_feature_join_dnn_seq_shuffle(
key_all string,
label bigint,
target_brand_id string,
clk_brand_seq string,
clt_brand_seq string,
pay_brand_seq string,
clk_cate_seq string,
clt_cate_seq string,
pay_cate_seq string,
user_click_feature string,
user_clt_feature string,
user_cart_feature string,
user_pay_feature string,
brand_stat_feature string,
user_cate2_cross_feature string,
user_brand_cross_feature string
)LIFECYCLE 90;
insert OVERWRITE TABLE user_pay_sample_feature_join_dnn_seq_shuffle
select key_all, max(label), MAX(target_brand_id), MAX(clk_brand_seq), MAX(clt_brand_seq), MAX(pay_brand_seq)
,max(clk_cate_seq), max(clt_cate_seq), max(pay_cate_seq), max(user_click_feature)
,max(user_clt_feature), max(user_cart_feature), max(user_pay_feature), max(brand_stat_feature)
,max(user_cate2_cross_feature), max(user_brand_cross_feature)
from (
select *, concat(RAND(),'_',RAND(),'_',user_id,'_',brand_id,'_',label,'_',bizdate) as key_all
from user_pay_sample_feature_join_dnn_seq
)t1 group by key_all
;
Training the model: DNN
Approach
- Read the dataset with PyODPS.
- dataset: read the data and convert it to PyTorch tensors.
- dataLoader: batch the data; data transformations can also happen here.
- model: embedding, dense layers, and ReLU.
- loss: cross-entropy loss (celoss).
- optimizer: Adam.
- Training loop: a for loop.
- Log the loss with TensorBoard.
- Evaluate AUC (Area Under Curve).
- Save the model.
Code structure
.
├── config
│ ├── ak_config.py
│ ├── dnn_config.py
├── dataset
│ ├── dnn_dataset.py
├── din_model_train.py
├── dnn_focal_loss_model_train.py
├── dnn_model_test.py
├── dnn_model_train.py
├── loss
│ ├── focal_loss.py
├── model
│ ├── din_model2.py
│ ├── din_model.py
│ ├── dnn_model.py
│ ├── moe_model.py
├── moe_focal_loss_model_train.py
└── utils
├── get_data.py
Code contents
dnn_config
config={
"embedding_dim":32,
"num_embedding":20000,
"lr":0.001,
"batch_size":512,
"num_experts":3,
"feature_col" :["target_brand_id","clk_brand_seq","clt_brand_seq","pay_brand_seq","clk_cate_seq","clt_cate_seq","pay_cate_seq","user_click_feature","user_clt_feature","user_cart_feature","user_pay_feature","brand_stat_feature","user_cate2_cross_feature","user_brand_cross_feature"],
"features_gate_col":["target_brand_id","brand_stat_feature","clk_brand_seq","user_cate2_cross_feature","user_brand_cross_feature"]
}
dnn_dataset
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, IterableDataset
# Base dataset class for preprocessed feature and label data
class MyDataset(Dataset):
    """
    Custom dataset class, inheriting from PyTorch's Dataset.
    Handles feature and label data that has already been preprocessed.
    """
    def __init__(self, features, labels, config):
        """
        Initialize the dataset.
        Args:
            features: feature dict containing multiple feature columns
            labels: label data
            config: config dict with the feature column names etc.
        """
        self.config = config
        self.features = {}
        # Convert each feature column into tensors
        for ff in self.config["feature_col"]:
            self.features[ff] = [torch.tensor([int(id) for id in seq.split(',')], dtype=torch.long) for seq in features[ff]]
        self.labels = torch.tensor(labels, dtype=torch.float32)
    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.labels)
    def __getitem__(self, idx):
        """
        Fetch the sample at the given index.
        Args:
            idx: sample index
        Returns:
            dict containing the features and the label
        """
        res_features = {}
        for ff in self.config["feature_col"]:
            res_features[ff] = self.features[ff][idx]
        res_features['labels'] = self.labels[idx]
        return res_features
# Memory-friendly dataset class for large-scale data
class MyPriorDataset(Dataset):
    """
    Optimized dataset class for handling feature data more efficiently.
    Compared with MyDataset, it uses less memory by converting samples lazily.
    """
    def __init__(self, features, labels, config):
        """
        Initialize the dataset.
        Args:
            features: raw feature data
            labels: label data
            config: config dict
        """
        self.config = config
        self.features = features
        self.labels = labels
    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.labels)
    def __getitem__(self, idx):
        """
        Fetch the sample at the given index, converting it on the fly.
        Args:
            idx: sample index
        Returns:
            dict containing the features and the label
        """
        res_features = {}
        for ff in self.config["feature_col"]:
            res_features[ff] = torch.tensor([int(id) for id in self.features[ff][idx].split(',')], dtype=torch.long)
        res_features['labels'] = torch.tensor(self.labels[idx], dtype=torch.float32)
        return res_features
# Iterable dataset class for streaming large-scale data
class MyIterDataset(IterableDataset):
    """
    Iterable dataset suited to streaming over large data.
    Inherits from PyTorch's IterableDataset and supports streaming loads.
    """
    def __init__(self, df):
        """
        Initialize the iterable dataset.
        Args:
            df: pandas DataFrame with feature and label data
        """
        super().__init__()
        self.df = df
    def __iter__(self):
        """
        Return a data iterator.
        A generator that yields processed samples row by row.
        Yields:
            dict with the processed features and the label
        """
        for index, row in self.df.iterrows():
            yield {
                'features': {col: list(map(int, row[col].split(','))) for col in self.df.columns if col != 'label'},
                'label': int(row['label'])
            }
dnn_model
import torch
import torch.nn as nn
import torch.nn.functional as F
# Deep neural network model
class MyModel(nn.Module):
    """
    Custom DNN model.
    An embedding layer followed by three fully connected layers,
    used for feature learning and binary classification.
    """
    def __init__(self, config):
        """
        Build the model.
        Args:
            config: config dict with the key parameters:
                - num_embedding: embedding vocabulary size
                - embedding_dim: embedding vector dimension
                - feature_col: list of feature column names
        """
        super(MyModel, self).__init__()
        self.config = config
        # Embedding layer that maps discrete ids to dense vectors.
        # padding_idx=0 keeps the embedding of the padding id fixed at zero.
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],
            embedding_dim=self.config["embedding_dim"],
            padding_idx=0
        )
        # Three fully connected layers:
        # layer 1 maps the concatenated feature embeddings to 512 dims
        self.fc1 = nn.Linear(self.config["embedding_dim"] * len(self.config["feature_col"]), 512)
        # layer 2 maps 512 dims to 128
        self.fc2 = nn.Linear(512, 128)
        # output layer maps 128 dims to a single logit
        self.fc3 = nn.Linear(128, 1)

    def forward(self, features):
        """
        Forward pass.
        Args:
            features: dict of input tensors, one per feature column,
                each of shape [batch_size, sequence_length]
        Returns:
            tensor: predictions of shape [batch_size, 1]
        """
        # Embed every feature
        embedding_dict = {}
        for ff in self.config["feature_col"]:
            # Sum-pool each sequence's embeddings into a fixed-length vector
            embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        # Concatenate all feature embeddings along the feature dimension
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)
        # Run through the three fully connected layers
        x = F.relu(self.fc1(x))  # first layer with ReLU
        x = F.relu(self.fc2(x))  # second layer with ReLU
        x = self.fc3(x)          # output layer (raw logits, no activation)
        return x
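A quick shape check of the forward pass (toy batch with random ids; the config values mirror dnn_config but only two columns are used here):

# Hypothetical shape check for MyModel.
cfg = {"num_embedding": 20000, "embedding_dim": 32,
       "feature_col": ["target_brand_id", "pay_brand_seq"]}
m = MyModel(cfg)
batch = {
    "target_brand_id": torch.randint(1, 20000, (4, 1)),  # [batch, seq_len]
    "pay_brand_seq": torch.randint(0, 20000, (4, 7)),    # 0 acts as padding
}
print(m(batch).shape)  # torch.Size([4, 1]) -- raw logits, sigmoid applied later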
get_data
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
import numpy as np
import os
from odps import ODPS
from torch.nn.utils.rnn import pad_sequence
from config.dnn_config import config as dnn_config
import torch
# Fetch training and test data from MaxCompute
def get_data(ak_config):
    """
    Pull training data from MaxCompute and preprocess it.
    Args:
        ak_config: dict with the MaxCompute access configuration
    Returns:
        train_feature_numpy: training features
        test_feature_numpy: test features
        train_label: training labels
        test_label: test labels
    """
    # Initialize the ODPS connection
    o = ODPS(
        ak_config["access_id"],
        ak_config["access_key"],
        ak_config["project"],
        ak_config["endpoint"],
    )
    # Run the SQL query that pulls the data
    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_dnn_seq_shuffle
    ;
    '''
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=10)  # n_process > 1 speeds up the read with parallel workers
    print('read data finish')
    # Preprocessing
    df = df.drop(columns=['key_all'])  # drop the non-feature key column
    X = df.drop(columns='label')       # features
    y = df['label']                    # labels
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Build the feature dicts
    feature_col = dnn_config["feature_col"]
    train_feature_numpy = {}
    test_feature_numpy = {}
    for feature in feature_col:
        train_feature_numpy[feature] = X_train[feature].values
        test_feature_numpy[feature] = X_test[feature].values
    train_label = y_train.values
    test_label = y_test.values
    return train_feature_numpy, test_feature_numpy, train_label, test_label
# Compute the share of positives captured in the top k
def calculate_top_k_ratio(predictions, labels, top_k_list):
    """
    Compute positive recall at several k values.
    Args:
        predictions: model scores
        labels: ground-truth labels
        top_k_list: list of k values to evaluate
    Returns:
        ratios: dict mapping each k to the positive recall at that k
    """
    results_df = pd.DataFrame({'prediction': predictions, 'label': labels})
    results_df = results_df.sort_values(by='prediction', ascending=False)
    total_positives = (results_df['label'] == 1).sum()
    ratios = {}
    for k in top_k_list:
        top_k_df = results_df.head(k)
        top_k_positives = (top_k_df['label'] == 1).sum()
        ratio = top_k_positives / total_positives
        ratios[k] = ratio
    return ratios
# Fetch the test data for a specific brand
def get_data_test(ak_config, brand_id):
    """
    Pull one brand's test data from MaxCompute.
    Args:
        ak_config: MaxCompute access configuration
        brand_id: brand id
    Returns:
        test_feature_numpy: test features
        test_label: test labels
    """
    o = ODPS(ak_config["access_id"], ak_config["access_key"],
             ak_config["project"], ak_config["endpoint"])
    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_eval_dnn_seq
    where keys_all = '{brand_id}'
    ;
    '''.format(brand_id=brand_id)
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=10)
    print('read data finish')
    df = df.drop(columns=['keys_all'])
    X = df.drop(columns='label')
    y = df['label']
    feature_col = dnn_config["feature_col"]
    test_feature_numpy = {}
    for feature in feature_col:
        test_feature_numpy[feature] = X[feature].values
    test_label = y.values
    return test_feature_numpy, test_label
# Fetch test data for the MoE model
def get_data_test_moe(ak_config):
    """
    Pull the MoE model's test data (capped at 3000 rows).
    Args:
        ak_config: MaxCompute access configuration
    Returns:
        test_feature_numpy: test features
        test_label: test labels
    """
    o = ODPS(
        ak_config["access_id"],
        ak_config["access_key"],
        ak_config["project"],
        ak_config["endpoint"],
    )
    # Read the data
    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_dnn_seq_shuffle limit 3000
    ;
    '''
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=10)  # set n_process to match the machine; values > 1 enable parallel reads
    print('read data finish')
    # Drop the non-feature key column
    df = df.drop(columns=['key_all'])
    # Split features and labels
    X = df.drop(columns='label')
    y = df['label']
    # Build the feature dict (no train/test split here)
    feature_col = dnn_config["feature_col"]
    test_feature_numpy = {}
    for feature in feature_col:
        test_feature_numpy[feature] = X[feature].values
    test_label = y.values
    return test_feature_numpy, test_label
# Basic collate function
def my_collate_fn(batch):
    """
    Batch collate function for the DataLoader.
    Pads variable-length sequences; does not build masks.
    Args:
        batch: list of samples
    Returns:
        res_feature: dict of padded feature tensors
        labels: label tensor
    """
    res_features_tmp = {}
    labels = []
    for ff in dnn_config["feature_col"]:
        res_features_tmp[ff] = []
    # Gather the batch
    for sample in batch:
        for ff in dnn_config["feature_col"]:
            res_features_tmp[ff].append(sample[ff])
        labels.append(sample["labels"])
    # Pad the sequences
    res_feature = {}
    for ff in dnn_config["feature_col"]:
        res_feature[ff] = pad_sequence(res_features_tmp[ff], batch_first=True, padding_value=0)
    return res_feature, torch.tensor(labels)
# Collate function that also builds sequence masks
def seq_collate_fn(batch):
    """
    Batch collate function that additionally returns masks,
    for models that need attention masks (such as DIN).
    Args:
        batch: list of samples
    Returns:
        res_feature: dict of padded feature tensors
        res_mask: dict of masks, one per feature
        labels: label tensor
    """
    res_features_tmp = {}
    labels = []
    # Gather the batch
    for ff in dnn_config["feature_col"]:
        res_features_tmp[ff] = []
    for sample in batch:
        for ff in dnn_config["feature_col"]:
            res_features_tmp[ff].append(sample[ff])
        labels.append(sample["labels"])
    # Pad the features and build the matching masks
    res_feature = {}
    res_mask = {}
    for ff in dnn_config["feature_col"]:
        res_feature[ff] = pad_sequence(res_features_tmp[ff], batch_first=True, padding_value=0)
        res_mask[ff] = (res_feature[ff] != 0).type(torch.float32)  # mask: 1 at real tokens, 0 at padding
    return res_feature, res_mask, torch.tensor(labels)
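The padding-plus-mask logic is easy to see in isolation (a throwaway illustration using the same pad_sequence call):

# Two sequences of different lengths, padded to the batch maximum.
seqs = [torch.tensor([3, 8, 2]), torch.tensor([5])]
padded = pad_sequence(seqs, batch_first=True, padding_value=0)
mask = (padded != 0).type(torch.float32)
print(padded)  # tensor([[3, 8, 2], [5, 0, 0]])
print(mask)    # tensor([[1., 1., 1.], [1., 0., 0.]])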
dnn_model_train
from sklearn.metrics import roc_auc_score
import os
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter
from dataset.dnn_dataset import MyPriorDataset
from model.dnn_model import MyModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data
from utils.get_data import my_collate_fn
# Fetch the training and test data
train_feature_numpy, test_feature_numpy, train_label, test_label = get_data(ak_config)
# Build the training and test datasets
dataset_train = MyPriorDataset(train_feature_numpy, train_label, dnn_config)
dataset_test = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
print('dataset finish')
# Build the data loaders (shuffle=False: the source table name suggests it is pre-shuffled)
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=my_collate_fn)
dataloader_test = DataLoader(dataset_test, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=my_collate_fn)
print('dataloader finish')
# Initialize the model, loss function, and optimizer
model = MyModel(dnn_config)
criterion = nn.BCEWithLogitsLoss()  # binary cross entropy on logits
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)  # note: hard-coded, overrides config["lr"]
writer = SummaryWriter()  # TensorBoard visualization
# Create the model checkpoint directory
os.makedirs('models/dnn2', exist_ok=True)
def train_model(train_loader, test_loader, model, criterion, optimizer, num_epochs=2):
    """
    Main training loop.
    Args:
        train_loader: training data loader
        test_loader: test data loader
        model: the neural network
        criterion: loss function
        optimizer: optimizer
        num_epochs: number of epochs
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # training mode
        for features, labels in train_loader:
            # Forward pass
            optimizer.zero_grad()  # clear gradients
            outputs = model(features)
            labels = torch.unsqueeze(labels, dim=1)
            loss = criterion(outputs, labels)
            # Backward pass and parameter update
            loss.backward()
            optimizer.step()
            total_step += 1
            # Log the training loss
            if (total_step + 1) % 10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            # Print training progress
            if (total_step + 1) % 100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            # Periodically evaluate the model
            if (total_step + 1) % 2000 == 0:
                with torch.no_grad():
                    model.eval()  # evaluation mode
                    test_preds = []
                    test_targets = []
                    # Predict on the test set
                    for data, target in test_loader:
                        output = model(data)
                        test_preds.extend(output.to('cpu').sigmoid().squeeze().tolist())
                        test_targets.extend(target.squeeze().tolist())
                    # Compute the AUC on the held-out split
                    # (logged as 'AUC/test'; the original tagged it 'AUC/train' despite using test data)
                    test_auc = roc_auc_score(test_targets, test_preds)
                    writer.add_scalar('AUC/test', test_auc, total_step)
                    # Save a checkpoint
                    torch.save(model.state_dict(), f'models/dnn2/model_epoch_{epoch}_{total_step}.pth')
                model.train()
        # Save the model at the end of every epoch
        torch.save(model.state_dict(), f'models/dnn2/model_epoch_{epoch}.pth')
    # Final evaluation after training
    with torch.no_grad():
        model.eval()
        test_preds = []
        test_targets = []
        for data, target in test_loader:
            output = model(data)
            test_preds.extend(output.to('cpu').sigmoid().squeeze().tolist())
            test_targets.extend(target.squeeze().tolist())
        test_auc = roc_auc_score(test_targets, test_preds)
        writer.add_scalar('AUC/test', test_auc, total_step)

# Start training
train_model(dataloader_train, dataloader_test, model, criterion, optimizer)
writer.close()  # close the TensorBoard writer
dnn_model_test
from torch.utils.data import DataLoader
import torch
from dataset.dnn_dataset import MyPriorDataset
from model.dnn_model import MyModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data_test
from utils.get_data import calculate_top_k_ratio
from utils.get_data import my_collate_fn
# Brands to evaluate
brands = ['b47686', 'b56508', 'b62063', 'b78739']
for brand_id in brands:
    # Model path and evaluation settings
    model_path = './models/focal2/model_epoch_1_27999.pth'
    # Top-k values to evaluate
    top_k_list = [1000, 3000, 5000, 10000, 50000]
    # Fetch the test data
    test_feature_numpy, test_label = get_data_test(ak_config, brand_id)
    dataset_test = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
    # Build the test data loader
    dataloader_test = DataLoader(dataset_test, batch_size=dnn_config["batch_size"],
                                 shuffle=False, collate_fn=my_collate_fn)
    # Load the model
    model = MyModel(dnn_config).to('cpu')
    model.load_state_dict(torch.load(model_path))
    model.to('cpu')
    model.eval()  # evaluation mode
    # Predict
    test_preds = []
    test_targets = []
    for data, target in dataloader_test:
        output = model(data)
        test_preds.extend(output.sigmoid().squeeze().tolist())
        test_targets.extend(target.squeeze().tolist())
    # Compute the positive recall at each top-k
    ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
    # Print the results
    for k, ratio in ratios.items():
        print(f"Top {k} ratio of positive labels: {ratio:.4f}")
    # Save the results to a file
    with open(f'models_{brand_id}_top_results_focal.txt', 'w') as f:
        for k, ratio in ratios.items():
            f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
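To make the metric concrete, a tiny worked example of calculate_top_k_ratio with made-up scores:

# 6 samples, 3 positives. Sorted by score the labels read [1, 1, 0, 1, 0, 0].
preds  = [0.9, 0.2, 0.8, 0.4, 0.1, 0.35]
labels = [1,   0,   1,   0,   0,   1]
print(calculate_top_k_ratio(preds, labels, [2, 4]))
# {2: 0.667, 4: 1.0} -- 2 of 3 positives land in the top 2, all 3 in the top 4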
Training process
The loss oscillates during training; after raising the TensorBoard smoothing factor, the downward trend becomes visible.
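To reproduce the curve, point TensorBoard at the SummaryWriter output (./runs is the writer's default directory; this is standard TensorBoard usage, not repo-specific):

tensorboard --logdir ./runs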
focal loss
Real samples are often very easy to classify: a user who has added the brand to their cart is easily predicted at 0.9+, and a user with no activity on the platform is easily predicted at 0.1. These are the "easy" samples. Samples predicted around 0.4-0.6, or positives predicted at 0.1 and negatives predicted at 0.9, are the "hard" samples.
Suppose a batch of 5 produces the losses [0.1, 0.01, 0.5, 0.1, 0.14], where 0.5 comes from a hard sample and the rest from easy ones. The hard sample contributes only about 58% of the total loss, so 42% of the model's attention still goes to easy samples that are already well learned and need no further effort. The model should instead focus on the hard samples and improve on them.
This project adopts focal loss as the loss function, letting the model automatically increase its focus on hard samples and thereby improve quality.
Focal loss comes from the vision literature (Focal Loss for Dense Object Detection) and targets both class imbalance and hard-example learning.
The cross-entropy loss is

$$L(W) = -y \log(h_W(X)) - (1 - y) \log(1 - h_W(X))$$

where $h_W(X)$ is the predicted probability of class 1, i.e.

$$L(W) = \begin{cases} -\log(h_W(X)), & y = 1 \\ -\log(1 - h_W(X)), & y = 0 \end{cases}$$

Focal loss is defined as

$$L(W) = \begin{cases} -(1 - h_W(X))^\gamma \log(h_W(X)), & y = 1 \\ -(h_W(X))^\gamma \log(1 - h_W(X)), & y = 0 \end{cases}$$

where $\gamma$ is usually set to 2.
For the example above with losses [0.1, 0.01, 0.5, 0.1, 0.14], look only at the modulating term, treating each value as the base raised to $\gamma = 2$: the weighted terms become [0.01, 0.0001, 0.25, 0.01, 0.0196].
- 0.5 becomes 0.25, a 50% reduction, while 0.1 becomes 0.01, a 90% reduction: "easy" samples are cut by a larger proportion than "hard" ones.
- Before the transform the hard sample (0.5) accounted for about 58% of the total loss; afterwards it accounts for about 86%. Because the easy samples shrink so much more, the hard sample's share effectively rises.
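The arithmetic is quick to verify (a throwaway check with the same numbers):

losses = [0.1, 0.01, 0.5, 0.1, 0.14]
weighted = [l ** 2 for l in losses]  # modulating term with gamma = 2
print(weighted)                      # [0.01, 0.0001, 0.25, 0.01, 0.0196]
print(0.5 / sum(losses))             # ~0.588: hard-sample share before
print(0.25 / sum(weighted))          # ~0.863: hard-sample share after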
In real data, however, the negatives contain far too many easy samples while the positives are mostly hard, so this transform boosts the hard positives too much. A balancing factor is therefore added:
$$L(W) = \begin{cases} -\alpha (1 - h_W(X))^\gamma \log(h_W(X)), & y = 1 \\ -(1 - \alpha)(h_W(X))^\gamma \log(1 - h_W(X)), & y = 0 \end{cases}$$

where $\alpha$ is usually set to 0.25.
With focal loss, samples are re-weighted dynamically according to the model's actual training error, so the model concentrates on the "hard" samples.
focal_loss
import torch
import torch.nn as nn
from torch.nn.functional import binary_cross_entropy_with_logits
# Focal Loss implementation for class-imbalanced binary classification
class FocalLoss(nn.Module):
    """
    Focal Loss.
    Addresses class imbalance by down-weighting easy samples and
    up-weighting hard ones, so the model focuses on the hard samples.
    """
    def __init__(self, alpha=0.25, gamma=2.0):
        """
        Initialize Focal Loss.
        Args:
            alpha: float, class weighting factor.
                Balances the importance of positives vs. negatives;
                increase it when positives are scarce.
            gamma: float, modulating factor.
                Controls how strongly easy samples are down-weighted;
                the larger gamma, the harsher the penalty on easy samples.
        """
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, logits, targets):
        """
        Compute the focal loss.
        Args:
            logits: tensor, raw model outputs (before sigmoid)
            targets: tensor, ground-truth labels
        Returns:
            tensor, the mean focal loss over the batch
        """
        # Predicted probabilities after sigmoid
        probs = torch.sigmoid(logits)
        # Per-element binary cross entropy (no reduction)
        ce_loss = binary_cross_entropy_with_logits(logits, targets, reduction='none')
        # pt is the probability assigned to the true class:
        #   target=1 -> pt = prob
        #   target=0 -> pt = 1 - prob
        pt = torch.where(targets == 1, probs, 1 - probs)
        # Focal weight (1 - pt)^gamma: the more accurate the prediction, the smaller the weight
        focal_weight = (1 - pt) ** self.gamma
        # Alpha weighting: alpha for positives, 1 - alpha for negatives
        alpha_t = torch.where(targets == 1, self.alpha, 1 - self.alpha)
        # Final focal loss
        focal_loss_value = alpha_t * focal_weight * ce_loss
        # Mean over the batch
        return focal_loss_value.mean()
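A quick behavioral check (toy logits; relative to plain BCE, the confidently correct "easy" samples should contribute almost nothing):

fl = FocalLoss(alpha=0.25, gamma=2.0)
bce = nn.BCEWithLogitsLoss()
logits = torch.tensor([3.0, -3.0, 0.1])   # confident positive, confident negative, uncertain
targets = torch.tensor([1.0, 0.0, 1.0])
print(bce(logits, targets).item())  # plain BCE mean
print(fl(logits, targets).item())   # noticeably smaller: easy samples nearly vanish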
Evaluation results
Each cell lists the positive recall at top 1000 / 3000 / 5000 / 10000 / 50000.

| Model | b47686 Handu Yishe | b56508 Samsung | b62063 Nokia | b78739 LILY | Average |
|---|---|---|---|---|---|
| GBDT, per-brand samples | 0.114842 / 0.209537 / 0.265950 / 0.349899 / 0.533915 | 0.068056 / 0.104861 / 0.129861 / 0.164583 / 0.284722 | 0.102041 / 0.190476 / 0.217687 / 0.258503 / 0.353741 | 0.257225 / 0.361272 / 0.427746 / 0.488439 / 0.624277 | 0.2569 |
| GBDT, all samples | 0.141706 / 0.230356 / 0.288113 / 0.366017 / 0.537273 | 0.073611 / 0.103472 / 0.125000 / 0.177778 / 0.289583 | 0.149660 / 0.217687 / 0.231293 / 0.278912 / 0.394558 | 0.300578 / 0.416185 / 0.462428 / 0.528902 / 0.627168 | 0.2970 |
| DNN, BCE loss | 0.1209 / 0.2317 / 0.2821 / 0.3627 / 0.5346 | 0.0521 / 0.1007 / 0.1222 / 0.1667 / 0.2938 | 0.1293 / 0.1837 / 0.2109 / 0.2517 / 0.3741 | 0.2746 / 0.4017 / 0.4653 / 0.5145 / 0.6561 | 0.2865 |
| DNN, focal loss | 0.1330 / 0.2250 / 0.2807 / 0.3627 / 0.5319 | 0.0639 / 0.1076 / 0.1306 / 0.1736 / 0.3021 | 0.1361 / 0.1973 / 0.2041 / 0.2721 / 0.3810 | 0.2832 / 0.3960 / 0.4624 / 0.5145 / 0.6676 | 0.2913 |

Focal loss does slightly better than plain cross entropy, and overall the DNN beats GBDT at top-50k.
DIN training
Model architecture
In the DNN above, the user behavior sequence was simply sum-pooled: the sequence embeddings were added together, which makes it hard to learn relationships among the behaviors. To better capture relationships between brands, this project introduces the Deep Interest Network (DIN) with its target-attention pattern, letting the model learn brand-to-brand associations automatically through attention and thereby improve quality. A schematic contrast of the two pooling styles follows.
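The sketch below is illustrative only (random tensors; a plain dot product stands in for DIN's learned activation unit):

import torch
import torch.nn.functional as F

seq_emb = torch.randn(2, 5, 8)   # user behavior sequence embeddings [batch, seq_len, dim]
target_emb = torch.randn(2, 8)   # target brand embedding [batch, dim]

# Sum pooling: every behavior counts equally, regardless of the target.
sum_pooled = seq_emb.sum(dim=1)                            # [2, 8]

# Target attention: weight each behavior by its affinity to the target.
scores = torch.einsum('bld,bd->bl', seq_emb, target_emb)   # [2, 5]
weights = F.softmax(scores, dim=1).unsqueeze(-1)           # [2, 5, 1]
att_pooled = (weights * seq_emb).sum(dim=1)                # [2, 8]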
Code contents
din_model
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
# Basic DIN model implementation
# Applies attention to the user's purchase behavior sequence
class LocalActivationUnit(nn.Module):
    """
    Local activation unit.
    Computes attention weights between the user's behavior sequence
    and the target item; here it only serves the purchase sequence.
    """
    def __init__(self, hidden_units):
        """
        Initialize the local activation unit.
        Args:
            hidden_units: hidden dimension used for the feature
                interactions and attention computation
        """
        super(LocalActivationUnit, self).__init__()
        # First fully connected layer: the input is 4x hidden_units
        # because four kinds of feature interactions are concatenated
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)
        # Second fully connected layer: compress to a 1-dim attention score
        self.fc2 = nn.Linear(hidden_units, 1)

    def forward(self, user_behaviors, target_item, mask):
        """
        Compute the attention weights.
        Args:
            user_behaviors: user purchase behavior sequence
            target_item: target item
            mask: padding mask for the sequence
        Returns:
            user_interests: attention-weighted user interest representation
        """
        # Sequence length
        seq_len = user_behaviors.size(1)
        # Broadcast the target item across the sequence dimension
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        # Build the feature interactions: raw behaviors, target item,
        # element-wise difference, and element-wise product
        interactions = torch.cat([
            user_behaviors,                 # raw user behaviors
            target_item,                    # target item
            user_behaviors - target_item,   # difference to the target
            user_behaviors * target_item    # element-wise product with the target
        ], dim=-1)
        # Two fully connected layers produce the attention scores
        x = torch.relu(self.fc1(interactions))      # first layer with ReLU
        attention_logits = self.fc2(x).squeeze(-1)  # second layer: attention scores
        # Set padded positions to -inf so softmax ignores them
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        # Softmax turns the scores into weights
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        # Weighted sum gives the user interest representation
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests

class DinModel(nn.Module):
    """
    Basic DIN model.
    Only the purchase behavior sequence goes through attention.
    """
    def __init__(self, config):
        """
        Initialize the DIN model.
        Args:
            config: config dict with the model parameters
        """
        super(DinModel, self).__init__()
        self.config = config
        # Embedding layer mapping discrete ids to dense vectors
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],  # vocabulary size
            embedding_dim=self.config["embedding_dim"],   # embedding dimension
            padding_idx=0                                 # padding token index
        )
        # Fully connected layers
        self.fc1 = nn.Linear(self.config["embedding_dim"] * len(self.config["feature_col"]), 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 1)
        # Local activation unit for the attention computation
        self.att = LocalActivationUnit(self.config["embedding_dim"])

    def forward(self, features, mask):
        """
        Forward pass.
        Notes:
        - only the purchase sequence (pay_brand_seq) goes through attention
        - all other features are sum-pooled embeddings
        """
        # Embedding results per feature
        embedding_dict = {}
        for ff in self.config["feature_col"]:
            if ff != 'pay_brand_seq':  # non-sequence features: plain sum pooling
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        # Attention over the behavior sequence
        att_emb = self.att(
            self.embedding(features['pay_brand_seq']),  # user's historical behavior sequence
            embedding_dict['target_brand_id'],          # target brand
            mask['pay_brand_seq']                       # sequence mask
        )
        # Concatenate all features except the behavior sequence
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"] if ff != 'pay_brand_seq'], dim=1)
        # Append the attention-pooled sequence feature
        x = torch.cat([x, att_emb], dim=1)
        # Fully connected layers
        x = F.relu(self.fc1(x))  # first layer
        x = F.relu(self.fc2(x))  # second layer
        x = self.fc3(x)          # output layer
        return x
din_model_train
import os
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter
from dataset.dnn_dataset import MyPriorDataset
from model.din_model import DinModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data, calculate_top_k_ratio, get_data_test
from utils.get_data import seq_collate_fn
train_feature_numpy,test_feature_numpy,train_label,test_label = get_data(ak_config)
dataset_train = MyPriorDataset(train_feature_numpy, train_label,dnn_config)
print('train dataset finish')
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=seq_collate_fn)
print('train dataloader finish')
brands = ['b47686','b56508','b62063','b78739']
dataset_test_dict = {}
dataloader_test_dict = {}
for brand_id in brands:
test_feature_numpy,test_label = get_data_test(ak_config, brand_id)
dataset_test_dict[brand_id] = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
dataloader_test_dict[brand_id] = DataLoader(dataset_test_dict[brand_id], batch_size=2048, shuffle=False, collate_fn=seq_collate_fn)
print('test data finish')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DinModel(dnn_config).to(device)
criterion = nn.BCEWithLogitsLoss()
#criterion = FocalLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)
log_dir = './runs_din4'
writer = SummaryWriter(log_dir)
os.makedirs('models/din4', exist_ok=True)
os.makedirs(log_dir, exist_ok=True)
def train_model(train_loader, test_loader_dict, model, criterion, optimizer, num_epochs=3):
    """
    DIN training loop.
    Args:
        train_loader: training data loader
        test_loader_dict: test data loaders keyed by brand
        model: the DIN model
        criterion: loss function
        optimizer: optimizer
        num_epochs: number of epochs
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # training mode
        for features, mask, labels in train_loader:
            # Move the batch to the target device
            for ff in dnn_config["feature_col"]:
                features[ff] = features[ff].to(device)
                mask[ff] = mask[ff].to(device)
            labels = labels.to(device)
            # Forward pass and optimization
            optimizer.zero_grad()
            outputs = model(features, mask)
            labels = torch.unsqueeze(labels, dim=1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_step += 1
            # Log the training loss
            if (total_step + 1) % 10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            # Print training progress
            if (total_step + 1) % 100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            # Periodically evaluate the model
            if (total_step + 1) % 7000 == 0:
                with torch.no_grad():
                    model.eval()
                    # Evaluate each brand
                    for brand_id in brands:
                        top_k_list = [1000, 3000, 5000, 10000, 50000]
                        test_preds = []
                        test_targets = []
                        # Predict on the brand's test set
                        for data, mask, target in test_loader_dict[brand_id]:
                            output = model(data, mask)
                            test_preds.extend(output.sigmoid().squeeze().tolist())
                            test_targets.extend(target.squeeze().tolist())
                        # Compute and persist the evaluation results
                        ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
                        for k, ratio in ratios.items():
                            print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
                        with open(f'{log_dir}/models_{brand_id}_top_results_din_{total_step}.txt', 'w') as f:
                            for k, ratio in ratios.items():
                                f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
                    # Save a checkpoint
                    torch.save(model.state_dict(), f'models/din4/model_epoch_{epoch}_{total_step}.pth')
                model.train()
        # Save the model at the end of every epoch
        torch.save(model.state_dict(), f'models/din4/model_epoch_{epoch}.pth')
    # Final evaluation after training
    with torch.no_grad():
        model.eval()
        for brand_id in brands:
            top_k_list = [1000, 3000, 5000, 10000, 50000]
            test_preds = []
            test_targets = []
            for data, mask, target in test_loader_dict[brand_id]:
                output = model(data, mask)
                test_preds.extend(output.sigmoid().squeeze().tolist())
                test_targets.extend(target.squeeze().tolist())
            ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
            for k, ratio in ratios.items():
                print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
            with open(f'{log_dir}/models_{brand_id}_top_results_din_{total_step}.txt', 'w') as f:
                for k, ratio in ratios.items():
                    f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")

# Start training
train_model(dataloader_train, dataloader_test_dict, model, criterion, optimizer)
writer.close()  # close the TensorBoard writer
din_model2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
# Enhanced DIN model implementation
# Handles both the click and the purchase behavior sequences
class LocalActivationUnit(nn.Module):
    """
    Local activation unit.
    Computes attention weights between a user behavior sequence and
    the target item; here it serves both the click and pay sequences.
    """
    def __init__(self, hidden_units):
        """
        Initialize the local activation unit.
        Args:
            hidden_units: hidden dimension used for the feature
                interactions and attention computation
        """
        super(LocalActivationUnit, self).__init__()
        # First layer: maps 4x hidden_units (four interaction types) to hidden_units
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)
        # Second layer: maps hidden_units to a 1-dim attention score
        self.fc2 = nn.Linear(hidden_units, 1)

    def forward(self, user_behaviors, target_item, mask):
        """
        Forward pass.
        Args:
            user_behaviors: behavior sequence, shape (batch_size, seq_len, hidden_units)
            target_item: target item, shape (batch_size, hidden_units)
            mask: padding mask, shape (batch_size, seq_len)
        """
        # Sequence length
        seq_len = user_behaviors.size(1)
        # Broadcast the target item across the sequence dimension
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        # Feature interaction vector with four components:
        # 1. raw user behaviors
        # 2. target item
        # 3. difference between behaviors and target
        # 4. element-wise product of behaviors and target
        interactions = torch.cat([user_behaviors, target_item, user_behaviors - target_item, user_behaviors * target_item], dim=-1)
        # Two fully connected layers produce the attention scores
        x = torch.relu(self.fc1(interactions))      # first layer with ReLU
        attention_logits = self.fc2(x).squeeze(-1)  # second layer: attention scores
        # Set padded positions to -inf so softmax ignores them
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        # Softmax normalizes the scores into weights
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        # Weighted sum gives the user interest representation
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests

class DinModel(nn.Module):
    """
    Enhanced DIN model. Key changes:
    1. also processes the user's click sequence (clk_brand_seq)
    2. applies attention separately to the click and pay sequences
    3. fuses the two resulting interest representations
    """
    def __init__(self, config):
        """
        Initialize the DIN model.
        Args:
            config: config dict with the model parameters
        """
        super(DinModel, self).__init__()
        self.config = config
        # Embedding layer for dense feature representations
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],  # vocabulary size
            embedding_dim=self.config["embedding_dim"],   # embedding dimension
            padding_idx=0                                 # padding token index
        )
        # Three fully connected layers
        self.fc1 = nn.Linear(self.config["embedding_dim"] * len(self.config["feature_col"]), 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 1)
        # Local activation unit for the attention computation
        self.att = LocalActivationUnit(self.config["embedding_dim"])

    def forward(self, features, mask):
        """
        Forward pass. Compared with the basic model:
        1. the click sequence (clk_brand_seq) and pay sequence (pay_brand_seq) are handled separately
        2. attention weights are computed for each sequence
        3. both attention results are concatenated with the other features
        Args:
            features: input feature dict, including both sequences
            mask: padding masks for the two sequences
        """
        embedding_dict = {}
        # Non-sequence features: plain sum pooling
        for ff in self.config["feature_col"]:
            if ff != 'clk_brand_seq' and ff != 'pay_brand_seq':
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        # New: attention over the click sequence
        embedding_dict['clk_brand_seq'] = self.att(
            self.embedding(features['clk_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['clk_brand_seq']
        )
        # Attention over the pay sequence
        embedding_dict['pay_brand_seq'] = self.att(
            self.embedding(features['pay_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['pay_brand_seq']
        )
        # Concatenate all features
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)
        # Three fully connected layers
        x = F.relu(self.fc1(x))  # first layer with ReLU
        x = F.relu(self.fc2(x))  # second layer with ReLU
        x = self.fc3(x)          # output layer, no activation
        return x
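One detail worth flagging: both sequences run through the same self.att module, so the click and pay sequences share attention parameters. A per-behavior variant (hypothetical, not in the repo) would swap in separate units, e.g.:

# Hypothetical variant inside __init__: one activation unit per sequence.
self.att = nn.ModuleDict({
    'clk_brand_seq': LocalActivationUnit(config["embedding_dim"]),
    'pay_brand_seq': LocalActivationUnit(config["embedding_dim"]),
})
# ...and in forward(): self.att['clk_brand_seq'](seq_emb, target_emb, mask['clk_brand_seq'])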
MoE training
Code contents
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
# Local activation unit: attention weights over the user behavior sequence
class LocalActivationUnit(nn.Module):
    """
    Local activation unit.
    Same attention mechanism as in the DIN model: scores the relevance
    of each behavior in the sequence to the target item.
    """
    def __init__(self, hidden_units):
        """
        Initialize the local activation unit.
        Args:
            hidden_units: hidden dimension
        """
        super(LocalActivationUnit, self).__init__()
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)  # first fully connected layer
        self.fc2 = nn.Linear(hidden_units, 1)                 # second fully connected layer

    def forward(self, user_behaviors, target_item, mask):
        """
        Compute the attention weights.
        Args:
            user_behaviors: behavior sequence [batch_size, seq_len, hidden_units]
            target_item: target item [batch_size, hidden_units]
            mask: sequence mask [batch_size, seq_len]
        """
        seq_len = user_behaviors.size(1)
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        # Feature interactions
        interactions = torch.cat([user_behaviors, target_item,
                                  user_behaviors - target_item,
                                  user_behaviors * target_item], dim=-1)
        # Attention scores
        x = torch.relu(self.fc1(interactions))
        attention_logits = self.fc2(x).squeeze(-1)
        # Mask out padded positions
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        # Weighted sum gives the user interest representation
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests

# Expert network: each expert learns its own feature patterns
class Expert(nn.Module):
    """
    Expert network.
    A three-layer fully connected network acting as one expert.
    """
    def __init__(self, input_size):
        """
        Initialize the expert.
        Args:
            input_size: input feature dimension
        """
        super(Expert, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 512),  # first layer
            nn.ReLU(),
            nn.Linear(512, 128),         # second layer
            nn.ReLU(),
            nn.Linear(128, 1)            # output layer
        )

    def forward(self, x):
        """Expert forward pass."""
        return self.network(x)

# Gate network: learns how to route samples to experts
class Gate(nn.Module):
    """
    Gate network.
    Assigns per-sample weights to the experts.
    """
    def __init__(self, input_size, num_experts):
        """
        Initialize the gate.
        Args:
            input_size: input feature dimension
            num_experts: number of experts
        """
        super(Gate, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 64),  # first layer
            nn.ReLU(),
            nn.Linear(64, 32),          # second layer
            nn.ReLU(),
            nn.Linear(32, num_experts)  # output layer, one logit per expert
        )

    def forward(self, x):
        """Compute the per-expert weights."""
        return self.network(x)

# Mixture-of-experts model: combines the experts' predictions
class MoeModel(nn.Module):
    """
    Mixture of Experts (MoE) model.
    Combines DIN-style attention with an MoE expert mixture.
    """
    def __init__(self, config):
        """
        Initialize the MoE model.
        Args:
            config: config dict with the model parameters
        """
        super(MoeModel, self).__init__()
        self.config = config
        # Embedding layer
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],
            embedding_dim=self.config["embedding_dim"],
            padding_idx=0
        )
        # Attention mechanism
        self.att = LocalActivationUnit(self.config["embedding_dim"])
        # The experts
        self.experts = nn.ModuleList([
            Expert(self.config["embedding_dim"] * len(self.config["feature_col"]))
            for _ in range(self.config["num_experts"])
        ])
        # Gate network
        self.gate = Gate(
            self.config["embedding_dim"] * len(self.config["features_gate_col"]),
            self.config["num_experts"]
        )

    def forward(self, features, mask):
        """
        Forward pass.
        Args:
            features: input feature dict
            mask: masks for the sequence features
        Returns:
            x: final prediction
            gating_weights: the expert weight distribution
        """
        # Feature embeddings
        embedding_dict = {}
        for ff in self.config["feature_col"]:
            if ff != 'pay_brand_seq':
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        # Attention over the pay sequence
        embedding_dict['pay_brand_seq'] = self.att(
            self.embedding(features['pay_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['pay_brand_seq']
        )
        # Concatenate the features
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)
        # Gate input features
        gate_emb = torch.cat([embedding_dict[ff] for ff in self.config["features_gate_col"]], dim=1)
        # Expert weights
        gating_weights = F.softmax(self.gate(gate_emb), dim=1)
        # Stack all expert outputs and combine them with the gate weights
        expert_outputs = torch.stack([expert(x) for expert in self.experts], dim=-1)  # [batch, 1, num_experts]
        # squeeze(1) instead of the original bare squeeze(), which would also drop the batch dim when batch_size == 1
        x = torch.sum(gating_weights * expert_outputs.squeeze(1), dim=-1)
        return x, gating_weights
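Because the forward pass also returns the gating distribution, expert utilization is easy to inspect (a hypothetical snippet; features and mask are one batch from a seq_collate_fn loader):

# Check how the gate spreads traffic across the experts.
model.eval()
with torch.no_grad():
    _, gating_weights = model(features, mask)  # [batch, num_experts]
print(gating_weights.sum(dim=1)[:3])  # each row sums to 1 (softmax)
print(gating_weights.mean(dim=0))     # average load per expert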
# Imports
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import numpy as np
import os
from odps import ODPS
from odps.df import DataFrame
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
# Project modules
from dataset.dnn_dataset import MyDataset
from dataset.dnn_dataset import MyPriorDataset
from model.moe_model import MoeModel  # the Mixture-of-Experts model
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data, get_data_test, calculate_top_k_ratio
from utils.get_data import my_collate_fn, seq_collate_fn  # collate functions for sequence data
from loss.focal_loss import FocalLoss
# Fetch the training data and build the dataset
train_feature_numpy, test_feature_numpy, train_label, test_label = get_data(ak_config)
dataset_train = MyPriorDataset(train_feature_numpy, train_label, dnn_config)
print('dataset finish')
# Training data loader with the sequence collate function
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=seq_collate_fn)
print('dataloader finish')
# Test datasets and loaders, one per brand
brands = ['b47686', 'b56508', 'b62063', 'b78739']
dataset_test_dict = {}
dataloader_test_dict = {}
for brand_id in brands:
    test_feature_numpy, test_label = get_data_test(ak_config, brand_id)
    dataset_test_dict[brand_id] = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
    dataloader_test_dict[brand_id] = DataLoader(dataset_test_dict[brand_id], batch_size=2048, shuffle=False, collate_fn=seq_collate_fn)
print('test data finish')
# Pick the device (GPU/CPU) and set up the model, loss, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MoeModel(dnn_config).to(device)  # the MoE model
criterion = nn.BCEWithLogitsLoss()  # binary cross entropy on logits
#criterion = FocalLoss()  # focal loss (commented out)
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)
log_dir = './runs_moe4'  # TensorBoard log directory
writer = SummaryWriter(log_dir=log_dir)
# Directories for checkpoints and logs
os.makedirs('models/moe4', exist_ok=True)
os.makedirs(log_dir, exist_ok=True)
def train_model(train_loader, test_loader_dict, model, criterion, optimizer, num_epochs=3):
    """
    MoE training loop.
    Args:
        train_loader: training data loader
        test_loader_dict: test data loaders keyed by brand
        model: the MoE model
        criterion: loss function
        optimizer: optimizer
        num_epochs: number of epochs
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # training mode
        for features, mask, labels in train_loader:
            # Move the features and masks to the target device (GPU/CPU)
            for ff in dnn_config["feature_col"]:
                features[ff] = features[ff].to(device)
                mask[ff] = mask[ff].to(device)
            labels = labels.to(device)
            # Training step
            optimizer.zero_grad()                      # clear gradients
            outputs, _ = model(features, mask)         # forward pass with features and masks
            outputs = torch.unsqueeze(outputs, dim=1)  # align output shape
            labels = torch.unsqueeze(labels, dim=1)    # align label shape
            loss = criterion(outputs, labels)          # compute the loss
            loss.backward()                            # backward pass
            optimizer.step()                           # update parameters
            total_step += 1
            # Log the training loss to TensorBoard
            if (total_step + 1) % 10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            # Print training progress
            if (total_step + 1) % 100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            # Evaluate every 7000 steps
            if (total_step + 1) % 7000 == 0:
                with torch.no_grad():
                    model.eval()  # evaluation mode
                    for brand_id in brands:
                        # Top-k values to evaluate
                        top_k_list = [1000, 3000, 5000, 10000, 50000]
                        test_preds = []
                        test_targets = []
                        # Collect predictions and ground-truth labels
                        for data, mask, target in test_loader_dict[brand_id]:
                            output, _ = model(data, mask)
                            test_preds.extend(output.sigmoid().squeeze().tolist())
                            test_targets.extend(target.squeeze().tolist())
                        # Compute and print the top-k positive recall
                        ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
                        for k, ratio in ratios.items():
                            print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
                        # Persist the evaluation results
                        with open(f'{log_dir}/models_{brand_id}_top_results_moe_{total_step}.txt', 'w') as f:
                            for k, ratio in ratios.items():
                                f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
                    # Save a checkpoint
                    torch.save(model.state_dict(), f'models/moe4/model_epoch_{epoch}_{total_step}.pth')
                model.train()  # back to training mode
        # Save the model at the end of every epoch
        torch.save(model.state_dict(), f'models/moe4/model_epoch_{epoch}.pth')

# Start training, then close the TensorBoard writer
train_model(dataloader_train, dataloader_test_dict, model, criterion, optimizer)
writer.close()
Evaluation results
Each cell lists the positive recall at top 10000 / 50000.

| Model | b47686 Handu Yishe | b56508 Samsung | b62063 Nokia | b78739 LILY | Average |
|---|---|---|---|---|---|
| GBDT, per-brand samples | 0.349899 / 0.533915 | 0.164583 / 0.284722 | 0.258503 / 0.353741 | 0.488439 / 0.624277 | 0.3823 |
| GBDT, all samples | 0.366017 / 0.537273 | 0.177778 / 0.289583 | 0.278912 / 0.394558 | 0.528902 / 0.627168 | 0.4002 |
| DNN, BCE loss | 0.3627 / 0.5346 | 0.1667 / 0.2938 | 0.2517 / 0.3741 | 0.5145 / 0.6561 | 0.3943 |
| DNN, focal loss | 0.3627 / 0.5319 | 0.1736 / 0.3021 | 0.2721 / 0.3810 | 0.5145 / 0.6676 | 0.4007 |
| DIN | 0.3620 / 0.5332 | 0.1757 / 0.3028 | 0.2585 / 0.4014 | 0.5087 / 0.6532 | 0.3994 |
| MoE, focal loss | 0.3553 / 0.5359 | 0.1722 / 0.2993 | 0.2789 / 0.3673 | 0.5231 / 0.6618 | 0.3992 |