目录
原因
由于实时性,所以算法设计需要满足一下两点
算法设计
算法实现
算法公式
完整代码
原因
用户对电影的偏好随着时间的推移总是会发生变化的。此时离线系统无法解决,需要实时推荐。
由于实时性,所以算法设计需要满足一下两点
1 根据用户最近几次评分进行推荐;
2 计算量要小,满足响应时间上面的实时;
算法设计
借鉴基于物品Item-CF的算法,将相似度和评分结合起来。
算法实现
1 当用户完成一次评分后,从Redis里取最近的k次评分的数据(mid,score);
2 备选电影(与评分电影的相似的电影)和评分过的电影,根据电影形似度矩阵计算相似度;
3 为提高精准率和召回率,设置偏移项,进行模型的鼓励与惩罚。
算法公式
其中:
表示用户 u 对电影 r 的评分;
sim(q,r)表示电影 q 与电影 r 的相似度,设定最小相似度为 0.6,当电影 q 和 电影 r 相似度低于 0.6 的阈值,则视为两者不相关并忽略;
sim_sum 表示 q 与 RK 中电影相似度大于最小阈值的个数;
incount 表示 RK 中与电影 q 相似的、且本身评分较高(>=3)的电影个数;
recount 表示 RK 中与电影 q 相似的、且本身评分较低(<3)的电影个数;
核心代码
def computeMovieScore(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]]): Array[(Int, Double)] = {
val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
val increMap = scala.collection.mutable.HashMap[Int, Int]()
val decreMap = scala.collection.mutable.HashMap[Int, Int]()
for(candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings ){
val simScore =getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )
if(simScore > 0.7){
scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
if (userRecentlyRating._2 > 3) {
increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
} else {
decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
}
}
}
scores.groupBy(_._1).map{
case (mid, scoreList) =>
(mid,scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
}.toArray
}
完整代码
package com.qh.streaming
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
/**
* 连接助手
* 序列化
*/
object ConnHelper extends Serializable {
lazy val jedis = new Jedis("hadoop100") //redis
lazy val mongoClient = MongoClient(MongoClientURI("mongodb://hadoop100:27017/recommender"))
}
case class MongoConfig(uri: String, db: String)
// 定义一个基准推荐对象
case class Recommendation(mid: Int, score: Double)
// 定义基于预测评分的用户推荐列表
case class UserRecs(uid: Int, recs: Seq[Recommendation])
// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs(mid: Int, recs: Seq[Recommendation])
object StreamingRecommender {
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_MOVIES_NUM = 20
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop100:27017/recommender",
"mongo.db" -> "recommender",
"kafka.topic" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")
// spark2.x sparksession已封装除了 sparkstream上下文之外的 session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 拿到streaming context
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(2)) // batch duration 批处理时间 微批次
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
//加载电影相似度矩阵,广播
val simMovieMatrix = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_RECS_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRecs]
.rdd //为了查询相似度的过程更快,转换成map
.map {
movieRecs =>
(movieRecs.mid, movieRecs.recs.map(x => (x.mid, x.score)).toMap)
}.collectAsMap()
val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)
// 定义kafka连接参数
val kafkaParam = Map(
"bootstrap.servers" -> "hadoop100:9092",
"key.deserializer" -> classOf[StringDeserializer], //反序列化
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "recommender",
"auto.offset.reset" -> "latest" //偏移量初始化设置
)
//kafka创建一个DStream
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam )
)
//把原始数据 UID|MID|SCORE|TIMESTAMP =>评分流
val ratingStream = kafkaStream.map {
msg =>
val attr = msg.value().split("\\|")
(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
}
//继续做流式处理,核心实时算法
ratingStream.foreachRDD{
rdds => rdds.foreach{
case (uid,mid, score, timestamp) =>
println("rating data coming ! >>>>>>>>>>>>>>>>")
//1 从redis里获取当前用户最近的k次评分,保存成Array[(mid, score)]
val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )
//2 从相似度矩阵中获取当前电影最相似的N个电影,作为备选列表,Array[mid]
// 根据uid过滤,将该用户评分过的电影过滤掉
// 在mongodb里面进行过滤
val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )
//3 最每个备选电影计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]
val streamResc = computeMovieScore( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )
//4 把推荐数据保存到mongodb
saveDataToMongoDB( uid, streamResc )
}
}
ssc.start()
println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming started")
ssc.awaitTermination()
}
// redis返回的是java类,需要引入转换类
import scala.collection.JavaConversions._
/**
* 从 redis 中选取 该用户最近 num次的 对某个电影的评分
* @param num 选取的最近评分记录的 数量
* @param uid 该用户的 ID
* @param jedis
* @return 该用户的最近num次 的评分记录
*/
def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
//从redis读数据,用户评分数据保存在 uid:UID 为Key的队列里, value MID:SCORE
jedis.lrange("uid:" + uid, 0, num-1)
.map{
item =>
val attr = item.split("\\:")
( attr(0).trim.toInt, attr(1).trim.toDouble ) //uid , score
}
.toArray
}
/**
* 根据当前电影 从相似度矩阵 选取除了用户已看的电影 之外的 num个形似的电影列表
* @param num 要选取相似的电影数量
* @param mid 当前低钠盐的ID
* @param uid 当前评分的用户ID
* @param simMovies 相似度矩阵
* @param mongoConfig
* @return 过滤后的 备选电影的列表
*/
def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
(implicit mongoConfig: MongoConfig): Array[Int] = {
//从相似度矩阵中取出所有相似的电影
val allSimMovies: Array[(Int, Double)] = simMovies(mid).toArray
//从mongodb中查询用户已看过的电影, 用于过滤
val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
.find( MongoDBObject("uid" -> uid) )
.toArray
.map{
item => item.get("mid").toString.toInt
}
//过于该用户看过的电影(用户如果评过分数 就代表用户看过该电影)
allSimMovies.filter( x => ! ratingExist.contains(x._1) )
.sortWith(_._2>_._2)
.take(num)
.map( x => x._1 )
}
def computeMovieScore(candidateMovies: Array[Int],
userRecentlyRatings: Array[(Int, Double)],
simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] = {
//定义一个ArrayBuffer.用户保存每一个备选电影的基础得分
val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
// 定义一个HashMap,保存每一个备选电影的增强减弱因子
val increMap = scala.collection.mutable.HashMap[Int, Int]()
val decreMap = scala.collection.mutable.HashMap[Int, Int]()
for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings ){
//先拿到相似度(备选电影和 最近评分电影)
// 相当于从 相似度矩阵中 最近取 相应坐标 的值
// 可能 没有值 ,所以需要 设置默认值
val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )
// 相似度矩阵 在 离线计算中 存储时,筛选条件为 > 0.6
// 这里进一步 筛选
if(simScore > 0.7){
//* 打分 => 备选电影的基础推荐得分
// ArrayBuffer += 运算符重载 第一个()为 +=()的函数() 第二个() 为 元组
scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
// 统计 增强因子 和 减弱因子
if (userRecentlyRating._2 > 3) {
increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1 //设置默认值
} else {
decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
}
}
}
// 除 总数
// 合并相同mid 的 groupby
scores.groupBy(_._1).map{
// groupBY => Map[Int, ArrayBuffer[(Int, Double)]
case (mid, scoreList) =>
( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
}.toArray
}
/**
* 获取两个电影之间的相似度
* @param mid1
* @param mid2
* @param simMovies
* @return
*/
def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = {
// 用模式匹配判断是否为 空值
simMovies.get(mid1) match {
case Some(sims1) => sims1.get(mid2) match {
case Some(sims2) => sims2
case None => 0.0
}
case None => 0.0
}
}
/**
* 自定义 log 将 底数作为超参数
* 超参数N 默认为 10
*/
def log(x: Int):Double ={
val N = 10
// 对数换底公式
math.log(x) / math.log(N)
}
//覆盖 更新
def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={
// 连接表
val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
// 如果有 先删除
streamRecsCollection.findAndRemove( MongoDBObject("uid" -> uid) )
// 再更新
streamRecsCollection.insert(MongoDBObject("uid" -> uid,
"recs" -> streamRecs.map(x => MongoDBObject("mid" -> x._1, "score" -> x._2))))
}
}