【电影推荐系统】基于 ALS 的协同过滤推荐算法

目录

目的

用户电影推荐矩阵主要思路如下

1 UserId 和 MovieID 做笛卡尔积,产生(uid,mid)的元组

2 通过模型预测(uid,mid)的元组。

3 将预测结果通过预测分值进行排序。

4 返回分值最大的 K 个电影,作为当前用户的推荐。

5 排序根据余弦相似度进行排序;

6 用均方根误差RMSE进行模型评估和参数调整   ;

由于电影的特征是稳定不变的,所以离线训练电影相似矩阵,节约实时计算时间。

 完整代码

offlineRecommender.scala

ALSTrainer.scala


目的

训练出用户电影推荐矩阵,电影相似度矩阵。

用户电影推荐矩阵主要思路如下

1 UserId 和 MovieID 做笛卡尔积,产生(uid,mid)的元组

val userMovies = userRDD.cartesian(movieRDD)

2 通过模型预测(uid,mid)的元组。

3 将预测结果通过预测分值进行排序。

4 返回分值最大的 K 个电影,作为当前用户的推荐。

核心代码    

val preRatings = model.predict(userMovies)
    val userRecs = preRatings
      .filter(_.rating > 0)
      .map(rating => (rating.user, (rating.product, rating.rating)))
      .groupByKey()
      .map {
        case (uid, recs) => UserResc(uid, recs.toList
          .sortWith(_._2 > _._2)
          .take(USER_MAX_RECOMMENDATION)
          .map(
            x => Recommendation(x._1, x._2)
          )
        )
      }
      .toDF()

5 排序根据余弦相似度进行排序;

6 用均方根误差RMSE进行模型评估和参数调整   ;

均方根误差RMSE公式

核心代码 

 def adjustALSParam(trainRDD: RDD[Rating], testRDD: RDD[Rating]): Unit = {
    val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(0.01, 0.1, 1 ))
      yield {
        val model = ALS.train(trainRDD, rank, 5, lambda)
        val rmse = getRMSE(model, testRDD)
        (rank, lambda, rmse)
      }
    println(result.minBy(_._3)

由于电影的特征是稳定不变的,所以离线训练电影相似矩阵,节约实时计算时间。

核心代码   

val movieFeatures = model.productFeatures.map{
        case (mid, feature) => (mid, new DoubleMatrix(feature))
      }
     val movieRecs = movieFeatures.cartesian(movieFeatures)
       .filter{
         case (a,b ) => a._1 != b._1
       }
       .map{
         case(a, b) => {
           val simScore = this.consinSim(a._2, b._2)
           ( a._1, (b._1, simScore) )
         }
       }
       .filter(_._2._2 > 0.6)   
       .groupByKey()
       .map{
         case (mid, item) => MoiveResc(mid, item
           .toList
           .sortWith(_._2 > _._2)
           .map(x => Recommendation(x._1, x._2)))
       }
       .toDF()

 完整代码

offlineRecommender.scala

package com.qh.offline

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

//跟sparkMl 里面区分
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)

case class MongoConfig(uri: String, db: String)

//基准推荐对象
case class Recommendation(mid: Int, score: Double)

//基于预测评分的用户推荐列表
case class UserResc(uid: Int, recs: Seq[Recommendation]) //类别, top10基准推荐对象

//基于LFM电影特征向量的电影相似度列表
case class MoiveResc(mid: Int, recs: Seq[Recommendation])

/**
 * 基于隐语义模型的协同过滤 推荐
 *
 * 用户电影推荐矩阵
 * 通过ALS训练出来的Model计算所有当前用户电影的推荐矩阵
 * 1. UserID和MovieId做笛卡尔积
 * 2. 通过模型预测uid,mid的元组
 * 3. 将预测结果通过预测分值进行排序
 * 4. 返回分值最大的K个电影,作为推荐
 * 生成的数据结构 存到UserRecs表中
 *
 * 电影相似度矩阵
 * 模型评估和参数选取
 *
 */
object offline {

  val MONGODB_RATING_COLLECTION = "Rating"
  val USER_RECS = "UserRecs"
  val MOVIE_RECS = "MovieRecs"
  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender"
    )
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("offline")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    //加载数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map(rating => (rating.uid, rating.mid, rating.score)) //去掉时间戳
      .cache() //缓存,诗酒花在内存中
    //预处理
    val userRDD = ratingRDD.map(_._1).distinct()
    val movieRDD = ratingRDD.map(_._2).distinct()

    //训练LFM
    val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
    val (rank, iterations, lambda) = (200, 5, 0.1)
    val model = ALS.train(trainData, rank, iterations, lambda)

    //基于用户和电影的隐特征,计算预测评分,得到用户的推荐列表
    //    笛卡尔积空矩阵
    val userMovies = userRDD.cartesian(movieRDD)

    //预测
    val preRatings = model.predict(userMovies)

    val userRecs = preRatings
      .filter(_.rating > 0)
      .map(rating => (rating.user, (rating.product, rating.rating)))
      .groupByKey()
      .map {
        case (uid, recs) => UserResc(uid, recs.toList
          .sortWith(_._2 > _._2)
          .take(USER_MAX_RECOMMENDATION)
          .map(
            x => Recommendation(x._1, x._2)
          )
        )
      }
      .toDF()

    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    //基于电影隐特征,计算相似度矩阵,得到电影的相似度列表
    val movieFeatures = model.productFeatures.map{
        case (mid, feature) => (mid, new DoubleMatrix(feature))
      }

    //电影与电影 笛卡尔积
     val movieRecs = movieFeatures.cartesian(movieFeatures)
       .filter{
         case (a,b ) => a._1 != b._1
       }
       .map{
         case(a, b) => {
           val simScore = this.consinSim(a._2, b._2)
           ( a._1, (b._1, simScore) )
         }
       }
       .filter(_._2._2 > 0.6)    //过滤出相似度
       .groupByKey()
       .map{
         case (mid, item) => MoiveResc(mid, item
           .toList
           .sortWith(_._2 > _._2)
           .map(x => Recommendation(x._1, x._2)))
       }
       .toDF()

    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  /*
  求解余弦相似度
   */
  def consinSim(matrixA: DoubleMatrix, matrixB: DoubleMatrix):Double = {
//    .dot 点乘  .norm2 L2范数,就是模长
    matrixA.dot(matrixB) / (matrixA.norm2() * matrixB.norm2())
  }
}

ALSTrainer.scala

package com.qh.offline

import breeze.numerics.sqrt
import com.qh.offline.offline.MONGODB_RATING_COLLECTION
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * 调参
 * 通过ALS寻找lMF参数,打印到控制台输出
 */
object ALSTrainer {
  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender"
    )
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("offline")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map(rating => Rating(rating.uid, rating.mid, rating.score)) //去掉时间戳
      .cache()
    //   随机切分数据集=>训练集 测试集
    val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainRDD = splits(0)
    val testRDD = splits(1)

    //    模型参数选择
    adjustALSParam(trainRDD, testRDD)

    spark.close()

  }

  def adjustALSParam(trainRDD: RDD[Rating], testRDD: RDD[Rating]): Unit = {
    val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(0.01, 0.1, 1 ))
      yield {
        val model = ALS.train(trainRDD, rank, 5, lambda)
        val rmse = getRMSE(model, testRDD)
        (rank, lambda, rmse)
      }
    println(result.minBy(_._3))
  }

  def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
    //      计算预测评分
    val userProducts = data.map(item => (item.user, item.product))
    val predictRating = model.predict(userProducts)
    //uid 和 mid 交集 做被减数
    val observed = data.map(item => ((item.user, item.product), item.rating)) //实际观测值和预测值
    val predict = predictRating.map(item => ((item.user, item.product), item.rating))

    sqrt(
      observed.join(predict).map {
        case ((uid, mid), (actual, pre)) => //内连接没有数据冗余,不需要groupby
          val err = actual - pre
          err * err
      }.mean()
    )
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/35065.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

elk中kibana使用

1.前言 kibana是一款作为elasticsearch可视化的一款软件&#xff0c;将elasticsearch中的数据以可视化的状态展现出来&#xff0c;kibana也提供了查询、统计、修改索引等功能 2.kibana使用 索引管理 在索引管理中&#xff0c;可以看到所有索引的状态、运行状况、主分片、副本…

pytorch快速入门中文——07(TensorBoard)

使用 TensorBoard 可视化模型&#xff0c;数据和训练 原文&#xff1a;https://pytorch.org/tutorials/intermediate/tensorboard_tutorial.html 在 60 分钟突击中&#xff0c;我们向您展示了如何加载数据&#xff0c;如何通过定义为nn.Module子类的模型提供数据&#xff0c;如…

计算机体系结构基础知识介绍之缓存性能的十大进阶优化之编译器控制的预取和利用HBM扩展内存层次(七)

优化九&#xff1a;编译器控制的预取以减少丢失惩罚或丢失率 硬件预取的替代方案是编译器在处理器需要数据之前插入预取指令来请求数据。 预取有两种类型&#xff1a; ■ 寄存器预取将值加载到寄存器中。 ■ 高速缓存预取仅将数据加载到高速缓存。 这两种类型都可以分为有错…

跟我一起从零开始学python(一)编程语法必修

前言 随着互联网的高速发展&#xff0c;python市场越来越大&#xff0c;也越来越受欢迎&#xff0c;主要源于它&#xff1a;易学易用&#xff0c;通用性广&#xff0c;时代需要&#xff0c;源代码的开放以及人工智能浪潮&#xff0c;接来下我们就从这几个方向谈谈为何python越…

17 MFC进程通信

文章目录 剪切板管道匿名管道父进程写入数据子进程读出数据 命名管道 邮槽邮槽服务器邮槽客户端 剪切板 设置界面 发送 //设置剪切板数据 void CClipboardDlg::OnBnClickedBtnSend() {UpdateData(TRUE);if (m_strSend.IsEmpty()){MessageBox(L"请输入需要设置的文本&quo…

微信小程序如何进行开发?

文章目录 0.引言1.注册微信公众平台账号2.准备微信开发者工具3.创建微信小程序并预览 0.引言 笔者编程一般编得较多的是桌面软件&#xff0c;有时也会编手机软件&#xff0c;这些软件都必须安装才能使用&#xff0c;这限制了软件的推广。而现有社交软件如微信使用得较广泛&…

Linux的编译器——gcc/g++(预处理、编译、汇编、链接)

文章目录 一.程序实现的两个环境二.gcc如何完成1.预处理2.编译3.汇编4.链接 三.动态库与静态库对比下二者生成的文件大小 四.gcc常用选项 前言&#xff1a; 本文主要认识与学习Linux环境下常用的编译器——gcc&#xff08;编译C代码&#xff09;/g&#xff08;编译C代码&#x…

深度学习--神经网络全面知识点总结(持续更新中)

文章目录 神经网络基础1.1 什么是神经网络&#xff1f;1.2 神经元和激活函数1.3 前向传播和反向传播1.4 损失函数和优化算法 深度神经网络2.1 卷积神经网络&#xff08;CNN&#xff09;2.2 循环神经网络&#xff08;RNN&#xff09;2.3 长短期记忆网络&#xff08;LSTM&#xf…

凝思系统docker离线安装

# linux离线安装docker (18.03.1-ce) ## 解压&#xff0c;得到docker文件夹 tar xzvf docker-18.03.1-ce.tgz ## 将docker文件夹里面的所有内容复制到/usr/bin目录 sudo cp docker/* /usr/bin/ ## 开启docker守护进程 sudo dockerd & 当终端中显示【API list…

Mathtype7Mac苹果ios简体中文版

对于很多人来说&#xff0c;每次编辑文字的时候遇到公式简直就是噩梦。像那些复杂的数学、物理还有化学公式&#xff0c;太难编辑出来了。 那么我们该怎么解决这些难题呢&#xff1f;其实很简单&#xff0c;用公式编辑器就行了。 公式编辑器&#xff0c;是一种工具软件&#…

网络安全之反序列化漏洞分析

简介 FastJson 是 alibaba 的一款开源 JSON 解析库&#xff0c;可用于将 Java 对象转换为其 JSON 表示形式&#xff0c;也可以用于将 JSON 字符串转换为等效的 Java 对象分别通过toJSONString和parseObject/parse来实现序列化和反序列化。 使用 对于序列化的方法toJSONStrin…

卷积神经网络| 猫狗系列【AlexNet】

首先&#xff0c;搭建网络&#xff1a; AlexNet神经网络原理图&#xff1a; net代码&#xff1a;【根据网络图来搭建网络&#xff0c;不会的看看相关视频会好理解一些】 import torchfrom torch import nnimport torch.nn.functional as Fclass MyAlexNet(nn.Module): def…

Flutter学习四:Flutter开发基础(六)调试Flutter应用

目录 0 引言 1 Flutter异常捕获 1.1 Dart单线程模型 1.2 Flutter异常捕获 1.2.1 Flutter框架异常捕获 1.2.1.1 Flutter默认异常捕获方式 1.2.1.2 自己捕获异常并上报 1.2.2 其他异常捕获与日志收集 1.2.3 最终的错误上报代码 0 引言 本文是对第二版序 | 《Flutter实…

《Lua程序设计》--学习2

表 Lua语言中的表本质上是一种辅助数组&#xff08;associative array&#xff09;&#xff0c;这种数组不仅可以使用数值作为索引&#xff0c;也可以使用字符串或其他任意类型的值作为索引&#xff08;nil除外&#xff09;。 Lua语言中的表要么是值要么是变量&#xff0c;它…

防火墙基本原理详解

概要 防火墙是可信和不可信网络之间的一道屏障&#xff0c;通常用在LAN和WAN之间。它通常放置在转发路径中&#xff0c;目的是让所有数据包都必须由防火墙检查&#xff0c;然后根据策略来决定是丢弃或允许这些数据包通过。例如&#xff1a; 如上图&#xff0c;LAN有一台主机和一…

【macOS 系列】如何在mac 邮件客户端配置QQ邮箱和第二个账号

文章目录 一、配置QQ邮箱二、添加新的账户 一、配置QQ邮箱 需要在QQ邮箱账户设置中开启&#xff1a; 开启时&#xff0c;会让你发短信到指定号码&#xff0c;然后就会弹出一个验证码 也就是添加邮箱的密码不是QQ密码&#xff0c;而是这个验证码&#xff0c;这个可以生成多个&…

【OpenGL】读取视频并渲染

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍读取视频并渲染。 学其所用&#xff0c;用其所学。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;下次更新不迷路&#…

ELK实验部署过程

ELK集群部署环境准备 配置ELK日志分析系统 192.168.1.51 elk-node1 es、logstash、kibana 192.168.1.52 elk-node2 es、logstash 192.168.1.53 apache logstash &#xff08;我这里是把虚拟机的配置全部都改为2核3G的&#xff09; 2台linux 第1台&#xff1a;elk-nod…

【数据库原理】MyShop 商城数据库设计(SQL server)

MyShop 商城数据库设计 项目背景定义课程设计要求概念结构设计逻辑结构设计数据结构的描述用户信息数据结构的描述地址信息数据结构的描述商品类别数据结构的描述商品数据结构的描述购物车数据结构的描述订单数据结构的描述订单项数据结构的描述 物理结构设计用户表结构地址表结…

STM32——GPIO配置

文章目录 一、GPIO八种模式1. 输入2. 输出3. 如何选择GPIO的模式 二、库函数GPIO配置1. 配置代码2.参数设置 一、GPIO八种模式 GPIO的输入输出是对于STM32单片机来说的。以下仅为个人粗略笔记&#xff0c;内部电路分析可参考博客https://blog.csdn.net/k666499436/article/det…