大数据 - Spark系列《十一》- Spark累加器详解

 Spark系列文章:

大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客

大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客

大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客

大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客

大数据 - Spark系列《五》- Spark常用算子-CSDN博客

大数据 - Spark系列《六》- RDD详解-CSDN博客

大数据 - Spark系列《七》- 分区器详解-CSDN博客

大数据 - Spark系列《八》- 闭包引用-CSDN博客

大数据 - Spark系列《九》- 广播变量-CSDN博客

大数据 - Spark系列《十》- rdd缓存详解-CSDN博客

  1. 简介  

累加器用来把Executor端变量信息聚合到Driver端。在 Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回 Driver端进行merge。

观察一个问题: 原因是数据在executor端执行完毕以后并没有将acc结果数据返回

def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkUtil.getSc
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)
    var count:Long = 0L
    //rdd.map(count+=_)
    rdd.foreach(num=>{count+=num})
    //计算的结果为0
    println(count)
    sc.stop()
  }

 

 解决方案:应该将每个executor执行的结果数据返回到Driver端进行聚合操作 , 返回最终结果数据

  2. LongAccumulator  

LongAccumulator 是 Spark 中的一种累加器(Accumulator)类型,用于在分布式计算中对长整型(Long)类型的数据进行累加。累加器是一种特殊的共享变量,它可以在各个节点上对其进行添加操作,并将结果汇总到驱动器程序中。

2.1 🥙主要特点:

  1. 分布式累加器: LongAccumulator 可以在不同节点上的任务中并行地对其进行添加操作,然后将结果汇总到驱动器程序中。

  2. 长整型数据类型: 适用于对长整型数据进行累加的场景,如计数器、求和等。

  3. 只支持累加操作: LongAccumulator 只支持累加操作,不能进行减法或其他运算。

  4. 原子性操作: 对累加器的操作是原子性的,可以保证在并发执行的情况下不会发生数据错误。

package com.doit.day0219

import com.alibaba.fastjson.JSON
import com.doit.day0126.Movie
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @日期: 2024/2/20
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description: 使用 Spark 累加器统计解析 JSON 数据失败的次数
 */

object Test02 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    // 从文件加载 JSON 数据
    val rdd1 = sc.textFile("Data/movie.json")

    // 定义计数器变量
    //var cnt = 0

    // 使用 spark 内置的全局计数器
    val cnt1 = sc.longAccumulator("my long Accumulator") // 合并

    // 遍历 RDD 中的每一行数据
    rdd1.foreach(line => {
      try {
        // 尝试解析 JSON 数据
        val bean = JSON.parseObject(line, classOf[Movie])
        println(bean)
      } catch {
        case e: Exception => {
          // JSON 解析失败,增加计数器
          cnt1.add(1) // 计数1
        }
      }
    })

    // 打印累加器的值,即解析失败的次数
    println(cnt1.value) // 合并后的结果
  }
}

 

 

2.2 🥙注意事项:

  • 累加器数据属于全局变量 ,由行动算子触发执行 , 没有触发不执行累加 没算

  • 如果多次触发行动算子 , 累加器会执行多次

  • 建议将累加器的变化操作编写在行动算子中

2.3 🥙累加器方法

  •   add(value: T)  

向累加器中添加一个值,将这个值与累加器中已有的值进行累加。累加器的值类型必须与添加的值类型相符合。

package com.doit.day0219
import com.alibaba.fastjson.JSON
import com.doit.day0126.Movie
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @日期: 2024/2/20
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test05 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建 SparkContext 对象,传入 SparkConf 对象
    val sc = new SparkContext(conf)

    val sumAccumulate = sc.longAccumulator("sumAccumulate")

    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
    rdd.foreach(x=>sumAccumulate.add(x))

    println("累加器的值:"+sumAccumulate.value) //15

    sc.stop()
  }
}
  •  reset() 

重置累加器的值为初始值,通常是零或空。

// 重置累加器的值为初始值
accumulator.reset()
  •  value 

获取累加器的当前值。

// 获取累加器的当前值
val currentValue = accumulator.value
println("当前累加器的值:" + currentValue)

2.3 🧀实例1

累加器,主要用于正常业务作业过程中的一些附属信息统计

*       (比如程序遇到的脏数据条数,

*       程序处理的数据总行数,

*       程序处理的特定数据条数,

*       程序处理所花费的时长)

业务上需要对如下数据进行统计:比如统计每个city的用户数

"1,Mr.duan,18,beijing"
"2,Mr.zhao,28,beijing"
"b,Mr.liu,24,shanghai"
"4,Mr.nai,22,shanghai"
"a,Mr.liu,24,shanghai"
"6,Mr.ma"

 同时,还想在业务统计的过程中,附带统计出原始数据中的脏数据条数,并按多种不正确的格式进行分别统计,如:id字段无法数字化的条数字段数量不够的条数其他不正确的条数

package com.doit.day0219

import com.alibaba.fastjson.JSON
import com.doit.day0126.Movie
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @日期: 2024/2/20
 * @作者: Wang NaPao
 * @博客: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @描述: 这个对象包含了一个 Spark 应用程序的入口点,用于处理从文件加载 JSON 数据的场景,统计每个城市的用户数量,并且附带统计原始数据中的脏数据条数。
 */
object Test03 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建 SparkContext 对象,传入 SparkConf 对象
    val sc = new SparkContext(conf)

    // 从文件加载 JSON 数据
    val rdd1 = sc.textFile("Data/city.txt")

    // 创建一个累加器用于统计脏数据条数
    val cnt1 = sc.longAccumulator("dirtyDataCount")

    // 对 RDD 进行处理:将每行数据拆分为数组,判断数组长度,若为4则返回 (城市, 1),否则更新累加器并返回 None
    val rdd2 = rdd1.map(line => {
      try {
        val arr1 = line.split(",")
        val no = arr1(0).toInt
        val name = arr1(1)
        val age = arr1(2).toInt
        val city = arr1(3)
        (city,1)
      } catch {
        case exception: Exception => {
          cnt1.add(1)
          ("error", 1)
        }
      }

    }).filter(e=>{e._1!="error"}) // 过滤掉脏数据
      .reduceByKey(_ + _) // 对相同键的值进行累加
      .foreach(println) // 打印结果

    // 打印脏数据条数
    println("脏数据条数为:" + cnt1.value)
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/404536.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024/02/23

使用消息队列完成两个进程间相互通信 A.c #include<myhead.h> struct msgbuf {long mtype;char mtext[1024]; }; //定义表示正文内容大小的宏 #define MSGSIZE sizeof(struct msgbuf)-sizeof(long)int main(int argc, const char *argv[]) {//创建一个key值key_t key;ke…

知乎66条高赞回答,句句醍醐灌顶!

-01- 穷人是小心翼翼地大方&#xff0c; 有钱人是大大方方地小气。 ——论如何判断一个人是真有钱还是装有钱 -02- 枕头要常晒&#xff0c; 因为里面装满了心酸的泪和发霉的梦。 ——一切终将随风而逝 -03- 人活得累&#xff0c;一是太认真&#xff0c;二是太想要。 …

第3部分 原理篇2去中心化数字身份标识符(DID)(3)

3.2.2.4. DID文档 (DID Document) 本聪老师&#xff1a;DID标识符和DID URL还都只是ID&#xff0c;必须为它附加一个基本属性才可以证明是该主体独有的。这个就是我们下面介绍的DID文档。 本聪老师&#xff1a;每个DID标识符都唯一对应一个DID文档&#xff0c;也可以说&#x…

计算机功能简介:EC, NVMe, SCSI/ISCSI与块存储接口 RBD,NUMA

一 EC是指Embedded Controller 主要应用于移动计算机系统和嵌入式计算机系统中&#xff0c;为此类计算机提供系统管理功能。EC的主要功能是控制计算机主板上电时序、管理电池充电和放电&#xff0c;提供键盘矩阵接口、智能风扇接口、串口、GPIO、PS/2等常规IO功能&#xff0c;…

docker自定义网络实现容器之间的通信

Background docker原理 docker是一个Client-Server结构的系统&#xff0c;Docker的守护进程运行在主机上。通过Socket从客户端访问。docker核心三大组件&#xff1a;image–镜像、container-容器、 repository-仓库。docker使用的cpu、内存以及系统内核等资源都是直接使用宿主…

A Novel Two-Layer DAG-based Reactive Protocol for IoT Data Reliability in Metaverse

在IOT 场景中&#xff0c;需要保证数据的完整性和可靠性。通常区块链可以用来做这件事&#xff0c;但是IoT 设备的计算能力和贷款都是有限的。 对于PBFT 要求的通信量太大。 本文提出的 two layer directed acycle graph (2LDAG) 是一种被动共识协议&#xff0c;除非有节点主动…

快速构建 Debezium MySQL Example 数据库

博主历时三年精心创作的《大数据平台架构与原型实现&#xff1a;数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行&#xff0c;点击《重磅推荐&#xff1a;建大数据平台太难了&#xff01;给我发个工程原型吧&#xff01;》了解图书详情&#xff0c;…

EXCEL 在列不同单元格之间插入N个空行

1、第一步数据&#xff0c;要求在每个数字之间之间插入3个空格 2、拿数据个数*&#xff08;要插入空格数1&#xff09; 19*4 3、填充 4、复制数据到D列 5、下拉数据&#xff0c;选择复制填充这样1-19就会重复4次 6、全选数据D列排序&#xff0c;这样即完成了插入空格 以…

SQL语法-DQL-测试练习

因篇幅原因&#xff0c;本篇承接此篇->第八篇&#xff1a;SQL语法-DQL-数据查询语言-CSDN博客 本篇是对于SQL语法DQL语句的练习&#xff0c;因水平和精力有限&#xff08;就不像前两篇的DDL&#xff0c;DML那样自出练习了&#xff09;直接照搬了【黑马程序员】在哔哩哔哩的…

基于卷积神经网络的图像去噪

目录 背影 卷积神经网络CNN的原理 卷积神经网络CNN的定义 卷积神经网络CNN的神经元 卷积神经网络CNN的激活函数 卷积神经网络CNN的传递函数 基于卷积神经网络的图像去噪 完整代码:基于卷积神经网络的图像去噪.rar资源-CSDN文库 https://download.csdn.net/download/abc9918351…

如何在java中使用 Excel 动态函数生成依赖列表

前言 在Excel 中&#xff0c;依赖列表或级联下拉列表表示两个或多个列表&#xff0c;其中一个列表的项根据另一个列表而变化。依赖列表通常用于Excel的业务报告&#xff0c;例如学术记分卡中的【班级-学生】列表、区域销售报告中的【区域-国家/地区】列表、人口仪表板中的【年…

vue3 + ts + echart 实现柱形图表

首先封装Echart一个文件 代码如下 <script setup lang"ts"> import { ECharts, EChartsOption, init } from echarts; import { ref, watch, onMounted, onBeforeUnmount } from vue;// 定义props interface Props {width?: string;height?: string;optio…

网工内推 | 信息安全售前,国企、上市公司,补贴福利多

01 中电科网络安全科技有限公司 招聘岗位&#xff1a;信息安全售前工程师 职责描述&#xff1a; 1.负责为客户提供整体信息安全规划、IT治理需求调研、现状分析、蓝图规划与实施路线设计&#xff0c;为客户提供设计方案&#xff1b; 2.承担行业信息安全发展研究、行业业务规划…

vue3 vuex

目录 Vuex 是什么 什么是“状态管理模式”&#xff1f; 什么情况下我应该使用 Vuex&#xff1f; 使用方法&#xff1a; 提交载荷&#xff08;Payload&#xff09; 对象风格的提交方式 使用常量替代 Mutation 事件类型 Mutation 必须是同步函数 在组件中提交 Mutation …

sentinel中监听器的运用--规则管理

sentinel中监听器的运用–规则管理 规则结构 类图关系 类关系图如下 Rule 将规则抽象成一个类, 规则与资源是紧密关联的, 也就是说规则作用于资源。因此, 我们需要将规则表示为一个类, 并包含一个获取资源的方法 这里采用接口的原因就是规则是一个抽象概念而非具体实现。…

导入excel某些数值是0

目录 导入excel某些数值是0数据全部都是0原因解决 部分数据是0原因解决 导入excel某些数值是0 数据全部都是0 有一列“工单本月入库重量”全部的数据都是0 原因 展示的时候&#xff0c;展示的字段和内表需要展示的字段不一致&#xff0c;导致显示的是0。 解决 修改展示的字…

Vue | (四)使用Vue脚手架(上) | 尚硅谷Vue2.0+Vue3.0全套教程

文章目录 &#x1f4da;初始化脚手架&#x1f407;创建初体验&#x1f407;分析脚手架结构&#x1f407;关于render&#x1f407;查看默认配置 &#x1f4da;ref与props&#x1f407;ref属性&#x1f407;props配置项 &#x1f4da;混入&#x1f4da;插件&#x1f4da;scoped样…

DBeaver的下载安装和连接MySQL数据库

DBeaver的下载安装和连接MySQL数据库 1、dbeaver的下载 dbeaver是一款的数据库连接工具&#xff0c;免费&#xff0c;跨平台。 官网&#xff1a;https://dbeaver.io/ 下载地址&#xff1a;https://dbeaver.io/download/ GitHub下载地址&#xff1a;https://github.com/dbeav…

使用向量数据库pinecone构建应用02:检索增强生成RAG

Building Applications with Vector Databases 下面是这门课的学习笔记&#xff1a;https://www.deeplearning.ai/short-courses/building-applications-vector-databases/ Learn to create six exciting applications of vector databases and implement them using Pinecon…

记一次 Flink 作业启动缓慢

记一次 Flink 作业启动缓慢 背景 应用发现&#xff0c;Hadoop集群的hdfs较之前更加缓慢&#xff0c;且离线ELT任务也以前晚半个多小时才能跑完。此前一直没有找到突破口所以没有管他&#xff0c;推测应该重启一下Hadoop集群就可以了。今天突然要重启一个Flink作业&#xff0c…