Spark核心组件解析:Executor、RDD与缓存优化

Spark核心组件解析:Executor、RDD与缓存优化

Spark Executor

Executor 是 Spark 中用于执行任务(task)的执行单元,运行在 worker 上,但并不等同于 worker。实际上,Executor 是一组计算资源(如 CPU 核心和内存)的集合,多个 executor 共享 worker 上的 CPU 和内存资源。

Executor 的功能

  • 任务执行:Executor 负责执行分配给它的任务,并返回结果到 driver 程序。

  • 缓存机制:如果应用程序调用了 cache() 或 persist() 函数,Executor 会通过 Block Manager 为RDD 提供缓存机制,优化重复计算。

  • 生命周期:Executor 存在于整个 Spark 应用的生命周期内。

Executor 的创建

Spark 在以下几种情况下创建 Executor:

  • 当资源管理器为 Standalone 或 YARN,且 CoarseGrainedExecutorBackend 进程接收到
    RegisteredExecutor 消息时;
  • 当使用 Mesos 资源管理器时,MesosExecutorBackend 进程注册时;
  • 在本地模式下,当 LocalEndpoint 被创建时。

创建成功后,日志会显示如下信息:

INFO Executor: Starting executor ID [executorId] on host [executorHostname]

心跳发送线程

Executor 会定期向 driver 发送心跳信号以确保连接活跃。心跳线程通常是一个调度线程池,利用 ScheduledThreadPoolExecutor 来维持任务的实时性。

执行任务

Executor 通过 launchTask 方法来执行任务。这个方法会创建一个 TaskRunner 线程,并在 Executor Task Launch Worker 线程池中执行任务。

private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")

Spark RDD (Resilient Distributed Dataset)

RDD 是 Spark 的基础数据结构,表示一个不可变的分布式数据集。RDD 在集群中的各个节点上并行计算,并且具有弹性(容错性)和分布式的特性。

RDD 的特性

  • 弹性:RDD 是容错的,丢失的数据可以通过其父 RDD 重新计算。
  • 分布式:RDD 的数据分布在集群的不同节点上,支持分布式计算。
  • 不可修改:RDD 一旦创建,其数据不可修改,这也保证了数据的一致性。
  • 分区:RDD 会被划分为多个分区,以便并行处理。

RDD 的创建方式

(1)并行化:可以通过 SparkContext.parallelize() 方法从一个数据集合创建 RDD。
(2)从外部存储:可以通过 SparkContext.textFile() 等方法从外部存储系统(如 HDFS)加载数据创建 RDD。
(3)从其他 RDD:通过 Spark 的 Transformation 操作从已有的 RDD 创建新的 RDD。

RDD 操作

RDD 支持两种类型的操作:

  • Transformation 操作(转换):如 map()、filter(),返回新的 RDD。
  • Action 操作(行动):如 count()、collect(),触发实际计算并返回结果。

RDD 的容错性

RDD 提供容错能力。当某个节点失败时,可以根据其父 RDD 的计算逻辑恢复丢失的数据。这是通过 DAG(有向无环图)和父 RDD 关系来实现的。

RDD 的持久化

RDD 可以使用 cache() 或 persist() 进行持久化存储,缓存的 RDD 会存储在内存中,若内存不足则溢写到磁盘,避免重复计算。

RDD 的局限性

缺少内置优化引擎:RDD 无法像 DataFrame 和 Dataset 一样利用 Spark 的 Catalyst 优化器进行自动优化。

性能问题:随着数据量增大,RDD 计算的性能可能下降,尤其是与 JVM 垃圾回收和序列化相关的开销。

存储问题:当内存不足时,RDD 会将数据溢写到磁盘,这会导致计算性能大幅下降。

创建 RDD 的例子

  1. 并行化创建 RDD:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = data.map(_ * 2)
result.collect()  // 返回 [2, 4, 6, 8, 10]

  1. 从外部存储创建 RDD:
val rdd = sc.textFile("hdfs://path/to/file")

  1. 从其他 RDD 创建 RDD:
val newRdd = oldRdd.filter(_ > 10)

Spark RDD 缓存机制

Spark RDD 缓存是一种优化技术,用于将中间计算结果存储在内存中,以便在后续操作中复用,从而减少重复计算,提高性能。RDD 缓存可以显著加速一些需要迭代计算的应用,特别是在机器学习和图计算等场景中。

持久化 RDD

持久化操作会将 RDD 的计算结果存储到内存中。这样,每次对 RDD 进行操作时,Spark 会直接使用内存中的数据,而不必重新计算。通过持久化,可以避免重复计算从而提高效率。

  • cache():cache() 是 persist() 的简化方法,默认将 RDD 数据存储在内存中,使用 MEMORY_ONLY
    存储级别。
  • persist():可以通过 persist() 方法选择不同的存储级别,例如 MEMORY_ONLY、DISK_ONLY 等。
  • unpersist():用于移除已缓存的数据,释放内存。

RDD 持久化存储级别

Spark 提供了多种存储级别,每种级别的存储方式不同,根据具体的需求选择合适的存储级别。

存储级别使用空间CPU时间是否在内存中是否在磁盘上备注
MEMORY_ONLY默认级别,数据未序列化,全部存储在内存中
MEMORY_ONLY_2数据存储 2 份
MEMORY_ONLY_SER数据序列化存储,占用更少内存
MEMORY_AND_DISK中等部分部分内存不够时,数据溢写到磁盘
MEMORY_AND_DISK_SER部分部分数据序列化,内存不够时溢写到磁盘
DISK_ONLY数据仅存储在磁盘中
OFF_HEAP----存储在堆外内存,目前为试验性选项

副本机制

带有 _2 后缀的存储级别表示在每个节点上缓存数据的副本。副本机制是为了提高容错性。如果某个节点的数据丢失,Spark 可以从其他节点的副本中恢复数据,而不必重新计算。

缓存策略的选择

  • MEMORY_ONLY:适用于内存足够大的场景,避免序列化和磁盘 I/O开销。性能较高,但如果内存不足可能会导致计算失败。
  • MEMORY_ONLY_SER:适用于内存较为紧张的场景,将数据进行序列化后保存在内存中,减少内存占用,但会增加序列化的开销。
  • MEMORY_AND_DISK:适用于内存不足的场景,数据无法完全存储在内存时会溢写到磁盘,确保数据不会丢失。
  • DISK_ONLY:适用于数据量极大的情况,全部数据存储在磁盘中,性能较低,但可以处理大规模数据。

如何使用 Spark RDD 缓存

缓存 RDD:

val rdd = sc.textFile("data.txt")
rdd.cache()  // 使用默认的 MEMORY_ONLY 存储级别

选择存储级别:

rdd.persist(StorageLevel.MEMORY_AND_DISK)  // 选择 MEMORY_AND_DISK 存储级别

清除缓存

rdd.unpersist()  // 移除缓存

Spark 键值对 RDD

Spark 通过 PairRDD 处理键值对类型的数据,提供了多种用于处理键值对数据的转换操作。

  1. 如何创建键值对 RDD
    通过 map 操作将普通 RDD 转换为键值对 RDD。
    Scala 示例:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))

Python 示例:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))

  1. 常见的键值对操作
    reduceByKey(func):对具有相同键的值进行规约操作。
val pairs = sc.parallelize(List((1,2),(3,4),(3,6)))
val result = pairs.reduceByKey((a, b) => a + b)
println(result.collect().mkString(","))

groupByKey():对具有相同键的值进行分组。

val result = pairs.groupByKey()
println(result.collect().mkString(","))

mapValues():对值进行转换操作,但不改变键。

val result = pairs.mapValues(x => x + 1)
println(result.collect().mkString(","))

sortByKey():按键排序。

val sorted = pairs.sortByKey()
println(sorted.collect().mkString(","))

  1. 对两个 RDD 的操作
    join():连接两个 RDD,返回键相同的数据。
val other = sc.parallelize(List((3, 9)))
val joined = pairs.join(other)
println(joined.collect().mkString(","))

leftOuterJoin() 和 rightOuterJoin():左外连接和右外连接,分别确保第一个和第二个 RDD 中的键存在。

val leftJoined = pairs.leftOuterJoin(other)
val rightJoined = pairs.rightOuterJoin(other)
println(leftJoined.collect().mkString(","))
println(rightJoined.collect().mkString(","))

通过合理使用 Spark 的缓存和键值对 RDD 操作,可以显著提升大数据计算的效率,尤其是在迭代计算和需要频繁访问中间数据的场景下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/921789.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

速度革命:esbuild如何改变前端构建游戏 (1)

什么是 esbuild? esbuild 是一款基于 Go 语言开发的 JavaScript 构建打包工具,以其卓越的性能著称。相比传统的构建工具(如 Webpack),esbuild 在打包速度上有着显著的优势,能够将打包速度提升 10 到 100 倍…

Ros Noetic 20.04 跑通mpc_ros包保姆级教程

前言: 本文将简述mpc_ros包在noetic20.04中的安装,mpc是 一种跟踪、MPC_ROS 是一个基于ROS(Robot Operating System)的模型预测控制(Model Predictive Control,MPC)库。该项目旨在为机器人控制提供一个灵活且高效的MPC实现,使得开发者能够在ROS环境中轻松集成和使用MPC…

接上一主题,C++14中如何设计类似于std::any,使集合在C++中与Python一样支持任意数据?

这篇文章的重点是C多态的应用,但是如果你是C新手, 你需要了解以下C知识: 类 构造函数 拷贝构造函数 虚拟函数 纯虚拟函数 析构函数 类的继承 运算符重写 模板类 模板参数 数组 数组的传递 指针与动态内存分配 Python: s …

AndroidStudio与开发板调试时连接失败或APP闪退的解决方案,涉及SELINUX及获取Root权限

现象 用AndroidStudio打开工程代码,点击运行后,报错: 解决方案 具体原因是尝试运行 su(通常用于获取超级用户权限)时失败了,提示 “Permission denied” 通过 CONFIG_SECURITY_SELINUX 变量控制 SElinux 开启或关闭 在vim /rk3568_android_sdk/device/rockchip/rk…

数据结构 (6)栈的应用举例

1. 递归调用 递归函数在执行时,会将每一层的函数调用信息(包括局部变量、参数和返回地址)存储在栈中。当递归函数返回时,这些信息会从栈中弹出,以便恢复之前的执行状态。栈的后进先出(LIFO)特性…

QT 网络编程 数据库模块 TCP UDP QT5.12.3环境 C++实现

一、网络编程 1. 模块引入 QT network 2. 头文件 #include <QTcpServer> //TCP服务端使用 #include <QTcpSocket> //TCP服务器和客户端都使用 3. TCP网络编程流程 1) 服务端 实例化QTcpServer对象----------------------------->socket 进入监听状态…

使用ENSP实现NAT

一、项目拓扑 二、项目实现 1.路由器AR1配置 进入系统试图 sys将路由器命名为R1 sysname R1关闭信息中心 undo info-center enable进入g0/0/0接口 int g0/0/0将g0/0/0接口IP地址配置为12.12.12.1/30 ip address 12.12.12.1 30进入e0/0/1接口 int g0/0/1将g0/0/1接口IP地址配置…

Python的tkinter如何把日志弄进文本框(Text)

当我们用python的Tkinter包给程序设计界面时&#xff0c;在有些时候&#xff0c;我们是希望程序的日志显示在界面上的&#xff0c;因为用户也需要知道程序目前运行到哪一步了&#xff0c;以及程序当前的运行状态是否良好。python的通过print函数打印出来的日志通常显示在后台&a…

flux的版本

1.flux1-dev.safetensors https://huggingface.co/black-forest-labs/FLUX.1-devhttps://huggingface.co/black-forest-labs/FLUX.1-dev原生的23.8G的模型。原生12B的模型,float16的。需要配合ae.safetensors,flux1-dev.safetensors以及clip-l和T5的权重使用,注意ae.sft和f…

阿里云私服地址

1.解压apache-maven-3.6.1-bin 2.配置本地仓库&#xff1a;修改conf/dettings.xml中的<localReoisitory>为一个指定目录。56行 <localRepository>D:\apache-maven-3.6.1-bin\apache-maven-3.6.1\mvn_repo</localRepository> 3.配置阿里云私服&#xff1a;…

【大数据学习 | Spark-Core】yarn-client与yarn-cluster的区别

1. yarn的提交命令 # yarn的提交命令参数 --master yarn #执行集群 --deploy-mode # 部署模式 --class #指定运行的类 --executor-memory #指定executor的内存 --executor-cores # 指定核数 --num-executors # 直接指定executor的数量 --queue # 指定队列 2. yarn-client模式…

【汽车制动】汽车制动相关控制系统

目录 1.ABS (Anti-lock Brake System&#xff0c;防抱死制动系统) 2.EBD&#xff08;Electronic Brake-force Distribution&#xff0c;电子制动力分配系统&#xff09; 3.TCS&#xff08;Traction Control System&#xff0c;牵引力控制系统&#xff09; 4.VDC&#xff08…

《TCP/IP网络编程》学习笔记 | Chapter 15:套接字与标准 I/O

《TCP/IP网络编程》学习笔记 | Chapter 15&#xff1a;套接字与标准 I/O 《TCP/IP网络编程》学习笔记 | Chapter 15&#xff1a;套接字与标准 I/O标准 I/O 函数标准 I/O 函数的两个优点标准 I/O 函数和系统函数之间的性能对比标准 I/O 函数的几个缺点 使用标准 I/O 函数利用 fd…

<OS 有关> ubuntu 24 不同版本介绍 安装 Vmware tools

原因 想用 apt-get download 存到本地 / NAS上&#xff0c;减少网络流浪。 看到 VMware 上的确实有 ubuntu&#xff0c;只是版本是16。 ubuntu 版本比较&#xff1a;LTS vs RR LTS: Long-Term Support 长周期支持&#xff0c; 一般每 2 年更新&#xff0c;会更可靠与更稳定…

支持多种快充协议和支持多种功能的诱骗取电协议芯片

汇铭达XSP15是一款应用于手持电动工具、智能家居、显示器、音箱等充电方案的大功率快充协议芯片&#xff0c;支持最大功率100W给设备快速充电&#xff0c;大大缩短了充电时间。芯片支持通过UART串口发送电压/电流消息供其它芯片读取。支持自动识别连接的是电脑或是充电器。支持…

【一篇搞定配置】网络分析工具WireShark的安装与入门使用

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;各种软件安装与配置_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1.…

JavaWeb之综合案例

前言 这一节讲一个案例 1. 环境搭建 然后就是把这些数据全部用到sql语句中执行 2.查询所有-后台&前台 我们先写后台代码 2.1 后台 2.2 Dao BrandMapper&#xff1a; 注意因为数据库里面的名称是下划线分割的&#xff0c;我们类里面是驼峰的&#xff0c;所以要映射 …

【STM32】MPU6050初始化常用寄存器说明及示例代码

一、MPU6050常用配置寄存器 1、电源管理寄存器1&#xff08; PWR_MGMT_1 &#xff09; 此寄存器允许用户配置电源模式和时钟源。 DEVICE_RESET &#xff1a;用于控制复位的比特位。设置为1时复位 MPU6050&#xff0c;内部寄存器恢复为默认值&#xff0c;复位结束…

隐私友好型分析平台Plausible Analytics

什么是 Plausible Analytics &#xff1f; Plausible Analytics 是一个简单、轻量级&#xff08;小于1KB&#xff09;、开源且隐私友好的网站分析工具&#xff0c;旨在作为 Google Analytics 的替代品。它不使用 cookies 并且完全符合 GDPR、CCPA 和 PECR 法规&#xff0c;因此…

Flutter:RotationTransition旋转动画

配置vsync&#xff0c;需要实现一下with SingleTickerProviderStateMixinclass _MyHomePageState extends State<MyHomePage> with SingleTickerProviderStateMixin{// 定义 AnimationController late AnimationController _controller;overridevoid initState() {super…