第1章 RDD概述
1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD代表的是弹性、可分区、不可变、元素可并行计算的计算。
- 1. 弹性
- 存储的弹性: 如果内存充足,中间结果会全部保存在内存中,如果内存不足,数据一部分保存在内存中,一部分保存在磁盘中。
计算的弹性: 如果计算出错会自动重试
容错的弹性: 如果RDD数据丢失可以根据依赖关系+封装的计算逻辑重新读取数据重新计算得到数据。
分区的弹性: RDD的分区可以自动根据文件的切片动态生成。 - 2. 可分区
- spark是分布式计算框架,Spark根据文件的切片生成分区,一个切片对应一个分区,后续每个分区计算逻辑是一样,处理的数据不一样,每个分区间是并行的。
- 3. 不可变
- RDD中只封装了数据的处理逻辑,如果想要重新改变数据只能生成新的RDD。
- 4. 可并行计算
- 每个分区计算逻辑是一样,处理的数据不一样,每个分区间是并行的。
- 5. 不存储数据
- RDD中只封装了数据的处理逻辑,不存储数据。
- RDD中只封装了数据的处理逻辑,不存储数据。
- 存储的弹性: 如果内存充足,中间结果会全部保存在内存中,如果内存不足,数据一部分保存在内存中,一部分保存在磁盘中。
1.2 RDD五大特性
第2章 RDD编程
2.1 IDEA环境准备
创建一个maven工程,并添加scala框架支持,在pom文件中添加spark-core的依赖和scala的编译插件:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
<build>
<finalName>SparkCoreTest</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2.2 RDD编程
首先,创建sparkContext:
// 1.创建sc的配置对象
val conf: SparkConf = new SparkConf()
.setAppName("sparkCore").setMaster("local[*]")
// 2. 创建sc对象
val sc = new SparkContext(conf)
2.2.1 创建RDD
1、通过本地集合创建: sc.makeRDD(集合)/sc.parallelize(集合)
2、通过读取文件创建: sc.textFile("/../..")
3、通过其他RDD衍生: val rdd = rdd1.flatMap/filter/groupBy..
2.2.2 分区规则
1、通过本地集合创建RDD的分区数: sc.parallelize(集合[,numSlices=defaultParallelism])
(1) 如果有设置 numSlices参数, 此时分区数 = 设置的numSlices
(2)没有设置的话, 此时分区数 = defaultParallelism
a. 如果在 sparkconf中设置spark.default.parallelism参数, 此时 defaultParallelism=设置的defaultParallelism。
b. 如果在 sparkconf中没有设置spark.default.parallelism参数,
1、master=local,此时defaultParallelism=1
2、master=local[N],此时defaultParallelism=N
3、master=local[*],此时defaultParallelism=本地cpu个数
4、master=spark://...,此时defaultParallelism = max( 所有executor总核数,2 )
2、通过读取文件创建RDD的分区数:
sc.text(path[,minPartitions=defaultMinPartitions])
1、如果有指定minPartitions参数值,此时RDD的分区数>=指定minPartitions参数值
2、如果没有指定minPartitions参数值,此时RDD的分区数>= min(defaultParallelism,2)
读取文件生成的新RDD的分区数最终由文件切片数决定。、
第3章 Spark算子
Spark算子分为两类:
1. Transformation转换算子
生成的是新RDD,不会触发任务计算。
2. Action行动算子
没有返回值或者返回scala数据类型,会触发任务的计算。
3.1 Transformation转换算子
RDD整体上分为Value类型、双Value类型和Key-Value类型。
3.1.1 Value类型
- map(func:RDD元素类型 =>B(任意类型))
- map里面的函数是针对RDD每个元素操作,元素有多少个,函数就执行多少次;
- map生成新的RDD元素个数和原RDD的元素个数相同。
- flatMap(func: RDD元素类型=>集合) = map + flatten : 转换+压平
- flatMap的函数是针对每个元素操作,元素有多少个,函数就调用多少次;
- flatMap生成新RDD元素个数一般 >= 原RDD元素个数;
- flatMap的使用场景: 一对多。
- mapPartitions(func: Iterator[RDD元素类型]=>Iterator[B]):
- 一对一映射[原RDD一个分区计算得到新RDD一个分区]
- mapPartitions的函数是针对每个分区操作,分区有多少个,函数就调用多少次;
- mapPartitions的使用场景: 一般用于从Mysql/hbase/redis等查询数据,可以减少连接创建与销毁的次数。
map与mapPartitions的区别:
1、函数针对的对象不一样
map的函数是针对每个元素操作
mapPartitions的函数是针对每个分区操作
2、函数的返回值不一样
map的函数是针对每个元素操作,要求返回一个新的元素,map生成的新RDD元素个数 = 原RDD元素个数;
mapPartitions的函数是针对分区操作,要求返回新分区的迭代器,mapPartitions生成新RDD元素个数不一定=原RDD元素个数;
3、元素内存回收的时机不一样
map对元素操作完成之后就可以垃圾回收了;
mapPartitions必须要等到分区数据迭代器里面数据全部处理完成之后才会统一垃圾回收,如果分区数据比较大可能出现内存溢出,此时可以用map代替。
- mapPartitionsWithIndex(func: (Int,Iterator[RDD元素类型])=>Iterator[B]): 一对一映射[原RDD一个分区计算得到新RDD一个分区]
- mapPartitionsWithIndex与mapPartitions的区别:
mapPartitionsWithIndex的函数相比mapPartitions函数多了一个分区号。
- groupBy(func: RDD元素类型=>K): 按照指定字段对元素分组
- groupBy的函数是针对每个元素操作,元素有多少个,函数就调用多少次
- groupBy是根据函数的返回值对元素进行分组
- groupBy生成新RDD元素类型是KV键值对,K就是函数的返回值,V就是K对应原RDD中所有元素的集合
- groupBy会产生shuffle
- distinct: 去重
- distinct会产生shuffle操作
- coalesce(分区数[,shuffle=false]): 合并分区
- coalesce默认只能减少分区数, 此时没有shuffle操作
- 如果想要增大分区数,需要将shuffle设置为true,此时会产生shuffle操作
- repartition(分区数): 重分区
- repartition既可以增大分区数也可以减少分区数,但是都会产生shuffle操作
coalesce与repartition的使用场景
coalesce一般用于减少分区数,一般搭配filter使用。
repartition一般用于增大分区数,当数据量膨胀的时候需要将分区数增大,加速数据处理速度
- sortBy(func: RDD元素类型=>K[,ascding=true]): 按照指定字段排序
- sortBy的函数是针对每个元素操作,元素有多少个,函数就调用多少次
- sortBy是根据函数的返回值对元素排序,默认升序,如果想要降序需要将ascding设置为false
- sortBy会产生shuffle操作
3.1.2 双Value类型交互
intersection: 交集
intersection会产生shuffle操作,会产生两个shuffle操作
union: 并集
union不会产生shuffle操作
union生成的新RDD分区数 = 两个父RDD分区数之和
subtract: 差集
subtract会产生shuffle操作,会产生两个shuffle操作
zip: 拉链
两个RDD要想拉链必须元素个数与分区数一致
3.1.3 Key-Value类型
- partitionBy(partitioner): 按照指定分区器重分区
自定义分区器
1、定义class继承Partitioner
2、重写抽象方法override def numPartitions: Int = num //获取新RDD的分区数
override def getPartition(key: Any): Int //获取key获取分区号
3、使用: 在shuffle算子中一般都可以传入partitioner对象
- groupByKey: 根据key分组
- groupByKey生成的RDD里面的元素是KV键值对,K是分组的key,V是K对应原RDD中所有元素的value值的集合。
- reduceByKey(func: (Value值类型,Value值类型)=>Value值类型): 按照key分组,对每个组所有value值聚合
- reduceByKey函数第一个参数代表该组上一次聚合结果,如果是第一次聚合初始值 = 该组第一个value值
- reduceByKey函数第二个参数代表该组当前待聚合的value值
groupByKey与reduceByKey的区别
- reduceByKey有combiner预聚合操作,工作中推荐使用这种高性能shuffle算子
- groupByKey没有预聚合
- aggregateByKey(默认值)(combiner:(默认值类型,Value值类型)=>默认值类型, reducer:(默认值类型,默认值类型)=>默认值类型 ): 按照key分组,对每个组所有value值聚合。
- aggregateByKey第一个函数是combiner聚合逻辑
- aggregateByKey第二个函数是reducer聚合逻辑
- aggregateByKey第一个函数在针对每个组第一次计算的时候,第一个参数的初始值 = 默认值
- sortByKey: 根据key排序
- mapValues(func: Value值类型=>B): 一对一转换 [原RDD一个元素的value值计算得到新RDD一个元素新Value值]
- mapValues里面的函数是针对每个元素的value值操作
- join: 相当于sql的inner join, 结果集 = 能够join的数据
- 两个RDD要想join必须元素都是KV键值对,两个RDD元素K的类型必须一样
- 两个join的条件就是key相同就能join
- join生成的新RDD元素类似(join的元素key,( key对应左RDD的value值,key对应右RDD的value值 ))
- leftOuterJoin:相当于sql的left join, 结果集 = 能够join的数据 + 左RDD不能Join的数据
- leftOuterJOin生成的新RDD元素类似(join的元素key,( key对应左RDD的value值,Option[key对应右RDD的value值] ))
- RightOuterJOin:相当于sql的right join, 结果集 = 能够join的数据 + 右RDD不能Join的数据
- 生成的新RDD元素类似(join的元素key,( Option[key对应左RDD的value值],key对应右RDD的value值 ))
- fullOuterJoin:相当于sql的full join, 结果集 = 能够join的数据 + 右RDD不能Join的数据 + 左RDD不能Join的数据
- 生成的新RDD元素类似(join的元素key,( Option[key对应左RDD的value值],Option[key对应右RDD的value值] ))
- cogroup: 相当于先对两个RDD执行groupByKey之后进行fullOuterJoin
- cogroup生成的RDD元素类型(元素的key,(左RDDkey对应的所有value值集合,右RDD所有value值集合))
3.2 Action行动算子
- collect: 收集RDD每个分区的数据以数组封装之后发给Driver
- 如果rdd数据量比较大,Driver内存默认是1G,所以可能出现内存溢出。
- 工作中一般需要设置Driver的内存为5-10G:
- 可以通过bin/spark-submit --driver-memory 5G 设置
- count: 统计RDD元素个数
- first=take(1): 获取RDD第一个元素
- take: 获取RDD前N个元素
- first与take会首先启动一个job从RDD 0号分区获取前N个元素,如果0号分区数据不够会再次启动一个job从其他分区获取数据。
- takeOrdered: 获取排序之后的前N个元素
- countByKey: 统计每个key的个数
- saveAsTextFile: 保存数据到文本
- foreach(func: RDD元素类型=>Unit):Unit : 对每个元素遍历
- foreach里面的函数是针对每个元素操作,元素有多少个,函数就执行多少次
- foreachPartition(func: Iterator[RDD元素类型]=>Unit):Unit: 对每个分区遍历
- foreachPartition里面的函数是针对每个分区操作,分区有多少个,函数就执行多少次
- foreachPartition一般用于将数据保存到mysql/hbase/redis等存储介质中,可以减少链接的创建与销毁的次数,能够提高效率。
第3章 RDD序列化
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。
注意: (有闭包就需要进行序列化)
3.1 Kryo序列化框架
参考地址: https://github.com/EsotericSoftware/kryo
Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
使用方法:
object serializable02_Kryo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用kryo序列化的自定义类
.registerKryoClasses(Array(classOf[Search]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
val search = new Search("hello")
val result: RDD[String] = rdd.filter(search.isMatch)
result.collect.foreach(println)
}
// 关键字封装在一个类里面
// 需要自己先让类实现序列化 之后才能替换使用kryo序列化
class Search(val query: String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}
}
}
第四章 RDD依赖关系
4.1 查看血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
打印结果:
(2) input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) MapPartitionsRDD[3] at map at Lineage01.scala:23 []
| MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) ShuffledRDD[4] at reduceByKey at Lineage01.scala:27 []
+-(2) MapPartitionsRDD[3] at map at Lineage01.scala:23 []
| MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
注意:圆括号中的数字表示RDD的并行度,也就是有几个分区
4.2 查看依赖关系
打印结果:
List(org.apache.spark.OneToOneDependency@f2ce6b)
----------------------
List(org.apache.spark.OneToOneDependency@692fd26)
----------------------
List(org.apache.spark.OneToOneDependency@627d8516)
----------------------
List(org.apache.spark.ShuffleDependency@a518813)
注意:要想理解RDDS是如何工作的,最重要的就是理解Transformations。
RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是 RDD的parent RDD(s)是什么(血缘); 另一个就是RDD依赖于parent RDD(s)的哪些Partition(s),这种关系就是RDD之间的依赖(依赖)。
RDD和它依赖的父RDD(s)的依赖关系有两种不同的类型,即窄依赖(NarrowDependency)和宽依赖(ShuffleDependency)。
4.2.1 窄依赖
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一),窄依赖我们形象的比喻为独生子女。
4.2.2 宽依赖
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle,总结:宽依赖我们形象的比喻为超生。
具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作。
宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。
在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。
4.3 Stage任务划分
1)DAG有向无环图
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。
2)RDD任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系。
第5章 RDD持久化
5.1 RDD Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
源码解析:
mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
(2) 自带缓存算子
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
5.2 RDD CheckPoint检查点
1)检查点:是通过将RDD中间结果写入磁盘。
2)为什么要做检查点?
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统
4)检查点数据存储格式为:二进制的文件
5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。
6)检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。
7)设置检查点步骤
(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")
(2)调用检查点方法:wordToOneRdd.checkpoint()
5.3 缓存和检查点区别
(1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
(2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
(3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
(4)如果使用完了缓存,可以通过unpersist()方法释放缓存。
5.4 检查点存储到HDFS集群
如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。
object checkpoint02 {
def main(args: Array[String]): Unit = {
// 设置访问HDFS集群的用户名
System.setProperty("HADOOP_USER_NAME","atguigu")
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
// 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径
sc.setCheckpointDir("hdfs://hadoop102:8020/checkpoint")
//3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input1")
//3.1.业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
word => {
(word, System.currentTimeMillis())
}
}
//3.4 增加缓存,避免再重新跑一个job做checkpoint
wordToOneRdd.cache()
//3.3 数据检查点:针对wordToOneRdd做检查点计算
wordToOneRdd.checkpoint()
//3.2 触发执行逻辑
wordToOneRdd.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
5.4 键值对RDD数据分区
Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。
1)注意:
(1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
2)获取RDD分区
(1)创建包名:com.atguigu.partitioner
(2)代码实现
object partitioner01_get {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3 创建RDD
val pairRDD: RDD[(Int, Int)] = sc.makeRDD(List((1,1),(2,2),(3,3)))
//3.1 打印分区器
println(pairRDD.partitioner)
//3.2 使用HashPartitioner对RDD进行重新分区
val partitionRDD: RDD[(Int, Int)] = pairRDD.partitionBy(new HashPartitioner(2))
//3.3 打印分区器
println(partitionRDD.partitioner)
//4.关闭连接
sc.stop()
}
}
5.4.1 Hash分区
5.4.2 Ranger分区
第6章 累加器
累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)
累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。
1)累加器使用
(1)累加器定义(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
(2)累加器添加数据(累加器.add方法)
sum.add(count)
(3)累加器获取数据(累加器.value)
sum.value
2)创建包名:com.atguigu.accumulator
3)代码实现
object accumulator01_system {
package com.atguigu.cache
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object accumulator01_system {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
//需求:统计a出现的所有次数 ("a",10)
//普通算子实现 reduceByKey 代码会走shuffle 效率低
//val rdd1: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
//普通变量无法实现
//结论:普通变量只能从driver端发给executor端,在executor计算完以后,结果不会返回给driver端
/*
var sum = 0
dataRDD.foreach{
case (a,count) => {
sum += count
println("sum = " + sum)
}
}
println(("a",sum))
*/
//累加器实现
//1 声明累加器
val accSum: LongAccumulator = sc.longAccumulator("sum")
dataRDD.foreach{
case (a,count) => {
//2 使用累加器累加 累加器.add()
accSum.add(count)
// 4 不要在executor端获取累加器的值,因为不准确
//因此我们说累加器叫分布式共享只写变量
//println("sum = " + accSum.value)
}
}
//3 获取累加器的值 累加器.value
println(("a",accSum.value))
sc.stop()
}
}
注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。
3)累加器要放在行动算子中
因为转换算子执行的次数取决于job的数量,如果一个spark应用有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动算子中。
对于在行动算子中使用的累加器,Spark只会把每个Job对各累加器的修改应用一次。
object accumulator02_updateCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
//需求:统计a出现的所有次数 ("a",10)
//累加器实现
//1 声明累加器
val accSum: LongAccumulator = sc.longAccumulator("sum")
val mapRDD: RDD[Unit] = dataRDD.map {
case (a, count) => {
//2 使用累加器累加 累加器.add()
accSum.add(count)
// 4 不要在executor端获取累加器的值,因为不准确 因此我们说累加器叫分布式共享只写变量
//println("sum = " + accSum.value)
}
}
//调用两次行动算子,map执行两次,导致最终累加器的值翻倍
mapRDD.collect()
mapRDD.collect()
/**
* 结论:使用累加器最好要在行动算子中使用,因为行动算子只会执行一次,而转换算子的执行次数不确定!
*/
//2 获取累加器的值 累加器.value
println(("a",accSum.value))
sc.stop()
}
}
一般在开发中使用的累加器为集合累加器,在某些场景可以减少shuffle。