大数据技术之SparkCore

第1章 RDD概述 

1.1 什么是RDD

        RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。

        代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

RDD代表的是弹性、可分区、不可变、元素可并行计算的计算。

  • 1. 弹性
    •         存储的弹性: 如果内存充足,中间结果会全部保存在内存中,如果内存不足,数据一部分保存在内存中,一部分保存在磁盘中。
              计算的弹性: 如果计算出错会自动重试
              容错的弹性: 如果RDD数据丢失可以根据依赖关系+封装的计算逻辑重新读取数据重新计算得到数据。
              分区的弹性: RDD的分区可以自动根据文件的切片动态生成。
    • 2. 可分区
      •         spark是分布式计算框架,Spark根据文件的切片生成分区,一个切片对应一个分区,后续每个分区计算逻辑是一样,处理的数据不一样,每个分区间是并行的。
      • 3. 不可变
        •         RDD中只封装了数据的处理逻辑,如果想要重新改变数据只能生成新的RDD。
        • 4. 可并行计算
          •         每个分区计算逻辑是一样,处理的数据不一样,每个分区间是并行的。
          • 5. 不存储数据
            •         RDD中只封装了数据的处理逻辑,不存储数据。
               

1.2 RDD五大特性


第2章 RDD编程

2.1 IDEA环境准备

创建一个maven工程,并添加scala框架支持,在pom文件中添加spark-core的依赖和scala的编译插件:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

<build>
    <finalName>SparkCoreTest</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2.2 RDD编程

        首先,创建sparkContext:

// 1.创建sc的配置对象

    val conf: SparkConf = new SparkConf()

      .setAppName("sparkCore").setMaster("local[*]")

    // 2. 创建sc对象

    val sc = new SparkContext(conf)

 2.2.1 创建RDD

1、通过本地集合创建: sc.makeRDD(集合)/sc.parallelize(集合)
2、通过读取文件创建: sc.textFile("/../..")
3、通过其他RDD衍生: val rdd = rdd1.flatMap/filter/groupBy..
 

2.2.2 分区规则

1、通过本地集合创建RDD的分区数: sc.parallelize(集合[,numSlices=defaultParallelism])

        (1) 如果有设置 numSlices参数, 此时分区数 = 设置的numSlices

        (2)没有设置的话, 此时分区数 = defaultParallelism

                a.  如果在 sparkconf中设置spark.default.parallelism参数, 此时 defaultParallelism=设置的defaultParallelism。

                b.  如果在 sparkconf中没有设置spark.default.parallelism参数,

                        1、master=local,此时defaultParallelism=1
                        2、master=local[N],此时defaultParallelism=N
                        3、master=local[*],此时defaultParallelism=本地cpu个数
                        4、master=spark://...,此时defaultParallelism = max( 所有executor总核数,2 )

2、通过读取文件创建RDD的分区数:

sc.text(path[,minPartitions=defaultMinPartitions])

1、如果有指定minPartitions参数值,此时RDD的分区数>=指定minPartitions参数值
2、如果没有指定minPartitions参数值,此时RDD的分区数>= min(defaultParallelism,2)
            读取文件生成的新RDD的分区数最终由文件切片数决定。


第3章  Spark算子

Spark算子分为两类:

1. Transformation转换算子
     生成的是新RDD,不会触发任务计算。
2. Action行动算子
    没有返回值或者返回scala数据类型,会触发任务的计算。

3.1 Transformation转换算子

RDD整体上分为Value类型、双Value类型和Key-Value类型。

3.1.1 Value类型

  • map(func:RDD元素类型 =>B(任意类型))
    • map里面的函数是针对RDD每个元素操作,元素有多少个,函数就执行多少次;
    • map生成新的RDD元素个数和原RDD的元素个数相同。
  • flatMap(func: RDD元素类型=>集合) = map + flatten : 转换+压平
    • flatMap的函数是针对每个元素操作,元素有多少个,函数就调用多少次;
    • flatMap生成新RDD元素个数一般 >= 原RDD元素个数;
    • flatMap的使用场景: 一对多。
  • mapPartitions(func: Iterator[RDD元素类型]=>Iterator[B]):
    • 一对一映射[原RDD一个分区计算得到新RDD一个分区]
    • mapPartitions的函数是针对每个分区操作,分区有多少个,函数就调用多少次;
    • mapPartitions的使用场景: 一般用于从Mysql/hbase/redis等查询数据,可以减少连接创建与销毁的次数。

map与mapPartitions的区别:
   1、函数针对的对象不一样
          map的函数是针对每个元素操作
          mapPartitions的函数是针对每个分区操作
   2、函数的返回值不一样
          map的函数是针对每个元素操作,要求返回一个新的元素,map生成的新RDD元素个数 = 原RDD元素个数;
          mapPartitions的函数是针对分区操作,要求返回新分区的迭代器,mapPartitions生成新RDD元素个数不一定=原RDD元素个数;
   3、元素内存回收的时机不一样    
          map对元素操作完成之后就可以垃圾回收了;
          mapPartitions必须要等到分区数据迭代器里面数据全部处理完成之后才会统一垃圾回收,如果分区数据比较大可能出现内存溢出,此时可以用map代替。

  • mapPartitionsWithIndex(func: (Int,Iterator[RDD元素类型])=>Iterator[B]): 一对一映射[原RDD一个分区计算得到新RDD一个分区]
    • mapPartitionsWithIndex与mapPartitions的区别:

                        mapPartitionsWithIndex的函数相比mapPartitions函数多了一个分区号。

  • groupBy(func: RDD元素类型=>K): 按照指定字段对元素分组
    • groupBy的函数是针对每个元素操作,元素有多少个,函数就调用多少次
    • groupBy是根据函数的返回值对元素进行分组
    • groupBy生成新RDD元素类型是KV键值对,K就是函数的返回值,V就是K对应原RDD中所有元素的集合
    • groupBy会产生shuffle
  • distinct: 去重
    • distinct会产生shuffle操作
  • coalesce(分区数[,shuffle=false]): 合并分区
    •  coalesce默认只能减少分区数, 此时没有shuffle操作
    • 如果想要增大分区数,需要将shuffle设置为true,此时会产生shuffle操作
  • repartition(分区数): 重分区
    • repartition既可以增大分区数也可以减少分区数,但是都会产生shuffle操作

 coalesce与repartition的使用场景
    coalesce一般用于减少分区数,一般搭配filter使用。
    repartition一般用于增大分区数,当数据量膨胀的时候需要将分区数增大,加速数据处理速度  

  •  sortBy(func: RDD元素类型=>K[,ascding=true]): 按照指定字段排序
    • sortBy的函数是针对每个元素操作,元素有多少个,函数就调用多少次
    • sortBy是根据函数的返回值对元素排序,默认升序,如果想要降序需要将ascding设置为false
    • sortBy会产生shuffle操作

3.1.2 双Value类型交互

  • intersection: 交集

    • intersection会产生shuffle操作,会产生两个shuffle操作

  • union: 并集

    • union不会产生shuffle操作

    • union生成的新RDD分区数 = 两个父RDD分区数之和

  • subtract: 差集

    • subtract会产生shuffle操作,会产生两个shuffle操作

  • zip: 拉链 

    • 两个RDD要想拉链必须元素个数与分区数一致

3.1.3 Key-Value类型

  • partitionBy(partitioner): 按照指定分区器重分区

自定义分区器
        1、定义class继承Partitioner
        2、重写抽象方法

        override def numPartitions: Int = num          //获取新RDD的分区数
        override def getPartition(key: Any): Int         //获取key获取分区号

        3、使用: 在shuffle算子中一般都可以传入partitioner对象

  • groupByKey: 根据key分组
    • groupByKey生成的RDD里面的元素是KV键值对,K是分组的key,V是K对应原RDD中所有元素的value值的集合。
  • reduceByKey(func: (Value值类型,Value值类型)=>Value值类型): 按照key分组,对每个组所有value值聚合
    • reduceByKey函数第一个参数代表该组上一次聚合结果,如果是第一次聚合初始值 = 该组第一个value值
    • reduceByKey函数第二个参数代表该组当前待聚合的value值

groupByKey与reduceByKey的区别

  • reduceByKey有combiner预聚合操作,工作中推荐使用这种高性能shuffle算子
  • groupByKey没有预聚合
  • aggregateByKey(默认值)(combiner:(默认值类型,Value值类型)=>默认值类型, reducer:(默认值类型,默认值类型)=>默认值类型 ):  按照key分组,对每个组所有value值聚合。
    • aggregateByKey第一个函数是combiner聚合逻辑
    • aggregateByKey第二个函数是reducer聚合逻辑
    • aggregateByKey第一个函数在针对每个组第一次计算的时候,第一个参数的初始值 = 默认值
  • sortByKey: 根据key排序

  • mapValues(func: Value值类型=>B): 一对一转换 [原RDD一个元素的value值计算得到新RDD一个元素新Value值] 
    • mapValues里面的函数是针对每个元素的value值操作
  • join: 相当于sql的inner join, 结果集 = 能够join的数据
    • 两个RDD要想join必须元素都是KV键值对,两个RDD元素K的类型必须一样
    •  两个join的条件就是key相同就能join
    •  join生成的新RDD元素类似(join的元素key,( key对应左RDD的value值,key对应右RDD的value值 ))
  • leftOuterJoin:相当于sql的left join, 结果集 = 能够join的数据 + 左RDD不能Join的数据
    • leftOuterJOin生成的新RDD元素类似(join的元素key,( key对应左RDD的value值,Option[key对应右RDD的value值] ))
  • RightOuterJOin:相当于sql的right join, 结果集 = 能够join的数据 + 右RDD不能Join的数据
    • 生成的新RDD元素类似(join的元素key,( Option[key对应左RDD的value值],key对应右RDD的value值 ))
  • fullOuterJoin:相当于sql的full join, 结果集 = 能够join的数据 + 右RDD不能Join的数据 + 左RDD不能Join的数据
    • 生成的新RDD元素类似(join的元素key,( Option[key对应左RDD的value值],Option[key对应右RDD的value值] ))
  • cogroup: 相当于先对两个RDD执行groupByKey之后进行fullOuterJoin
    • cogroup生成的RDD元素类型(元素的key,(左RDDkey对应的所有value值集合,右RDD所有value值集合))

3.2  Action行动算子

  • collect: 收集RDD每个分区的数据以数组封装之后发给Driver
    • 如果rdd数据量比较大,Driver内存默认是1G,所以可能出现内存溢出。
    • 工作中一般需要设置Driver的内存为5-10G:
      • 可以通过bin/spark-submit --driver-memory 5G 设置
  • count: 统计RDD元素个数
  • first=take(1): 获取RDD第一个元素
  • take: 获取RDD前N个元素
    • first与take会首先启动一个job从RDD 0号分区获取前N个元素,如果0号分区数据不够会再次启动一个job从其他分区获取数据。
  • takeOrdered: 获取排序之后的前N个元素
  • countByKey: 统计每个key的个数
  • saveAsTextFile: 保存数据到文本
  • foreach(func: RDD元素类型=>Unit):Unit : 对每个元素遍历
    • foreach里面的函数是针对每个元素操作,元素有多少个,函数就执行多少次
  • foreachPartition(func: Iterator[RDD元素类型]=>Unit):Unit: 对每个分区遍历
    • foreachPartition里面的函数是针对每个分区操作,分区有多少个,函数就执行多少次
    • foreachPartition一般用于将数据保存到mysql/hbase/redis等存储介质中,可以减少链接的创建与销毁的次数,能够提高效率。

第3章  RDD序列化

        在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。

注意: (有闭包就需要进行序列化

3.1  Kryo序列化框架

        

        参考地址: https://github.com/EsotericSoftware/kryo

        Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。

        Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable10。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

使用方法:

object serializable02_Kryo {

    def main(args: Array[String]): Unit = {

        val conf: SparkConf = new SparkConf()
                .setAppName("SerDemo")
                .setMaster("local[*]")
                // 替换默认的序列化机制
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // 注册需要使用kryo序列化的自定义类
                .registerKryoClasses(Array(classOf[Search]))

        val sc = new SparkContext(conf)

        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)

        val search = new Search("hello")
        val result: RDD[String] = rdd.filter(search.isMatch)

        result.collect.foreach(println)
}
  // 关键字封装在一个类里面
  // 需要自己先让类实现序列化  之后才能替换使用kryo序列化
  class Search(val query: String) extends Serializable {
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }
  }
}

第四章 RDD依赖关系

4.1 查看血缘关系

        RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

打印结果:

(2) input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []

 |  input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []

----------------------

(2) MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []

 |  input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []

 |  input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []

----------------------

(2) MapPartitionsRDD[3] at map at Lineage01.scala:23 []

 |  MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []

 |  input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []

 |  input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []

----------------------

(2) ShuffledRDD[4] at reduceByKey at Lineage01.scala:27 []

 +-(2) MapPartitionsRDD[3] at map at Lineage01.scala:23 []

    |  MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []

    |  input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []

    |  input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []

注意:圆括号中的数字表示RDD的并行度,也就是有几个分区

 4.2 查看依赖关系

打印结果:

List(org.apache.spark.OneToOneDependency@f2ce6b)

----------------------

List(org.apache.spark.OneToOneDependency@692fd26)

----------------------

List(org.apache.spark.OneToOneDependency@627d8516)

----------------------

List(org.apache.spark.ShuffleDependency@a518813)

注意:要想理解RDDS是如何工作的,最重要的就是理解Transformations

        RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是 RDDparent RDD(s)是什么(血缘; 另一个就是RDD依赖于parent RDD(s)的哪些Partition(s),这种关系就是RDD之间的依赖(依赖)。

RDD和它依赖的父RDD(s)的依赖关系有两种不同的类型,即窄依赖(NarrowDependency)和宽依赖(ShuffleDependency)。

4.2.1 窄依赖

        窄依赖表示每一个父RDDPartition最多被子RDD的一个Partition使用(一对一or多对一),窄依赖我们形象的比喻为独生子女。

 4.2.2 宽依赖

        宽依赖表示同一个父RDDPartition被多个子RDDPartition依赖(只能是一对多),会引起Shuffle,总结:宽依赖我们形象的比喻为超生。

        具有宽依赖的transformations包括:sortreduceByKeygroupByKeyjoin和调用rePartition函数的任何操作。

        宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。

在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。

4.3 Stage任务划分

1)DAG有向无环图

        DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。

 

2)RDD任务切分中间分为:Application、Job、Stage和Task

(1)Application:初始化一个SparkContext即生成一个Application;

(2)Job:一个Action算子就会生成一个Job;

(3)Stage:Stage等于宽依赖的个数加1;

(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

 


第5章  RDD持久化

5.1 RDD Cache缓存

        RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

 源码解析:

mapRdd.cache()

def cache(): this.type = persist()

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

object StorageLevel {

  val NONE = new StorageLevel(false, false, false, false)

  val DISK_ONLY = new StorageLevel(true, false, false, false)

  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

  val MEMORY_ONLY = new StorageLevel(false, true, false, true)

  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。

        缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

(2) 自带缓存算子

Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

5.2 RDD CheckPoint检查点

1)检查点:是通过将RDD中间结果写入磁盘。

2)为什么要做检查点?

        由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统

4)检查点数据存储格式为:二进制的文件

5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。

6)检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。

7)设置检查点步骤

(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")

(2)调用检查点方法:wordToOneRdd.checkpoint()

5.3 缓存和检查点区别

(1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

(2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

(3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

(4)如果使用完了缓存,可以通过unpersist()方法释放缓存。

5.4 检查点存储到HDFS集群

        如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。

 

object checkpoint02 {

    def main(args: Array[String]): Unit = {

        // 设置访问HDFS集群的用户名
        System.setProperty("HADOOP_USER_NAME","atguigu")

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        // 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径
        sc.setCheckpointDir("hdfs://hadoop102:8020/checkpoint")

        //3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
        val lineRdd: RDD[String] = sc.textFile("input1")

        //3.1.业务逻辑
        val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

        val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
            word => {
                (word, System.currentTimeMillis())
            }
        }

        //3.4 增加缓存,避免再重新跑一个job做checkpoint
        wordToOneRdd.cache()

        //3.3 数据检查点:针对wordToOneRdd做检查点计算
        wordToOneRdd.checkpoint()

        //3.2 触发执行逻辑
        wordToOneRdd.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

5.4 键值对RDD数据分区

        Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

1)注意:

1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

2)获取RDD分区

(1)创建包名:com.atguigu.partitioner

(2)代码实现

object partitioner01_get {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3 创建RDD
        val pairRDD: RDD[(Int, Int)] = sc.makeRDD(List((1,1),(2,2),(3,3)))

        //3.1 打印分区器
        println(pairRDD.partitioner)

        //3.2 使用HashPartitioner对RDD进行重新分区
        val partitionRDD: RDD[(Int, Int)] = pairRDD.partitionBy(new HashPartitioner(2))

        //3.3 打印分区器
        println(partitionRDD.partitioner)

        //4.关闭连接
        sc.stop()
    }
}

5.4.1 Hash分区

 5.4.2 Ranger分区


第6章 累加器

        累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)

        累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。

 

1)累加器使用

         (1)累加器定义(SparkContext.accumulator(initialValue)方法)

                val sum: LongAccumulator = sc.longAccumulator("sum")

         (2)累加器添加数据(累加器.add方法)

                 sum.add(count)

(3)累加器获取数据(累加器.value)

                sum.value

2)创建包名:com.atguigu.accumulator

3)代码实现

object accumulator01_system {
package com.atguigu.cache
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object accumulator01_system {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") 
val sc = new SparkContext(conf)

    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    //需求:统计a出现的所有次数 ("a",10)

    //普通算子实现 reduceByKey 代码会走shuffle 效率低
    //val rdd1: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)

//普通变量无法实现 
//结论:普通变量只能从driver端发给executor端,在executor计算完以后,结果不会返回给driver端
/*
    var sum = 0

    dataRDD.foreach{
      case (a,count) => {
        sum += count
        println("sum = " + sum)
      }
    }

    println(("a",sum))
*/
    //累加器实现
    //1 声明累加器
    val accSum: LongAccumulator = sc.longAccumulator("sum")

    dataRDD.foreach{
      case (a,count) => {
        //2 使用累加器累加  累加器.add()
        accSum.add(count)
        // 4 不要在executor端获取累加器的值,因为不准确 
//因此我们说累加器叫分布式共享只写变量
        //println("sum = " + accSum.value)
      }
    }
    //3 获取累加器的值 累加器.value
    println(("a",accSum.value))

    sc.stop()
  }
}

注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量

3)累加器要放在行动算子中

因为转换算子执行的次数取决于job的数量,如果一个spark应用有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动算子中。

对于在行动算子中使用的累加器,Spark只会把每个Job对各累加器的修改应用一次。

object accumulator02_updateCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    //需求:统计a出现的所有次数 ("a",10)
    //累加器实现
    //1 声明累加器
    val accSum: LongAccumulator = sc.longAccumulator("sum")

    val mapRDD: RDD[Unit] = dataRDD.map {
      case (a, count) => {
        //2 使用累加器累加  累加器.add()
        accSum.add(count)
        // 4 不要在executor端获取累加器的值,因为不准确 因此我们说累加器叫分布式共享只写变量
        //println("sum = " + accSum.value)
      }
    }

    //调用两次行动算子,map执行两次,导致最终累加器的值翻倍
    mapRDD.collect()
    mapRDD.collect()

    /**
     * 结论:使用累加器最好要在行动算子中使用,因为行动算子只会执行一次,而转换算子的执行次数不确定!
     */ 
    //2 获取累加器的值 累加器.value
    println(("a",accSum.value))
    
    sc.stop()
  }
}

一般在开发中使用的累加器为集合累加器,在某些场景可以减少shuffle。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/14808.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Pytorch】六行代码实现:特征图提取与特征图可视化

前言 之前记录过特征图的可视化&#xff1a;Pytorch实现特征图可视化&#xff0c;当时是利用IntermediateLayerGetter 实现的&#xff0c;但是有很大缺陷&#xff0c;只能获取到一级的子模块的特征图输出&#xff0c;无法获取内部二级子模块的输出。今天补充另一种Pytorch官方…

数字孪生新能源智慧充电桩Web3D可视化运维系统

放眼全球&#xff0c;近十年来&#xff0c;新能源汽车赛道堪称“热得发烫”。伴随着进入成年期的新能源汽车行业对相关配套设备支撑水平的提升&#xff0c;作为其“新基建”的充电桩领域表现更为突出的价值势能。过去&#xff0c;在一系列补贴政策和资本刺激下&#xff0c;充电…

插装式两位两通电磁阀DSV-080-2NCP、DDSV-080-2NCP

特性 压力4000 PSI(276 Bar) 持续的电磁。 硬化处理的提升阀和柱塞可获得更长的寿命和低泄漏量。 有效的混式电磁铁结构。 插装阀允许交流电压。可选的线圈电压和端子。 标准的滤网低泄漏量选择 手动关闭选择。 工业化通用阀腔。 紧凑的尺寸。 两位两通常闭式双向电磁…

vue element-ui web端 引入高德地图,并获取经纬度

发版前接到一个临时新需求 &#xff0c;需要在web端地址选择时用地图&#xff0c;并获取经纬度。 临阵发版之际加需求&#xff0c;真的是很头疼&#xff0c;于是赶紧找度娘&#xff0c;找api。 我引入的是高德地图&#xff0c;首先要去申请key &#xff0c; 和密钥&#xff0c;…

在安装docker配置端口时 centos7 防火墙规则失效

一、问题 1、做端口映射管理的时候&#xff0c;自己关闭了防火墙&#xff0c;或者开启防火墙&#xff0c;或者指定开关端口&#xff0c;但是都不影响端口的使用&#xff0c;这就很奇怪&#xff0c;也就是本文的内容&#xff01; 2、思路&#xff0c;确认是请求到了防火墙的那…

老板们搞怪营业,品牌好感度upup真有梗

老板下场营业最经典的莫过于“老乡鸡”了。在手撕联名信事件出圈后&#xff0c;老乡鸡围绕束从轩创始人IP&#xff0c;开展了一系列社交传播宣传&#xff0c;比如“咯咯哒糊弄学”等。 50多岁的老乡鸡董事长束从轩&#xff0c;一改传统企业家严肃正经的形象&#xff0c;跟着老乡…

Windows下virtualbox相关软件安装设置全过程

一、下载 virtual box 程序 virtual box扩展程序-Oracle_VM_VirtualBox_Extension_Pack-7.0.8.vbox-extpack Virtualbox GuestAdditions 程序-解决分辨率&#xff0c;主机虚拟机之间共享文件、剪贴板等问题 http://download.virtualbox.org/virtualbox/7.0.8/ 或者 virtual b…

【shell脚本】条件语句

一、条件测试操作 1.1test命令与 [ ] 符号 测试表达试是否成立&#xff0c;若成立返回0&#xff0c;否则返回其它数值 1.1.1文件测试常用的测试操作符 符号作用-d测试是否为目录-e测试是否为目录或文件-f测试是否为文件-r测试当前用户是否有读取权限-w测试当前用户是否有写…

你掌握了stream流的全部新特性吗?

我们知道很早之前java8对于之前的版本更新了许多 新的支持&#xff0c;比如lamda函数式接口的支持&#xff0c;支持更多函数式接口的使用&#xff0c;对链表&#xff0c;数组&#xff0c;队列&#xff0c;集合等实现了Collectio接口的数据结构提供了StreamSupport.stream()支持…

运维监控工具PIGOSS BSM扩展指标介绍

PIGOSS BSM运维监控工具&#xff0c;除系统自带指标外&#xff0c;还支持添加SNMP扩展指标、脚本扩展指标、JMX扩展指标、自定义JDBC指标等&#xff0c;今天本文将介绍如何添加SNMP扩展指标和脚本扩展指标。 添加SNMP扩展指标 前提&#xff1a;需要知道指标的oid 例子&#xff…

如何实现Spring AOP以及Spring AOP的实现原理

AOP:面向切面编程,它和OOP&#xff08;面向对象编程)类似。 AOP组成: 1、切面:定义AOP是针对那个统一的功能的&#xff0c;这个功能就叫做一个切面&#xff0c;比如用户登录功能或方法的统计日志&#xff0c;他们就各种是一个切面。切面是有切点加通知组成的。 2、连接点:所有可…

Redis入门学习笔记【一】

目录 一、Redis是什么 二、Redis数据结构 2.1 Redis 的五种基本数据类型 2.1.1String&#xff08;字符串&#xff09; 2.1.2字符串列表&#xff08;lists&#xff09; 2.1.3字符串集合&#xff08;sets&#xff09; 2.1.5哈希&#xff08;hashes&#xff09; 2.2 Red…

MQTT协议 详解

文章目录 一、啥是MQTT&#xff1f;1. MQTT协议特点2. 发布和订阅3. QoS&#xff08;Quality of Service levels&#xff09;QoS 0 —— 最多1次QoS 1 —— 最少1次QoS 2 —— 只有1次 二、MQTT 数据包结构1. MQTT固定头2. MQTT可变头 / Variable header3. Payload消息体 三、M…

Ceph入门到精通- storcli安装

storcli 是LSI公司官方提供的Raid卡管理工具&#xff0c;storcli已经基本代替了megacli&#xff0c;是一款比较简单易用的小工具。将命令写成一个个的小脚本&#xff0c;会将使用变得更方便。 安装简单&#xff0c;Windows系统下解压出来以后可以直接运行。 Linux系统默认位置…

Android程序员向音视频进阶,有前景吗

随着移动互联网的普及和发展&#xff0c;Android开发成为了很多人的就业选择&#xff0c;希望在这个行业能获得自己的一席之地。然而&#xff0c;随着时间的推移&#xff0c;越来越多的人进入到了Android开发行业&#xff0c;就导致目前Android开发的工作越来越难找&#xff0c…

7.Shuffle详解

1.分区规则 ps."&"指的是按位与运算&#xff0c;可以强制转换为正数 ps."%",假设reduceTask的个数为3&#xff0c;则余数为0&#xff0c;1&#xff0c;2正好指代了三个分区 以上代码的含义就是对key的hash值强制取正之后&#xff0c;对reduce的个数取…

大数据技术之Kafka集成

一、集成Flume 1.1 Flume生产者 &#xff08;1&#xff09;启动Kafka集群 zkServer.sh startnohup kafka-server-start.sh /opt/soft/kafka212/config/server.properties & &#xff08;2&#xff09;启动Kafka消费者 kafka-console-consumer.sh --bootstrap-server 192…

动态内存管理

文章目录 1.动态内存函数1.1free1.2malloc1.3calloc1.4realloc 2.动态内存错误2.1解引用空指针--非法访问内存2.2越界访问动态空间2.3free释放非动态空间2.4free释放部分动态空间2.5free多次释放动态空间2.6未释放动态内存 3.动态内存题目3.1形参不影响实参3.2地址返回&#xf…

APP渗透—查脱壳、反编译、重打包签名

APP渗透—查脱壳、反编译、重打包签名 1. 前言1.1. 其它 2. 安装工具2.1. 下载jadx工具2.1.1. 下载链接2.1.2. 执行文件 2.2. 下载apktool工具2.2.1. 下载链接2.2.2. 测试 2.3. 下载dex2jar工具2.3.1. 下载链接 3. 查壳脱壳3.1. 查壳3.1.1. 探探查壳3.1.2. 棋牌查壳 3.2. 脱壳3…

FVM初启,Filecoin生态爆发着力点在哪?

Filecoin 小高潮 2023年初&#xff0c;Filecoin发文分享了今年的三项重大变更&#xff0c;分别是FVM、数据计算和检索市场的更新&#xff0c;这些更新消息在发布后迅速吸引了市场的广泛关注。 特别是在3月14日&#xff0c;Filecoin正式推出了FVM&#xff0c;这一变革使得Filec…