一、依赖与血缘关系
- 依赖:两个相邻 RDD 之间的关系
- 血缘关系:多个连续的 RDD 的依赖
- 由于 RDD 不会保存数据,为了提高容错性,每个 RDD 都会保存自己的血缘关系,一旦某个转换过程出现错误,可以根据血缘关系重新从数据源开始读取计算
object TestRDDDependency {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")
val sc = new SparkContext(conf)
val rdd1 = sc.textFile("data/word.txt")
println(rdd1.toDebugString) // 打印血缘关系
println(rdd1.dependencies) // 打印依赖关系
println("----------------------")
val rdd2 = rdd1.flatMap(_.split(" "))
println(rdd2.toDebugString) // 打印血缘关系
println(rdd2.dependencies) // 打印依赖关系
println("----------------------")
val rdd3 = rdd2.map((_, 1))
println(rdd3.toDebugString) // 打印血缘关系
println(rdd3.dependencies) // 打印依赖关系
println("----------------------")
val rdd4 = rdd3.reduceByKey(_ + _)
println(rdd4.toDebugString) // 打印血缘关系
println(rdd4.dependencies) // 打印依赖关系
println("----------------------")
}
}
二、宽窄依赖
-
窄依赖:OneToOneDependency,表示每一个父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一个 Partition 使用,类比喻为独生子女
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
-
宽依赖:ShuffleDependency,表示同一个父 (上游) RDD 的 Partition 被子 (下游) RDD 的多个 Partition 依赖或者说子 RDD 的一个 Partition 需要父 RDD 的多个 Partition 的数据,所以会引起 Shuffle 操作,类比喻为多生
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Serializer = SparkEnv.get.serializer, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false ) extends Dependency[Product2[K, V]]
三、阶段划分
- 窄依赖由于上游和下游的 RDD 分区是一对一的,所以整个的执行过程是不受其它分区执行结果的影响,每个分区只需要一个 task 就可以完成计算任务
-
宽依赖由于存在 shuffle 操作,下游的 RDD 分区的数据计算需要等待上游 RDD 相关分区的数据全部执行完成后才能开始,所以存在不同阶段的划分,上游和下游 RDD 的每个分区都需要一个 task 来完成计算任务,所有阶段的划分和执行顺序可以由有向无环图 (DAG) 的形式来表示
-
阶段划分源码:
/** 结论: 1.默认会至少存在一个阶段,即 resultStage,最后执行的阶段 2.当存在 shuffle 依赖时,每存在一个会增加一个阶段(shuffleMapStage) 3.阶段的数量 = shuffle 依赖数量 + 1 */ // 行动算子触发作业执行 rdd.collect() // collect() 深入底层 dagScheduler.runJob() // runJob() 中会调用 submitJob(),其中会调用 handleJobSubmitted() // handleJobSubmitted() 中的阶段划分 try { finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { ... } // createResultStage() 方法 private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { val parents = getOrCreateParentStages(rdd, jobId) // 判断是否有上一阶段 val id = nextStageId.getAndIncrement() val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // 至少存在一个 resultStage 阶段 stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage } // getOrCreateParentStages(),判断是否有上一阶段 private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { // getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖 getShuffleDependencies(rdd).map { shuffleDep => // 为 shuffle 依赖创建 ShuffleMapStage 阶段 getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList } // getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖 private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { val parents = new HashSet[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] val waitingForVisit = new Stack[RDD[_]] waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.pop() if (!visited(toVisit)) { visited += toVisit toVisit.dependencies.foreach { case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep case dependency => waitingForVisit.push(dependency.rdd) } } } parents }
四、任务划分
-
RDD 任务划分中间分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个 Application
- Job:一个 Action 算子就会生成一个 Job
- Stage:Stage 等于宽依赖 (ShuffleDependency) 的个数加 1
- Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
-
Application -> Job -> Stag e-> Task 之间每一层都是 1 对 n 的关系
-
任务划分源码:
val tasks: Seq[Task[_]] = try { stage match { case stage: ShuffleMapStage => partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } } // val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() // override def findMissingPartitions(): Seq[Int] = { mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions) }