Spark-06:Spark 共享变量

目录

1.广播变量(broadcast variables)

2.累加器(accumulators)


      在分布式计算中,当在集群的多个节点上并行运行函数时,默认情况下,每个任务都会获得函数中使用到的变量的一个副本。如果变量很大,这会导致网络传输占用大量带宽,并且在每个节点上都占用大量内存空间。为了解决这个问题,Spark引入了共享变量的概念。

        共享变量允许在多个任务之间共享数据,而不是为每个任务分别复制一份变量。这样可以显著降低网络传输的开销和内存占用。Spark提供了两种类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

1.广播变量(broadcast variables)

        通常情况下,Spark程序运行时,通常会将数据以副本的形式分发到每个执行器(Executor)的任务(Task)中,但当变量较大时,这会导致大量的内存和网络开销。通过使用广播变量,Spark将变量只发送一次到每个节点,并在多个任务之间共享这个副本,从而显著降低了内存占用和网络传输的开销。

Scala 实现:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Java 实现:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

2.累加器(accumulators)

        累加器是Spark中的一种特殊类型的共享变量,主要用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。累加器支持的数据类型仅限于数值类型,包括整数和浮点数等。

Scala 实现:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

Java 实现:

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

        内置累加器功能有限,但可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个方法必须重写:reset用于将累加器重置为零,add用于向累加器中添加另一个值,merge用于将另一个相同类型的累加器合并到此累加器。

自定义累加器Scala实现:

package com.yichenkeji.demo.sparkscala

import org.apache.spark.util.AccumulatorV2


class CustomAccumulator extends AccumulatorV2[Int, Int]{
  //初始化累加器的值
  private var sum = 0
  override def isZero: Boolean = sum == 0

  override def copy(): AccumulatorV2[Int, Int] = {
    val newAcc = new CustomAccumulator()
    newAcc.sum = sum
    newAcc
  }

  override def reset(): Unit = sum = 0

  override def add(v: Int): Unit = sum += v

  override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value

  override def value: Int = sum
}

自定义累加器Java实现:

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.util.AccumulatorV2;

public class CustomAccumulator extends AccumulatorV2<Integer, Integer> {
    // 初始化累加器的值
    private Integer sum = 0;
    @Override
    public boolean isZero() {
        return sum == 0;
    }

    @Override
    public AccumulatorV2<Integer, Integer> copy() {
        CustomAccumulator customAccumulator = new CustomAccumulator();
        customAccumulator.sum = this.sum;
        return customAccumulator;
    }

    @Override
    public void reset() {
        this.sum = 0;
    }

    @Override
    public void add(Integer v) {
        this.sum += v;
    }

    @Override
    public void merge(AccumulatorV2<Integer, Integer> other) {
        this.sum += ((CustomAccumulator) other).sum;
    }

    @Override
    public Integer value() {
        return sum;
    }
}

自定义累加器的使用:

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;
import java.util.List;

public class AccumulatorTest {
    public static void main(String[] args) {
        //1.初始化SparkContext对象
        SparkConf sparkConf = new SparkConf().setAppName("Spark Java").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        CustomAccumulator customAccumulator = new CustomAccumulator();
        //注册自定义累加器才能使用
        sc.sc().register(customAccumulator);
        sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).foreach(x -> customAccumulator.add(x));
        System.out.println(customAccumulator.value());
        //5.停止SparkContext
        sc.stop();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/190828.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

I/O多路转接之epoll

承接上文 I/O多路转接之poll-CSDN博客 简介 epoll的相关系统调用 epoll底层原理 编写epoll的server 重新回归一下epoll原理&#xff0c;LT&#xff0c;ET epoll改成ET工作模式 -- 初识(有bug) epoll初识 按照man手册的说法: 是为处理大批量句柄而作了改进的poll. 它是在2.5.4…

运用工具Postman快速导出python接口测试脚本

的脚本可以导出多种语言的脚本&#xff0c;方便二次维护开发。 Python的requests库&#xff0c;支持python2和python3&#xff0c;用于发送http/https请求 使用unittest进行接口自动化测试 一、环境准备 1、安装python&#xff08;使用python2或3都可以&#xff09; 2、安…

React入门使用 (官方文档向 Part1)

文章目录 React组件:万物皆组件 JSX: 将标签引入 JavaScriptJSX 规则1. 只能返回一个根元素2. 标签必须闭合3. 使用驼峰式命名法给 ~~所有~~ 大部分属性命名&#xff01;高级提示&#xff1a;使用 JSX 转化器 在 JSX 中通过大括号使用 JavaScript使用引号传递字符串使用大括号&…

RocketMQ相关概念与使用入门详解

文章目录 RocketMQ 相关概念消息模型MQ 的简单消息模型RocketMQ 的复杂消息模型 RocketMQ 中消息相关概念消息&#xff08;Message&#xff09;主题&#xff08;Topic&#xff09;Tags队列消息标识 RocketMQ 中的物理对象NameServerBrokerProducerConsumer NameServer 与 Broke…

MATLAB 和 Simulink 官方文档下载地址

MATLAB 官方文档中文版下载网址&#xff1a; https://ww2.mathworks.cn/help/pdf_doc/matlab/index.html 如图&#xff1a; MATLAB 官方文档英文版下载网址&#xff1a; https://ww2.mathworks.cn/help/pdf_doc/matlab/index.html?langen 如图&#xff1a; Simulink 官…

Xilinx Zynq-7000系列FPGA实现视频拼接显示,提供两套工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐FPGA图像处理方案FPGA视频拼接叠加融合方案推荐 3、设计思路详解Video Mixer介绍 4、工程代码1&#xff1a;2路视频拼接 HDMI 输出PL 端 FPGA 逻辑设计PS 端 SDK 软件设计 5、工程代码2&#xff1a;4路视频拼接 HDMI 输出PL 端 FPGA 逻辑设…

Kotlin学习——kt入门合集博客 kt里的委派模式Delegation kt里的特性

Kotlin 是一门现代但已成熟的编程语言&#xff0c;旨在让开发人员更幸福快乐。 它简洁、安全、可与 Java 及其他语言互操作&#xff0c;并提供了多种方式在多个平台间复用代码&#xff0c;以实现高效编程。 https://play.kotlinlang.org/byExample/01_introduction/02_Functio…

OpenCV检测圆形东西是否存在缺口?

文章目录 前言一、试过的方法二、最终使用的方法1.先极坐标变换2.计算斜率 总结 前言 想了挺久&#xff0c;一直没解决这个问题。后面勉强解决了。 一、试过的方法 1.想用圆度来解决&#xff0c;后来发现圆度差值很小&#xff0c;完整的圆圆度0.89&#xff0c;然后有缺角的圆圆…

emu8086汇编语言输出“Hello World!“

输出Hello world 首先我们尝试用C语言来实现该功能&#xff1a; #include <stdio.h>int main() {printf("Hello World!"); // 输出“Hello World!”return 0; } 将这行代码翻译成汇编语言... ; DS 数据段定义 DATA SEGMENTZIFU DB Hello World!,$ ;字符串…

LeetCode198.打家劫舍

打家劫舍和背包问题一样是一道非常经典的动态规划问题&#xff0c;只要做过几道动态规划的题&#xff0c;这道题简直就非常容易做出来。我应该花了10来分钟左右就写出来了&#xff0c;动态规划问题最重要的就是建立状态转移方程&#xff0c;就是说如何从上一个状态转移到下一个…

CentOS 7 安装 Weblogic 14 版本

安装JDK程序 注意&#xff1a;安装weblogic前&#xff0c;先安装JDK&#xff01;&#xff08;要求jdk(1.7以上)&#xff09;&#xff1a; 一、创建用户组weblogic及用户weblogic groupadd weblogic useradd -g weblogic weblogic二、将下载好的jdk及weblogic上传至/home/webl…

重量级消息,微软将ThreadX RTOS全家桶贡献给Eclipse基金会,免费供大家商用,宽松的MIT授权方式

从明年第1季度开始&#xff0c;任何人&#xff0c;任何厂家的芯片都可以免费商用&#xff0c;MIT授权就这点好。 贡献出来后&#xff0c;多方可以一起努力开发&#xff0c;当前首批兴趣小组AMD, Cypherbridge, Microsoft, NXP, PX5, Renesas, ST Microelectronics, Silicon Lab…

速记:一个TL431应用电路

一个TL431应用电路 仿真结果 输出电压为&#xff1a;5V 负载电阻为&#xff1a; R4 50Ω 如果负载R4加重 显然负载加重&#xff0c;输出就达不到5V. 三极管T1 的作用 没有三极管的情况 同样是保持负载 R 50Ω 可见三极管的作用就是用来放大电流

第十九章 解读利用pytorch可视化特征图以及卷积核参数(工具)

介绍一种可视化feaature maps以及kernel weights的方法 推荐可视化工具TensorBoard&#xff1a;可以查看整个计算图的数据流向&#xff0c;保存再训练过程中的损失信息&#xff0c;准确率信息等 学习视频&#xff1a; 使用pytorch查看中间层特征矩阵以及卷积核参数_哔哩哔哩…

rdf-file:分布式环境下的文件处理

一&#xff1a;数据量大了以后&#xff0c;单机解析或者生成文件的效率就很低&#xff0c;需要通过集群处理 机构过来的文件&#xff1a;我们先对文件进行分片&#xff0c;在利用集群集群处理分片文件。给机构文件&#xff1a;分库分表数据&#xff0c;每个分表生成一个分片文…

基于SSM的企业订单跟踪管理系统(有报告)。Javaee项目

演示视频&#xff1a; 基于SSM的企业订单跟踪管理系统&#xff08;有报告&#xff09;。Javaee项目 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring SpringM…

只狼 资源分享

版本介绍 v1.06版|容量15GB|官方简体中文|支持键盘.鼠标.手柄|赠官方原声4首BGM|赠多项修改器|赠一周目全义手忍具强化通关存档|2020年01月15号更新 只狼中文设置&#xff1a; https://jingyan.baidu.com/article/cb5d6105bc8556005d2fe048.html 只狼键盘对应按键&#xff1…

vite-性能优化-构建优化-cnd加速优化

CDN 加速优化 - 感觉用不大到 主要作用 &#xff1a; 将引入的依赖&#xff0c;打包部署后&#xff0c;在用户访问的时候&#xff0c; 通过网络CDN的方式进行加载&#xff0c;而非直接从你自己的服务器上加载。优点 &#xff1a; 1、直接降低了你自己的打包的体积&#xff0c…

excel表中慎用合并单元格,多用跨列居中

如下一个excel例表&#xff1a; 要将首行居中&#xff0c;最好的办法如下&#xff1a; 1、选中首行单元格 2、按下ctrl1&#xff0c;调出“设置单元格格式”&#xff0c;选中“对齐”&#xff0c;在“水平对齐”中选择“跨列居中” 3、完成任务 这样居中的好处是&#xff1a;可…

【C++】new和delete

这里是目录 C内存管理方式new/delete操作内置类型new和delete操作自定义类型定位new内存泄漏 前言 我们的程序当中主要有以下类型的数据&#xff08;用途/存储角度&#xff09;&#xff1a; 局部数据、静态数据、全局数据、常量数据、动态申请的数据 内存布局&#xff1a; C内…