Spark10-11

10. 广播变量

10.1 广播变量的使用场景

在很多计算场景,经常会遇到两个RDD进行JOIN,如果一个RDD对应的数据比较大,一个RDD对应的数据比较小,如果使用JOIN,那么会shuffle,导致效率变低。广播变量就是将相对较小的数据,先收集到Driver,然后再通过网络广播到属于该Application对应的每个Executor中,以后处理大量数据对应的RDD关联数据,就不用shuffle了,而是直接在内存中关联已经广播好的数据,即通实现mapside join,可以将Driver端的数据广播到属于该application的Executor,然后通过Driver广播变量返回的引用,获取实现广播到Executor的数据

广播变量的特点:广播出去的数据就无法在改变了,在没有Executor中是只读的操作,在每个Executor中,多个Task使用一份广播变量

 

10.2 广播变量的实现原理

广播变量是通过BT的方式广播的(TorrentBroadcast),多个Executor可以相互传递数据,可以提高效率

sc.broadcast这个方法是阻塞的(同步的)

广播变量一但广播出去就不能改变,为了以后可以定期的改变要关联的数据,可以定义一个object[单例对象],在函数内使用,并且加一个定时器,然后定期更新数据

广播到Executor的数据,可以在Driver获取到引用,然后这个引用会伴随着每一个Task发送到Executor,然后通过这个引用,获取到事先广播好的数据

10.3 案例:根据IP计算归属地

10.3.1 需求

根据IP规则数据,计算出给定日志中ip地址对应的省份信息,由于IP地址的规则数据相对较小,所以可以将IP规则数据先广播出去,以后关联IP规则数据,就可以在内存中进行关联了,这样可以避免shuffle,提高执行效率!

10.3.2 代码实现

11. 序列化问题

11.1 序列化问题的场景

spark任务在执行过程中,由于编写的程序不当,任务在执行时,会出序列化问题,通常有以下两种情况,

  • 封装数据的Bean没有实现序列化接口(Task已经生成了),在ShuffleWirte之前要将数据溢写磁盘,会抛出异常
  • 函数闭包问题,即函数的内部,使用到了外部没有实现序列化的引用(Task没有生成)

11.2 数据Bean未实现序列化接口

spark在运算过程中,由于很多场景必须要shuffle,即向数据溢写磁盘并且在网络间进行传输,但是由于封装数据的Bean没有实现序列化接口,就会导致出现序列化的错误!

Scala

object C02_CustomSort {

  def main(args: Array[String]): Unit = {

    val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
    //使用并行化的方式创建RDD
    val lines = sc.parallelize(
      List(
        "laoduan,38,99.99",
        "nianhang,33,99.99",
        "laozhao,18,9999.99"
      )
    )
    val tfBoy: RDD[Boy] = lines.map(line => {
      val fields = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toDouble
      new Boy(name, age, fv) //将数据封装到一个普通的class中
    })

    implicit val ord = new Ordering[Boy] {
      override def compare(x: Boy, y: Boy): Int = {
        if (x.fv == y.fv) {
          x.age - y.age
        } else {
          java.lang.Double.compare(y.fv, x.fv)
        }
      }
    }
    //sortBy会产生shuffle,如果Boy没有实现序列化接口,Shuffle时会报错
    val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)

    val res = sorted.collect()

    println(res.toBuffer)
  }
}

//如果以后定义bean,建议使用case class
class Boy(val name: String, var age: Int, var fv: Double)  //extends Serializable
{
  
  override def toString = s"Boy($name, $age, $fv)"
}

11.3 函数闭包问题

11.3.1 闭包的现象

在调用RDD的Transformation和Action时,可能会传入自定义的函数,如果函数内部使用到了外部未被序列化的引用,就会报Task无法序列化的错误。原因是spark的Task是在Driver端生成的,并且需要通过网络传输到Executor中,Task本身实现了序列化接口,函数也实现了序列化接口,但是函数内部使用到的外部引用不支持序列化,就会函数导致无法序列化,从而导致Task没法序列化,就无法发送到Executor中了

 

在调用RDD的Transformation或Action是传入函数,第一步就进行检测,即调用sc的clean方法

为了避免错误,在Driver初始化的object或class必须实现序列化接口,不然会报错误

Scala
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f) //检测函数是否可以序列化,如果可以直接将函数返回,如果不可以,抛出异常
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

Scala
private def ensureSerializable(func: AnyRef): Unit = {
  try {
    if (SparkEnv.get != null) {
      //获取spark执行换的的序列化器,如果函数无法序列化,直接抛出异常,程序退出,根本就没有生成Task
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}

11.3.2 在Driver端初始化实现序列化的object

在一个Executor中,多个Task使用同一个object对象,因为在scala中,object就是单例对象,一个Executor中只有一个实例,Task会反序列化多次,但是引用的单例对象只反序列化一次

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleObjectSer是一个静态对象,实在第一次使用的时候被初始化了(实在Driver被初始化的)
val rulesObj = RuleObjectSer

//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesObj.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesObj.toString)
}

//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))

 

11.3.3 在Driver端初始化实现序列化的class

在一个Executor中,每个Task都会使用自己独享的class实例,因为在scala中,class就是多例,Task会反序列化多次,每个Task引用的class实例也会被序列化

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleClassNotSer是一个类,需要new才能实现(实在Driver被初始化的)
val rulesClass = new RuleClassSer

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesClass.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

 

11.3.4 在函数内部初始化未序列化的object

object没有实现序列化接口,不会出现问题,因为该object实现函数内部被初始化的,而不是在Driver初始化的

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//不再Driver端初始化RuleObjectSer或RuleClassSer
//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //在函数内部初始化没有实现序列化接口的RuleObjectNotSer
  val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知")
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
}
//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))
sc.stop()

 

11.3.5 在函数内部初始化未序列化的class

这种方式非常不好,因为每来一条数据,new一个class的实例,会导致消耗更多资源,jvm会频繁GC

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //RuleClassNotSer是在Executor中被初始化的
  val rulesClass = new RuleClassNotSer
  //但是如果每来一条数据new一个RuleClassNotSer,不好,效率低,浪费资源,频繁GC
  val name = rulesClass.rulesMap.getOrElse(code, "未知")
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

11.3.6 调用mapPartitions在函数内部初始化未序列化的class

一个分区使用一个class的实例,即每个Task都是自己的class实例

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//处理数据,关联维度
val res = lines.mapPartitions(it => {
  //RuleClassNotSer是在Executor中被初始化的
  //一个分区的多条数据,使用同一个RuleClassNotSer实例
  val rulesClass = new RuleClassNotSer
  it.map(e => {
    val fields = e.split(",")
    val id = fields(0).toInt
    val code = fields(1)
    val name = rulesClass.rulesMap.getOrElse(code, "未知")
    //获取当前线程ID
    val treadId = Thread.currentThread().getId
    //获取当前Task对应的分区编号
    val partitiondId = TaskContext.getPartitionId()
    //获取当前Task运行时的所在机器的主机名
    val host = InetAddress.getLocalHost.getHostName
    (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  })
})
res.saveAsTextFile(args(2))
sc.stop()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/32038.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot 如何使用 @ExceptionHandler 注解处理异常消息

Spring Boot 如何使用 ExceptionHandler 注解处理异常消息 在 Spring Boot 应用程序中,异常处理是非常重要的一部分。当应用程序出现异常时,我们需要能够捕获和处理这些异常,并向用户提供有用的错误消息。在 Spring Boot 中,可以…

二叉平衡树之红黑树

目录 1.概念 2.性质 3.节点的定义 4.插入 1.按照二叉搜索树规则插入结点 2.调整颜色 1.uncle存在且为红色 2.uncle不存在或者为黑 cur为 3.根节点改为黑色 5.验证 6.比较 7.应用 1.概念 红黑树,是一种二叉搜索树,但在每个结点上增加一个存…

2023年5月青少年机器人技术等级考试理论综合试卷(五级)

青少年机器人技术等级考试理论综合试卷(五级) 分数: 100 题数: 30 一、 单选题(共 20 题, 每题 4 分, 共 80 分) 1.ESP32 for Arduino, 下列程序的运行结果是? ( &#x…

浅谈无线测温系统在高压开关柜中的应用

关注acrelzxz,了解更多详情 摘要:高压开关柜是配电系统中重要的组成部分,其主要作用是控制电荷、分配电能和开断电流等,对维持系统的稳定性有一定的保障作用。将无线测温技术应用于高压开关柜,可以实现对其进行实时的…

校园外卖行业内卷之下,高校外卖创业者如何成为卷王?

伴随着外卖行业的不断发展,校园市场前景广阔。校园外卖市场因各大平台的竞争而变得越来越复杂。各种技术支持和经验参考让大学生创业校园外卖越来越困难,市场竞争也越来越激烈。 校园外卖市场究竟有多内卷? 外卖龙头企业。 校园市场广阔的发…

【高危】crypto-js<3.2.1 存在不安全的随机性漏洞

漏洞描述 crypto-js 是一个 JavaScript 加密库,用于在浏览器和 Node.js 环境中执行加密和解密操作。 crypto-js 3.2.1 之前版本中的 secureRandom 函数通过将字符串 0. 和三位随机整数拼接的格式生成加密字符串,攻击者可通过爆破破解加密字符。 漏洞…

Oracle 查询优化改写(第五章)

第五章 使用字符串 1.遍历字符串 SELECT 天天向上 内容&#xff0c;level&#xff0c;substr(天天向上, LEVEL, 1) 汉字拆分FROM Dual CONNECT BY LEVEL < Length(天天向上);2.计算字符在字符串中出现的次数 3.从字符中删除不需要的字符 若员工姓名有元音字母AEIOU&#x…

RPC远程调用

简介 PRC是一种调用方式而不是一种协议 在本地调用方式时由于方法在同一个内存空间&#xff0c;所以程序中可以直接调用该方法&#xff0c;但是浏览器端和服务端程序是不在一个内存空间的&#xff0c;需要使用网络来访问&#xff0c;就需要使用TCP或者UDP协议&#xff0c;由于…

STM32实现延时

在STM32单片机中&#xff0c;实现延时一般都是使用定时器&#xff0c;既可以使用Systick定时器&#xff0c;也可以使用常规的定时器。 定时器在设置了定时并开启之后&#xff0c;就会进入自主运行模式&#xff0c;其中&#xff0c;初始化设置这一阶段是由CPU执行相应指令完成的…

ubuntu双系统安装

1. 下载系统 国内镜像 http://mirrors.ustc.edu.cn/ubuntu-releases/2. U盘启动盘 Rufus 软件 制作U盘启动盘 Rufus 链接 https://rufus.en.softonic.com/3. 磁盘中准备一定未分配磁盘 我准备了100G 4. BIOS启动项选择为usb启动&#xff08;每个品牌进BIOS不同&#xff0…

Docker 进入容器和交换文件

1、进入容器 有些时候需要进入容器进行操作&#xff0c;使用 docker exec 命令&#xff0c;这个命令后面可以添加很多参数&#xff0c;我们这里只讲添加 -i 和 -it 参数。 只添加 -i 参数时&#xff0c;由于没有分配伪终端&#xff0c;界面没有我们熟悉的 Linux 命令提示…

Kubernetes 和 Prometheus

资源监控系统是容器编排系统必不可少的组件&#xff0c;也是服务治理的核心之一。而 Prometheus 本质上是一个开源的服务监控系统和时序数据库&#xff0c;是 CNCF 起家的第二个项目&#xff0c;目前已经成为 Kubernetes 生态圈中的监控系统的核心。 Prometheus 的核心组件 Pro…

java学习记录之MySql二

1 mysql回顾 1.1 DDL 数据定义语言&#xff1a;结构  数据库database create database 数据库名称 character set 字符集 [collate 比较]; drop database 数据库名称; alter database 数据库名称 character set 字符集 …;  表 create table 表名(字段描述 , … ); 字段描述…

GD32 SPI 查询方式和DMA方式在全双模式下效率区别

最近在使用SPI的时候&#xff0c;遇到了一些数据传输效率问题&#xff0c;在此记录自己学习过程。SPI的基础知识这里就不在讲述了&#xff0c;直接分析SPI查询方式和DMA方式的效率问题。这里使用的芯片是GD32F303CC。 SPI以查询方式进行全双工通信 1.查询手册&#xff0c;SPI…

Spring Cloud Alibaba-全链路灰度设计

文章目录 灰度发布概念灰度发布架构Spring Cloud Alibaba技术架构下的灰度发布实现基础设计HttpHeader设计 Spring Cloud Gateway改造Spring Cloud Gateway实现灰度发布过滤器 后端服务自定义Spring MVC请求拦截器OpenFeign改造自定义Loadbalancer 测试微服务注册元信息修改自定…

在windows11环境下安装CUDA11.6+Anaconda3+pyToach1.13搭建炼丹炉

0.电脑环境 系统&#xff1a;win11 显卡&#xff1a;NVIDIA GTX1650 还有一个pyCharm&#xff0c;其他也用不到了&#xff0c;需要的文章中会进行说明 1.安装CUDA11.6 目前2023.03出来的pyToach2.0是用不到了&#xff0c;因为最低版本支持CUDA11.7。我的显卡是1650&#xff0c…

阿里巴巴高管换血,吴永明接替张勇

文章目录 经济学人 &#x1f4b0; 第 26 周&#x1fa78; 阿里巴巴高管换血&#xff0c;吴永明接替张勇&#x1f304; 孙正义再出山&#x1f43f;️ 英特尔加码德国&#xff01;投资330亿美元建两座芯片工厂&#xff01;&#x1f30a; 亚马逊被指控强加 Prime 服务✈️ 印度航空…

Jmeter多接口测试之参数传递

目录 前言&#xff1a; 接口示例 正则表达式提取器 正则表达式提取实例 Json提取器 Json提取器实例 前言&#xff1a; 在进行多接口测试时&#xff0c;有些情况下需要将前一个接口返回数据作为后一个接口的参数&#xff0c;以模拟实际场景。JMeter作为一款常用的性能测试…

【EXCEL】如何查找特殊字符 问号‘?’星号 ‘*’

目录 0.环境 1.适用场景 1&#xff09;直接搜索问号的结果&#xff1a; 2&#xff09;修改【查找内容】后&#xff0c;搜索结果变为精准定位&#xff1a; 2.具体做法 0.环境 windows wps&#xff08;或excel&#xff0c;这里试了&#xff0c;此问题wps和excel表格是通用…

中间件解析漏洞

服务器解析漏洞算是历史比较悠久了&#xff0c;但如今依然广泛存在。在此记录汇总一些常见服务器&#xff08;WEB server&#xff09;的解析漏洞&#xff0c;比如IIS6.0、IIS7.5、apache、nginx等 2|0 二、IIS5.x-6.x解析漏洞&#xff08;针对asa/asp/cer&#xff09; 2|11、打…