【Spark系列1】DAG中Stage和Task的划分全流程

一、整体流程

每个Aciton操作会创建一个JOB,JOB会提交给DAGScheduler,DAGScheduler根据RDD依赖的关系划分为多个Stage,每个Stage又会创建多个TaskSet,每个TaskSet包含多个Task,这个Task就是每个分区的并行计算的任务。DAGScheduler将TaskSet按照顺序提交给TaskScheduler,TaskScheduler将每一个任务去找SchedulerBackend申请执行所需要的资源,获取到资源后,SchedulerBackend将这些Task提交给Executor,Executor负责将这些任务运行起来。

二、JOB提交

2.1、为什么需要action操作

在Spark中,分为transformation操作和action操作。执行用户程序时,transformation操作将一个RDD转换成了新的RDD,并在compute()函数中,记录了如何根据父RDD计算出当前RDD的数据、RDD如何分区等信息,并且能够得出最后一个RDD的数据。 但是RDD中的每个分区中依然是一条一条的分散的数据,那么要对最后一个RDD执行什么操作呢?这就是action操作的作用。

2.2、Job提交

每个action操作都会生成一个Job,这个Job包含了需要计算的RDD对象、需要计算的分区、需要执行什么样的计算。RDD和用户执行的计算都是可以序列化的,RDD序列化之后,在Executor中反序列化之后即可得到该RDD对象,再根据对象compute()函数就可以计算出某个分区的数据。JOB中包含的数据如下所示

2.3、分布式执行

当提交Job以后,就可以将Job划分为多个并行的任务,每个任务计算指定分区的一个分区即可。通过RDD的计算函数即可计算出该分区的数据,今儿计算出分区的结果。

三、Stage划分

3.1、宽依赖和窄依赖

如果一个RDD的每个分区最多只能被一个Child RDD的一个分区所使用, 则称之为窄依赖(Narrow dependency), 如果被多个Child RDD分区依赖, 则称之为宽依赖(wide dependency)

3.2、Stage划分

在用户编写的一系列转换中,多个RDD可能既形成了多次窄依赖,也形成了多次宽依赖,连续的窄依赖可以通过一个任务进行流水线处理,但是如果遇到了宽依赖,就必须先将父RDD的所有数据都进行计算并保存起来,再进行RDD的运算。在一个Job中,action操作知识定义了在最后的RDD中执行何种操作,而最后的RDD会依赖上个RDD,上个RDD又会有其他依赖,这样就形成了一系列的依赖关系。如果为宽依赖的话,就在依赖的地方进行切分,先将宽依赖的父RDD进行计算出来,再计算后续的RDD,按照快依赖被划分的过程,即为Stage划分的过程。

如上图所示,rdd1->rdd2,rdd3->rdd4是窄依赖,rdd2->rdd3,rdd4->rdd5是宽依赖。在发生shuffle的位置,Spark将计算分为两个阶段分别执行,每发生一次shuffle,Spark就将计算划分为先后的两个阶段,如下图

在划分阶段的过程中,对于某个阶段而言其并行的计算任务都完全相同,因此在Job执行的过程中,并行计算就是指每个阶段中任务并行的计算。如在Stage1中,每个分区的数据可以使用一个任务进行计算。10000个分区即可在集群中并行运行10000个任务进行计算。如果集群资源不够,可以将10000个任务依次在集群中运行,直到运行完毕,再进行Stage2的计算。Stage2也会根据分区数启动多个任务并行的加载Stage1生成的数据,完成Stage2的计算。

在一个Job的运行过程中,所有的Stage其实都是为最后一个Stage做准备,因为action操作只需要最后一个RDD的数据。因此最后一个Stage称为ResultStage,之前所有的Stage都是由Shuffle引起的中间计算过程,被称为ShuffleMapStage。其过程如下图

3.3、Spark实现

再Spark实现中,SparkContext将Job提交至DAGScheduler,DAGScheduler获取Job中执行action操作的RDD,将最后执行action操作的RDD划分到最后的ResultStage中,然后遍历该RDD的依赖和所有的父依赖,每遇到宽依赖就将两个RDD划分到两个不同的Stage中,遇到窄依赖就将窄依赖的多个RDD划分到一个Stage中,经过这次操作,一个RDD就划分为有多个依赖关系的Stage。再每个Stage中,所有的RDD之间都是窄依赖的关系,Stage之间的RDD都是宽依赖的关系。DAGScheduler将最初被依赖的Stage提交,计算该Stage中的数据,计算完成后,再将后续的Stage提交,知道最后运行的ResultStage,则整个计算Job完成。ResultStage和ShuffleMapStage结构如下图

在生成ShuffleapStage时,ShuffleDependency起到了承上启下的作用,如果两个RDD之间为宽依赖,子RDD的依赖为ShuffleDependency;在划分Stage的时候,父Stage会保存该ShuffleDependency,以便在执行父Stage的时候,根据ShuffleDependency获取Shuffle的写入器,在子Stage执行的时候,会根据RDD的依赖关系使用相同的ShuffleDependency获取Shuffle的读取器。

在计算过程中,ShuffleMapStage会生成该Stage的结果,为下一个Stage提供数据,计算下一个Stage的RDD的时候,会拉取上一个Stage的计算结果。上一个Stage的计算保存在哪呢?答案是Spark的组件MapOutputTracker。MapOutputTracker也是主从结构,Executor端是MapOutputTrackerWroker,当ShuffleMapStage的任务运行完成后,会通过Executor上的MapOutputTrackerWroker将数据保存的位置发送到Driver上的MapOutputTrackerMaster中。在后续Stage需要上一个Stage的计算结果的时候,就通过MapOutputTrackerMaster询问计算结果的保存位置,进而加载相应的数据。

四、Task划分

DAGScheduler将Job划分为多个Stage之后,下一步就是将Stage划分为多个可以在集群中并行执行的任务,只有将任务并行执行,Stage才能更快的完成。

4.1、任务的个数

由于Stage中都是对RDD的计算,RDD又是分区的,所以在对任务进行划分的时候,每个分区可以启动一个任务进行计算。无论是ResultStage还是ShuffleMapStage,每个阶段能够并行执行的任务数量都取决于该阶段中最后一个Rdd的分区数量

上面已经介绍,在一个Stage中,RDD的依赖关系是窄依赖,所以最后一个RDD的分区数量取决于其依赖的RDD的分区数量,一直依赖到该阶段的开始的RDD的分区。对于第一阶段开始的RDD分为两种情况:

  1. 第一种为初始的RDD,即从数据源加载数据形成的初始RDD,这种情况的分区数量取决于初始RDD的形成分区方式。
  2. 第二种为该阶段的初始RDD为Shuffle阶段的Reduce任务,这种情况下,该RDD的分区数量取决于在Shuffle的Map阶段最后一个RDD的分区器设置的分区数量。

4.2、Task的生成

当确定了每个Stage的分区数量之后,就需要为每个分区生成相应的计算任务,该计算任务就是需要对该阶段的最后一个RDD执行什么操作

在ResultStage中,需要对最后一个RDD的每个分区分别执行用户自定义的action操作,所以在ResultStage中生成的每个Task都包含以下三个部分

  1. 需要对哪个RDD进行操作
  2. 需要对RDD哪个分区进行操作
  3. 需要对分区的内容执行什么样的操作

在ResultStage中划分的Task称为ResultTask,ResultTask中包含了ResultStage中最后一个RDD,即执行action操作的的RDD,需要计算的RDD分区的id和执行action操作的函数。

在ShuffleMapStage中,最终需要完成Shuffle过程中的Map阶段的操作,每个分区按照Shuffle中的Map端定义的过程执行数据的分组操作,将分组结果进行保存,并将保存结果位置通知Driver端的MapOutputTrackerMaster,MapOutputTrackerMaster保存着每一个Shuffle中Map输出的位置。在ShuffleMapStage中划分的Task称为ShuffleMapTask。ShuffleMapTask同样由三个重要的部分组成:Stage中最后的RDD、需要计算的分区的id、划分Stage的ShuffleDependency

4.3、Task的最佳运行位置

生成Task时,还会计算Task的最佳运行位置。虽然RDD包含计算RDD的所有信息,可以在任何节点上运行,但是如果通过为Task计算分配最佳的运行位置,可以将Task调度到含有该Task需要的数据的节点,从而实现移动计算而不是移动数据的目的。Spark会根据RDD可能分布的的情况,将Task的运行位置主要分为Host级别和Executor级别当一个RDD被某个Executor缓存,则对该RDD计算时,优先会把计算的Task调度到该Executor中执行。当一个RDD需要的数据存在某个host中时,则会把该Task调度到这个节点的Executor中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/356029.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

头戴式耳机哪个牌子音质好?2024音质超好的百元头戴式耳机品牌推荐

在当今数字化的时代,音乐已成为我们生活中不可或缺的一部分,而头戴式耳机因其优质的音效和舒适的佩戴感,成为了许多音乐爱好者的首选,在众多品牌中,究竟哪个牌子的头戴式耳机音质最好呢?今天我就来给大家推…

echarts坐标轴文字样式

https://echarts.apache.org/zh/option.html#xAxis.nameTextStyle xAxis. nameTextStyle、 yAxis: {type: value,// max: -0.15,name: 沉降累计值/mm,nameTextStyle: {padding: [0, 0, 0, 10],color: #93B8E2,fontSize: 12,fontFamily: Alibaba-PuHuiTi-R},splitLine: {show:…

procmethues 二进制安装

pormethues是一个开源的系统监控以及报警系统。整合zabbix的功能,系统,网络,设备。 procmeteus可以兼容网络,设备。容器监控。告警系统。因为他和k8s是一个项目开发的产品,天生匹配k8s的原生系统。容器化和云原生服务…

【Java基础】JVM关闭回调函数(ShutdownHook)的应用场景

文章目录 一.ShutdownHook介绍二.ShutdownHook被调用场景三.ShutdownHook如何使用四.ShutdownHook实践 一.ShutdownHook介绍 ShutdownHook就是一个简单的 已初始化 但是 未启动 的 线程 。当虚拟机开始关闭时,它将会调用所有已注册ShutdownHook的回调函数&#xff0…

Gnuplot安装与配置

安装默认选项,下一步配置环境变量 找到系统环境变量,找到PATH 新建 浏览 将bin目录加进去 如图 再按winR,输入cmd打开终端,输入gnuplot,如果提示以下信息就可以绘图 如果要在Visual Studio中结合代码使用,需…

【局部自动数据增强】YOCO:将图片一分为二,各自增强后拼合为一

【自动数据增强】YOCO:将图片一分为二,各自增强后拼合为一 核心思想好在哪里?切哪里、切几次?何时用? 总结 核心思想 论文:https://arxiv.org/pdf/2201.12078.pdf 代码:https://github.com/Ju…

MySql的使用方法

一.什么是MySql MySql是一种数据库管理系统,是用来存储数据的,可以有效的管理数据,数据库的存储介质为硬盘和内存。 和文件相比,它具有以下优点: 文件存储数据是不安全的,且不方便数据的查找和管理&#xf…

Python网络拓扑库之mininet使用详解

概要 网络工程师、研究人员和开发人员需要进行各种网络实验和测试,以评估网络应用和协议的性能,以及解决网络问题。Python Mininet是一个功能强大的工具,它允许用户创建、配置和仿真复杂的网络拓扑,以满足各种实际应用场景。本文…

SQL注入:二次注入

SQL注入系列文章: 初识SQL注入-CSDN博客 SQL注入:联合查询的三个绕过技巧-CSDN博客 SQL注入:报错注入-CSDN博客 SQL注入:盲注-CSDN博客 目录 什么是二次注入? 二次注入演示 1、可以注册新用户 2、可以登录->…

C++ 类与对象(上)

目录 本节目标 1.面向过程和面向对象初步认识 2.类的引入 3.类的定义 4.类的访问限定符及封装 4.1 访问限定符 4.2 封装 5. 类的作用域 6. 类的实例化 7.类对象模型 7.1 如何计算类对象的大小 7.2 类对象的存储方式猜测 7.3 结构体内存对齐规则 8.this指针 8.1 thi…

前言:穿越迷雾,探索C语言指针的奇幻之旅

各位少年,大家好,我是博主那一脸阳光,今天给大家分享指针,内存和地址的使用,以及使用。 前言: 在编程的世界中,若论灵活多变、深邃神秘的角色,非“指针”莫属。如同哈利波特手中的魔…

C++ 数论相关题目 求组合数I

直接按照公式求解会超时。 常用组合数递推式。 因此用递推式先预处理出来&#xff0c;然后查表。 #include <iostream> #include <algorithm>using namespace std;const int N 2010, mod 1e9 7;int c[N][N];void init() {for(int i 0; i < N; i )for(in…

C/C++ - 函数进阶(C++)

目录 默认参数 函数重载 内联函数 函数模板 递归函数 回调函数 默认参数 定义 默认参数是在函数声明或定义中指定的具有默认值的函数参数。默认参数允许在调用函数时可以省略对应的参数&#xff0c;使用默认值进行替代。 使用 默认参数可以用于全局函数和成员函数。默认参…

RDMA技术赋能:构建高速网络基础设施,加速大型模型高效训练

深入剖析RDMA在高速网络环境中的应用价值与实现方式 远程直接内存访问&#xff08;RDMA&#xff09;作为超高速网络内存访问技术的领军者&#xff0c;彻底颠覆了传统程序对远程计算节点内存资源的访问模式。其卓越性能的核心在于巧妙地绕过了操作系统内核层&#xff08;如套接…

npm v10.4.0 is known not to run on Node.js v14.21.3

问题起因 vue项目在打包的时候突然报如下错误&#xff0c;项目原来打包的时候是没问题的。 request to https://registry.npm.taobao.org/acorn failed, reason: certificate然后找到了一篇帖子&#xff0c;淘宝npm镜像地址https证书到期了&#xff0c;发现确实是这个问题。在…

Redis客户端之Redisson(三)Redisson分布式锁

一、背景&#xff1a; 高效的分布式锁设计应该包含以下几个要点&#xff1a; 1、互斥&#xff1a; 在分布式高并发的条件下&#xff0c;我们最需要保证&#xff0c;同一时刻只能有一个线程获得锁&#xff0c;这是最基本的一点 2、防止死锁&#xff1a; 在分布式高并发的条…

骑砍战团MOD开发(41)-LOD渲染技术

一.LOD技术 LOD技术&#xff0c;即Level Of Details&#xff0c;是一种在3D图形渲染中常用的技术&#xff0c;主要用于优化渲染性能。 通过在建模时添加LOD模型(低模模型,面数较少),游戏引擎通过计算模型的远近和光照等情况选择性加载原模型(高模)/LOD模型(低模),实现游戏…

D3485——+5V工作电压,内置失效保护电路等功能高达10Mbps的传输速率,可应用在智能电表,安防监控等产品上

D3485 是一款 5V 供电、半双工的 RS-485 收发器&#xff0c;芯片内部包含一路驱动器和 一路接收器。D3485 使用限摆率驱动器&#xff0c;能显著减小 EMI 和由于不恰当的终端匹 配电缆所引起的反射&#xff0c;并实现高达10Mbps 的无差错数据传输。 D3485 内置失效保 护电路&…

小红树上染色

记忆化深搜 #include <iostream> #include <string> #include <stack> #include <vector> #include <queue> #include <deque> #include <set> #include <map> #include <unordered_map> #include <unordered_set&g…

详解OpenHarmony各部分文件在XR806上的编译顺序

大家好&#xff0c;今天我们来谈一谈编程时一个很有趣的话题——编译顺序。我知道&#xff0c;一提到编译可能大家会感到有点儿头疼&#xff0c;但请放心&#xff0c;我不会让大家头疼的。我们要明白&#xff0c;在开始写代码之前&#xff0c;了解整个程序的编译路径是十分有必…