文章目录
- 赛题介绍
- 评价方式理解
- 赛题理解
- 多路召回
- 代码实战
- 导包
- 读取数据
- 读取文章的基本属性
- 读取文章的Embedding数据
- 调用定义函数
- 获取用户-文章-时间函数
- 获取文章-用户-时间函数
- 获取历史和最后一次点击
- 获取文章属性特征
- 获取用户历史点击的文章信息
- 获取点击次数最多的topk个文章
- 定义多路召回字典
- 定义一个多路召回的字典,将各路召回的结果都保存在这个字典当中
- 提取点击数据
- 召回效果评估函数
- 计算相似性矩阵
- 基于商品的召回i2i
- itemcf sim召回
- embedding sim 召回
- 先进行itemcf召回,这里不需要做召回评估,这里只是一种策略
- 基于规则进行文章过滤
- 多路召回合并
- 召回字典转换成df
- 生成提交文件
- 获取测试集
- 从所有的召回数据中将测试集中的用户选出来
- 生成提交文件
- 学习过程
赛题介绍
该赛题是以新闻APP中的新闻推荐为背景, 目的是要求我们根据用户历史浏览点击新闻文章的数据信息预测用户未来的点击行为, 即用户的最后一次点击的新闻文章。
评价方式理解
最后提交的格式是针对每个用户, 我们都会给出五篇文章的推荐结果,按照点击概率从前往后排序。 而真实的每个用户最后一次点击的文章只会有一篇的真实答案, 所以我们就看我们推荐的这五篇里面是否有命中真实答案的。比如对于user1来说, 我们的提交会是:
user1, article1, article2, article3, article4, article5.
评价指标的公式如下:
假如article1就是真实的用户点击文章,也就是article1命中, 则s(user1,1)=1, s(user1,2-4)都是0, 如果article2是用户点击的文章, 则s(user,2)=1/2,s(user,1,3,4,5)都是0。也就是score(user)=命中第几条的倒数。如果都没中, 则score(user1)=0。 这个是合理的, 因为我们希望的就是命中的结果尽量靠前, 而此时分数正好比较高。
赛题理解
根据赛题简介,我们首先要明确我们此次比赛的目标: 根据用户历史浏览点击新闻的数据信息预测用户最后一次点击的新闻文章。从这个目标上看, 会发现此次比赛和我们之前遇到的普通的结构化比赛不太一样, 主要有两点:
- 首先是目标上, 要预测最后一次点击的新闻文章,也就是我们给用户推荐的是新闻文章, 并不是像之前那种预测一个数或者预测数据哪一类那样的问题
- 数据上, 通过给出的数据我们会发现, 这种数据也不是我们之前遇到的那种特征+标签的数据,而是基于了真实的业务场景, 拿到的用户的点击日志
所以拿到这个题目,我们的思考方向就是结合我们的目标,把该预测问题转成一个监督学习的问题(特征+标签),然后我们才能进行ML,DL等建模预测。
多路召回
所谓的“多路召回”策略,就是指采用不同的策略、特征或简单模型,分别召回一部分候选集,然后把候选集混合在一起供后续排序模型使用,可以明显的看出,“多路召回策略”是在“计算速度”和“召回率”之间进行权衡的结果。其中,各种简单策略保证候选集的快速召回,从不同角度设计的策略保证召回率接近理想的状态,不至于损伤排序效果。如下图是多路召回的一个示意图,在多路召回中,每个策略之间毫不相关,所以一般可以写并发多线程同时进行,这样可以更加高效。
上图只是一个多路召回的例子,也就是说可以使用多种不同的策略来获取用户排序的候选商品集合,而具体使用哪些召回策略其实是与业务强相关的 ,针对不同的任务就会有对于该业务真实场景下需要考虑的召回规则。例如新闻推荐,召回规则可以是“热门新闻”、“作者召回”、“关键词召回”、“主题召回“、”协同过滤召回“等等。
代码实战
已修改跑通代码基于itemcf计算的item之间的相似度sim进行的召回、基于embedding搜索得到的item之间的相似度进行的召回、基于冷启动策略的召回
导包
import pandas as pd
import numpy as np
from tqdm import tqdm
from collections import defaultdict
import os, math, warnings, math, pickle
from tqdm import tqdm
# import faiss
import collections
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from datetime import datetime
from deepctr.feature_column import SparseFeat, VarLenSparseFeat
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
from deepmatch.models import *
from deepmatch.utils import sampledsoftmaxloss
warnings.filterwarnings('ignore')
# data_path = './data_raw/'
data_path = '/data/temp/用户行为预测数据集/' # '/home/admin/jupyter/data/' # 天池平台路径
save_path = '/data/temp/用户行为预测数据集/result/0211/' # '/home/admin/jupyter/temp_result/' # 天池平台路径
# 做召回评估的一个标志, 如果不进行评估就是直接使用全量数据进行召回
metric_recall = True
读取数据
# debug模式: 从训练集中划出一部分数据来调试代码
def get_all_click_sample(data_path, sample_nums=10000):
"""
训练集中采样一部分数据调试
data_path: 原数据的存储路径
sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做)
"""
all_click = pd.read_csv(data_path + 'train_click_log.csv')
all_user_ids = all_click.user_id.unique()
sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False)
all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
return all_click
# 读取点击数据,这里分成线上和线下,如果是为了获取线上提交结果应该讲测试集中的点击数据合并到总的数据中
# 如果是为了线下验证模型的有效性或者特征的有效性,可以只使用训练集
def get_all_click_df(data_path, offline=True):
if offline:
all_click = pd.read_csv(data_path + 'train_click_log.csv')
else:
trn_click = pd.read_csv(data_path + 'train_click_log.csv')
tst_click = pd.read_csv(data_path + 'testA_click_log.csv')
all_click = trn_click.append(tst_click)
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
return all_click
读取文章的基本属性
def get_item_info_df(data_path):
item_info_df = pd.read_csv(data_path + 'articles.csv')
# 为了方便与训练集中的click_article_id拼接,需要把article_id修改成click_article_id
item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
return item_info_df
读取文章的Embedding数据
def get_item_emb_dict(data_path):
item_emb_df = pd.read_csv(data_path + 'articles_emb.csv')
item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
# 进行归一化
item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np))
pickle.dump(item_emb_dict, open(save_path + 'item_content_emb.pkl', 'wb'))
# item_emb_dict.pkl = item_content_emb.pkl
return item_emb_dict
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
调用定义函数
# 采样数据
print("开始采样数据")
#all_click_df = get_all_click_sample(data_path)
#all_click_df.to_pickle(save_path + "all_click_df.pkl")
print("结束采样数据")
# 全量训练集
print("开始全量训练集")
all_click_df = get_all_click_df(data_path, offline=False)
all_click_df.to_pickle(save_path + "all_click_df.pkl")
print("结束全量训练集")
# 对时间戳进行归一化,用于在关联规则的时候计算权重
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)
print("开始读取文章的基本属性")
item_info_df = get_item_info_df(data_path)
item_info_df.to_pickle(save_path + "item_info_df.pkl")
print("结束读取文章的基本属性")
print("开始读取文章的Embedding数据")
item_emb_dict = get_item_emb_dict(data_path)
#item_emb_dict.to_pickle(save_path + "item_emb_dict.pkl") 保存在def中:item_emb_dict.pkl = item_content_emb.pkl
print("结束读取文章的Embedding数据")
获取用户-文章-时间函数
##这个在基于关联规则的用户协同过滤的时候会用到
# 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}
def get_user_item_time(click_df):
click_df = click_df.sort_values('click_timestamp')
def make_item_time_pair(df):
return list(zip(df['click_article_id'], df['click_timestamp']))
user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\
.reset_index().rename(columns={0: 'item_time_list'})
user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
return user_item_time_dict
获取文章-用户-时间函数
## 这个在基于关联规则的文章协同过滤的时候会用到
# 根据时间获取商品被点击的用户序列 {item1: [(user1, time1), (user2, time2)...]...}
# 这里的时间是用户点击当前商品的时间,好像没有直接的关系。
def get_item_user_time_dict(click_df):
def make_user_time_pair(df):
return list(zip(df['user_id'], df['click_timestamp']))
click_df = click_df.sort_values('click_timestamp')
item_user_time_df = click_df.groupby('click_article_id')['user_id', 'click_timestamp'].apply(lambda x: make_user_time_pair(x))\
.reset_index().rename(columns={0: 'user_time_list'})
item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list']))
return item_user_time_dict
获取历史和最后一次点击
## 这个在评估召回结果, 特征工程和制作标签转成监督学习测试集的时候回用到
# 获取当前数据的历史点击和最后一次点击
def get_hist_and_last_click(all_click):
all_click = all_click.sort_values(by=['user_id', 'click_timestamp'])
click_last_df = all_click.groupby('user_id').tail(1)
# 如果用户只有一个点击,hist为空了,会导致训练的时候这个用户不可见,此时默认泄露一下
def hist_func(user_df):
if len(user_df) == 1:
return user_df
else:
return user_df[:-1]
click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)
return click_hist_df, click_last_df
获取文章属性特征
# 获取文章id对应的基本属性,保存成字典的形式,方便后面召回阶段,冷启动阶段直接使用
def get_item_info_dict(item_info_df):
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler)
item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))
item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
pickle.dump(item_type_dict, open(save_path + 'item_type_dict.pkl', 'wb'))
pickle.dump(item_words_dict, open(save_path + 'item_words_dict.pkl', 'wb'))
pickle.dump(item_created_time_dict, open(save_path + 'item_created_time_dict.pkl', 'wb'))
return item_type_dict, item_words_dict, item_created_time_dict
获取用户历史点击的文章信息
def get_user_hist_item_info_dict(all_click):
# 获取user_id对应的用户历史点击文章类型的集合字典
user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index()
user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id']))
# 获取user_id对应的用户点击文章的集合
user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index()
user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id']))
# 获取user_id对应的用户历史点击的文章的平均字数字典
user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index()
user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count']))
# 获取user_id对应的用户最后一次点击的文章的创建时间
all_click_ = all_click.sort_values('click_timestamp')
user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index()
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler)
user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'], \
user_last_item_created_time['created_at_ts']))
return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict
获取点击次数最多的topk个文章
# 获取近期点击最多的文章
def get_item_topk_click(click_df, k):
topk_click = click_df['click_article_id'].value_counts().index[:k]
return topk_click
定义多路召回字典
# 获取文章的属性信息,保存成字典的形式方便查询
print("开始定义多路召回字典")
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# item_type_dict.to_pickle(save_path + "item_type_dict.pkl")
# item_words_dict.to_pickle(save_path + "item_words_dict.pkl")
# item_created_time_dict.to_pickle(save_path + "item_created_time_dict.pkl")
print("结束定义多路召回字典")
定义一个多路召回的字典,将各路召回的结果都保存在这个字典当中
# 定义一个多路召回的字典,将各路召回的结果都保存在这个字典当中
user_multi_recall_dict = {'itemcf_sim_itemcf_recall': {},
'embedding_sim_item_recall': {},
'cold_start_recall': {}}
提取点击数据
# 提取最后一次点击作为召回评估,如果不需要做召回评估直接使用全量的训练集进行召回(线下验证模型)
# 如果不是召回评估,直接使用全量数据进行召回,不用将最后一次提取出来
print("开始提取点击数据")
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
trn_hist_click_df.to_pickle(save_path + "trn_hist_click_df.pkl")
trn_last_click_df.to_pickle(save_path + "trn_last_click_df.pkl")
print("结束提取点击数据")
召回效果评估函数
## 做完了召回有时候也需要对当前的召回方法或者参数进行调整以达到更好的召回效果,因为召回的结果决定了最终排序的上限,下面也会提供一个召回评估的方法
# 依次评估召回的前10, 20, 30, 40, 50个文章中的击中率
def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=5):
last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id']))
user_num = len(user_recall_items_dict)
for k in range(10, topk+1, 10):
hit_num = 0
for user, item_list in user_recall_items_dict.items():
# 获取前k个召回的结果
tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]]
if last_click_item_dict[user] in set(tmp_recall_items):
hit_num += 1
hit_rate = round(hit_num * 1.0 / user_num, 5)
print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)
计算相似性矩阵
#这一部分主要是通过协同过滤以及向量检索得到相似性矩阵,相似性矩阵主要分为user2user和item2item,下面依次获取基于itemcf的item2item的相似性矩阵
## itemcf i2i_sim
#在计算item2item相似性矩阵时,使用关联规则,使得计算的文章的相似性还考虑到了:用户点击的时间权重、用户点击的顺序权重、文章创建的时间权重
def itemcf_sim(df, item_created_time_dict):
"""
文章与文章之间的相似性矩阵计算
:param df: 数据表
:item_created_time_dict: 文章创建时间的字典
return : 文章与文章的相似性矩阵
思路: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则
"""
user_item_time_dict = get_user_item_time(df)
# 计算物品相似度
i2i_sim = {}
item_cnt = defaultdict(int)
for user, item_time_list in tqdm(user_item_time_dict.items()):
# 在基于商品的协同过滤优化的时候可以考虑时间因素
for loc1, (i, i_click_time) in enumerate(item_time_list):
item_cnt[i] += 1
i2i_sim.setdefault(i, {})
for loc2, (j, j_click_time) in enumerate(item_time_list):
if(i == j):
continue
# 考虑文章的正向顺序点击和反向顺序点击
loc_alpha = 1.0 if loc2 > loc1 else 0.7
# 位置信息权重,其中的参数可以调节
loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
# 点击时间权重,其中的参数可以调节
click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time))
# 两篇文章创建时间的权重,其中的参数可以调节
created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
i2i_sim[i].setdefault(j, 0)
# 考虑多种因素的权重计算最终的文章之间的相似度
i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
i2i_sim_ = i2i_sim.copy()
for i, related_items in i2i_sim.items():
for j, wij in related_items.items():
i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])
# 将得到的相似性矩阵保存到本地
pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))
# i2i_sim =itemcf_i2i_sim 与下方相等
return i2i_sim_
print("开始itemcf i2i_sim")
i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
# i2i_sim.to_pickle(save_path + "i2i_sim.pkl") i2i_sim =itemcf_i2i_sim 与上方相等
print("结束itemcf i2i_sim")
## item embedding sim
# 使用Embedding计算item之间的相似度是为了后续冷启动的时候可以获取未出现在点击数据中的文章,后面有对冷启动专门的介绍,这里简单的说一下faiss。
# 向量检索相似度计算
# topk指的是每个item, faiss搜索后返回最相似的topk个item
#def embdding_sim(click_df, item_emb_df, save_path, topk):
# """
# 基于内容的文章embedding相似性矩阵计算
# :param click_df: 数据表
# :param item_emb_df: 文章的embedding
# :param save_path: 保存路径
# :patam topk: 找最相似的topk篇
# return 文章相似性矩阵
# 思路: 对于每一篇文章, 基于embedding的相似性返回topk个与其最相似的文章, 只不过由于文章数量太多,这里用了faiss进行加速
# """
# # 文章索引与文章id的字典映射
# item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id']))
# item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
# item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)
# # 向量进行单位化
# item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
# # 建立faiss索引
# item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
# item_index.add(item_emb_np)
# # 相似度查询,给每个索引位置上的向量返回topk个item以及相似度
# sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表
# # 将向量检索的结果保存成原始id的对应关系
# item_sim_dict = collections.defaultdict(dict)
# for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)):
# target_raw_id = item_idx_2_rawid_dict[target_idx]
# # 从1开始是为了去掉商品本身, 所以最终获得的相似商品只有topk-1
# for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
# rele_raw_id = item_idx_2_rawid_dict[rele_idx]
# item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
# # 保存i2i相似度矩阵
# pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb'))
# return item_sim_dict
print("开始item embedding sim")
#item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
#item_emb_df.to_pickle(save_path + "item_emb_df.pkl")
#emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10) # topk可以自行设置
emb_i2i_sim = pickle.load(open(save_path +"emb_i2i_sim.pkl", "rb"))
print("结束item embedding sim")
itemcf recall
一、上面已经通过协同过滤,Embedding检索的方式得到了文章的相似度矩阵,下面使用协同过滤的思想,给用户召回与其历史文章相似的文章。 这里在召回的时候,也是用了关联规则的方式:
1.考虑相似文章与历史点击文章顺序的权重(细节看代码)
2.考虑文章创建时间的权重,也就是考虑相似文章与历史点击文章创建时间差的权重
3.考虑文章内容相似度权重(使用Embedding计算相似文章相似度,但是这里需要注意,在Embedding的时候并没有计算所有商品两两之间的相似度,所以相似的文章与历史点击文章不存在相似度,需要做特殊处理)
基于商品的召回i2i
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim):
"""
基于文章协同过滤的召回
:param user_id: 用户id
:param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}
:param i2i_sim: 字典,文章相似性矩阵
:param sim_item_topk: 整数, 选择与当前文章最相似的前k篇文章
:param recall_item_num: 整数, 最后的召回文章数量
:param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全
:param emb_i2i_sim: 字典基于内容embedding算的文章相似矩阵
return: 召回的文章列表 [(item1, score1), (item2, score2)...]
"""
# 获取用户历史交互的文章
user_hist_items = user_item_time_dict[user_id]
user_hist_items_ = {user_id for user_id, _ in user_hist_items}
item_rank = {}
for loc, (i, click_time) in enumerate(user_hist_items):
for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
if j in user_hist_items_:
continue
# 文章创建时间差权重
created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
# 相似文章和历史点击文章序列中历史文章所在的位置权重
loc_weight = (0.9 ** (len(user_hist_items) - loc))
content_weight = 1.0
if emb_i2i_sim.get(i, {}).get(j, None) is not None:
content_weight += emb_i2i_sim[i][j]
if emb_i2i_sim.get(j, {}).get(i, None) is not None:
content_weight += emb_i2i_sim[j][i]
item_rank.setdefault(j, 0)
item_rank[j] += created_time_weight * loc_weight * content_weight * wij
# 不足10个,用热门商品补全
if len(item_rank) < recall_item_num:
for i, item in enumerate(item_topk_click):
if item in item_rank.items(): # 填充的item应该不在原来的列表中
continue
item_rank[item] = - i - 100 # 随便给个负数就行
if len(item_rank) == recall_item_num:
break
item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return item_rank
itemcf sim召回
# 先进行itemcf召回, 为了召回评估,所以提取最后一次点击
print("开始itemcf recall")
if metric_recall:
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
trn_hist_click_df = all_click_df
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))
emb_i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl', 'rb'))
sim_item_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, \
i2i_sim, sim_item_topk, recall_item_num, \
item_topk_click, item_created_time_dict, emb_i2i_sim)
user_multi_recall_dict['itemcf_sim_itemcf_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['itemcf_sim_itemcf_recall'], open(save_path + 'itemcf_recall_dict.pkl', 'wb'))
if metric_recall:
# 召回效果评估
metrics_recall(user_multi_recall_dict['itemcf_sim_itemcf_recall'], trn_last_click_df, topk=recall_item_num)
print("结束itemcf recall")
embedding sim 召回
# 这里是为了召回评估,所以提取最后一次点击
print("开始embedding sim 召回")
if metric_recall:
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
trn_hist_click_df = all_click_df
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))
sim_item_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk,
recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
user_multi_recall_dict['embedding_sim_item_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['embedding_sim_item_recall'], open(save_path + 'embedding_sim_item_recall.pkl', 'wb'))
if metric_recall:
# 召回效果评估
metrics_recall(user_multi_recall_dict['embedding_sim_item_recall'], trn_last_click_df, topk=recall_item_num)
print("结束embedding sim 召回")
冷启动问题
1.冷启动问题可以分成三类:文章冷启动,用户冷启动,系统冷启动。
2.文章冷启动:对于一个平台系统新加入的文章,该文章没有任何的交互记录,如何推荐给用户的问题。(对于我们场景可以认为是,日志数据中没有出现过的文章都可以认为是冷启动的文章)
3.用户冷启动:对于一个平台系统新来的用户,该用户还没有文章的交互信息,如何给该用户进行推荐。(对于我们场景就是,测试集中的用户是否在测试集对应的log数据中出现过,如果没有出现过,那么可以认为该用户是冷启动用户。但是有时候并没有这么严格,我们也可以自己设定某些指标来判别哪些用户是冷启动用户,比如通过使用时长,点击率,留存率等等)
4.系统冷启动:就是对于一个平台刚上线,还没有任何的相关历史数据,此时就是系统冷启动,其实也就是前面两种的一个综合。
先进行itemcf召回,这里不需要做召回评估,这里只是一种策略
print("开始冷启动问题")
trn_hist_click_df = all_click_df
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))
sim_item_topk = 150
recall_item_num = 100 # 稍微召回多一点文章,便于后续的规则筛选
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk,
recall_item_num, item_topk_click,item_created_time_dict, emb_i2i_sim)
pickle.dump(user_recall_items_dict, open(save_path + 'cold_start_items_raw_dict.pkl', 'wb'))
print("结束冷启动问题")
基于规则进行文章过滤
# 基于规则进行文章过滤
# 保留文章主题与用户历史浏览主题相似的文章
# 保留文章字数与用户历史浏览文章字数相差不大的文章
# 保留最后一次点击当天的文章
# 按照相似度返回最终的结果
def get_click_article_ids_set(all_click_df):
return set(all_click_df.click_article_id.values)
def cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \
user_last_item_created_time_dict, item_type_dict, item_words_dict,
item_created_time_dict, click_article_ids_set, recall_item_num):
"""
冷启动的情况下召回一些文章
:param user_recall_items_dict: 基于内容embedding相似性召回来的很多文章, 字典, {user1: [(item1, item2), ..], }
:param user_hist_item_typs_dict: 字典, 用户点击的文章的主题映射
:param user_hist_item_words_dict: 字典, 用户点击的历史文章的字数映射
:param user_last_item_created_time_idct: 字典,用户点击的历史文章创建时间映射
:param item_tpye_idct: 字典,文章主题映射
:param item_words_dict: 字典,文章字数映射
:param item_created_time_dict: 字典, 文章创建时间映射
:param click_article_ids_set: 集合,用户点击过得文章, 也就是日志里面出现过的文章
:param recall_item_num: 召回文章的数量, 这个指的是没有出现在日志里面的文章数量
"""
cold_start_user_items_dict = {}
for user, item_list in tqdm(user_recall_items_dict.items()):
cold_start_user_items_dict.setdefault(user, [])
for item, score in item_list:
# 获取历史文章信息
hist_item_type_set = user_hist_item_typs_dict[user]
hist_mean_words = user_hist_item_words_dict[user]
hist_last_item_created_time = user_last_item_created_time_dict[user]
hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time)
# 获取当前召回文章的信息
curr_item_type = item_type_dict[item]
curr_item_words = item_words_dict[item]
curr_item_created_time = item_created_time_dict[item]
curr_item_created_time = datetime.fromtimestamp(curr_item_created_time)
# 首先,文章不能出现在用户的历史点击中, 然后根据文章主题,文章单词数,文章创建时间进行筛选
if curr_item_type not in hist_item_type_set or \
item in click_article_ids_set or \
abs(curr_item_words - hist_mean_words) > 200 or \
abs((curr_item_created_time - hist_last_item_created_time).days) > 90:
continue
cold_start_user_items_dict[user].append((item, score)) # {user1: [(item1, score1), (item2, score2)..]...}
# 需要控制一下冷启动召回的数量
cold_start_user_items_dict = {k: sorted(v, key=lambda x:x[1], reverse=True)[:recall_item_num] \
for k, v in cold_start_user_items_dict.items()}
pickle.dump(cold_start_user_items_dict, open(save_path + 'cold_start_user_items_dict.pkl', 'wb'))
return cold_start_user_items_dict
print("开始基于规则进行文章过滤")
all_click_df_ = all_click_df.copy()
all_click_df_ = all_click_df_.merge(item_info_df, how='left', on='click_article_id')
user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(all_click_df_)
click_article_ids_set = get_click_article_ids_set(all_click_df)
# 需要注意的是
# 这里使用了很多规则来筛选冷启动的文章,所以前面再召回的阶段就应该尽可能的多召回一些文章,否则很容易被删掉
cold_start_user_items_dict = cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \
user_last_item_created_time_dict, item_type_dict, item_words_dict, \
item_created_time_dict, click_article_ids_set, recall_item_num)
user_multi_recall_dict['cold_start_recall'] = cold_start_user_items_dict
print("结束基于规则进行文章过滤")
多路召回合并
###多路召回合并就是将前面所有的召回策略得到的用户文章列表合并起来,下面是对前面所有召回结果的汇总
##1.基于itemcf计算的item之间的相似度sim进行的召回
##2.基于embedding搜索得到的item之间的相似度进行的召回
##3.YoutubeDNN召回
##4.YoutubeDNN得到的user之间的相似度进行的召回
##5.基于冷启动策略的召回
print("开始多路召回合并")
def combine_recall_results(user_multi_recall_dict, weight_dict=None, topk=25):
final_recall_items_dict = {}
# 对每一种召回结果按照用户进行归一化,方便后面多种召回结果,相同用户的物品之间权重相加
def norm_user_recall_items_sim(sorted_item_list):
# 如果冷启动中没有文章或者只有一篇文章,直接返回,出现这种情况的原因可能是冷启动召回的文章数量太少了,
# 基于规则筛选之后就没有文章了, 这里还可以做一些其他的策略性的筛选
if len(sorted_item_list) < 2:
return sorted_item_list
min_sim = sorted_item_list[-1][1]
max_sim = sorted_item_list[0][1]
norm_sorted_item_list = []
for item, score in sorted_item_list:
if max_sim > 0:
norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0
else:
norm_score = 0.0
norm_sorted_item_list.append((item, norm_score))
return norm_sorted_item_list
print('多路召回合并...')
for method, user_recall_items in tqdm(user_multi_recall_dict.items()):
print(method + '...')
# 在计算最终召回结果的时候,也可以为每一种召回结果设置一个权重
if weight_dict == None:
recall_method_weight = 1
else:
recall_method_weight = weight_dict[method]
for user_id, sorted_item_list in user_recall_items.items(): # 进行归一化
user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list)
for user_id, sorted_item_list in user_recall_items.items():
# print('user_id')
final_recall_items_dict.setdefault(user_id, {})
for item, score in sorted_item_list:
final_recall_items_dict[user_id].setdefault(item, 0)
final_recall_items_dict[user_id][item] += recall_method_weight * score
final_recall_items_dict_rank = {}
# 多路召回时也可以控制最终的召回数量
for user, recall_item_dict in final_recall_items_dict.items():
final_recall_items_dict_rank[user] = sorted(recall_item_dict.items(), key=lambda x: x[1], reverse=True)[:topk]
# 将多路召回后的最终结果字典保存到本地
pickle.dump(final_recall_items_dict_rank, open(os.path.join(save_path, 'final_recall_items_dict.pkl'),'wb'))
return final_recall_items_dict_rank
# 这里直接对多路召回的权重给了一个相同的值,其实可以根据前面召回的情况来调整参数的值
weight_dict = {'itemcf_sim_itemcf_recall': 1.0,
'embedding_sim_item_recall': 1.0,
'cold_start_recall': 1.0}
# 最终合并之后每个用户召回150个商品进行排序
final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=150)
print("结束多路召回合并")
召回字典转换成df
final_recall_items_dict = pickle.load(open(save_path +"final_recall_items_dict.pkl", "rb"))
#print("final_recall_items_dict:", final_recall_items_dict)
#final_recall_items_dict: {4: [(42762, 2.0), (42237, 1.7501865528691507), (166380, 1.0)]}
print("开始召回字典转换成df")
#### 召回字典转换成df
# 将字典的形式转换成df
final_recall_items_score_list = []
for user, items in tqdm(final_recall_items_dict.items()):
for item, score in items:
final_recall_items_score_list.append([user, item, score])
recall_df = pd.DataFrame(final_recall_items_score_list, columns=['user_id', 'click_article_id', 'pred_score'])
print("结束召回字典转换成df")
生成提交文件
# 生成提交文件
def submit(recall_df, topk=5, model_name=None):
recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')
# 判断是不是每个用户都有5篇文章及以上
tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())
assert tmp.min() >= topk
del recall_df['pred_score']
submit = recall_df[recall_df['rank'] <= topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()
submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]
# 按照提交格式定义列名
submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2',
3: 'article_3', 4: 'article_4', 5: 'article_5'})
save_name = save_path + model_name + '_' + datetime.today().strftime('%m-%d') + '.csv'
submit.to_csv(save_name, index=False, header=True)
获取测试集
print("开始获取测试集")
# 获取测试集
tst_click = pd.read_csv(data_path + 'testA_click_log.csv')
tst_users = tst_click['user_id'].unique()
print("结束获取测试集")
从所有的召回数据中将测试集中的用户选出来
print("开始从所有的召回数据中将测试集中的用户选出来")
# 从所有的召回数据中将测试集中的用户选出来
tst_recall = recall_df[recall_df['user_id'].isin(tst_users)]
print("结束从所有的召回数据中将测试集中的用户选出来")
生成提交文件
print("开始生成提交文件")
# 生成提交文件
submit(tst_recall, topk=5, model_name='final_recall_baseline')
print("结束生成提交文件")
学习过程
20年当时自身功底是比较零基础(会写些基础的Python[三个科学计算包]数据分析),一开始看这块其实挺懵的,不会就去问百度或其他人,当时遇见困难挺害怕的,但22后面开始力扣题【目前已刷好几轮,博客没写力扣文章之前,力扣排名靠前已刷有5遍左右,排名靠后刷3次左右,代码功底也在一步一步提升】不断地刷、遇见代码不懂的代码,也开始去打印print去理解,到后面问其他人的问题越来越少,个人自主学习、自主解决能力也得到了进一步增强。
比赛源自:阿里云天池大赛 - 零基础入门推荐系统 - 新闻推荐