【大数据面试知识点】Spark中的累加器

Spark累加器

累加器用来把Executor端变量信息聚合到Driver端,在driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行merge。

累加器一般是放在行动算子中进行操作的。

Spark累加器有哪些特点?

1)累加器在全局唯一的,只增不减,记录全局集群的唯一状态

2)在Executor中修改它,在Driver读取

3)executor级别共享的,广播变量是task级别的共享两个application不可以共享累加器,但是同一个app不同的job可以共享

应用举例

不经过Shuffle实现词频统计

object Spark06_Accumulator {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
    // 声明累加器
    val sumAcc: LongAccumulator = sc.longAccumulator("sumAcc")
    rdd.foreach {
      case (word, count) => {
        // 使用累加器
        sumAcc.add(count)
      }
    }
    // 累加器的toString方法
    //println(sumAcc)
    //取出累加器中的值
    println(sumAcc.value)
    sc.stop()
  }
}

不经过shuffle,计算以H开头的单词出现的次数。

object Spark07_MyAccumulator {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("Hello", "HaHa", "spark", "scala", "Hi", "Hello", "Hi"))
    // 创建累加器
    val myAcc = new MyAccumulator
    //注册累加器
    sc.register(myAcc, "MyAcc")
    rdd.foreach{
      datas => {
        // 使用累加器
        myAcc.add(datas)
      }
    }
    // 获取累加器的结果
    println(myAcc.value)

    sc.stop()
  }
}

// 自定义累加器
// 泛型分别为输入类型和输出类型
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
  // 定义输出数据变量
  var map: mutable.Map[String, Int] = mutable.Map[String, Int]()

  // 累加器是否为初始状态
  override def isZero: Boolean = map.isEmpty

  // 复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val MyAcc = new MyAccumulator
    // 将此累加器中的数据赋值给新创建的累加器
    MyAcc.map = this.map
    MyAcc
  }

  // 重置累加器
  override def reset(): Unit = {
    map.clear()
  }

  // 累加器添加元素
  override def add(v: String): Unit = {
    if (v.startsWith("H")) {
      // 判断map集合中是否已经存在此元素
      map(v) = map.getOrElse(v, 0) + 1
    }
  }

  // 合并累加器中的元素
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    val map1: mutable.Map[String, Int] = this.map
    val map2: mutable.Map[String, Int] = other.value
    // 合并两个map
    map = map1.foldLeft(map2) {
      (m, kv) => {
        m(kv._1) = m.getOrElse(kv._1, 0) + kv._2
        m
      }
    }
  }

  // 获取累加器中的值
  override def value: mutable.Map[String, Int] = {
    map
  }
}

参考:Spark累加器的作用和使用-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/283646.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

解决相机库CameraView多滤镜拍照错乱的BUG (一) : 复现BUG

1. 前言 这段时间,在使用 natario1/CameraView 来实现带滤镜的预览、拍照、录像功能。 由于CameraView封装的比较到位,在项目前期,的确为我们节省了不少时间。 但随着项目持续深入,对于CameraView的使用进入深水区,逐…

lambda表达式和包装器

正文开始前给大家推荐个网站,前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 我们在使用库里的排序算法时如果排序的是自定义类型或者库里默认的排序不能满足我们则需求&…

【单片机项目实战】温度控制系统

本项目的主要作用是实现温度调控,通过设定一个预定的温度值,实现实时检测外界温度,当外界温度小于预定值时,电机正转,实现降温效果;当外界温度大于预定值时,电机反转,实现升温效果&a…

网络安全—PKI公钥基础设施

文章目录 前提知识散列函数非对称加密数字签名 PKI受信任的人RA注册CA颁发IKE数字签名认证(交换证书)密钥管理 前提知识 散列函数 散列也可以叫哈希函数,MD5、SHA-1、SHA-2、、(不管叫啥,都记得是同一个东西就行&…

分库分表之Mycat应用学习五

5 Mycat 离线扩缩容 当我们规划了数据分片,而数据已经超过了单个节点的存储上线,或者需要下线节 点的时候,就需要对数据重新分片。 5.1 Mycat 自带的工具 5.1.1 准备工作 1、mycat 所在环境安装 mysql 客户端程序。 2、mycat 的 lib 目录…

mac上使用Navicat Premium 在本地和生产环境中保持数据库同步

Navicat Premium 是一款功能强大的数据库管理和开发工具,支持多种数据库系统,如 MySQL、Oracle、SQL Server 等。作为程序员,我深知在开发过程中需要一款方便、高效的数据库管理工具来提升工作效率。而 Navicat Premium 正是这样一款不可多得…

Spring5注解驱动(六)

5. 自动装配 5.1. Autowired&Qualifier&Primary 在原来,我们就是使用Autowired的这个注解来进行自动装配; 现在有一个BookController 类 package com.hutu.controller;import com.hutu.service.BookService; import org.springframework.bea…

2023最新租号平台系统源码支持单独租用或合租使用

这是一款租号平台源码,采用常见的租号模式。目前网络上还很少见到此类类型的源码。 平台的主要功能如下: 支持单独租用或采用合租模式; 采用易支付通用接口进行支付; 添加邀请返利功能,以便站长更好地推广&#xf…

基于蚁狮算法优化的Elman神经网络数据预测 - 附代码

基于蚁狮算法优化的Elman神经网络数据预测 - 附代码 文章目录 基于蚁狮算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立 4.基于蚁狮优化的Elman网络5.测试结果6.参考文献7.Matlab代码 摘要:针…

逻辑卷学习后续----------缩容

一、缩容:缩减大小 ext4可以 , xfs无法缩减,缩减会影响业务 1.解挂载 2.检查文件系统完整性 3.缩减文件系统 4.缩减逻辑卷上下一致 5.再挂载回去 添加磁盘 文件系统只能装ext4 缩减文件系统 resize2fs 挂载失败需要重新安装文件系统…

磁盘阵列(RAID)

1.独立硬盘冗余阵列(RAID, Redundant Array of Independent Disks) 旧称廉价磁盘冗余阵列(Redundant Array of Inexpensive Disks),简称磁盘阵列 用虚拟化存储技术把多个硬盘组合起来,成为一个或多个硬盘阵…

[C#]使用ONNXRuntime部署一种用于边缘检测的轻量级密集卷积神经网络LDC

源码地址: github.com/xavysp/LDC LDC: Lightweight Dense CNN for Edge Detection算法介绍: 由于深度学习方法的快速发展,近年来,用于执行图像边缘检测的卷积神经网络(CNN)模型爆炸性地传播。但边缘检测…

Selenium教程04:鼠标+键盘网页的模拟操作

在webdriver 中,鼠标操作都封装在ActionChains类中,使用的时候需要导入这个包。 from selenium.webdriver import ActionChainsActionChains方法列表如下: click(on_elementNone) ——单击鼠标左键click_and_hold(on_elementNone) ——点击…

开始使用MEVN技术栈开发02 MongoDB介绍

开始使用MEVN技术栈开发02 MongoDB介绍 MongoDB介绍 As indicated by the ‘ M ’ in MEVN, we will use MongoDB as the backend database for our app. MongoDB is a NoSQL database. Before we talk about what is a NoSQL database, let ’ s first talk about relationa…

[每周一更]-(第48期):一名成熟Go开发需储备的知识点(问题篇)- 1

问题篇 1、Go语言基础知识 什么是Go语言?它有哪些特点?Go语言的数据类型有哪些?Goroutine是什么?它与线程的区别是什么?介绍一下Go语言的垃圾回收机制。 2、并发和并行 什么是并发和并行?它们之间的区别…

Java超高精度无线定位技术--UWB (超宽带)人员定位系统源码

UWB室内定位技术是一种全新的、与传统通信技术有极大差异的通信新技术。它不需要使用传统通信体制中的载波,而是通过发送和接收具有纳秒或纳秒级以下的极窄脉冲来传输数据,从而具有GHz量级的带宽。 UWB(超宽带)高精度定位系统是一…

Java方法(定义和调用,带参数方法定义和调用,带返回值方法的定义和调用,方法的注意事项,方法重载)

文章目录 1. 方法概述1.1 方法的概念 2. 方法的定义和调用2.1 无参数方法定义和调用2.3 无参数方法的练习 3. 带参数方法定义和调用3.1 带参数方法定义和调用3.2 形参和实参3.3 带参数方法练习 4. 带返回值方法的定义和调用4.1 带返回值方法定义和调用4.2 带返回值方法练习14.3…

React学习计划-React16--React基础(八)react-redux使用与优化,纯函数介绍

笔记gitee地址 学习了 redux,为什么还要讲react-redux呢? redux不是专门为react所创建的,只不过在某一刻,react和redux看对眼了,所以俩人走到了一起,所以为了更好的支持redux,react官方出了react-redux来更好的支持redux 1. react…

UntiyShader(七)Debug

目录 前言 一、利用假彩色图像 二、利用Visual Studio 三、帧调试器 前言 Debug(调试),是程序员检查问题的一种方法,对于一个Shader调试更是一种噩梦,这也是Shader难写的原因之一——如果效果不对,我们…