一、Spark 分布式计算模拟
Driver 端将数据拆分成 n 个 Task 发送给 Executor,n 为 Executor 个数,Task 包含数据和计算逻辑,Executor 接收到 Task 后进行计算并将计算后的结果返回给 Driver
-
定义封装整体数据和逻辑的资源类
class Resource extends Serializable { // 数据集 val datas: List[Int] = List(1, 2, 3, 4) // 计算逻辑 val logic: Int => Int = _ * 2 }
-
定义 Task 类
class Task extends Serializable { // 持有的数据 var data: List[Int] = _ // 持有的逻辑 var logic: Int => Int = _ // 计算方法 def compute() = { data.map(logic) } }
-
定义 Driver 类
/* 负责与 Executor 通信并将准备好的 Task 发送给 Executor */ object Driver { def main(args: Array[String]): Unit = { // 1.建立与 Executor 的连接 val client1 = new Socket("localhost", 8888) val client2 = new Socket("localhost", 9999) // 2.封装 Task val resource = new Resource() val task1 = new Task() task1.data = resource.datas.take(2) task1.logic = resource.logic val task2 = new Task() task2.data = resource.datas.takeRight(2) task2.logic = resource.logic // 3.发送 Task val objOut1 = new ObjectOutputStream(client1.getOutputStream) objOut1.writeObject(task1) objOut1.close() objOut1.flush() client1.close() val objOut2 = new ObjectOutputStream(client2.getOutputStream) objOut2.writeObject(task2) objOut2.close() objOut2.flush() client2.close() println("客户端数据发送完毕") } }
-
定义两个 Executor 类
/* 负责接收 Driver 发送过来的 Task 并计算出结果 */ object Executor1 { def main(args: Array[String]): Unit = { // 1.启动服务并等待客户端连接 val server = new ServerSocket(8888) println("服务端[8888]启动,等待客户端连接...") val client = server.accept() // 2.接收 Task val objIn = new ObjectInputStream(client.getInputStream) val task = objIn.readObject() // 3.执行计算并输出结果 val result = task.compute() println("Executor[8888]计算结果为:" + result) objIn.close() client.close() server.close() } }
/* 负责接收 Driver 发送过来的 Task 并计算出结果 */ object Executor2 { def main(args: Array[String]): Unit = { // 1.启动服务并等待客户端连接 val server = new ServerSocket(9999) println("服务端[9999]启动,等待客户端连接...") val client = server.accept() // 2.接收 Task val objIn = new ObjectInputStream(client.getInputStream) val task = objIn.readObject() // 3.执行计算并输出结果 val result = task.compute() println("Executor[9999]计算结果为:" + result) objIn.close() client.close() server.close() } }
二、RDD 介绍
Resilient Distributed Dataset,简称 RDD,弹性分布式数据集
1. 概念
- RDD 是 Spark 中最基本的数据处理模型,在代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合
- 弹性的特点:
- 存储的弹性:内存与磁盘的自动切换
- 容错的弹性:数据丢失可以自动恢复
- 计算的弹性:计算出错重试机制
- 分片的弹性:可根据需要重新分片
- 分布式:数据存储在大数据集群不同节点上
- 数据集:RDD 封装了计算逻辑,并不保存数据
- 数据抽象:RDD 是一个抽象类,需要子类具体实现
- 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
- 可分区、并行计算
2. 实现原理
2.1 IO 的基本实现原理
IO 的实现体现了装饰者设计模式思想,实现了对类的功能的增强
-
字节流
InputStream in = new FileInputStream("filePath"); int i = -1; while((i = in.read()) != -1) { // 每读取一个字节就打印输出一个字节 System.out.println(i); }
-
缓冲字节流
InputStream in = new BufferedInputStream(new FileInputStream("filePath")); int i = -1; while((i = in.read()) != -1) { // 每读取一个字节就放进缓存中,当超过缓存阈值就全部输出 System.out.println(i); }
-
字符流
Reader in = new BufferedReader(new InputStreamReader(new FileInputStream("filePath"), "UTF-8")); String s = null; while((s = in.readLine()) != null) { // 每读取一个字节就放入转换区,满足大小就转换成一个字符放入缓存区,超过缓存区阈值就将全部字符输出 System.out.println(s); }
2.2 RDD 与 IO 的关系
// 以 wordCount 案例
val lines: RDD[String] = sc.textFile("datas")
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne: RDD[(String, Int)] = words.map((_, 1))
val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
val wordArray: Array[(String, Int)] = wordToCount.collect()
wordArray.foreach(println)
- RDD 的数据处理方式类似于 IO 流,也包含了装饰者设计模式思想
- RDD 的数据只有在调用 collect 方法时才会真正地执行业务逻辑操作,之前的操作都是对 RDD 功能的扩展
- RDD 是不保存数据的,但是 IO 流的缓存区可以临时保存一部分数据
3. 核心属性
-
分区列表:用于执行任务时并行计算,是实现分布式计算的重要属性
protected def getPartitions: Array[Partition]
-
分区计算函数:Spark 在计算时,是使用分区函数对每一个分区进行计算
def compute(split: Partition, context: TaskContext): Iterator[T]
-
RDD 之间的依赖关系:RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
-
分区器:可选,当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
@transient val partitioner: Option[Partitioner] = None
-
首选位置:可选,计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算,可以判断发送到哪个节点计算效率最优
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
4. 执行原理
- 启动 Yarn 集群环境(ResourceManager 和 NodeManager)
- Spark 通过申请资源创建调度节点(Driver)和计算节点(Executor)
- Spark 框架根据需求将计算逻辑根据分区划分成不同的任务(task)
- 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
- 总结:RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算
5. RDD 创建
object TestRDDCreate {
def main(args: Array[String]): Unit = {
// 1.创建 spark 连接对象
val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// 2.创建 RDD
// 2.1 从集合(内存)中创建 RDD:parallelize() 和 makeRDD()
val seq: Seq[Int] = Seq[Int](1,2,3,4)
val rdd1: RDD[Int] = sc.parallelize(seq)
val rdd2: RDD[Int] = sc.makeRDD(seq) // 推荐,makeRDD底层实现是调用parallelize方法
rdd1.collect.foreach(println)
rdd2.collect.foreach(println)
println("=================")
// 2.2 从外部存储(文件)创建 RDD:本地的文件系统、Hadoop支持的数据集(如HDFS、HBase等)
// 2.2.1 使用本地文件的绝对路径或相对路径(相对于项目的根目录)创建
val rdd3: RDD[String] = sc.textFile("D:\\data\\1.txt")
// val rdd3: RDD[String] = sc.textFile("input/1.txt")
// 2.2.2 使用目录创建(读取目录下所有文件)
val rdd4 = sc.textFile("D:\\data") // 以行为单位读取,结果是内容字符串
val rdd41 = sc.wholeTextFiles("D:\\data") // 以文件为单位读取,结果是二元组,第一个元素为文件路径,第二个元素为文件内容
// 2.2.3 使用路径通配符创建
val rdd5 = sc.textFile("input/1*.txt")
// 2.2.4 使用分布式文件系统路径
val rdd6 = sc.textFile("hdfs://hadoop102:8020/data/word.txt")
println("=================")
// 2.3 从其他 RDD 创建:通过一个 RDD 运算完后,再产生新的 RDD
// 2.4 使用 new 直接创建 RDD:一般由 Spark 框架自身使用
// 3.关闭连接
sc.stop()
}
}
6. RDD 并行度与分区
6.1 分区规则
-
RDD 有分区列表属性,可以将一个作业切分成多个任务后,发送给不同的 Executor 节点并行计算,而能够同时计算的任务数量称之为并行度
-
RDD 在创建时可以指定分区个数
// makeRDD 方法的第二个参数表示分区的数量,其默认值为 defaultParallelism 方法的返回值; // defaultParallelism 方法底层最终执行返回的是 scheduler.conf.getInt("spark.default.parallelism", totalCores) 的返回值; // 首先会在 SparkConf 中获取 key="spark.default.parallelism" 的值,获取到则返回;如果获取不到则返回 totalCores 的值,即当前运行环境(机器)的最大可用 CPU 核数 val rdd1 = sc.makeRDD(List(1,2,3,4), 2) // 将数据保存成对应个数的分区文件 rdd1.saveAsTextFile("output1") // 目录下有 2 个分区文件 // textFile 方法的第二个参数表示最小的分区数量;其默认值为 defaultMinPartitions 方法的返回值; // defaultMinPartitions 方法实现为:math.min(defaultParallelism, 2),如果没指定 "spark.default.parallelism" 的值,则最小分区数为 2 // 可以通过第二个参数指定最小分区数 // 由于 spark 读取文件底层使用的是 Hadoop 的 TextInputFormat,所以其分区计算使用的是 TextInputFormat 的 getSplits 方法: // 首先获取文件的总字节数:totalSize (如 7 字节);再根据指定的最小分区数计算出每个分区存储的字节大小:goalSize = totalSize/(numSplits == 0 ? 1 : numSplits) = 7/2 = 3;再根据 1.1 倍原则判断剩余的大小是否需要创建新分区:7 - 7/3 = 1,由于 1/3 + 1 > 1.1,所以需要创建新分区,即 7/3 + 1 = 3 个分区 val rdd2 = sc.textFile("input/1.txt", 2) rdd2.saveAsTextFile("output2") // 目录下有 3 个分区文件
-
建立 Spark 连接时可以指定并行度配置
// local 表示单核运行;local[n] 表示指定核数运行;local[*] 表示最大核数运行 val sparkConf = new SparkConf.setMaster("local[*]").setAppName("spark") sparkConf.set("spark.default.parallelism", "5") val sc = new SparkContext(sparkConf)
6.2 分区数据分配规则
-
makeRDD 创建:集合(内存)数据
// 以 5 个元素的集合和 3 个分区创建 RDD val list = List(1,2,3,4,5) val rdd = sc.makeRDD(list, 3) // makeRDD 底层调用 parallelize parallelize(list, 3) // parallelize 中创建 ParallelCollectionRDD new ParallelCollectionRDD(.., list, 3, ..) // ParallelCollectionRDD 中有核心属性分区列表 def getPartitions: Array[Partition] = { // 调用 slice 方法 slice(list, 3) } // slice 方法中有分配的核心方法 positions def positions(length: Int, numSlice: Int): Iterator[(Int,Int)] = { // 5, 3 (0 until numSlices).iterator.map { // (0, 1, 2) i => { // 0 -- 1 -- 2 val start = ((i * length) / numSlices).toInt // 0 -- 1 -- 3 val end = (((i + 1) * length) / numSlices).toInt // 1 -- 3 -- 5 (start, end) // (0,1) -- (1,3) -- (3,5) } } } // slice 中会对数据集进行类型模式匹配判断 case _ => { // 调用 position 方法再映射 positions(list.toArray.length, 3).map { // 5, 3 case (start,end) => list.toArray.slice(start, end) // slice(from,until) // (0,1) -> (1,2,3,4,5) -> 1 // (1,3) -> (1,2,3,4,5) -> 2,3 // (3,5) -> (1,2,3,4,5) -> 4,5 } } // 结论:3 个分区文件中的数据分配:【1】,【2,3】,【4,5】
-
textFile 创建:文件数据
/** 读取的文件内容:1.txt,@表示换行符的位置 1@@ 2@@ 3 */ // 创建文件读取的 RDD val rdd = sc.textFile("input/1.txt", 2) // 由于文件为 7 字节,所以分区数为 3,每个分区存储 3 字节 // 1.Spark文件读取是使用 hadoop 的方式以行为单位读取,与字节数无关 // 2.Spark是以偏移量的形式来读取一行数据,且偏移量不会被重复读取 /* 1@@ -> 偏移量:012 2@@ -> 偏移量:345 3 -> 偏移量:6 */ // 3.每个分区所包含的偏移量范围:(起始偏移量 ~ 起始偏移量 + 分区字节数) /* 分区0:[0 - 0 + 3 = 3] -> 1@@2 分区1:[3 - 3 + 3 = 6] -> 3 分区2:[6 - 6 + 3 = 9] -> */ // 结论:3 个分区文件中的数据分配:【1, 2】,【3】,【】