These are unpolished Spark notes; please read with patience.
Goal: group by user_id, collect the anime_id values each user has watched into a string array (anime_ids), turn these arrays into a two-dimensional list [[20, 81, 170, 263, …], […], …], and finally build an adjacency matrix over anime_ids.
Goals of this article
1) Build an adjacency matrix from anime_id
2) Build the DeepWalk transition matrix and entrance matrix
3) Implement sampling with the DeepWalk algorithm
4) Train dense vectors with Spark's Word2Vec
5) Cache the user_id embeddings in Redis
6) Use locality-sensitive hashing (LSH) for approximate nearest-neighbor lookup
Raw data
+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
| 1| 8074| 10|
| 1| 11617| 10|
| 1| 11757| 10|
| 1| 15451| 10|
| 2| 11771| 10|
+-------+--------+------+
Imports
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import numpy as np
from collections import defaultdict
Spark needs a JDK to run, so set the JAVA_HOME environment variable
os.environ["JAVA_HOME"] = "/usr/lib/jvm/jre-1.8.0-openjdk"
Building the adjacency matrix
Rows and columns are anime_ids; each value is the number of user_ids who watched both the row anime_id and the column anime_id
# helper: print a section banner
def print_(name):
    print_str = "=" * 50 + ">>>" + name + "<<<" + "=" * 50
    print(print_str)
spark = SparkSession.builder.appName("feat-eng").getOrCreate()
rating_df = spark.read.csv(
    "/data/jupyter/recommendation_data/rating.csv", header=True, inferSchema=True
)
rating_df = rating_df.where("rating > 7")
# raw data
# print(rating_df.show(5))
watch_seq_df = rating_df.groupBy("user_id").agg(
    F.collect_list(F.col("anime_id").cast("string")).alias("anime_ids")
)
# print(watch_seq_df.show(2))
# +-------+--------------------+
# |user_id| anime_ids|
# +-------+--------------------+
# | 148|[20, 81, 170, 263...|
# | 463|[20, 24, 68, 102,...|
# +-------+--------------------+
# print(watch_seq_df.printSchema())
# root
# |-- user_id: integer (nullable = true)
# |-- anime_ids: array (nullable = true)
# | |-- element: string (containsNull = false)
# collect into a Python list
watch_seq = watch_seq_df.collect()
watch_seq = [s["anime_ids"] for s in watch_seq]
# adjacency matrix: matrix[a][b] = number of users who watched both a and b
matrix = defaultdict(lambda: defaultdict(int))
for i in range(len(watch_seq)):
    seq = watch_seq[i]
    for x in range(len(seq)):
        for y in range(x + 1, len(seq)):
            a = seq[x]
            b = seq[y]
            if a == b:
                continue  # skip the diagonal
            matrix[a][b] += 1
            matrix[b][a] += 1
Users who watched anime_id=20 together with some other anime_id: the dict keys are the neighbor nodes of anime_id=20 and the values are the numbers of users who watched both. For example:
print(matrix["20"])
Transition probability matrix and entrance matrix
Transition probability matrix: determines how the current node picks the next node during a DeepWalk
Entrance matrix: the distribution the starting node is drawn from when a DeepWalk is initialized
Transition matrix format: each anime_id's neighbours and their transition probabilities (transfer_probs)
{
"anime_id": {
"neighbours": [2, 3, 5, 7],
"probs": [0.16, 0.16, 0.32, 0.32]
},
}
Entrance matrix format: each anime_id's share of the total number of views across all users (computed here from the adjacency-matrix weights), e.g.:
[0.001953672356421993, 0.0004123166720890604, 0.0008729517041885576, ...]
Code
def get_transfer_prob(vs):
    neighbours = list(vs.keys())
    total_weight = sum(vs.values())
    probs = [vs[k] / total_weight for k in vs.keys()]
    return {"neighbours": neighbours, "probs": probs}
transfer_probs = {k: get_transfer_prob(v) for k, v in matrix.items()}
entrance_items = list(transfer_probs)
neighbour_sum = {k: sum(matrix[k].values()) for k in entrance_items}
total_sum = sum(neighbour_sum.values())
entrance_probs = [neighbour_sum[e] / total_sum for e in entrance_items]
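As a quick sanity check (a minimal sketch using the structures built above), the entrance distribution and every node's transition distribution should each sum to roughly 1:
# probabilities produced above should form valid distributions
assert abs(sum(entrance_probs) - 1.0) < 1e-6
some_node = entrance_items[0]
assert abs(sum(transfer_probs[some_node]["probs"]) - 1.0) < 1e-6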
Building the random walk
Procedure:
1) Draw a starting node from the entrance distribution and append it to the path
2) Loop length times, each time drawing the next node from the current node's transition distribution and appending it to the path
3) Update the current node
4) After the loop, the path contains length + 1 nodes
5) Repeat the whole walk n times to obtain n sampled paths
rng = np.random.default_rng()
def one_walk(length, entrance_items, entrance_probs, transfer_probs):
    start_point = rng.choice(entrance_items, 1, p=entrance_probs)[0]
    path = [str(start_point)]
    current_point = start_point
    for _ in range(length):
        neighbours = transfer_probs[current_point]["neighbours"]
        transfer_prob = transfer_probs[current_point]["probs"]
        next_point = rng.choice(neighbours, 1, p=transfer_prob)[0]
        path.append(str(next_point))
        current_point = next_point
    return path
n = 500
sample = [one_walk(20, entrance_items, entrance_probs, transfer_probs) for _ in range(n)]
Each sampled path is a list of anime_id strings (each number is an anime_id).
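To inspect the structure of the sample (a quick sketch; the actual ids printed will vary from run to run):
# with length=20 and n=500 we get 500 walks, each containing 21 anime_id strings
print(len(sample), len(sample[0]))
print(sample[0][:5])  # the first few anime_ids of the first walk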
Word2Vec in Spark
Convert the sampled walks into a DataFrame and use Spark's Word2Vec to learn a dense vector for each anime_id.
sample_df = spark.createDataFrame([[row] for row in sample], ["anime_ids"])
from pyspark.ml.feature import Word2Vec
item2vec = Word2Vec(vectorSize=5, maxIter=2, windowSize=15)  # skip-gram model
item2vec.setInputCol("anime_ids")
item2vec.setOutputCol("anime_ids_vec")
model = item2vec.fit(sample_df)
# test: find the 10 items closest to anime_id=20
# rec = model.findSynonyms("20", 10)
# extract the learned item vectors for the next steps
item_vec = model.getVectors().collect()
item_emb = {}  # anime_id (string) -> numpy embedding vector
for item in item_vec:
    item_emb[item.word] = item.vector.toArray()
@F.udf(returnType="array<float>")
def build_user_emb(anime_seq):
    # average the embeddings of every anime_id the user has watched
    anime_embs = [item_emb[aid] for aid in anime_seq if aid in item_emb]
    if not anime_embs:
        return None  # none of the user's anime_ids were covered by the trained model
    emb = np.mean(anime_embs, axis=0)
    return emb.tolist()
user_emb_df = watch_seq_df.withColumn("user_emb", build_user_emb(F.col("anime_ids")))
print(user_emb_df.show(3))
# +-------+--------------------+--------------------+
# |user_id| anime_ids| user_emb|
# +-------+--------------------+--------------------+
# | 148|[20, 81, 170, 263...|[0.22693123, -0.0...|
# | 463|[20, 24, 68, 102,...|[0.24494155, -0.0...|
# | 471| [1604, 6702, 10681]|[0.37150145, 0.04...|
# +-------+--------------------+--------------------+
The build_user_emb function looks up the embedding of each anime_id a user has watched and averages them, so from a user's watch history we obtain a single embedding for that user.
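To make the averaging step concrete, here is a minimal standalone sketch with made-up 5-dimensional vectors (not real model output; vectorSize=5 matches the Word2Vec config above):
import numpy as np
# two hypothetical item embeddings
v_a = np.array([0.2, -0.1, 0.0, 0.3, 0.1])
v_b = np.array([0.4, 0.1, 0.2, -0.1, 0.3])
# the user embedding is the element-wise mean of the item embeddings
user_vec = np.mean([v_a, v_b], axis=0)
print(user_vec)  # [0.3 0.  0.1 0.1 0.2]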
Caching the embeddings in Redis
from redis import Redis
redis = Redis()
user_emb = user_emb_df.collect()
# user_id and its corresponding embedding
user_emb = {row.user_id: row.user_emb for row in user_emb}
# helper: serialize a float list into a colon-separated string
def vec2str(vec):
    if vec is None:
        return ""
    return ":".join([str(v) for v in vec])
def save_user_emb(embs):
    str_emb = {user_id: vec2str(v) for user_id, v in embs.items()}
    redis.hset("recall-user-emb", mapping=str_emb)
# save the user embeddings to redis
save_user_emb(user_emb)
# test call
# redis.hget("recall-user-emb", "148")
# helper: parse a colon-separated string back into a list of floats
def str2vec(s):
    if len(s) == 0:
        return None
    return [float(x) for x in s.split(":")]
def load_user_emb():
    result = redis.hgetall("recall-user-emb")
    return {user_id.decode(): str2vec(emb.decode()) for user_id, emb in result.items()}
# read everything back
load_user_emb()
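A quick round-trip check (a sketch; it assumes user 148 exists, as in the sample output above, and note that redis returns string keys while user_emb uses int keys):
loaded = load_user_emb()
print(loaded["148"][:3])  # first few floats parsed back from redis
print(user_emb[148][:3])  # the original floats computed by the UDF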
Approximate nearest-neighbor (ANN) lookup
Using the locality-sensitive hashing (LSH) index from the faiss package
import faiss
import numpy as np
emb_items = item_emb.items()
# emb_items format
# dict_items([('9936', array([-0.02695967, -0.14685549, -0.13547155, -0.00479672, -0.00417542])),
#('710', array([ 0.03320758, -0.11462902, 0.17806329, -0.3202453 , 0.12857111])),...])
emb_items = list(filter(lambda t: len(t[1]) > 0, emb_items))
item_ids = [i[0] for i in emb_items]
embs = [i[1] for i in emb_items]
index = faiss.IndexLSH(len(embs[0]), 256)
index.add(np.asarray(embs, dtype=np.float32))
# embs[99]=[-0.03878592 0.15011692 0.01134511 0.03049661 0.11688153]
# the 10 nearest vectors to embs[99], i.e. the item at position 99 in emb_items
# D are the distances (as defined internally by faiss), I are the positions of the original vectors
D, I = index.search(np.asanyarray([embs[99]], dtype=np.float32), 10)
print(D) # [[ 0. 8. 23. 25. 29. 32. 33. 33. 39. 39.]]
print(I) # [[ 99 690 540 347 754 788 170 186 206 612]]
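The positions in I index into the embs / item_ids lists rather than being anime_ids themselves; a small follow-up sketch to map them back:
# map the returned positions back to the original anime_ids
nearest_anime_ids = [item_ids[i] for i in I[0]]
print(nearest_anime_ids)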