概述
- 共享变量
- 共享变量的工作原理
- Broadcast Variable
- Accumulator
共享变量
共享变量的工作原理
通常,当给 Spark
操作的函数(如 mpa
或 reduce
) 在 Spark
集群上执行时,函数中的变量单独的拷贝到各个节点上,函数执行时,使用的是自己节点执行上的变量,节点上的变量更新不会更新至 driver
,在任务之间支持通用的读写共享变量是低效的;然而,Spark
的提供了两种有限类型的共享变量:broadcast variables
和 accumulators
。
Broadcast Variable
Broadcast Variable
会将使用到的变量,仅仅为每个节点
拷贝一份,而不会为每个task
都拷贝一份副本,因此其最大的作用,就是减少变量到各个节点的网络传输消耗
,以及在各个节点上的内存消耗
通过调用SparkContext
的broadcast()
方法,针对某个变量创建广播变量
注意: 广播变量,是只读的
,在算子函数内,使用到广播变量时,每个节点只会拷贝一份副本。可以使用广播变量的value()
方法获取值。
由下图,深入理解 Broadcast Variable
由图可知,普通变量
和 Broadcast Variable
区别就是,网络传输可以大大的降低,Broadcast Variable
是每个节点机器只有一份,而 普通变量
是每个 task
都会有一份,浪费内存存储。
可以想象一个极端情况,如果map算子有10个task,恰好这10个task还都在一个worker节点上,那么这个时候,map算子使用的外部变量就会在这个worker节点上保存10份,这样就很占用内存了。
接下来通过具体的案例,来使用一下这个广播案例;代码如下图:
object BroadcastOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("BroadcastOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRdd = sc.parallelize(Array(1, 2, 3, 4, 5))
val variable = 2
// 1.定义广播变量
val variableBroadcast = sc.broadcast(variable)
// 2.使用广播变量,调用其 value方法
dataRdd.map(_ * variableBroadcast.value).foreach(println _)
}
}
Accumulator
Spark
提供的 Accumulator
,主要用于多个节点对一个变量进行共享性的操作。
正常情况下在 Spark
的任务中,由于一个算子可能会产生多个 task
并行执行,所以在这个算子内部执行的聚合计算,都是局部的,想要实现多个 task
进行全局聚合计算,此时就需要用到 Accumulator
这个共享的累加变量 。
注意: Accumulator只提供了累加的功能。在task只能对Accumulator进行累加操作,不能读取它的值。只有在Driver进程中才可以读取Accumulator的值。
代码如下:
object AccumulatorOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("AccumulatorOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
// 1.定义累加变量
val sumAccumulator = sc.longAccumulator
// 2.使用累加变量
dataRDD.foreach(sumAccumulator.add(_))
println(sumAccumulator.value)
}
}
结束
至此共享变量
就结束了,如有问题,欢迎评论区提问。