03-240605
1. 行动算子-1
-
reduce
聚合
格式:
def reduce(f: (T, T) => T): T
例子:
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1,2,3,4)) // TODO - 行动算子 //reduce val i: Int = rdd.reduce(_+_) println(i)
输出结果:
10
-
collect
采集
格式:
def collect(): Array[T]
例子:
// collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组 val ints: Array[Int] = rdd.collect() println(ints.mkString(","))
输出结果:
1,2,3,4
-
count
计数
格式:
def count(): Long
例子:
// count : 数据源中数据的个数 val cnt = rdd.count() println(cnt)
运行结果:
4
-
first
获取数据源的第一个数据
格式:
def first(): T
例子:
// first : 获取数据源中数据的第一个 val first = rdd.first() println(first)
输出结果:
1
-
take
获取数据源的N个数据
格式:
def take(num: Int): Array[T]
例子:
// take : 获取N个数据 val ints: Array[Int] = rdd.take(3) println(ints.mkString(","))
输出结果:
1,2,3
-
takeOrdered
数据排序后.再取第N个数据
格式:
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
例子:
// takeOrdered : 数据排序后,取N个数据 val rdd1 = sc.makeRDD(List(4,2,3,1)) val ints1: Array[Int] = rdd1.takeOrdered(3) println(ints1.mkString(","))
输出结果:
1,2,3
-
aggregate
给定初始值,初始值参与分区内与分区间的计算
格式:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
例子:
val rdd = sc.makeRDD(List(1,2,3,4),2) //10 + 13 + 17 = 40 // aggregateByKey : 初始值只会参与分区内计算 // aggregate : 初始值会参与分区内计算,并且和参与分区间计算 val result = rdd.aggregate(10)(_+_._+_) println(result)
输出结果:
40
-
fold
折叠操作,aggregate的简化版操作
格式:
def fold(zeroValue: T)(op: (T, T) => T): T
例子:
//10 + 13 + 17 = 40 // aggregateByKey : 初始值只会参与分区内计算 // aggregate : 初始值会参与分区内计算,并且和参与分区间计算 //val result = rdd.aggregate(10)(_+_, _+_) val result = rdd.fold(10)(_+_) println(result)
输出结果:
40
-
countByKey 与 countByValue
都是统计每种Key或者Value出现的个数
格式:
def countByKey(): Map[K, Long]
例子:
val rdd = sc.makeRDD(List( ("a", 1),("a", 2),("a", 3) )) //val intToLong: collection.Map[Int, Long] = rdd.countByValue() //println(intToLong) val stringToLong: collection.Map[String, Long] = rdd.countByKey() println(stringToLong)
输出结果:
Map(a -> 3)
-
WordCount 不同的实现方式:
运用9种不同的方式实现WordCount
-
使用groupBy:
// groupBy def wordcount1(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) val group: RDD[(String, Iterable[String])] = words.groupBy(word=>word) val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size) }
-
使用groupByKey:
// groupByKey def wordcount2(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) val wordOne = words.map((_,1)) val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey() val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size) }
-
使用reduceByKey:
// reduceByKey def wordcount3(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) val wordOne = words.map((_,1)) val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_) }
-
使用aggregateByKey
// aggregateByKey def wordcount4(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) val wordOne = words.map((_,1)) val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_) }
-
使用foldByKey:
// foldByKey def wordcount5(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) val wordOne = words.map((_,1)) val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_+_) }
-
使用combineByKey:
// combineByKey def wordcount6(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) val wordOne = words.map((_,1)) val wordCount: RDD[(String, Int)] = wordOne.combineByKey( v=>v, (x:Int, y) => x + y, (x:Int, y:Int) => x + y ) }
-
使用countByKey:
// countByKey def wordcount7(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) val wordOne = words.map((_,1)) val wordCount: collection.Map[String, Long] = wordOne.countByKey() }
-
使用countByValue:
// countByValue def wordcount8(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) val wordCount: collection.Map[String, Long] = words.countByValue() }
-
使用reduce:
def wordcount91011(sc : SparkContext): Unit = { val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val words = rdd.flatMap(_.split(" ")) // 【(word, count),(word, count)】 // word => Map[(word,1)] val mapWord = words.map( word => { mutable.Map[String, Long]((word,1)) } ) val wordCount = mapWord.reduce( (map1, map2) => { map2.foreach{ case (word, count) => { val newCount = map1.getOrElse(word, 0L) + count map1.update(word, newCount) } } map1 } ) println(wordCount) }
2. 序列化
算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。
-
RDD序列化
案例:
object Spark01_RDD_Serial { def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(sparConf) val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu")) val search = new Search("h") //search.getMatch1(rdd).collect().foreach(println) search.getMatch2(rdd).collect().foreach(println) sc.stop() } // 查询对象 // 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测 class Search(query:String){ def isMatch(s: String): Boolean = { s.contains(this.query) } // 函数序列化案例 def getMatch1 (rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } // 属性序列化案例 def getMatch2(rdd: RDD[String]): RDD[String] = { val s = query rdd.filter(x => x.contains(s)) } } }
输出结果:
-
Kryo序列化框架
Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
了解一下就行
案例:
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setAppName("SerDemo") .setMaster("local[*]") // 替换默认的序列化机制 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册需要使用 kryo 序列化的自定义类 .registerKryoClasses(Array(classOf[Searcher])) val sc = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2) val searcher = new Searcher("hello") val result: RDD[String] = searcher.getMatchedRDD1(rdd) result.collect.foreach(println) } } case class Searcher(val query: String) { def isMatch(s: String) = { s.contains(query) } def getMatchedRDD1(rdd: RDD[String]) = { rdd.filter(isMatch) } def getMatchedRDD2(rdd: RDD[String]) = { val q = query rdd.filter(_.contains(q)) }
Kryo绕过了Java的序列化机制,Kryo比Java序列化小,适合大数据传输、存储
-
RDD 血缘关系
toDebugString查看血缘关系
多个连续的RDD的依赖关系,称之为血缘关系
演示:
关于如何将RDD间的关系保存下来:
血缘关系演示:
-
RDD的依赖关系
dependencies查看依赖关系
OneToOne依赖(窄依赖)
窄依赖我们形象的比喻为独生子女。
Shuffle依赖(宽依赖):
宽依赖我们形象的比喻为多生。
-
RDD 阶级划分
-
RDD 任务划分
源码演示:
-
RDD 的持久化
这样的复用在底层不是很好用:
应该这样:
放在内存中 mapRDD.cache()
放在磁盘中 mapRDD.persist()
Cache缓存:
-
RDD CheckPoint 检查点
checkpoint 需要落盘,需要指定检查点保存路径
检查点路径保存的文件,当作业执行完毕后,不会被删除
一般保存路径都是在分布式存储系统: HDFS
-
checkpoint、Cache、Persist的区别:
以上三个都可以存储,关于他们的区别:
cache : 将数据临时存储在内存中进行数据重用
会在血缘关系中添加新的依赖。一旦出现问题,可以重新读取数据
persist : 将数据临时存储在硬盘文件中进行数据重用
涉及到磁盘IO,性能较低,但是数据安全
如果作业执行完毕,临时保存的数据文件就会丢失
checkpoint : 将数据长久地保存在磁盘文件中进行数据重用
涉及到磁盘IO,性能较低,但是数据安全
为了保证数据安全,所以一般情况下,会独立执行作业
为了能够提高效率,一般情况下,是需要和cache联合使用
执行过程中,会切断血缘关系,重新建立新的血缘关系
checkpoint等同于改变数据源