【Spark精讲】RDD特性之数据本地化

目录

首选运行位置

数据的本地化级别

谁来负责数据本地化

数据本地化执行流程

调优

代码中的设置方法 


首选运行位置

上图红框为RDD的特性五:每个RDD的每个分区都有一组首选运行位置,用于标识RDD的这个分区数据最好能够在哪台主机上运行。通过RDD的首选运行位置可以让RDD的某个分区的计算任务直接在指定的主机上运行,从而实现了移动计算而不是移动数据的目的减少了网络传输的开销,如Spark中HadoopRDD能够实现家装数据的任务在相应的数据节点上执行。

数据的本地化级别

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

@DeveloperApi
object TaskLocality extends Enumeration {
  // Process local is expected to be used ONLY within TaskSetManager for now.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    condition <= constraint
  }
}
  1. PROCESS_LOCAL:进程本地化,表示 task 要计算的数据在同一个 Executor 中。
  2. NODE_LOCAL:节点本地化,速度稍慢,因为数据需要在不同的进程之间传递或从文件中读取。分为两种情况,第一种:task 要计算的数据是在同一个 worker 的不同 Executor 进程中。第二种:task 要计算的数据是在同一个 worker 的磁盘上,或在 HDFS 上恰好有 block 在同一个节点上。如果 Spark 要计算的数据来源于 HDFS 上,那么最好的本地化级别就是 NODE_LOCAL。
  3. NO_PREF:没有最佳位置,数据从哪访问都一样快,不需要位置优先。比如 Spark SQL 从 Mysql 中读取数据。
  4. RACK_LOCAL:机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。情况一:task 计算的数据在 worker2 的 EXecutor 中。情况二:task 计算的数据在 work2 的磁盘上。
  5. ANY:跨机架,数据在非同一机架的网络上,速度最慢。

谁来负责数据本地化

val rdd1 = sc.textFile("hdfs://...") 
rdd1.cache()
rdd1.map.filter.count()

上面这段简单的代码,背后其实做什么很多事情。Driver 的 TaskScheduler 在发送 task 之前,首先应该拿到 rdd1 数据所在的位置,rdd1 封装了这个文件所对应的 block 的位置,DAGScheduler 通过调用 getPrerredLocations() 拿到 partition 所对应的数据的位置,TaskScheduler 根据这些位置来发送相应的 task。 

具体的解释:

DAGScheduler 切割Job,划分Stage, 通过调用 submitStage 来提交一个Stage 对应的 tasks,submitStage 会调用 submitMissingTasks, submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用 getPreferrdeLocations() 得到 partition 的优先位置,就是这个 partition 对应的 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个task,该 task 优先位置与其对应的 partition 对应的优先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 会为每个 TaskSet 创建一个 TaskSetManager 对象,该对象包含taskSet 所有 tasks,并管理这些 tasks 的执行,其中就包括计算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在调度和延迟调度 tasks 时发挥作用。

总的来说,Spark 中的数据本地化是由 DAGScheduler 和 TaskScheduler 共同负责的。

数据本地化执行流程

第一步:PROCESS_LOCAL

TaskScheduler 根据数据的位置向数据节点发送 task 任务。如果这个任务在 worker1 的 Executor 中等待了 3 秒。(默认的,可以通过spark.locality.wait 来设置),可以通过 SparkConf() 来修改,重试了 5 次之后,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 PROCESS_LOCAL 降到 NODE_LOCAL。

第二步:NODE_LOCAL

TaskScheduler 重新发送 task 到 worker1 中的 Executor2 中执行,如果 task 在worker1 的 Executor2 中等待了 3 秒,重试了 5 次,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 NODE_LOCAL 降到 RACK_LOCAL。

第三步:RACK_LOCAL

TaskScheduler重新发送 task 到 worker2 中的 Executor1 中执行。

第四步:

当 task 分配完成之后,task 会通过所在的 worker 的 Executor 中的 BlockManager 来获取数据。如果 BlockManager 发现自己没有数据,那么它会调用 getRemote() 方法,通过 ConnectionManager 与原 task 所在节点的 BlockManager 中的 ConnectionManager先建立连接,然后通过TransferService(网络传输组件)获取数据,通过网络传输回task所在节点(这时候性能大幅下降,大量的网络IO占用资源),计算后的结果返回给Driver。这一步很像 shuffle 的文件寻址流程。

调优

TaskScheduler在发送task的时候,会根据数据所在的节点发送task,这时候的数据本地化的级别是最高的,如果这个task在这个Executor中等待了3秒,重试发射了5次还是依然无法执行,那么TaskScheduler就会认为这个Executor的计算资源满了,TaskScheduler会降低一级数据本地化的级别,重新发送task到其他的Executor中执行,如果还是依然无法执行,那么继续降低数据本地化的级别...

如果想让每一个 task 都能拿到最好的数据本地化级别,那么调优点就是等待时间加长。注意!如果过度调大等待时间,虽然为每一个 task 都拿到了最好的数据本地化级别,但是我们 job 执行的时间也会随之延长。

官方参数:Configuration - Spark 3.5.0 Documentation

spark.locality.wait3sHow long to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.0.5.0
spark.locality.wait.nodespark.locality.waitCustomize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).0.8.0
spark.locality.wait.processspark.locality.waitCustomize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.0.8.0
spark.locality.wait.rackspark.locality.waitCustomize the locality wait for rack locality.0.8.0

代码中的设置方法 

new SparkConf.set("spark.locality.wait","100") //默认3秒

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/247174.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式系统挑战赛---多线程并发打印奇偶数

一、题目要求 编写一个C语言程序&#xff0c;实现多线程并发打印奇偶数。要求使用两个线程&#xff0c;一个线程打印奇数&#xff0c;另一个线程打印偶数&#xff0c;打印范围为1到100。要求奇数线程先打印&#xff0c;偶数线程后打印&#xff0c;且要保证线程按次序交替进行。…

32、应急响应——linux

文章目录 一、linux进程排查二、linux文件排查三、linux用户排查四、linux持久化排查4.1 历史命令4.2 定时任务排查4.3 开机启动项排查 五、linux日志分析六、工具应用 一、linux进程排查 查看资源占用&#xff1a;top查看所有进程&#xff1a;ps -ef根据进程PID查看进程详细信…

LeetCode(60)K 个一组翻转链表【链表】【困难】

目录 1.题目2.答案3.提交结果截图 链接&#xff1a; K 个一组翻转链表 1.题目 给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是 k 的整数倍&#xf…

【源码】车牌检测+QT界面+附带数据库

目录 1、基本介绍2、基本环境3、核心代码3.1、车牌识别3.2、车牌定位3.3、车牌坐标矫正 4、界面展示4.1、主界面4.2、车牌检测4.3、查询功能 5、演示6、链接 1、基本介绍 本项目采用tensorflow&#xff0c;opencv&#xff0c;pyside6和pymql编写&#xff0c;pyside6用来编写UI界…

Java架构师-数据机构与算法实战(第一篇)

数学知识回顾 指数 指数函数是重要的基本初等函数之一。一般地&#xff0c;ya^x函数(a为常数且以a>0&#xff0c;a≠1)叫做指数函数&#xff0c;函数的定义域是 R 。注意&#xff0c;在指数函数的定义表达式中&#xff0c;在a^x前的系数必须是数1&#xff0c;自变量x必须在…

ubantu22.04.3 安装4080驱动

新电脑安装驱动网卡EX211只适配22.04的内核&#xff0c;其他系统升级内核易出问题不推荐。 安装系统为系统盘安装制作Ubuntu22.04启动盘_ubuntu下制作pe启动盘-CSDN博客&#xff0c;参考此作者&#xff0c;选择系统为22.04.3 其他版本不推荐因前面用22.04安装显卡后出现兼容性…

Power BI - 5分钟学习增加索引列

每天5分钟&#xff0c;今天介绍Power BI增加索引列。 什么是增加索引列&#xff1f; 增加索引列就是向表中添加一个具有显式位置值的新列&#xff0c;一般从0或者从1开始。 举例&#xff1a; 首先&#xff0c;导入一张【Sales】样例表(Excel数据源导入请参考每天5分钟第一天)…

【Linux】tree命令使用

tree命令 tree命令用于以树状图列出目录的内容。 语法 tree [参数] [目录] tree 命令 -Linux手册页 bash: tree: 未找到命令... 安装tree yum -y install tree如果你系统中有安装tree 但是还是执行找不到该命令的话&#xff0c;那原因就是&#xff1a;环境变量错误&#x…

Google Shopping Action

Google Shopping Action是Google推出的一项在线购物服务&#xff0c;可以帮助零售商将产品推广和销售到Google平台上的消费者中。通过Google Shopping Action&#xff0c;用户可以在谷歌搜索页面上直接购买商品&#xff0c;而不需要离开搜索结果页面。 Google Shopping Action的…

神通数据库字段空与非空

神通数据库可以在建表时指定字段非空或可空&#xff0c; -- 指定column1字段非空 CREATE TABLE SYSDBA.tmp_test1(column1 varchar(100) NOT NULL)--尝试向column1字段插入空值 INSERT INTO SYSDBA.tmp_test1(column1) VALUES(NULL) 会收到插入失败的提示&#xff1a; 而如果…

基于JavaWeb实现的勤工俭学管理系统

一、系统架构 前端&#xff1a;jsp | js | css | jquery | layui 后端&#xff1a;spring | springmvc | mybatis 环境&#xff1a;jdk1.8 | mysql 二、代码及数据库 三、功能介绍 01. web端-首页 02. web端-论坛 03. web端-个人中心 04. web端-平台公告 05. web端-平…

音视频技术开发周刊 | 323

每周一期&#xff0c;纵览音视频技术领域的干货。 新闻投稿&#xff1a;contributelivevideostack.com。 Meta牵头组建开源「AI复仇者联盟」&#xff0c;AMD等盟友800亿美元力战OpenAI英伟达 超过50家科技大厂名校和机构&#xff0c;共同成立了全新的人工智能联盟。以开源为旗号…

CBTC上海新能源锂电池展览会奋战华东!2024携手共赢!

2024CBTC上海新能源锂电池技术展览会|上海锂离子电池生产设备展览会 时 间&#xff1a;2024年7月24&#xff5e;26日 地 点&#xff1a;国家会展中心&#xff08;上海虹桥&#xff09; 发展前景&#xff1a; 随着科技的不断进步&#xff0c;锂电池市场逐渐成为全球能源市场的…

@Transactional注解详细使用

Transactional注解详细使用 Transactional注解是Spring框架中用于管理事务的注解&#xff0c;它可以应用于类或方法上。使用该注解可以确保一个方法或类中的操作要么全部成功提交&#xff0c;要么全部回滚&#xff0c;从而保证数据的完整性和一致性。下面是Transactional注解的…

Gradio入门详细教程

常用的两款AI可视化交互应用比较&#xff1a; Gradio Gradio的优势在于易用性&#xff0c;代码结构相比Streamlit简单&#xff0c;只需简单定义输入和输出接口即可快速构建简单的交互页面&#xff0c;更轻松部署模型。适合场景相对简单&#xff0c;想要快速部署应用的开发者。便…

力扣二叉树--总结篇(2)

前言 总体回顾&#xff1a;11.18-12.14&#xff0c;中间有一个星期左右因为考试没有写题。37道题。 内容 这是第二阶段刷的题 从路径到构造二叉树&#xff0c;合并二叉树&#xff0c;再到二叉搜索树&#xff0c;公共祖先问题 看到二叉树&#xff0c;看到递归 都会想&#…

常见的Linux基本指令

目录 什么是Linux&#xff1f; Xshell如何远程控制云服务器 Xshell远程连接云服务器 Linux基本指令 用户管理指令 pwd指令 touch指令 mkdir指令 ls指令 cd指令 rm指令 man命令 cp指令 mv指令 cat指令 head指令 ​编辑 tail指令 ​编辑echo指令 find命令 gr…

【教程】源代码加密、防泄密软件

​ 什么是代码混淆&#xff1f; 代码混淆 是一种将应用程序二进制文件转换为功能上等价&#xff0c;但人类难于阅读和理解的行为。在编译 Dart 代码时&#xff0c;混淆会隐藏函数和类的名称&#xff0c;并用其他符号替代每个符号&#xff0c;从而使攻击者难以进行逆向工程。 …

认识产品经理以及Axure简单安装与入门

目录 一.认识产品经理 1.1.项目团队 1.2.概述 1.3.认识产品经理 1.4.产品经理工作范围 1.5.产品经理工作流程 1.6.产品经理的职责 1.7.产品经理的分类 1.8.产品经理能力要求 1.9.产品工具 1.10.产品体验报告 二.Axure简介 三.应用场景 四.安装与汉化 4.1.安装 4…

认知觉醒(七)

认知觉醒(七) 第三章 元认知——人类的终极能能力 第一节 元认知&#xff1a;成长慢&#xff0c;是因为你不会“飞” 1946年10月24日&#xff0c;一群科学家为了研究太阳的紫外线&#xff0c;在美国新墨西哥州白沙导弹试验场发射了当时世界上最先进的V2液体火箭&#xff0…