目录
什么是Spark rdd算子
算子的分类
Transformation算子
Action算子
转换算子
Value类型
map
mapPartitions
mapPartitionsWithIndex
glom
groupBy
filter
sample
distinct
coalesce
sortBy
双Value类型
intersection
union
subtract
zip
K-V类型
partitionBy
reduceByKey
groupByKey
aggregateByKey
foldByKey
combineByKey
join
sortByKey
mapValues
cogroup
行动算子
reduce
collect
count
first
take
takeOrdered
aggregate
fold
countByValue
countByKey
foreach
save
什么是Spark rdd算子
算子:分布式对象上的API称之为算子
方法\函数:本地对象的API,叫做方法\函数
算子:分布式对象的API,叫做算子
算子的分类
rdd算子分为两类
- Transformation:转换算子
- Action:动作(行动)算子
Transformation算子
定义:RDD的算子,返回值仍旧是一个RDD的,称之为转换算子。
特征:这类算子是lazy 懒加载的,如果没有Action算子,Transformation算子是不工作的。
转换算子分为:Value类型、双Value类型和K-V类型。
Action算子
定义:返回值不是rdd的就是action算子。
对于这两类算子来说,Transformation算子,相当于在构建执行计划,action是一个指令让这个执行计划开始工作。如果没有action,Transformation算子之间的迭代关系,就是一个没有通电的流水线,只有action到来,这个数据处理流水线才开始工作。
转换算子
Value类型
map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4)
)
val mapRDD = rdd.map(_ * 2)
mapRDD.collect().foreach(println)
/**
* 2
* 4
* 6
* 8
* */
mapPartitions
以分区为单位对数据进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4), 2
)
val mapRDD = rdd.mapPartitions(datas => datas.map(_ * 2))
mapRDD.collect().foreach(println)
/** *
* 2
* 4
* 6
* 8
*/
TIP:
1、会将整个分区的数据加载到内存,如果处理完不被释放,在内存较小并且数据量较大的情况下,容易出现内存溢出(OOM)
2、可以实现一些特殊功能,比如取每个分区中最大值,map无法实现
mapPartitionsWithIndex
类似于mapPartitions,比mapPartitions多一个参数来表示分区号
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(List(1, 2), List(3, 4))
)
val fmRDD = rdd.flatMap(
list => {
list
}
)
fmRDD.collect().foreach(println)
/**
* 1
* 2
* 3
* 4
* */
当集合中的数据类型不同时,可以使用match case进行模式匹配,转换成集合类型。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(List(1, 2),3, List(3, 4))
)
val fmRDD = rdd.flatMap {
case list: List[_] => list
case d => List(d)
}
fmRDD.collect().foreach(println)
/**
* 1
* 2
* 3
* 3
* 4
* */
glom
将RDD中每一个分区变成一个数组,数组中元素类型与原分区中元素类型一致。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4), 2
)
val gRDD = rdd.glom()
gRDD.collect().foreach(data => println(data.mkString(",")))
/**
* 1,2
* 3,4
* */
groupBy
根据指定的规则进行分组,分区默认不变,数据会被打乱(shuffle)。极限情况下,数据可能会被分到同一个分区中。一个分区可以有多个组,一个组只能在一个分区中。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4), 2
)
// groupBy 会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组,相同的key值的数据会被放置在一个组中
def groupFunction(num: Int): Int = {
num % 2
}
val groupRDD = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
/**
* (0,CompactBuffer(2, 4))
* (1,CompactBuffer(1, 3))
*/
filter
根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。
当数据进行筛选过滤后,分区不变,但是分区内数据可能不均衡,导致数据倾斜。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4), 2
)
val filterRDD = rdd.filter(_ % 2 == 1)
filterRDD.collect().foreach(println)
/**
* 1
* 3
* */
sample
根据指定规则从数据集中采样数据。通过它可以找到数据倾斜的key。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4), 2
)
println(rdd.sample(
true,
2
).collect().mkString((",")))
/**
*1,1,2,3,3,4,4
* */
distinct
将数据集中的数据去重。使用分布式处理方式实现,与内存集合使用HashSet去重方式不同。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4, 2, 3, 1), 2
)
val distinctRDD = rdd.distinct()
distinctRDD.collect().foreach(println)
/**
* 4
* 2
* 1
* 3
* */
coalesce
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。
当Spark程序中存在过多的小任务时,可以通过coalesce合并分区,减少分区个数,进而减少任务调度成本。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(1, 2, 3, 4, 5, 6), 3
)
// 无shuffle
val coaRDD = rdd.coalesce(2)
// 有shuffle
coaRDD.saveAsTextFile("output1")
TIP:
1、coalesce默认不会将分区数据打乱重新组合,这种情况会导致数据不均衡,出现数据倾斜
2、可以设置第二个参数为true,进行shuffle处理,让数据均衡
3、扩大分区时,可以使用coalesce(,true)或者repartition
sortBy
根据指定规则进行排序,默认升序,设置第二个参数改变排序方式。
默认情况下,不会改变分区个数,但是中间存在shuffle处理。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
List(4, 5, 1, 3, 2, 6), 2
)
val sortRDD = rdd.sortBy(num => num)
sortRDD.saveAsTextFile("output")
双Value类型
intersection
两个RDD求交集
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val one: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
val two: RDD[Int] = sc.parallelize(Array(1, 2, 4, 9, 8))
val value: RDD[Int] = one.intersection(two)
value.collect().foreach(println)
/**
* 1
* 2
* 4
* */
union
两个RDD求并集
只是合并不去重,要想去重可以使用 distinct 算子进行去重
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val one: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
val two: RDD[Int] = sc.parallelize(Array(1, 2, 4, 9, 8))
val value: RDD[Int] = one.union(two)
value.collect().foreach(x => print(x + " "))
/**
*1 2 3 4 5 1 2 4 9 8
* */
subtract
两个RDD求差集,
拉链, 对应位置一对一映射,组成(key,value),需要每个对应分区上的数据个数相同
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val one: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
val two: RDD[Int] = sc.parallelize(Array(1, 2, 4, 9, 8))
val value: RDD[Int] = one.subtract(two)
value.collect().foreach(x => print(x + " "))
/**
* 3 5
* */
zip
拉链操作,以键值对的形式进行合并。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
val newRDD = rdd1.zip(rdd2)
println(newRDD.collect().mkString(","))
/**
* (1,3),(2,4),(3,5),(4,6)
* */
TIP:
1、intersection,union和subtract要求两个RDD中的数据类型保持一致
2、zip:不要求两个RDD中的数据类型保持一致,但要求分区个数以及对应分区上的数据个数保持一致
K-V类型
partitionBy
将数据按照指定artitioner重新进行分区,默认的分区器是HashPartitioner。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
val newRDD: RDD[(Int, Int)] = rdd.map((_, 1))
// partitionBy 根据指定的分区规则对数据进行重分区
newRDD.partitionBy(new HashPartitioner(2))
.saveAsTextFile("output")
reduceByKey
将数据按照相同的key对value进行聚合。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 2)))
// reduceByKey 相同的Key的数据进行value数据的聚合操作
// scala 语言中一般的聚合都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合的
// reduceByKey 中如果key的数据只有一个,是不会参与运算的。
val reduceRDD = rdd.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
/**
* (a,6)
* (b,2)
* */
groupByKey
将数据按照相同的key对value进行分组,形成一个对偶元祖。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 2)))
// groupByKey : 将数据源中的数据,相同的key的数据分到一个组中,形成一个对偶元组
// 元组中的第一个元素就是key,第二个元素就是相同key的value集合
val groupRDD = rdd.groupByKey()
groupRDD.collect().foreach(println)
/**
* (a,CompactBuffer(1, 2, 3))
* (b,CompactBuffer(2))
* */
aggregateByKey
根据不同的规则进行分区内计算和分区间计算。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
// 分区内和分区间的计算规则可以不同,也可以相同
rdd.aggregateByKey(0)(
(x, y) => math.max(x, y),
(x, y) => x + y
).collect().foreach(println)
/**
* (a,6)
* */
foldByKey
aggregateByKey的简化操作,分区内和分区间的计算规则一样
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
// 分区内和分区间的计算规则可以相同
// rdd.aggregateByKey(0)(
// (x,y) => x + y,
// (x,y) => x + y
// ).collect().foreach(println)
// 可以使用foldByKey来简化
rdd.foldByKey(0)(_ + _).collect().foreach(println)
/**
*(a,10)
* */
combineByKey
针对相同K,将V合并成一个集合。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)), 2)
// 获取相同key的数据的平均值 => (a,3) (b,4)
// combineByKey 需要三个参数
// 第一个参数表示:将相同key的第一个数据进行数据结构的转换,实现操作
// 第二个参数表示:分区内的计算规则
// 第三个参数表示:分区间的计算规则
val newRDD = rdd.combineByKey(
v => (v, 1), // 转换为 tuple是在运行当中动态得到的,所以下面的tuple需要添加数据类型
(t: (Int, Int), v) => {(t._1 + v, t._2 + 1)},
(t1: (Int, Int), t2: (Int, Int)) => {(t1._1 + t2._1, t1._2 + t2._2)})
val resultRDD = newRDD.mapValues {
case (sum, cnt) => sum / cnt
}
resultRDD.collect().foreach(println)
/**
* (b,4)
* (a,3)
* */
join
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同的key对应的所有元素连接在一起的(K,(V,W))的RDD。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val rdd2 = sc.makeRDD(List(("a", 5), ("a", 6), ("e", 8), ("c", 7)))
// join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组。
// 如果两个数据源中key没有匹配上,那么数据不会出现在结果中。
// 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔积,数据量会几何性增长,会导致性能降低
rdd.join(rdd2).collect().foreach(println)
/** (a,(1,5))
* (a,(1,6))
* (c,(3,7)) */
sortByKey
在一个(K,V)的RDD上调用,K必须实现ordered接口,返回一个按照key进行排序的(K,V)的RDD
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
//按照key对rdd中的元素进行排序,默认升序
rdd.sortByKey().collect().foreach(println)
//降序
println("----------")
rdd.sortByKey(false).collect().foreach(println)
/**
* (a,1)
* (b,2)
* (c,3)
* (d,4)
* ----------
* (d,4)
* (c,3)
* (b,2)
* (a,1)
* */
mapValues
针对于(K,V)形式的类型只对V进行操作
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
rdd.mapValues("pre_" + _).collect().foreach(println)
/**
* (a,pre_1)
* (b,pre_2)
* (c,pre_3)
* (d,pre_4)
* */
cogroup
相同的key,value分组后连接起来。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val rdd2 = sc.makeRDD(List(("a", 5), ("a", 6), ("e", 8), ("c", 7)))
// cogroup : connect + group (分组,连接)
// 可以有多个参数
rdd.cogroup(rdd2).collect().foreach(println)
/** *
* (a,(CompactBuffer(1),CompactBuffer(5, 6)))
* (b,(CompactBuffer(2),CompactBuffer()))
* (c,(CompactBuffer(3),CompactBuffer(7)))
* (d,(CompactBuffer(4),CompactBuffer()))
* (e,(CompactBuffer(),CompactBuffer(8)))
* */
行动算子
行动算子,其实就是触发作业执行的方法
底层代码调用的是环境对象中 runJob 方法,调用dagScheduler的runJob方法创建ActiveJob,并提交执行。
reduce
聚合RDD中的所有数据,先聚合分区内数据,在聚合分区间数据。
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val i = rdd.reduce(_ + _)
println(i)
/**
* 15
* */
collect
采集,该方法会将不同分区间的数据按照分区顺序采集到Driver端,形成数组。
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val ints = rdd.collect()
ints.foreach(println)
/**
* 1
* 2
* 3
* 4
* 5
* */
count
统计数据个数。
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val l = rdd.count()
println(l)
/**
* 5
* */
first
获取RDD中的第一个元素。
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val i = rdd.first()
println(i)
/**
* 1
* */
take
获取RDD前n个元素组成的数组。
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val ints = rdd.take(2)
println(ints.mkString(","))
/**
* 1,2
* */
takeOrdered
获取RDD排序后的前n个元素组成的数组
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val rdd1 = sc.makeRDD(List(4, 3, 2, 1))
val ints1 = rdd1.takeOrdered(2)
println(ints1.mkString(","))
/**
* 1,2
* */
aggregate
将每个分区里面的元素通过分区内逻辑和初始值进行聚合,然后用分区间逻辑和初始值(zeroValue)进行操作。注意:分区间逻辑再次使用初始值和aggregateByKey是有区别的。
fold
折叠操作,aggregate的简化操作,分区内逻辑和分区间逻辑相同。
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
println(rdd.aggregate(10)(_ + _, _ + _))
println("-----------")
//fold 是aggregate的简化版
println(rdd.fold(10)(_ + _))
/**
* 100
* -----------
* 100
* */
countByValue
统计每个value的个数
countByKey
统计每种key的个数。
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
println(rdd.countByKey())
val intToLong = rdd.countByValue()
println(intToLong)
/**
* Map(1 -> 3, 2 -> 1, 3 -> 2)
* Map((3,c) -> 2, (1,a) -> 3, (2,b) -> 1)
* */
foreach
遍历RDD中每一个元素。
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val value: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))
value.collect().foreach(println)
/**
* 1
* 2
* 3
* 4
* */
save
(1)saveAsTextFile(path)保存成Text文件
(2)saveAsSequenceFile(path) 保存成Sequencefile文件
(3)saveAsObjectFile(path) 序列化成对象保存到文件
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)), 2)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile 方法要求数据的格式必须为 K - V 键值对类型
rdd.saveAsSequenceFile("output2")