Spark Shuffle Tracking 原理分析

Shuffle Tracking

Shuffle Tracking 是 Spark 在没有 ESS(External Shuffle Service)情况,并且开启 Dynamic Allocation 的重要功能。如在 K8S 上运行 spark 没有 ESS。本文档所有的前提都是基于以上条件的。

如果开启了 ESS,那么 Executor 计算完后,把 shuffle 数据交给 ESS, Executor 没有任务时,可以安全退出,下游任务从 ESS 拉取 shuffle 数据。

1. 背景

如果 Executor 执行了上游的 Shuffle Map Task 并且把 shuffle 数据些到本地。并且现在 Executor 没有 Task 运行,那么此 Executor 是否能销毁?

现状是如果 Executor 没有 active 的 shuffle 数据,则可以被销毁。
active shuffle 的定义:如果 Shuffle Map Stage 的 task 把 shuffle 数据输出到本地。如果依赖此 shuffle 的Stage 没有计算完毕,则称此 shuffle 为 active shuffle。因为依赖此 shuffle 的 Task 可能从 Driver 端获取了 MapStatus,但是还没有拉取完 shuffle 数据。

为了达到此目的,需要跟踪每个 Stage 和每个 Task 的运行信息。并且启动定时任务,定时扫描每个 Executor,判断是否有任务运行,是否有 active 的 shuffle,如果没有则可以退出。

退出有两种,如果开启了 decommission,则到期的 executors 进入 decommission 模式,否则执行 killExecutors。

参数配置

spark.dynamicAllocation.shuffleTracking.enabled: 默认 true,是否开启 shuffle tracking。
spark.dynamicAllocation.shuffleTracking.timeout: 默认 Long.MaxValue,

2. 设计

ExecutorMonitor 为每个 Executor 创建一个 Tracker, 用于跟踪此 Executor 的状态。

private val executors = new ConcurrentHashMap[String, Tracker]()

定时任务间隔时间查找 timeout 的 executor,然后处理。

timedOutExecutors 方法的主要逻辑,就是遍历 executors。如果 executor 没有 active 的 shuffle 并且当前时间大于 executor 的超时时间 timeoutAt,则此 executor 可以被安全释放。

为什么 executor 有 active shuffle 数据就不能 kill?
在这里插入图片描述

  • Shuffle 的过程:
  1. MapTask 把 shuffle 写到本地,并且把状态汇报给 Driver.
  2. Reduce Task 从 Driver 获取 shuffle status,并从 shuffle status 获取每个 shuffle 数据的地址。
  3. 连接对应的 executor 获取 shuffle 数据。

如果在 reduce 获取完 shuffle status 后,MapTask 所在的 Executor 被 kill 掉,Reduce Task 就无法获取 shuffle 数据。

如果执行 decommission 逻辑,把 MapTask 的 shuffle 数据长传到 bos 等分布式存储是否可以?

也是不可以的,因为 reduce 可能已经把 shuffle status 拿走,获取的 shuffle status 没有记录 shuffle 数据在分布式存储上。

参考: ExecutorMonitor,ExecutorAllocationManager

Executor 状态的更新

ExecutorMonitor 实现了 SparkListner 接口,当 Job, Stage, Task 等 start 和 end 时,都会执行回调。

以 hasActiveShuffle 为例
每个 executor 用一个集合 shuffleIds 存储其上拥有的 shuffle 数据。 当其为空时,说明没有 shuffle 数据。

在 onTaskEnd 和 onBlockUpdated 时调用 addShuffle 向 shuffleIds 添加数据。

在以下时机删除 shuffleIds 里的数据。

  1. 依赖 driver 端的 ContextCleaner,当 ShuffleRDD 仅有 weakReference 时触发。
  2. rdd.cleanShuffleDependencies 方法,但是此方法仅在 org.apache.spark.ml.recommendation.ALS 使用。

timeoutAt 的计算逻辑

总结:timeoutAt 根据 idle 的时间,spark.dynamicAllocation.cachedExecutorIdleTimeout 和 spark.dynamicAllocation.shuffleTracking.timeout 这 3 个值中最大的值。

详细计算逻辑:
timeoutAt 在一些事件发生时触发计算,如 onBlockUpdated, onUnpersistRDD, updateRunningTasks, removeShuffle, updateActiveShuffles
timeoutAt 的计算逻辑:
当执行器有计算任务时 为 Long.MaxValue。
否则为 max(_cacheTimeout, _shuffleTimeout, idleTimeoutNs)
_cacheTimeout: 如果没有 cache 数据,为0,否则为参数 spark.dynamicAllocation.cachedExecutorIdleTimeout 的值(默认 Long.MaxValue)。

_shuffleTimeout: 如果没有 shuffle数据,为 0, 否则为参数 spark.dynamicAllocation.shuffleTracking.timeout 的值(默认 Long.MaxValue)。
idleTimeoutNs 为 spark.dynamicAllocation.executorIdleTimeout

3. 测试

测试命令

spark-shell  \
 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.dynamicAllocation.initialExecutors=2 \
 --conf spark.dynamicAllocation.maxExecutor=400 \
 --conf spark.dynamicAllocation.minExecutors=1 \
 --conf spark.shuffle.service.enabled=false \
 --conf spark.dynamicAllocation.shuffleTracking.enabled=true

参考资料:

https://www.waitingforcode.com/apache-spark/what-new-apache-spark-3-shuffle-service-changes/read

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/421899.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 表的基本操作,结合项目的表自动初始化来讲

有了数据库以后,我们就可以在数据库中对表进行增删改查了,这也就意味着,一名真正的 CRUD Boy 即将到来(😁)。 查表 查看当前数据库中所有的表,使用 show tables; 命令 由于当前数据库中还没有…

多层感知器(神经网络)与激活函数

单个神经元(二分类) 多个神经元(多分类) 多层感知器 多层感知器,他是一种深度学习模型,通过多层神经元的连接和激活来解决非线性问题。 激活函数 激活函数的种类包括relu,sigmoid和tanh等 …

torch的下载

cmd中输入 nvidia-smi 查看cuda版本,我这里是12.3 然后进入pytorch官网:PyTorch 直接本页面往下滑就能看到 选择与自己cuda版本相近的。 复制命令: pip3 install torch torchvision torchaudio --index-url https://download.pytorch.…

搭建stressapptest调试环境:VSCode的分步教程

vscode调试stressapptest详解 一、环境准备二、设置调试配置2.1、编辑launch.json文件和task.json文件2.2、将 stressapptest 编译成 debug 版本 三、运行调试总结 一、环境准备 stressapptest(简称SAT)是一种用于在Linux系统上测试系统稳定性和可靠性的…

【MySQL】:高效利用MySQL函数实用指南

🎥 屿小夏 : 个人主页 🔥个人专栏 : MySQL从入门到进阶 🌄 莫道桑榆晚,为霞尚满天! 文章目录 📑前言一. MySQL函数概论二. 字符串函数三. 数值函数四. 日期函数五. 流程函数&#x1…

MySQL:合并查询语句

1、查询表的数据 t_book表数据 SELECT * FROM db_book.t_book; t_booktype表数据 SELECT * FROM db_book.t_booktype; 提醒: 下面的查询操作的数据来自上图查询表的数据 2. 使用 UNION 查询结果合并,会去掉重复的数据 使用UNION关键字是,数…

Angular 由一个bug说起之四:jsonEditor使用不当造成的bug

一:问题 项目中使用了一个JSON第三方库: GitHub - josdejong/jsoneditor: A web-based tool to view, edit, format, and validate JSON 当用户编辑JSON格式的数据,查找替换时: 用户的期望结果是:$$ 被替换为$$_text&a…

centos7 部署harbor镜像仓库

1. 安装docker yum install -y yum-utils device-mapper-persistent-data lvm2 yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo yum list docker-ce.x86_64 --showduplicates |sort -r yum install docker-ce-17.09.1.ce -y syste…

分布式事务简介

分布式事务简介,通过组内分享学习到的知识,并进行讨论。 主要内容 分布式事务简介 分布式事务是指跨越多个数据库或服务的一系列操作,这些数据库或服务可能分布在网络的不同节点上,它们共同组成一个完整的逻辑工作单元&#xf…

用Python库angr来分析二进制文件

最近在学习二进制分析,了解到二进制加载器,于是跟着AI一起,学习了这个python可用的二进制加载器分析器angr,并写了这篇介绍的文章,儿童卡通风格,哈哈。 亲爱的代码侠客们,今天我们要一起踏上探索二进制文件…

CloudCanal x Hive 构建高效的实时数仓

简述 CloudCanal 最近对于全周期数据流动进行了初步探索,打通了Hive 目标端的实时同步,为实时数仓的构建提供了支持,这篇文章简要做下分享。 基于临时表的增量合并方式基于 HDFS 文件写入方式临时表统一 Schema任务级的临时表 基于临时表的…

蓝桥杯算法题汇总

一.线性表:链式 例题:旋转链表 二.栈: 例题:行星碰撞问题 三.队列 三.数组和矩阵 例题:

蓝灵娥驾到!国漫小师妹的魅力大揭晓!

在国漫中,有非常多出众的小师妹形象,如同璀璨的星辰,以其独特的魅力吸引着无数观众的目光。她们形象各异,或纯真善良,或勇敢智慧,或刁蛮任性,其魅力经久不衰。今天,就让我们以玄机科…

网络编程 io_uring

io_uring 1、概述 io_uring是Linux(内核版本在5.1以后)在2019年加入到内核中的一种新型的异步I/O模型; io_uring使用共享内存,解决高IOPS场景中的用户态和内核态的切换过程,减少系统调用;用户可以直接向…

2024年腾讯云会员老用户续费优惠活动,可领代金券

腾讯云优惠活动2024新春采购节活动上线,云服务器价格已经出来了,云服务器61元一年起,配置和价格基本上和上个月没什么变化,但是新增了8888元代金券和会员续费优惠,腾讯云百科txybk.com整理腾讯云最新优惠活动云服务器配…

数电实验之流水灯、序列发生器

最近又用到了数电实验设计的一些操作和设计思想,遂整理之。 广告流水灯 实验内容 用触发器、组合函数器件和门电路设计一个广告流水灯,该流水灯由 8 个 LED 组成,工作时始终为 1 暗 7 亮,且这一个暗灯循环右移。 1) 写出设计过…

MYSQL--JDBC优化

一.JDBC优化: 优化前提: 有时候我们并不清楚某些表当中一共有多少列,以及这些列的数据类型,这个时候我们就需要提前通过一些方法提前了解到这些数据,从而更好的进行输出 具体语句: package cn.jdbc;import java.sql.*;public class JDBCDEmo1 {public static void main(String…

MySQL篇—执行计划介绍(第二篇,总共三篇)

☘️博主介绍☘️: ✨又是一天没白过,我是奈斯,DBA一名✨ ✌✌️擅长Oracle、MySQL、SQLserver、Linux,也在积极的扩展IT方向的其他知识面✌✌️ ❣️❣️❣️大佬们都喜欢静静的看文章,并且也会默默的点赞收藏加关注❣…

力扣SQL50 大的国家 查询

Problem: 595. 大的国家 Code select name,population,area from World where area > 3000000 or population > 25000000;

JS:原型与原型链(附带图解与代码)

一、原型 写在前面: 任何对象都有原型。 函数也是对象,所以函数也有原型。 1.什么是原型 在 JavaScript 中,对象有一个特殊的隐藏属性 [[Prototype]],它要么为 null,要么就是对另一个对象的引用,该对象…