Spark核心之02:常用算子详解

1、RDD操作详解

# 启动spark-shell
spark-shell --master local[2] 

1.1 基本转换

1) map

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。

2) filter

filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

val rdd = sc.parallelize(List(1,2,3,4,5,6))  
val filterRdd = rdd.filter(_ > 5)
filterRdd.collect() //返回所有大于5的数据的一个Array, Array(6,8,10,12)

3) flatMap

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

⭐️4) mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
								preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数ff的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
                                                                var res = List[(T, T)]() 
                                                                var pre = iter.next 
                                                                while (iter.hasNext) {
                                                                  val cur = iter.next
                                                                  res.::=(pre, cur)
                                                                  pre = cur  } 
                                                                  res.iterator
                                                                }
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

5) mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
scala> var rdd2 = rdd1.mapPartitionsWithIndex{
    |     (x,iter) => {
    |       var result = List[String]()
    |       var i = 0
    |       while(iter.hasNext){
    |        i += iter.next()
    |       }
    |       result.::(x + "|" + i).iterator
    |     }
    | }

//rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)  //p-0(1,2) p-1(3,4,5)

🈂️好像没用了🈂️6) mapWith

mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;

第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把partition index 乘以10加2,作为新的RDD的元素。

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
x.mapWith(a => a * 10)((b, a) => (b,a + 2)).collect 

结果:

(1,2)

(2,2)

(3,2)

(4,12)

(5,12)

(6,12)

(7,22)

(8,22)

(9,22)

(10,22)

🈂️好像没用了🈂️7) flatMapWith

flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

举例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

⭐️8) coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

该函数用于将RDD进行重分区,使用HashPartitioner。

第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;

以下面的例子来看:

scala> var data = sc.parallelize(1 to 12, 3) 
scala> data.collect 
scala> data.partitions.size 
scala> var rdd1 = data.coalesce(1) 
scala> rdd1.partitions.size 
scala> var rdd1 = data.coalesce(4) 
scala> rdd1.partitions.size
res2: Int = 1   //如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便
scala> var rdd1 = data.coalesce(4,true) 
scala> rdd1.partitions.size
res3: Int = 4

⭐️9) repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

该函数其实就是coalesce函数第二个参数为true的实现

scala> var data = sc.parallelize(1 to 12, 3) 
scala> data.collect 
scala> data.partitions.size 
scala> var rdd1 = data. repartition(1) 
scala> rdd1.partitions.size 
scala> var rdd1 = data. repartition(4) 
scala> rdd1.partitions.size
res3: Int = 4

10) randomSplit

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

该函数根据weights权重,将一个RDD切分成多个RDD。

该权重参数为一个Double数组

第二个参数为random的种子,基本可忽略。

scala> var rdd = sc.makeRDD(1 to 12,12)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21

scala> rdd.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)  
 
scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23, 
MapPartitionsRDD[18] at randomSplit at :23, 
MapPartitionsRDD[19] at randomSplit at :23, 
MapPartitionsRDD[20] at randomSplit at :23)
 
//这里注意:randomSplit的结果是一个RDD数组
scala> splitRDD.size
res8: Int = 4
//由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD,
//把原来的rdd按照权重0.5, 0.1, 0.2, 0.2,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。
//注意,权重的总和加起来为1,否则会不正常 
scala> splitRDD(0).collect
res10: Array[Int] = Array(1, 4)
 
scala> splitRDD(1).collect
res11: Array[Int] = Array(3)                                                    
 
scala> splitRDD(2).collect
res12: Array[Int] = Array(5, 9)
 
scala> splitRDD(3).collect
res13: Array[Int] = Array(2, 6, 7, 8, 10)

11) glom

def glom(): RDD[Array[T]]

该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素

scala> var rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21
scala> rdd.partitions.size
res33: Int = 3  //该RDD有3个分区
scala> rdd.glom().collect
res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
//glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组

⭐️12) union并集

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求并集

val rdd3 = rdd1.union(rdd2)

rdd3.collect

⭐️13) distinct去重

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求并集

val rdd3 = rdd1.union(rdd2)

//去重输出

rdd3.distinct.collect

⭐️14) intersection交集

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求交集

val rdd4 = rdd1.intersection(rdd2) 

rdd4.collect

⭐️15) subtract

def subtract(other: RDD[T]): RDD[T]

def subtract(other: RDD[T], numPartitions: Int): RDD[T]

def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

该函数返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。

val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求差集

val rdd4 = rdd1.subtract(rdd2)

rdd4.collect

16) subtractByKey

def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]

def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]

def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。

参数numPartitions用于指定结果的分区数

参数partitioner用于指定分区函数

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 

scala> rdd1.subtractByKey(rdd2).collect

res13: Array[(String, String)] = Array((B,2))

17) groupbyKey

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
val rdd5 = rdd4.groupByKe
rdd5.collect

18) reduceByKey

顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

举例:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
val rdd6 = rdd4.reduceByKey(_ + _)
rdd6.collect()

19) sortByKey

将List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,并按名称排序

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

val rdd3 = rdd1.union(rdd2)

//按key进行聚合

val rdd4 = rdd3.reduceByKey(_ + _)

//false降序

val rdd5 = rdd4.sortByKey(false)

rdd5.collect

20) sortBy

将List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,并按数值排序

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

val rdd3 = rdd1.union(rdd2)

//按key进行聚合

val rdd4 = rdd3.reduceByKey(_ + _)

//false降序

val rdd5 = rdd4.sortBy(_._2, false)

rdd5.collect

21) zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

scala> var rdd1 = sc.makeRDD(1 to 5,2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21



scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21



scala> rdd1.zip(rdd2).collect

res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))      



scala> rdd2.zip(rdd1).collect

res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))



scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)

rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21

scala> rdd1.zip(rdd3).collect

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

//如果两个RDD分区数不同,则抛出异常

22) zipPartitions

zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。

该函数有好几种实现,可分为三类:

1. 参数是一个RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息

映射方法f参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21

scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21

//rdd1两个分区中元素分布:
scala> rdd1.mapPartitionsWithIndex{
  |     (x,iter) => {
  |      var result = List[String]()
  |       while(iter.hasNext){
  |        result ::= ("part_" + x + "|" + iter.next())
  |       }
  |       result.iterator
  |       
  |     }
  |    }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)

//rdd2两个分区中元素分布
scala> rdd2.mapPartitionsWithIndex{
  |     (x,iter) => {
  |      var result = List[String]()
  |       while(iter.hasNext){
  |        result ::= ("part_" + x + "|" + iter.next())
  |       }
  |       result.iterator
  |       
  |     }
  |    }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)

//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){
  |    (rdd1Iter,rdd2Iter) => {
  |     var result = List[String]()
  |     while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
  |      result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
  |     }
  |     result.iterator
  |    }
  |   }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
2. 参数是两个RDD
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21

scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21

scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21

//rdd3中个分区元素分布
scala> rdd3.mapPartitionsWithIndex{
  |     (x,iter) => {
  |      var result = List[String]()
  |       while(iter.hasNext){
  |        result ::= ("part_" + x + "|" + iter.next())
  |       }
  |       result.iterator
  |       
  |     }
  |    }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)

//三个RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){
  |    (rdd1Iter,rdd2Iter,rdd3Iter) => {
  |     var result = List[String]()
  |     while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
  |      result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
  |     }
  |     result.iterator
  |    }
  |   }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27

scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)
3. 参数是三个RDD
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

用法同上面,只不过这里又多了个一个RDD而已。

23) zipWithIndex

def zipWithIndex(): RDD[(T, Long)]

该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21

scala> rdd2.zipWithIndex().collect

res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

24) zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]

该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:

每个分区中第一个元素的唯一ID值为:该分区索引号,

每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)

看下面的例子:

scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21

//rdd1有两个分区,

scala> rdd1.zipWithUniqueId().collect

res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))

//总分区数为2

//第一个分区第一个元素ID为0,第二个分区第一个元素ID为1

//第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4

//第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5

1.2 键值转换

⭐️25) partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。

scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21

scala> rdd1.partitions.size
res20: Int = 2
 
//查看rdd1中每个分区的元素
scala> rdd1.mapPartitionsWithIndex{
   |     (partIdx,iter) => {
   |      var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
   |       while(iter.hasNext){
   |        var part_name = "part_" + partIdx;
   |        var elem = iter.next()
   |        if(part_map.contains(part_name)) {
   |         var elems = part_map(part_name)
   |         elems ::= elem
   |         part_map(part_name) = elems
   |        } else {
   |         part_map(part_name) = List[(Int,String)]{elem}
   |        }
   |       }
   |       part_map.iterator
   |       
   |     }
   |    }.collect
res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))

//(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中
 
//使用partitionBy重分区
scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23

scala> rdd2.partitions.size
res23: Int = 2

//查看rdd2中每个分区的元素
scala> rdd2.mapPartitionsWithIndex{
   |     (partIdx,iter) => {
   |      var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
   |       while(iter.hasNext){
   |        var part_name = "part_" + partIdx;
   |        var elem = iter.next()
   |        if(part_map.contains(part_name)) {
   |         var elems = part_map(part_name)
   |         elems ::= elem
   |         part_map(part_name) = elems
   |        } else {
   |         part_map(part_name) = List[(Int,String)]{elem}
   |        }
   |       }
   |       part_map.iterator
   |     }
   |    }.collect
res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))

//(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中

26) mapValues

mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

举例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)

scala> val b = a.map(x => (x.length, x))

scala> b.mapValues("x" + _ + "x").collect

res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

27) flatMapValues

flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

举例

val a = sc.parallelize(List((1, 2), (3, 4), (5, 6)))

val b = a.flatMapValues(x => 1.to(x))

b.collect.foreach(println)

28) combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。

其中的参数:

createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次

mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,分区内相同的key循环做

mergeCombiners:分区合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C,分区之间循环做

numPartitions:结果RDD分区数,默认保持原有的分区数

partitioner:分区函数,默认为HashPartitioner

mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

看下面例子:

scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21 

scala> rdd1.combineByKey(
   |    (v : Int) => v + "_",  
   |    (c : String, v : Int) => c + "@" + v,  
   |    (c1 : String, c2 : String) => c1 + "$" + c2
   |   ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

其中三个映射函数分别为:

createCombiner: (V) => C

(v : Int) => v + “” //在每一个V值后面加上字符,返回C类型(String)

mergeValue: (C, V) => C

(c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String)

mergeCombiners: (C, C) => C

(c1 : String, c2 : String) => c1 + “ ” + c 2 / / 合并 C 类型和 C 类型,中间加 ” + c2 //合并C类型和C类型,中间加 +c2//合并C类型和C类型,中间加,返回C(String)

其他参数为默认值。

最终,将RDD[String,Int]转换为RDD[String,String]。

再看例子:

rdd1.combineByKey(
   (v : Int) => List(v),
   (c : List[Int], v : Int) => v :: c,
   (c1 : List[Int], c2 : List[Int]) => c1 ::: c2
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

最终将RDD[String,Int]转换为RDD[String,List[Int]]。

29) foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] 

该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.

例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

scala> rdd1.foldByKey(0)(_+_).collect

res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) 

//将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操

//作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:

//("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)

再看:

scala> rdd1.foldByKey(2)(_+_).collect

res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))

//先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函

//数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)

再看乘法操作:

scala> rdd1.foldByKey(0)(__).collect

res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))

//先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",00), ("A",20),

//即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,00),即:(A,0)

//其他K也一样,最终都得到了V=0

scala> rdd1.foldByKey(1)(__).collect

res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))

//映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。

在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。

30) reduceByKeyLocally

def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21

scala> rdd1.reduceByKeyLocally((x,y) => x + y)

res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)

31) cogroup和groupByKey的区别

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//cogroup

val rdd3 = rdd1.cogroup(rdd2)

//groupbykey

val rdd4 = rdd1.union(rdd2).groupByKey

//注意cogroup与groupByKey的区别

rdd3.foreach(println)

rdd4.foreach(println)

⭐️32) join

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//求jion

val rdd3 = rdd1.join(rdd2)

rdd3.collect

33) leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

参数numPartitions用于指定结果的分区数

参数partitioner用于指定分区函数

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)

scala> rdd1.leftOuterJoin(rdd2).collect

res11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))

34) rightOuterJoin

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] 

rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

参数numPartitions用于指定结果的分区数

参数partitioner用于指定分区函数

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)

scala> rdd1.rightOuterJoin(rdd2).collect

res12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))

1.3 Action操作

35) first

def first(): T

first返回RDD中的第一个元素,不排序。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21

scala> rdd1.first

res14: (String, String) = (A,1)

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21

scala> rdd1.first

res8: Int = 10

36) count

def count(): Long

count返回RDD中的元素数量。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21

scala> rdd1.count

res15: Long = 3

36) reduce

def reduce(f: (T, T) ⇒ T): T

根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21

scala> rdd1.reduce(_ + _)
res18: Int = 55

scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21

scala> rdd2.reduce((x,y) => {
  |    (x._1 + y._1,x._2 + y._2)
  |   })
res21: (String, Int) = (CBBAA,6)

37) collect

def collect(): Array[T]

collect用于将一个RDD转换成数组。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21

scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

38) take

def take(num: Int): Array[T]

take用于获取RDD中从0到num-1下标的元素,不排序。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

scala> rdd1.take(1)

res0: Array[Int] = Array(10)                           

scala> rdd1.take(2)

res1: Array[Int] = Array(10, 4)

39) top

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

scala> rdd1.top(1)
res2: Array[Int] = Array(12)

scala> rdd1.top(2)
res3: Array[Int] = Array(12, 10)

//指定排序规则
scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef

scala> rdd1.top(1)
res4: Array[Int] = Array(2)

scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)

40) takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

takeOrdered和top类似,只不过以和top相反的顺序返回元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

scala> rdd1.top(1)
res4: Array[Int] = Array(12)

scala> rdd1.top(2)
res5: Array[Int] = Array(12, 10)

scala> rdd1.takeOrdered(1)
res6: Array[Int] = Array(2)

scala> rdd1.takeOrdered(2)
res7: Array[Int] = Array(2, 3)

⭐️41) aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.mapPartitionsWithIndex{
    (partIdx,iter) => {
     var part_map = scala.collection.mutable.Map[String,List[Int]]()
      while(iter.hasNext){
       var part_name = "part_" + partIdx;
       var elem = iter.next()
       if(part_map.contains(part_name)) {
        var elems = part_map(part_name)
        elems ::= elem
        part_map(part_name) = elems
       } else {
        part_map(part_name) = List[Int]{elem}
       }
      }
      part_map.iterator
      
    }
   }.collect
res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))

##第一个分区中包含5,4,3,2,1
##第二个分区中包含10,9,8,7,6

scala> rdd1.aggregate(1)(
   |      {(x : Int,y : Int) => x + y}, 
   |      {(a : Int,b : Int) => a + b}
   |   )
res17: Int = 58

结果为什么是58,看下面的计算过程:

##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1

##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16

##part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41

##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1

##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

再比如:

scala> rdd1.aggregate(2)(
  |      {(x : Int,y : Int) => x + y}, 
  |      {(a : Int,b : Int) => a  b}
  |   )
res18: Int = 1428
##这次zeroValue=2
##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##最后:zeroValuepart_0part_1 = 2  17  42 = 1428

因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。

42) fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

var rdd1 = sc.makeRDD(1 to 10, 2)
scala> rdd1.fold(1)(
   |    (x,y) => x + y   
   |   )
res19: Int = 58
##结果同上面使用aggregate的第一个例子一样,即:
scala> rdd1.aggregate(1)(
   |      {(x,y) => x + y}, 
   |      {(a,b) => a + b}
   |   )
res20: Int = 58

43) lookup

def lookup(key: K): Seq[V]

lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21

scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)

scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)

44) countByKey

def countByKey(): Map[K, Long]

countByKey用于统计RDD[K,V]中每个K的数量。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21

scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)

45) foreach

def foreach(f: (T)Unit): Unit

foreach用于遍历RDD,将函数f应用于每一个元素。

但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。

比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。

我在Spark1.4中是这样,不知道是否真如此。

这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。

scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21

scala> rdd1.foreach(x => cnt += x)

scala> cnt.value
res51: Int = 55

scala> rdd1.collect.foreach(println) 

⭐️46) foreachPartition

def foreachPartition(f: (Iterator[T])Unit): Unit

foreachPartition和foreach类似,只不过是对每一个分区使用f。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21

scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0

scala>   rdd1.foreachPartition { x => {
   |    allsize += x.size
   |   }}
scala> println(allsize.value)
#10

⭐️47) sortBy

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

sortBy根据给定的排序k函数将RDD中的元素进行排序。

scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)

scala> rdd1.sortBy(x => x).collect

res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序

scala> rdd1.sortBy(x => x,false).collect

res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  //降序

//RDD[K,V]类型

scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))

scala> rdd1.sortBy(x => x).collect

res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))

//按照V进行降序排序

scala> rdd1.sortBy(x => x._2,false).collect

res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))

48) saveAsTextFile

def saveAsTextFile(path: String): Unit

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。

codec参数可以指定压缩的类名。

var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS

hadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r--r--  2 lxw1234 supergroup     0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--  2 lxw1234 supergroup     21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000

hadoop fs -cat /tmp/lxw1234.com/part-00000

注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。

## 指定压缩格式保存

rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec])

hadoop fs -ls /tmp/lxw1234.com
-rw-r--r--  2 lxw1234 supergroup   0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--  2 lxw1234 supergroup   71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo

hadoop fs -text /tmp/lxw1234.com/part-00000.lzo

49) saveAsSequenceFile

saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。

用法同saveAsTextFile。

50) saveAsObjectFile

def saveAsObjectFile(path: String): Unit

saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。

对于HDFS,默认采用SequenceFile保存。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")

hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
  1. saveAsHadoopFile
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit

def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf =, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。

可以指定outputKeyClass、outputValueClass以及压缩格式。

每个分区输出一个文件。

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],classOf[com.hadoop.compression.lzo.LzopCodec])

52) saveAsHadoopDataset

def saveAsHadoopDataset(conf: JobConf): Unit

saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。

在JobConf中,通常需要关注或者设置五个参数:

文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。

##使用saveAsHadoopDataset将RDD保存到HDFS中

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/lxw1234/")
rdd1.saveAsHadoopDataset(jobConf)

结果:

hadoop fs -cat /tmp/lxw1234/part-00000
A    2
A    1

hadoop fs -cat /tmp/lxw1234/part-00001
B    6
B    3
B    7

##保存数据到HBASE

##HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

var conf = HBaseConfiguration.create()
  var jobConf = new JobConf(conf)
  jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
  jobConf.set("zookeeper.znode.parent","/hbase")
  jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
  jobConf.setOutputFormat(classOf[TableOutputFormat])

  var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
  rdd1.map(x => 
   {
    var put = new Put(Bytes.toBytes(x._1))
    put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
    (new ImmutableBytesWritable,put)
   }
  ).saveAsHadoopDataset(jobConf)
##结果:
hbase(main):005:0> scan 'lxw1234'
ROW   COLUMN+CELL                                                 
 A    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02                        
 B    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06                        
 C    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07                        
3 row(s) in 0.0550 seconds

注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

53) saveAsNewAPIHadoopFile

def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit

def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit

saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。

用法基本同saveAsHadoopFile。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

54) saveAsNewAPIHadoopDataset

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。

以写入HBase为例:

HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

完整的Spark应用程序:

package com.lxw1234.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put

object Test {
 def main(args : Array[String]) {
  val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
  val sc = new SparkContext(sparkConf);
  var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))

  sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")
  sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
  sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")

  var job = new Job(sc.hadoopConfiguration)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Result])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  rdd1.map(
   x => {
    var put = new Put(Bytes.toBytes(x._1))
    put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
    (new ImmutableBytesWritable,put)
   }   
  ).saveAsNewAPIHadoopDataset(job.getConfiguration)
  
  sc.stop()  
 }
}

注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/982260.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

K8s The connection to the server 192.168.56.120:6443 was refused报错解决

虚拟机获取不到其他node节点的信息&#xff0c;通过使用docker ps -a 排查看到k8s的组件都是exited的状态&#xff0c;通过手动拉起docker 镜像id 起来之后&#xff0c;又变为exited的状态&#xff01;&#xff01;&#xff01; 解决方法&#xff1a;重置k8s集群 使用 kubeadm…

随机树算法 自动驾驶汽车的路径规划 静态障碍物(Matlab)

随着自动驾驶技术的蓬勃发展&#xff0c;安全、高效的路径规划成为核心挑战之一。快速探索随机树&#xff08;RRT&#xff09;算法作为一种强大的路径搜索策略&#xff0c;为自动驾驶汽车在复杂环境下绕过静态障碍物规划合理路径提供了有效解决方案。 RRT 算法基于随机采样思想…

【实战 ES】实战 Elasticsearch:快速上手与深度实践-2.3.1 避免频繁更新(Update by Query的代价)

&#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 文章大纲 Elasticsearch数据更新与删除深度解析&#xff1a;2.3.1 避免频繁更新&#xff08;Update by Query的代价&#xff09;案例背景1. Update by Query的内部机制解析1.1 文档更…

Baklib内容中台赋能企业智管

内容中台构建全场景智管 现代企业数字化运营中&#xff0c;全域内容管理能力已成为核心竞争力。通过智能知识引擎驱动的内容中台架构&#xff0c;企业能够实现跨部门、多形态数据的统一归集与动态调度。以某制造企业为例&#xff0c;其利用中台系统将分散在CRM、ERP及内部文档…

今天来介绍和讨论 AGI(通用人工智能)

首先介绍&#xff0c;AGI&#xff08;通用人工智能&#xff09;是什么&#xff1f; AGI&#xff08;Artificial General Intelligence&#xff0c;通用人工智能&#xff09;指的是能够像人类一样理解、学习、推理和解决广泛任务的人工智能系统。与目前的AI不同&#xff0c;AGI可…

计算机毕业设计SpringBoot+Vue.js乐享田园系统(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

navicat导出postgresql的数据库结构、字段名、备注等等

1、执行sql语句 SELECT A.attnum AS "序号",C.relname AS "表名",CAST ( obj_description ( relfilenode, pg_class ) AS VARCHAR ) AS "表名描述",A.attname AS "字段名称",A.attnotnull as "是否不为null",(case when A…

FPGA开发,使用Deepseek V3还是R1(9):FPGA的全流程(详细版)

以下都是Deepseek生成的答案 FPGA开发&#xff0c;使用Deepseek V3还是R1&#xff08;1&#xff09;&#xff1a;应用场景 FPGA开发&#xff0c;使用Deepseek V3还是R1&#xff08;2&#xff09;&#xff1a;V3和R1的区别 FPGA开发&#xff0c;使用Deepseek V3还是R1&#x…

DeepSeek、Grok 和 ChatGPT 对比分析:从技术与应用场景的角度深入探讨

文章目录 一、DeepSeek&#xff1a;知识图谱与高效信息检索1. 核心技术2. 主要特点3. 应用场景4. 实际案例 二、Grok&#xff1a;通用人工智能框架1. 核心技术2. 主要特点3. 应用场景4. 实际案例 三、ChatGPT&#xff1a;聊天机器人与通用对话系统1. 核心技术2. 主要特点3. 应用…

三、0-1搭建springboot+vue3前后端分离-idea新建springboot项目

一、ideal新建项目1 ideal新建项目2 至此父项目就创建好了&#xff0c;下面创建多模块&#xff1a; 填好之后点击create 不删了&#xff0c;直接改包名&#xff0c;看自己喜欢 修改包名和启动类名&#xff1a; 打开ServiceApplication启动类&#xff0c;修改如下&#xff1a; …

快速生成viso流程图图片形式

我们在写详细设计文档的过程中总会不可避免的涉及到时序图或者流程图的绘制&#xff0c;viso这个软件大部分技术人员都会使用&#xff0c;但是想要画的好看&#xff0c;画的科学还是比较难的&#xff0c;现在我总结一套比较好的方法可以生成好看科学的viso图(图片格式)。主要思…

【前端基础】Day 9 PC端品优购项目

目录 1. 品优购项目规划 1.1 网站制作流程 1.2 品优购项目整体介绍 1.3 学习目的 1.4 开发工具以及技术栈 1.5 项目搭建工作 1.6 网站favicon图标 1.7 网站TDK三大标签SEO优化 2. 品优购首页制作 2.1 常见模块类命名 2.2 快捷导航shortcut制作 2.3 header制作 2.4…

仿mudou库one thread oneloop式并发服务器

项目gitee&#xff1a;仿muduo: 仿muduo 一&#xff1a;项目目的 1.1项目简介 通过咱们实现的⾼并发服务器组件&#xff0c;可以简洁快速的完成⼀个⾼性能的服务器搭建。 并且&#xff0c;通过组件内提供的不同应⽤层协议⽀持&#xff0c;也可以快速完成⼀个⾼性能应⽤服务器…

一文学会Spring

一、Spring简介 Spring的优点 Spring是一个开源免费的框架、容器Spring是一个轻量级的框架&#xff0c;非侵入式的控制反转IOC、面向切面AOP支持事务 Spring是一个轻量级的控制反转(IOC)和面向切面(AOP)的容器 二、IOC 2.1 IOC本质 控制反转IOC&#xff0c;是一种设计思想…

解决Spring Boot中LocalDateTime返回前端数据为数组结构的问题

在Spring Boot开发中&#xff0c;处理日期时间数据是一个常见的需求。Java 8 引入了新的日期时间API&#xff0c;如LocalDateTime&#xff0c;它提供了更强大的日期时间处理功能。然而&#xff0c;在将LocalDateTime对象序列化为JSON时&#xff0c;可能会遇到返回为数组结构的问…

【一个月备战蓝桥算法】递归与递推

字典序 在刷题和计算机科学领域&#xff0c;字典序&#xff08;Lexicographical order&#xff09;也称为词典序、字典顺序、字母序&#xff0c;是一种对序列元素进行排序的方式&#xff0c;它模仿了字典中单词的排序规则。下面从不同的数据类型来详细解释字典序&#xff1a; …

CSDN 1024天 创作纪念日

机缘 还记得那是2022年5月&#xff0c;在上家公司工作时候&#xff0c;意外发现同事在通过CSDN记录一些日常遇到、解决的问题&#xff0c;也会更新一些他擅长领域的知识点&#xff0c;并且收获了不少的粉丝和阅读量&#xff0c;这不由得激起了我的兴趣。也在有空时候&#xff…

用于管理 Elasticsearch Serverless 项目的 AI Agent

作者&#xff1a;来自 Elastic Fram Souza 由自然语言驱动的 AI 代理&#xff0c;可轻松管理 Elasticsearch Serverless 项目 - 支持项目创建、删除和状态检查。 这个小型命令行工具让你可以用简单的英语管理你的无服务器 Elasticsearch 项目。它通过AI&#xff08;这里是 Ope…

机器学习数学通关指南

✨ 写在前面 &#x1f4a1; 在代码的世界里沉浸了十余载&#xff0c;我一直自诩逻辑思维敏捷&#xff0c;编程能力不俗。然而&#xff0c;当我初次接触 DeepSeek-R1 并领略其清晰、系统的思考过程时&#xff0c;我不禁为之震撼。那一刻&#xff0c;我深刻意识到&#xff1a;在A…

< 自用文儿 > DELETED 设置速读 in Ubuntu24

systemctl 和 DELETED&#xff1a; 配置文件&#xff1a; vi /etc/systemd/system/ DELETED.service [Unit] DescriptionV2Ray Service Documentation DELETED Afternetwork.target nss-lookup.target[Service] #Usernobody CapabilityBoundingSetCAP_NET_ADMIN CAP_NET_BIN…