天猫推荐数据集实践

参考自 https://github.com/xufengtt/recom_teach_code,学习记录。

环境配置(maxcompute+dataworks)

  • 下载天猫推荐数据集;
  • 开启 aliyun 的 maxcompute,dataworks,pai;
  • 使用 odpscmd 上传本地数据,具体配置方法是在 conf 文件夹配置 odps_config.ini 文件,填写 project_name(recom_maxcompute)、access_id、access_key、end_point 参数;进入 bin 目录运行 odpscmd

建立数据特征

上传数据——数据运营层

  • 在 dataworks 创建表
CREATE TABLE IF NOT EXISTS item_dim (
    item_id STRING,
    title STRING,
    pict_url STRING,
    category STRING,
    brand_id STRING,
    seller_id STRING
) LIFECYCLE 90;
  • 上传数据 odpscmd -e "tunnel upload tianchi_2014001_rec_tmall_product.txt item_dim -fd '\u0001';"
  • 查看上传结果 SELECT * FROM item_dim LIMIT 100;

在这里插入图片描述

  • 同样方式上传其他数据
CREATE TABLE IF NOT EXISTS user_item_beh_log (
    item_id STRING ,
    user_id STRING ,
    action STRING ,
    vtime STRING
) LIFECYCLE 90;

odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_parta.txt user_item_beh_log -fd '\u0001';"
odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_partb.txt user_item_beh_log -fd '\u0001';"
odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_partc.txt user_item_beh_log -fd '\u0001';"

数据仓库层

  • 首先查看一些指标,可以看到这个数据集是离工业界数据量较近的数据
SELECT max(vtime), min(vtime), COUNT(DISTINCT item_id), COUNT(DISTINCT user_id), COUNT(*)
FROM user_item_beh_log;

在这里插入图片描述
数据仓库中分层模型主要有:

  1. 三层模型:数据运营层(ODS,Operational Data Store);数据仓库层(DW,Data Warehouse Layer);数据服务层(ADS,Application Data Service)
  2. 四层模型:添加了多维明细层(DWS,Detailed Warehouse Store)

本项目的数据仓储设计:

  • 数据运营层:使用上传数据表
  • 数据仓库层:需要建设的粒度,1)时间粒度:天维度存储;2)行为类别:点击/收藏/加购/购买不同行为分开存储;3)key 粒度,用户侧特征 user+brand, user+item, user+cate1_id, user+cate2_id, user;品牌测特征 brand+user,brand+item;用户与品牌交互特征 user+brand,user+brand+item,user+brand+cate1_id, user+brand+cate2_id。因此,按照行为类型和时间进行分拆,点击/收藏/加购/购买四张表,且每张表都以天粒度行为时间为分区。为能支持以上所有特征计算,每个表的字段有 user, brand, item, cate1_id, cate2_id
  • 数据服务层:品牌统计特征,用户维度统计特征,用户与品牌交叉统计特征
  • 维表(属性表):商品品牌/类目/名称
CREATE TABLE IF NOT EXISTS dw_user_item_click_log (
    user_id STRING COMMENT '用户id',
    item_id STRING COMMENT '商品id',
    brand_id STRING COMMENT '品牌id',
    seller_id STRING COMMENT '商家id',
    cate1 STRING COMMENT '类目1id',
    cate2 STRING COMMENT '类目2id',
    op_time STRING COMMENT '点击时间'
) PARTITIONED BY (ds STRING COMMENT '日期分区') LIFECYCLE 90;

ds(date string)是创建分区表时的分区字段,不是表的一个普通字段,所以不需要显式声明,以上面这种方式建立四张行为类别表,天维度分区,以下面这种方式添加数据

INSERT OVERWRITE TABLE dw_user_item_click_log PARTITION (ds)
SELECT user_id, t2.item_id, brand_id, seller_id, cate1, cate2, vtime, ds
FROM (
    SELECT user_id, item_id, vtime, to_char(TO_DATE(vtime, 'yyyy-mm-dd hh:mi:ss'), 'yyyymmdd') as ds
    FROM user_item_beh_log 
    WHERE action = 'click'
) t1 join (
    SELECT item_id, brand_id, seller_id, SPLIT_PART(category, '-', 1) AS cate1, SPLIT_PART(category, '-', 2) AS cate2
    FROM item_dim
) t2 on t1.item_id = t2.item_id;

验证是否成功,SELECT * FROM dw_user_item_click_log WHERE ds = ${bizdate} LIMIT 10;
在这里插入图片描述

数据服务层

  • 品牌维度特征:点击/收藏/购物车/支付,创建和填充一个用于存储品牌维度特征的表,是基于用户在一定时间范围内与品牌互动的行为数据,下面代码是计算了过去60天内(包括 ${bizdate} 当天)的点击、收藏、购物车和支付的用户数,使用 DISTINCT 确保每个用户只被计算一次。
CREATE TABLE IF NOT EXISTS brand_stat_feature_ads (
    brand_id STRING,
    click_num BIGINT,
    collect_num BIGINT,
    cart_num BIGINT,
    alipay_num BIGINT  
) PARTITIONED BY (ds STRING) LIFECYCLE 60;

INSERT OVERWRITE TABLE brand_stat_feature_ads PARTITION(ds=${bizdate})
SELECT t1.brand_id, click_num
    ,if(collect_num is null, 0, collect_num)
    ,if(cart_num is null, 0, cart_num)
    ,if(alipay_num is null, 0, alipay_num)
FROM (
    SELECT brand_id, COUNT(DISTINCT user_id) AS click_num
    FROM dw_user_item_click_log
    where ds <= ${bizdate} and ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
    GROUP BY brand_id
) t1 left join (
    SELECT brand_id, COUNT(DISTINCT user_id) AS collect_num
    FROM dw_user_item_collect_log
    where ds <= ${bizdate} and ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
    GROUP BY brand_id
) t2 on t1.brand_id=t2.brand_id
left join (
    SELECT brand_id, COUNT(DISTINCT user_id) AS cart_num
    FROM dw_user_item_cart_log
    where ds <= ${bizdate} and ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
    GROUP BY brand_id
) t3 on t1.brand_id=t3.brand_id
left join (
    SELECT brand_id, COUNT(DISTINCT user_id) AS alipay_num
    FROM dw_user_item_alipay_log
    where ds <= ${bizdate} and ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
    GROUP BY brand_id
) t4 on t1.brand_id=t4.brand_id;
  • 用户维度特征:创建存储用户点击/收藏/购物车/支付行为的特征,下面代码是使用 COUNT 来计算每个用户在 3days 窗口内点击的不同商品、品牌、卖家、一级分类和二级分类的数量,以及点击的天数,其他时间窗口和行为同理。
CREATE TABLE IF NOT EXISTS user_click_beh_feature_ads (
    user_id STRING,
    item_num_3d BIGINT,
    brand_num_3d BIGINT,
    seller_num_3d BIGINT,
    cate1_num_3d BIGINT,
    cate2_num_3d BIGINT,
    cnt_days_3d BIGINT,
   	... --3d,7d,15d,60d,90d
)PARTITIONED BY (ds string) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_click_beh_feature_ads PARTITION (ds=${bizdate}) -- 用户点击行为特征
SELECT user_id
    ,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), item_id, null))
    ,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), brand_id, null))
    ,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), seller_id, null))
    ,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), cate1, null))
    ,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), cate2, null))
    ,COUNT(DISTINCT IF(ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-3, 'dd'),'yyyymmdd'), ds, null))
    ...--3d,7d,15d,60d,90d
FROM dw_user_item_click_log
WHERE ds<=${bizdate} AND ds>=TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
GROUP BY user_id;
  • 用户与品牌交叉特征

实际场景中,比如20w个品牌,可能只有几千个头部品牌才有实力使用推荐服务,因此选取头部品牌减少计算量,也能实验出不错的效果,这里建立维表brand_cate_dim,以近30天为准,按照支付用户数量选取top500的品牌:

create table if not exists brand_top500_alipay_dim (
    brand_id string,
    alipay_num bigint
)LIFECYCLE 60;

insert OVERwrite table brand_top500_alipay_dim
select brand_id, alipay_num
from (
    select brand_id, count(DISTINCT user_id) as alipay_num
    from dw_user_item_alipay_log
    where ds<=${bizdate} and ds>to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
        and brand_id is not null
    group by brand_id
)t1 order by alipay_num desc limit 500
;

然后计算用户和品牌的点击商品多样性(clk_num)、点击频次(clk_num),点击品牌天数(clk_day),这里的窗口为 90/60/30/15/7/3/1d,这里增加1d这个强特征,因为1d的短期窗口的行为有关键影响;对于收藏,有收藏商品多样性(clt_item),收藏总次数和天数没有重要意义,购物车和购买有商品多样性(cart_item),购物车频次(cart_num),支付多样性(pay_item),支付频次(pay_num)。这里都使用的top500的品牌,减少计算量

create table if not exists user_brand_cross_beh_feature_ads (
    user_id string,
    brand_id string,

    clk_item_90d bigint,
    ...--60/30/15/7/3/1d

    clk_num_90d bigint,
    ...

    clk_day_90d bigint,
    ...

    clt_item_90d bigint,
    ...

    cart_item_90d bigint,
    ...

    cart_num_90d bigint,
    ...

    pay_item_90d bigint,
    ...

    pay_num_90d bigint,
    ...
)PARTITIONED BY (ds STRING) LIFECYCLE 60;

insert overwrite table user_brand_cross_beh_feature_ads partition (ds=${bizdate})
select t1.user_id, t1.brand_id,
    clk_item_90d,
    ...

    clk_num_90d,
    ...

    clk_day_90d,
    ...

    if(clt_item_90d is null, 0, clt_item_90d) as clk_item_90d,
    ...

    if(cart_item_90d is null, 0, cart_item_90d) as cart_item_90d,
    ...

    if(cart_num_90d is null, 0, cart_num_90d) as cart_num_90d,
    ...

    if(pay_item_90d is null, 0, pay_item_90d) as pay_item_90d,
    ...

    if(pay_num_90d is null, 0, pay_num_90d) as pay_num_90d,
    ...

from (
    select user_id, brand_id
        ,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clk_item_90d
        ...

        ,count(if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clk_num_90d
        ...

        ,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), ds, null)) as clk_day_90d
        ...
    from (
        select t1.user_id, t2.brand_id, t1.item_id, t1.ds
        from (
            select user_id, brand_id, item_id, ds
            from dw_user_item_click_log
            where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
        )t1 join (
            select brand_id
            from brand_top500_alipay_dim where ds=${bizdate}
        )t2 on t1.brand_id=t2.brand_id
    )t1
    group by user_id, brand_id
)t1 left join (
    select user_id, brand_id
         ,count(if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clt_item_90d
         ...
    from (
        select t1.user_id, t2.brand_id, t1.item_id, t1.ds
        from (
            select user_id, brand_id, item_id, ds
            from dw_user_item_collect_log
            where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
        )t1 join (
            select brand_id
            from brand_top500_alipay_dim where ds=${bizdate}
        )t2 on t1.brand_id=t2.brand_id
    )t1
    group by user_id, brand_id
)t2 on t1.user_id=t2.user_id and t1.brand_id=t2.brand_id
...

此外,用户与品牌交叉特征还有用户与品牌的二级类目的交互数据,先建立品牌二级类目维表(暂时先不考虑一级类目),这里取top500的品牌的每个品牌下用户数最多的top2二级分类,ROW_NUMBER 是窗口函数为结果集中的每一行分配唯一的序号,OVER子句定义窗口函数的分区和排序规则,这里表示数据根据brand_id列的值进行分区,意味着每个品牌ID有自己独立的行号序列。

create table if not exists brand_cate2_dim (
    brand_id string,
    cate2 string
)lifecycle 60;

insert overwrite table brand_cate2_dim
select brand_id, cate2
from (
    select brand_id, cate2, ROW_NUMBER() OVER(PARTITION BY brand_id ORDER BY num desc) AS number
    from (
        select t1.brand_id, t1.cate2, count(distinct user_id) as num
        from (
            select user_id, ds, brand_id, cate2
            from dw_user_item_alipay_log
            where ds<${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
        )t1 join (
            select brand_id
            from brand_top500_alipay_dim where ds=${bizdate}
        )t2 on t1.brand_id=t2.brand_id
        group by t1.cate2, t1.brand_id
    )t1
)t1 where number <=2
;

得到二级分类维表后,计算用户和二级分类交叉数据就简单了,这里使用cate2(二级分类)作为连接条件关联日志和二级分类维表,这样做的目的是将用户日志和品牌的二级分类信息关联起来,尽管连接条件是cate2,但查询的最终目的是获取与特定二级分类相关的用户点击行为,并将其与品牌ID关联,所以查询仍然是选择brand_id,将用户行为与品牌信息关联起来,以便进行更有针对性地分析和决策,因此这里是select t2.brand_id。/*+mapjoin(t2)*/这个提示告诉查询优化器在执行连接操作时,将t2表加载到内存中,在map阶段完成与t1表的连接,前提是t2表足够小,这可以显著提高查询性能。

create table if not exists user_brand_cate2_cross_beh_feature_ads (
    user_id string,
    brand_id string,

    clk_item_90d bigint,
    ...--60/30/15/7/3/1d
    ...
)PARTITIONED BY (ds STRING) LIFECYCLE 60;

insert overwrite table user_brand_cross_beh_feature_ads partition (ds=${bizdate})
select t1.user_id, t1.brand_id,
    clk_item_90d,
    ...
    if(clt_item_90d is null, 0, clt_item_90d) as clk_item_90d,
    ...

from (
    select user_id, brand_id
        ,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clk_item_90d
        ...
    from (
        select /*+mapjoin(t2)*/
        t1.user_id, t2.brand_id, item_id, t1.ds
        from (
            select user_id, brand_id, item_id, ds
            from dw_user_item_click_log
            where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
        )t1 join (
            select brand_id
            from brand_top500_alipay_dim where ds=${bizdate}
        )t2 on t1.brand_id=t2.brand_id
    )t1
    group by user_id, brand_id
)t1 left join (
    select user_id, brand_id
         ,count(if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clt_item_90d
         ...
    from (
        select t1.user_id, t2.brand_id, item_id, t1.ds
        from (
            select user_id, brand_id, item_id, ds
            from dw_user_item_collect_log
            where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd')
        )t1 join (
            select brand_id
            from brand_top500_alipay_dim where ds=${bizdate}
        )t2 on t1.brand_id=t2.brand_id
    )t1
    group by user_id, brand_id
)t2 on t1.user_id=t2.user_id and t1.brand_id=t2.brand_id
...

自此,这里的应用层特征开发完成了,利用的是ODPS SQL,其他平台的SQL语句虽然不同,但思路是相同的。

建立样本

购买模型是预测一个用户对该品牌买 or 不买,是一个二分类问题,label 0(负样本)表示不买,1(正样本)表示购买,我们有用户池子,历史一年访问过该平台的用户,样本建设的维度,品牌,以品牌为度选正样本和负样本,正样本,以实验时间那天为起点,未来7天购买的用户为正样本;负样本,未购买的用户为负样本。

比如假设实验时间是20130701这天,则20130701-20130707期间购买该品牌的用户是正样本,用户池子中未购买的为负样本。

用户池子的确定是根据业务要求确定,目标召回率要能够最大化的召回目标用户

正样本数据

create table if not exists user_pay_sample_pos (
    user_id string,
    brand_id string
) partitioned by (ds string) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_pay_sample_pos PARTITION (ds='${bizdate}')
select t1.user_id, t1.brand_id
from (
    select distinct user_id , brand_id
    from dw_user_item_alipay_log 
    where ds>${bizdate} and ds<=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),7, 'dd'),'yyyymmdd')
) t1 join (
    select brand_id
    from brand_top500_aliyun_num where ds=${bizdate}
) t2 on t1.brand_id=t2.brand_id;

负样本数据

由于正负样本数量及其不平衡,因此使用以下查询建立用户id维表,起到随机抽样的作用,为每个不同的user_id生成一个随机数rnd,然后根据随机数对用户进行排序,并分配一个序号number,然后进行抽样,并且排除正负样本相同的情况:

CREATE TABLE if NOT EXISTS user_id_number (
    user_id string,
    number bigint
) LIFECYCLE 60;

select user_id, ROW_NUMBER() OVER(ORDER BY rnd DESC) AS number
from (
    select user_id,RAND() AS rnd
    from (
        select DISTINCT user_id 
        from user_item_beh_log where ds=${bizdate}
    ) t1
) t1;

负样本数据为

INSERT OVERWRITE TABLE user_pay_sample PARTITION(ds=${bizdate})
select t1.neg_user_id as user_id, t1.brand_id, 0 as label
from (
    --去重,防止重复随机数
    select DISTINCT t1.brand_id, t2.user_id as neg_user_id
    from (
        --转换数组为三元组格式
        select TRANS_ARRAY(2, ',', user_id, brand_id, rand_neg) as (user_id, brand_id, rand_neg)
        from (
            --生成10个随机数,用逗号连接成数组
            select user_id ,brand_id, concat(
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint),',',
                cast(rand()*10000000 as bigint)
            ) as rand_neg
            from user_pay_sample_pos
            where ds = '${bizdate}'
        )t1
    )t1 join (
        select user_id, number
        from user_id_number
    )t2 on t1.rand_neg=t2.number
)t1 left anti join (--使用 anti join 排除正样本,防止负样本和正样本是同一个样本
    select user_id, brand_id
    from user_pay_sample_pos
    where ds = '${bizdate}'
)t2 on t1.neg_user_id=t2.user_id and t1.brand_id=t2.brand_id
union all--负样本和正样本合并
select user_id, brand_id, 1 as label
from user_pay_sample_pos
where ds = '${bizdate}'
;

样本与特征 join

下面代码是将样本和特征 join,这是以某一天为基准的

create table if not exists user_pay_sample_feature_join (
    user_id string,
    brand_id string,
    label bigint,

    click_item_num_3d BIGINT,
    click_brand_num_3d BIGINT,
    click_seller_num_3d BIGINT,
    ...
)PARTITIONED by (ds string) LIFECYCLE 60;


select t1.user_id, t1.brand_id, t1.label,
    if(t2.item_num_3d is null, 0, t2.item_num_3d),
    if(t2.brand_num_3d is null, 0, t2.brand_num_3d),
    if(t2.seller_num_3d is null, 0, t2.seller_num_3d),
   	...
from (
    select user_id, brand_id, label 
    from user_pay_sample
    where ds=${bizdate}
) t1 left join (
    select *
    from user_click_beh_feature_ads
    where ds=${bizdate}
) t2 on t1.user_id=t2.user_id
left join (
    select *
    from user_collect_beh_feature_ads
    where ds=${bizdate}
) t3 on t1.user_id=t3.user_id
left join (
    select *
    from user_cart_beh_feature_ads
    where ds=${bizdate}
) t4 on t1.user_id=t4.user_id
left join (
    select *
    from user_alipay_beh_feature_ads
    where ds=${bizdate}
) t5 on t1.user_id=t5.user_id
left join (
    select *
    from brand_stat_feature_ads
    where ds=${bizdate}
) t6 on t1.brand_id=t6.brand_id
left join (
    select *
    from user_brand_cross_beh_feature_ads
    where ds=${bizdate}
) t7 on t1.user_id=t7.user_id and t1.brand_id=t7.brand_id
left join (
    select *
    from user_brand_cate2_cross_beh_feature_ads
    where ds=${bizdate}
) t8 on t1.user_id=t8.user_id and t1.brand_id=t8.brand_id
where (t2.cnt_days_90d is not null or t2.cate1_num_90d is not null or t3.item_num_90d is not null
    or t4.item_num_90d is not null or t5.item_num_90d is not null or t7.clk_item_90d is not null or t8.clk_item_90d is not null)
;

而一天的样本量实际还是比较少的,由于总数据是20130401-20131001,为了保证90day的数据存在,所以从20130701开始提取特征,7day为一周期(未来七天购买情况作为正负样本),所以有0708、0715、0722、0729…0916等12天作为训练集,留一天0923作为测试集,约1000+w的样本量。

补充数据,按照上面思路开始跑取所需数据,这里可以根据ds=$(bizdate)依次运行添加日期参数添加12次,但这样每个文件都需要选12次,效率太低,因此在这里使用aliyun平台的调度配置。

首先,先查看表与表的依赖关系,目前为止,我们的文件结构为:

  • 原始表:user_item_beh_log
  • 原始表分区后的四张表:dw_user_item_click_logdw_user_item_collect_logdw_user_item_cart_logdw_user_item_alipay_log
  • ODS数据层:user_item_beh_log.sql
  • DW数据层:user_beh_log_dw.sql
  • ADS数据层(这里面的四个sql是没有依赖关系的,所以可以并发的跑):
    • 品牌维度数据:brand_stat_feature_ads
    • 用户维度数据:user_beh_feature_ads
    • 用户品牌交互数据:user_brand_cate2_cross_beh_feature_adsuser_brand_cross_beh_feature_ads
  • 维表:brand_cate_dim——top500支付的品牌及其对应的top2二级分类,user_id_dim——把user_id编号的表
  • gbdt 样本:
    • user_pay_sample,这是划分正负样本的数据,这个表也可以与上面四个并发,所以也依赖于virtual
    • user_pay_sample_feature_join,样本和特征join的表,这里依赖了上面的特征表和样本表,所以最后跑。
  1. 查看了依赖关系之后,创建一个虚拟节点virtual_depend,里面放一个无成本的sql语句比如select 1,修改其调度配置然后提交。

  2. 把五个并行的sql节点依赖于这个节点,以brand_stat_feature_ads.sql为例,修改调度配置,调度参数为bizdate,参数值设置$[yyyymmdd-1];依赖的上游节点为virtual_depend,这里要注意删除输入依赖,否则从代码解析会导致多余依赖,完成后保存并提交。

  3. user_pay_sample_feature_join里的所有表都有输入依赖,所以不需删除,直接从代码解析依赖即可。

  4. 最终的周期任务如下:

在这里插入图片描述
在这里插入图片描述
至此,训练集特征数据全部补完,下面看一下这个数据长什么样子

SELECT * FROM user_pay_sample_feature_join where ds='20130909' limit 100;

在这里插入图片描述

评估集建立样本、样本与特征 join

  • 建立样本(user_pay_sample_eval)

取 9.23-10.1 的周期作为评估集,负样本用户量约 900w,为了节省资源,这里对负样本用户随机采样 300w 作为测试集,这里品牌选取 b47686 韩都衣舍 b56508 三星手机 b62063 诺基亚 b78739 LILY,用于查看交叉品牌不同的影响,使用 mapjoin 对用户和品牌进行笛卡尔积,然后将负样本(随机选取的)和正样本(从 dw_user_item_alipay_log 表中得到的支付用户)合并 :

create table if not exists user_pay_sample_eval (
    user_id string,
    brand_id string,
    label bigint
)PARTITIONED BY (ds STRING) LIFECYCLE 60;

insert OVERWRITE TABLE user_pay_sample_eval partition (ds=${bizdate})
select user_id, brand_id, max(label) as label
from (
    select /*+mapjoin(t2)*/
        t1.user_id, t2.brand_id, 0 as label
    from (
        select user_id
        from user_id_number
        where number<=3000000
    )t1 join (
        --b47686 韩都衣舍
        --b56508 三星手机
        --b62063 诺基亚
        --b78739 LILY
        select 'b47686' as brand_id
        union all
        select 'b56508' as brand_id
        union all
        select 'b62063' as brand_id
        union all
        select 'b78739' as brand_id

    )t2
    union all
    select user_id, brand_id, 1 as label
    from dw_user_item_alipay_log
    where ds > '${bizdate}' and ds <= to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),7, 'dd'),'yyyymmdd') and brand_id in ('b47686','b56508','b62063','b78739')
    group by user_id, brand_id
)t1 group by user_id, brand_id
  • 样本与特征 join(user_pay_sample_feature_join_eval)
create table if not exists user_pay_sample_feature_join_eval (
    user_id string,
    brand_id string,
    label bigint,

    click_item_num_3d BIGINT,
    click_brand_num_3d BIGINT,
    click_seller_num_3d BIGINT,
    ...
)PARTITIONED by (ds string) LIFECYCLE 60;


select t1.user_id, t1.brand_id, t1.label,
    if(t2.item_num_3d is null, 0, t2.item_num_3d),
    if(t2.brand_num_3d is null, 0, t2.brand_num_3d),
    if(t2.seller_num_3d is null, 0, t2.seller_num_3d),
   	...
from (
    select user_id, brand_id, label 
    from user_pay_sample
    where ds=${bizdate} 
) t1 left join (
    select *
    from user_click_beh_feature_ads
    where ds=${bizdate}
) t2 on t1.user_id=t2.user_id
left join (
    select *
    from user_collect_beh_feature_ads
    where ds=${bizdate}
) t3 on t1.user_id=t3.user_id
left join (
    select *
    from user_cart_beh_feature_ads
    where ds=${bizdate}
) t4 on t1.user_id=t4.user_id
left join (
    select *
    from user_alipay_beh_feature_ads
    where ds=${bizdate}
) t5 on t1.user_id=t5.user_id
left join (
    select *
    from brand_stat_feature_ads
    where ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t6 on t1.brand_id=t6.brand_id
left join (
    select *
    from user_brand_cross_beh_feature_ads 
    where ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t7 on t1.user_id=t7.user_id and t1.brand_id=t7.brand_id
left join (
    select *
    from user_brand_cate2_cross_beh_feature_ads
    where ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t8 on t1.user_id=t8.user_id and t1.brand_id=t8.brand_id
where (t2.cnt_days_90d is not null or t2.cate1_num_90d is not null or t3.item_num_90d is not null
    or t4.item_num_90d is not null or t5.item_num_90d is not null or t7.clk_item_90d is not null or t8.clk_item_90d is not null)

这里对品牌特征和用户及品牌交叉特征做了 brand_id in ('b47686','b56508','b62063','b78739') 的限制,减少品牌数,测试集数据如下

SELECT * FROM user_pay_sample_feature_join_eval where ds='20130923' limit 100

在这里插入图片描述
下面看一下在训练集中上面四个品牌的样本个数

select t1.brand_id, count(*)
from (
    select *
    FROM user_pay_sample_feature_join 
    where ds>='20130701' and ds <= '20130916'
) t1 join (
    --b47686 韩都衣舍
    --b56508 三星手机
    --b62063 诺基亚
    --b78739 LILY
    select 'b47686' as brand_id
    union all
    select 'b56508' as brand_id
    union all
    select 'b62063' as brand_id
    union all
    select 'b78739' as brand_id
) t2 on t1.brand_id = t2.brand_id
group by t1.brand_id

在这里插入图片描述
可以看到整体样本量不大,韩都衣舍和三星比另外两个牌子样本量大很多。

训练模型-GBDT

思路

  1. baseline:使用每个品牌的数据,单独训练一个模型,并预测结果。
  2. 混合训练:韩都衣舍+三星手机,这是不同行业的品牌混合后的效果;韩都衣舍+Lily,这是同行业的品牌混合后的效果。
  3. 所有品牌混合训练

读取数据

这里使用 PyODPS 读写 MaxCompute 表数据:文档

训练过程

  1. 安装必要的库
pip install lightgbm pandas scikit-learn pyodps
  1. 训练模型,这里分别用 gbdt 训练四个品牌的模型
import lightgbm as lgb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

import numpy as np
import os

from odps import ODPS
from odps.df import DataFrame
# 建立链接。
access_id = 
access_key = 
project = 'recom_maxcompute_dev'
endpoint = 'http://service.cn-shanghai.maxcompute.aliyun.com/api'

o = ODPS(
    access_id,
    access_key,
    project,
    endpoint,
)

# 读取数据。
brand_id='b78739'
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916' and brand_id='{brand}'
;
'''.format(brand=brand_id)
print(sql)
query_job = o.execute_sql(sql)
result = query_job.open_reader(tunnel=True)
df = result.to_pandas(n_process=4) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。
print('read data finish')
# 假设您的数据已经加载到DataFrame中,名为df
# df = pd.read_csv('your_data.csv')  # 如果数据来自CSV文件

# 分离特征和标签
X = df.drop(['label', 'user_id', 'brand_id', 'ds'], axis=1)
y = df['label']

# 分割数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 创建LightGBM数据结构
lgb_train = lgb.Dataset(X_train, label=y_train)
lgb_eval = lgb.Dataset(X_test, label=y_test, reference=lgb_train)
print('post process data finish')
# 设置参数
params = {
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 31,
    'learning_rate': 0.08,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1  # 设置为-1以减少输出
}

# 存储AUC值的列表
train_aucs = []
test_aucs = []

# 开始训练,从5棵树开始,直到100棵树,每5棵树为一个间隔
with open(f'./models_{brand_id}/auc_scores.txt', 'w') as f:
    for i in range(20, 70, 1):
        print(f"Training model with {i} trees...")

        # 训练模型
        gbm = lgb.train(params,
                        lgb_train,
                        num_boost_round=i,
                        valid_sets=[lgb_train, lgb_eval],
                        valid_names=['train', 'valid'])  # 关闭详细日志

        # 使用最后一棵树进行预测并计算 AUC
        y_train_pred = gbm.predict(X_train)
        y_test_pred = gbm.predict(X_test)

        train_auc = roc_auc_score(y_train, y_train_pred)
        test_auc = roc_auc_score(y_test, y_test_pred)

        # 打印 AUC
        print(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")
        # 保存模型
        gbm.save_model(f'./models_{brand_id}/model_{i}.txt')
        # 保存 AUC
        f.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")
  1. 训练日志
    在这里插入图片描述
  2. 训练混合模型,这里学习率过小并没有收敛(测试集 AUC 一直增加,并没有减小),因此调大为 0.13,训练韩都衣舍+三星手机,韩都衣舍+LILY的混合模型
# 读取数据。
brand_id='b47686'
brand_id2='b56508'
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916' and brand_id in ('{brand}', '{brand2}')
;
'''.format(brand=brand_id, brand2=brand_id2)
with open(f'./models_{brand_id}_{brand_id2}/auc_scores.txt', 'w') as f:
    for i in range(20, 70, 1):
        print(f"Training model with {i} trees...")

        # 训练模型
        gbm = lgb.train(params,
                        lgb_train,
                        num_boost_round=i,
                        valid_sets=[lgb_train, lgb_eval],
                        valid_names=['train', 'valid'])  # 关闭详细日志

        # 使用最后一棵树进行预测并计算 AUC
        y_train_pred = gbm.predict(X_train)
        y_test_pred = gbm.predict(X_test)

        train_auc = roc_auc_score(y_train, y_train_pred)
        test_auc = roc_auc_score(y_test, y_test_pred)

        # 打印 AUC
        print(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")
        # 保存模型
        gbm.save_model(f'./models_{brand_id}_{brand_id2}/model_{i}.txt')
        # 保存 AUC
        f.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")
  1. 全行业混合训练
o = ODPS(
    access_id,
    access_key,
    project,
    endpoint,
)

# 读取数据。
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916'
;
'''
print(sql)
query_job = o.execute_sql(sql)
result = query_job.open_reader(tunnel=True)
df = result.to_pandas(n_process=52) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。
print('read data finish')
# 删除非特征列
df = df.drop(columns=['user_id', 'brand_id', 'ds'])


# 分离特征和标签
X = df.drop(['label'], axis=1)
y = df['label']

# 分割数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 创建LightGBM数据结构
lgb_train = lgb.Dataset(X_train, label=y_train)
lgb_eval = lgb.Dataset(X_test, label=y_test, reference=lgb_train)
print('post process data finish')
# 设置参数
params = {
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 31,
    'learning_rate': 0.1,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1  # 设置为-1以减少输出
}

# 存储AUC值的列表
train_aucs = []
test_aucs = []

# 开始训练,从5棵树开始,直到100棵树,每5棵树为一个间隔
with open(f'./models_all/auc_scores.txt', 'w') as f:
    for i in range(5, 100, 5):
        print(f"Training model with {i} trees...")

        # 训练模型
        gbm = lgb.train(params,
                        lgb_train,
                        num_boost_round=i,
                        valid_sets=[lgb_train, lgb_eval],
                        valid_names=['train', 'valid'])  # 关闭详细日志

        # 使用最后一棵树进行预测并计算 AUC
        y_train_pred = gbm.predict(X_train)
        y_test_pred = gbm.predict(X_test)

        train_auc = roc_auc_score(y_train, y_train_pred)
        test_auc = roc_auc_score(y_test, y_test_pred)

        # 打印 AUC
        print(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")
        # 保存模型
        gbm.save_model(f'./models_all/model_{i}.txt')
        # 保存 AUC
        f.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")

评估方案-GBDT

  • 模型训练过程指标
    模型训练过程中的指标是 AUC,在模型训练过程中,通过对比训练集和验证集的 AUC 指标,来确定是否过拟合,当需要全面评估分类模型的性能时,ROC曲线是一个非常重要的工具。ROC曲线通过展示模型在所有可能的分类阈值下的性能,提供了一个全面的视角来观察模型的真阳性率(TPR)和假阳性率(FPR)之间的关系。ROC曲线考虑了模型在所有可能的阈值下的性能,这使得它能够提供一个全面的模型性能视图。
  • 模型业务离线评估指标
    模型业务离线评估指标使用 topn 召回率假设用户要投放 n 个人,评估集里用户购买该品牌的人有 M 个人,而模型预测的 top n 个人中实际购买的人有 m 个人,则召回率为
    r e c a l l t o p n = m / M recall_{topn}=m/M recalltopn=m/M
    在推荐系统或信息检索等业务场景中,Top-N召回率更直接地反映了业务目标,即在前N个推荐项中捕获用户感兴趣的项目的能力。这与AUC关注的整体分类性能有所不同。Top-N召回率模拟了用户可能查看的推荐列表的前N项,这与用户实际交互的场景更为贴近。而AUC则提供了一个全面的分类性能视图,可能不会直接反映用户在特定推荐场景下的体验。

总共有 300w 个用户,这里就看 top 3k,5k,1w,5w,10w 的召回率.

为了节省计算资源,这里使用分批推理的方式,因此对user_pay_sample_feature_join_eval表添加随机数,分十批进行推理

在这里插入图片描述

分批推理

import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

import numpy as np
import os

from odps import ODPS
from odps.df import DataFrame

access_id = 
access_key = 
project = 'recom_maxcompute_dev'
endpoint = 'http://service.cn-shanghai.maxcompute.aliyun.com/api'

o = ODPS(
    access_id,
    access_key,
    project,
    endpoint,
)

# 选取根据品牌数据和树个数的模型
brand_id='b78739'
trees = '28'

# 读取数据。
def load_and_predict(model_path):
    # 初始化预测和标签列表
    predictions = []
    labels = []
    
    # 加载模型
    gbm = lgb.Booster(model_file=model_path)
    
    # 分批读取数据
    for num in range(10):
        # 读取数据
        sql = '''
        SELECT *
        FROM recom_maxcompute_dev.user_pay_sample_feature_join_eval
        WHERE ds='20130923' and brand_id='{brand}' and rnd>{start} and rnd<={end}
        ;
        '''.format(brand=brand_id, start=num/10.0, end=(num+1)/10.0)
        print(sql)
        query_job = o.execute_sql(sql)
        result = query_job.open_reader(tunnel=True)
        print('read data finish')
        df = result.to_pandas(n_process=4) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。
        # 删除非特征列
        chunk = df.drop(columns=['user_id', 'brand_id', 'ds', 'rnd'])
        
        # 删除非特征列 'label'
        X_test = chunk.drop('label', axis=1)
        y_test = chunk['label']
        
        y_pred = gbm.predict(X_test)
        # 预测当前批次数据
        predictions.extend(y_pred)
        labels.extend(y_test)
        
    return predictions, labels

def calculate_top_k_ratio(predictions, labels, top_k_list):
    # 将预测和标签转换成DataFrame
    results_df = pd.DataFrame({'prediction': predictions, 'label': labels})
    
    # 按预测分数降序排序
    results_df = results_df.sort_values(by='prediction', ascending=False)
    
    # 计算总正例数
    total_positives = (results_df['label'] == 1).sum()
    
    # 计算不同 top 数量下的正例比例
    ratios = {}
    for k in top_k_list:
        top_k_df = results_df.head(k)
        top_k_positives = (top_k_df['label'] == 1).sum()
        ratio = top_k_positives / total_positives
        ratios[k] = ratio
    
    return ratios


# 模型文件路径
model_path = f'models_{brand_id}/model_{trees}.txt'

# 需要计算top数量
top_k_list = [1000, 3000, 5000, 10000, 50000]

# 分批加载数据并预测
predictions, labels = load_and_predict(model_path)

# 计算topk的正例比例
ratios = calculate_top_k_ratio(predictions, labels, top_k_list)

# 输出结果
for k, ratio in ratios.items():
    print(f"Top {k} ratio of positive labels: {ratio:4f}")
    
# 如果需要保存结果到文件
with open(f'models_{brand_id}/top_results.txt', 'w') as f:
    for k, ratio in ratios.items():
        f.write(f"Top {k} ratio of positive labels: {ratio: 4f}\n")

单一模型和两两混合模型结果

  • 78739 LILY模型预测自己,tree 28,样本量 10877
    在这里插入图片描述

  • 47686 韩都衣舍模型预测自己,tree 50,样本量 90550
    在这里插入图片描述

  • 62063 诺基亚模型预测自己,tree 29,样本量 10430
    在这里插入图片描述

  • 56508 三星手机模型预测自己,tree 32,样本量 71389

  • 在这里插入图片描述

可以看出使用手机品牌特征训练的模型不如衣服品牌,可能是因为衣服特征更明显,而手机特征更难挖掘。

  1. 下面是混合训练模型的预测结果:
  • 韩都衣舍+三星手机特征训练的模型,tree52,用于预测韩都衣舍:
brand_id='b47686'
brand_model='b47686_b56508'
trees='52'
# 模型文件路径
model_path = f'models_{brand_model}/model_{trees}.txt'

在这里插入图片描述
原始结果
在这里插入图片描述
可以看到不同类型品牌混合训练,在数据配比相似的情况下,各 top 召回均有所上升,这可能是因为特征发掘更充分。

  • 韩都衣舍+LILY特征训练的模型,tree22,用于预测韩都衣舍:
brand_id='b47686'
brand_model='b47686_b78739'
trees='22'

在这里插入图片描述
原始结果
在这里插入图片描述
可以看到 TOP 10000、5000上升了,其余都下降了,因为LILY的样本量远小于韩都衣舍,所以改变不明显,同时这种组合方式虽然是同类型品牌,但特征交叉效果一般,所以有升有降。

  • 韩都衣舍+LILY特征训练的模型,tree22,用于预测LILY:
brand_id='b78739'
brand_model='b47686_b78739'
trees='22'

在这里插入图片描述
原始结果
在这里插入图片描述
可以看到,对LILY添加了韩都衣舍的大量样本混合训练,在预测LILY本身时有明显提升。

全样本训练模型结果

o = ODPS(
    access_id,
    access_key,
    project,
    endpoint,
)

# 选取根据品牌数据和树个数的模型
brand_id='b47686'
# 78739,62063,56508

# 读取数据。
def load_and_predict(model_path):
    # 初始化预测和标签列表
    predictions = []
    labels = []
    
    # 加载模型
    gbm = lgb.Booster(model_file=model_path)
    

    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_eval
    WHERE ds='20130923' and brand_id='{brand}' 
    ;
    '''.format(brand=brand_id)
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    print('read data finish')
    df = result.to_pandas(n_process=52) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。
    # 删除非特征列
    chunk = df.drop(columns=['user_id', 'brand_id', 'ds', 'rnd'])

    # 删除非特征列 'label'
    X_test = chunk.drop('label', axis=1)
    y_test = chunk['label']

    y_pred = gbm.predict(X_test)
    # 预测当前批次数据
    predictions.extend(y_pred)
    labels.extend(y_test)
        
    return predictions, labels

def calculate_top_k_ratio(predictions, labels, top_k_list):
    # 将预测和标签转换成DataFrame
    results_df = pd.DataFrame({'prediction': predictions, 'label': labels})
    
    # 按预测分数降序排序
    results_df = results_df.sort_values(by='prediction', ascending=False)
    
    # 计算总正例数
    total_positives = (results_df['label'] == 1).sum()
    
    # 计算不同 top 数量下的正例比例
    ratios = {}
    for k in top_k_list:
        top_k_df = results_df.head(k)
        top_k_positives = (top_k_df['label'] == 1).sum()
        ratio = top_k_positives / total_positives
        ratios[k] = ratio
    
    return ratios

trees='95'

model_path=f'./models_all/model_{trees}.txt'
# 需要计算top数量
top_k_list = [1000, 3000, 5000, 10000, 50000]

# 分批加载数据并预测
predictions, labels = load_and_predict(model_path)

# 计算topk的正例比例
ratios = calculate_top_k_ratio(predictions, labels, top_k_list)

# 输出结果
for k, ratio in ratios.items():
    print(f"Top {k} ratio of positive labels: {ratio:4f}")
    
# 如果需要保存结果到文件
with open(f'top_results_{brand_id}.txt', 'w') as f:
    for k, ratio in ratios.items():
        f.write(f"Top {k} ratio of positive labels: {ratio: 4f}\n")

可以看出全样本训练效果有明显的提升。

b47686 韩都衣舍b56508 三星手机b62063 诺基亚b78739 LILY
0.114842 0.209537 0.265950 0.349899 0.5339150.068056 0.104861 0.129861 0.164583 0.2847220.102041 0.190476 0.217687 0.258503 0.3537410.257225 0.361272 0.427746 0.488439 0.624277
0.141706 0.230356 0.288113 0.366017 0.5372730.073611 0.103472 0.125000 0.177778 0.2895830.149660 0.217687 0.231293 0.278912 0.3945580.300578 0.416185 0.462428 0.528902 0.627168

然而,GBDT 对于大型稀疏特征的局限使得有相当一部分人群是无法召回的,因此接下来采用 DNN 深度学习模型来进行试验。

DNN 基础知识

线性回归——二分类

损失函数

假设我们有特征 X = [ x 1 , x 2 , … , x n ] X = [x_1, x_2, \ldots, x_n] X=[x1,x2,,xn],则线性回归为:

y ^ = w 1 x 1 + w 2 x 2 + … + w n x n + b 0 \hat{y} = w_1 x_1 + w_2 x_2 + \ldots + w_n x_n + b_0 y^=w1x1+w2x2++wnxn+b0

由于二分类,目标值为0或者1,因此需要将 y ^ \hat{y} y^ 限制在0-1,使用sigmoid函数即可:

h W ( X ) = sigmoid ( x ) = 1 1 + e − ( w 1 x 1 + w 2 x 2 + … + w n x n + b 0 ) h_W(X) = \text{sigmoid}(x) = \frac{1}{1 + e^{-(w_1 x_1 + w_2 x_2 + \ldots + w_n x_n + b_0)}} hW(X)=sigmoid(x)=1+e(w1x1+w2x2++wnxn+b0)1

h W ( X ) h_W(X) hW(X) 有实际的物理意义,表示取1的概率,则其取0和1的概率为:

P ( Y = 1 ∣ X ; W ) = h W ( X ) P(Y=1\mid X; W)=h_W(X) P(Y=1X;W)=hW(X)

P ( Y = 0 ∣ X ; W ) = 1 − h W ( X ) P(Y=0\mid X; W)=1-h_W(X) P(Y=0X;W)=1hW(X)

假设目标值为 ( y ),则预测为 y y y 的概率(二分类假设目标变量服从伯努利分布)为:

P ( Y = y ∣ X ; W ) = ( h W ( X ) ) y ( 1 − h W ( X ) ) 1 − y P(Y=y\mid X; W)=\left(h_W(X)\right)^y\left(1-h_W(X)\right)^{1-y} P(Y=yX;W)=(hW(X))y(1hW(X))1y

假设有 ( N ) 条样本,则其似然函数为:

J ( W ) = ∏ i = 1 N ( h W ( X i ) ) y i ( 1 − h W ( X i ) ) 1 − y i J(W)=\prod_{i=1}^N\left(h_W\left(X_i\right)\right)^{y_i}\left(1-h_W\left(X_i\right)\right)^{1-y_i} J(W)=i=1N(hW(Xi))yi(1hW(Xi))1yi

算法的核心目标是找到一组参数 ( W ) 使得 ( J(W) ) 值最大,( J(W) ) 最大表示大部分样本预测为其真实值 ( y ) 的概率最大。取对数及取负数,将求最大值改为取最小值,该公式为交叉熵损失函数:

L ( W ) = − log ⁡ ( J ( W ) ) = − 1 N ∑ i = 1 N [ y i log ⁡ ( h W ( X i ) ) + ( 1 − y i ) log ⁡ ( 1 − h W ( X i ) ) ] L(W)=-\log(J(W))=-\frac{1}{N}\sum_{i=1}^N\left[y_i\log\left(h_W\left(X_i\right)\right)+\left(1-y_i\right)\log\left(1-h_W\left(X_i\right)\right)\right] L(W)=log(J(W))=N1i=1N[yilog(hW(Xi))+(1yi)log(1hW(Xi))]

求解用梯度下降

L ( W ) L(W) L(W) 求导详解:

L ( W ) = − log ⁡ ( J ( W ) ) = − 1 N ∑ i = 1 N [ y i log ⁡ ( h W ( X i ) ) + ( 1 − y i ) log ⁡ ( 1 − h W ( X i ) ) ] L(W) = -\log(J(W)) = -\frac{1}{N} \sum_{i=1}^N \left[ y_i \log(h_W(X_i)) + (1-y_i) \log(1-h_W(X_i)) \right] L(W)=log(J(W))=N1i=1N[yilog(hW(Xi))+(1yi)log(1hW(Xi))]

h W ( X i ) = 1 1 + e − W T X i h_W(X_i) = \frac{1}{1+e^{-W^TX_i}} hW(Xi)=1+eWTXi1

则,

log ⁡ ( h W ( X i ) ) = log ⁡ ( 1 1 + e − W T X i ) = − log ⁡ ( 1 + e − W T X i ) log ⁡ ( 1 − h W ( X i ) ) = log ⁡ ( 1 − 1 1 + e − W T X i ) = log ⁡ ( e − W T X i 1 + e − W T X i ) = log ⁡ ( e − W T X i ) − log ⁡ ( 1 + e − W T X i ) = − W T X i − log ⁡ ( 1 + e − W T X i ) \begin{align*} \log(h_W(X_i)) &= \log\left(\frac{1}{1+e^{-W^TX_i}}\right) \\ &= -\log(1+e^{-W^TX_i}) \\ \log(1-h_W(X_i)) &= \log\left(1-\frac{1}{1+e^{-W^TX_i}}\right) \\ &= \log\left(\frac{e^{-W^TX_i}}{1+e^{-W^TX_i}}\right) \\ &= \log(e^{-W^TX_i}) - \log(1+e^{-W^TX_i}) \\ &= -W^TX_i - \log(1+e^{-W^TX_i}) \end{align*} log(hW(Xi))log(1hW(Xi))=log(1+eWTXi1)=log(1+eWTXi)=log(11+eWTXi1)=log(1+eWTXieWTXi)=log(eWTXi)log(1+eWTXi)=WTXilog(1+eWTXi)

L ( W ) = − log ⁡ ( J ( W ) ) = − 1 N ∑ i = 1 N [ − y i log ⁡ ( 1 + e − W T X i ) + ( 1 − y i ) ( − W T X i − log ⁡ ( 1 + e − W T X i ) ) ] = − 1 N ∑ i = 1 N [ y i W T X i − W T X i − log ⁡ ( 1 + e − W T X i ) ) = − 1 N ∑ i = 1 N [ y i W T X i − log ⁡ ( e W T X i ) − log ⁡ ( 1 + e − W T X i ) ) = − 1 N ∑ i = 1 N [ y i W T X i − ( log ⁡ ( e W T X i ) + log ⁡ ( 1 + e − W T X i ) ) ) = − 1 N ∑ i = 1 N [ y i W T X i − log ⁡ ( 1 + e W T X i ) ] \begin{align*} L(W) &= -\log(J(W)) = -\frac{1}{N} \sum_{i=1}^{N} \left[ -y_i \log(1+e^{-W^T X_i}) + (1-y_i)(-W^T X_i - \log(1+e^{-W^T X_i})) \right] \\ &= -\frac{1}{N} \sum_{i=1}^{N} \left[ y_i W^T X_i - W^T X_i - \log(1+e^{-W^T X_i}) \right) \\ &= -\frac{1}{N} \sum_{i=1}^{N} \left[ y_i W^T X_i - \log(e^{W^T X_i}) - \log(1+e^{-W^T X_i}) \right) \\ &= -\frac{1}{N} \sum_{i=1}^{N} \left[ y_i W^T X_i - (\log(e^{W^T X_i}) + \log(1+e^{-W^T X_i})) \right) \\ &= -\frac{1}{N} \sum_{i=1}^{N} \left[ y_i W^T X_i - \log(1+e^{W^T X_i}) \right] \end{align*} L(W)=log(J(W))=N1i=1N[yilog(1+eWTXi)+(1yi)(WTXilog(1+eWTXi))]=N1i=1N[yiWTXiWTXilog(1+eWTXi))=N1i=1N[yiWTXilog(eWTXi)log(1+eWTXi))=N1i=1N[yiWTXi(log(eWTXi)+log(1+eWTXi)))=N1i=1N[yiWTXilog(1+eWTXi)]

则,

∂ ∂ w j L ( W ) = ∂ ∂ w j ( − 1 N ∑ i = 1 N [ y i W T X i − log ⁡ ( 1 + e W T X i ) ] ) = − 1 N ∑ i = 1 N [ y i ∂ ∂ w j W T X i − ∂ ∂ w j log ⁡ ( 1 + e W T X i ) ] = − 1 N ∑ i = 1 N [ y i x i j − x i j e W T X i 1 + e W T X i ] = − 1 N ∑ i = 1 N [ y i x i j − x i j 1 + e − W T X i ] = − 1 N ∑ i = 1 N [ y i − 1 1 + e − W T X i ] ∗ x i j = − 1 N ∑ i = 1 N ( h W ( X i ) − y i ) ∗ x i j \begin{align*} \frac{\partial}{\partial w_j}L(W) &= \frac{\partial}{\partial w_j}\left(-\frac{1}{N}\sum_{i=1}^N[y_iW^TX_i-\log(1+e^{W^TX_i})]\right) \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i\frac{\partial}{\partial w_j}W^TX_i-\frac{\partial}{\partial w_j}\log(1+e^{W^TX_i})\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_ix_i^j-\frac{x_i^je^{W^TX_i}}{1+e^{W^TX_i}}\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_ix_i^j-\frac{x_i^j}{1+e^{-W^TX_i}}\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i-\frac{1}{1+e^{-W^TX_i}}\right]*x_i^j \\ &= -\frac{1}{N}\sum_{i=1}^N(h_W(X_i)-y_i)*x_i^j \end{align*} wjL(W)=wj(N1i=1N[yiWTXilog(1+eWTXi)])=N1i=1N[yiwjWTXiwjlog(1+eWTXi)]=N1i=1N[yixij1+eWTXixijeWTXi]=N1i=1N[yixij1+eWTXixij]=N1i=1N[yi1+eWTXi1]xij=N1i=1N(hW(Xi)yi)xij

注意:当预测值接近真实值时 y i y_i yi 则梯度为0,当预测值偏离真实值时,则梯度与该 x i j x^j_i xij 分量和误差大小成正比。

梯度更新方法

给定N个样本,我们通过偏导求解,已经求解出 L ( W ) L(W) L(W) ( x i , y i ) (x_i, y_i) (xi,yi) 处关于 w i w_i wi 的导数为:

∂ ∂ w j L ( W ) = − 1 N ∑ i = 1 N ( h W ( X i ) − y i ) ∗ x i j \frac{\partial}{\partial w_j} L(W) = -\frac{1}{N} \sum_{i=1}^N \left( h_W(X_i) - y_i \right) * x_i^j wjL(W)=N1i=1N(hW(Xi)yi)xij

我们该如何更新 w i w_i wi 的值,假设 w i w_i wi 的初始值为 w i 0 w_i^0 wi0,则,

w i 1 = w i 0 − l r ∗ ∂ ∂ w j L ( W ) w_i^1 = w_i^0 - lr * \frac{\partial}{\partial w_j} L(W) wi1=wi0lrwjL(W)

其中 l r lr lr 表示学习率,表示每次更新的步长,“减”表示向梯度的负方向更新。但是在实际的训练中,训练样本可能是几十几百万甚至上亿,模型参数该如何更新。

1) 全量更新

一次性计算所有的样本对于参数的偏导数,然后使用以下公式直接计算梯度值:

∂ ∂ w j L ( W ) = − 1 N ∑ i = 1 N ( h W ( X i ) − y i ) ∗ x i j \frac{\partial}{\partial w_{j}} L(W) = -\frac{1}{N} \sum_{i=1}^{N} \left( h_{W}(X_i) - y_i \right) * x_i^j wjL(W)=N1i=1N(hW(Xi)yi)xij

优点:梯度会比较稳定。

缺点:容易达到局部最优,且极度耗内存。

2)单条更新

每次只计算一条样本的梯度,并更新参数。

缺点:由于样本的不确定性,梯度会非常不稳定,可能会不收敛。

3)batch 更新

假设有10万个样本,每次随机采样512个样本,放入模型训练,每轮根据这小批量样本计算梯度值然后更新梯度。

优点:平衡了单条更新的稳健性和全量梯度下降的效率。

有的时候使用了部分样本就已经收敛了,那么就不需要用全部的样本,直接根据实际情况停止。如果不进行随机采样,有可能很长的step都是训练同一个品牌的样本,所以不行,在训练之前要对样本进行shuffle随机打乱。

为什么 Loss 不用均方误差

L ( W ) = 1 2 ∑ i = 0 N ( y i − h W ( X i ) ) 2 L(W) = \frac{1}{2} \sum_{i=0}^N \left( y_i - h_W(X_i) \right)^2 L(W)=21i=0N(yihW(Xi))2

对于sigmoid函数, h ( x ) = 1 1 + e − x h(x) = \frac{1}{1 + e^{-x}} h(x)=1+ex1,我们求其导数,首先对公式做变化,

h ( x ) = 1 1 + e − x = e x 1 + e x = 1 − ( e x + 1 ) − 1 , h(x) = \frac{1}{1 + e^{-x}} = \frac{e^x}{1 + e^x} = 1 - (e^x + 1)^{-1}, h(x)=1+ex1=1+exex=1(ex+1)1,

根据求导法则:

h ′ ( x ) = ∂ ∂ x h ( x ) = ( − 1 ) ∗ ( − 1 ) ( e x + 1 ) − 2 e x = ( 1 + e − x ) − 2 e − 2 x e x = ( 1 + e − x ) − 1 ∗ e − x 1 + e − x = 1 1 + e − x ∗ ( 1 − 1 1 + e − x ) = h ( x ) ∗ ( 1 − h ( x ) ) \begin{align*} h'(x) &= \frac{\partial}{\partial x} h(x) = (-1) * (-1) (e^x + 1)^{-2} e^x \\ &= (1 + e^{-x})^{-2} e^{-2x} e^x \\ &= (1 + e^{-x})^{-1} * \frac{e^{-x}}{1 + e^{-x}} \\ &= \frac{1}{1 + e^{-x}} * \left(1 - \frac{1}{1 + e^{-x}}\right) \\ &= h(x) * (1 - h(x)) \end{align*} h(x)=xh(x)=(1)(1)(ex+1)2ex=(1+ex)2e2xex=(1+ex)11+exex=1+ex1(11+ex1)=h(x)(1h(x))

在这里插入图片描述
导数值最大为 0.25,x越大和越小都趋近 0,这样会导致梯度直接消失。

对于均方误差函数, L ( W ) = 1 2 ∑ i = 0 N ( y i − h W ( X i ) ) 2 L(W) = \frac{1}{2} \sum_{i=0}^N \left( y_i - h_W(X_i) \right)^2 L(W)=21i=0N(yihW(Xi))2,我们求其偏导为:

∂ ∂ w i L ( W ) = − ∑ i = 0 N ( y i − h W ( X i ) ) h W ′ ( X i ) ∗ x i \frac{\partial}{\partial w_i} L(W) = -\sum_{i=0}^N \left( y_i - h_W(X_i) \right) h_W'(X_i) * x_i wiL(W)=i=0N(yihW(Xi))hW(Xi)xi

而交叉熵的偏导为,

∂ ∂ w i L ( W ) = ∑ i = 0 N ( y i − h W ( X i ) ) ∗ x i j \frac{\partial}{\partial w_i} L(W) = \sum_{i=0}^N \left( y_i - h_W(X_i) \right) * x_i^j wiL(W)=i=0N(yihW(Xi))xij

没有均方误差那么严重的梯度消失问题。

神经元

多层神经网络

假设输入为 X = [ x 1 , x 2 , x 3 , … , x n ] X = [x_1, x_2, x_3, \ldots, x_n] X=[x1,x2,x3,,xn],第一个隐藏层(Hidden 1), H 1 = [ h 1 1 , h 1 2 , … , h 1 n 1 ] H^1 = [h_1^1, h_1^2, \ldots, h_1^{n_1}] H1=[h11,h12,,h1n1],第二个隐藏层(Hidden 2), H 2 = [ h 2 1 , h 2 2 , … , h 2 n 2 ] H^2 = [h_2^1, h_2^2, \ldots, h_2^{n_2}] H2=[h21,h22,,h2n2],假设激活函数为Relu,则

h 1 1 = Relu ( w 1 1 x 1 + w 1 2 x 2 + … + w 1 n x n + b 0 ) = Relu ( w 1 X ) , 其中 w 1 ∈ R 1 × n , X ∈ R n × 1 \begin{align*} & h_1^1 = \text{Relu}(w_1^1 x_1 + w_1^2 x_2 + \ldots + w_1^n x_n + b_0) = \text{Relu}(w_1 X), \text{其中} w_1 \in \mathbb{R}^{1 \times n}, X \in \mathbb{R}^{n \times 1} \end{align*} h11=Relu(w11x1+w12x2++w1nxn+b0)=Relu(w1X),其中w1R1×n,XRn×1

w 1 , w 2 , … , w n 1 w_1, w_2, \ldots, w_{n_1} w1,w2,,wn1,组成一个矩阵变为,

W 1 = [ w 1 , w 2 , … , w n 1 ] , W 1 ∈ R n 1 × n \begin{align*} & W_1 = [w_1, w_2, \ldots, w_{n_1}], W_1 \in \mathbb{R}^{n_1 \times n} \end{align*} W1=[w1,w2,,wn1],W1Rn1×n

则,

H 1 = Relu ( W 1 X ) H_1 = \text{Relu}(W_1 X) H1=Relu(W1X)

H 2 = Relu ( W 2 Relu ( W 1 X ) ) H_2 = \text{Relu}(W_2 \text{Relu}(W_1 X)) H2=Relu(W2Relu(W1X))

激活函数

如果没有激活函数,模型退化为线性回归,

H 2 = Relu ( W 2 Relu ( W 1 X ) ) H_2 = \text{Relu}(W_2 \text{Relu}(W_1 X)) H2=Relu(W2Relu(W1X))

H ^ 2 = W 2 ( W 1 X ) = W 2 W 1 X = W X , W = W 1 W 2 \hat{H}_2 = W_2 (W_1 X) = W_2 W_1 X = W X, W = W_1 W_2 H^2=W2(W1X)=W2W1X=WX,W=W1W2

有了激活函数以后,模型增加了“非线性能力”,即特征选择和组合的能力。

如下图:当模型有了激活函数,则模型可以按照需要,任意选择特征,并进行任意的组合,以单个节点为例,有了激活函数,模型通过选择特征和组合能力,构建了一颗树(就是选择哪条权重要哪条不要)。而到最后一层时,就是根据需要构建了多棵树。

输出层

如果是线性回归或者二分类,模型一般输出一个值,对于多分类,根据需要输出多个值,而最后一层全连接层一般不加激活函数,直接输出。假设输出为二分类,模型只输出一个值,则,

logits = W 3 Relu ( W 2 Relu ( W 1 X ) ) \text{logits} = W_3 \text{Relu}(W_2 \text{Relu}(W_1 X)) logits=W3Relu(W2Relu(W1X))

输出层 embedding

假设输入的特征是经过离散后的编号的特征,比如[‘男’, ‘近30条购买5次’, ‘重庆市’],编号后为[1, 2, 3]。输入的特征为int型从1到N编号的数组。

如何将int型的编号数组变为embedding?

假设特征整体有N个(该N表示编号的最大值),每个特征用一个实数值维度为M的向量 d d d 表示,即 d 1 = [ d 1 1 , d 1 2 , … , d 1 M ] ∈ R 1 × M d_1 = [d_1^1, d_1^2, \ldots, d_1^M] \in \mathbb{R}^{1 \times M} d1=[d11,d12,,d1M]R1×M,N个向量组成的矩阵为 D = [ d 1 , d 2 , … , d N ] ∈ R N × M D = [d_1, d_2, \ldots, d_N] \in \mathbb{R}^{N \times M} D=[d1,d2,,dN]RN×M

当输入为[1, 2, 3]时,模型会从矩阵 D D D 中选取第1、2、3行的embedding组成新的矩阵,即 D ′ = [ d 1 , d 2 , d 3 ] ∈ R 3 × M D' = [d_1, d_2, d_3] \in \mathbb{R}^{3 \times M} D=[d1,d2,d3]R3×M,然后将 D ′ D' D 输入到模型中。

Loss

交叉熵损失函数:

L ( W ) = − log ⁡ ( J ( W ) ) = − 1 N ∑ i = 1 N [ y i log ⁡ ( h W ( X i ) ) + ( 1 − y i ) log ⁡ ( 1 − h W ( X i ) ) ] L(W)=-\log(J(W))=-\frac{1}{N}\sum_{i=1}^N\left[y_i\log\left(h_W\left(X_i\right)\right)+\left(1-y_i\right)\log\left(1-h_W\left(X_i\right)\right)\right] L(W)=log(J(W))=N1i=1N[yilog(hW(Xi))+(1yi)log(1hW(Xi))]

优化器

  1. 随机梯度下降法

优化器:在batch迭代过程中如何通过梯度更新参数的算法。

假设 g t = ∂ ∂ w j L ( W ) g_t = \frac{\partial}{\partial w_j} L(W) gt=wjL(W),最简单的优化器是随机梯度下降法:

w i t = w i t − 1 − l r ∗ ∂ ∂ w j L ( W ) = w i t − 1 − l r ∗ g t w_i^t = w_i^{t-1} - lr * \frac{\partial}{\partial w_j} L(W) = w_i^{t-1} - lr * g_t wit=wit1lrwjL(W)=wit1lrgt

该算法有什么缺点:

  • 由于样本的随机性,梯度 g t g_t gt 会有不确定性,包括方向和大小,特别是方向,这会导致权重更新有很大的波动。

  • l r lr lr 是固定的。

    对于频繁更新的参数或者稀疏的更新频率低的参数更新步长一致。

    随着模型的迭代,接近最优点的时候和刚开始迭代的时候更新步长也一致, l r lr lr 设置太大会导致最后在最优值附近震荡,甚至会跳过最优值,如果设置太小,收敛变慢,且可能收敛于局部最优值。

  1. Adam算法

Adam算法的关键组成部分之一是:它使用指数加权移动平均值来估算梯度的动量和二次矩,即它使用状态变量:

v t ← β 1 v t − 1 + ( 1 − β 1 ) g t s t ← β 2 s t − 1 + ( 1 − β 2 ) g t 2 v_t \leftarrow \beta_1 v_{t-1} + (1 - \beta_1) g_t \\ s_t \leftarrow \beta_2 s_{t-1} + (1 - \beta_2) g_t^2 vtβ1vt1+(1β1)gtstβ2st1+(1β2)gt2

其中常设置 β 1 = 0.9 , β 2 = 0.999 \beta_1 = 0.9, \beta_2 = 0.999 β1=0.9,β2=0.999 s t s_t st 移动远远慢于 v t v_t vt 的移动。

v ^ t = v t 1 − β 1 t s ^ t = s t 1 − β 2 t \hat{v}_t = \frac{v_t}{1 - \beta_1^t} \\ \hat{s}_t = \frac{s_t}{1 - \beta_2^t} v^t=1β1tvts^t=1β2tst

则新的梯度为:

g t ′ = l r ∗ v ^ t s ^ t + ϵ g_t' = \frac{lr * \hat{v}_t}{\sqrt{\hat{s}_t} + \epsilon} gt=s^t +ϵlrv^t

则梯度更新公式为:

w i t = w i t − 1 − l r ∗ v ^ t s ^ t + ϵ w_i^t = w_i^{t-1} - \frac{lr * \hat{v}_t}{\sqrt{\hat{s}_t} + \epsilon} wit=wit1s^t +ϵlrv^t

  • 动量估计 v t v_t vt:历史梯度的移动平均,保留该梯度的正确方向,且保留了该方向上的梯度值,可以在一定程度上避免每次梯度的不确定性导致的波动。

  • 方差估计 s t s_t st:累计历史上梯度的平方。

    对于频繁更新的参数,比如自然语言里常出现的字,比如“我”、“的”等,其 s t s_t st 会比较大,而对于稀疏的值,比如“舴”,其出现的次数少,则其 s t s_t st 比较小。则由于其在分母,因此频繁出现的参数的 l r lr lr 会比较小,而出现次数少的参数的 l r lr lr 会比较大。

  • l r lr lr会随着迭代的轮数增加而减小,防止在最小值区间震荡不收敛。

DNN 建模

基于GBDT的品牌购买模型有如下问题:

  1. 只有统计特征,粒度太粗,无法通过用户的行为序列,学习品牌之间的关系。

    例如:目标是预测“李宁”品牌的购买人群,有两个用户A、B,用户A行为序列为[安踏、贵人鸟、特步],用户B行为序列为[阿迪达斯、耐克、小脏鞋],A和B哪个更有可能购买“李宁”?

    对于使用GBDT的模型,用户A和B在特征上的表现是,在品牌的二级类目上都有点击行为,没有任何的区分度,因此模型需要通过用户行为序列,区分用户之间的购买兴趣。

  2. GBDT模型无法建模规模较大的id类特征,品牌id有8万+,GBDT无法建模。

    GBDT建模id类特征,只能使用是和否,比如一个用户的性别[“男”、“女”],GBDT必须建模成为两个特征,是否是男性和是否是女性,如果有8万个id特征,那必须建设8万个是和否特征,GBDT难以建模。

基于以上问题,本方案引入深度学习算法。深度学习算法可以很好地建模id类特征,通过用户行为序列,学习id类特征之间的关系。

为了和 gbdt 对比,使用同样的样本,特征增加用户行为序列特征和目标品牌id特征,评估方案保持一致。

在这里插入图片描述

特征开发

总共 8w 个品牌,大多数品牌行为稀疏,为了防止噪声,选取 top 1w 的品牌开发行为特征

create table if not exists brand_top1w_alipay_dim (
    brand_id string,
    alipay_num bigint
)LIFECYCLE 60;

insert OVERwrite table brand_top1w_alipay_dim
select brand_id, alipay_num
from (
    select brand_id, count(DISTINCT user_id) as alipay_num
    from dw_user_item_alipay_log
    where ds<=${bizdate} and ds>to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
        and brand_id is not null
    group by brand_id
)t1 order by alipay_num desc limit 10000
;

在这里插入图片描述

user_brand_seq_feature:

子查询从dw_user_item_click_log表中选择用户ID和品牌ID,其中ds(日期)是在过去30天内且小于${bizdate}(一个变量,代表业务日期)。然后,使用ROW_NUMBER()窗口函数为每个用户按操作时间降序排列品牌点击,并且只选择前50个品牌点击记录。接着,将这些品牌ID与brand_top1w_alipay_dim表中的数据进行连接,以确保只考虑顶级品牌。最后,使用WM_CONCAT函数将这些品牌ID连接成一个字符串,并按用户ID分组,将结果插入到user_click_brand_seq_feature表中对应的分区。

同理建立收藏和购买表,由于加购和购买类似因此不建立cart表了。

create TABLE if not exists user_click_brand_seq_feature (
    user_id string,
    brand_id_seq string
)PARTITIONED BY (ds STRING ) LIFECYCLE 90;

insert OVERWRITE TABLE user_click_brand_seq_feature PARTITION (ds=${bizdate})
select user_id, WM_CONCAT(',', concat('b_',brand_id)) as brand_id_seq
from (
    select t2.user_id, t2.brand_id
    from (
        select brand_id
        from brand_top1w_alipay_dim
    )t1 join (
        SELECT user_id,brand_id
            , ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY ds desc) AS number
        from (
            select user_id, brand_id, max(ds) as ds
            from dw_user_item_click_log
            where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
                and brand_id is not null
            group by user_id, brand_id
        )t1
    )t2 on t1.brand_id=t2.brand_id
    where number<=50
)t1 group by user_id
;

在这里插入图片描述

user_cate_seq_feature

用于存储用户点击类别(category)序列特征,并将过去30天内用户点击类别的序列数据插入到这个表中。这个表的设计和user_click_brand_seq_feature 表类似,但是它关注的是商品或内容的类别(cate)而不是品牌(brand)。

create TABLE if not exists user_click_cate_seq_feature (
    user_id string,
    cate_seq string
)PARTITIONED BY (ds STRING ) LIFECYCLE 90;

insert OVERWRITE TABLE user_click_cate_seq_feature PARTITION (ds=${bizdate})
select user_id, WM_CONCAT(',', concat('c_',cate2)) as cate2_seq
from (
    select user_id, cate2
        , ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY ds DESC) AS number
    from (
        select user_id, cate2, max(ds) as ds
        from dw_user_item_click_log
        where ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-30, 'dd'),'yyyymmdd')
            and cate2 is not null
        group by user_id, cate2
    )t1
)t1
where number<=50
group by user_id
;

在这里插入图片描述

特征拼接、特征离散化和特征序列化

  1. 参照 gbdt 的样本与特征 join,添加上面新增的品牌行为特征、行业行为特征
  2. 对特征(原始实数值)进行离散化,对于某些字段,如 item_num_3d 或 brand_num_3d,进行对数(log(2, value + 1))转换,这可能是为了将计数特征转换为更平滑的表示,避免数据中的极端值。
  3. 使用 concat 函数合并字符串,例如将 ‘b_’ 与 brand_id 字段合并,形成一个新的特征,这可能用于表示品牌的编码。
create table if not exists user_pay_sample_feature_join_dnn(
    user_id string,
    brand_id string,
    label bigint,
    target_brand_id string,
    clk_brand_seq string,
    clt_brand_seq string,
    pay_brand_seq string,
    clk_cate_seq string,
    clt_cate_seq string,
    pay_cate_seq string,
    user_click_feature string,
    user_clt_feature string,
    user_cart_feature string,
    user_pay_feature string,
    brand_stat_feature string,
    user_cate2_cross_feature string,
    user_brand_cross_feature string
)PARTITIONED BY (ds STRING ) LIFECYCLE 90;

insert OVERWRITE TABLE user_pay_sample_feature_join_dnn partition (ds=${bizdate})
select t1.user_id, t1.brand_id, t1.label, concat('b_',t1.brand_id)
    ,if(t2.brand_id_seq is null, 'clkb_seq_null', t2.brand_id_seq)
    ,if(t3.brand_id_seq is null, 'cltb_seq_null', t3.brand_id_seq)
    ,if(t4.brand_id_seq is null, 'payb_seq_null', t4.brand_id_seq)
    ,if(t5.cate_seq is null, 'clkc_seq_null', t5.cate_seq)
    ,if(t6.cate_seq is null, 'cltc_seq_null', t6.cate_seq)
    ,if(t7.cate_seq is null, 'payc_seq_null', t7.cate_seq)
    ,if(t8.user_id is null, 'user_click_null', concat(
        concat('uclk_item_num_3d','_',if(t8.item_num_3d is null, 'null', cast(log(2,t8.item_num_3d+1) as bigint))),',',
        concat('uclk_brand_num_3d','_',if(t8.brand_num_3d is null, 'null', cast(log(2,t8.brand_num_3d+1) as bigint))),',',
        concat('uclk_seller_num_3d','_',if(t8.seller_num_3d is null, 'null', cast(log(2,t8.seller_num_3d+1) as bigint))),',',
        concat('uclk_cate1_num_3d','_',if(t8.cate1_num_3d is null, 'null', cast(log(2,t8.cate1_num_3d+1) as bigint))),',',
        concat('uclk_cate2_num_3d','_',if(t8.cate2_num_3d is null, 'null', cast(log(2,t8.cate2_num_3d+1) as bigint))),',',
        concat('uclk_cnt_days_3d','_',if(t8.cnt_days_3d is null, 'null', cast(log(2,t8.cnt_days_3d+1) as bigint))),',',
...
    )) as user_click_beh_feature
    ...
from (
    select *
    from user_pay_sample
    where ds=${bizdate}
)t1 left join (
    select user_id, brand_id_seq
    from user_click_brand_seq_feature
    where ds=${bizdate}
)t2 on t1.user_id=t2.user_id
...
where (t2.brand_id_seq is not null or t3.brand_id_seq is not null or t4.brand_id_seq is not null or
    t5.cate_seq is not null or t6.cate_seq is not null or t7.cate_seq is not null or
    t8.cnt_days_90d is not null or t9.cate1_num_90d is not null or t10.item_num_90d is not null
    or t11.item_num_90d is not null or t12.click_num is not null or t13.clk_item_90d is not null or
    t14.clk_item_90d is not null)
;

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
4. 特征序列化

把所有的特征序列化,目的是为了将类似于b_alipay_num_6这样的特征编号

create table if not exists user_pay_sample_feature_seq (
    feature string ,
    number bigint
)LIFECYCLE 90;

insert OVERWRITE TABLE user_pay_sample_feature_seq
select feature, ROW_NUMBER() OVER(ORDER BY feature) AS number
from (
    select DISTINCT feature
    from (
        select target_brand_id as feature
        from user_pay_sample_feature_join_dnn
        where ds>='20130701' and ds<='20130916'
        union all
        select trans_array(0,',',clk_brand_seq) as (feature)
        from user_pay_sample_feature_join_dnn
        where ds>='20130701' and ds<='20130916'
        union all
        select trans_array(0,',',clt_brand_seq) as (feature)
        from user_pay_sample_feature_join_dnn
        where ds>='20130701' and ds<='20130916'
...
    )t1
)t1
;

在这里插入图片描述

使用多个 JOIN 语句将不同的特征序列与原始数据表 user_pay_sample_feature_join_dnn 连接起来

reate table if not exists user_pay_sample_feature_join_dnn_seq(
    user_id string,
    brand_id string,
    label bigint,
    bizdate string,
    target_brand_id string,
    clk_brand_seq string,
    clt_brand_seq string,
    pay_brand_seq string,
    clk_cate_seq string,
    clt_cate_seq string,
    pay_cate_seq string,
    user_click_feature string,
    user_clt_feature string,
    user_cart_feature string,
    user_pay_feature string,
    brand_stat_feature string,
    user_cate2_cross_feature string,
    user_brand_cross_feature string
)LIFECYCLE 90;

insert OVERWRITE TABLE user_pay_sample_feature_join_dnn_seq
select t1.user_id, t1.brand_id, t1.label, t1.ds, t1.number,
    t2.feature, t3.feature, t5.feature, t6.feature, t7.feature, t8.feature
    ,t9.feature, t10.feature, t11.feature, t12.feature, t13.feature, t14.feature, t15.feature
from (
    select t1.user_id, t1.brand_id, t1.label, t1.ds, t2.number
    from (
        select user_id, brand_id, label, ds, target_brand_id
        from user_pay_sample_feature_join_dnn
        where ds>='20130701' and ds<='20130916'
    )t1 join (
        select feature, number
        from user_pay_sample_feature_seq
    )t2 on t1.target_brand_id=t2.feature
)t1 join (
    select user_id, brand_id, label, ds, WM_CONCAT(',', number) as feature
    from (
        select user_id, brand_id, label, ds, number
        from (
            select trans_array(4, ',', user_id, brand_id, label, ds, clk_brand_seq) as (user_id, brand_id, label, ds, feature)
            from user_pay_sample_feature_join_dnn
            where ds>='20130701' and ds<='20130916'
        )t1 join (
            select feature, number
            from user_pay_sample_feature_seq
        )t2 on t1.feature=t2.feature
    )t1
    group by user_id, brand_id, label, ds
)t2 on t1.user_id=t2.user_id and t1.brand_id=t2.brand_id and t1.label=t2.label and t1.ds=t2.ds
...
;

在这里插入图片描述
在这里插入图片描述

从 user_pay_sample_feature_join_dnn_seq 表中选择所有列,并为每行生成一个唯一的 key_all,这个键是通过连接随机数和原始数据的一些列(如 user_id, brand_id, label, bizdate)构成的。然后用 group by

使用 MAX() 函数对每个 key_all 分组内的记录选择最大值,这样做可以确保每个特征序列中的每个特征都是最具有代表性的,并且可以用于后续的模型训练。

create table if not exists user_pay_sample_feature_join_dnn_seq_shuffle(
    key_all string,
    label bigint,
    target_brand_id string,
    clk_brand_seq string,
    clt_brand_seq string,
    pay_brand_seq string,
    clk_cate_seq string,
    clt_cate_seq string,
    pay_cate_seq string,
    user_click_feature string,
    user_clt_feature string,
    user_cart_feature string,
    user_pay_feature string,
    brand_stat_feature string,
    user_cate2_cross_feature string,
    user_brand_cross_feature string
)LIFECYCLE 90;

insert OVERWRITE TABLE user_pay_sample_feature_join_dnn_seq_shuffle
select key_all, max(label), MAX(target_brand_id), MAX(clk_brand_seq), MAX(clt_brand_seq), MAX(pay_brand_seq)
    ,max(clk_cate_seq), max(clt_cate_seq), max(pay_cate_seq), max(user_click_feature)
    ,max(user_clt_feature), max(user_cart_feature), max(user_pay_feature), max(brand_stat_feature)
    ,max(user_cate2_cross_feature), max(user_brand_cross_feature)
from (
    select *, concat(RAND(),'_',RAND(),'_',user_id,'_',brand_id,'_',label,'_',bizdate) as key_all
    from user_pay_sample_feature_join_dnn_seq
)t1 group by key_all
;

在这里插入图片描述

训练模型-DNN

思路

  1. 使用 pyodps 读取数据集。
  2. dataset:读取数据、转换为 PyTorch 格式,tensor。
  3. dataLoader:组织数据,也可以数据变换。
  4. model:包含 embedding、dense、relu。
  5. 损失函数 loss:使用 celoss(交叉熵损失)。
  6. 优化器 optimizer:使用 adam。
  7. 训练的流程:使用 for 循环。
  8. 保存 loss,使用 tensorboard。
  9. 评估 auc(Area Under Curve)。
  10. 保存模型。

代码结构

.
├── config
│   ├── ak_config.py
│   ├── dnn_config.py
├── dataset
│   ├── dnn_dataset.py
├── din_model_train.py
├── dnn_focal_loss_model_train.py
├── dnn_model_test.py
├── dnn_model_train.py
├── loss
│   ├── focal_loss.py
├── model
│   ├── din_model2.py
│   ├── din_model.py
│   ├── dnn_model.py
│   ├── moe_model.py
├── moe_focal_loss_model_train.py
└── utils
    ├── get_data.py

代码内容

  • dnn_config
config={
    "embedding_dim":32,
    "num_embedding":20000,
    "lr":0.001,
    "batch_size":512,
    "num_experts":3,
    "feature_col" :["target_brand_id","clk_brand_seq","clt_brand_seq","pay_brand_seq","clk_cate_seq","clt_cate_seq","pay_cate_seq","user_click_feature","user_clt_feature","user_cart_feature","user_pay_feature","brand_stat_feature","user_cate2_cross_feature","user_brand_cross_feature"],
    "features_gate_col":["target_brand_id","brand_stat_feature","clk_brand_seq","user_cate2_cross_feature","user_brand_cross_feature"]
}
  • dnn_dataset
import torch
import torch.nn as nn 
import torch.nn.functional as F 
from torch.utils.data import Dataset, IterableDataset

# 基础数据集类,用于处理特征和标签数据
class MyDataset(Dataset):
    """
    自定义数据集类,继承自PyTorch的Dataset
    用于处理已经预处理好的特征和标签数据
    """
    def __init__(self, features, labels, config):
        """
        初始化数据集
        
        参数:
            features: 特征字典,包含多个特征列
            labels: 标签数据
            config: 配置字典,包含特征列名等信息
        """
        self.config = config
        self.features = {}
        # 将每个特征列转换为张量格式
        for ff in self.config["feature_col"]:
            self.features[ff] = [torch.tensor([int(id) for id in seq.split(',')], dtype=torch.long) for seq in features[ff]]
        self.labels = torch.tensor(labels, dtype=torch.float32)
        
    def __len__(self):
        """返回数据集的样本数量"""
        return len(self.labels)
        
    def __getitem__(self, idx):
        """
        获取指定索引的样本
        
        参数:
            idx: 样本索引
        返回:
            包含特征和标签的字典
        """
        res_features = {}
        for ff in self.config["feature_col"]:
            res_features[ff] = self.features[ff][idx]
        res_features['labels'] = self.labels[idx]
        return res_features
    
# 优化版数据集类,用于处理大规模数据
class MyPriorDataset(Dataset):
    """
    优化的数据集类,用于更高效地处理特征数据
    相比MyDataset,减少了内存使用,采用即时转换策略
    """
    def __init__(self, features, labels, config):
        """
        初始化数据集
        
        参数:
            features: 原始特征数据
            labels: 标签数据
            config: 配置字典
        """
        self.config = config
        self.features = features
        self.labels = labels
        
    def __len__(self):
        """返回数据集的样本数量"""
        return len(self.labels)
        
    def __getitem__(self, idx):
        """
        获取指定索引的样本,即时进行数据转换
        
        参数:
            idx: 样本索引
        返回:
            包含特征和标签的字典
        """
        res_features = {}
        for ff in self.config["feature_col"]:
            res_features[ff] = torch.tensor([int(id) for id in self.features[ff][idx].split(',')], dtype=torch.long)
        res_features['labels'] = torch.tensor(self.labels[idx], dtype=torch.float32)
        return res_features    
# 可迭代数据集类,用于流式处理大规模数据
class MyIterDataset(IterableDataset):
    """
    可迭代数据集类,适用于大规模数据的流式处理
    继承自PyTorch的IterableDataset,支持数据流式加载
    """
    def __init__(self, df):
        """
        初始化可迭代数据集
        
        参数:
            df: pandas DataFrame对象,包含特征和标签数据
        """
        super().__init__()
        self.df = df
        
    def __iter__(self):
        """
        返回数据迭代器
        
        生成器函数,逐行产出处理后的数据样本
        返回:
            字典,包含处理后的特征和标签
        """
        for index, row in self.df.iterrows():
            yield {
                'features': {col: list(map(int, row[col].split(','))) for col in feature_columns if col != 'label'},
                'label': int(row['label'])
            }

  • dnn_model
import torch
import torch.nn as nn 
import torch.nn.functional as F 

# 深度神经网络模型类
class MyModel(nn.Module):
    """
    自定义DNN模型
    实现了一个包含embedding层和三层全连接层的深度神经网络
    用于特征学习和二分类预测
    """
    def __init__(self, config):
        """
        初始化模型结构
        
        参数:
            config: 配置字典,包含以下关键参数:
                - num_embedding: embedding字典大小
                - embedding_dim: embedding向量维度
                - feature_col: 特征列名列表
        """
        super(MyModel, self).__init__()
        self.config = config
        # 创建embedding层,用于将离散特征转换为稠密向量
        # padding_idx=0表示将0作为填充值,其embedding向量将始终为0
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"], 
            embedding_dim=self.config["embedding_dim"], 
            padding_idx=0
        )
        
        # 定义三层全连接网络
        # 第一层:将所有特征的embedding连接后映射到512维
        self.fc1 = nn.Linear(self.config["embedding_dim"]*len(self.config["feature_col"]), 512)
        # 第二层:将512维特征映射到128维
        self.fc2 = nn.Linear(512,128)
        # 输出层:将128维特征映射到1维输出
        self.fc3 = nn.Linear(128,1)
        
    def forward(self, features):
        """
        前向传播函数
        
        参数:
            features: 字典,包含各个特征列的输入数据
                     每个特征的形状为 [batch_size, sequence_length]
        
        返回:
            tensor: 模型预测输出,形状为 [batch_size, 1]
        """
        # 对每个特征进行embedding操作
        embedding_dict = {}
        for ff in self.config["feature_col"]:
            # 对每个特征序列的embedding结果求和,得到定长表示
            embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
            
        # 将所有特征的embedding结果在特征维度上拼接
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)
        
        # 通过三层全连接网络
        x = F.relu(self.fc1(x))  # 第一层激活
        x = F.relu(self.fc2(x))  # 第二层激活
        x = self.fc3(x)          # 输出层(不使用激活函数)
        
        return x
  • get_data
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
import numpy as np
import os
from odps import ODPS
from torch.nn.utils.rnn import pad_sequence
from config.dnn_config import config as dnn_config
import torch

# 从MaxCompute获取训练和测试数据
def get_data(ak_config):
    """
    从MaxCompute数据库获取训练数据并进行预处理
    
    参数:
        ak_config: 包含MaxCompute访问配置的字典
    返回:
        train_feature_numpy: 训练集特征
        test_feature_numpy: 测试集特征
        train_label: 训练集标签
        test_label: 测试集标签
    """
    # 初始化ODPS连接
    o = ODPS(
        ak_config["access_id"],
        ak_config["access_key"],
        ak_config["project"],
        ak_config["endpoint"],
    )

    # 执行SQL查询获取数据
    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_dnn_seq_shuffle
    ;
    '''
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=10)  # 使用多线程加速数据读取
    print('read data finish')
    
    # 数据预处理
    df = df.drop(columns=['key_all'])  # 删除非特征列
    X = df.drop(columns='label')       # 分离特征
    y = df['label']                    # 分离标签

    # 划分训练集和测试集
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # 构建特征字典
    feature_col = dnn_config["feature_col"]
    train_feature_numpy = {}
    test_feature_numpy = {}
    for feature in feature_col:
        train_feature_numpy[feature] = X_train[feature].values
        test_feature_numpy[feature] = X_test[feature].values
    train_label = y_train.values
    test_label = y_test.values
    return train_feature_numpy,test_feature_numpy,train_label,test_label

# 计算Top-K正例比例
def calculate_top_k_ratio(predictions, labels, top_k_list):
    """
    计算不同K值下的正例召回率
    
    参数:
        predictions: 模型预测分数
        labels: 真实标签
        top_k_list: 需要计算的K值列表
    返回:
        ratios: 不同K值对应的正例召回率字典
    """
    results_df = pd.DataFrame({'prediction': predictions, 'label': labels})
    results_df = results_df.sort_values(by='prediction', ascending=False)
    total_positives = (results_df['label'] == 1).sum()
    
    ratios = {}
    for k in top_k_list:
        top_k_df = results_df.head(k)
        top_k_positives = (top_k_df['label'] == 1).sum()
        ratio = top_k_positives / total_positives
        ratios[k] = ratio
    
    return ratios

# 获取特定品牌的测试数据
def get_data_test(ak_config, brand_id):
    """
    从MaxCompute获取特定品牌的测试数据
    
    参数:
        ak_config: MaxCompute访问配置
        brand_id: 品牌ID
    返回:
        test_feature_numpy: 测试特征
        test_label: 测试标签
    """
    o = ODPS(ak_config["access_id"], ak_config["access_key"],
             ak_config["project"], ak_config["endpoint"])

    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_eval_dnn_seq
    where keys_all = '{brand_id}'
    ;
    '''.format(brand_id=brand_id)
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=10)
    print('read data finish')
    
    df = df.drop(columns=['keys_all'])
    X = df.drop(columns='label')
    y = df['label']

    feature_col = dnn_config["feature_col"]
    test_feature_numpy = {}
    for feature in feature_col:
        test_feature_numpy[feature] = X[feature].values
    test_label = y.values
    return test_feature_numpy,test_label

# 获取MOE模型测试数据
def get_data_test_moe(ak_config):
    """
    获取MOE模型的测试数据(限制3000条)
    
    参数:
        ak_config: MaxCompute访问配置
    返回:
        test_feature_numpy: 测试特征
        test_label: 测试标签
    """
    o = ODPS(
        ak_config["access_id"],
        ak_config["access_key"],
        ak_config["project"],
        ak_config["endpoint"],
    )

    # 读取数据。
    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_dnn_seq_shuffle limit 3000
    ;
    '''
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=10) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。
    print('read data finish')
    # 删除非特征列
    df = df.drop(columns=['key_all'])

    # 分离特征和标签
    X = df.drop(columns='label')
    y = df['label']

    # 划分训练集和测试集
    feature_col = dnn_config["feature_col"]
    test_feature_numpy = {}
    for feature in feature_col:
        test_feature_numpy[feature] = X[feature].values
    test_label = y.values
    return test_feature_numpy,test_label

# 基础数据整理函数
def my_collate_fn(batch):
    """
    数据批次整理函数,用于DataLoader
    处理变长序列,不包含mask
    
    参数:
        batch: 数据批次
    返回:
        res_feature: 填充后的特征字典
        labels: 标签张量
    """
    res_features_tmp = {}
    labels = []
    for ff in dnn_config["feature_col"]:
        res_features_tmp[ff] = []
    # 收集批次数据
    for sample in batch:
        for ff in dnn_config["feature_col"]:
            res_features_tmp[ff].append(sample[ff])
        labels.append(sample["labels"])
    # 对序列进行填充
    res_feature = {}
    for ff in dnn_config["feature_col"]:
        res_feature[ff] = pad_sequence(res_features_tmp[ff], batch_first=True, padding_value=0)
    return res_feature, torch.tensor(labels)

# 带序列mask的数据整理函数
def seq_collate_fn(batch):
    """
    带序列mask的数据批次整理函数
    用于需要attention mask的模型(如DIN)
    
    参数:
        batch: 数据批次
    返回:
        res_feature: 填充后的特征字典
        res_mask: 特征的mask字典
        labels: 标签张量
    """
    res_features_tmp = {}
    labels = []
    # 收集数据
    for ff in dnn_config["feature_col"]:
        res_features_tmp[ff] = []
    for sample in batch:
        for ff in dnn_config["feature_col"]:
            res_features_tmp[ff].append(sample[ff])
        labels.append(sample["labels"])
    # 生成特征和对应的mask
    res_feature = {}
    res_mask = {}
    for ff in dnn_config["feature_col"]:
        res_feature[ff] = pad_sequence(res_features_tmp[ff], batch_first=True, padding_value=0)
        res_mask[ff] = (res_feature[ff] != 0).type(torch.float32)  # 生成mask:非0位置为1,0位置为0
    return res_feature, res_mask, torch.tensor(labels)
  • dnn_model_train
from sklearn.metrics import roc_auc_score
import os
from torch.utils.data import DataLoader
import torch
import torch.nn as nn 
from torch.utils.tensorboard import SummaryWriter

from dataset.dnn_dataset import MyPriorDataset
from model.dnn_model import MyModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data
from utils.get_data import my_collate_fn

# 获取训练和测试数据
train_feature_numpy,test_feature_numpy,train_label,test_label = get_data(ak_config)
# 创建训练和测试数据集
dataset_train = MyPriorDataset(train_feature_numpy, train_label,dnn_config)
dataset_test = MyPriorDataset(test_feature_numpy, test_label, dnn_config)

print('dataset finish')
# 创建数据加载器
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=my_collate_fn)
dataloader_test = DataLoader(dataset_test, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=my_collate_fn)
print('dataloader finish')

# 初始化模型、损失函数和优化器
model = MyModel(dnn_config)
criterion = nn.BCEWithLogitsLoss()  # 二元交叉熵损失
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)
writer = SummaryWriter()  # 用于TensorBoard可视化

# 创建模型保存目录
os.makedirs('models/dnn2', exist_ok=True)

def train_model(train_loader, test_loader, model, criterion, optimizer, num_epochs=2):
    """
    模型训练主函数
    
    参数:
        train_loader: 训练数据加载器
        test_loader: 测试数据加载器
        model: 神经网络模型
        criterion: 损失函数
        optimizer: 优化器
        num_epochs: 训练轮数
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # 设置为训练模式
        for features,labels in train_loader:
            # 前向传播
            labels = labels
            optimizer.zero_grad()  # 清除梯度
            outputs = model(features)
            labels = torch.unsqueeze(labels,dim=1)
            loss = criterion(outputs, labels)
            
            # 反向传播和优化
            loss.backward()
            optimizer.step()
            
            total_step += 1
            # 记录训练损失
            if (total_step+1)%10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            # 打印训练进度
            if (total_step+1)%100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            # 定期评估模型性能
            if (total_step+1)%2000 == 0:
                with torch.no_grad():
                    model.eval()  # 设置为评估模式
                    test_preds = []
                    test_targets = []
                    # 在测试集上进行预测
                    for data, target in test_loader:
                        output  = model(data)
                        test_preds.extend(output.to('cpu').sigmoid().squeeze().tolist())
                        test_targets.extend(target.squeeze().tolist())
                    # 计算AUC分数
                    test_auc = roc_auc_score(test_targets, test_preds)
                    writer.add_scalar('AUC/train', test_auc, total_step)
                # 保存模型检查点
                torch.save(model.state_dict(), f'models/dnn2/model_epoch_{epoch}_{total_step}.pth')
                model.train()
        # 每个epoch结束后保存模型
        torch.save(model.state_dict(), f'models/dnn2/model_epoch_{epoch}.pth')
    
    # 训练结束后的最终评估
    with torch.no_grad():
        model.eval()
        test_preds = []
        test_targets = []
        for data, target in test_loader:
            output  = model(data)
            test_preds.extend(output.to('cpu').sigmoid().squeeze().tolist())
            test_targets.extend(target.squeeze().tolist())
        test_auc = roc_auc_score(test_targets, test_preds)
        writer.add_scalar('AUC/train', test_auc, total_step)

# 开始训练
train_model(dataloader_train, dataloader_test, model, criterion, optimizer)
writer.close()  # 关闭TensorBoard写入器
  • dnn_model_test
from torch.utils.data import DataLoader
import torch
from dataset.dnn_dataset import MyPriorDataset
from model.dnn_model import MyModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data_test
from utils.get_data import calculate_top_k_ratio
from utils.get_data import my_collate_fn

# 定义要测试的品牌列表
brands = ['b47686','b56508','b62063','b78739']
for brand_id in brands:
    # 设置模型路径和评估参数
    model_path = './models/focal2/model_epoch_1_27999.pth'
    # 定义要评估的top-k值列表
    top_k_list = [1000, 3000, 5000, 10000, 50000]

    # 获取测试数据
    test_feature_numpy,test_label = get_data_test(ak_config, brand_id)
    dataset_test = MyPriorDataset(test_feature_numpy, test_label, dnn_config)

    # 创建测试数据加载器
    dataloader_test = DataLoader(dataset_test, batch_size=dnn_config["batch_size"], 
                               shuffle=False, collate_fn=my_collate_fn)

    # 加载模型
    model = MyModel(dnn_config).to('cpu')
    model.load_state_dict(torch.load(model_path))
    model.to('cpu')
    model.eval()  # 设置为评估模式
    
    # 进行预测
    test_preds = []
    test_targets = []
    for data, target in dataloader_test:
        output  = model(data)
        test_preds.extend(output.sigmoid().squeeze().tolist())
        test_targets.extend(target.squeeze().tolist())

    # 计算各个top-k的正例比例
    ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
    # 打印结果
    for k, ratio in ratios.items():
        print(f"Top {k} ratio of positive labels: {ratio:.4f}")
    # 将结果保存到文件
    with open(f'models_{brand_id}_top_results_focal.txt', 'w') as f:
        for k, ratio in ratios.items():
            f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")

训练过程

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
可以看到 loss 在震荡,调高 smooth 以后可以看出趋势是在下降的。

focal loss

在真实的样本中,有很多样本是非常容易区分的,比如对品牌有加购的用户,很容易预测为0.9+,对于平台没有任何行为的用户,模型很容易预测为0.1。这些样本都是“容易”样本,对于很多样本预测为0.4-0.6的或者,正样本预测为0.1,负样本预测为0.9,这些样本都是“困难”样本。

假设batch为5,loss如下 [0.1, 0.01, 0.5, 0.1, 0.14],其中0.5是困难样本,其他是容易样本,但是困难样本只在整体的loss中贡献58%,模型还有42%的注意力放到了容易样本上,但是容易样本已经学的很好不需要再学习。因此需要让模型专注学习困难样本,提升困难样本的效果。

本方案采用focal loss优化模型loss函数,让模型自动加大对困难样本的专注度,提升模型效果。

Focal loss 出自视觉领域,Focal Loss for Dense Object Detection,主要解决样本不均衡以及困难样本学习问题。

交叉熵损失函数为,

L ( W ) = − y log ⁡ ( h W ( X ) ) − ( 1 − y ) log ⁡ ( 1 − h W ( X ) ) L(W) = -y \log(h_W(X)) - (1 - y) \log(1 - h_W(X)) L(W)=ylog(hW(X))(1y)log(1hW(X))

其中 h W ( X ) h_W(X) hW(X) 表示预测结果为1的概率,则,

L ( W ) = { − log ⁡ ( h W ( X ) ) , y = 1 − log ⁡ ( 1 − h W ( X ) ) , y = 0 L(W) = \begin{cases} -\log(h_W(X)), & y = 1 \\ -\log(1 - h_W(X)), & y = 0 \end{cases} L(W)={log(hW(X)),log(1hW(X)),y=1y=0

定义,focal loss为,

L ( W ) = { − ( 1 − h W ( X ) ) γ log ⁡ ( h W ( X ) ) , y = 1 − ( h W ( X ) ) γ log ⁡ ( 1 − h W ( X ) ) , y = 0 L(W) = \left\{ \begin{aligned} & -(1 - h_W(X))^\gamma \log(h_W(X)), & y = 1 \\ & -(h_W(X))^\gamma \log(1 - h_W(X)), & y = 0 \\ \end{aligned} \right. L(W)={(1hW(X))γlog(hW(X)),(hW(X))γlog(1hW(X)),y=1y=0

其中 γ \gamma γ 一般取2。

对于背景中的例子,预测loss为 [0.1, 0.01, 0.5, 0.1, 0.14],假设只看加权项,则计算后加权项为 [0.01, 0.0001, 0.25, 0.01, 0.0196]。

  • 0.5 转换为 0.25,降低了50%,而 0.1 转换为 0.01 降低了 90%,“容易”样本降低的比例比“困难”样本大。
  • 困难样本 0.5 转换前占整体误差比例为 58%,转换后占整体样本的比例为 86%。转换后,由于“容易”样本降低幅度大,变相提升了“困难”样本的占比。

由于在实际的案例中,负样本中“容易”样本太多,正样本里“困难”太多,通过该转换后,正样本里“困难”样本提升幅度过大,因此还需要加入一个平衡因子:

L ( W ) = { − α ( 1 − h W ( X ) ) γ log ⁡ ( h W ( X ) ) , y = 1 − ( 1 − α ) ( h W ( X ) ) γ log ⁡ ( 1 − h W ( X ) ) , y = 0 L(W) = \left\{ \begin{aligned} & -\alpha(1 - h_W(X))^\gamma \log(h_W(X)), & y = 1 \\ & -(1 - \alpha)(h_W(X))^\gamma \log(1 - h_W(X)), & y = 0 \\ \end{aligned} \right. L(W)={α(1hW(X))γlog(hW(X)),(1α)(hW(X))γlog(1hW(X)),y=1y=0

其中 α \alpha α 一般取 0.25。

通过 focal loss,模型可以通过模型训练的实际误差对样本动态加权,让模型更关注“困难”样本。

  • focal_loss
import torch
import torch.nn as nn 
from torch.nn.functional import binary_cross_entropy_with_logits

# Focal Loss实现类,用于解决类别不平衡问题
class FocalLoss(nn.Module):
    """
    Focal Loss损失函数实现
    用于解决分类任务中的类别不平衡问题
    通过降低易分样本的权重,提高难分样本的权重,使模型更关注难分样本
    """
    def __init__(self, alpha=0.25, gamma=2.0):
        """
        初始化Focal Loss
        
        参数:
            alpha: float, 类别权重因子
                  用于平衡正负样本的重要性
                  当正样本较少时,可以增大alpha值
            gamma: float, 调制因子
                  用于调节易分样本的权重
                  gamma越大,对易分样本的惩罚越大
        """
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, logits, targets):
        """
        计算Focal Loss值
        
        参数:
            logits: tensor, 模型输出的原始预测值(未经过sigmoid)
            targets: tensor, 真实标签值
        
        返回:
            tensor, 计算得到的focal loss均值
        """
        # 计算sigmoid后的预测概率
        probs = torch.sigmoid(logits)

        # 计算二元交叉熵损失(不进行reduction)
        ce_loss = binary_cross_entropy_with_logits(logits, targets, reduction='none')

        # 计算调制因子
        # pt表示预测正确的概率:
        # 当target=1时,pt = prob
        # 当target=0时,pt = 1 - prob
        pt = torch.where(targets == 1, probs, 1 - probs)
        
        # 计算focal weight:(1-pt)^gamma
        # 预测越准确,权重越小
        focal_weight = (1 - pt) ** self.gamma

        # 应用alpha权重
        # 当target=1时,权重为alpha
        # 当target=0时,权重为1-alpha
        alpha_t = torch.where(targets == 1, self.alpha, 1 - self.alpha)
        
        # 计算最终的focal loss
        focal_loss_value = alpha_t * focal_weight * ce_loss

        # 返回batch的平均loss
        return focal_loss_value.mean()    

评估结果

b47686 韩都衣舍b56508 三星手机b62063 诺基亚b78739 LILY平均
gbdt 品牌样本0.114842 0.209537 0.265950 0.349899 0.5339150.068056 0.104861 0.129861 0.164583 0.2847220.102041 0.190476 0.217687 0.258503 0.3537410.257225 0.361272 0.427746 0.488439 0.6242770.2569
gbdt 全样本0.141706 0.230356 0.288113 0.366017 0.5372730.073611 0.103472 0.125000 0.177778 0.2895830.149660 0.217687 0.231293 0.278912 0.3945580.300578 0.416185 0.462428 0.528902 0.6271680.2970
dnn bce loss0.1209 0.2317 0.2821 0.3627 0.53460.0521 0.1007 0.1222 0.1667 0.29380.1293 0.1837 0.2109 0.2517 0.37410.2746 0.4017 0.4653 0.5145 0.65610.2865
dnn focal loss0.1330 0.2250 0.2807 0.3627 0.53190.0639 0.1076 0.1306 0.1736 0.30210.1361 0.1973 0.2041 0.2721 0.38100.2832 0.3960 0.4624 0.5145 0.66760.2913

可以看到focal loss相比交叉熵效果稍微好了一点,而整体来看dnn在top5w方面优于gbdt。

DIN 训练

模型架构

在之前的DNN模型中,输入的用户行为序列只是使用sum pool将用户的行为序列直接叠加在一起,这不利于学习行为序列之间的关系,为提升模型学习品牌之间的关系,本方案引入deep interest network (DIN)算法使用target attention的模式,让模型通过attention自动学习品牌之间的关联关系,从而提升模型的效果。

在这里插入图片描述

代码内容

  • din_model
import torch
import torch.nn as nn 
import torch.nn.functional as F 
from torch.utils.tensorboard import SummaryWriter

# DIN模型基础实现
# 主要处理用户的购买行为序列

class LocalActivationUnit(nn.Module):
    """
    局部激活单元
    用于计算用户行为序列与目标商品之间的注意力权重
    仅处理购买行为序列
    """
    def __init__(self, hidden_units):
        """
        初始化局部激活单元
        Args:
            hidden_units: 隐藏层单元数,用于特征交互和注意力计算
        """
        super(LocalActivationUnit, self).__init__()
        # 第一个全连接层:输入维度是hidden_units的4倍(包含了多种特征交互)
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)
        # 第二个全连接层:将hidden_units维压缩到1维,用于计算注意力分数
        self.fc2 = nn.Linear(hidden_units, 1)

    def forward(self, user_behaviors, target_item, mask):
        """
        计算注意力权重
        
        参数:
            user_behaviors: 用户购买行为序列
            target_item: 目标商品
            mask: 序列填充掩码
        返回:
            user_interests: 加权后的用户兴趣表示
        """
        # 获取序列长度
        seq_len = user_behaviors.size(1)
        # 将目标商品扩展到与行为序列相同的维度,便于后续计算
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        
        # 构建特征交互:拼接多种特征交互形式
        # 包括:原始特征、目标商品特征、差异特征、乘积特征
        interactions = torch.cat([
            user_behaviors,           # 原始用户行为
            target_item,             # 目标商品
            user_behaviors-target_item,  # 行为与目标的差异
            user_behaviors*target_item   # 行为与目标的逐元素乘积
        ], dim=-1)
        
        # 通过两层全连接网络计算注意力分数
        x = torch.relu(self.fc1(interactions))  # 第一层激活
        attention_logits = self.fc2(x).squeeze(-1)  # 第二层得到注意力分数
        
        # 使用掩码处理填充位置,将填充位置的注意力分数设为负无穷
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        
        # 使用softmax将注意力分数转换为权重
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        
        # 计算加权后的用户兴趣表示
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests

class DinModel(nn.Module):
    """
    基础DIN模型实现
    只考虑用户的购买行为序列
    """
    def __init__(self, config):
        """
        初始化DIN模型
        Args:
            config: 配置字典,包含模型参数设置
        """
        super(DinModel, self).__init__()
        self.config = config
        # 创建embedding层,用于将离散特征转换为稠密向量
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],  # embedding字典大小
            embedding_dim=self.config["embedding_dim"],   # embedding向量维度
            padding_idx=0  # 填充token的索引
        )
        # 创建多层全连接网络
        self.fc1 = nn.Linear(self.config["embedding_dim"]*len(self.config["feature_col"]), 512)
        self.fc2 = nn.Linear(512,128)
        self.fc3 = nn.Linear(128,1)
        
        # 创建局部激活单元,用于计算注意力
        self.att = LocalActivationUnit(self.config["embedding_dim"])
        
    def forward(self, features, mask):
        """
        模型前向传播
        
        特点:
        - 只处理购买行为序列(pay_brand_seq)
        - 其他特征直接进行embedding sum
        """
        # 存储各特征的embedding结果
        embedding_dict = {}
        # 对每个特征进行embedding处理
        for ff in self.config["feature_col"]:
            if ff != 'pay_brand_seq':  # 非序列特征直接sum
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        
        # 使用注意力机制处理行为序列
        att_emb = self.att(
            self.embedding(features['pay_brand_seq']),  # 用户历史行为序列
            embedding_dict['target_brand_id'],          # 目标商品
            mask['pay_brand_seq']                       # 序列掩码
        )
        
        # 拼接所有特征(除了行为序列)
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"] if ff != 'pay_brand_seq'], dim=1)
        # 拼接注意力处理后的序列特征
        x = torch.cat([x,att_emb], dim=1)
        
        # 通过多层全连接网络
        x = F.relu(self.fc1(x))  # 第一层
        x = F.relu(self.fc2(x))  # 第二层
        x = self.fc3(x)          # 输出层
        return x
  • din_model_train
import os
from torch.utils.data import DataLoader
import torch
import torch.nn as nn 
from torch.utils.tensorboard import SummaryWriter

from dataset.dnn_dataset import MyPriorDataset
from model.din_model import DinModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data, calculate_top_k_ratio, get_data_test
from utils.get_data import seq_collate_fn

train_feature_numpy,test_feature_numpy,train_label,test_label = get_data(ak_config)
dataset_train = MyPriorDataset(train_feature_numpy, train_label,dnn_config)
print('train dataset finish')
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=seq_collate_fn)
print('train dataloader finish')

brands = ['b47686','b56508','b62063','b78739']
dataset_test_dict = {}
dataloader_test_dict = {}
for brand_id in brands: 
    test_feature_numpy,test_label = get_data_test(ak_config, brand_id)
    dataset_test_dict[brand_id] = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
    dataloader_test_dict[brand_id] = DataLoader(dataset_test_dict[brand_id], batch_size=2048, shuffle=False, collate_fn=seq_collate_fn)
print('test data finish')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DinModel(dnn_config).to(device) 
criterion = nn.BCEWithLogitsLoss()
#criterion = FocalLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)
log_dir = './runs_din4'
writer = SummaryWriter(log_dir)

os.makedirs('models/din4', exist_ok=True)
os.makedirs(log_dir, exist_ok=True)

def train_model(train_loader, test_loader_dict, model, criterion, optimizer, num_epochs=3):
    """
    DIN模型训练主函数
    
    参数:
        train_loader: 训练数据加载器
        test_loader_dict: 测试数据加载器字典(按品牌分类)
        model: DIN模型
        criterion: 损失函数
        optimizer: 优化器
        num_epochs: 训练轮数
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # 设置为训练模式
        for features,mask,labels in train_loader:
            # 将数据移到指定设备
            for ff in dnn_config["feature_col"]:
                features[ff] = features[ff].to(device)
                mask[ff] = mask[ff].to(device)
            labels = labels.to(device) 
            
            # 前向传播和优化
            optimizer.zero_grad()
            outputs = model(features,mask)
            labels = torch.unsqueeze(labels,dim=1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            total_step += 1
            # 记录训练损失
            if (total_step+1)%10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            # 打印训练进度
            if (total_step+1)%100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            # 定期评估模型性能
            if (total_step+1)%7000 == 0:
                with torch.no_grad():
                    model.eval()
                    # 对每个品牌进行评估
                    for brand_id in brands:
                        top_k_list = [1000, 3000, 5000, 10000, 50000]
                        test_preds = []
                        test_targets = []
                        # 在测试集上进行预测
                        for data,mask, target in test_loader_dict[brand_id]:
                            output  = model(data,mask)
                            test_preds.extend(output.sigmoid().squeeze().tolist())
                            test_targets.extend(target.squeeze().tolist())

                        # 计算并保存评估结果
                        ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
                        for k, ratio in ratios.items():
                            print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
                        with open(f'{log_dir}/models_{brand_id}_top_results_din_{total_step}.txt', 'w') as f:
                            for k, ratio in ratios.items():
                                f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
                # 保存模型检查点
                torch.save(model.state_dict(), f'models/din4/model_epoch_{epoch}_{total_step}.pth')
                model.train()
        # 每个epoch结束后保存模型
        torch.save(model.state_dict(), f'models/din4/model_epoch_{epoch}.pth')
    
    # 训练结束后的最终评估
    with torch.no_grad():
        model.eval()
        for brand_id in brands:
            top_k_list = [1000, 3000, 5000, 10000, 50000]
            test_preds = []
            test_targets = []
            for data,mask, target in test_loader_dict[brand_id]:
                output  = model(data,mask)
                test_preds.extend(output.sigmoid().squeeze().tolist())
                test_targets.extend(target.squeeze().tolist())

            ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
            for k, ratio in ratios.items():
                print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
            with open(f'{log_dir}/models_{brand_id}_top_results_din_{total_step}.txt', 'w') as f:
                for k, ratio in ratios.items():
                    f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")

# 开始训练
train_model(dataloader_train, dataloader_test_dict, model, criterion, optimizer)
writer.close()  # 关闭TensorBoard写入器
  • din_model2
import torch
import torch.nn as nn 
import torch.nn.functional as F 
from torch.utils.tensorboard import SummaryWriter

# DIN模型增强版实现
# 同时处理用户的点击和购买行为序列

class LocalActivationUnit(nn.Module):
    """
    局部激活单元
    用于计算用户行为序列与目标商品之间的注意力权重
    可同时处理点击和购买行为序列
    """
    def __init__(self, hidden_units):
        """
        初始化局部激活单元
        Args:
            hidden_units: 隐藏层单元数,用于特征交互和注意力计算
        """
        super(LocalActivationUnit, self).__init__()
        # 第一个全连接层:将4倍hidden_units(包含四种特征交互)映射到hidden_units
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)
        # 第二个全连接层:将hidden_units映射到1维注意力分数
        self.fc2 = nn.Linear(hidden_units, 1)

    def forward(self, user_behaviors, target_item, mask):
        """
        前向传播函数
        Args:
            user_behaviors: 用户历史行为序列 shape: (batch_size, seq_len, hidden_units)
            target_item: 目标商品 shape: (batch_size, hidden_units)
            mask: 序列填充掩码 shape: (batch_size, seq_len)
        """
        # 获取序列长度
        seq_len = user_behaviors.size(1)
        # 扩展目标商品维度以匹配序列长度
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        
        # 构建特征交互向量,包含四种交互方式:
        # 1. 原始用户行为
        # 2. 目标商品
        # 3. 用户行为与目标商品的差异
        # 4. 用户行为与目标商品的元素积
        interactions = torch.cat([user_behaviors, target_item, user_behaviors-target_item, user_behaviors*target_item], dim=-1)
        
        # 通过两层全连接网络计算注意力分数
        x = torch.relu(self.fc1(interactions))  # 第一层带ReLU激活
        attention_logits = self.fc2(x).squeeze(-1)  # 第二层得到注意力分数
        
        # 使用掩码处理填充位置,将填充位置的注意力分数设为负无穷
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        
        # 使用softmax将注意力分数归一化为权重
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        
        # 计算加权后的用户兴趣表示
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests

class DinModel(nn.Module):
    """
    增强版DIN模型实现
    
    主要改进:
    1. 增加了对用户点击行为序列(clk_brand_seq)的处理
    2. 分别对点击序列和购买序列使用注意力机制
    3. 融合两种行为序列的用户兴趣表示
    """
    def __init__(self, config):
        """
        初始化DIN模型
        Args:
            config: 配置字典,包含模型参数设置
        """
        super(DinModel, self).__init__()
        self.config = config
        # 创建embedding层,用于特征的稠密表示
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],  # embedding字典大小
            embedding_dim=self.config["embedding_dim"],   # embedding向量维度
            padding_idx=0  # 填充token的索引
        )
        # 构建三层全连接网络
        self.fc1 = nn.Linear(self.config["embedding_dim"]*len(self.config["feature_col"]), 512)
        self.fc2 = nn.Linear(512,128)
        self.fc3 = nn.Linear(128,1)
        
        # 创建局部激活单元,用于计算注意力
        self.att = LocalActivationUnit(self.config["embedding_dim"])
        
    def forward(self, features, mask):
        """
        模型前向传播
        
        改进之处:
        1. 分别处理点击序列(clk_brand_seq)和购买序列(pay_brand_seq)
        2. 对两个序列分别计算注意力权重
        3. 将两个序列的注意力结果与其他特征一起拼接
        
        参数:
            features: 输入特征字典,包含点击序列和购买序列
            mask: 两个序列的填充掩码
        """
        embedding_dict = {}
        # 处理非序列特征
        for ff in self.config["feature_col"]:
            if ff != 'clk_brand_seq' and ff != 'pay_brand_seq':
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        
        # 新增:处理点击序列
        embedding_dict['clk_brand_seq'] = self.att(
            self.embedding(features['clk_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['clk_brand_seq']
        )
        
        # 处理购买序列
        embedding_dict['pay_brand_seq'] = self.att(
            self.embedding(features['pay_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['pay_brand_seq']
        )
        
        # 拼接所有特征
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)
        
        # 通过三层全连接网络
        x = F.relu(self.fc1(x))  # 第一层,使用ReLU激活
        x = F.relu(self.fc2(x))  # 第二层,使用ReLU激活
        x = self.fc3(x)          # 输出层,无激活函数
        return x

moe 训练

代码内容

import torch
import torch.nn as nn 
import torch.nn.functional as F 
from torch.utils.tensorboard import SummaryWriter

# 局部激活单元:用于计算用户行为序列的注意力权重
class LocalActivationUnit(nn.Module):
    """
    局部激活单元
    与DIN模型中的注意力机制相同,用于计算用户行为序列与目标商品的相关性
    """
    def __init__(self, hidden_units):
        """
        初始化局部激活单元
        参数:
            hidden_units: 隐藏层维度
        """
        super(LocalActivationUnit, self).__init__()
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)  # 第一层全连接
        self.fc2 = nn.Linear(hidden_units, 1)                 # 第二层全连接

    def forward(self, user_behaviors, target_item, mask):
        """
        计算注意力权重
        参数:
            user_behaviors: 用户行为序列 [batch_size, seq_len, hidden_units]
            target_item: 目标商品 [batch_size, hidden_units]
            mask: 序列掩码 [batch_size, seq_len]
        """
        seq_len = user_behaviors.size(1)
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        
        # 特征交互
        interactions = torch.cat([user_behaviors, target_item, 
                                user_behaviors-target_item, 
                                user_behaviors*target_item], dim=-1)
        
        # 计算注意力分数
        x = torch.relu(self.fc1(interactions))
        attention_logits = self.fc2(x).squeeze(-1)
        
        # 掩码处理
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        
        # 加权求和得到用户兴趣表示
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests

# 专家网络:每个专家独立学习特定的特征模式
class Expert(nn.Module):
    """
    专家网络
    实现了一个三层全连接网络,作为单个专家的计算单元
    """
    def __init__(self, input_size):
        """
        初始化专家网络
        参数:
            input_size: 输入特征维度
        """
        super(Expert, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 512),    # 第一层
            nn.ReLU(),
            nn.Linear(512, 128),           # 第二层
            nn.ReLU(),
            nn.Linear(128, 1)              # 输出层
        )
        
    def forward(self, x):
        """专家网络的前向计算"""
        return self.network(x)

# 门控网络:学习如何分配任务给不同的专家
class Gate(nn.Module):
    """
    门控网络
    负责为每个样本动态分配专家的权重
    """
    def __init__(self, input_size, num_experts):
        """
        初始化门控网络
        参数:
            input_size: 输入特征维度
            num_experts: 专家数量
        """
        super(Gate, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 64),          # 第一层
            nn.ReLU(),
            nn.Linear(64, 32),                  # 第二层
            nn.ReLU(),
            nn.Linear(32, num_experts)          # 输出层,维度等于专家数量
        )
        
    def forward(self, x):
        """计算每个专家的权重"""
        return self.network(x)

# 混合专家模型:组合多个专家的预测结果
class MoeModel(nn.Module):
    """
    混合专家模型(Mixture of Experts)
    结合了DIN的注意力机制和MoE的专家混合机制
    """
    def __init__(self, config):
        """
        初始化MoE模型
        参数:
            config: 配置字典,包含模型参数
        """
        super(MoeModel, self).__init__()
        self.config = config
        # embedding层
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],
            embedding_dim=self.config["embedding_dim"],
            padding_idx=0
        )
        
        # 注意力机制
        self.att = LocalActivationUnit(self.config["embedding_dim"])
        # 创建多个专家
        self.experts = nn.ModuleList([
            Expert(self.config["embedding_dim"]*len(self.config["feature_col"]))
            for _ in range(self.config["num_experts"])
        ])
        # 门控网络
        self.gate = Gate(
            self.config["embedding_dim"]*len(self.config["features_gate_col"]),
            self.config["num_experts"]
        )
        
    def forward(self, features, mask):
        """
        模型前向传播
        参数:
            features: 输入特征字典
            mask: 序列特征的掩码
        返回:
            x: 最终预测结果
            gating_weights: 专家权重分布
        """
        # 特征embedding
        embedding_dict = {}
        for ff in self.config["feature_col"]:
            if ff != 'pay_brand_seq':
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        
        # 处理序列特征
        embedding_dict['pay_brand_seq'] = self.att(
            self.embedding(features['pay_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['pay_brand_seq']
        )
        
        # 特征拼接
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)
        # 计算门控特征
        gate_emb = torch.cat([embedding_dict[ff] for ff in self.config["features_gate_col"]], dim=1)
        # 计算专家权重
        gating_weights = F.softmax(self.gate(gate_emb), dim=1)
        
        # 获取所有专家的输出并加权组合
        expert_outputs = torch.stack([expert(x) for expert in self.experts], dim=-1)
        x = torch.sum(gating_weights * expert_outputs.squeeze(), dim=-1)

        return x, gating_weights
# 导入必要的库
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import numpy as np
import os
from odps import ODPS
from odps.df import DataFrame
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch
import torch.nn as nn 
import torch.nn.functional as F 
from torch.utils.tensorboard import SummaryWriter

# 导入自定义模块
from dataset.dnn_dataset import MyDataset
from dataset.dnn_dataset import MyPriorDataset
from model.moe_model import MoeModel  # 使用混合专家模型(Mixture of Experts)
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data,get_data_test,calculate_top_k_ratio
from utils.get_data import my_collate_fn, seq_collate_fn  # 序列数据的特殊整理函数
from loss.focal_loss import FocalLoss

# 获取训练数据并创建数据集
train_feature_numpy,test_feature_numpy,train_label,test_label = get_data(ak_config)
dataset_train = MyPriorDataset(train_feature_numpy, train_label,dnn_config)
print('dataset finish')
# 创建训练数据加载器,使用序列整理函数
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=seq_collate_fn)
print('dataloader finish')

# 为不同品牌创建测试数据集和数据加载器
brands = ['b47686','b56508','b62063','b78739']
dataset_test_dict = {}
dataloader_test_dict = {}
for brand_id in brands: 
    test_feature_numpy,test_label = get_data_test(ak_config, brand_id)
    dataset_test_dict[brand_id] = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
    dataloader_test_dict[brand_id] = DataLoader(dataset_test_dict[brand_id], batch_size=2048, shuffle=False, collate_fn=seq_collate_fn)
print('test data finish')

# 设置设备(GPU/CPU)并初始化模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MoeModel(dnn_config).to(device)  # 初始化MoE模型
criterion = nn.BCEWithLogitsLoss()  # 二元交叉熵损失函数
#criterion = FocalLoss()  # Focal Loss损失函数(已注释)
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)
log_dir='./runs_moe4'  # TensorBoard日志目录
writer = SummaryWriter(log_dir=log_dir)

# 创建保存模型和日志的目录
os.makedirs('models/moe4', exist_ok=True)
os.makedirs(log_dir, exist_ok=True)

def train_model(train_loader, test_loader_dict, model, criterion, optimizer, num_epochs=3):
    """
    模型训练函数
    Args:
        train_loader: 训练数据加载器
        test_loader_dict: 测试数据加载器字典(按品牌分类)
        model: MoE神经网络模型
        criterion: 损失函数
        optimizer: 优化器
        num_epochs: 训练轮数
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # 设置为训练模式
        for features,mask,labels in train_loader:
            # 将特征和掩码数据移到指定设备(GPU/CPU)
            for ff in dnn_config["feature_col"]:
                features[ff] = features[ff].to(device)
                mask[ff] = mask[ff].to(device)
            labels = labels.to(device) 
            
            # 训练步骤
            optimizer.zero_grad()  # 清空梯度
            outputs, _ = model(features,mask)  # 前向传播,包含特征和掩码
            outputs = torch.unsqueeze(outputs,dim=1)  # 调整输出维度
            labels = torch.unsqueeze(labels,dim=1)  # 调整标签维度
            loss = criterion(outputs, labels)  # 计算损失
            loss.backward()  # 反向传播
            optimizer.step()  # 更新参数
            
            total_step += 1
            
            # 记录训练损失到TensorBoard
            if (total_step+1)%10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            
            # 打印训练进度
            if (total_step+1)%100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            
            # 每7000步进行一次评估
            if (total_step+1)%7000 == 0:
                with torch.no_grad():
                    model.eval()  # 设置为评估模式
                    for brand_id in brands:
                        # 设置需要评估的top-k值
                        top_k_list = [1000, 3000, 5000, 10000, 50000]
                        test_preds = []
                        test_targets = []
                        
                        # 收集预测结果和真实标签
                        for data,mask, target in test_loader_dict[brand_id]:
                            output, _  = model(data,mask)
                            test_preds.extend(output.sigmoid().squeeze().tolist())
                            test_targets.extend(target.squeeze().tolist())

                        # 计算并输出top-k的正例比例
                        ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
                        for k, ratio in ratios.items():
                            print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
                        
                        # 保存评估结果到文件
                        with open(f'{log_dir}/models_{brand_id}_top_results_moe_{total_step}.txt', 'w') as f:
                            for k, ratio in ratios.items():
                                f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
                
                # 保存模型检查点
                torch.save(model.state_dict(), f'models/moe4/model_epoch_{epoch}_{total_step}.pth')
                model.train()  # 切回训练模式
        
        # 每个epoch结束后保存模型
        torch.save(model.state_dict(), f'models/moe4/model_epoch_{epoch}.pth')

# 开始训练并关闭TensorBoard写入器
train_model(dataloader_train, dataloader_test_dict, model, criterion, optimizer)
writer.close()

评估结果

b47686 韩都衣舍b56508 三星手机b62063 诺基亚b78739 LILY平均
gbdt 品牌样本0.349899 0.5339150.164583 0.2847220.258503 0.3537410.488439 0.6242770.3823
gbdt 全样本0.366017 0.5372730.177778 0.2895830.278912 0.3945580.528902 0.6271680.4002
dnn bce loss0.3627 0.53460.1667 0.29380.2517 0.37410.5145 0.65610.3943
dnn focal loss0.3627 0.53190.1736 0.30210.2721 0.38100.5145 0.66760.4007
din0.3620 0.53320.1757 0.30280.2585 0.40140.5087 0.65320.3994
moe focal loss0.3553 0.53590.1722 0.29930.2789 0.36730.5231 0.66180.3992

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/946899.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

库的概念:动态库与静态库

在软件开发中&#xff0c;库是代码复用的核心工具&#xff0c;它帮助开发者避免重复造轮子&#xff0c;提升开发效率。库可以分为动态库和静态库&#xff0c;这两者在程序开发中的使用方式、链接过程和性能上存在显著区别。本文将详细讲解动态库与静态库的定义、区别、链接过程…

Flink源码解析之:如何根据JobGraph生成ExecutionGraph

Flink源码解析之&#xff1a;如何根据JobGraph生成ExecutionGraph 在上一篇Flink源码解析中&#xff0c;我们介绍了Flink如何根据StreamGraph生成JobGraph的流程&#xff0c;并着重分析了其算子链的合并过程和JobGraph的构造流程。 对于StreamGraph和JobGraph的生成来说&…

风力涡轮机缺陷检测数据集,91.4%准确识别率,18912张图片,支持yolo,PASICAL VOC XML,COCO JSON格式的标注

风力涡轮机缺陷检测数据集&#xff0c;91.4&#xff05;准确识别率&#xff0c;18912张图片&#xff0c;支持yolo&#xff0c;PASICAL VOC XML&#xff0c;COCO JSON格式的标注 数据集下载&#xff1a; &#xff59;&#xff4f;&#xff4c;&#xff4f; &#xff56;&#…

系统设计——大文件传输方案设计

摘要 大文件传输是指通过网络将体积较大的文件从一个位置发送到另一个位置的过程。这些文件可能包括高清视频、大型数据库、复杂的软件安装包等&#xff0c;它们的大小通常超过几百兆字节&#xff08;MB&#xff09;甚至达到几个吉字节&#xff08;GB&#xff09;或更大。大文…

linux中执行命令

1.1 命令格式 命令格式&#xff1a; 主命令 选项 参数&#xff08;操作对象&#xff09; 命令分为两类&#xff1a; 内置命令&#xff08; builtin &#xff09;&#xff1a;由 shell 程序自带的命令 外部命令&#xff1a;有独立的可执行程序文件&#xff0c;文件名即命令…

Elasticsearch:当混合搜索真正发挥作用时

作者&#xff1a;来自 Elastic Gustavo Llermaly 展示混合搜索何时优于单独的词汇或语义搜索。 在本文中&#xff0c;我们将通过示例探讨混合搜索&#xff0c;并展示它与单独使用词汇或语义搜索技术相比的真正优势。 什么是混合搜索&#xff1f; 混合搜索是一种结合了不同搜索…

Python pyside6 设置的一个《广告图片生成器》

一、图&#xff1a; 二、说明书&#xff1a; 广告图片生成器使用说明 软件功能 这是一个用于生成广告图片的工具&#xff0c;可以快速制作包含产品图片和文字的广告图片。 主要特点 自定义广告尺寸&#xff08;默认620420像素&#xff09; 智能去除产品图片背景 自动排版&…

Spark基本介绍

一&#xff0c;Spark是什么 1.定义&#xff1a;Aache Spark是用于大规模数据处理的统一分析引擎。 二&#xff0c;Spark的发展 三&#xff0c;Spark的特点 高效性 计算速度快 提供了一个全新的数据结构RDD&#xff08;弹性分布式数据集&#xff09;。整个计算操作&#xff0c;…

Elasticsearch操作笔记版

文章目录 1.ES索引库操作(CRUD)1.mapping常见属性(前提)2.创建索引库3.查询&#xff0c;删除索引库4.修改索引库 2.ES文档操作(CRUD)1.新增文档2.查询、删除文档查询返回的数据解读&#xff1a; 3.修改文档 3.RestClient操作(索引库/文档)(CRUD)1.什么是RestClient2.需要考虑前…

EFEVD: Enhanced Feature Extraction for Smart Contract Vulnerability Detection

假设&#xff0c;攻击者在合约 Dao 内存放有 1 Ether 攻击者调用 withdraw 函数&#xff0c;提取 1 Ether&#xff1b; 函数执行到 require 之后&#xff0c; balances 之前时&#xff0c;6789-6789-6789- contract Dao {function withdraw() public {require(balances[msg.…

我的线代观-秩(向量,矩阵)

都说秩是线代中不可避免的一环&#xff0c;当然&#xff0c;它其中最重要的一环。 我在学习线代之后&#xff0c;也有这种感受&#xff0c;它有着一种很绕的感受。 1.矩阵中 在矩阵中&#xff0c;它的秩是怎么定义的呢。它常常与行列式扯上关系&#xff0c;我们拿三阶矩阵为例…

ES IK分词字典热更新

前言 在使用IK分词器的时候&#xff0c;发现官方默认的分词不满足我们的需求&#xff0c;那么有没有方法可以自定义字典呢&#xff1f; 官方提供了三种方式 一、ik本地文件读取方式 k插件本来已为用户提供自定义词典扩展功能&#xff0c;只要修改配给文件即可&#xff1a; …

基于Spring Boot的电影网站系统

一、技术架构 后端框架&#xff1a;Spring Boot&#xff0c;它提供了自动配置、简化依赖管理、内嵌式容器等特性&#xff0c;使得开发者可以快速搭建起一个功能完备的Web应用。 前端技术&#xff1a;可能采用Vue.js、JS、jQuery、Ajax等技术&#xff0c;结合Element UI等组件库…

C#运动控制系统:雷赛控制卡实用完整例子 C#雷赛开发快速入门 C#雷赛运动控制系统实战例子 C#快速开发雷赛控制卡

雷赛控制技术 DMC系列运动控制卡是一款新型的 PCI/PCIe 总线运动控制卡。可以控制多个步进电机或数字式伺服电机&#xff1b;适合于多轴点位运动、插补运动、轨迹规划、手轮控制、编码器位置检测、IO 控制、位置比较、位置锁存等功能的应用。 DMC3000 系列卡的运动控制函数库功…

android studio 写一个小计时器(版本二)

as版本&#xff1a;23.3.1patch2 例程&#xff1a;timer 在前一个版本的基本上改的&#xff0c;增加了继续的功能&#xff0c;实现方法稍微不同。 动画演示&#xff1a; activity_main.xml <?xml version"1.0" encoding"utf-8"?> <androidx…

python-leetcode-轮转数组

189. 轮转数组 - 力扣&#xff08;LeetCode&#xff09; class Solution:def rotate(self, nums: List[int], k: int) -> None:"""Do not return anything, modify nums in-place instead."""n len(nums)k % n # 如果 k 大于 n&#xff0c;…

亚马逊云科技 | Amazon Nova:智能技术新势力

在2024年亚马逊云科技re:invent大会上&#xff0c;Amazon Nova 系列自研生成式 AI 多模态模型重磅登场&#xff0c;新一代的AI产品-Amazon Nova&#xff0c;隶属于 Amazon Bedrock&#xff0c;一共发布6款大模型&#xff0c;精准切入不同领域&#xff0c;解锁多元业务可能&…

记录第一次跑YOLOV8做目标检测

今天是24年的最后一天&#xff0c;终于要向新世界开始破门了&#xff0c;开始深度学习&#xff0c;YOLO来敲门~ 最近做了一些皮肤检测的功能&#xff0c;在传统的处理中经历了反复挣扎&#xff0c;终于要上YOLO了。听过、看过&#xff0c;不如上手体会过~ 1、YOLO是什么&#x…

从授权校验看SpringBoot自动装配

背景 最近需要实现一个对于系统的授权检测功能&#xff0c;即当SpringBoot应用被启动时&#xff0c;需要当前设备是否具有有效的的授权许可信息&#xff0c;若无则直接退出应用。具体的实现方案请继续看下文。 环境 Ruoyi-Vue SpringBoot3 RuoYi-Vue: &#x1f389; 基于Spr…

【Unity】 HTFramework框架(五十七)通过Tag、Layer批量搜索物体

更新日期&#xff1a;2024年12月30日。 Github源码&#xff1a;[点我获取源码] Gitee源码&#xff1a;[点我获取源码] 索引 问题再现通过Tag搜索物体&#xff08;SearchByTag&#xff09;打开SearchByTag窗口搜索标记指定Tag的所有物体批量修改Tag搜索Undefined状态的所有物体 …