阿里云-零基础入门推荐系统 【多路召回】

文章目录


赛题介绍

该赛题是以新闻APP中的新闻推荐为背景, 目的是要求我们根据用户历史浏览点击新闻文章的数据信息预测用户未来的点击行为, 即用户的最后一次点击的新闻文章

评价方式理解

最后提交的格式是针对每个用户, 我们都会给出五篇文章的推荐结果,按照点击概率从前往后排序。 而真实的每个用户最后一次点击的文章只会有一篇的真实答案, 所以我们就看我们推荐的这五篇里面是否有命中真实答案的。比如对于user1来说, 我们的提交会是:

user1, article1, article2, article3, article4, article5.

评价指标的公式如下:
在这里插入图片描述

假如article1就是真实的用户点击文章,也就是article1命中, 则s(user1,1)=1, s(user1,2-4)都是0, 如果article2是用户点击的文章, 则s(user,2)=1/2,s(user,1,3,4,5)都是0。也就是score(user)=命中第几条的倒数。如果都没中, 则score(user1)=0。 这个是合理的, 因为我们希望的就是命中的结果尽量靠前, 而此时分数正好比较高。

赛题理解

根据赛题简介,我们首先要明确我们此次比赛的目标: 根据用户历史浏览点击新闻的数据信息预测用户最后一次点击的新闻文章。从这个目标上看, 会发现此次比赛和我们之前遇到的普通的结构化比赛不太一样, 主要有两点:

  • 首先是目标上, 要预测最后一次点击的新闻文章,也就是我们给用户推荐的是新闻文章, 并不是像之前那种预测一个数或者预测数据哪一类那样的问题
  • 数据上, 通过给出的数据我们会发现, 这种数据也不是我们之前遇到的那种特征+标签的数据,而是基于了真实的业务场景, 拿到的用户的点击日志

所以拿到这个题目,我们的思考方向就是结合我们的目标,把该预测问题转成一个监督学习的问题(特征+标签),然后我们才能进行ML,DL等建模预测。

在这里插入图片描述

多路召回

所谓的“多路召回”策略,就是指采用不同的策略、特征或简单模型,分别召回一部分候选集,然后把候选集混合在一起供后续排序模型使用,可以明显的看出,“多路召回策略”是在“计算速度”和“召回率”之间进行权衡的结果。其中,各种简单策略保证候选集的快速召回,从不同角度设计的策略保证召回率接近理想的状态,不至于损伤排序效果。如下图是多路召回的一个示意图,在多路召回中,每个策略之间毫不相关,所以一般可以写并发多线程同时进行,这样可以更加高效。

在这里插入图片描述

上图只是一个多路召回的例子,也就是说可以使用多种不同的策略来获取用户排序的候选商品集合,而具体使用哪些召回策略其实是与业务强相关的 ,针对不同的任务就会有对于该业务真实场景下需要考虑的召回规则。例如新闻推荐,召回规则可以是“热门新闻”、“作者召回”、“关键词召回”、“主题召回“、”协同过滤召回“等等。

代码实战

已修改跑通代码基于itemcf计算的item之间的相似度sim进行的召回、基于embedding搜索得到的item之间的相似度进行的召回、基于冷启动策略的召回

导包

import pandas as pd
import numpy as np
from tqdm import tqdm
from collections import defaultdict
import os, math, warnings, math, pickle
from tqdm import tqdm
# import faiss
import collections
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from datetime import datetime
from deepctr.feature_column import SparseFeat, VarLenSparseFeat
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.preprocessing.sequence import pad_sequences

from deepmatch.models import *
from deepmatch.utils import sampledsoftmaxloss
warnings.filterwarnings('ignore')



# data_path = './data_raw/'
data_path = '/data/temp/用户行为预测数据集/' # '/home/admin/jupyter/data/' # 天池平台路径
save_path = '/data/temp/用户行为预测数据集/result/0211/' # '/home/admin/jupyter/temp_result/'  # 天池平台路径
# 做召回评估的一个标志, 如果不进行评估就是直接使用全量数据进行召回
metric_recall = True

读取数据

# debug模式: 从训练集中划出一部分数据来调试代码
def get_all_click_sample(data_path, sample_nums=10000):
    """
        训练集中采样一部分数据调试
        data_path: 原数据的存储路径
        sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做)
    """
    all_click = pd.read_csv(data_path + 'train_click_log.csv')
    all_user_ids = all_click.user_id.unique()

    sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False) 
    all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click

# 读取点击数据,这里分成线上和线下,如果是为了获取线上提交结果应该讲测试集中的点击数据合并到总的数据中
# 如果是为了线下验证模型的有效性或者特征的有效性,可以只使用训练集
def get_all_click_df(data_path, offline=True):
    if offline:
        all_click = pd.read_csv(data_path + 'train_click_log.csv')
    else:
        trn_click = pd.read_csv(data_path + 'train_click_log.csv')
        tst_click = pd.read_csv(data_path + 'testA_click_log.csv')

        all_click = trn_click.append(tst_click)
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click

读取文章的基本属性

def get_item_info_df(data_path):
    item_info_df = pd.read_csv(data_path + 'articles.csv')
    
    # 为了方便与训练集中的click_article_id拼接,需要把article_id修改成click_article_id
    item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
    
    return item_info_df

读取文章的Embedding数据

def get_item_emb_dict(data_path):
    item_emb_df = pd.read_csv(data_path + 'articles_emb.csv')
    
    item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
    # 进行归一化
    item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)

    item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np))
    pickle.dump(item_emb_dict, open(save_path + 'item_content_emb.pkl', 'wb'))
    # item_emb_dict.pkl = item_content_emb.pkl
    
    return item_emb_dict



max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))

调用定义函数

# 采样数据
print("开始采样数据")
#all_click_df = get_all_click_sample(data_path)
#all_click_df.to_pickle(save_path + "all_click_df.pkl")
print("结束采样数据")

# 全量训练集
print("开始全量训练集")
all_click_df = get_all_click_df(data_path, offline=False)
all_click_df.to_pickle(save_path + "all_click_df.pkl")
print("结束全量训练集")

# 对时间戳进行归一化,用于在关联规则的时候计算权重
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)


print("开始读取文章的基本属性")
item_info_df = get_item_info_df(data_path)
item_info_df.to_pickle(save_path + "item_info_df.pkl")
print("结束读取文章的基本属性")


print("开始读取文章的Embedding数据")
item_emb_dict = get_item_emb_dict(data_path)
#item_emb_dict.to_pickle(save_path + "item_emb_dict.pkl") 保存在def中:item_emb_dict.pkl = item_content_emb.pkl
print("结束读取文章的Embedding数据")

获取用户-文章-时间函数

##这个在基于关联规则的用户协同过滤的时候会用到
# 根据点击时间获取用户的点击文章序列   {user1: [(item1, time1), (item2, time2)..]...}
def get_user_item_time(click_df):
    
    click_df = click_df.sort_values('click_timestamp')
    
    def make_item_time_pair(df):
        return list(zip(df['click_article_id'], df['click_timestamp']))
    
    user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\
                                                            .reset_index().rename(columns={0: 'item_time_list'})
    user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
    
    return user_item_time_dict

获取文章-用户-时间函数

## 这个在基于关联规则的文章协同过滤的时候会用到
# 根据时间获取商品被点击的用户序列  {item1: [(user1, time1), (user2, time2)...]...}
# 这里的时间是用户点击当前商品的时间,好像没有直接的关系。
def get_item_user_time_dict(click_df):
    def make_user_time_pair(df):
        return list(zip(df['user_id'], df['click_timestamp']))
    
    click_df = click_df.sort_values('click_timestamp')
    item_user_time_df = click_df.groupby('click_article_id')['user_id', 'click_timestamp'].apply(lambda x: make_user_time_pair(x))\
                                                            .reset_index().rename(columns={0: 'user_time_list'})
    
    item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list']))
    return item_user_time_dict

获取历史和最后一次点击

## 这个在评估召回结果, 特征工程和制作标签转成监督学习测试集的时候回用到
# 获取当前数据的历史点击和最后一次点击
def get_hist_and_last_click(all_click):
    
    all_click = all_click.sort_values(by=['user_id', 'click_timestamp'])
    click_last_df = all_click.groupby('user_id').tail(1)

    # 如果用户只有一个点击,hist为空了,会导致训练的时候这个用户不可见,此时默认泄露一下
    def hist_func(user_df):
        if len(user_df) == 1:
            return user_df
        else:
            return user_df[:-1]

    click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)

    return click_hist_df, click_last_df

获取文章属性特征

# 获取文章id对应的基本属性,保存成字典的形式,方便后面召回阶段,冷启动阶段直接使用
def get_item_info_dict(item_info_df):
    max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
    item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler)
    
    item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
    item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))
    item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
    
    pickle.dump(item_type_dict, open(save_path + 'item_type_dict.pkl', 'wb'))
    pickle.dump(item_words_dict, open(save_path + 'item_words_dict.pkl', 'wb'))
    pickle.dump(item_created_time_dict, open(save_path + 'item_created_time_dict.pkl', 'wb'))
    
    return item_type_dict, item_words_dict, item_created_time_dict

获取用户历史点击的文章信息

def get_user_hist_item_info_dict(all_click):
    
    # 获取user_id对应的用户历史点击文章类型的集合字典
    user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index()
    user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id']))
    
    # 获取user_id对应的用户点击文章的集合
    user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index()
    user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id']))
    
    # 获取user_id对应的用户历史点击的文章的平均字数字典
    user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index()
    user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count']))
    
    # 获取user_id对应的用户最后一次点击的文章的创建时间
    all_click_ = all_click.sort_values('click_timestamp')
    user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index()
    
    max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
    user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler)
    
    user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'], \
                                                user_last_item_created_time['created_at_ts']))
    
    return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict

获取点击次数最多的topk个文章

# 获取近期点击最多的文章
def get_item_topk_click(click_df, k):
    topk_click = click_df['click_article_id'].value_counts().index[:k]
    return topk_click

定义多路召回字典

# 获取文章的属性信息,保存成字典的形式方便查询
print("开始定义多路召回字典")
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# item_type_dict.to_pickle(save_path + "item_type_dict.pkl")
# item_words_dict.to_pickle(save_path + "item_words_dict.pkl")
# item_created_time_dict.to_pickle(save_path + "item_created_time_dict.pkl")
print("结束定义多路召回字典")

定义一个多路召回的字典,将各路召回的结果都保存在这个字典当中

# 定义一个多路召回的字典,将各路召回的结果都保存在这个字典当中
user_multi_recall_dict =  {'itemcf_sim_itemcf_recall': {},
                           'embedding_sim_item_recall': {},
                           'cold_start_recall': {}}

提取点击数据

# 提取最后一次点击作为召回评估,如果不需要做召回评估直接使用全量的训练集进行召回(线下验证模型)
# 如果不是召回评估,直接使用全量数据进行召回,不用将最后一次提取出来
print("开始提取点击数据")
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
trn_hist_click_df.to_pickle(save_path + "trn_hist_click_df.pkl")
trn_last_click_df.to_pickle(save_path + "trn_last_click_df.pkl")
print("结束提取点击数据")

召回效果评估函数

## 做完了召回有时候也需要对当前的召回方法或者参数进行调整以达到更好的召回效果,因为召回的结果决定了最终排序的上限,下面也会提供一个召回评估的方法
# 依次评估召回的前10, 20, 30, 40, 50个文章中的击中率
def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=5):
    last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id']))
    user_num = len(user_recall_items_dict)
    
    for k in range(10, topk+1, 10):
        hit_num = 0
        for user, item_list in user_recall_items_dict.items():
            # 获取前k个召回的结果
            tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]]
            if last_click_item_dict[user] in set(tmp_recall_items):
                hit_num += 1
        
        hit_rate = round(hit_num * 1.0 / user_num, 5)
        print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)

计算相似性矩阵

#这一部分主要是通过协同过滤以及向量检索得到相似性矩阵,相似性矩阵主要分为user2user和item2item,下面依次获取基于itemcf的item2item的相似性矩阵
## itemcf i2i_sim
#在计算item2item相似性矩阵时,使用关联规则,使得计算的文章的相似性还考虑到了:用户点击的时间权重、用户点击的顺序权重、文章创建的时间权重
def itemcf_sim(df, item_created_time_dict):
    """
        文章与文章之间的相似性矩阵计算
        :param df: 数据表
        :item_created_time_dict:  文章创建时间的字典
        return : 文章与文章的相似性矩阵
        
        思路: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则
    """
    
    user_item_time_dict = get_user_item_time(df)
    
    # 计算物品相似度
    i2i_sim = {}
    item_cnt = defaultdict(int)
    for user, item_time_list in tqdm(user_item_time_dict.items()):
        # 在基于商品的协同过滤优化的时候可以考虑时间因素
        for loc1, (i, i_click_time) in enumerate(item_time_list):
            item_cnt[i] += 1
            i2i_sim.setdefault(i, {})
            for loc2, (j, j_click_time) in enumerate(item_time_list):
                if(i == j):
                    continue
                    
                # 考虑文章的正向顺序点击和反向顺序点击    
                loc_alpha = 1.0 if loc2 > loc1 else 0.7
                # 位置信息权重,其中的参数可以调节
                loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
                # 点击时间权重,其中的参数可以调节
                click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time))
                # 两篇文章创建时间的权重,其中的参数可以调节
                created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
                i2i_sim[i].setdefault(j, 0)
                # 考虑多种因素的权重计算最终的文章之间的相似度
                i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
                
    i2i_sim_ = i2i_sim.copy()
    for i, related_items in i2i_sim.items():
        for j, wij in related_items.items():
            i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])
    
    # 将得到的相似性矩阵保存到本地
    pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))
    # i2i_sim =itemcf_i2i_sim 与下方相等
    
    return i2i_sim_



print("开始itemcf i2i_sim")
i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
# i2i_sim.to_pickle(save_path + "i2i_sim.pkl") i2i_sim =itemcf_i2i_sim 与上方相等
print("结束itemcf i2i_sim")

## item embedding sim
# 使用Embedding计算item之间的相似度是为了后续冷启动的时候可以获取未出现在点击数据中的文章,后面有对冷启动专门的介绍,这里简单的说一下faiss。
# 向量检索相似度计算
# topk指的是每个item, faiss搜索后返回最相似的topk个item
#def embdding_sim(click_df, item_emb_df, save_path, topk):
#    """
#        基于内容的文章embedding相似性矩阵计算
#        :param click_df: 数据表
#        :param item_emb_df: 文章的embedding
#        :param save_path: 保存路径
#        :patam topk: 找最相似的topk篇
#        return 文章相似性矩阵
        
#        思路: 对于每一篇文章, 基于embedding的相似性返回topk个与其最相似的文章, 只不过由于文章数量太多,这里用了faiss进行加速
#    """
    
#    # 文章索引与文章id的字典映射
#    item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id']))
    
#    item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
#    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)
#    # 向量进行单位化
#    item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
    
#    # 建立faiss索引
#    item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
#    item_index.add(item_emb_np)
#    # 相似度查询,给每个索引位置上的向量返回topk个item以及相似度
#    sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表
    
#    # 将向量检索的结果保存成原始id的对应关系
#    item_sim_dict = collections.defaultdict(dict)
#    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)):
#        target_raw_id = item_idx_2_rawid_dict[target_idx]
#        # 从1开始是为了去掉商品本身, 所以最终获得的相似商品只有topk-1
#        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
#            rele_raw_id = item_idx_2_rawid_dict[rele_idx]
#            item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
    
#    # 保存i2i相似度矩阵
#    pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb'))   
    
#    return item_sim_dict



print("开始item embedding sim")
#item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
#item_emb_df.to_pickle(save_path + "item_emb_df.pkl")
#emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10) # topk可以自行设置
emb_i2i_sim = pickle.load(open(save_path +"emb_i2i_sim.pkl", "rb"))
print("结束item embedding sim")

itemcf recall
一、上面已经通过协同过滤,Embedding检索的方式得到了文章的相似度矩阵,下面使用协同过滤的思想,给用户召回与其历史文章相似的文章。 这里在召回的时候,也是用了关联规则的方式:

1.考虑相似文章与历史点击文章顺序的权重(细节看代码)
2.考虑文章创建时间的权重,也就是考虑相似文章与历史点击文章创建时间差的权重
3.考虑文章内容相似度权重(使用Embedding计算相似文章相似度,但是这里需要注意,在Embedding的时候并没有计算所有商品两两之间的相似度,所以相似的文章与历史点击文章不存在相似度,需要做特殊处理)

基于商品的召回i2i

def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim):
    """
        基于文章协同过滤的召回
        :param user_id: 用户id
        :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列   {user1: [(item1, time1), (item2, time2)..]...}
        :param i2i_sim: 字典,文章相似性矩阵
        :param sim_item_topk: 整数, 选择与当前文章最相似的前k篇文章
        :param recall_item_num: 整数, 最后的召回文章数量
        :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全
        :param emb_i2i_sim: 字典基于内容embedding算的文章相似矩阵
        
        return: 召回的文章列表 [(item1, score1), (item2, score2)...]
    """
    # 获取用户历史交互的文章
    user_hist_items = user_item_time_dict[user_id]
    user_hist_items_ = {user_id for user_id, _ in user_hist_items}
    
    item_rank = {}
    for loc, (i, click_time) in enumerate(user_hist_items):
        for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
            if j in user_hist_items_:
                continue
            
            # 文章创建时间差权重
            created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
            # 相似文章和历史点击文章序列中历史文章所在的位置权重
            loc_weight = (0.9 ** (len(user_hist_items) - loc))
            
            content_weight = 1.0
            if emb_i2i_sim.get(i, {}).get(j, None) is not None:
                content_weight += emb_i2i_sim[i][j]
            if emb_i2i_sim.get(j, {}).get(i, None) is not None:
                content_weight += emb_i2i_sim[j][i]
                
            item_rank.setdefault(j, 0)
            item_rank[j] += created_time_weight * loc_weight * content_weight * wij
    
    # 不足10个,用热门商品补全
    if len(item_rank) < recall_item_num:
        for i, item in enumerate(item_topk_click):
            if item in item_rank.items(): # 填充的item应该不在原来的列表中
                continue
            item_rank[item] = - i - 100 # 随便给个负数就行
            if len(item_rank) == recall_item_num:
                break
    
    item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
        
    return item_rank

itemcf sim召回

# 先进行itemcf召回, 为了召回评估,所以提取最后一次点击
print("开始itemcf recall")
if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)

i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))
emb_i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl', 'rb'))

sim_item_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)

for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, \
                                                        i2i_sim, sim_item_topk, recall_item_num, \
                                                        item_topk_click, item_created_time_dict, emb_i2i_sim)

user_multi_recall_dict['itemcf_sim_itemcf_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['itemcf_sim_itemcf_recall'], open(save_path + 'itemcf_recall_dict.pkl', 'wb'))

if metric_recall:
    # 召回效果评估
    metrics_recall(user_multi_recall_dict['itemcf_sim_itemcf_recall'], trn_last_click_df, topk=recall_item_num)
print("结束itemcf recall")

embedding sim 召回

# 这里是为了召回评估,所以提取最后一次点击
print("开始embedding sim 召回")
if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))

sim_item_topk = 20
recall_item_num = 10

item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)

for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, 
                                                        recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
    
user_multi_recall_dict['embedding_sim_item_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['embedding_sim_item_recall'], open(save_path + 'embedding_sim_item_recall.pkl', 'wb'))

if metric_recall:
    # 召回效果评估
    metrics_recall(user_multi_recall_dict['embedding_sim_item_recall'], trn_last_click_df, topk=recall_item_num)

print("结束embedding sim 召回")

冷启动问题
1.冷启动问题可以分成三类:文章冷启动,用户冷启动,系统冷启动。
2.文章冷启动:对于一个平台系统新加入的文章,该文章没有任何的交互记录,如何推荐给用户的问题。(对于我们场景可以认为是,日志数据中没有出现过的文章都可以认为是冷启动的文章)
3.用户冷启动:对于一个平台系统新来的用户,该用户还没有文章的交互信息,如何给该用户进行推荐。(对于我们场景就是,测试集中的用户是否在测试集对应的log数据中出现过,如果没有出现过,那么可以认为该用户是冷启动用户。但是有时候并没有这么严格,我们也可以自己设定某些指标来判别哪些用户是冷启动用户,比如通过使用时长,点击率,留存率等等)
4.系统冷启动:就是对于一个平台刚上线,还没有任何的相关历史数据,此时就是系统冷启动,其实也就是前面两种的一个综合。

先进行itemcf召回,这里不需要做召回评估,这里只是一种策略

print("开始冷启动问题")
trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))

sim_item_topk = 150
recall_item_num = 100 # 稍微召回多一点文章,便于后续的规则筛选

item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, 
                                                        recall_item_num, item_topk_click,item_created_time_dict, emb_i2i_sim)
pickle.dump(user_recall_items_dict, open(save_path + 'cold_start_items_raw_dict.pkl', 'wb'))
print("结束冷启动问题")

基于规则进行文章过滤

# 基于规则进行文章过滤
# 保留文章主题与用户历史浏览主题相似的文章
# 保留文章字数与用户历史浏览文章字数相差不大的文章
# 保留最后一次点击当天的文章
# 按照相似度返回最终的结果
def get_click_article_ids_set(all_click_df):
    return set(all_click_df.click_article_id.values)

def cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \
                     user_last_item_created_time_dict, item_type_dict, item_words_dict, 
                     item_created_time_dict, click_article_ids_set, recall_item_num):
    """
        冷启动的情况下召回一些文章
        :param user_recall_items_dict: 基于内容embedding相似性召回来的很多文章, 字典, {user1: [(item1, item2), ..], }
        :param user_hist_item_typs_dict: 字典, 用户点击的文章的主题映射
        :param user_hist_item_words_dict: 字典, 用户点击的历史文章的字数映射
        :param user_last_item_created_time_idct: 字典,用户点击的历史文章创建时间映射
        :param item_tpye_idct: 字典,文章主题映射
        :param item_words_dict: 字典,文章字数映射
        :param item_created_time_dict: 字典, 文章创建时间映射
        :param click_article_ids_set: 集合,用户点击过得文章, 也就是日志里面出现过的文章
        :param recall_item_num: 召回文章的数量, 这个指的是没有出现在日志里面的文章数量
    """
    
    cold_start_user_items_dict = {}
    for user, item_list in tqdm(user_recall_items_dict.items()):
        cold_start_user_items_dict.setdefault(user, [])
        for item, score in item_list:
            # 获取历史文章信息
            hist_item_type_set = user_hist_item_typs_dict[user]
            hist_mean_words = user_hist_item_words_dict[user]
            hist_last_item_created_time = user_last_item_created_time_dict[user]
            hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time)
            
            # 获取当前召回文章的信息
            curr_item_type = item_type_dict[item]
            curr_item_words = item_words_dict[item]
            curr_item_created_time = item_created_time_dict[item]
            curr_item_created_time = datetime.fromtimestamp(curr_item_created_time)

            # 首先,文章不能出现在用户的历史点击中, 然后根据文章主题,文章单词数,文章创建时间进行筛选
            if curr_item_type not in hist_item_type_set or \
                item in click_article_ids_set or \
                abs(curr_item_words - hist_mean_words) > 200 or \
                abs((curr_item_created_time - hist_last_item_created_time).days) > 90: 
                continue
                
            cold_start_user_items_dict[user].append((item, score))      # {user1: [(item1, score1), (item2, score2)..]...}
    
    # 需要控制一下冷启动召回的数量
    cold_start_user_items_dict = {k: sorted(v, key=lambda x:x[1], reverse=True)[:recall_item_num] \
                                  for k, v in cold_start_user_items_dict.items()}
    
    pickle.dump(cold_start_user_items_dict, open(save_path + 'cold_start_user_items_dict.pkl', 'wb'))
    
    return cold_start_user_items_dict






print("开始基于规则进行文章过滤")
all_click_df_ = all_click_df.copy()
all_click_df_ = all_click_df_.merge(item_info_df, how='left', on='click_article_id')
user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(all_click_df_)
click_article_ids_set = get_click_article_ids_set(all_click_df)
# 需要注意的是
# 这里使用了很多规则来筛选冷启动的文章,所以前面再召回的阶段就应该尽可能的多召回一些文章,否则很容易被删掉
cold_start_user_items_dict = cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \
                                              user_last_item_created_time_dict, item_type_dict, item_words_dict, \
                                              item_created_time_dict, click_article_ids_set, recall_item_num)

user_multi_recall_dict['cold_start_recall'] = cold_start_user_items_dict

print("结束基于规则进行文章过滤")

多路召回合并

###多路召回合并就是将前面所有的召回策略得到的用户文章列表合并起来,下面是对前面所有召回结果的汇总

##1.基于itemcf计算的item之间的相似度sim进行的召回
##2.基于embedding搜索得到的item之间的相似度进行的召回
##3.YoutubeDNN召回
##4.YoutubeDNN得到的user之间的相似度进行的召回
##5.基于冷启动策略的召回
print("开始多路召回合并")
def combine_recall_results(user_multi_recall_dict, weight_dict=None, topk=25):
    final_recall_items_dict = {}
    
    # 对每一种召回结果按照用户进行归一化,方便后面多种召回结果,相同用户的物品之间权重相加
    def norm_user_recall_items_sim(sorted_item_list):
        # 如果冷启动中没有文章或者只有一篇文章,直接返回,出现这种情况的原因可能是冷启动召回的文章数量太少了,
        # 基于规则筛选之后就没有文章了, 这里还可以做一些其他的策略性的筛选
        if len(sorted_item_list) < 2:
            return sorted_item_list
        
        min_sim = sorted_item_list[-1][1]
        max_sim = sorted_item_list[0][1]
        
        norm_sorted_item_list = []
        for item, score in sorted_item_list:
            if max_sim > 0:
                norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0
            else:
                norm_score = 0.0
            norm_sorted_item_list.append((item, norm_score))
            
        return norm_sorted_item_list
    
    print('多路召回合并...')
    for method, user_recall_items in tqdm(user_multi_recall_dict.items()):
        print(method + '...')
        # 在计算最终召回结果的时候,也可以为每一种召回结果设置一个权重
        if weight_dict == None:
            recall_method_weight = 1
        else:
            recall_method_weight = weight_dict[method]
        
        for user_id, sorted_item_list in user_recall_items.items(): # 进行归一化
            user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list)
        
        for user_id, sorted_item_list in user_recall_items.items():
            # print('user_id')
            final_recall_items_dict.setdefault(user_id, {})
            for item, score in sorted_item_list:
                final_recall_items_dict[user_id].setdefault(item, 0)
                final_recall_items_dict[user_id][item] += recall_method_weight * score  
    
    final_recall_items_dict_rank = {}
    # 多路召回时也可以控制最终的召回数量
    for user, recall_item_dict in final_recall_items_dict.items():
        final_recall_items_dict_rank[user] = sorted(recall_item_dict.items(), key=lambda x: x[1], reverse=True)[:topk]

    # 将多路召回后的最终结果字典保存到本地
    pickle.dump(final_recall_items_dict_rank, open(os.path.join(save_path, 'final_recall_items_dict.pkl'),'wb'))

    return final_recall_items_dict_rank









# 这里直接对多路召回的权重给了一个相同的值,其实可以根据前面召回的情况来调整参数的值
weight_dict = {'itemcf_sim_itemcf_recall': 1.0,
               'embedding_sim_item_recall': 1.0,
               'cold_start_recall': 1.0}






# 最终合并之后每个用户召回150个商品进行排序
final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=150)

print("结束多路召回合并")

召回字典转换成df

final_recall_items_dict = pickle.load(open(save_path +"final_recall_items_dict.pkl", "rb"))
#print("final_recall_items_dict:", final_recall_items_dict)
#final_recall_items_dict: {4: [(42762, 2.0), (42237, 1.7501865528691507), (166380, 1.0)]}

print("开始召回字典转换成df")
#### 召回字典转换成df
# 将字典的形式转换成df
final_recall_items_score_list = []

for user, items in tqdm(final_recall_items_dict.items()):
    for item, score in items:
        final_recall_items_score_list.append([user, item, score])

recall_df = pd.DataFrame(final_recall_items_score_list, columns=['user_id', 'click_article_id', 'pred_score'])

print("结束召回字典转换成df")

生成提交文件

# 生成提交文件
def submit(recall_df, topk=5, model_name=None):
    recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
    recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')
    
    # 判断是不是每个用户都有5篇文章及以上
    tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())
    assert tmp.min() >= topk
    
    del recall_df['pred_score']
    submit = recall_df[recall_df['rank'] <= topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()
    
    submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]
    # 按照提交格式定义列名
    submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2', 
                                                  3: 'article_3', 4: 'article_4', 5: 'article_5'})
    
    save_name = save_path + model_name + '_' + datetime.today().strftime('%m-%d') + '.csv'
    submit.to_csv(save_name, index=False, header=True)

获取测试集

print("开始获取测试集")
# 获取测试集
tst_click = pd.read_csv(data_path + 'testA_click_log.csv')
tst_users = tst_click['user_id'].unique()
print("结束获取测试集")

从所有的召回数据中将测试集中的用户选出来

print("开始从所有的召回数据中将测试集中的用户选出来")
# 从所有的召回数据中将测试集中的用户选出来
tst_recall = recall_df[recall_df['user_id'].isin(tst_users)]
print("结束从所有的召回数据中将测试集中的用户选出来")

生成提交文件

print("开始生成提交文件")
# 生成提交文件
submit(tst_recall, topk=5, model_name='final_recall_baseline')
print("结束生成提交文件")

学习过程

20年当时自身功底是比较零基础(会写些基础的Python[三个科学计算包]数据分析),一开始看这块其实挺懵的,不会就去问百度或其他人,当时遇见困难挺害怕的,但22后面开始力扣题【目前已刷好几轮,博客没写力扣文章之前,力扣排名靠前已刷有5遍左右,排名靠后刷3次左右,代码功底也在一步一步提升】不断地刷、遇见代码不懂的代码,也开始去打印print去理解,到后面问其他人的问题越来越少,个人自主学习、自主解决能力也得到了进一步增强。

比赛源自:阿里云天池大赛 - 零基础入门推荐系统 - 新闻推荐

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/447330.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据库】软件测试之MySQL数据库面试总结

有表如下&#xff1a; Student 学生表 SC 成绩表 Course 课程表 Teacher 老师表 每个学生可以学习多门课程&#xff0c;每一个课程都有得分&#xff0c;每一门课程都有老师来教&#xff0c;一个老师可以教多个学生 1、查询姓‘朱’的学生名单 select * from Student whe…

直播录屏软件电脑版盘点,哪个才是你的最佳选择?

随着网络直播的兴起&#xff0c;录屏功能逐渐成为了许多用户电脑上的必备工具。无论是为了记录游戏过程、制作教学视频&#xff0c;还是为了保存会议内容&#xff0c;一个易于操作且功能全面的录屏软件都是不可或缺的。那直播录屏软件电脑版都有哪些呢&#xff1f;本文将为大家…

安卓项目:app注册/登录界面设计

目录 第一步&#xff1a;设计视图xml 第二步&#xff1a;编写登录和注册逻辑代码 运行效果展示&#xff1a; 总结&#xff1a; 提前展示项目结构&#xff1a; 第一步&#xff1a;设计视图xml 在layout目录下面创建activity_login.xml和activity_main.xml文件 activity_lo…

GEE错误——Landsat 9 数据集(LANDSAT/LC09/C02/T1_L2)ST_10波段缺少影像问题如何处理

简介 Landat 9的数据集是由卫星传感器记录并传输的。ST_10波段是其中一个波段,但是如果这个波段的影像数据缺失,可能是由于各种原因导致的。 以下是一些可能导致ST_10波段影像数据缺失的原因: 1. 传感器故障:可能是传感器在记录或传输过程中发生了故障,导致无法正确记录…

重要通告 | 公司更名为“浙江实在智能科技有限公司”

更名公告 升级蜕变、砥砺前行 因业务快速发展和战略升级&#xff0c;经相关政府机构批准&#xff0c;自2024年3月1日起&#xff0c;原“杭州实在智能科技有限公司”正式更名为“浙江实在智能科技有限公司”。 更名后&#xff0c;公司统一社会信用代码不变&#xff0c;业务主体…

【vue3之Pinia:状态管理工具】

Pinia:状态管理工具 一、认识Pinia二、定义store三、gettters四、Action1.定义普通函数2.异步实现 五、storeToRefs工具函数六、pinia持久化插件1. 安装插件2. main.js 使用3. 开启4.其他配置 一、认识Pinia Pinia 是 Vue 的最新 状态管理工具 &#xff0c;是 Vuex 的 替代品 …

关于多权威属性加密论文阅读

来源于2007年Multi-authority Attribute Based Encryption 从单权威机构到多权威机构的意义是什么呢&#xff1f; 基础方案&#xff08;单权威方案SW&#xff09;支持数据持有者对数据进行加密使用指定的属性集合并且指定一个数值d。当一个用户需要使用该数据时&#xff0c;需…

盘点CSV文件在Excel中打开后乱码问题的两种处理方法

目录 一、CSV文件乱码问题概述 二、修改文件编码格式 1.识别CSV文件编码 2.修改编码格式 3.在Excel中打开修改后的CSV文件 案例 三、利用文本编辑器进行预处理 1.打开CSV文件并检查乱码 2.替换或删除乱码字符 3.保存并导入Excel 案例 四、注意事项 1、识别原始编码…

【机器学习】有监督学习算法之:逻辑回归

逻辑回归 1、引言2、逻辑回归2.1 定义2.2 基本原理2.3 公式2.3.1 核心公式2.3.2 Sigmoid函数 2.4 代码示例 3、总结 1、引言 小屌丝&#xff1a;鱼哥&#xff0c;鱼哥&#xff0c;求助 小鱼&#xff1a;咋了。 小屌丝&#xff1a;我被逻辑回归难住了。 小鱼&#xff1a;然后你…

数据结构 - 堆

这篇博客将介绍堆的概念以及堆的实现。 1. 堆的定义&#xff1a; 首先堆的元素按照是完全二叉树的顺序存储的。 且堆中的某个节点总是不大于或不小于其父节点的值。 根节点最大的堆叫做大堆&#xff0c;根节点最小的堆叫小堆。逻辑结构如下图所示&#xff1a; 大堆和小堆的…

ZJUBCA研报分享 | 《BTC/USDT周内效应研究》

ZJUBCA研报分享 引言 2023 年 11 月 — 2024 年初&#xff0c;浙大链协顺利举办为期 6 周的浙大链协加密创投训练营 &#xff08;ZJUBCA Community Crypto VC Course&#xff09;。在本次训练营中&#xff0c;我们组织了投研比赛&#xff0c;鼓励学员分析感兴趣的 Web3 前沿话题…

opencv解析系列 - 基于DOM提取大面积植被(如森林)

Note&#xff1a;简单提取&#xff0c;不考虑后处理&#xff08;填充空洞、平滑边界等&#xff09; #include <iostream> #include "opencv2/imgproc.hpp" #include "opencv2/highgui.hpp" #include <opencv2/opencv.hpp> using namespace cv…

Ajax (1)

什么是Ajax&#xff1a; 浏览器与服务器进行数据通讯的技术&#xff0c;动态数据交互 axios库地址&#xff1a; <script src"https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script> 如何使用呢&#xff1f; 我们现有个感性的认识 <scr…

【Prometheus】k8s集群部署node-exporter

​ 目录 一、概述 1.1 prometheus简介 1.2 prometheus架构图 1.3 Exporter介绍 1.4 监控指标 1.5 参数定义 1.6 默认启用的参数 1.7 prometheus如何收集k8s/服务的–三种方式收集 二、安装node-exporter组件 【Prometheus】概念和工作原理介绍-CSDN博客 【云原生】ku…

微信小程序使用 iconfont

base64 形式引入 首先我们点击 iconfont 项目中的 项目设置 按钮&#xff0c;位置如下图所示&#xff1a; 我们勾选图中所示三种字体格式&#xff0c;选择 base64 是为了将另外两种字体转为 base64 形式&#xff0c;而选择 woff 与 ttf 字体原因如下&#xff1a; TTF 兼容性更…

【自然语言处理】NLP入门(五):1、正则表达式与Python中的实现(5):字符串常用方法:对齐方式、大小写转换详解

文章目录 一、前言二、正则表达式与Python中的实现1.字符串构造2. 字符串截取3. 字符串格式化输出4.字符转义符5. 字符串常用函数函数与方法之比较 6. 字符串常用方法1. 对齐方式center()ljust()rjust() 2. 大小写转换lower()upper()capitalize()title()swapcase() 一、前言 本…

基于.Net 的图形验证码模块

&#x1f3c6;作者&#xff1a;科技、互联网行业优质创作者 &#x1f3c6;专注领域&#xff1a;.Net技术、软件架构、人工智能、数字化转型、DeveloperSharp、微服务、工业互联网、智能制造 &#x1f3c6;欢迎关注我&#xff08;Net数字智慧化基地&#xff09;&#xff0c;里面…

【探索程序员职业赛道:挑战与机遇】

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老导航 檀越剑指大厂系列:全面总结 jav…

本地部署推理TextDiffuser-2:释放语言模型用于文本渲染的力量

系列文章目录 文章目录 系列文章目录一、模型下载和环境配置二、模型训练&#xff08;一&#xff09;训练布局规划器&#xff08;二&#xff09;训练扩散模型 三、模型推理&#xff08;一&#xff09;准备训练好的模型checkpoint&#xff08;二&#xff09;全参数推理&#xff…

PaddlePaddle框架安装

提示&#xff1a;可在python环境中进行安装&#xff0c;避免环境污染&#xff0c;创建命令conda create -n xxx_name python3.9,激活conda activate xxx_name 第一步&#xff1a;查看计算机平台版本 在窗口输入查看命令&#xff0c;查看CUDA的版本 nvidia-smi 二、根据以下条件…