【大数据面试知识点】Spark的DAGScheduler

Spark数据本地化是在哪个阶段计算首选位置的?

先看一下DAGScheduler的注释,可以看到DAGScheduler除了Stage和Task的划分外,还做了缓存的跟踪和首选运行位置的计算。

DAGScheduler注释: 

The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.
Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage.
When looking through this code, there are several key concepts:

  • Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.
  • Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
  • Tasks are individual units of work, each sent to one machine.
  • Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
  • Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
  • Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.

To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.
Here's a checklist to use when making or reviewing changes to this class:

  • All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.
  • When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.

DAGScheduler的运行时机

DAGScheduler运行时机:Driver端初始化SparkContext时。DAGScheduler是在整个Spark Application的入口即 SparkContext中声明并实例化的。在实例化DAGScheduler之前,巳经实例化了SchedulerBackend和底层调度器 TaskScheduler。

如果是SQL任务的话,SparkSQL通过Catalyst(Spark SQL的核心是Catalyst优化器)将SQL先翻译成逻辑计划再翻译成物理计划,再转换成RDD的操作。之后运行时再通过DAGScheduler做RDD任务的划分和调度。

DAGScheduler如何划分Stage的?

用户提交的计算任务是一个由RDD依赖构成的DAG,Spark会把RDD的依赖以shuffle依赖为边界划分成多个Stage,这些Stage之间也相互依赖,形成了Stage的DAG。然后,DAGScheduler会按依赖关系顺序执行这些Stage。

要是把RDD依赖构成的DAG看成是逻辑执行计划(logic plan),那么,可以把Stage看成物理执行计划,为了更好的理解这个概念,我们来看一个例子。

下面的代码用来对README.md文件中包含整数值的单词进行计数,并打印RDD之间的依赖关系(Lineage):

scala> val counts = sc.textFile("README.md")
               .flatMap(x=>x.split("\\W+"))
               .filter(_.matches(".*\\d.*"))
               .map(x=>(x,1))
               .reduceByKey(_+_)
 // 调用一个action函数,用来触发任务的提交和执行
 scala> counts.collect()
 ​
 // 打印RDD的依赖关系(Lineage)
 scala> counts.toDebugString
 res7: String =
 (2) ShuffledRDD[17] at reduceByKey at <console>:24 []
  +-(2) MapPartitionsRDD[16] at map at <console>:24 []
     |  MapPartitionsRDD[15] at filter at <console>:24 []
     |  MapPartitionsRDD[14] at flatMap at <console>:24 []
     |  README.md MapPartitionsRDD[13] at textFile at <console>:24 []
     |  README.md HadoopRDD[12] at textFile at <console>:24 []

DAGScheduler会根据Shuffle划分前后两个Stage:即StageShuffleMapStage和ResultStage

ShuffleMapStage

先看下ShuffleMapStage的注释,核心就是再讲ShuffleMapStage是做ShuffleWrite的Stage,Stage中是算子的pipline。

ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter). When executed, they save map output files that can later be fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of, and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
 
ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage. 

ShuffleMapStages是在DAG执行过程中产生的Stage,用来为Shuffle产生数据。ShuffleMapStages发生在每个Shuffle操作之前,在Shuffle之前可能有多个窄转换操作,比如:map,filter,这些操作可以形成流水线(pipeline)。当执行ShuffleMapStages时,会产生Map的输出文件,这些文件会被随后的Reduce任务使用。

ShuffleMapStages也可以作为Jobs,通过DAGScheduler.submitMapStage函数单独进行提交。对于这样的Stages,会在变量mapStageJobs中跟踪提交它们的ActiveJobs。 要注意的是,可能有多个ActiveJob尝试计算相同的ShuffleMapStages。

它为一个shuffle过程产生map操作的输出文件。它也可能是自适应查询规划/自适应调度工作的最后阶段。

ResultStage

再看ResultStage的注释

ResultStages apply a function on some partitions of an RDD to compute the result of an action.
The ResultStage object captures the function to execute, `func`, which will be applied to each partition, and the set of partition IDs, `partitions`. Some stages may not run on all partitions of the RDD, for actions like first() and lookup().

ResultStage是Job的最后一个Stage,该Stage是基于执行action函数的rdd来创建的。该Stage用来计算一个action操作的结果。该类的声明如下:

 private[spark] class ResultStage(
     id: Int,
     rdd: RDD[_],
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
     parents: List[Stage],   //依赖的父Stage
     firstJobId: Int,
     callSite: CallSite)
   extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

为了计算action操作的结果,ResultStage会在目标RDD的一个或多个分区上使用函数:func。需要计算的分区id集合保存在成员变量:partitions中。但对于有些action操作,比如:first(),take()等,函数:func可能不会在所有分区上使用。

另外,在提交Job时,会先创建ResultStage。但在提交Stage时,会先递归找到该Stage依赖的父级Stage,并先提交父级Stage。如下图所示:

举个例子:

思考题 

如下rdd运算,为什么最终只划分了3个Stage

scala> val rdd1 = sc.textFile("/root/tmp/a.txt",3).flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)
val rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:1

scala> val rdd2 = sc.textFile("/root/tmp/a.txt",3).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
val rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:1

scala> val rdd3 = rdd1.join(rdd2)
val rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:1

scala> val rdd4 = rdd3.groupByKey()
val rdd4: org.apache.spark.rdd.RDD[(String, Iterable[(Int, Int)])] = MapPartitionsRDD[13] at groupByKey at <console>:1

scala> rdd4.collect().foreach(println)
(c,Seq((2,2)))                                                                  
(d,Seq((1,1)))
(a,Seq((2,2)))
(b,Seq((1,1)))

scala> rdd4.toDebugString
val res8: String =
(3) MapPartitionsRDD[13] at groupByKey at <console>:1 []
 |  MapPartitionsRDD[12] at join at <console>:1 []
 |  MapPartitionsRDD[11] at join at <console>:1 []
 |  CoGroupedRDD[10] at join at <console>:1 []
 |  ShuffledRDD[4] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[3] at map at <console>:1 []
    |  MapPartitionsRDD[2] at flatMap at <console>:1 []
    |  /root/tmp/a.txt MapPartitionsRDD[1] at textFile at <console>:1 []
    |  /root/tmp/a.txt HadoopRDD[0] at textFile at <console>:1 []
 |  ShuffledRDD[9] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[8] at map at <console>:1 []
    |  MapPartitionsRDD[7] at flatMap at <console>:1 []
    |  /root/tmp/a.txt MapPartitionsRDD[6] at textFile at <console>:1 []
    |  /root/t...

参考:DAGScheduler-Stage的划分与提交 - 知乎Spark SQL 源码分析之Physical Plan 到 RDD的具体实现_physicalplan到rdd的具体实现-CSDN博客

一文搞定Spark的DAG调度器(DAGScheduler)_spark dagscheduler-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/283985.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

畅捷通的 Serverless 探索实践之路

作者&#xff1a;计缘&#xff0c;阿里云云原生架构师 畅捷通介绍 畅捷通是中国领先的小微企业财税及业务云服务提供商&#xff0c;成立于 2010 年。畅捷通在 2021 年中国小微企业云财税市场份额排名第一&#xff0c;在产品前瞻性及行业全覆盖方面领跑市场&#xff0c;位居中…

C:Huffman编码a

【问题描述】 给定一组字符的Huffman编码表&#xff08;从标准输入读取&#xff09;&#xff0c;以及一个用该编码表进行编码的Huffman编码文件&#xff08;存在当前目录下的in.txt中&#xff09;&#xff0c;编写程序实现对Huffman编码文件的解码&#xff0c;并按照后序遍历序…

【UE 截图】 自定义截图路径 文件名

目录 0 引言1 实践 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;UE虚幻引擎专栏&#x1f4a5; 标题&#xff1a;【UE 截图】 自定义截图路径 文件名❣️ 寄语&#xff1a;书到用时方恨少&#xff0c;事非经过不知难&#xff01;&#x1f388; 最…

zlib.decompressFile报错 【Bug已解决-鸿蒙开发】

文章目录 项目场景:问题描述原因分析:解决方案:方案1方案2此Bug解决方案总结寄语项目场景: 最近也是遇到了这个问题,看到网上也有人在询问这个问题,本文总结了自己和其他人的解决经验,解决了zlib.decompressFile报错 的问题。 问题: zlib.decompressFile报错,怎么解…

基于ssm的房屋租赁管理系统

功能介绍 房源信息模块&#xff1a; 房源信息展示、房源信息更新、房源信息增加、房源信息删除 账户管理模块&#xff1a; 账户登录、账户绑定、账户管理 租金结算模块&#xff1a; 每月租金信息、租金交付功能、月租金收入总额统计 房屋租赁合同管理模块&#xff1a; 房屋租赁…

【C语言】函数

函数是什么&#xff1f; “函数”是我们早些年在学习数学的过程中常见的概念&#xff0c;简单回顾一下&#xff1a;比如下图中&#xff0c;你给函数 f(x)2*x3 一个具体的x,这个函数通过一系列的计算来返回给你一个结果(图示如下)。 这就是数学中函数的基本过程和作用。但是你…

1.4 FMEA概述

FMEA适用场景 FMEA在三种基本情形下使用&#xff0c;每种情形都有不同的范围或重点。 情形1&#xff1a;新设计、新技术或新过程 FMEA的范围包括完整的设计、技术或过程。 情形2&#xff1a;现有设计或过程的新应用 FMEA的范围包含新环境、新场地、新应用或使用概况&#…

Servlet见解3

13 Cookie和Session http协议是一个无状态的协议&#xff0c;你每一个跳转到下一个页面的时候都是需要先登录才能使用&#xff0c;这样就很麻烦比如淘宝&#xff0c;没有cookie和session的话&#xff0c;用户在首页已经登录上去了&#xff0c;但是需要再次登录才能选择商品&am…

买对好车省钱又防坑,高性价比的买车攻略

一、教程描述 正所谓隔行如隔山&#xff0c;买车这件事情并不简单&#xff0c;买车的内幕还是有不少的&#xff0c;本套教程讲述买车攻略&#xff0c;非常适合准备买车的朋友&#xff0c;可以帮助大家买车少入坑&#xff0c;高性价比买到自己心仪的车。本套买车教程&#xff0…

Android 13 动态启用或禁用IPV6

介绍 客户想要通过APK来控制IPV6的启用和禁用&#xff0c;这里我们通过广播的方式来让客户控制IPV6。 效果展示 adb shell ifconfig 这里我们用debug软件&#xff0c;将下面节点置为1 如图ipv6已被禁用了 echo 1 > /proc/sys/net/ipv6/conf/all/disable_ipv6 修改 接下来…

十大排序总结之——冒泡排序、插入排序

同样&#xff0c;这两几乎也是被淘汰了的算法&#xff0c;尽管它们是稳定的&#xff0c;但是时间复杂度没人喜欢&#xff0c;了解一下就好&#xff0c;没啥好说的&#xff0c;注意最后一句话就行了 一&#xff0c;冒泡排序 1. 算法步骤 共n-1趟&#xff0c;谁两敢冒泡就换了…

IoT 物联网常用协议

物联网协议是指在物联网环境中用于设备间通信和数据传输的协议。根据不同的作用&#xff0c;物联网协议可分为传输协议、通信协议和行业协议。 传输协议&#xff1a;一般负责子网内设备间的组网及通信。例如 Wi-Fi、Ethernet、NFC、 Zigbee、Bluetooth、GPRS、3G/4G/5G等。这些…

算法基础之能被整除的数

能被整除的数 核心思想&#xff1a; 容斥原理 总面积 1-23-4…. 总集合元素中个数 1-23-4…. #include<iostream>#include<cstring>#include<algorithm>using namespace std;const int N 20;typedef long long LL;int p[N];int main(){int n,m;cin&…

2024年【茶艺师(初级)】报名考试及茶艺师(初级)考试技巧

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年【茶艺师&#xff08;初级&#xff09;】报名考试及茶艺师&#xff08;初级&#xff09;考试技巧&#xff0c;包含茶艺师&#xff08;初级&#xff09;报名考试答案和解析及茶艺师&#xff08;初级&#xff09;…

03.QT命名规范及快捷键(部分)

一、命名规范 1.类名 大驼峰规则&#xff1a;首字母大写&#xff0c;单词和单词之间首字母大写。 2.变量名 小驼峰规则&#xff1a;首字母小写&#xff0c;单词和单词之间首字母大写。 二、快捷键 1.代码操作相关 注释&#xff1a;ctrl / 运行&#xff1a;ctrl r 编译…

基于Java SSM框架实现健康管理系统项目【项目源码】

基于java的SSM框架实现健康管理系统演示 JSP技术 JSP是一种跨平台的网页技术&#xff0c;最终实现网页的动态效果&#xff0c;与ASP技术类似&#xff0c;都是在HTML中混合一些程序的相关代码&#xff0c;运用语言引擎来执行代码&#xff0c;JSP能够实现与管理员的交互&#xf…

PyQt5-控件之QDialog(UI-业务分离搭建自定义xDialog)

1.继承QtWidgets.QWidget自定义对话框 继承于QtWidgets.QWidget自定义一个对话框类&#xff1a;SelectingDlg class SelectingDlg(QtWidgets.QWidget): def __init__(self): super(SelectingDlg, self).__init__() self.initUI() def initUI(self):s…

作业--day39

定义一个Person类&#xff0c;私有成员int age&#xff0c;string &name&#xff0c;定义一个Stu类&#xff0c;包含私有成员double *score&#xff0c;写出两个类的构造函数、析构函数、拷贝构造和拷贝赋值函数&#xff0c;完成对Person的运算符重载(算术运算符、条件运算…

十八、任务通知

1、前言 (1)所谓“任务通知”&#xff0c;可以反过来读"通知任务"。我们使用队列、信号量、事件组等等方法时&#xff0c;并不知道对方是谁。使用任务通知时&#xff0c;可以明确指定&#xff1a;通知哪个任务。 (2)使用队列、信号量、事件组时&#xff0c;我们都需…

DrGraph原理示教 - OpenCV 4 功能 - 阈值

普通阈值 OpenCV中的阈值用于相对于提供的阈值分配像素值。在阈值处理中&#xff0c;将每个像素值与阈值进行比较&#xff0c;如果像素值小于阈值则设置为0&#xff0c;否则设置为最大值&#xff08;一般为255&#xff09;。 在OpenCV中&#xff0c;有多种阈值类型可供选择&am…