Spark-RDD和共享变量

概览

        每个Spark应用程序都由一个driver program  组成,该驱动程序运行我们编写的main函数,并在集群上执行各种 并行 操作。Spark提供的主要抽象是一个 弹性分布式数据集(RDD),它是一个跨集群节点分区的元素集合,可以并行操作。RDD是通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或 driver program 中现有的Scala集合开始并转换它来创建的。用户还可以要求Spark在内存中 持久化RDD,允许它在并行操作中有效地重用。最后,RDD会自动从节点故障中恢复。

        Spark中的第二个抽象是可以在并行操作中使用的 共享变量 。默认情况下,当Spark在不同节点上作为一组任务并行运行函数时,它会将函数中使用的每个变量的副本发送给每个任务。有时,一个变量需要在任务之间共享,或者在任务和 driver program 之间共享。Spark支持两种类型的共享变量:广播变量,可用于在所有节点上的内存中缓存一个值,累加器,是仅可“添加”的变量,例如计数器和求和。

一、RDD

        弹性分布式数据集(Resilient Distributed Dataset) 简称 RDD,是一个可以并行操作的元素的容错集合。RDD有五大特性:

        1、有一系列分区构成(分区即切片)

        2、每个函数都作用在每个分区上(并行计算)

        3、和其他RDD是有依赖关系的

        4、分区类算子只能作用在kv型RDD上

        5、Spark会为每个分区计算最佳的位置,并把函数移动过去(计算向数据移动)

        RDD支持两种类型的操作:Transformations 和 Actions ,其中Transformations算子有 lazy特性,只要需要将结果返回给驱动程序时,才会触发计算。默认每个转换后的RDD都可以重新计算,也可以使用persist或cache方法将RDD持久化。这样,当后面的算子用这个RDD时就不用重新计算了。

1、Transformations 算子

map(func)

解释:返回一个新的分布式数据集,该数据集是通过函数func传递源的每个元素而形成的

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)

val mapRdd = sourceRdd.map(_+10)

mapRdd.foreach(println)

结果:

11
12
13
14
15
16
17
18
19
20        


filter(func)

解释:返回一个新的数据集,该数据集是通过选择func在其上返回true的源的那些元素而形成的。

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)

val filterRdd = sourceRdd.filter(_>5)

filterRdd.foreach(println)

结果:

6
7
8
9
10


flatMap(func)

解释:类似于map,但每个输入项都可以映射到0个或多个输出项(因此func应该返回一个Seq而不是单个项)

val sourceRdd2 = sc.parallelize(List("java scala js","c++ c python","java vba python"))
val flatMapRdd = sourceRdd2.flatMap(_.split(" "))
flatMapRdd.foreach(println)        

结果:

java
scala
js
c++
c
python
java
vba
python


mapPartitions(func)

解释:类似于map,但在RDD的每个分区(块)上单独运行,因此在T类型的RDD上运行时,func必须是Iterator<T>=>Iterator<U>类型。

mapPartitionsWithIndex(func)

解释:类似于map分区,但也为func提供了一个表示分区索引的整数值,因此func必须是类型(Int, Iterator<T>)=>Iterator<U>当在T类型的RDD上运行时。

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
//打印每个分区的数据
sourceRdd.mapPartitionsWithIndex((index,partition)=>{
  val sourceData = partition.mkString(",")
  println(index +"-"+sourceData)
  partition
}).collect()
println("------------------------")
val mapPartitionsRDD = sourceRdd.mapPartitions(iter => {
  // 在这个例子中,我们计算每个分区的总和
  val sum = iter.foldLeft(0)(_ + _)
  Iterator(sum) // 返回一个包含分区总和的新迭代器
})
mapPartitionsRDD.foreach(println)

结果:

0-1,2,3
1-4,5,6
2-7,8,9,10
------------------------
6
15
34


sample(withReplacement, fraction, seed)

解释:使用给定的随机数生成器种子对数据的一小部分进行采样,无论是否进行替换。

        所谓抽样那么返回的数据占比就没有那么准确

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
/ 进行随机采样,采样比例为0.5,有放回的抽样
val sampleRdd = sourceRdd.sample(true, 0.5)
sampleRdd.foreach(println)
println("-------------------------------")
// 进行随机采样,采样比例为0.5,无放回的抽样
val sampleRdd2 = sourceRdd.sample(false, 0.5)
sampleRdd2.foreach(println)

 结果:

1
2
5
7
7
9
-------------------------------
3
4
5
6
7
9


union(otherDataset)

解释:返回一个新数据集,其中包含源数据集中元素和参数的联合

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
val sourceRdd2 = sc.parallelize(List(1000,2000,9000))
val unionRdd = sourceRdd.union(sourceRdd2)
unionRdd.foreach(println)        

结果:

1
2
3
4
5
6
7
8
9
10
1000
2000
9000


intersection(otherDataset)

解释:返回一个新RDD,其中包含源数据集中元素和参数的交集。

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
val sourceRdd2 = sc.parallelize(List(3,5,6,1000,2000,9000))
val intersectionRdd = sourceRdd.intersection(sourceRdd2)
intersectionRdd.foreach(println)

结果:

6
3
5


distinct([numTasks]))

解释:返回一个包含源数据集不同元素的新数据集

val sourceRdd3 = sc.parallelize(List(1,2,3,3,4,4,5,6,6))
val distinctRdd = sourceRdd3.distinct()
distinctRdd.foreach(println)

结果:(无序)

4
1
6
3
5
2


groupByKey([numTasks])

解释:当在(K, V)对的数据集上调用时,返回(K,Iterable<V>)对的数据集

注意:如果您正在分组以执行聚合(例如总和或 平均)在每个键上,使用reduceByKeyaggregateByKey会产生更好的结果 性能。

注意:默认情况下,输出中的并行级别取决于父RDD的分区数。 您可以传递一个可选的numTasks参数来设置不同数量的任务。

val sourceRdd4 = sc.parallelize(Seq(
  ("java", 1),
  ("java",3),
  ("java",18),
  ("c++", 1),
  ("js", 1),
  ("R",1),
  ("python",3),
  ("python",6),
  ("scala",2),
  ("scala", 1),
  ("scala",21)), 2)
val groupedRDD = sourceRdd4.groupByKey()
groupedRDD.foreach(println)

结果:

(scala,CompactBuffer(1, 2, 21))
(R,CompactBuffer(1))
(python,CompactBuffer(3, 6))
(java,CompactBuffer(1, 3, 18))
(c++,CompactBuffer(1))
(js,CompactBuffer(1))


reduceByKey(func, [numTasks])

解释:当在(K, V)对的数据集上调用时,返回(K,V)对的数据集,其中每个键的值使用给定的duce函数func聚合,该函数的类型必须是(V,V)=>V。与groupByKey中一样,可以通过可选的第二个参数配置Reduce任务的数量。

val sourceRdd4 = sc.parallelize(Seq(
  ("java", 1),
  ("java",3),
  ("java",18),
  ("c++", 1),
  ("js", 1),
  ("R",1),
  ("python",3),
  ("python",6),
  ("scala",2),
  ("scala", 1),
  ("scala",21)), 2)
val reduceByKeyRDD = sourceRdd4.reduceByKey(_+_)
reduceByKeyRDD.foreach(println)

结果:

(scala,24)
(R,1)
(python,9)
(java,22)
(c++,1)
(js,1)


aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

解释:当在(K,V)对的数据集上调用时,返回(K,U)对的数据集,其中每个键的值使用给定的组合函数和中性“零”值聚合。允许不同于输入值类型的聚合值类型,同时避免不必要的分配。与groupByKey中一样,可以通过可选的第二个参数来配置Reduce任务的数量。

val sourceRdd4 = sc.parallelize(Seq(
  ("java", 1),
  ("java",3),
  ("java",18),
  ("c++", 1),
  ("js", 1),
  ("R",1),
  ("python",3),
  ("python",6),
  ("scala",2),
  ("scala", 1),
  ("scala",21)), 2)
val aggregateRdd = sourceRdd4.aggregateByKey((0, 0))(//初始值
      (acc, num) => (acc._1 + num, acc._2 + 1), // 部分聚合
      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 合并聚合结果
    )
aggregateRdd.foreach(println)
 

结果:

(scala,(24,3))
(R,(1,1))
(python,(9,2))
(java,(22,3))
(c++,(1,1))
(js,(1,1))


sortByKey([ascending], [numTasks])

解释:当在K实现Order的(K, V)对的数据集上调用时,返回(K,V)对的数据集,这些数据集按键按升序或降序排序,如布尔ascending参数中指定的那样。

val sourceRdd5 = sc.parallelize(Seq(
  ("java", 96),
  ("c++", 94),
  ("js", 85),
  ("R",88),
  ("python",96),
  ("scala",91)))
//使用 sortByKey 进行排序,默认为升序
val sortedAscRdd = sourceRdd5.sortByKey()
//降序排序
val sortedDescRdd = sourceRdd5.sortByKey(false)
sortedAscRdd.foreach(println)
println("-------------------------------")
sortedDescRdd.foreach(println)
println("------------将key和value调换-------------------")
val sortedAscRdd2 = sourceRdd5.map(x=>{(x._2,x._1)}).sortByKey()
val sortedDescRdd2 = sourceRdd5.map(x=>{(x._2,x._1)}).sortByKey(false)
sortedAscRdd2.foreach(println)
println("-------------------------------")
sortedDescRdd2.foreach(println)

 结果:

(R,88)
(c++,94)
(java,96)
(js,85)
(python,96)
(scala,91)
-------------------------------
(scala,91)
(python,96)
(js,85)
(java,96)
(c++,94)
(R,88)
------------将key和value调换-------------------
(85,js)
(88,R)
(91,scala)
(94,c++)
(96,java)
(96,python)
-------------------------------
(96,java)
(96,python)
(94,c++)
(91,scala)
(88,R)
(85,js)
 


join(otherDataset, [numTasks])

解释:当在(K,V)和(K,W)类型的数据集上调用时,返回(K,(V,W))对的数据集,其中每个键的所有元素对。 外连接通过leftOuterJoinrightOuterJoin和fullOuterJoinfullOuterJoin

val languageTimeRdd = sc.parallelize(List(("pascal",1970), ("c", 1972),("c++", 1983),("perl", 1987),("python", 1991),("java", 1995)))
val languageCreatorRdd2 = sc.parallelize(List(("java", "James Gosling"), ("python", "Guido van Rossum"), ("perl", "Larry Wall"), ("ruby", "Yukihiro Matsumoto")))

//inner join
println("----------inner join-------------")
val innerJoinResult = languageTimeRdd.join(languageCreatorRdd2)
innerJoinResult.foreach(println)
//left join
println("----------left join-------------")
val leftJoinResult = languageTimeRdd.leftOuterJoin(languageCreatorRdd2)
leftJoinResult.foreach(println)
//right join
println("----------right join-------------")
val rightJoinResult = languageTimeRdd.rightOuterJoin(languageCreatorRdd2)
rightJoinResult.foreach(println)
//full outer join
println("----------full outer join-------------")
val fullJoinResult = languageTimeRdd.fullOuterJoin(languageCreatorRdd2)
fullJoinResult.foreach(println)

结果:

----------inner join-------------
(python,(1991,Guido van Rossum))
(java,(1995,James Gosling))
(perl,(1987,Larry Wall))
----------left join-------------
(c++,(1983,None))
(python,(1991,Some(Guido van Rossum)))
(pascal,(1970,None))
(java,(1995,Some(James Gosling)))
(perl,(1987,Some(Larry Wall)))
(c,(1972,None))
----------right join-------------
(python,(Some(1991),Guido van Rossum))
(java,(Some(1995),James Gosling))
(perl,(Some(1987),Larry Wall))
(ruby,(None,Yukihiro Matsumoto))
----------full outer join-------------
(c++,(Some(1983),None))
(python,(Some(1991),Some(Guido van Rossum)))
(pascal,(Some(1970),None))
(java,(Some(1995),Some(James Gosling)))
(perl,(Some(1987),Some(Larry Wall)))
(ruby,(None,Some(Yukihiro Matsumoto)))
(c,(Some(1972),None))


cogroup(otherDataset, [numTasks])

解释:当在(K, V)和(K,W)类型的数据集上调用时,返回(K,(Iterable<V>,Iterable<W>))元组的数据集。此操作也称为groupWith.

val languageTimeRdd = sc.parallelize(List(("pascal",1970), ("c", 1972),("c++", 1983),("perl", 1987),("python", 1991),("java", 1995)))
val languageCreatorRdd2 = sc.parallelize(List(("java", "James Gosling"), ("python", "Guido van Rossum"), ("perl", "Larry Wall"), ("ruby", "Yukihiro Matsumoto")))
val cogroupRdd = languageTimeRdd.cogroup(languageCreatorRdd2)
cogroupRdd.foreach(println)

结果:

(c++,(CompactBuffer(1983),CompactBuffer()))
(python,(CompactBuffer(1991),CompactBuffer(Guido van Rossum)))
(pascal,(CompactBuffer(1970),CompactBuffer()))
(java,(CompactBuffer(1995),CompactBuffer(James Gosling)))
(perl,(CompactBuffer(1987),CompactBuffer(Larry Wall)))
(ruby,(CompactBuffer(),CompactBuffer(Yukihiro Matsumoto)))
(c,(CompactBuffer(1972),CompactBuffer()))


cartesian(otherDataset)

解释:当在T和U类型的数据集上调用时,返回(T,U)对(所有元素对)的数据集。

val languageTimeRdd = sc.parallelize(List(("pascal",1970), ("c", 1972),("c++", 1983),("perl", 1987),("python", 1991),("java", 1995)))
val languageCreatorRdd2 = sc.parallelize(List(("java", "James Gosling"), ("python", "Guido van Rossum"), ("perl", "Larry Wall"), ("ruby", "Yukihiro Matsumoto")))
val cartesianRdd = languageTimeRdd.cartesian(languageCreatorRdd2)
cartesianRdd.foreach(println)

结果:

((pascal,1970),(java,James Gosling))
((pascal,1970),(python,Guido van Rossum))
((pascal,1970),(perl,Larry Wall))
((pascal,1970),(ruby,Yukihiro Matsumoto))
((c,1972),(java,James Gosling))
((c,1972),(python,Guido van Rossum))
((c,1972),(perl,Larry Wall))
((c,1972),(ruby,Yukihiro Matsumoto))
((c++,1983),(java,James Gosling))
((c++,1983),(python,Guido van Rossum))
((c++,1983),(perl,Larry Wall))
((c++,1983),(ruby,Yukihiro Matsumoto))
((perl,1987),(java,James Gosling))
((perl,1987),(python,Guido van Rossum))
((perl,1987),(perl,Larry Wall))
((perl,1987),(ruby,Yukihiro Matsumoto))
((python,1991),(java,James Gosling))
((python,1991),(python,Guido van Rossum))
((python,1991),(perl,Larry Wall))
((python,1991),(ruby,Yukihiro Matsumoto))
((java,1995),(java,James Gosling))
((java,1995),(python,Guido van Rossum))
((java,1995),(perl,Larry Wall))
((java,1995),(ruby,Yukihiro Matsumoto))


pipe(command, [envVars])

解释:通过shell命令(例如Perl或bash脚本)管道RDD的每个分区。RDD元素被写入进程的标准输入,输出到其标准输出的行作为字符串的RDD返回。

因为需要用到脚本,所以必须要到linux环境中

我们执行spark-shell --master yarn 来测试该算子

val sourceRdd6 = sc.parallelize(List("24/07/08 16:43:39 INFO Executor:",
  "24/07/08 16:43:40 INFO Utils:",
  "24/07/08 16:43:36 INFO SecurityManager",
  "24/07/08 16:43:35 INFO SparkContext"))
val pipeRdd = sourceRdd6.pipe("cut -c 19-")
pipeRdd.collect().foreach(println)

结果:

INFO Executor:
INFO Utils:
INFO SecurityManager
INFO SparkContext

coalesce(numPartitions)

解释:将RDD中的分区数量减少到num分区。对于过滤大型数据集后更有效地运行操作很有用。

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
//分区变少 无shuffle
val coalesceRDD1 =  sourceRdd.coalesce(1,false)
//分区变少 有shuffle
val coalesceRDD2 =  sourceRdd.coalesce(1,true)
//分区变多 无shuffle
val coalesceRDD3 =  sourceRdd.coalesce(5,false)
//分区变多 有shuffle
val coalesceRDD4 =  sourceRdd.coalesce(5,true)
println(coalesceRDD1.getNumPartitions)
println(coalesceRDD2.getNumPartitions)
println(coalesceRDD3.getNumPartitions)
println(coalesceRDD4.getNumPartitions)

结果:

1
1
3
5


repartition(numPartitions)

解释:随机重新洗牌RDD中的数据以创建更多或更少的分区并在它们之间进行平衡。 这总是在网络上打乱所有数据。(内部调用的也是coalesce 默认是进行shuffle)

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
//分区变少
val repartitionRdd1 = sourceRdd.repartition(2)
//分区变多
val repartitionRdd2 = sourceRdd.repartition(6)
println(repartitionRdd1.getNumPartitions)
println(repartitionRdd2.getNumPartitions)

结果:

2
6


repartitionAndSortWithinPartitions(partitioner)

解释:根据给定的分区器重新分区RDD,并且在每个结果分区中, 按键对记录进行排序。这比调用repartition然后在其中排序更有效 每个分区,因为它可以将排序向下推到洗牌机械中。

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

val sourceRdd4 = sc.parallelize(Seq(
  ("java", 1),
  ("java",3),
  ("java",18),
  ("c++", 1),
  ("js", 1),
  ("R",1),
  ("python",3),
  ("python",6),
  ("scala",2),
  ("scala", 1),
  ("scala",21)), 2)
println("-----------------------key是字符串--分区变多--------------------------")
val repartitionAndSortWithinPartitionsRdd1 = sourceRdd4.repartitionAndSortWithinPartitions(new MyPartitioner(4))
repartitionAndSortWithinPartitionsRdd1.foreach(println)
println("-----------------------key是字符串--分区变少--------------------------")
val repartitionAndSortWithinPartitionsRdd2 = sourceRdd4.repartitionAndSortWithinPartitions(new MyPartitioner(1))
repartitionAndSortWithinPartitionsRdd2.foreach(println)

println("-----------------------key是数字--分区变多--------------------------")
val repartitionAndSortWithinPartitionsRdd3 = sourceRdd4.map(x=>{(x._2,x._1)}).repartitionAndSortWithinPartitions(new MyPartitioner(4))
repartitionAndSortWithinPartitionsRdd3.foreach(println)
println("-----------------------key是数字--分区变少--------------------------")
val repartitionAndSortWithinPartitionsRdd4 = sourceRdd4.map(x=>{(x._2,x._1)}).repartitionAndSortWithinPartitions(new MyPartitioner(1))
repartitionAndSortWithinPartitionsRdd4.foreach(println)

class MyPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val index = Math.abs(key.hashCode()) % numPartitions
    println("key:"+key+"---所属分区:"+index)
    index
  }
}

结果:

-----------------------key是字符串--分区变多--------------------------
key:java---所属分区:2
key:java---所属分区:2
key:java---所属分区:2
key:c++---所属分区:3
key:js---所属分区:1
key:R---所属分区:2
key:python---所属分区:0
key:python---所属分区:0
key:scala---所属分区:2
key:scala---所属分区:2
key:scala---所属分区:2
(python,3)
(python,6)
(js,1)
(R,1)
(java,1)
(java,3)
(java,18)
(scala,2)
(scala,1)
(scala,21)
(c++,1)
-----------------------key是字符串--分区变少--------------------------
key:java---所属分区:0
key:java---所属分区:0
key:java---所属分区:0
key:c++---所属分区:0
key:js---所属分区:0
key:R---所属分区:0
key:python---所属分区:0
key:python---所属分区:0
key:scala---所属分区:0
key:scala---所属分区:0
key:scala---所属分区:0
(R,1)
(c++,1)
(java,1)
(java,3)
(java,18)
(js,1)
(python,3)
(python,6)
(scala,2)
(scala,1)
(scala,21)
-----------------------key是数字--分区变多--------------------------
key:1---所属分区:1
key:3---所属分区:3
key:18---所属分区:2
key:1---所属分区:1
key:1---所属分区:1
key:1---所属分区:1
key:3---所属分区:3
key:6---所属分区:2
key:2---所属分区:2
key:1---所属分区:1
key:21---所属分区:1
(1,java)
(1,c++)
(1,js)
(1,R)
(1,scala)
(21,scala)
(2,scala)
(6,python)
(18,java)
(3,java)
(3,python)
-----------------------key是数字--分区变少--------------------------
key:1---所属分区:0
key:3---所属分区:0
key:18---所属分区:0
key:1---所属分区:0
key:1---所属分区:0
key:1---所属分区:0
key:3---所属分区:0
key:6---所属分区:0
key:2---所属分区:0
key:1---所属分区:0
key:21---所属分区:0
(1,java)
(1,c++)
(1,js)
(1,R)
(1,scala)
(2,scala)
(3,java)
(3,python)
(6,python)
(18,java)
(21,scala)

2、Actions 算子

reduce(func)

解释:使用函数func(它接受两个参数并返回一个)聚合数据集的元素。该函数应该是可交换和关联的,以便可以并行正确计算。

val sourceRdd4 = sc.parallelize(Seq(
  ("java", 1),
  ("java",3),
  ("java",18),
  ("c++", 1),
  ("js", 1),
  ("R",1),
  ("python",3),
  ("python",6),
  ("scala",2),
  ("scala", 1),
  ("scala",21)), 2)

val reduceRdd = sourceRdd4.reduce((kv1,kv2)=>{
      var newKey:String = kv1._1
      val sumNum:Int = kv1._2+kv2._2
      if(!kv1._1.contains(kv2._1)){
        newKey = kv1._1 +"_"+kv2._1
      }
      (newKey,sumNum)
    })
println(reduceRdd._1+"----"+reduceRdd._2)

结果:

java_c++_js_R_python_scala----58


collect()

解释:在驱动程序中以数组的形式返回数据集的所有元素。这通常在返回足够小的数据子集的过滤器或其他操作之后很有用。

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
sourceRdd.collect().foreach(println)

结果:

1
2
3
4
5
6
7
8
9
10


count()

解释:返回数据集中元素的数量。
 

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
println(sourceRdd.count())

结果:

10


first()

解释:返回数据集的第一个元素(类似于take(1) )。

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
println(sourceRdd.first())

结果:

1


take(n)

解释:返回包含数据集前n个元素的数组

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)

sourceRdd.take(3).foreach(println)

结果:

1
2
3


takeSample(withReplacement, num, [seed])

解释:返回一个数组,其中包含数据集num元素的随机样本,有或没有替换,可选地预先指定随机数生成器种子。

val sourceRdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
sourceRdd.takeSample(false,5).foreach(println)
println("---------------")
sourceRdd.takeSample(true,5).foreach(println)

结果:

6
1
9
4
3
---------------
3
2
9
4
2


takeOrdered(n, [ordering])

解释:使用它们的自然顺序或自定义比较器返回RDD的前n个元素

val sourceRd8 = sc.parallelize(List(13,2,6,4,8,3,7,8,91,1),3)
sourceRd8.takeOrdered(5).foreach(println)

结果:

1
2
3
4
6


saveAsTextFile(path)

解释:将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS或任何其他Hadoop支持的文件的给定目录中系统。Spark将在每个元素上调用toString将其转换为文件中的一行文本。


saveAsSequenceFile(path)

解释:将数据集的元素写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统的给定路径中的Hadoop Sequence File。这在实现Hadoop的Writable接口的键值对的RDD上可用。在Scala中,它也适用于隐式转换为Writable的类型(Spark包括Int、Double、String等基本类型的转换)。


saveAsObjectFile(path)

解释:使用Java序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile()。



countByKey()

解释:仅在(K,V)类型的RDD上可用。返回(K,Int)对的哈希图以及每个键的计数。

val sourceRdd4 = sc.parallelize(Seq(
      ("java", 1),
      ("java",3),
      ("java",18),
      ("c++", 1),
      ("js", 1),
      ("R",1),
      ("python",3),
      ("python",6),
      ("scala",2),
      ("scala", 1),
      ("scala",21)), 2)
sourceRdd4.countByKey().foreach(println)

结果:

(java,3)
(c++,1)
(scala,3)
(python,2)
(js,1)
(R,1)


foreach(func)(上面已经演示过)

解释:对数据集的每个元素运行一个函数func。这通常用于副作用,例如更新累加器或与外部存储系统交互。

注意:在foreach()之外修改除Accumulator之外的变量可能会导致未定义的行为。有关详细信息,请参阅了解闭包。

3、Shuffle操作

Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark重新分配数据的机制,以便在分区之间以不同的方式分组。这通常涉及跨 executors  和 节点 复制数据,因此shuffle是复杂且成本高昂的操作。

我们以 reduceByKey 为例说明:

reduceByKey 生成一个新的RDD,相同key的所有value会被组合到一起,这就需要将本分区中不同的key发送到不同的节点。这对于Spark来说是一个全向操作。它必须从所有分区中读取以查找所有key的所有value, 然后将跨分区的value聚集在一起以计算每个key的最终结果- 这就是Shuffle

尽管新洗牌数据的每个分区中的元素集是确定的,分区本身的顺序也是确定的,但这些元素的顺序不是。如果希望在洗牌后可预测地排序数据,那么可以使用:

mapPartitions(对每个分区进行排序,例如,使用.sorted) 

repartitionAndSortWithinPartitions(在重新分区的同时有效地对分区进行排序 

sortBy(创建全局排序的RDD

由于Shuffle涉及磁盘I/O、数据序列化和 网络输入输出。为了整理这些数据,Spark会产生map任务和reduce任务来实现(这里说的并不是MapReduce中的mapTask和reduceTask)

在内部,单个map任务的结果会保存在内存中,当数据不合适再内存中时会溢写到磁盘(会产生磁盘IO开销和垃圾回收机制)

Shuffle会在磁盘产生大量的中间文件,这些文件会一直保留,直到不再使用相关RDD并被垃圾收集。这是为了再次使用RDD时提升Shuffle的性能。

4、持久化

当我们有迭代算法的需求时,我们就需要将之前的RDD进行缓存或者持久化,可以直接使用persist()或cache()来实现。Spark的缓存也是容错的,如果发现缓存的RDD有分区丢失,会通过血缘关系再次创建它。

每个RDD都可以选择不同的持久化级别,比如磁盘、内存。这些级别通过传递一个 StorageLevel对象来实现。

存储级别含义
MEMORY_ONLY将RDD存储为JVM中反序列化的Java对象。如果RDD不适合内存,某些分区将不会被缓存,并且每次需要时都会立即重新计算。这是默认级别。
MEMORY_AND_DISK将RDD存储为JVM中反序列化的Java对象。如果RDD不适合内存,则将不适合磁盘的分区存储在磁盘上,并在需要时从那里读取它们。
MEMORY_ONLY_SER只适用于Java和Scala,将RDD存储为序列化的Java对象(每个分区一个字节数组)。 这通常比反序列化对象更节省空间,尤其是在使用 快速序列化程序,但读取更占用CPU。
MEMORY_AND_DISK_SER只适用于Java和Scala,类似于MEMORY_ONLY_SER,但是将不适合内存的分区溢出到磁盘,而不是每次需要时立即重新计算它们。
DISK_ONLY仅将RDD分区存储在磁盘上
MEMORY_ONLY_2、MEMORY_AND_DISK_2等。与上面的级别相同,但在两个集群节点上复制每个分区。
OFF_HEAP(实验阶段)类似于MEMORY_ONLY_SER,但将数据存储在 堆外内存。这需要启用堆外内存。

Spark还会自动在shuffle操作中持久化一些中间数据(例如reduceByKey),即使没有用户调用persist。这样做是为了避免在洗牌过程中节点失败时重新计算整个输入。

那么选择哪种存储级别呢?

Spark的存储级别旨在在内存使用和CPU效率之间提供不同的权衡。官网给了我们一下建议:

1、如果您的RDD适合默认存储级别(MEMORY_ONLY),请保持不变。 这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。

2、如果没有,请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库来使对象更加节省空间,但访问速度仍然相当快。(适用于Java和Scala)

3、不要溢出到磁盘,除非计算数据集的函数很昂贵,或者它们过滤了大量数据。否则,重新计算分区可能与从磁盘读取分区一样快

4、如果您想要快速故障恢复(例如,如果使用Spark处理来自Web应用程序的请求),请使用复制的存储级别。所有存储级别都通过重新计算丢失的数据提供完全的容错能力,但复制的存储级别允许您继续在RDD上运行任务,而无需等待重新计算丢失的分区。

5、删除数据

Spark自动监控每个节点上的缓存使用情况,并在 最近最少使用(LRU)方式。如果您想手动删除RDD而不是等待 如果要从缓存中删除,请使用RDD.unpersist()方法。

二、共享变量

通常情况下,我们传递给算子的函数是作用在集群中各个worker上的,其中的变量被复制到每个worker,那么如何跨任务共享变量或者将变量回传给driver呢?Spark给我们提供了两种类型的共享变量即:广播变量和累加器。

1、广播变量

广播变量是将只读变量缓存在每台机器上,而不是将其副本与任务一起发送。例如,它们可以用来以有效的方式为每个节点提供大型输入数据集的副本。Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。

创建方法为:

val broadcastVar = sc.broadcast(Array(1, 2, 3))

 使用方法为:

broadcastVar.value

2、累加器

累加器是仅通过关联和交换操作“添加”到的变量,因此可以有效地并行支持。它们可用于实现计数器(如在MapReduce中)或求和。Spark原生支持数字类型的累加器,我们可以添加对其他类型的支持。

累加器在了解整体任务运行进度时很有用,使用方法如下:

创建累加器

val accum = sc.longAccumulator("My Accumulator")

//各个节点累加操作

accum.add(x)

//driver端获取结果
accum.value

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/791015.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

maven7——(重要,构建项目)maven项目构建(命令)

Maven的常用命令管理项目的生命周期 clean命令 清除编译产生的target文件夹内容&#xff0c;可以配合相应命令在cmd中使用&#xff0c;如mvn clean package&#xff0c; mvn clean test D:\工作\公司培训-4班\day20\day20\untitled1>mvn clean compile命令 该命令可以…

Nginx: Rewrite功能配置/Nginx反向代理/Nginx的安全控制SSL

Rewrite功能配置 Rewrite是Nginx服务器提供的一个重要基本功能&#xff0c;是Web服务器产品中几乎必备的功能。主要的作用是用来实现URL的重写。www.jd.com 注意:Nginx服务器的Rewrite功能的实现依赖于PCRE的支持&#xff0c;因此在编译安装Nginx服务器之前&#xff0c;需要安…

[ICS] Inferno(地狱) ETH/IP未授权访问,远程控制工控设备利用工具

项目地址:https://github.com/MartinxMax/Inferno Inferno $ ./Install.sh $ python Inferno.py -h 模拟服务端 $ sudo python3 -m pip install --upgrade cpppo $ $ python -m cpppo.server.enip SCADAINT[1000] ADMININT[2] -v 创建一个EtherNet/IP设备 扫描设备 $ pyth…

【网络安全】Oracle:SSRF获取元数据

未经许可&#xff0c;不得转载。 文章目录 前言正文漏洞利用 前言 Acme 是一家广受欢迎的播客托管公司&#xff0c;拥有庞大的客户群体。与许多大型运营公司一样&#xff0c;Acme 采用了Apiary的服务&#xff0c;使用户能够安全高效地管理他们的播客。 Apiary 于2017年初被Or…

【Django项目】基于Python+Django+MySQL的音乐网站系统项目

功能介绍 首页&#xff1a;歌曲分类、歌曲搜索、热门歌曲、热门下载、新歌推荐 歌曲排行&#xff1a;歌曲分类、分页功能 用户板块&#xff1a;用户登陆/注册、播放历史 歌曲详情&#xff1a;歌曲播放、当前播放列表、歌曲点评、歌曲播放插件、下载歌曲 系统后台&#xff1a;歌…

如何将Docker镜像源更改为阿里云的镜像加速地址

在使用Docker时&#xff0c;尤其是在国内环境下&#xff0c;由于网络原因&#xff0c;从Docker Hub拉取镜像可能会遇到速度较慢的问题。为了提高拉取速度&#xff0c;我们可以将Docker的镜像源更改为阿里云等国内镜像源。下面详细介绍如何获取并配置阿里云的Docker镜像加速地址…

【中项第三版】系统集成项目管理工程师 | 第 4 章 信息系统架构⑤ | 4.8 - 4.9

前言 第4章对应的内容选择题和案例分析都会进行考查&#xff0c;这一章节属于技术相关的内容&#xff0c;学习要以教材为准。本章分值预计在4-5分。 目录 4.8 云原生架构 4.8.1 发展概述 4.8.2 架构定义 4.8.3 基本原则 4.8.4 常用架构模式 4.8.5 云原生案例 4.9 本章…

docker(六)--创建镜像

六、创建镜像 1.创建镜像两种方式 方式1&#xff1a; 更新镜像 docker commit 方式2&#xff1a;构建镜像 docker build 2.更新镜像 1&#xff09;用法 docker commit -m“描述信息” -a作者 容器id或者容器名 镜像名:tag 2&#xff09;步骤 ①根据镜像运行容器 ②进入容…

栈和队列题目详解

前言&#xff1a; 在前面我们学习了栈和队列&#xff0c;栈的特性是后进先出&#xff0c;队列的特性是先进先出&#xff0c;当我们了解了这些之后&#xff0c;我们就可以用到栈和队列的特性来简单的做一些题目了。 1. 有效的括号 有效的括号&#xff1a;. - 力扣&#xff08…

YOLOv10改进 | Conv篇 | 利用FasterBlock二次创新C2f提出一种全新的结构(全网独家首发,参数量下降70W)

一、本文介绍 本文给大家带来的改进机制是利用FasterNet的FasterBlock改进特征提取网络&#xff0c;将其用来改进ResNet网络&#xff0c;其旨在提高计算速度而不牺牲准确性&#xff0c;特别是在视觉任务中。它通过一种称为部分卷积&#xff08;PConv&#xff09;的新技术来减少…

火柴棒图python绘画

使用Python绘制二项分布的概率质量函数&#xff08;PMF&#xff09; 在这篇博客中&#xff0c;我们将探讨如何使用Python中的scipy库和matplotlib库来绘制二项分布的概率质量函数&#xff08;PMF&#xff09;。二项分布是统计学中常见的离散概率分布&#xff0c;描述了在固定次…

夏日智启:我的Datawhale AI夏令营探索之旅

前言 最近几年&#xff0c;AI&#xff08;人工智能&#xff09;的发展呈现出了前所未有的迅猛势头&#xff0c;其影响力和应用范围不断扩大&#xff0c;深刻地改变着我们的生活、工作和社会结构。尤其是AI大模型技术&#xff0c;国内外可谓是“百模大战”&#xff0c;百舸争流…

ESP32网络开发:1.创建一个基于TCP网络协议的网站

一、TCP协议的介绍 TCP&#xff08;传输控制协议&#xff0c;Transmission Control Protocol&#xff09;是互联网协议套件中的一种核心协议&#xff0c;主要用于在网络中的计算机之间可靠地传输数据。TCP协议位于OSI模型&#xff08;开放系统互联模型&#xff09;的传输层&…

虚拟机内安装vue-dev-tools

前言 项目开发调试都需要在Citrix在虚拟机环境下&#xff0c;Citrix内连接不到外网&#xff0c;在这边文章&#xff0c;我将介绍自己在Citrix环境内安装 vue-dev-tools的经验 环境 vue 步骤 1. 下载.crx文件 百度网盘里的 .crx文件的 下载链接 2. 加载.crx文件 打开浏览…

软件兼容性测试重要吗?有哪些测试流程和注意事项?

软件兼容性测试是指测试软件在不同硬件、操作系统、网络环境和软件环境下的稳定性和可用性的能力&#xff0c;也就是说&#xff0c;软件在不同的平台上是否能正常运行&#xff0c;是否能与其他软件和系统兼容。 兼容性问题是影响软件用户体验的重要因素之一&#xff0c;如果软…

学习大数据DAY13 PLSQL基础语法2

目录 选择结构 IF语句 简单判断语句 带判断不成立语句 多判断语句 IF语句注意事项&#xff1a; CASE 语句 简单CASE语句 搜索型CASE语句 作业 循环语句 循环结构 简单循环 属性 描述 位置 场景 WHILE循环 属性 FOR循环 数值型for循环 数值型for循环的特性…

【Redis】简单了解Redis中常用的命令与数据结构

希望文章能给到你启发和灵感&#xff5e; 如果觉得文章对你有帮助的话&#xff0c;点赞 关注 收藏 支持一下博主吧&#xff5e; 阅读指南 开篇说明一、基础环境说明1.1 硬件环境1.2 软件环境 二、Redis的特点和适用场景三、Redis的数据类型和使用3.1字符串&#xff08;String&…

谷粒商城实战笔记-24-分布式组件-SpringCloud Alibaba-Nacos配置中心-命名空间与配置分组

文章目录 一&#xff0c;命名空间1&#xff0c;简介1.1&#xff0c;命名空间的主要功能和特点1.2&#xff0c;使用场景1.3&#xff0c;如何指定命名空间 2&#xff0c;命名空间实战2.1&#xff0c;环境隔离2.2&#xff0c;服务隔离 二&#xff0c;配置集三&#xff0c;配置集ID…

HTML+CSS+JS 实现3D风吹草动效果(B站视频)

效果&#xff1a; 代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>3D effect&…

线下线上游戏电竞陪伴APP小程序H5同城线下约玩APP开发,语聊约玩平台搭建游戏陪玩APP源码

开发一款线下陪玩约玩APP的实际意义和在生活中的应用场景 1、满足社交需求:现代社会人们的社交圈往往受到时间、地点和其他限制的影响。线下陪玩约玩APP可以提供一个平台&#xff0c;让用户通过约玩的方式结识新朋友、扩大社交圈 2、解决孤独感:有些人由于工作忙碌、居住环境单…