第1关:集合并行化创建RDD
任务描述
本关任务:编写一个集合并行化创建RDD的程序。
相关知识
为了完成本关任务,你需要掌握:1.如何使用集合并行化创建一个Spark RDD 。
什么是 RDD
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
简单的来说RDD就是一个集合,一个将集合中数据存储在不同机器上的集合。
RDD直观图,如下:
RDD 的 五大特性
一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个 RDD都会实现 compute 函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
相关API介绍
SparkContext创建;
sc = SparkContext("local", "Simple App")
说明:"local" 是指让Spark程序本地运行,"Simple App" 是指Spark程序的名称,这个名称可以任意(为了直观明了的查看,最好设置有意义的名称)。
集合并行化创建RDD;
data = [1,2,3,4]
rdd = sc.parallelize(data)
collect算子:在驱动程序中将数据集的所有元素作为数组返回(注意数据集不能过大);
rdd.collect()
停止SparkContext。
sc.stop()
编程要求
根据提示,在右侧编辑器begin-end处补充代码,完成Spark RDD创建。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple RDD App")
# 2.创建一个1到8的列表List
data = list(range(1, 9))
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
rdd_content = rdd.collect()
# 5.打印 rdd 的内容
print(rdd_content)
# 6.停止 SparkContext
sc.stop()
#********** End **********#
第2关:读取外部数据集创建RDD
任务描述
本关任务:编写读取本地文件创建Spark RDD的程序。
相关知识
为了完成本关任务,你需要掌握:1.如何读取本地文件系统中的文件来创建Spark RDD。
textFile 介绍
PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3 等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。
文本文件RDD可以使用创建SparkContex的textFile方法。此方法需要一个 URI的文件(本地路径的机器上,或一个hdfs://,s3a:// 等 URI),并读取其作为行的集合。这是一个示例调用:
distFile = sc.textFile("data.txt")
编程要求
根据提示,在右侧编辑器begin-end处补充代码,完成读取本地文件系统的文件并创建Spark RDD 。
测试说明
平台会对你编写的代码进行测试,若是与预期输出相同,则算通关。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == '__main__':
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 文本文件 RDD 可以使用创建 SparkContext 的textFile 方法。此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合
# 2.读取本地文件,URI为:/root/wordcount.txt
raw = sc.textFile("/root/wordcount.txt")
rdd = raw.map(lambda x:x)
# 3.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
rdd.collect()
# 4.打印 rdd 的内容
print(rdd.collect())
# 5.停止 SparkContext
sc.stop()
#********** End **********#