大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

1、broadcast广播

在这里插入图片描述

在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。

2、构建缓存

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject

// 定义全局缓存单例对象
object GlobalCache extends Serializable {

  // 广播变量,用于存储缓存数据
  private var cacheData: Broadcast[collection.mutable.Map[String, JSONObject]] = _

  // 设置 SparkSession 和广播变量
  def setSparkSession(spark: SparkSession): Unit = {
    cacheData = spark.sparkContext.broadcast(collection.mutable.Map.empty[String, JSONObject])
  }


  // 按订单ID和用户ID缓存JSONObject对象
  def cacheJSONObject(orderId: String, userId: String, jsonObject: JSONObject): Unit = {
    // 获取广播变量的值并进行修改
    val data = cacheData.value
    data.synchronized {
      data.put(generateKey(orderId, userId), jsonObject)
    }
  }

  // 根据订单ID和用户ID删除缓存的JSONObject对象
  def removeJSONObject(orderId: String, userId: String): Unit = {
    // 获取广播变量的值并进行修改
    val data = cacheData.value
    data.synchronized {
      data.remove(generateKey(orderId, userId))
    }
  }

  // 根据订单ID和用户ID获取缓存的JSONObject对象
  def getJSONObjet(orderId: String, userId: String): JSONObject = {
    // 获取广播变量的值并进行访问
    val data = cacheData.value
    data.synchronized {
      data.get(generateKey(orderId, userId)).orNull
    }
  }

  // 生成缓存键,使用订单ID和用户ID拼接
  private def generateKey(orderId: String, userId: String): String = s"$orderId|$userId"
}

3、缓存测试

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}

object CacheTest {
  Logger.getLogger("org").setLevel(Level.ERROR)
  Logger.getRootLogger().setLevel(Level.ERROR) // 设置日志级别


  def addItem(orderId:String, userId:String, name:String): Unit = {
    val jsonObject = new JSONObject()
    jsonObject.put("name", name)

    // 缓存JSONObject对象
    GlobalCache.cacheJSONObject(orderId, userId, jsonObject)
  }


  def getCache(orderId: String, userId: String): JSONObject = {
    // 获取缓存的JSONObject对象
    GlobalCache.getJSONObjet(orderId, userId)
  }

  def delItem(orderId:String, userId:String): Unit = {
    // 删除缓存的JSONObject对象
    GlobalCache.removeJSONObject(orderId, userId)
  }


  def getSparkSession(appName: String, localType: Int): SparkSession = {
    val builder: SparkSession.Builder = SparkSession.builder().appName(appName)
    if (localType == 1) {
      builder.master("local[8]") // 本地模式,启用8个核心
    }

    val spark = builder.getOrCreate() // 获取或创建一个新的SparkSession
    spark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别
    spark
  }

  def main(args: Array[String]): Unit = {
    println("Start CacheTest")
    val spark: SparkSession = getSparkSession("CacheTest", 1)

    GlobalCache.setSparkSession(spark)  // 构造全局缓存

    addItem("001", "456", "苹果")      // 添加元素
    addItem("002", "789", "香蕉")      // 添加元素
    var cachedObject = getCache("001", "456")
    println(s"Cached Object: $cachedObject")

    delItem("001", "456")      // 删除元素
    cachedObject = getCache("001", "456")
    println(s"Cached Object: $cachedObject")
    spark.stop()
  }
}

4、控制台输出

Start CacheTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Cached Object: {"name":"苹果"}
Cached Object: null

Process finished with exit code 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/49679.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【【51单片机11.0592晶振红外遥控】】

51单片机11.0592晶振红外遥控 红外遥控,51单片机完结 这是初步实现的架构 怎么实现内部的详细逻辑 我们用状态机的方法 0状态时一个空闲状态 当它接收到下降沿开始计时然后转为1状态 1状态下 寻找start 或者repeat的信号 再来下降沿读出定时器的值 如果是start 那…

Python爬虫基础知识点有哪些

目录 Python爬虫基础知识点 Requests库 Beautiful Soup库 正则表达式 数据存储 防止被反爬虫策略 爬虫调度和任务管理 认识robots.txt文件 反爬虫法律与道德 示例代码 Requests库 Beautiful Soup库 正则表达式 数据存储 防止被反爬虫策略 结语 网络世界中信息的…

如图,△ABC中,AD是角平分线,E、F分别为AC、AB上的点,且∠AED+∠AFD=180°.试问:DE与DF有何关系,并说明理由.

Question: 如图,△ABC中,AD是角平分线,E、F分别为AC、AB上的点,且∠AED∠AFD180.试问:DE与DF有何关系,并说明理由. Answer: 分析:过D作DM⊥AB于…

为 Google Play 即将推出基于区块链的内容政策做好准备

作者 / Joseph Mills, Group Product Manager, Google Play 作为一个平台,Google Play 一直致力于帮助开发者将创新理念变为现实。Google Play 上托管了许多和区块链相关的应用,我们深知合作伙伴们希望扩展这些应用,并利用 NFT 等代币化数字资…

两数相加 II——力扣445

题目描述 法一 栈 本题旨在从后往前加,为了逆序处理所有数位,利用栈,把数字压入栈中,再依次取出相加,注意进位!进位是/10,另外需要注意栈的常用函数,push()、pop()、top()&#xff0…

Unity游戏源码分享-2.5D塔防类游戏

Unity游戏源码分享-2.5D塔防类游戏 项目地址: https://download.csdn.net/download/Highning0007/88118947

android存储4--初始化.emulated设备的挂载

android版本:android-11.0.0_r21http://aospxref.com/android-11.0.0_r21 android手机的挂载非常复杂。这篇文章针对emulated存储,介绍它的挂载过程。 一、为什么emulted存储要用很复杂的挂载方式 1, emulted存储是什么 android早期&#…

RCU 使用及机制源码的一些分析

》内核新视界文章汇总《 文章目录 1 介绍2 使用方法2.1 经典 RCU2.2 不可抢占RCU2.3 加速版不可抢占RCU2.4 链表操作的RCU版本2.5 slab 缓存支持RCU 3 源码与实现机制的简单分析3.1 数据结构3.2 不可抢占RCU3.3 加速版不可抢占RCU3.4 可抢占RCU3.5 报告禁止状态3.6 宽限期的开…

Photoshop2023beta常见问题|ps 2023测试版智能AI功能不能用如何解决?

PS beta ai创成式填充用不了怎么办 生成图像出错解决方法?PS 2023最新版本更新了超强大的AI功能,可以一键生成或删除用户选中的内容,这可大大提高了生成图片的效率。生成出来的图片也被公认为质量超高,虽然偶尔可能有点小瑕疵&…

vue2中开发时通过template中的div等标签自动输出对应的less形式带层级的class,只显示带class的

1.写完静态不是要写less吗,自动生成一下实现 this.getLevelClass(domId); domId是自定义的class名称,跟根据自己的需要设置 //vue2中开发时通过template中的div等标签自动输出对应的less形式带层级的class,只显示带class的getLevelClass(name) {let dom…

flask的配置项

flask的配置项 为了使 Flask 应用程序正常运行,有多种配置选项需要考虑。下面是一些基本的 Flask 配置选项: DEBUG: 这个配置项决定 Flask 是否应该在调试模式下运行。如果这个值被设为 True,Flask 将会提供更详细的错误信息,并…

STM32 I2C OVR 错误

一、问题 STM32 I2C 用作从机时,开启如下中断并启用 callback 回调函数。 每一次复位后,从机都可以正常触发地址匹配中断ADDR,之后在该中断的回调函数中启用接收中断去收取数据时,却无法进入RXNE中断,而是触发了 OVR …

Exadata磁盘损坏导致磁盘组无法mount恢复(oracle一体机磁盘组异常恢复)---惜分飞

Oracle Exadata客户,在换盘过程中,cell节点又一块磁盘损坏,导致datac1磁盘组(该磁盘组是normal方式冗余)无法mount Thu Jul 20 22:01:21 2023 SQL> alter diskgroup datac1 mount force NOTE: cache registered group DATAC1 number1 incarn0x0728ad12 NOTE: ca…

【Spring Boot丨序列化、反序列化】

序列化、反序列化 概述Jackson 序列化和反序列化简介自定义序列化器注册外部序列化程序: 指定类的 Json 序列化、反序列化 主页传送门:📀 传送 概述 序列化是将对象转换为字节序列的过程,而反序列化则是将字节序列恢复为对象的过…

16.Netty源码之ChannelPipeline

highlight: arduino-light 服务编排层:ChannelPipeline协调ChannelHandlerHandler EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 Cha…

【数据分析专栏之Python篇】二、Jupyer Notebook安装配置及基本使用

文章目录 前言一、Jupter Notebook是什么1.1 简介1.2 组成部分1.3 Jupyter Notebook的主要特点 二、为什么使用Jupyter Notebook?三、安装四、Jupyter Notebok配置4.1 基本配置4.2 配置开机自启与后台运行4.3 开启代码自动补全 五、两种键盘输入模式5.1 编辑模式5.2 命令模式5…

智安网络|常见的网络安全陷阱:你是否掉入了其中?

在数字化时代,网络安全成为了一个重要的议题。随着我们越来越多地在互联网上进行各种活动,诸如在线银行交易、社交媒体分享和在线购物等,我们的个人信息也更容易受到攻击和滥用。虽然有许多关于网络安全的指导和建议,但仍然有许多…

深度学习实战44-Keras框架下实现高中数学题目的智能分类功能应用

大家好,我是微学AI ,今天给大家介绍一下深度学习实战44-Keras框架实现高中数学题目的智能分类功能应用,该功能是基于人工智能技术的创新应用,通过对数学题目进行智能分类,提供个性化的学习辅助和教学支持。该功能的实现可以通过以下步骤:首先,采集大量的高中数学题目数据…

练习时长两年半的双机热备

1.双机热备技术产生的背景 传统的组网方式如下左图所示,内部用户和外部用户的交互报文全部通过防火墙A。如果防火墙A出现故障,内部网络中所有以防火墙A作为默认网关的主机与外部网络之间的通讯将中断,通讯可靠性无法保证。防火墙作为安全设备…