Spark核心组件解析:Executor、RDD与缓存优化
Spark Executor
Executor 是 Spark 中用于执行任务(task)的执行单元,运行在 worker 上,但并不等同于 worker。实际上,Executor 是一组计算资源(如 CPU 核心和内存)的集合,多个 executor 共享 worker 上的 CPU 和内存资源。
Executor 的功能
-
任务执行:Executor 负责执行分配给它的任务,并返回结果到 driver 程序。
-
缓存机制:如果应用程序调用了 cache() 或 persist() 函数,Executor 会通过 Block Manager 为RDD 提供缓存机制,优化重复计算。
-
生命周期:Executor 存在于整个 Spark 应用的生命周期内。
Executor 的创建
Spark 在以下几种情况下创建 Executor:
- 当资源管理器为 Standalone 或 YARN,且 CoarseGrainedExecutorBackend 进程接收到
RegisteredExecutor 消息时; - 当使用 Mesos 资源管理器时,MesosExecutorBackend 进程注册时;
- 在本地模式下,当 LocalEndpoint 被创建时。
创建成功后,日志会显示如下信息:
INFO Executor: Starting executor ID [executorId] on host [executorHostname]
心跳发送线程
Executor 会定期向 driver 发送心跳信号以确保连接活跃。心跳线程通常是一个调度线程池,利用 ScheduledThreadPoolExecutor 来维持任务的实时性。
执行任务
Executor 通过 launchTask 方法来执行任务。这个方法会创建一个 TaskRunner 线程,并在 Executor Task Launch Worker 线程池中执行任务。
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
Spark RDD (Resilient Distributed Dataset)
RDD 是 Spark 的基础数据结构,表示一个不可变的分布式数据集。RDD 在集群中的各个节点上并行计算,并且具有弹性(容错性)和分布式的特性。
RDD 的特性
- 弹性:RDD 是容错的,丢失的数据可以通过其父 RDD 重新计算。
- 分布式:RDD 的数据分布在集群的不同节点上,支持分布式计算。
- 不可修改:RDD 一旦创建,其数据不可修改,这也保证了数据的一致性。
- 分区:RDD 会被划分为多个分区,以便并行处理。
RDD 的创建方式
(1)并行化:可以通过 SparkContext.parallelize() 方法从一个数据集合创建 RDD。
(2)从外部存储:可以通过 SparkContext.textFile() 等方法从外部存储系统(如 HDFS)加载数据创建 RDD。
(3)从其他 RDD:通过 Spark 的 Transformation 操作从已有的 RDD 创建新的 RDD。
RDD 操作
RDD 支持两种类型的操作:
- Transformation 操作(转换):如 map()、filter(),返回新的 RDD。
- Action 操作(行动):如 count()、collect(),触发实际计算并返回结果。
RDD 的容错性
RDD 提供容错能力。当某个节点失败时,可以根据其父 RDD 的计算逻辑恢复丢失的数据。这是通过 DAG(有向无环图)和父 RDD 关系来实现的。
RDD 的持久化
RDD 可以使用 cache() 或 persist() 进行持久化存储,缓存的 RDD 会存储在内存中,若内存不足则溢写到磁盘,避免重复计算。
RDD 的局限性
缺少内置优化引擎:RDD 无法像 DataFrame 和 Dataset 一样利用 Spark 的 Catalyst 优化器进行自动优化。
性能问题:随着数据量增大,RDD 计算的性能可能下降,尤其是与 JVM 垃圾回收和序列化相关的开销。
存储问题:当内存不足时,RDD 会将数据溢写到磁盘,这会导致计算性能大幅下降。
创建 RDD 的例子
- 并行化创建 RDD:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = data.map(_ * 2)
result.collect() // 返回 [2, 4, 6, 8, 10]
- 从外部存储创建 RDD:
val rdd = sc.textFile("hdfs://path/to/file")
- 从其他 RDD 创建 RDD:
val newRdd = oldRdd.filter(_ > 10)
Spark RDD 缓存机制
Spark RDD 缓存是一种优化技术,用于将中间计算结果存储在内存中,以便在后续操作中复用,从而减少重复计算,提高性能。RDD 缓存可以显著加速一些需要迭代计算的应用,特别是在机器学习和图计算等场景中。
持久化 RDD
持久化操作会将 RDD 的计算结果存储到内存中。这样,每次对 RDD 进行操作时,Spark 会直接使用内存中的数据,而不必重新计算。通过持久化,可以避免重复计算从而提高效率。
- cache():cache() 是 persist() 的简化方法,默认将 RDD 数据存储在内存中,使用 MEMORY_ONLY
存储级别。 - persist():可以通过 persist() 方法选择不同的存储级别,例如 MEMORY_ONLY、DISK_ONLY 等。
- unpersist():用于移除已缓存的数据,释放内存。
RDD 持久化存储级别
Spark 提供了多种存储级别,每种级别的存储方式不同,根据具体的需求选择合适的存储级别。
存储级别 | 使用空间 | CPU时间 | 是否在内存中 | 是否在磁盘上 | 备注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | 默认级别,数据未序列化,全部存储在内存中 |
MEMORY_ONLY_2 | 高 | 低 | 是 | 否 | 数据存储 2 份 |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 数据序列化存储,占用更少内存 |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 内存不够时,数据溢写到磁盘 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 数据序列化,内存不够时溢写到磁盘 |
DISK_ONLY | 低 | 高 | 否 | 是 | 数据仅存储在磁盘中 |
OFF_HEAP | - | - | - | - | 存储在堆外内存,目前为试验性选项 |
副本机制
带有 _2 后缀的存储级别表示在每个节点上缓存数据的副本。副本机制是为了提高容错性。如果某个节点的数据丢失,Spark 可以从其他节点的副本中恢复数据,而不必重新计算。
缓存策略的选择
- MEMORY_ONLY:适用于内存足够大的场景,避免序列化和磁盘 I/O开销。性能较高,但如果内存不足可能会导致计算失败。
- MEMORY_ONLY_SER:适用于内存较为紧张的场景,将数据进行序列化后保存在内存中,减少内存占用,但会增加序列化的开销。
- MEMORY_AND_DISK:适用于内存不足的场景,数据无法完全存储在内存时会溢写到磁盘,确保数据不会丢失。
- DISK_ONLY:适用于数据量极大的情况,全部数据存储在磁盘中,性能较低,但可以处理大规模数据。
如何使用 Spark RDD 缓存
缓存 RDD:
val rdd = sc.textFile("data.txt")
rdd.cache() // 使用默认的 MEMORY_ONLY 存储级别
选择存储级别:
rdd.persist(StorageLevel.MEMORY_AND_DISK) // 选择 MEMORY_AND_DISK 存储级别
清除缓存
rdd.unpersist() // 移除缓存
Spark 键值对 RDD
Spark 通过 PairRDD 处理键值对类型的数据,提供了多种用于处理键值对数据的转换操作。
- 如何创建键值对 RDD
通过 map 操作将普通 RDD 转换为键值对 RDD。
Scala 示例:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
Python 示例:
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
- 常见的键值对操作
reduceByKey(func):对具有相同键的值进行规约操作。
val pairs = sc.parallelize(List((1,2),(3,4),(3,6)))
val result = pairs.reduceByKey((a, b) => a + b)
println(result.collect().mkString(","))
groupByKey():对具有相同键的值进行分组。
val result = pairs.groupByKey()
println(result.collect().mkString(","))
mapValues():对值进行转换操作,但不改变键。
val result = pairs.mapValues(x => x + 1)
println(result.collect().mkString(","))
sortByKey():按键排序。
val sorted = pairs.sortByKey()
println(sorted.collect().mkString(","))
- 对两个 RDD 的操作
join():连接两个 RDD,返回键相同的数据。
val other = sc.parallelize(List((3, 9)))
val joined = pairs.join(other)
println(joined.collect().mkString(","))
leftOuterJoin() 和 rightOuterJoin():左外连接和右外连接,分别确保第一个和第二个 RDD 中的键存在。
val leftJoined = pairs.leftOuterJoin(other)
val rightJoined = pairs.rightOuterJoin(other)
println(leftJoined.collect().mkString(","))
println(rightJoined.collect().mkString(","))
通过合理使用 Spark 的缓存和键值对 RDD 操作,可以显著提升大数据计算的效率,尤其是在迭代计算和需要频繁访问中间数据的场景下。