【Spark】Spark的两种核心Shuffle工作原理详解

Spark 的shuffle机制

一、Spark ShuffleManager 发展历程

  • Spark 1.1.0 之前
    • 在 Spark 1.1.0 之前,Spark 使用 BlockStoreShuffleFetcher 来处理 Shuffle 操作。这个实现主要依赖于直接从 BlockManager 获取 Shuffle 数据,并通过网络进行交换。
      在这里插入图片描述
  • Spark 1.1.x(默认使用 HashShuffleManager)
    • HashShuffleManager 使用哈希算法将数据划分到不同的分区。在进行 Shuffle 操作时,Spark 会为每个键计算一个哈希值,然后根据该值将数据分配到相应的分区。
      在这里插入图片描述
  • Spark 1.2.x 及以后版本(默认使用 SortShuffleManager)
    • 在这里插入图片描述
  • Spark 2.0.x 及以后版本(不在使用HashShuffleManager,默认使用 SortShuffleManager)
    • 在这里插入图片描述

二、HashShuffleManager 原理

假设:每个 Executor 只有1个CPU core,也就是说,无论这个 Executor 上分配多少个 task
线程,同一时间都只能执行一个 task 线程。

  • 未经优化的HashShuffleManager工作原理
    在这里插入图片描述

1.Shuffle Write 阶段

  • 将每个task处理的数据按照key进行hash算法,从而将相同key都写入同一个磁盘文件,而每一个磁盘文件都只属于下游stage的一个task,在数据写入磁盘之前,会现将数据写入内存缓冲区中,当内存缓冲区填满以后,才会溢写到磁盘文件中去。
  • 下一个stage的task有多少个,当前stage的每个task就要创建多少分磁盘文件,比如当前stage有20个task,总共有4个Executor,每个Executor执行5个task,下一个stage总共有40个task,那么每个Executor上就要创建200个磁盘文件,所有Executor会创建800个磁盘文件,由此可见,未经过优化的shuffle write操作所产生的磁盘文件数据是惊人的。

2. Shuffle Read 阶段

  • 将上一个stage的计算结果中所有相同的key,从各个节点上通过网络都拉取到自己所在的节点上,然后按照key进行聚合或连接等操作。
  • 由于shuffle write阶段,map task给下游stage的每个reduce task都创建了一个磁盘文件,因此shuffle read阶段,每个reduce task只要从上游stage的所有map task所在节点上拉取属于自己的那个磁盘文件即可。
  • shuffle read的拉取过程是一边拉取一边聚合的,每个shuffle read task都有一个自己的buffer缓冲,每次只能拉取与buffer缓冲相同大小的数据,然后在内存中进行聚合等操作,聚合完一批数据,再拉取下一批,以此类推,直接所有数据拉取完,并得到最终结果。
  • 优化后的HashShuffleManager工作原理
    在这里插入图片描述

为了优化 HashShuffleManager,可以启用参数 spark.shuffle.consolidateFiles,该参数的默认值为 false,启用后设置为 true,可以启动优化机制。

开启优化机制后的效果:

1.Shuffle Write 阶段

  • 在 shuffle write 过程中,task就不是为下游stage的每个task创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。
  • 一个Executor上有多少个 CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。
  • 当执行下一批task时,下一批task就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件。
  • consolidate 机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。
  • 比如当前stage有20个task,总共有4个Executor,每个Executor执行5个task,下一个stage总共有40个task,那么每个Executor上就要创建40个磁盘文件,所有Executor会创建160个磁盘文件,由此可见,优化后shuffle write操作所产生的磁盘文件较优化前明显减少。

2. Shuffle Read 阶段

  • 由于shuffle write阶段,每个Executor仅为下游每个reduce task创建一个磁盘文件,在shuffle read阶段,每个reduce task只要从上游stage的所有map task所在节点上拉取属于自己的那个磁盘文件即可。

三、SortShuffleManager 原理

SortShuffleManager 的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。 当 shuffle
read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold参数(默认为
200)的值且不是聚合类的shuffle算子时,就会启用 bypass 机制。

  • 普通运行机制的SortShuffleManager工作原理
    在这里插入图片描述
  • 在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子, 可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会 选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。 排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通 过 Java 的BufferedOutputStream 实现的。BufferedOutputStream 是 Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲填满之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。
  • 一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作, 也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是 merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最 终的磁盘文件之中。此外,由于一个 task就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。
  • SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 20 个 task,总共有 4 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 40 个 task。由于每个 task 最终只有一个磁盘文件,因此 此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有
    20 个磁盘文件。
  • bypass运行机制的SortShuffleManager工作原理
    在这里插入图片描述
  • 每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
  • 该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此 少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
  • 而该机制与普通 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制 不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write
    过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/934763.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Y3编辑器文档4:触发器1(对话、装备、特效、行为树、排行榜、不同步问题)

文章目录 一、触发器简介1.1 触发器界面1.2 ECA语句编辑及快捷键1.3 参数设置1.4 变量设置1.5 实体触发器1.6 函数库与触发器复用 二、触发器的多层结构2.1 子触发器(在游戏内对新的事件进行注册)2.2 触发器变量作用域2.3 复合条件2.4 循环2.5 计时器2.6…

Scala递归中求汉罗塔游戏的步骤

记:f(n,"A","B","C")表示n个盘子从A柱子上移动到C柱子上,借用B柱子的过程 f(要移动的盘子的个数,起点,辅助柱子,终点) 1.基本情况(直接能求的):f(1,"A","B&…

UE5制作简单水材质

首先准备一张水材质法线贴图,也可以去 爱给网 和 花瓣网 找一张 导入后创建一个材质,如图所示 根据 Unreal 文档,吸收系数设置为红色 0.0033、绿色 0.0016、蓝色 0.0011。水看起来会更真实 然后放到一块平面上,就成功了&#xf…

【Web】2024“国城杯”网络安全挑战大赛题解

目录 Ez_Gallery 法一:shell盲注 法二:反弹shell 法三:响应钩子回显 Easy Jelly 法一:无回显XXE 法二:Jexl表达式RCE signal 法一:SSRF 法二:filterchain RCE Ez_Gallery 用这个bp验证…

【Rust 学习笔记】Rust 基础数据类型介绍——数组、向量和切片

博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持! 博主链接 博客内容主要围绕: 5G/6G协议讲解 高级C语言讲解 Rust语言讲解 文章目录 Rust 基础数据类型介绍——数组、向量和切片一、数组、向量和…

【软件工程】一篇入门UML建模图(状态图、活动图、构件图、部署图)

🌈 个人主页:十二月的猫-CSDN博客 🔥 系列专栏: 🏀软件开发必练内功_十二月的猫的博客-CSDN博客 💪🏻 十二月的寒冬阻挡不了春天的脚步,十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前…

BGP路由优选

BGP是一个应用广泛的边界网关路由协议,定义了多种路径属性,拥有丰富的路由策略工具 BGP路由的各种属性的操作会影响路由的优选,从而对网络流量产生影响,BGP路由的优选规则十分重要 BGP路由优选的规则 当到达同一个目的网段存在…

路径规划之启发式算法之十四:蜘蛛蜂优化算法(Spider Wasp Optimizer, SWO)

蜘蛛蜂优化算法(Spider Wasp Optimizer, SWO)是一种受自然界中蜘蛛蜂行为启发的元启发式智能优化算法。由Mohamed Abdel-Basset等人于2023年提出,算法模拟了雌性蜘蛛蜂的狩猎、筑巢和交配行为,具有独特的更新策略,适用于具有不同探索和开发需求的广泛优化问题。 一、算法背…

在 Ansys Q3D 中求解直流和交流电感

提取电缆的电感对于确保电气和电子系统的性能和可靠性至关重要。本篇博客文章将介绍使用 Ansys Q3D 求解直流和交流电感的过程。 概述 在这个例子中,我们将考虑一个由两组电缆组成的简单几何:正极和负极,如下所示: 可以使用“自…

算法日记 47 day 最小生成树(prim,kruskal)

今天主要是针对最小生成树的两种算法。 用题目来举例 题目:寻宝 53. 寻宝(第七期模拟笔试) (kamacoder.com) 题目描述 在世界的某个区域,有一些分散的神秘岛屿,每个岛屿上都有一种珍稀的资源或者宝藏。国王打算在这…

三、nginx实现lnmp+discuz论坛

lnmp l:linux操作系统 n:nginx前端页面 m:mysql数据库,账号密码,数据库等等都保存在这个数据库里面 p:php——nginx擅长处理的是静态页面,页面登录账户,需要请求到数据库&#…

“, ”逗号分隔打印序列不显最后一个(Python)

可以if条件语句过滤,更可以’, .join()拼接序列省却循环打印。 (笔记模板由python脚本于2024年12月10日 19:03:54创建,本篇笔记适合学过Python基本数据类型的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网:https://www.python.org/ Fr…

初阶2 顺序表

本章重点 线性表顺序表 1.线性表 线性表(linear list)是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使 用的数据结构,常见的线性表:顺序表、链表、栈、队列、字符串… 线性表在逻辑上是线性结构&#xff0…

破局沉寂的区块链市场:未来之路与战略思考

近年来,区块链行业经历了高速增长、泡沫破裂和市场低谷。如今,尽管技术发展仍在持续,市场热度却明显降温。无论是公链项目、去中心化金融(DeFi),还是NFT和GameFi,许多领域都陷入了创新瓶颈和用户…

leetcode-289.生命游戏-day3

时间复杂度O(Mn) public void gameOfLife(int[][] board){if(board.length 0 || board[0].length0) return;int m board.length, n board[0].length;int[] neighbor {0, 1, -1};for(int i 0; i < m; i)for(int j 0; j < n; j)if(board[i][j] % 10 1)for(int k 0…

SYN6288语音合成模块使用说明(MicroPython、STM32、Arduino)

模块介绍 SYN6288中文语音合成模块是北京宇音天下科技有限公司推出的语音合成模块。该模块通过串口接收主控传来的语音编码后&#xff0c;可自动进行自然流畅的中文语音播报。 注&#xff1a;SYN6288模块无法播报英文单词和句子&#xff0c;只能按字母播报英文 &#xff1b;而…

JS API事件流

事件流两个阶段说明 目标&#xff1a;能够说出事件流经过的2个阶段 事件流指的是事件完整执行过程的流动路径 说明&#xff1a;假设页面里有个div&#xff0c;当触发事件时&#xff0c;会经历两个阶段&#xff0c;分别是捕获阶段、冒泡阶段 简单来说&#xff1a;捕获阶段是 …

15.Java 网络编程(网络相关概念、InetAddress、NetworkInterface、TCP 网络通信、UDP 网络通信、超时中断)

一、网络相关概念 1、网络通信 网络通信指两台设备之间通过网络实现数据传输&#xff0c;将数据通过网络从一台设备传输到另一台设备 java.net 包下提供了一系列的类和接口用于完成网络通信 2、网络 两台以上设备通过一定物理设备连接构成网络&#xff0c;根据网络的覆盖范…

Moretl轻量化日志采集工具

永久免费: 至Gitee下载 使用教程: Moretl使用说明 用途 定时全量或增量采集工控机,电脑文件或日志. 优势 开箱即用: 解压直接运行.不需额外下载.管理设备: 后台统一管理客户端.无人值守: 客户端自启动,自更新.稳定安全: 架构简单,兼容性好,通过授权控制访问. 架构 技术架…

Spring Security6.3 自定义AuthorizationManager问题

项目环境&#xff1a; Springboot3.3.5, 对应的SpringFrameWork6.1&#xff0c;Security为6.3 问题&#xff1a;我想自定义AuthorizationManager接口实现类&#xff0c;在里面判断如果角色为amdin则放行请求&#xff1b; 在AdminAuthorizationManager类的check()方法中pass变量…