Spark 共享变量:广播变量与累加器解析

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客

PySpark 数据处理实战:从基础操作到案例分析-CSDN博客

Spark 的容错机制:保障数据处理的稳定性与高效性-CSDN博客

目录

一、需求背景

二、广播变量(Broadcast Variables)

(一)功能

(二)语法 / 用法

(三)示例代码修改

(四)本质与优势

三、累加器(Accumulators)

(一)需求示例

(二)原理与功能

(三)使用方法与示例代码修改

四、总结


        在 Spark 大数据处理框架中,共享变量是一个非常重要的概念。当我们处理一些涉及到不同计算节点(Executor)需要访问相同数据的场景时,共享变量就发挥了关键作用。本文将深入探讨 Spark 中的广播变量和累加器,包括它们的使用场景、原理以及如何在实际代码中应用。

一、需求背景

        假设我们有一份用户数据(user.tsv),其中包含用户的一些基本信息如用户 id、用户名、年龄和城市 id,同时我们还有一个城市字典(city_dict),它存储了城市 id 与城市名称的对应关系。我们的目标是将这两份数据进行处理,得到包含用户完整信息(用户 id、用户名、年龄、城市 id、城市名称)的结果集。

user.tsv数据如下

user001 陆家嘴 18 2
user002 羊毛 20 5
user003 爱丽丝 22 6
user004 蒸饭 24 8
user005 淘米 26 1
user006 小笼包 28 7
user007 凉粉 30 4
user008 泡腾片 25 10
user009 炒米 27 3
user010 颖火虫 29 9

 city中的字典如下

city_dict = {
		1: "北京",
		2: "上海",
		3: "广州",
		4: "深圳",
		5: "苏州",
		6: "无锡",
		7: "重庆",
		8: "厦门",
		9: "大理",
		10: "成都"
	}

示例代码如下

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取 conf 对象
	# setMaster  按照什么模式运行,local  bigdata01:7077  yarn
	#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
	#  appName 任务的名字
	conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
	# 假如我想设置压缩
	# conf.set("spark.eventLog.compression.codec","snappy")
	# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
	sc = SparkContext(conf=conf)

	fileRdd = sc.textFile("../datas/user.tsv")
	city_dict = {
		1: "北京",
		2: "上海",
		3: "广州",
		4: "深圳",
		5: "苏州",
		6: "无锡",
		7: "重庆",
		8: "厦门",
		9: "大理",
		10: "成都"
	}
	def getLine(line):
		list01 = line.split(" ")
		cityName = city_dict.get(int(list01[3]))
		# print(cityName)
		return line + " " + cityName
	mapRdd = fileRdd.map(getLine)
	mapRdd.foreach(print)

	# 使用完后,记得关闭
	sc.stop()

结果如下

user007 凉粉 30 4 深圳
user008 泡腾片 25 10 成都
.....

        在 Spark 中,user_rdd 的计算处理在 Executor 中进行,而 city_dict 的数据存储在 Driver 的内存中。这就引发了一个问题:计算过程中每个 Task 是如何获取 city_dict 的数据呢?如果 city_dict 的数据量很大(例如 1G),每个 Task 都要从 Driver 中下载一份(假设存在多个 Task 导致总下载量达到 6G),那么网络传输的开销将非常大,性能会变得很差。

二、广播变量(Broadcast Variables)

(一)功能

        广播变量的主要功能就是将一个变量元素广播到每台 Worker 节点的 Executor 中。这样一来,每个 Task 就可以直接从本地读取数据,从而大大减少网络传输的 I/O。

(二)语法 / 用法

在 Spark 中使用广播变量,首先需要创建一个广播变量对象。例如:

broadcastValue = sc.broadcast(city_dict)

        这里的 sc 是 SparkContext 对象,city_dict 是我们想要广播的数据(在这个例子中是城市字典)。创建广播变量后,在需要使用该数据的地方,可以通过 broadcastValue.value 来获取广播的数据。

此链接是官方给的API文档:RDD Programming Guide - Spark 3.5.3 Documentation

(三)示例代码修改

        在我们的用户数据处理示例中,原始代码在处理每个用户数据行时,需要获取对应的城市名称。修改后的代码如下:

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConf


if __name__ == '__main__':
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取 conf 对象
	# setMaster  按照什么模式运行,local  bigdata01:7077  yarn
	#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
	#  appName 任务的名字
	conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
	# 假如我想设置压缩
	# conf.set("spark.eventLog.compression.codec","snappy")
	# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
	sc = SparkContext(conf=conf)

	fileRdd = sc.textFile("../datas/user.tsv",2)
	city_dict = {
		1: "北京",
		2: "上海",
		3: "广州",
		4: "深圳",
		5: "苏州",
		6: "无锡",
		7: "重庆",
		8: "厦门",
		9: "大理",
		10: "成都"
	}
	# 将一个变量广播出去,广播到executor中,不是task中
	city_dict_broad = sc.broadcast(city_dict)
	def getLine(line):
		list01 = line.split(" ")
		#cityName = city_dict.get(int(list01[3]))
		# 使用广播变量的变量获取数据
		cityName = city_dict_broad.value.get(int(list01[3]))
		# print(cityName)
		return line + " " + cityName
	mapRdd = fileRdd.map(getLine)
	mapRdd.foreach(print)

	# 释放广播变量
	city_dict_broad.unpersist()
	# 使用完后,记得关闭
	sc.stop()

(四)本质与优势

广播变量本质上是一种优化手段。它的优势主要体现在两个方面:

  1. 减少数据传输量:通过广播一个 Driver 中较大的数据,可以减少每次从 Driver 复制的数据量,降低网络 I/O 损耗,从而提高整体性能。
  2. 优化表连接:在两张表进行 Join 操作时,如果一张表较小,可以将小表进行广播,然后与大表的每个部分进行 Join,这样就可以避免 Shuffle Join(Reduce Join),进一步提升性能。

需要注意的是,广播变量是只读变量,不能被修改

三、累加器(Accumulators)

(一)需求示例

        假设我们有搜狗日志的数据,现在需要统计 10 点搜索的数据一共有多少条。如果按照常规的方式编写代码,可能会出现问题。例如:

import os
import re

import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)

    mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
     .filter(lambda line:len(re.split("\s+",line)) == 6) \
     .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

    # 统计一天每小时点击量并按照点击量降序排序
    _sum = 0
    def sumTotalLine(tuple1):
        global _sum # 把_sum 设置为全局变量
        timeStr = tuple1[0] # 10:19:18
        if timeStr[0:2] == '10':
            _sum += 1

    mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
    print(_sum) # 结果是0


    # 使用完后,记得关闭

        在 Spark 中,上述代码最终结果会是 0。因为 sum = 0 是在 Driver 端的内存中的,Executor 中程序对其进行累加操作并不能改变 Driver 端的结果。

(二)原理与功能

        累加器的功能是实现分布式的计算。它在每个 Task 内部构建一个副本进行累加,并且在最后返回每个 Task 的结果并进行合并。

官方API截图

(三)使用方法与示例代码修改

在 Spark 中使用累加器,首先需要创建一个累加器对象:

accumulator = sc.accumulator(0)

然后在需要进行计数累加的地方使用 accumulator.add(1)。例如:

def getLines(line, accumulator):
    accumulator.add(1)

# 对用户数据 RDD 进行处理并统计数据量
fileRdd.foreach(lambda line: getLines(line, accumulator))

最后可以通过 accumulator.value 获取累加的结果。完整代码如下:

import os
import re

import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)
    accCounter = sc.accumulator(0)

    mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
     .filter(lambda line:len(re.split("\s+",line)) == 6) \
     .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

    # 统计一天每小时点击量并按照点击量降序排序
    #_sum = 0
    def sumTotalLine(tuple1):
        #global _sum # 把_sum 设置为全局变量
        timeStr = tuple1[0] # 10:19:18
        if timeStr[0:2] == '10':
            accCounter.add(1)

    mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
    print(accCounter.value) # 104694

    # 假如我不知道累加器这个操作,这个题目怎么做?
    print(mapRdd.filter(lambda tuple1: tuple1[0][0:2] == '10').count())


    # 使用完后,记得关闭
    sc.stop()

四、总结

        Spark 中的广播变量和累加器是处理分布式计算中共享数据问题的有效工具。广播变量主要用于在多个 Task 之间共享只读数据,减少网络传输开销;累加器则用于实现分布式环境下的计数或累加操作,确保在不同 Task 中的计算结果能够正确地合并到 Driver 端。在实际的 Spark 大数据处理项目中,合理地运用广播变量和累加器能够显著提高程序的性能和计算的准确性。

        希望通过本文的介绍,读者能够对 Spark 中的共享变量有更深入的理解,并能够在自己的项目中熟练运用广播变量和累加器来优化数据处理流程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/914511.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于Matlab 火焰识别技术

课题介绍 森林承担着为人类提供氧气以及回收二氧化碳等废弃气体的作用,森林保护显得尤其重要。但是每年由于火灾引起的事故不计其数,造成重大的损失。如果有一款监测软件,从硬件处获得的图像中监测是否有火焰,从而报警&#xff0…

Group By、Having用法总结(常见踩雷点总结—SQL)

Group By、Having用法总结 目录 Group By、Having用法总结一、 GROUP BY 用法二、 HAVING 用法三、 GROUP BY 和 HAVING 的常见踩雷点3.1 GROUP BY 选择的列必须出现在 SELECT 中(🤣最重要的一点)3.2 HAVING 与 WHERE 的区别3.3 GROUP BY 可以…

《JavaEE进阶》----20.<基于Spring图书管理系统①(登录+添加图书)>

PS:关于接口定义 接口定义,通常由服务器提供方来定义。 1.路径:自己定义 2.参数:根据需求考虑,我们这个接口功能完成需要哪些信息。 3.返回结果:考虑我们能为对方提供什么。站在对方角度考虑。 我们使用到的…

并发基础:(淘宝笔试题)三个线程分别打印 A,B,C,要求这三个线程一起运行,打印 n 次,输出形如“ABCABCABC....”的字符串

🚀 博主介绍:大家好,我是无休居士!一枚任职于一线Top3互联网大厂的Java开发工程师! 🚀 🌟 在这里,你将找到通往Java技术大门的钥匙。作为一个爱敲代码技术人,我不仅热衷于探索一些框架源码和算法技巧奥秘,还乐于分享这些宝贵的知识和经验。 💡 无论你是刚刚踏…

华为ensp实验二--mux vlan的应用

一、实验内容 1.实验要求: 在交换机上创建三个vlan,vlan10、vlan20、vlan100,将vlan100设置为mux-vlan,将vlan10设置为group vlan,将vlan20设置为separate vlan;实现vlan10的设备在局域网内可以进行互通&…

Hadoop + Hive + Apache Ranger 源码编译记录

背景介绍 由于 CDH(Clouderas Distribution Hadoop )近几年已经开始收费并限制节点数量和版本升级,最近使用开源的 hadoop 搭了一套测试集群,其中的权限管理组件用到了Apache Ranger,所以记录一下编译打包过程。 组件…

物联网对商业领域的影响

互联网彻底改变了通信方式,并跨越了因地理障碍造成的人与人之间的鸿沟。然而,物联网(IoT)的引入通过使设备能够连接到互联网,改变了设备的功能。想象一下,你的闹钟连接到互联网,并且能够用你的声…

PYNQ 框架 - 中断(INTR)驱动

目录 1. 简介 2. 分析 2.1 Block Design 2.2 AXI Timer 2.2.1 IP 基本信息 2.2.2 IP 地址空间 2.2.3 级联模式 2.2.4 生成/捕获模式 2.3 AXI Interrupt 2.3.1 IP 基本信息 2.3.2 IP 地址空间 2.3.3 相关概念 2.3.4 参数配置 2.3.5 中断确认寄存器 3. PYNQ 代码 …

HTB:Photobomb[WriteUP]

目录 连接至HTB服务器并启动靶机 使用nmap对靶机进行端口开放扫描 再次使用nmap对靶机开放端口进行脚本、服务扫描 使用ffuf进行简单的子域名扫描 使用浏览器直接访问该域名 选取一个照片进行下载,使用Yakit进行抓包 USER_FLAG:a9afd9220ae2b5731…

ts枚举 enum

枚举( enum )可以定义⼀组命名常量,它能增强代码的可读性,也让代码更好维护。调用函数时传参时没有任何提示,编码者很容易写错字符串内容。并且⽤于判断逻辑的是连续且相关的⼀组值,那此时就特别适合使用枚…

Android Studio | 修改镜像地址为阿里云镜像地址,启动App

在项目文件的目录下的 settings.gradle.kts 中修改配置,配置中包含插件和依赖项 pluginManagement {repositories {maven { urluri ("https://www.jitpack.io")}maven { urluri ("https://maven.aliyun.com/repository/releases")}maven { urlu…

场景解决之mybatis当中resultType= map时,因某个字段为null导致返回的map的key不存在怎么处理

1、场景:通过查询数据表将返回结果封装到map当中返回,因某个字段为null,导致map当中key丢失 <select id"queryMyBonus" parameterType"com.cn.entity.student" resultType "map">SELECTb.projectName as "projectName",b.money…

初识算法 · 位运算常见总结(1)

目录 前言&#xff1a; 位运算基本总结 部分题目代码 前言&#xff1a; ​本文的主题是位运算&#xff0c;通过常见的知识点讲解&#xff0c;并且会附上5道简单的题目&#xff0c;5道题目的链接分别为&#xff1a;191. 位1的个数 - 力扣&#xff08;LeetCode&#xff09; 1…

国信证券造访图为科技,共探科技与资本交融新契机

在当今时代背景下&#xff0c;科技与资本的交融&#xff0c;是推动企业发展和产业升级的强大动力&#xff0c;两者相辅相成&#xff0c;共同塑造未来经济的新格局。今日&#xff0c;图为科技深圳总部迎来了国信证券董事总经理于吉鑫一行的造访。 这不是一场简单的会面&#xff…

计算机网络:运输层 —— TCP/IP运输层中的两个重要协议

文章目录 TCP 协议工作方式建立连接&#xff08;三次握手&#xff09;释放连接&#xff08;四次挥手&#xff09; 首部格式 UDP 协议首部格式适用场景 TCP 与 UDP 的对比无连接的UDP和面向连接的TCP对单播、多播和广播的支持情况对应用层报文的处理对数据传输可靠性的支持情况U…

机器学习:决策树——ID3算法、C4.5算法、CART算法

决策树是一种常用于分类和回归问题的机器学习模型。它通过一系列的“决策”来对数据进行分类或预测。在决策树中&#xff0c;每个内部节点表示一个特征的测试&#xff0c;每个分支代表特征测试的结果&#xff0c;而每个叶节点则表示分类结果或回归值。 决策树工作原理 根节点&…

Windows,虚拟机Ubuntu和开发板三者之间的NFS服务器搭建

Windows,虚拟机Ubuntu和开发板三者之间的NFS服务器搭建 &#xff08;1&#xff09;虚拟机 ubuntu 要使用桥接模式&#xff0c;不能使用其他模式 &#xff08;2&#xff09;通过网线将PC和开发板网口直连:这样的连接&#xff0c;开发板是无法连接外网的 &#xff08;3&#xff…

ReactPress:重塑内容管理的未来

ReactPress Github项目地址&#xff1a;https://github.com/fecommunity/reactpress 欢迎提出宝贵的建议&#xff0c;欢迎一起共建&#xff0c;感谢Star。 ReactPress&#xff1a;重塑内容管理的未来 在当今信息爆炸的时代&#xff0c;一个高效、易用的内容管理系统&#xff0…

WebRTC视频 01 - 视频采集整体架构

一、前言&#xff1a; 我们从1对1通信说起&#xff0c;假如有一天&#xff0c;你和你情敌使用X信进行1v1通信&#xff0c;想象一下画面是不是一个大画面中有一个小画面&#xff1f;这在布局中就叫做PIP&#xff08;picture in picture&#xff09;&#xff1b;这个随手一点&am…

SQL Servers审核提高数据库安全性

什么是SQL Server审核&#xff1f; SQL Server审核包括追踪和审查发生在SQL Server上的所有活动&#xff0c;检测潜在的威胁和漏洞&#xff0c;能够监控和记录对服务器设置的每次更改。此外&#xff0c;可以帮助管理员可以轻松地追踪数据库中特定表中的所有服务器活动&#xf…