【大数据学习 | Spark-Core】广播变量和累加器

1. 共享变量

Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。

累加器用来对信息进行聚合,相当于mapreduce中的counter;而广播变量用来高效分发较大的对象,相当于semijoin中的DistributedCache 。

共享变量出现的原因:

我们传递给Spark的函数,如map(),或者filter()的判断条件函数,能够利用定义在函数之外的变量,但是集群中的每一个task都会得到变量的一个副本,并且task在对变量进行的更新不会被返回给driver。

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}

object TestAcc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test acc")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9),3)

    val count = rdd.map(t=> 1).reduce(_+_)

    println(count)

//    val acc = sc.longAccumulator("count")
//
//    rdd.foreach(t=>{
//      acc.add(1)
//    })
//
//    println(acc.value)

//    println(rdd.count())
  }
}

原因总结:

对于executor端,driver端的变量是外部变量。

excutor端修改了变量count,根本不会让driver端跟着修改。如果想在driver端得到executor端修改的变量,需要用累加器实现。

当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低,也可能会造成内存溢出。使用广播变量以后,在每个Executor中只有一个Driver端变量副本,在一个executor中的并行执行的task任务会引用该一个变量副本即可,需要广播变量提高运行效率。

2. 累加器

累加器的执行流程:

通过SparkContext创建一个累加器并初始化。当driver端将任务分发给executor时,每个executor会接收一个任务和一个引用到该累加器的副本。每个executor上的任务可以调用累加器的add方法来增加累加器的值,这些操作是线程安全的,因为每个任务都会在自己的executor线程中执行。当每个任务完成,executor将累加器的更新值发送到driver端进行聚合过程,得到最终的聚合结果。

累加器可以很简便地对各个worker返回给driver的值进行聚合。累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数。

用法:

var acc: LongAccumulator = sc.longAccumulator // 创建累加器

acc.add(1) // 累加器累加

acc.value // 获取累加器的值

累加器的简单使用

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCountWithAcc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test acc")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val acc = sc.longAccumulator("bad word")
    sc.textFile("data/a.txt")
      .flatMap(_.split(" "))
      .filter(t=>{
        if(t.equals("shit")){
          acc.add(1)
          false
        }else
          true
      }).map((_,1))
      .reduceByKey(_+_)
      .foreach(println)

    println("invalid words:"+acc.value)
  }
}

3. 广播变量

ip转换工具

public class IpUtils {

    public static Long ip2Long(String ip) {
        String fragments[] = ip.split("[.]");
        Long ipNum = 0L;
        for(int i=0;i<fragments.length;i++) {
            ipNum = Long.parseLong(fragments[i]) | ipNum << 8L;
        }
        return ipNum;
    }
}

ip案例代码

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}

object IpTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("ip")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)

    val accessRDD = sc.textFile("data/access.log")
      .map(t=>{
        val strs = t.split("\\|")
        IpUtils.ip2Long(strs(1))
      })
    val ipArr:Array[(Long,Long,String)] = sc.textFile("data/ip.txt").map(t=>{
      val strs = t.split("\\|")
      (strs(2).toLong,strs(3).toLong,strs(6)+strs(7))
    }).collect()

//    accessRDD.map(ip=>{
//      ipRDD.filter(t=>{
//        ip>= t._1 && ip<= t._2
//      })
//    }).foreach(println)

    accessRDD.map(ip=>{
      ipArr.find(t=>{
        t._1<= ip && t._2>=ip
      }) match {
        case Some(v) => (v._3,1)
        case None => ("unknow",1)
      }
      //option
    }).reduceByKey(_+_)
      .foreach(println)
  }
}

使用广播变量可以使程序高效地将一个很大的只读数据发送到executor节点,会将广播变量放到executor的BlockManager中,而且对每个executor节点只需要传输一次,该executor节点的多个task可以共用这一个。

用法:

val broad: Broadcast[List[Int]] = sc.broadcast(list) // 把driver端的变量用广播变量包装

broad.value // 从广播变量获取包装的数据,用于计算

我们可能遇到这样的问题:如果我们需要广播的数据为100M,如果需要driver端亲自向每个executor端发送100M的数据,在工作中executor节点的个数可能是很多的,比如是200个,这意味着driver端要发送20G的数据,这对于driver端的压力太大了。所以要用到比特洪流技术。

就是说driver端不必向每个executor发送一份完整的广播变量的数据,而是将一份广播变量切分成200份,发送给两百个executor,然后200个executor间通过BlockManager中的组件transferService与其他executor通信,进行完整的数据。

这样driver端只需要发送一份广播变量的数据,压力就会小很多,而且其他executor也都拿到了这一份广播变量的数据 。

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}

object IpTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("ip")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)

    val accessRDD = sc.textFile("data/access.log")
      .map(t=>{
        val strs = t.split("\\|")
        IpUtils.ip2Long(strs(1))
      })
    val ipArr:Array[(Long,Long,String)] = sc.textFile("data/ip.txt").map(t=>{
      val strs = t.split("\\|")
      (strs(2).toLong,strs(3).toLong,strs(6)+strs(7))
    }).collect()

    val bs = sc.broadcast(ipArr)

    //    accessRDD.map(ip=>{
    //      ipRDD.filter(t=>{
    //        ip>= t._1 && ip<= t._2
    //      })
    //    }).foreach(println)

    accessRDD.map(ip=>{
      bs.value.find(t=>{
        t._1<= ip && t._2>=ip
      }) match {
        case Some(v) => (v._3,1)
        case None => ("unknow",1)
      }
      //option
    }).reduceByKey(_+_)
      .foreach(println)
  }
}

为了提高查找的效率,可以使用二分法查找代码。将时间复杂度由O(n)优化到了O(logn)。

      val start = System.currentTimeMillis()
      val res =  (binarySearch(ip,bs.value),1)
//      val res = bs.value.find(t=>{
//        t._1<= ip && t._2>=ip
//      }) match {
//        case Some(v) => (v._3,1)
//        case None => ("unknow",1)
//      }
      val end = System.currentTimeMillis()
      acc.add(end-start)

累加器实现运行时间的统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/924505.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024年11月24日Github流行趋势

项目名称&#xff1a;FreeCAD 项目维护者&#xff1a;wwmayer, yorikvanhavre, berndhahnebach, chennes, WandererFan等项目介绍&#xff1a;FreeCAD是一个免费且开源的多平台3D参数化建模工具。项目star数&#xff1a;20,875项目fork数&#xff1a;4,117 项目名称&#xff1…

零基础学安全--shell脚本学习(1)脚本创建执行及变量使用

目录 学习连接 什么是shell shell的分类 查看当前系统支持shell 学习前提 开始学习 第一种执行脚本方法 ​编辑 第二种执行脚本方法 第三种执行脚本方法 变量声明和定义 ​编辑 查看变量 删除变量 学习连接 声明&#xff01; 学习视频来自B站up主 **泷羽sec** 有兴趣…

Java后端如何进行文件上传和下载 —— 本地版

简介&#xff1a; 本文详细介绍了在Java后端进行文件上传和下载的实现方法&#xff0c;包括文件上传保存到本地的完整流程、文件下载的代码实现&#xff0c;以及如何处理文件预览、下载大小限制和运行失败的问题&#xff0c;并提供了完整的代码示例。 大体思路 1、文件上传 …

Z2400024基于Java+SSM+mysql+maven开发的社区论坛系统的设计与实现(附源码 配置 文档)

基于SSM开发的社区论坛系统 1.摘要2.主要功能3.系统运行环境4.项目技术5.系统界面截图6.源码获取 1.摘要 本文介绍了一个基于SSM&#xff08;Spring、Spring MVC、MyBatis&#xff09;框架开发的社区论坛系统。该系统旨在打造一个高品质的开发者社区&#xff0c;为开发者提供一…

JAVA笔记 | 策略模式+枚举Enum简单实现策略模式(可直接套用)

本篇为更为简单的策略模式应用&#xff0c;使用枚举来进行策略分配 上一篇(链接如下)更像是策略工厂模式来分配策略 JAVA笔记 | 实际上用到的策略模式(可直接套用)-CSDN博客 先创建策略相关类 //策略类 public interface PetStrategy {/*** 执行动作 - 跑RUN*/String run(Str…

RabbitMQ 篇-深入了解延迟消息、MQ 可靠性(生产者可靠性、MQ 可靠性、消费者可靠性)

??博客主页&#xff1a;【_-CSDN博客】** 感谢大家点赞??收藏评论** 文章目录 ???1.0 RabbitMQ 的可靠性 ? ? ? ? 2.0 发送者的可靠性 ? ? ? ? 2.1 生产者重试机制 ? ? ? ? 2.2 生产者确认机制 ? ? ? ? 2.2.1 开启生产者确认机制 ? ? ? ? 2.2…

Redis(概念、IO模型、多路选择算法、安装和启停)

一、概念 关系型数据库是典型的行存储数据库&#xff0c;存在的问题是&#xff0c;按行存储的数据在物理层面占用的是连续存储空间&#xff0c;不适合海量数据存储。 Redis在生产中使用的最多的是用作数据缓存。 服务器先在缓存中查询数据&#xff0c;查到则返回&#xff0c;…

JAVA:Spring Boot 3 实现 Gzip 压缩优化的技术指南

1、简述 随着 Web 应用的用户量和数据量增加&#xff0c;网络带宽和页面加载速度逐渐成为瓶颈。为了减少数据传输量&#xff0c;提高用户体验&#xff0c;我们可以使用 Gzip 压缩 HTTP 响应。本文将介绍如何在 Spring Boot 3 中实现 Gzip 压缩优化。 2、配置 Spring Boot 3 对…

python期末复习

其他复习资料 Python期末复习-系列数据类型-CSDN博客 期末python复习-异常和函数-CSDN博客 期末Python复习-输入输出-CSDN博客 目录 一、面向对象程序设计 1.思维导图 2.基本概念 3.类对象和实例对象 3.1创建对象 3.2定义类中的成员变量 3.3类中属性的公有和私有 3.…

HDU Go Running(最小点覆盖 + 网络流优化)

题目大意&#xff1a;有一条无限长跑道&#xff0c;每个人可以规定自己跑步的方向&#xff0c;起点&#xff0c;跑步起止时间。每个人跑步的速度都是1m/s。最后从监控人员哪里得到了n个报告&#xff0c;每个报告给出了某人在某一时候所在的位置&#xff0c;问跑步的最少可能人数…

《用Python实现3D动态旋转爱心模型》

简介 如果二维的爱心图案已经无法满足你的创意&#xff0c;那今天的内容一定适合你&#xff01;通过Python和matplotlib库&#xff0c;我们可以实现一个动态旋转的3D爱心模型&#xff0c;充满立体感和动感。# 实现代码&#xff08;完整代码底部名片私信&#xff09; 以下是完…

Unity-Lightmap入门篇

&#xff1a;&#xff1a;这是一个实战文章&#xff0c;并没有知识分享&#xff0c;或理论知识&#xff1b;完全没有 关键字&#xff1a; “lightmap","全局光照”&#xff0c;“light Probe" (会混合一些中英文搜索&#xff0c;或者全英文搜索&#xff09; …

ElasticSearch通过es-head插件安装可视化及相关问题

1.es-head下载地址 GitHub - mobz/elasticsearch-head: A web front end for an elastic search cluster 2.启动 建议使用vscode启动&#xff0c;并安装好node.js环境 npm installnpm run start 通过http://localhost:9100就可以看到本地添加的es库 3.相关问题 3.1跨域问…

Android PMS(Package Manager Service)源码介绍

文章目录 前言一、PMS 启动流程二、APK 安装流程三、APK 卸载流程四、权限管理静态权限动态权限 五、 数据存储与一致性六、 PMS 的安全性策略1、权限检查2、签名认证3、动态权限管理4、应用安装验证5、保护系统目录 七、PMS 调试方法总结 前言 PackageManagerService&#xf…

OSPTrack:一个包含多个生态系统中软件包执行时生成的静态和动态特征的标记数据集,用于识别开源软件中的恶意行为。

2024-11-22 &#xff0c;由格拉斯哥大学创建的OSPTrack数据集&#xff0c;目的是通过捕获在隔离环境中执行包和库时生成的特征&#xff0c;包括静态和动态特征&#xff0c;来识别开源软件&#xff08;OSS&#xff09;中的恶意指标&#xff0c;特别是在源代码访问受限时&#xf…

Web登录页面设计

记录第一个前端界面&#xff0c;暑假期间写的&#xff0c;用了Lottie动画和canvas标签做动画&#xff0c;登录和注册也连接了数据库。 图片是从网上找的&#xff0c;如有侵权私信我删除&#xff0c;谢谢啦~

MySQL45讲 第29讲 如何判断一个数据库是不是出问题了?——阅读总结

文章目录 MySQL45讲 第二十九讲 如何判断一个数据库是不是出问题了&#xff1f;——阅读总结一、检测数据库实例健康状态的重要性二、常见检测方法及问题分析&#xff08;一&#xff09;select 1 判断法&#xff08;二&#xff09;查表判断法&#xff08;三&#xff09;更新判断…

mac下Gpt Chrome升级成GptBrowser书签和保存的密码恢复

cd /Users/自己的用户名/Library/Application\ Support/ 目录下有 GPT\ Chrome/ Google/ GptBrowser/ GPT\ Chrome 为原来的chrome浏览器的文件存储目录. GptBrowser 为升级后chrome浏览器存储目录 书签所在的文件 Bookmarks 登录账号Login 相关的文件 拷贝到GptBrow…

论文阅读笔记 | EEG:运动执行过程中的ERD

参考&#xff1a;https://mp.weixin.qq.com/s/RmcPSLv1ITMZZwqe2uZ_og?token1093147649&langzh_CN

Android U ART young cc流程分析

概述&#xff1a; 众所周知jvm虚拟机为了提高内存回收效率&#xff0c;更高效的进行内存管理与回收&#xff0c;对堆内存进行了分代管理比如hotspot虚拟机的新生代&#xff0c;老年代。根据各代的特征&#xff08; 新生代对象分配频繁而生存周期短&#xff0c;老年代生存周期长…