【电影推荐系统】实时推荐

目录

原因

由于实时性,所以算法设计需要满足一下两点

算法设计

算法实现

算法公式

 完整代码


原因

用户对电影的偏好随着时间的推移总是会发生变化的。此时离线系统无法解决,需要实时推荐。

由于实时性,所以算法设计需要满足一下两点

1 根据用户最近几次评分进行推荐;

2 计算量要小,满足响应时间上面的实时;

算法设计

借鉴基于物品Item-CF的算法,将相似度和评分结合起来。

算法实现

1 当用户完成一次评分后,从Redis里取最近的k次评分的数据(mid,score);

2 备选电影(与评分电影的相似的电影)和评分过的电影,根据电影形似度矩阵计算相似度;

3 为提高精准率和召回率,设置偏移项,进行模型的鼓励与惩罚。

算法公式

其中:

表示用户 u 对电影 r 的评分;

sim(q,r)表示电影 q 与电影 r 的相似度,设定最小相似度为 0.6,当电影 q 和 电影 r 相似度低于 0.6 的阈值,则视为两者不相关并忽略;

sim_sum 表示 q 与 RK 中电影相似度大于最小阈值的个数;

incount 表示 RK 中与电影 q 相似的、且本身评分较高(>=3)的电影个数;

recount 表示 RK 中与电影 q 相似的、且本身评分较低(<3)的电影个数;

核心代码

 def computeMovieScore(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]]): Array[(Int, Double)] = {
    val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
    val increMap = scala.collection.mutable.HashMap[Int, Int]()
    val decreMap = scala.collection.mutable.HashMap[Int, Int]()
    for(candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings ){
      val simScore =getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )
      if(simScore > 0.7){
        scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
        if (userRecentlyRating._2 > 3) {
          increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
        } else {
          decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
        }
      }
    }
    scores.groupBy(_._1).map{
      case (mid, scoreList) =>
        (mid,scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
    }.toArray
  }

 完整代码

package com.qh.streaming

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
 * 连接助手
 * 序列化
 */
object ConnHelper extends Serializable {
  lazy val jedis = new Jedis("hadoop100") //redis
  lazy val mongoClient = MongoClient(MongoClientURI("mongodb://hadoop100:27017/recommender"))
}

case class MongoConfig(uri: String, db: String)

// 定义一个基准推荐对象
case class Recommendation(mid: Int, score: Double)

// 定义基于预测评分的用户推荐列表
case class UserRecs(uid: Int, recs: Seq[Recommendation])

// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs(mid: Int, recs: Seq[Recommendation])

object StreamingRecommender {

  val MAX_USER_RATINGS_NUM = 20
  val MAX_SIM_MOVIES_NUM = 20
  val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender",
      "kafka.topic" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")
    //    spark2.x sparksession已封装除了 sparkstream上下文之外的 session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // 拿到streaming context
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(2)) // batch duration 批处理时间 微批次

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    //加载电影相似度矩阵,广播
    val simMovieMatrix = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_RECS_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRecs]
      .rdd //为了查询相似度的过程更快,转换成map
      .map {
        movieRecs =>
        (movieRecs.mid, movieRecs.recs.map(x => (x.mid, x.score)).toMap)
      }.collectAsMap()

    val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)

    // 定义kafka连接参数
    val kafkaParam = Map(
      "bootstrap.servers" -> "hadoop100:9092",
      "key.deserializer" -> classOf[StringDeserializer],  //反序列化
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "recommender",
      "auto.offset.reset" -> "latest"   //偏移量初始化设置
    )
    //kafka创建一个DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam )
    )

    //把原始数据 UID|MID|SCORE|TIMESTAMP =>评分流
    val ratingStream = kafkaStream.map {
      msg =>
        val attr = msg.value().split("\\|")
        (attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
    }

    //继续做流式处理,核心实时算法
    ratingStream.foreachRDD{
      rdds => rdds.foreach{
        case (uid,mid, score, timestamp) =>
          println("rating data coming ! >>>>>>>>>>>>>>>>")

          //1 从redis里获取当前用户最近的k次评分,保存成Array[(mid, score)]
          val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )
          //2 从相似度矩阵中获取当前电影最相似的N个电影,作为备选列表,Array[mid]
//              根据uid过滤,将该用户评分过的电影过滤掉
//              在mongodb里面进行过滤
          val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )
          //3 最每个备选电影计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]
          val streamResc = computeMovieScore( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )
          //4 把推荐数据保存到mongodb
          saveDataToMongoDB( uid, streamResc )

      }
    }

    ssc.start()



    println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming started")
    ssc.awaitTermination()



  }
//  redis返回的是java类,需要引入转换类
  import scala.collection.JavaConversions._

  /**
   * 从 redis 中选取 该用户最近 num次的 对某个电影的评分
   * @param num  选取的最近评分记录的 数量
   * @param uid  该用户的 ID
   * @param jedis
   * @return 该用户的最近num次 的评分记录
   */
  def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
    //从redis读数据,用户评分数据保存在 uid:UID 为Key的队列里, value MID:SCORE
    jedis.lrange("uid:" + uid, 0, num-1)
      .map{
        item =>
          val attr = item.split("\\:")
          ( attr(0).trim.toInt, attr(1).trim.toDouble )  //uid , score
      }
      .toArray
  }

  /**
   * 根据当前电影 从相似度矩阵 选取除了用户已看的电影 之外的 num个形似的电影列表
   * @param num   要选取相似的电影数量
   * @param mid   当前低钠盐的ID
   * @param uid   当前评分的用户ID
   * @param simMovies  相似度矩阵
   * @param mongoConfig
   * @return  过滤后的 备选电影的列表
   */
  def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
                     (implicit mongoConfig: MongoConfig): Array[Int] = {
    //从相似度矩阵中取出所有相似的电影
    val allSimMovies: Array[(Int, Double)] = simMovies(mid).toArray
    //从mongodb中查询用户已看过的电影, 用于过滤
    val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
      .find( MongoDBObject("uid" -> uid) )
      .toArray
      .map{
        item => item.get("mid").toString.toInt
      }
    //过于该用户看过的电影(用户如果评过分数 就代表用户看过该电影)
    allSimMovies.filter( x => ! ratingExist.contains(x._1) )
      .sortWith(_._2>_._2)
      .take(num)
      .map( x => x._1 )
  }



  def computeMovieScore(candidateMovies: Array[Int],
                        userRecentlyRatings: Array[(Int, Double)],
                        simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] = {
    //定义一个ArrayBuffer.用户保存每一个备选电影的基础得分
    val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
    // 定义一个HashMap,保存每一个备选电影的增强减弱因子
    val increMap = scala.collection.mutable.HashMap[Int, Int]()
    val decreMap = scala.collection.mutable.HashMap[Int, Int]()

    for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings ){
      //先拿到相似度(备选电影和 最近评分电影)
//      相当于从 相似度矩阵中 最近取 相应坐标 的值
//      可能 没有值 ,所以需要 设置默认值
      val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )

//      相似度矩阵 在 离线计算中 存储时,筛选条件为 > 0.6
//      这里进一步 筛选
      if(simScore > 0.7){
        //* 打分 => 备选电影的基础推荐得分
//        ArrayBuffer +=  运算符重载 第一个()为 +=()的函数() 第二个() 为 元组
        scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
//        统计 增强因子 和 减弱因子
        if (userRecentlyRating._2 > 3) {
          increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1 //设置默认值
        } else {
          decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
        }
      }
    }
    //      除 总数
//     合并相同mid 的 groupby
    scores.groupBy(_._1).map{
//          groupBY => Map[Int, ArrayBuffer[(Int, Double)]
      case (mid, scoreList) =>
        ( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
    }.toArray

  }

  /**
   * 获取两个电影之间的相似度
   * @param mid1
   * @param mid2
   * @param simMovies
   * @return
   */
  def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = {
    // 用模式匹配判断是否为 空值
    simMovies.get(mid1) match {
      case Some(sims1) => sims1.get(mid2) match {
        case Some(sims2) => sims2
        case None => 0.0
      }
      case None => 0.0
    }
  }

  /**
   * 自定义 log 将 底数作为超参数
   * 超参数N 默认为 10
   */
  def log(x: Int):Double ={
    val N = 10
//    对数换底公式
    math.log(x) / math.log(N)
  }

//覆盖 更新
  def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={
//    连接表
val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
//    如果有 先删除
    streamRecsCollection.findAndRemove( MongoDBObject("uid" -> uid) )
//    再更新
    streamRecsCollection.insert(MongoDBObject("uid" -> uid,
      "recs" -> streamRecs.map(x => MongoDBObject("mid" -> x._1, "score" -> x._2))))
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/35090.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Go语言远程调试

Go语言远程调试 1、安装dlv # 安装dlv $ go install github.com/go-delve/delve/cmd/dlvlatest$ dlv version Delve Debugger Version: 1.20.1 Build: $Id: 96e65b6c615845d42e0e31d903f6475b0e4ece6e $2、命令行远程调试 我们远程(Linux服务器)有如下代码&#xff1a; [ro…

自学大语言模型之GPT

GPT火爆的发展史 2017年6月OpenAI联合DeepMind首次正式提出的&#xff1a;Deep Reinforcement Learning from Human Preferences&#xff0c;即基于人类偏好的深度强化学习&#xff0c;简称RLHF 2017年7月的OpenAI团队提出的对TRPO算法的改进&#xff1a;PPO算法 GPT-1&#…

Tomcat的优化多实例部署

目录 一.tomcat核心组件模块 1.2. toncat功能组件结构 二.Tomcat 优化 三.简述Tomcat请求过程 四.Tomcat 多实例部署 多实例部署图示 1.关闭防火墙 拖入软件包 2.安装JDk 设置JDK环境变量 3.解压tomcat 创建目录 4.配置 tomcat 环境变量 5.修改 tomcat2 中的 server.xm…

学习系统编程No.29【线程执行过程之页表详解】

引言&#xff1a; 北京时间&#xff1a;2023/7/3/14:09&#xff0c;刚睡醒&#xff0c;放假在家起床时间确实不怎么好调整&#xff0c;根本固定不了一点&#xff0c;当然通俗点说也就是根本起不来&#xff0c;哈哈哈&#xff0c;已经很少见到那种7点起来码字的情形了&#xff…

UART-GD32

UART-GD32 通信的概念 同步通信和异步通信 数据帧格式 波特率 使用步骤 引脚分布

gitLab配置ssh实现私钥访问

1.配置ssh文件 1.cd C:\Users\用户名\.ssh 找到文件夹 删除.ssh 里面所有其他文件方面我们配置要最新的 2.win r cmd 呼出命令行 ssh-keygen -t rsa -C "必须对应gitLab用户名" 3.生成文件夹拿到ssh 4.复制id_rsa_pub 文件的全部字符串 公钥给到GitLab服务器 2.公…

Spring Boot 中的模板引擎是什么,如何使用

Spring Boot 中的模板引擎是什么&#xff0c;如何使用 在 Web 应用程序中&#xff0c;模板引擎是一种用于动态生成 HTML、XML、JSON 等文档的工具。Spring Boot 内置了多种常见的模板引擎&#xff0c;例如 Thymeleaf、Freemarker、Velocity 等&#xff0c;让我们可以轻松地创建…

线性代数行列式的几何含义

行列式可以看做是一系列列向量的排列&#xff0c;并且每个列向量的分量可以理解为其对应标准正交基下的坐标。 行列式有非常直观的几何意义&#xff0c;例如&#xff1a; 二维行列式按列向量排列依次是 a \mathbf{a} a和 b \mathbf{b} b&#xff0c;可以表示 a \mathbf{a} a和…

Lua学习笔记:浅谈对垃圾回收的理解

前言 本篇在讲什么 Lua的垃圾回收 本篇适合什么 适合初学Lua的小白 本篇需要什么 对Lua语法有简单认知 依赖Sublime Text编辑器 本篇的特色 具有全流程的图文教学 重实践&#xff0c;轻理论&#xff0c;快速上手 提供全流程的源码内容 ★提高阅读体验★ &#x1f…

3、boostrap图片视频上传展示

boostrap图片视频上传展示 1、展示效果2、html代码 1、展示效果 项目目录结构 2、html代码 html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><!--<link rel"st…

记一次 .NET 某工控视觉系统 卡死分析

一&#xff1a;背景 1. 讲故事 前段时间有位朋友找到我&#xff0c;说他们的工业视觉软件僵死了&#xff0c;让我帮忙看下到底是什么情况&#xff0c;哈哈&#xff0c;其实卡死的问题相对好定位&#xff0c;无非就是看主线程栈嘛&#xff0c;然后就是具体问题具体分析&#x…

一起来看看文档翻译哪个好吧

在繁忙的都市生活中&#xff0c;小玲是一位年轻的职场人士。她的工作经常需要处理各种文档和文件&#xff0c;而其中不乏需要与外国合作伙伴交流的时候。然而&#xff0c;她并不熟悉其他语言&#xff0c;这给她的工作带来了一定的困扰。于是&#xff0c;她开始寻找免费的文档翻…

什么是AOP?

目录 一、AOP简介 1、AOP简介和作用 2、AOP的概念 二、AOP的基本实现 三、AOP工作流程 1 、AOP工作流程 2、AOP核心概念 四、AOP切入点表达式 1、语法格式 2、通配符 五、AOP通知类型 1、AOP通知分类 2、AOP通知详解 &#xff08;1&#xff09;前置通知 &#xf…

MySQL-分库分表详解(四)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️努力不一定有回报&#xff0c;但一定会有收获加油&#xff01;一起努力&#xff0c;共赴美好人生&#xff01; ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xf…

【ArcGIS微课1000例】0069:用ArcGIS提取一条线的高程值

本实验讲解用ArcGIS软件,基于数字高程模型DEM提取一条线的高程值并导出。 文章目录 一、加载实验数据二、将线转为折点三、提取折点高程值四、导出高程值五、注意事项【相关阅读】:【GlobalMapper精品教程】060:用dem提取一条线的高程值 一、加载实验数据 本实验使用的数据…

初学者一步步学习python 学习提纲

当学习Python时&#xff0c;可以按照以下提纲逐步学习&#xff1a; 入门基础 了解Python的历史和应用领域安装Python解释器和开发环境&#xff08;如Anaconda、IDLE等&#xff09;学习使用Python的交互式解释器或集成开发环境&#xff08;IDE&#xff09;进行简单的代码编写和…

Seafile搭建个人云盘 - 内网穿透实现在外随时随地访问

文章目录 1. 前言2. SeaFile云盘设置2.1 Owncould的安装环境设置2.2 SeaFile下载安装2.3 SeaFile的配置 3. cpolar内网穿透3.1 Cpolar下载安装3.2 Cpolar的注册3.3 Cpolar云端设置3.4 Cpolar本地设置 4. 公网访问测试5. 结语 转载自cpolar极点云文章&#xff1a;使用SeaFile搭建…

【电影推荐系统】基于 ALS 的协同过滤推荐算法

目录 目的 用户电影推荐矩阵主要思路如下 1 UserId 和 MovieID 做笛卡尔积&#xff0c;产生&#xff08;uid&#xff0c;mid&#xff09;的元组 2 通过模型预测&#xff08;uid&#xff0c;mid&#xff09;的元组。 3 将预测结果通过预测分值进行排序。 4 返回分值最大的 …

elk中kibana使用

1.前言 kibana是一款作为elasticsearch可视化的一款软件&#xff0c;将elasticsearch中的数据以可视化的状态展现出来&#xff0c;kibana也提供了查询、统计、修改索引等功能 2.kibana使用 索引管理 在索引管理中&#xff0c;可以看到所有索引的状态、运行状况、主分片、副本…

pytorch快速入门中文——07(TensorBoard)

使用 TensorBoard 可视化模型&#xff0c;数据和训练 原文&#xff1a;https://pytorch.org/tutorials/intermediate/tensorboard_tutorial.html 在 60 分钟突击中&#xff0c;我们向您展示了如何加载数据&#xff0c;如何通过定义为nn.Module子类的模型提供数据&#xff0c;如…