1、Spark的RDD算子
RDD算子的概念和分类
1、1 Transformation算子
定义:RDD算子,返回值仍是一个RDD的,称之为转换算子
特性:这类算子是lazy懒加载的。如果没有Action算子,转换算子是不工作的。
1、2 Action算子
定义:返回值不是RDD的就是Action算子。
1、3 总结
对于这2类算子来讲,Transformation算子相当于在构建工作执行计划,action相当于是一个指令让这个执行计划开始工作。
如果没有action算子,Transformation算子之间的迭代关系,就相当于一个没有通电的流水线,只有action到来,这个数据处理的流水线才开始工作。
2、常用Transformation算子
2、1 map算子
功能:将RDD的数据一条一条的处理(基于map算子中接收的处理函数),返回新的RDD。
words_rdd.map(lambda x:(x,1))
2、2 flatMap算子
功能:对rdd执行map操作,然后进行接触嵌套操作。如:
file_rdd.flatMap(lambda line:line.split(" "))
实例:
嵌套的list:lst=[[1,2,3],[4,5,6],[7,8,9]]
接触嵌套的list:lst=[1,2,3,4,5,6,7,8,9]
2、3 reduceByKey算子
功能:针对KV型RDD,自动按照key分组,根据你提供的聚合逻辑,完成组内数据(value)的聚合
2、4 mapValues算子
功能:针对二元元组RDD,对其内部的二元元组的value执行map操作
2、5 groupBy算子
功能:将RDD的数据进行分组
2、6 filter算子
功能:过滤不想要的数据,保留想要的数据
2、7 distinct算子
功能:对RDD数据进行去重,返回一个RDD
2、8 union算子
功能:2个RDD合并返回一个RDD
2、9 join算子
功能:对2个RDD执行join操作(可实现SQL内\外连接),只能用于KV型二元元组。按照二元元组的key进行关联
2、10 intersection算子
功能:求2个RDD的交集,返回1个新的RDD
2、11 glom算子
功能:将RDD的数据加上嵌套,这个嵌套按照分区来进行。如:[1,2,3,4,5]嵌套后[[1,2,3],[4,5]]
2、12 groupByKey算子
功能:针对KV型RDD,自动按照key进行分组
2、13 常见面试题:groupByKey和reduceByKey区别:
2、13、1 功能上区别
2、13、1、1groupByKey仅有分组功能
2、13、1、2reduceByKey除了又ByKey的分组功能,还有reduce的聚合功能
2、13、2 性能上区别
2、13、2 、1 如果对数据进行分组+聚合,reduceByKey性能要优于groupByKey。主要原因是reduceByKey自带聚合逻辑
先在分区内做预聚合
然后再走分组流程(shuffle)
分组后再做最终聚合
2、13、2 、2 数据越大,reduceByKey性能优势越明显
2、14 sortBy算子
功能:对RDD数据进行排序,基于你指定的排序依据
sortByKey算子
功能:针对KV型RDD,按照Key进行排序
3、常用Action算子
3、·1 countByKey算子
功能:统计key出现的次数(一般适用于KV型RDD)
3、2 collect算子
功能:将RDD各个分区内的数据,统一收集到driver中,形成一个list对象
用法:rdd.collect()
3、3 reduce算子
功能:对RDD数据集按照你传入的逻辑进行聚合
3、4 fold算子
功能:和reduce一样,接收传入逻辑进行聚合,聚合是带有初始值的
3、5 first算子
功能:取出RDD的第一个元素
3、6 take算子
功能:取RDD的前N个元素,组合成list返回给你
3、7 top算子
功能:对RDD数据集进行降序排序,取前N个
3、8 count算子
功能:计算RDD有多少条数据,返回值是一个数字
3、9 takeSample算子
功能:随机抽样RDD的数据
3、10 takeOrdered算子
功能:对RDD进行排序取前N个
3、11 foreach算子
功能:对RDD中的每一个元素,执行你提供的逻辑操作(类似map),但没有返回值
3、12 saveAsTextFile算子
功能:将RDD的数据写入到文本文件中
4、分区操作算子
4、1 分区mapPartitions算子
功能:mapPartition算子一次被传递的是一整个分区数据,作为一个迭代器对象传递过来。而map每次被传递是一条数据,性能较map比较有优势
4、2 foreachPartition算子
功能:和foreach算子一样,一次处理一整个分区的数据
4、3 partitionBy算子
功能:对RDD进行自定义分区操作
4、4 repartition算子
功能:对RDD的分区执行重新分区(仅数量)
5、RDD的持久化
RDD数据是过程数据
RDD之间相互迭代计算(Transformation的转换),当执行开启后,新的RDD生成,代表旧的RDD将消失
RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成就不见了
这个特性可以最大化利用计算机资源,老旧RDD不用了就会从内存中被清理,为后续计算腾出内存资源
RDD的缓存
checkPoint技术
将RDD数据缓存起来,仅支持硬盘存储
它被设计认为是安全的
不保留血缘关系
checkPoint存储RDD数据,是集中收取各个分区的数据进行存储,而缓存是分散存储
缓存和checkPoint对比
checkPoint不管分区数量是多少,风险是一样的;缓存的分区越多,风险越高
checkPoint支持写入hdfs,而缓存不行,hdfs是高可靠存储,被认为是安全的
checkPoint不支持写内存,缓存可以,缓存如果写内存性能要优于checkPoint
checkPoint设计被认为是安全的,不保留血缘关系,而缓存被认为是不安全的所以保留
6、共享变量
广播变量
将本地变量list标记为广播变量即可
broadcast = sc.broadcast(std_info_list)
使用广播变量,从broadcast对象中取出本地list对象即可
value = broadcast.value
注意:也就是先放进去broadcast内部,然后再从broadcast内部取出来使用,中间传输的就是broadcast这个对象了。只要传输的是broadcast对象,spark就会留意,只会给每个executor发一份,而不是给每个分区都发。
场景:本地集合对象和分布式集合对象(RDD)进行关联的时候,需要将本地集合对象共享为广播变量,可以节省:网络I/O次数和executor的内存
累加器变量
构建方法:acmlt = sc.accumulator(0)
总结
广播变量解决什么问题?
分布式集合RDD和本地集合关联的时候,降低内存占用及减少网络I\O传输,提高性能
累加器解决什么问题?
分布式代码执行中,进行全局累加
7、DAG
有向无环图
一个Action算子会产生一个DAG,如果在代码中有3个Action就会产生3个DAG,如下图:
一个Action算子产生一个Job(一个应用程序内的子任务),每个Job都有各自的DAG
宽窄依赖和阶段划分
宽窄依赖
窄依赖:父RDD的一个分区,全部将数据分发给子RDD的一个分区
宽依赖(shuffle):父RDD的一个分区,将数据分发给子RDD的多个分区
阶段划分
根据DAG,会按照宽依赖,划分不同的阶段。
划分依据:从后向前,遇到宽依赖,就划分出一个阶段,称之为stage。
内存迭代计算
Spark是怎么做内存计算的?DAG的作用?Stage划分的作用?
Spark会产生DAG图
DAG图会基于分区和宽窄依赖关系划分Stage阶段
一个阶段内部都是窄依赖,窄依赖内部如果形成前后1:1的分区对应关系,就可以产生许多内存迭代计算的管道
这些内存迭代计算的管道,就是一个个具体执行的task
一个task就是一个具体的线程,一个任务跑在一个线程内,就是走内存计算了。
Spark为什么比MapReduce计算快?
Spark算子丰富,MapReduce算子匮乏(Map和Reduce),MapReduce编程模型很难在一套MR中处理复杂的任务,需要编写很多套串联的MR任务,多个MR串联通过磁盘交互数据
Spark可以执行内存迭代计算,算子之间形成的DAG基于宽依赖划分阶段后,在阶段内部形成内存迭代计算管道,但是MapReduce的Map和Reduce之间的交互依然是通过磁盘来交互的
Spark并行度
Spark的并行:同一时间内,有多少个task同时执行
并行度设置:代码中、配置文件中及提交程序的客户端参数中,优先级从高到低:代码中->客户端提交参数中->配置文件中。默认是1,但不会全部以1来跑,多数时候会根据读取文件的分片数量来作为默认并行度。
spark.default.parallelism = 100
并行度设置:建议设置为CPU核心的2-10倍
Spark任务调度
Driver被构建出来
构建Sparkcontext(执行环境入口对象)
基于DAG Scheduler(DAG调度器)构建逻辑Task分配
基于Task Scheduler(Task调度区)将逻辑Task分配到各个Executor上干活并监控他们
Worker(Executor)被Task Scheduler管理监控,听从他们的指令干活,并定期汇报进度