详解 Spark 核心编程之累加器

累加器是分布式共享只写变量

一、累加器功能

​ 累加器可以用来把 Executor 端的变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge

在这里插入图片描述

二、累加器类型

1. 系统累加器

/**
常见的系统累加器:longAccumulator/doubleAccumulator/collectionAccumulator
说明:累加器一般放在行动算子中进行操作
*/
object TestRDDAcc {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("Acc")
    	val sc = new SparkContext(conf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        // 创建累加器
        val accSum = sc.longAccumulator("sum")
        
        rdd.foreach(num => {
        	accSum.add(num)    
        })
        
        println(accSum.value)
        
        sc.stop()
        
    }
}

三、自定义累加器

自定义累加器实现 WordCount 案例,避免 shuffle 操作

/**
	1.继承 AccumulatorV2[IN, OUT] 抽象类,定义输入输出的泛型类型
		1.1 IN 表述累加器 add 的数据的类型
		1.2 OUT 表示累加器 value 的返回类型
		
	2.重写累加器的抽象方法
*/
object TestAccWordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("WCAcc")
    	val sc = new SparkContext(conf)
        
        val rdd = sc.makeRDD(List(
        	"hello", "hive", "hello", "spark"
        ))
        
        // 创建自定义累加器
        val wcAcc = new MyAccumulator()
        
        // 向 spark 进行注册
        sc.register(wcAcc, "wordCountAcc")
        
        // 循环遍历 rdd
        rdd.foreach(word => {
            // 使用累加器
        	wcAcc.add(word)    
        })
        
        // 输出累加器的值
        println(wcAcc.value)
        
        sc.stop()
        
    }
}

/*
	自定义累加器
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    // 定义累加器的返回结果 Map
    private var resultMap = mutable.Map[String, Long]()
    
    // 判断是否为初始状态
    override def isZero: Boolean = resultMap.isEmpty()
    
    // 复制累加器
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        this
    }
    
    // 重置累加器
    override def reset(): Unit = resultMap.clear()
    
    // 获取累加器输入的数据进行操作
    override def add(word: String): Unit = {
    	// 向 resultMap 中添加新值或累加旧值
        val count = resultMap.getOrElse(word, 0L) + 1
        resultMap.update(word, count)
    }
    
    // 合并多个累加器的结果
    override def merge(other:  AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach({
        case (word, count) => {
            val newCount = this.resultMap.getOrElse(word, 0L) + 1
            this.resultMap.update(word, newCount)
        }
    })
}
    
    // 返回累加器的结果
    override def value: mutable.Map[String, Long] = resultMap
    
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/670888.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

制作ChatPDF之前端Vue搭建(二)

前端界面 接上篇: 制作ChatPDF之Elasticsearch8.13.4搭建(一) 为了实现一个基于 Vue.js 的前端应用,用户可以上传 PDF 文件,输入查询,并在输出框中显示查询结果,你需要以下步骤: 初始化 Vue …

Vue3实战笔记(59)—从零开始掌握Vue3插槽机制,进阶与提高

文章目录 前言一、具名插槽二、高级列表组件示例总结 前言 接上文&#xff0c;接下来看一点稍微复杂的&#xff1a;具名插槽 一、具名插槽 子组件 MyComponent.vue&#xff1a; <template><div><slot name"header"></slot><slot><…

LeetCode162寻找峰值元素

题目描述 峰值元素是指其值严格大于左右相邻值的元素。给你一个整数数组 nums&#xff0c;找到峰值元素并返回其索引。数组可能包含多个峰值&#xff0c;在这种情况下&#xff0c;返回 任何一个峰值 所在位置即可。你可以假设 nums[-1] nums[n] -∞ 。你必须实现时间复杂度为…

How to install a dataset from huggingface?

当我从抱抱脸上git clone imdb数据集时&#xff0c;plain_text里的文件是这样的&#xff1a;

FPGA新起点V1开发板(九)——流水灯

文章目录 一、模块框图二、代码编写三、注意点四、总结 一、模块框图 二、代码编写 endmodule下面需要敲出一个回车代码拼接是大括号 led < {led[2:0],led[3]}注意二进制和十进制 module flow_led(input sys_clk50,input rst_n,output reg [3:0] le…

双指针练习:盛水最多的容器

题目链接&#xff1a;11.盛水最多的容器 题目描述&#xff1a; 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可…

# linux 系统下,使用 docker 启动 mysql 后,通过 sqlyog 连接 mysql 报“错误号码2058“

linux 系统下&#xff0c;使用 docker 启动 mysql 后&#xff0c;通过 sqlyog 连接 mysql 报“错误号码2058“ 一、错误描述&#xff1a; 在 ubuntu 系统上&#xff0c;刚安装的 docker 启动 mysql 后&#xff0c;想通过图形界面 SQLyong 等工具连接 mysql 出现“错误号码2058…

算法题-给定一个日期,输出星期几

目录 给定日期&#xff0c;输出对应是星期几 测试结果 如1900年 5月6日是星期三&#xff0c;计算给的日期是星期几 给定日期&#xff0c;输出对应是星期几 #include <stdio.h> #include <stdlib.h> #include <string.h>int main() {char input[100];int d…

【Python技术】AI编程新手快速入门学习LangChain大模型框架

如果我们要搞AI智能体&#xff0c;普通人一般 借助腾讯元器、 coze、KIMI 或者其他大平台搞一搞&#xff0c;比如我配置的coze智能体在微信公众号聊天。 对于程序员来说&#xff0c;一言不合就喜欢搞代码。 前面文章也介绍了不少关于AI知识库问答&#xff0c;AIagent 不少开源…

STM32基于HAL库的HC-SR04模块超声波测距

文章目录 一、HC-SR04模块介绍二、创建工程1.选择芯片2.配置RCC、SY![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/9d2a5b883f0e409eabb804e6da861277.png)3.配置串口14.配置定时器5.配置GPIO 三、Keil代码1.勾选Use MicroLIB2.创建SR04.c和SR04.h文件3.其他代码 …

钣金件设计规范

(一&#xff09; 钣金 1、钣金的概念 钣金&#xff08;sheet metal&#xff09;是针对金属薄板&#xff08;厚度通常在6mm以下&#xff09;的 一种综合冷加工工艺&#xff0c;包括冲裁、折弯、拉深、成形、锻压、铆合等&#xff0c; 其显著的特征是同一零件厚度一致。 2、钣…

使用 Ollama 本地运行各种 LLM

今天看看另外一个产品Ollama。Ollama 的安装非常简单&#xff0c;只需从官网&#xff08;https://ollama.com/download&#xff09;下载后解压缩&#xff0c;并在 Terminal 中运行脚本 ollama run llama3 即可完成环境设置。 我尝试运行 Llama3&#xff0c;虽然在运行时占用了大…

网络协议分析

网络协议分析 网络协议分析概述用IP实现异构网络互联网络协议的分层TCP/IP的分层模型协议分析协议分析应用协议分析任务 常见网络协议PPP协议报文选项IPCP认证协议PAP安全缺陷认证协议CHAPPPPoE协议流程 地址解析协议ARPARP的思想和步骤ARP报文格式及封装 移动IP移动IP的工作机…

OpenAI 近期动荡:解雇 Sam Altman 事件分析与 AI 未来展望

引言 OpenAI 的动荡从未停止。最近&#xff0c;由于 OpenAI 高层领导的更迭&#xff0c;引发了广泛的关注和讨论。特别是在 Sam Altman 被解雇后&#xff0c;再次回归 CEO 职位的过程&#xff0c;更是引起了公众和业内的巨大反响。前 OpenAI 董事会成员 Helen Toner 在最新一期…

vue使用tailwindcss

安装依赖 pnpm add -D tailwindcss postcss autoprefixer创建配置文件tailwind.config.js npx tailwindcss init在配置文件content中添加所有模板文件的路径 /** type {import(tailwindcss).Config} */ export default {content: [./index.html, ./src/**/*.{vue,js,ts,jsx,…

安卓模拟鼠标,绘图板操作电脑PC端,卡卡罗特也说好,儿童节快乐

家人们&#xff0c;上链接了&#xff1a;https://download.csdn.net/download/jasonhongcn/89387887

Go语言-切片底层探索 —— 补充篇:切片和底层数组到底是什么关系?

之前的切片探索中&#xff0c;上篇通过一道算法题目&#xff0c;了解到切片的两大特性&#xff1a;一是&#xff1a;切片是引用类型&#xff0c;指向底层数组&#xff0c;修改其底层数组的时候&#xff0c;会影响切片中的值。二是&#xff1a;向切片中添加元素的时候&#xff0…

限流算法整理——滑动窗口限流算法

限流算法描述 滑动窗口限流需要将每个窗口空间划分为无限小的窗口区间&#xff0c;并且动态调整区间的起始点&#xff0c;并且在调整完毕之后需要判断各个区间&#xff0c;累加各个区间的请求&#xff0c;查看是否到达最大的阈值&#xff0c;以此返回允许请求还是拒绝请求 算…

SDL教程(二)——Qt+SDL播放器

前言 ​ 这篇文章主要是使用SDL来打开视频&#xff0c;显示视频。后续会再继续使用SDL来结合FFmpeg。来能够直接使用网上的demo进行学习。 正文 一、环境 Qt 5.15.2 MSVC2019 64bit Win11 二、Qt搭建SDL Qt搭建&#xff0c;我觉得相比用VS2019来说&#xff0c;更为方便&…

密文域可逆信息隐藏安全性研究-从图像到视频

前言 随着云存储、云计算等新兴技术的兴起&#xff0c;海量的隐私信息被广泛地上传、存储到服务器上。为保证用户的隐私性&#xff0c;必须对用户的数据进行加密&#xff0c;然后再将其上传到服务器上。因此&#xff0c;密文域的可逆信息隐藏(reversible data hiding in encry…