1. 什么是RDD?
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。在代码中,RDD是一个抽象类,他代表着一个弹性的、不可变的、可分区的、里面的元素可并行计算的集合。注意,RDD只是封装了计算逻辑,并不保存数据。RDD是一个抽象类,需要子类去实现。不可变指的是计算逻辑不可变,如果想要改变,则要产生新的RDD。
2. 五大核心属性
源码中五大属性介绍如下
1)分区列表
分区的主要目的是实现并行计算/分布式计算
2)分区计算函数
以分区为单位,进行计算,每个分区的计算函数都是一样的
3)RDD之间的依赖关系
一个RDD能够转换成另一个RDD,形成一种包装的依赖关系
4)分区器
负责如何划分分区,分区器是Option属性,可能有,可能没有
5)计算每个分区的首选位置
数据存储的节点和数据计算节点可能不一样,判断计算发给哪个节点更好,移动数据不如移动计算
3. 执行原理
Spark框架在执行计算时,先申请资源,然后将数据处理逻辑分解成一个个计算任务,然后将计算任务发送到已经分配资源的计算节点上,按照指定的计算模型进行计算。以Yarn集群环境为例:
其中,Yarn只是负责资源调度的,而NodeManager中的Driver才是负责任务调度的,而NodeManager中的Executor是负责任务执行的。
4. 从集合中创建RDD
通过parallelize和makeRDD方法
val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val seq = Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] = sc.parallelize(seq)
val rdd : RDD[Int] = sc.makeRDD(seq)
rdd.collect().foreach(println)
sc.stop()
其中local[*]表示使用当前本机的核数,如果不写[*]就用单核。parallelize和makeRDD方法本质是一样的,makeRDD方法内部调用了parallelize方法。
makeRDD可以加上第二个参数,表示分区数量,如果不传,会使用默认值scheduler.conf.getInt("spark.default.parallelism", totalCores),即会从sparkConf中获取配置参数,如果没配置,则使用totalCores,即当前环境最大核数。当然,这是针对本地模式的源码分析。
另外使用saveAsTextFile保存每个分区的文件。
val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val seq = Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] = sc.parallelize(seq)
val rdd : RDD[Int] = sc.makeRDD(seq, 2)
rdd.saveAsTextFile("output")
rdd.collect().foreach(println)
sc.stop()
结果如下(2个分区):
可以设置sparkConf中的分区数量配置参数为5
val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")
sparkConf.set("spark.default.parallelism", "5")
val sc = new SparkContext(sparkConf)
val seq = Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] = sc.parallelize(seq)
val rdd : RDD[Int] = sc.makeRDD(seq)
rdd.saveAsTextFile("output")
rdd.collect().foreach(println)
sc.stop()
结果如下
分区数据的划分可以参考
036 RDD-集合数据源-分区数据的分配
5. 从文件中创建RDD
val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val seq = Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] = sc.parallelize(seq)
val rdd : RDD[String] = sc.textFile("path")
rdd.collect().foreach(println)
sc.stop()
path可以是文件夹,也可以是文件 ,还可以加上通配符*。另外,path可以是分布式文件系统的路径。这里的textFile是以行为单位进行读取数据,不考虑数据来自于哪个文件。如果需要考虑数据来源于哪个文件,则需要用到wholeTextFiles方法。
val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val seq = Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] = sc.parallelize(seq)
val rdd : RDD[String] = sc.wholeTextFiles("path")
rdd.collect().foreach(println)
sc.stop()
读取结果形式类似如下:
可以看出,是以文件为单位进行读取,文件全路径名称和文件内容以逗号隔开。
textFile也可以通过第二个参数指定分区数量,如果不传,默认为min(scheduler.conf.getInt("spark.default.parallelism", totalCores), 2),但是第二个参数并不完全是最终分区的数量,这里只是表示最小分区数,实际分区数量可能比这个值要大。实际分区数量怎么计算可以考037 RDD-文件数据源-分区的设定。分区数据的划分可参考038 RDD-文件数据源-分区数据的分配和039 RDD-文件数据源-分区数据的分配-案例分析