详解 Spark 编程之 RDD 依赖关系

一、依赖与血缘关系

在这里插入图片描述

  • 依赖:两个相邻 RDD 之间的关系
  • 血缘关系:多个连续的 RDD 的依赖
  • 由于 RDD 不会保存数据,为了提高容错性,每个 RDD 都会保存自己的血缘关系,一旦某个转换过程出现错误,可以根据血缘关系重新从数据源开始读取计算
object TestRDDDependency {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")
        val sc = new SparkContext(conf)
        
        val rdd1 = sc.textFile("data/word.txt")
        println(rdd1.toDebugString) // 打印血缘关系
        println(rdd1.dependencies) // 打印依赖关系
		println("----------------------")
        
        val rdd2 = rdd1.flatMap(_.split(" "))
        println(rdd2.toDebugString) // 打印血缘关系
        println(rdd2.dependencies) // 打印依赖关系
		println("----------------------")
        
        val rdd3 = rdd2.map((_, 1))
        println(rdd3.toDebugString) // 打印血缘关系
        println(rdd3.dependencies) // 打印依赖关系
		println("----------------------")
        
        val rdd4 = rdd3.reduceByKey(_ + _)
        println(rdd4.toDebugString) // 打印血缘关系
        println(rdd4.dependencies) // 打印依赖关系
		println("----------------------")
        
    }
}

二、宽窄依赖

  • 窄依赖:OneToOneDependency,表示每一个父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一个 Partition 使用,类比喻为独生子女

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
    
  • 宽依赖:ShuffleDependency,表示同一个父 (上游) RDD 的 Partition 被子 (下游) RDD 的多个 Partition 依赖或者说子 RDD 的一个 Partition 需要父 RDD 的多个 Partition 的数据,所以会引起 Shuffle 操作,类比喻为多生

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false
    ) extends Dependency[Product2[K, V]] 
    

三、阶段划分

  • 窄依赖由于上游和下游的 RDD 分区是一对一的,所以整个的执行过程是不受其它分区执行结果的影响,每个分区只需要一个 task 就可以完成计算任务

在这里插入图片描述

  • 宽依赖由于存在 shuffle 操作,下游的 RDD 分区的数据计算需要等待上游 RDD 相关分区的数据全部执行完成后才能开始,所以存在不同阶段的划分,上游和下游 RDD 的每个分区都需要一个 task 来完成计算任务,所有阶段的划分和执行顺序可以由有向无环图 (DAG) 的形式来表示
    在这里插入图片描述

  • 阶段划分源码:

    /**
    	结论:
    		1.默认会至少存在一个阶段,即 resultStage,最后执行的阶段
    		2.当存在 shuffle 依赖时,每存在一个会增加一个阶段(shuffleMapStage)
    		3.阶段的数量 = shuffle 依赖数量 + 1
    */
    // 行动算子触发作业执行
    rdd.collect()
    
    // collect() 深入底层
    dagScheduler.runJob()
    
    // runJob() 中会调用 submitJob(),其中会调用 handleJobSubmitted()
    // handleJobSubmitted() 中的阶段划分
    try {
    	finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
    	...
    }
    
    // createResultStage() 方法
    private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {
        val parents = getOrCreateParentStages(rdd, jobId) // 判断是否有上一阶段
        val id = nextStageId.getAndIncrement()
        val stage = new  ResultStage(id, rdd, func, partitions, parents, jobId,  callSite) // 至少存在一个 resultStage 阶段
        stageIdToStage(id) = stage
        updateJobIdStageIdMaps(jobId, stage)
        stage
    }
    
    // getOrCreateParentStages(),判断是否有上一阶段
    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        // getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
        getShuffleDependencies(rdd).map { shuffleDep =>
            // 为 shuffle 依赖创建 ShuffleMapStage 阶段
        	getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
    }
    
    // getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
    private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
        val parents = new HashSet[ShuffleDependency[_, _, _]]
        val visited = new HashSet[RDD[_]]
        val waitingForVisit = new Stack[RDD[_]]
        waitingForVisit.push(rdd)
        while (waitingForVisit.nonEmpty) {
            val toVisit = waitingForVisit.pop()
            if (!visited(toVisit)) {
                visited += toVisit
                toVisit.dependencies.foreach {
                    case shuffleDep: ShuffleDependency[_, _, _] =>
                    parents += shuffleDep
                    case dependency =>
                    waitingForVisit.push(dependency.rdd)
                }
            }
        }
        parents
    }
    

四、任务划分

  • RDD 任务划分中间分为:Application、Job、Stage 和 Task

    • Application:初始化一个 SparkContext 即生成一个 Application
    • Job:一个 Action 算子就会生成一个 Job
    • Stage:Stage 等于宽依赖 (ShuffleDependency) 的个数加 1
    • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
  • Application -> Job -> Stag e-> Task 之间每一层都是 1 对 n 的关系

  • 任务划分源码:

    val tasks: Seq[Task[_]] = try {
        stage match {
            case stage: ShuffleMapStage => 
                partitionsToCompute.map { id =>
                    val locs = taskIdToLocations(id)
                    val part = stage.rdd.partitions(id)
                    new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                    taskBinary,  part,  locs,  stage.latestInfo.taskMetrics,  properties, 
                    Option(jobId),
                    Option(sc.applicationId), sc.applicationAttemptId)
            	}
            case stage: ResultStage => 
                partitionsToCompute.map { id =>
                    val p: Int = stage.partitions(id)
                    val part = stage.rdd.partitions(p)
                    val locs = taskIdToLocations(id)
                    new ResultTask(stage.id, stage.latestInfo.attemptId,
                    taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
                    Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
            	}
        }
    }
    
    //
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    
    //
    override def findMissingPartitions(): Seq[Int] = {
        mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/668239.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开源VS闭源:AI未来的十字路口

人工智能领域的发展日益加速&#xff0c;其中关于模型的开源和闭源策略引起了业界的广泛关注。开源策略指的是将软件的源代码公开&#xff0c;允许任何人自由使用、研究甚至改进&#xff1b;而闭源策略则是指软件的源代码不公开&#xff0c;只有特定的个体或组织有权访问和修改…

【IPFS应用开发】基于IPFS的视频播放器

本系列文章是针对 https://blog.csdn.net/weixin_43668031/article/details/83962959 内容的实现所编写的。开发经历包括思考过程、重构和推翻重来。 基于IPFS的视频播放器 想写一个真正的、基于IPFS的&#xff0c;可以播放IPFS上的视频的程序支持多种数据加载格式同时支持单文…

docker部署Minio对象存储及使用

1.拉取镜像 docker pull minio/minio2.创建数据目录 mkdir -p /data/minio/data3.启动容器 docker run -p 39000:9000 -p 39090:9090 \ --name minio \ -d --restartalways \ -e "MINIO_ACCESS_KEYjyadmin" \ -e "MINIO_SECRET_KEYjyzx2023" \ -v /data…

C++数据结构之:链List

摘要&#xff1a; it人员无论是使用哪种高级语言开发东东&#xff0c;想要更高效有层次的开发程序的话都躲不开三件套&#xff1a;数据结构&#xff0c;算法和设计模式。数据结构是相互之间存在一种或多种特定关系的数据元素的集合&#xff0c;即带“结构”的数据元素的集合&am…

用esp prog烧录ESP32-C3板踩坑

附ESP32C3的GPIO一览&#xff1a; vscode选择Jtag烧录&#xff0c;终端输出esp_usb_jtag: could not find or open device&#xff1a; D:\Devtools\Espressif\tools\openocd-esp32\v0.12.0-esp32-20230921\openocd-esp32\bin\openocd.exe -f board/esp32s3-builtin.cfgOpen O…

xxl-job的使用

介绍 在分布式中&#xff0c;很多微服务可能存在多实例部署的现象&#xff0c;如果在某个具体的微服务中实现一个定时任务&#xff0c;而该微服务存在多个实例的话&#xff0c;那么会导致该定时任务在不同实例中都会进行执行&#xff01;这很容易导致脏数据、数据重复等问题&am…

黑白群晖激活AME(Advanced Media Extention)

黑群晖激活Advanced Media Extensions&#xff08;AME&#xff09;解码HEVC视频和HEIC图片 声明&#xff1a;此教程在正版群晖系统中进行的操作&#xff0c;虽然也能用于非正版系统中AME的安装&#xff0c;但是在非正版系统中安装AME属于破解行为&#xff0c;对系统造成的影响和…

安装vllm的时候卡主:Collecting vllm-nccl-cu12<2.19,>=2.18 (from vllm)

按照vllm的时候卡主&#xff1a; ... Requirement already satisfied: typing-extensions in /home/wangguisen/miniconda3/lib/python3.10/site-packages (from vllm) (4.9.0) Requirement already satisfied: filelock>3.10.4 in /home/wangguisen/miniconda3/lib/python…

落地台灯有什么作用?五款口碑好的落地台灯推荐

落地台灯有什么作用&#xff1f;面对长时间工作、学习已成为当代年轻人的真实写照&#xff0c;据目前不完全统计&#xff0c;60%以上的人群每天用眼时间都已经超过10小时&#xff0c;高强度的的用眼以及不可确定的环境因素都易导致双眼出现干涉、酸痛、红血丝等情况&#xff0c…

SpringBoot 七牛云 OSS 私有模式 获取访问链接

目录 一、问题引出 二、在SpringBoot中获取私有访问路径的操作 一、问题引出 由于七牛云OSS的公有模式存在被盗刷的风险&#xff0c;可能导致服务器额外的费用&#xff0c;于是我选择私有模式进行操作。私有模式的访问路径是一个问题&#xff0c;因为需要对应着token和e这两…

MyBatis系统学习篇 - 分页插件

MyBatis是一个非常流行的Java持久层框架&#xff0c;它简化了数据库操作的代码。分页是数据库查询中常见的需求&#xff0c;MyBatis本身并不直接支持分页功能&#xff0c;但可以通过插件来实现&#xff0c;从而帮助我们在查询数据库的时候更加方便快捷 引入依赖 <dependen…

Python 学习笔记【1】

此笔记仅适用于有任一编程语言基础&#xff0c;且对面向对象有一定了解者观看 文章目录 数据类型字面量数字类型数据容器字符串列表元组 type()方法数据类型强转 注释单行注释多行注释 输出基本输出连续输出&#xff0c;中间用“,”分隔更复杂的输出格式 变量定义del方法 标识符…

LeetCode84:柱形图中最大的矩形

题目描述 给定 n 个非负整数&#xff0c;用来表示柱状图中各个柱子的高度。每个柱子彼此相邻&#xff0c;且宽度为 1 。 求在该柱状图中&#xff0c;能够勾勒出来的矩形的最大面积。 代码 单调栈 class Solution { public:int largestRectangleArea(vector<int>& h…

【数据结构】链表与顺序表的比较

不同点&#xff1a; 顺序表和链表是两种常见的数据结构&#xff0c;他们的不同点在于存储方式和插入、删除操作、随机访问、cpu缓存利用率等方面。 一、存储方式不同: 顺序表&#xff1a; 顺序表的存储方式是顺序存储&#xff0c;在内存中申请一块连续的空间&#xff0c;通…

文明互鉴促发展——2024“国际山地旅游日”主题活动在法国启幕

5月29日&#xff0c;2024“国际山地旅游日”主题活动在法国尼斯市成功举办。中国驻法国使领馆、法国文化旅游部门、地方政府、国际组织、国际山地旅游联盟会员代表、旅游机构、企业、专家、媒体等围绕“文明互鉴的山地旅游”大会主题和“气候变化与山地旅游应对之策”论坛主题展…

字符串-最长回文子串

一、题目描述 二、解题思路 设置双指针&#xff0c;定位连续子串&#xff0c;这里提供暴力解法&#xff0c;枚举各种子串并判断子串是否符合回文特征&#xff0c;符合则更新最长子串长度。 主要是注意一下java相关的语法API&#xff1a; 1.使用 (new StringBuilder(substr))…

MongoDB~存储引擎了解

存储引擎 存储引擎是一个数据库的核心&#xff0c;主要负责内存、磁盘里数据的管理和维护。 MongoBD的优势&#xff0c;在于其数据模型定义的灵活性、以及可拓展性。但不要忽略&#xff0c;其存储引擎也是插件式的存在&#xff0c;支持不同类型的存储引擎&#xff0c;使用不同…

B-TREE教程(个人总结版)

背景 在计算机科学中&#xff0c;数据存储和检索的效率是一个重要的研究课题。B-树&#xff08;B-Tree&#xff09;作为一种自平衡树结构&#xff0c;特别适合于在磁盘存储中处理大规模数据。它通过保持树的高度平衡&#xff0c;使得搜索、插入和删除操作的时间复杂度保持在对…

docker查看容器目录挂载

查看命令 docker inspect --format{{ json .Mounts }} <container_id_or_name> | jq 示例 docker inspect --format{{ json .Mounts }} af656ae540af | jq输出

Linux远程连接失败(Linux与Windows相互可以ping)

Linux远程连接解决办法记录 目录 文章目录 Linux远程连接解决办法记录1.SSH没有开启1.1查询ssh进程 1.SSH没有开启 sudo apt install openssh-serversudo /etc/init.d/ssh resart1.1查询ssh进程 ps -e | grep ssh 重新尝试连接