Spark: a little summary

转眼写spark一年半了,从之前写机器学习组件、做olap到后面做图计算,一直都是用的spark,惭愧的是没太看过里面的源码。这篇文章的目的是总结一下Spark里面比较重要的point,重点部分会稍微看一下源代码,因为spark是跟clickhouse同级别的东西,复杂度也是相当的高,因为时间原因,本文也不会过度地进行展开。
只能说:源码用时方恨少。以前在写spark的时候没看源码实现真的挺遗憾的,现在也没那么多时间让我一点点细看了,之后稳定下来之后希望能有机会吧。

数据抽象

RDD

RDD 是一种抽象,是 Spark 对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体。
数据分片(Partitions)是 RDD 抽象的重要属性之一。
在初步认识了 RDD 之后,接下来咱们换个视角,从 RDD 的重要属性出发,去进一步深入理解 RDD。
要想吃透 RDD,我们必须掌握它的 4 大属性:

  • partitions:数据分片(由partitioner决定)
  • partitioner:分片切割规则
  • dependencies:RDD 依赖
  • compute:转换函数

在 RDD 的编程模型中,一共有两种算子,Transformations 类算子和 Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子,将计算结果收集起来、或是物化到磁盘。
image.png
在 Spark 中,创建 RDD 的典型方式有两种:

  • 通过 SparkContext.parallelize 在内部数据之上创建 RDD;
  • 通过 SparkContext.textFile 等 API 从外部数据创建 RDD。

使用分区变量而非RDD变量

在下面的例子中,RDD每一条都会产生一个MD5对象,这是没必要的。

// 把普通RDD转换为Paired RDD
 
import java.security.MessageDigest
 
val cleanWordRDD: RDD[String] = _ 
 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
  // 获取MD5对象实例
  val md5 = MessageDigest.getInstance("MD5")
  // 使用MD5计算哈希值
  val hash = md5.digest(word.getBytes).mkString
  // 返回哈希值与数字1的Pair
  (hash, 1)
}

使用mapPartitions可以以数据分区为粒度的数据转换:

// 把普通RDD转换为Paired RDD
 
import java.security.MessageDigest
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
  // 注意!这里是以数据分区为粒度,获取MD5对象实例
  val md5 = MessageDigest.getInstance("MD5")
  val newPartition = partition.map( word => {
  // 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象
    (md5.digest(word.getBytes()).mkString,1)
  })
  newPartition
})

DataFrame

进程模型

image.png
对于一个完整的 RDD,每个 Executors 负责处理这个 RDD 的一个数据分片子集。每当任务执行完毕,Executors 都会及时地与 Driver 进行通信、汇报任务状态。Driver 在获取到 Executors 的执行进度之后,结合计算流图的任务拆解,依次有序地将下一阶段的任务再次分发给 Executors 付诸执行,直至整个计算流图执行完毕。

调度模型

image.png
DAGScheduler 的主要职责有三个:

  • 根据用户代码构建 DAG;
  • 以 Shuffle 为边界切割 Stages;
  • 基于 Stages 创建 TaskSets,并将 TaskSets 提交给 TaskScheduler 请求调度。

SchedulerBackend 用一个叫做 ExecutorDataMap 的数据结构,来记录每一个计算节点中 Executors 的资源状态。这里的 ExecutorDataMap 是一种 HashMap,它的 Key 是标记 Executor 的字符串,Value 是一种叫做 ExecutorData 的数据结构。
ExecutorData 用于封装 Executor 的资源状态,如 RPC 地址、主机地址、可用 CPU 核数和满配 CPU 核数等等,它相当于是对 Executor 做的“资源画像”。
WorkerOffer 封装了 Executor ID、主机地址和 CPU 核数,它用来表示一份可用于调度任务的空闲资源。
对于给定的 WorkerOffer,TaskScheduler 是按照任务的本地倾向性,来遴选出 TaskSet 中适合调度的 Tasks。
像上面这种定向到计算节点粒度的本地性倾向,Spark 中的术语叫做 NODE_LOCAL。除了定向到节点,Task 还可以定向到进程(Executor)、机架、任意地址,它们对应的术语分别是 PROCESS_LOCAL、RACK_LOCAL 和 ANY。
对于倾向 PROCESS_LOCAL 的 Task 来说,它要求对应的数据分区在某个进程(Executor)中存有副本;而对于倾向 RACK_LOCAL 的 Task 来说,它仅要求相应的数据分区存在于同一机架即可。ANY 则等同于无定向,也就是 Task 对于分发的目的地没有倾向性,被调度到哪里都可以。
image.png
总结
任务调度分为如下 5 个步骤:

  1. DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet。
  2. SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。
  3. 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源。
  4. 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task。
  5. 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。
  6. 一个job由action算子来触发,每个job又会根据shuffle情况划分出多个stage,每个stage中又会划分出多个task,再根据taskScheduler分配到各个Excecutor。

Shuffle

Shuffle 文件的生成,是以 Map Task 为粒度的,Map 阶段有多少个 Map Task,就会生成多少份 Shuffle 中间文件。
image.png
shuffle文件包括两个文件,**一个是记录(Key,Value)键值对的 data 文件,另一个是记录键值对所属 Reduce Task 的 index 文件。(典中典的列式存储)**换句话说,index 文件标记了 data 文件中的哪些记录,应该由下游 Reduce 阶段中的哪些 Task(简称 Reduce Task)消费。
假设 Reduce 阶段有 N 个 Task,这 N 个 Task 对应着 N 个数据分区,那么在 Map 阶段,每条记录应该分发到哪个 Reduce Task,是由下面的公式来决定的。

P = Hash(Record Key) % N

shuffle生成过程
image.png
在生成中间文件的过程中,**Spark 会借助一种类似于 Map 的数据结构,来计算、缓存并排序数据分区中的数据记录。**这种 Map 结构的 Key 是(Reduce Task Partition ID,Record Key),而 Value 是原数据记录中的数据值,如图中的“内存数据结构”所示。
对于数据分区中的数据记录,Spark 会根据我们前面提到的公式 1 逐条计算记录所属的目标分区 ID,然后把主键(Reduce Task Partition ID,Record Key)和记录的数据值插入到 Map 数据结构中。当 Map 结构被灌满之后,Spark 根据主键对 Map 中的数据记录做排序,然后把所有内容溢出到磁盘中的临时文件,如图中的步骤 1 所示。
随着 Map 结构被清空,Spark 可以继续读取分区内容并继续向 Map 结构中插入数据,直到 Map 结构再次被灌满而再次溢出,如图中的步骤 2 所示。就这样,如此往复,直到数据分区中所有的数据记录都被处理完毕。
到此为之,磁盘上存有若干个溢出的临时文件,而内存的 Map 结构中留有部分数据,Spark 使用归并排序算法对所有临时文件和 Map 结构剩余数据做合并,分别生成 data 文件、和与之对应的 index 文件,如图中步骤 4 所示。Shuffle 阶段生成中间文件的过程,又叫 Shuffle Write。

数据聚合

groupbykey的全量shuffle开销很大(不做map端聚合,只做reduce端聚合),因此普遍使用 reduceByKey、aggregateByKey 和 combineByKey而不是groupbykey。

  • reduceByKey 在落盘与分发之前,会先在 Shuffle 的 Map 阶段做初步的聚合计算。在 Map 阶段,reduceByKey 把 Key 同为 Streaming 的两条数据记录聚合为一条,聚合逻辑就是由函数 f 定义的、取两者之间 Value 较大的数据记录,这个过程我们称之为“Map 端聚合”。相应地,数据经由网络分发之后,在 Reduce 阶段完成的计算,我们称之为“Reduce 端聚合”。reduceByKey 算子的局限性,在于其 Map 阶段与 Reduce 阶段的计算逻辑必须保持一致,这个计算逻辑统一由聚合函数 f 定义。当一种计算场景需要在两个阶段执行不同计算逻辑的时候,reduceByKey 就爱莫能助了。
  • aggregateByKey可以自定义map和reduce端聚合,是更加灵活的聚合算子

内存管理和存储系统

Shuffle 中间文件消耗的是节点磁盘,而广播变量主要占用节点的内存空间,RDD Cache 则是“脚踏两条船”,既可以消耗内存,也可以消耗磁盘。
image.png
Execution Memory 用来执行分布式任务。分布式任务的计算,主要包括数据的转换、过滤、映射、排序、聚合、归并等环节,而这些计算环节的内存消耗,统统来自于 Execution Memory。
Storage Memory 用于缓存分布式数据集,比如 RDD Cache、广播变量等等。RDD Cache 指的是 RDD 物化到内存中的副本。在一个较长的 DAG 中,如果同一个 RDD 被引用多次,那么把这个 RDD 缓存到内存中,往往会大幅提升作业的执行性能。
image.png
存储系统主要是由BlockManager模块负责:
image.png
BlockManager 的核心职责,在于管理数据块的元数据(Meta data),这些元数据记录并维护数据块的地址、位置、尺寸以及状态。
image.png

Cache

image.png
cache 函数实际上会进一步调用 persist(MEMORY_ONLY)来完成计算

数据的准备、重分配和持久化

这部分相关的算子如下:
image.png
首先,在数据准备阶段,union 与 sample 用于对不同来源的数据进行合并与拆分。
我们从左往右接着看,接下来是数据预处理环节。较为均衡的数据分布,对后面数据处理阶段提升 CPU 利用率更有帮助,可以整体提升执行效率。那这种均衡要怎么实现呢?没错,这时就要 coalesce 与 repartition 登场了,它们的作用就是重新调整 RDD 数据分布。
在数据处理完毕、计算完成之后,我们自然要对计算结果进行收集。Spark 提供了两类结果收集算子,一类是像 take、first、collect 这样,把结果直接收集到 Driver 端;另一类则是直接将计算结果持久化到(分布式)文件系统,比如咱们这一讲会提到的 saveAsTextFile。

  • RDD 的 sample 算子用于对 RDD 做随机采样,从而把一个较大的数据集变为一份“小数据”。相较其他算子,sample 的参数比较多,分别是 withReplacement、fraction 和 seed。因此,要在 RDD 之上完成数据采样,你需要使用如下的方式来调用 sample 算子:sample(withReplacement, fraction, seed)。
  • 开发者可以使用 repartition 算子随意调整(提升或降低)RDD 的并行度,而 coalesce 算子则只能用于降低 RDD 并行度。repartition和coalesce相比较,repartition由于引入了shuffle机制,对数据进行打散,混洗,重新平均分配,所以repartition操作较重,但是数据分配均匀。而coalesce只是粗力度移动数据,没有平均分配的过程,会导致数据分布不均匀,在计算时出现数据倾斜。

image.png

广播变量和累加器

  • 广播变量的分发不以task为粒度,而是以Executor为粒度,这样就减少了变量分发的开销
  • 累加器用于存储全局变量

配置项

image.png

Catalyst

Catalyst 优化器,它的职责在于创建并优化执行计划,它包含 3 个功能模块,分别是创建语法树并生成执行计划、逻辑阶段优化和物理阶段优化。
在 Catalyst 优化环节,Spark SQL 首先把用户代码转换为 AST 语法树,又叫执行计划,然后分别通过逻辑优化和物理优化来调整执行计划。逻辑阶段的优化,主要通过先验的启发式经验,如谓词下推、列剪枝,对执行计划做优化调整。而物理阶段的优化,更多是利用统计信息,选择最佳的执行机制、或添加必要的计算节点。

Tungsten

Tungsten 用于衔接 Catalyst 执行计划与底层的 Spark Core 执行引擎,它主要负责优化数据结果与可执行代码。
Tungsten 设计并实现了一种叫做 Unsafe Row 的二进制数据结构。Unsafe Row 本质上是字节数组,它以极其紧凑的格式来存储 DataFrame 的每一条数据记录,大幅削减存储开销,从而提升数据的存储与访问效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/403429.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot+vue的靓车汽车销售网站(前后端分离)

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战,欢迎高校老师\讲师\同行交流合作 ​主要内容:毕业设计(Javaweb项目|小程序|Pyt…

Android中Transition过渡动画的简单使用

前些天发现了一个蛮有意思的人工智能学习网站,8个字形容一下"通俗易懂&#xff0c;风趣幽默"&#xff0c;感觉非常有意思,忍不住分享一下给大家。 &#x1f449;点击跳转到教程 一、布局xml文件代码如下&#xff1a; <?xml version"1.0" encoding&quo…

【ctfshow—web】——信息搜集篇1(web1~20详解)

ctfshow—web题解 web1web2web3web4web5web6web7web8web9web10web11web12web13web14web15web16web17web18web19web20 web1 题目提示 开发注释未及时删除 那就找开发注释咯&#xff0c;可以用F12来查看&#xff0c;也可以CtrlU直接查看源代码呢 就拿到flag了 web2 题目提示 j…

H5移动端文件预览pdf

H5移动端文件预览pdf 需求&#xff1a;H5页面嵌入浙政钉&#xff0c;需要文件预览Pdf。 试用了多个插件&#xff0c;踩了很多坑&#xff0c;如果小伙伴有类似填坑经历&#xff0c;并成功解决&#xff0c;感谢留言指点&#xff01;&#xff01;&#xff01; 先讲最终方案&#x…

[云原生] 二进制安装K8S(中)

书接上文&#xff0c;我们继续部署剩余的插件 一、K8s的CNI网络插件模式 2.1 k8s的三种网络模式 K8S 中 Pod 网络通信&#xff1a; &#xff08;1&#xff09;Pod 内容器与容器之间的通信 在同一个 Pod 内的容器&#xff08;Pod 内的容器是不会跨宿主机的&#xff09;共享…

VG5032VDN 电压控制的晶体振荡器 (VCXO) 输出:LVDS

在今天繁复多变的电子市场中&#xff0c;设计师不断寻求更稳定、更灵活的时钟解决方案&#xff0c;以满足从通信网络到工业控制系统的广泛应用。VG5032VDN VCXO是一款高性能的电压控制晶体振荡器 它结合了高性能、多用途性和紧凑设计&#xff0c;是一款适合广泛应用的晶体振荡…

玩主机游戏能省去不少烦恼?+主机该购买哪台?

文/嘉兰SK 来到次世代&#xff0c;玩家们最关心的问题逐渐变成了购买的游戏能否支持升级。 各个游戏厂商也没有闲着。 此前还有标准版、黄金版、终极版、决定版等一系列。 想出很多招数。 于是很多新玩家开始疑惑&#xff1a;你们都说玩主机游戏可以省去很多麻烦&#xff0c;可…

航空航天5G智能工厂数字孪生可视化平台,推进航空航天数字化转型

航空航天5G智能工厂数字孪生可视化平台&#xff0c;推进航空航天数字化转型。随着科技的不断发展&#xff0c;数字化转型已经成为各行各业关注的焦点。航空航天业作为高端制造业的代表&#xff0c;也在积极探索数字化转型之路。为了更好地推进航空航天数字化转型&#xff0c;一…

设置主从复制时发生报错Could not find first log file name in binary log index file‘;解决方案

如图所示&#xff0c;slave_io_runnind:no,slave_sql_running:yes 此时&#xff0c;主从配置错误&#xff0c;我们可以查看Last_IO_Error:来查看报错信息 此时&#xff0c;我们需要停止从服务器的主从服务&#xff0c; mysql> stop slave; Query OK, 0 rows affected, 1 w…

【汽车电子】万字详解汽车标定与XCP协议

XCP协议基础 文章目录 XCP协议基础一、引言1.1 什么是标定1.2 什么时候进行标定1.3 标定的意义 二、XCP协议简介2.1 xcp简介2.2 XCP如何加快开发过程&#xff1f;2.3 XCP的主要作用 三、XCP工作过程3.1 工作过程3.2 通讯模型3.3 测量与标定 四、XCP报文解析4.1 数据包报文格式4…

petalinux_zynq7 驱动DAC以及ADC模块之六:qt显示adc波形

前文&#xff1a; petalinux_zynq7 C语言驱动DAC以及ADC模块之一&#xff1a;建立IPhttps://blog.csdn.net/qq_27158179/article/details/136234296petalinux_zynq7 C语言驱动DAC以及ADC模块之二&#xff1a;petalinuxhttps://blog.csdn.net/qq_27158179/article/details/1362…

【C语言】走迷宫之推箱子

前言&#xff1a; 在上一篇文章当中我介绍了一个走迷宫的写法&#xff0c;但是那个迷宫没什么可玩性和趣味性&#xff0c;所以我打算在迷宫的基础上加上一个推箱子&#xff0c;使之有更好的操作空间&#xff0c;从而增强了游戏的可玩性和趣味性。 1. 打印菜单 void menu() {…

BUUCTF第二十四、二十五题解题思路

目录 第二十四题CrackRTF 第二十五题[2019红帽杯]easyRE1 第二十四题CrackRTF 查壳 无壳&#xff0c;32位&#xff0c;用32位IDA打开&#xff0c;打开后的main函数很短&#xff0c;可以找到一句“jmz _main_0”——跳转到 _main_0&#xff0c;说明真正的主函数是_main_0&am…

opencv图像的本质

目的 OpenCV是一个跨平台的库&#xff0c;使用它我们可以开发实时的计算机视觉应用程序。 它主要集中在图像处理&#xff0c;视频采集和分析&#xff0c;包括人脸检测和物体检测等功能。 数字图像在计算机中是以矩阵形式存储的&#xff0c;矩阵中的每一个元素都描述一定的图像…

学生个性化成长平台搭建随笔记

1.Vue的自定义指令 在 Vue.js 中&#xff0c;我们可以通过 Vue.directive() 方法来定义自定义指令。具体来说&#xff0c;我们需要传递两个参数&#xff1a; 指令名称&#xff1a;表示我们要定义的指令名称&#xff0c;可以是一个字符串值&#xff0c;例如&#xff1a;has-rol…

时域相位分析技术 和空域相位分析技术

l) 时域相位分析技术 在光 学测量 的许 多情况 下 &#xff0c; 时变图像信 号 的背景光 强 与调制 度可 以看作是 常 数 &#xff0c;并且 其光 强 随时 间 的变化也满足 正 弦条件 。 那 么针 对某 一 空 间采样 点 (x &#xff0c;y) &#xff0c; 某时刻 采 集到 的光 强 可…

CleanMyMacX4.15破解版下载安装包步骤教程

安装CleanMyMac X的步骤如下&#xff1a; 在中文网站上进行安装包的免费下载。找到下载完成的安装包&#xff0c;然后双击打开。用鼠标拖动CleanMyMac X应用程序的图标&#xff0c;将其拖放至右侧的“应用程序”文件夹内。稍等片刻&#xff0c;CleanMyMac X应用程序就会出现在…

使用 package.json 配置代理解决 React 项目中的跨域请求问题

使用 package.json 配置代理解决 React 项目中的跨域请求问题 当我们在开发前端应用时&#xff0c;经常会遇到跨域请求的问题。为了解决这个问题&#xff0c;我们可以通过配置代理来实现在开发环境中向后端服务器发送请求。 在 React 项目中&#xff0c;我们可以使用 package…

【Docker实操】部署php项目

概述 最终达成的容器部署结构和原理如下图&#xff1a; 一、获取nginx、php官方镜像 docker pull nginx //拉取nginx官方镜像 docker pull php:7.4-fpm //拉取php官方镜像需要获取其他可用的php版本&#xff0c;可以上【docker hub】搜索【php】&#xff0c;所有的【xxx-fp…

Windows 7 驱动安装

Windows 7 驱动安装 1. 驱动安装2. 安装驱动和运行环境References 1. 驱动安装 驱动精灵 标准版 驱动精灵 万能网卡版 注意&#xff1a;更改安装路径和安装选项 ​​​ 2. 安装驱动和运行环境 避免自行管理混乱。 References [1] Yongqiang Cheng, https://yongqiang.blo…