Spark Structured Streaming 分流或双写多表 / 多数据源(Multi Sinks / Writes)

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

在 Spark Structured Streaming 中,我们有时候需要将最后的处理结果分流或双写到多张表或多个数据源(Multi Sinks / Writes),一个典型的例子是:在 CDC 数据入湖场景里,一个 Kafka Topic 上存放着整库或多张表的 CDC 消息,使用 Spark 从 Kafka 中摄取这些消息后,需要根据消息中提供的数据库名和数据表名对 CDC 消息分流,然后写到数据湖上对应的 ODS 表中,这就是一种典型的“数据分流”场景。在 Spark Structured Streaming 中,实现多表 / 多数据源的分流或双写主要依赖 foreachBatchforeach 这两个方法,本文就围绕它们介绍一下分流或双写多表 / 多数据源的具体实现。

首先,要明确 foreachforeachBatch 都是 action,也就意味着使用它们时已经到了流的末端,绝大数情况下,就是要将记录写入目标数据源了,这也是foreachforeachBatch 这两个方法绝大多数的应用场景。通常,在 Spark 中将数据写入一个数据源是这样做的(以写 parquet 文件为例):

writeStream
    .format("parquet")
    .option("path", "path/to/destination/dir")
    .start()

由于 Spark 内置了 parquet 格式的 data writer, 所以我们只需填写一些相应的配置,就可以直接把 DF 按对应的格式写到目标位置了,那什么情况下我们要使用 foreachforeachBatch 呢?下面展开来介绍一下。

1. foreachBatch 的应用场景


大多数情况下,一条的流处理的 pipeline 都是从一个 Source 开始,中间经历各种处理后,最终写入了一个 Sink,但是,在某些场景下,我们流的重点可能需要写入的并不是一个 Sink,而是多个,典型的情形有:

  • 数据分流:需要将数据“分流”写入不同的数据源或数据表( 简单说就是 dispatch )

  • 数据多写:需要同时向多个下游数据源相同相同数据( 简单说就是 duplicate )

虽然我们可以非常“粗暴”地通过 for 循环构建多个 writer 实现上述两种典型的写入需求,但是这种做法会让每一个 sink 变成独立的 streaming query(作业),是代价很高的应对方法,并不实用。最好的做法就是通过 foreachBatch 来实现,实际上上面两种需求正是 foreachBatch 的典型应用场景。我们看一下 foreachBatch 的接口声明:

def foreachBatch(function: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]

我们需要为 foreachBatch 传入一个函数字面量,它有两参数,第一个对应一个 micro-batch 的 DataFrame, 第二个是这个 micro-batch 的 ID,拿到 micro-batch 的 DataFrame 后,我们可以在这个 DataFrame 上作相应的转换处理,最后调用现成的 writer 写入目标端。这里涉及到 Spark Streaming 的 Micro-Batch,也就是上述参数列表中的 Dataset[T] 类型的那个 DataFrame ,关于 Micro-Batch 在流上运行方式,下图给出了非常形象的描绘:

在这里插入图片描述

简单地说,Micro-Batch 模式下需要收集齐一定量(或一小段时间范围内)的数据,整理成一个 DataFrame 去处理,它的延迟是在秒级。上图下方时间轴上的每一小撮数据就是 foreachBatch 中传入的那个 DataFrame。

这里,我们特别澄清一个容易误解的地方: foreachBatch 是没有“循环”语义的,这里的 foreach 其实是意在针对每一个 micro-batch 的,不是空间维度上迭代多个 micro-batch, 而是时间维度上针对每一个流经的 micro-batch 进行处理。这里也能提现从 source 构建出的 DF 和这个方法里的 micro-batch 的 DF 的差异,前者是一个无界的 DF,本质上是一个流,更加“实体”的 DF 其实是 foreachBatch 中的这个 DF,它是较短时间内聚齐的“一小撮”数据,边界是确定的!

下面,我们针对分流和双写两种典型场景给出详细的示例代码。

1.1. 通过 foreachBatch 实现数据“分流”


我们以向两种不同的 Hudi 表写入数据为例,先将数据过滤,得到分流后的 DF,然后向对应的 Hudi 表中写入:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // 分流 table_1 的数据并写入
  filteredDF1 = batchDF.filter(...)
  filteredDF1.write.format("hudi").
      option(TABLE_NAME, "table_1").
      mode(SaveMode.Append).
      save("/path/1")
    
  // 分流 table_2 的数据并写入
  filteredDF2 = batchDF.filter(...)
  filteredDF2.write.format("hudi").
      option(TABLE_NAME, "table_2").
      mode(SaveMode.Append).
      save("/path/2")
}

1.2. 通过 foreachBatch 实现数据“双写”


我们以向两种不同的数据源写入数据为例,可以调用多次 write 操作,但是,由于每次写入都会导致数据被 recomputed,流本身可能不再存在或状态发生了改变,所以,必须要在写入前使用 persist, 保证向下游多次写入的数据是完全一样,最后记得再执行一遍 unpersist 即可。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format("csv").save(...)  // location 1, 对应数据源的 data writer 是已存在的
  batchDF.write.format("hudi").save(...)  // location 2, 对应数据源的 data writer 是已存在的
  batchDF.unpersist()
}

2. foreach 的应用场景


foreachBatch 很实用,但是在如下两种场景下无法工作的:

  • 没有现成的支持目标数据源的 data writer;
  • 当前流运行于 continuous processing 模式,不支持 micro-batch

如果是上述情形,我们就得使用 foreach 了,因为 foreach 要自行实现对目标数据源的链接和读写,同时,它的自定义处理逻辑又是作用到每一行上的,所以它能解决上述两种场景的问题。某种程度上,foreach 相比 foreachBatch 是一种更底层的 API。使用 foreach 需要提供一个 ForeachWriter,实现 open, process, 和close 三个方法,不过要注意的是这三个方案的调用时机是不同的,open / close 显然是 per-partition 要调用一次的, proess 则是要针对每条记录进行处理的。以下是 一个自行实现 foreach 的代码模板:

streamingDatasetOfString.writeStream.foreach(
  // 没有现成的 DataStreamWriter,需要自行实现行级别的存储逻辑。
  new ForeachWriter[String] {
	// 在 partition 这个粒度上创建针对目标数据源的连接,这比较符合常规
    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }
	// 数据梳理逻辑会作用到记录级别,而不是 miro-batch 的 df 级别。
    def process(record: String): Unit = {
      // Write string to connection
    }
	// 关闭连接,释放资源
    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

关于 foreach 更多信息可以参考官方文档,这里就不再深究了,大多数情况,我们更多使用的还是 foreachBatch

3. 小结


foreachforeachBatch 都能在向目标数据源写入数据时实现定制化的逻辑,它们之间的差别在于:

  • foreachBatch多应用于数据分流或双写场景,目标数据源往往是已经有线程的 data writer 了
  • foreach 则要自行实现对目标数据的连接和读写处理
  • 两者操纵数据的颗粒度不同,foreach 对数据的梳理逻辑(process 方法)作用到 DF 中的每一行上,而 foreachBatch 则直接操纵的是每一个 micro-batch 对应的 DF。

参考资料

  • Spark 关于 foreach 和 foreachBatch 的官方文档
  • Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/586557.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用 scikit-learn 进行机器学习的基本原理-2

介绍 scikit-learn 估计器对象 每个算法都通过“Estimator”对象在 scikit-learn 中公开。 例如,线性回归是:sklearn.linear_model.LinearRegression 估计器参数:估计器的所有参数都可以在实例化时设置: 拟合数据 让我们用 nump…

第7篇:创建Nios II工程之控制LED<二>

Q:上一期我们完成了Quartus硬件工程部分,本期我们创建Nios II软件工程这部分。 A:创建完BSP和Nios II Application之后,在source文件main.c中添加LED控制代码:system.h头文件包含了Platform Designer系统中IP的硬件信…

VUE3----Tabs swiper 滑动切换

Tabs swiper 滑动切换 <template><view class"cc-tab-container"><scroll-view class"tab-head" :class"tabClassName" scroll-x"true" scroll-with-animation :scroll-left"state.scrollLeft"><view…

变电站综合自动化系统:Modbus-PLC-645转IEC104网关方案

前言 电力行业作为关系国计民生的重要基础产业&#xff0c;是关系千家万户的公用事业。但是要做好电力行业安全保障工作的前提&#xff0c;是需要对应的技术人员详细了解电力工业使用的系统、设备以及各类协议的安全特性&#xff0c;本文将主要介绍IEC 104协议的定义和钡铼技术…

STL——stackqueue

stack stack即为栈&#xff0c;先进后出是其特点 栈只有栈顶元素能被外界使用&#xff0c;故不存在遍历行为 栈中常用接口 构造函数 stack<T> stk; //默认构造方式 stack(const stack &stk); //拷贝构造 赋值操作 stack& operator(const stack &stk); …

对汉诺塔递归算法的简单理解

一.历史背景:汉诺塔&#xff08;Tower of Hanoi&#xff09;&#xff0c;又称河内塔&#xff0c;是一个源于印度古老传说的益智玩具。大梵天创造世界的时候做了三根金刚石柱子&#xff0c;在一根柱子上从下往上按照大小顺序摞着64片黄金圆盘。大梵天命令婆罗门把圆盘从下面开始…

网络安全是智能汽车下一个要卷的方向?

2024年一季度&#xff0c;中国汽车市场延续了2023年的风格&#xff0c;核心就是「卷」。 2023年&#xff0c;我国汽车市场爆发「最强价格战」&#xff0c;燃油车的市场空间不断被挤压&#xff0c;如今只剩下最后一口气。近日乘联会发布4月1-14日最新数据&#xff0c;新能源&am…

基于昇腾AI | 英码科技EA500I使用AscendCL实现垃圾分类和视频物体分类应用

现如今&#xff0c;人工智能迅猛发展&#xff0c;AI赋能产业发展的速度正在加快&#xff0c;“AI”的需求蜂拥而来&#xff0c;但AI应用快速落地的过程中仍存在很大的挑战&#xff1a;向下需要适配的硬件&#xff0c;向上需要完善的技术支持&#xff0c;两者缺一不可。 基于此&…

如何利用仪表构造InfiniBand流量在数据中心测试中的应用

一、什么是Infiniband&#xff1f; 在当今数据爆炸的时代&#xff0c;数据中心作为信息处理的中心枢纽&#xff0c;面临着前所未有的挑战。传统的通信方式已经难以满足日益增长的数据传输需求&#xff0c;而InfiniBand技术的出现&#xff0c;为数据中心带来了全新的通信解决方…

使用xshell工具连接ubuntu的root账户被拒绝的解决方法

问题描述&#xff1a; 我在使用xshell工具远程连接Ubuntu虚拟机的过程中&#xff0c;如果连接的是的普通用户则xshell工具可以正常连接&#xff0c;但是当我向连接ubuntu系统的root用户&#xff0c;即便是密码输入正确但还是不能连接成功。不能连接成功的截图如下&#xff1a; …

requests库进行接口请求

请求的常规写法 requests.post() 、requests.get() 从中可以看出&#xff1a; 必填参数&#xff1a; url可缺省参数&#xff1a; data&#xff0c;json等、关键字参数 **kwargs 如下进行了一个post请求的登录&#xff0c;且请求体在body中 知识点1 当为post请求时&#xff1…

建堆时间复杂度

片头 嗨&#xff01;小伙伴们&#xff0c;大家好&#xff01; 在上一篇中&#xff0c;我们学习了什么是堆&#xff0c;以及如何实现堆。这一篇中&#xff0c;我将继续带领大家来深入学习堆&#xff0c;准备好了吗&#xff1f;我要开始咯&#xff01; 首先&#xff0c;大家还记…

opencv_17_翻转与旋转

一、图像翻转 1&#xff09;void flip_test(Mat& image); 2&#xff09;void ColorInvert::flip_test(Mat& image) { Mat dst; //flip(image, dst, 0); //上下翻转 flip(image, dst, 1); //左右翻转 // flip(image, dst, -1); //180度翻转 imsho…

VScode 无法连接云服务器

试了很多方法&#xff0c;比如更换VScode版本&#xff0c;卸载重装&#xff0c;删除配置文件 重启电脑&#xff0c;都无法成功。最后重置电脑后才连接上&#xff0c;但是重启服务器后又出现该问题。 方法一&#xff1a;修改环境 方法二&#xff1a;把vscode卸载干净重下

【快速入门】数据库的增删改查与结构讲解

文章的操作都是基于小皮php study的MySQL5.7.26进行演示 what 数据库是能长期存储在计算机内&#xff0c;有组织的&#xff0c;可共享的大量数据的集合。数据库中的数据按照一定的数据模型存储&#xff0c;具有较小的冗余性&#xff0c;较高的独立性和易扩展性&#xff0c;并为…

LabVIEW智能变电站监控系统设计与实现

LabVIEW智能变电站监控系统设计与实现 随着电力系统和智能化技术的快速发展&#xff0c;建立一个高效、可靠的变电站监控系统显得尤为重要。通过分析变电站监控系统的需求&#xff0c;设计了一个基于LabVIEW软件的监控平台。该平台利用虚拟仪器技术、传感器技术和无线传输技术…

数据结构中的栈(C语言版)

一.栈的概念 栈是一种常见的数据结构&#xff0c;它遵循后进先出的原则。栈可以看作是一种容器&#xff0c;其中的元素按照一种特定的顺序进行插入和删除操作。 压栈&#xff1a;栈的插入操作叫做进栈/压栈/入栈&#xff0c;入数据在栈顶。 出栈&#xff1a;栈的删除操作叫做…

2024年的十大技术趋势 - AI 等等

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

网优小工具-基站ID行列转换

网优小工具&#xff0d;基站ID行列转换 因在日常工作需要对基站ID批量行转列&#xff0c;以方便在网管上批量筛选指定网元&#xff0c;该小工具基于微软Power Query插件编写&#xff0c;工具方便、简洁、易用&#xff0c;共享出来以方便工作。 工作界面 &#xff11;.粘贴需筛…

学习VUE2第6天

一.请求拦截器 可以节流&#xff0c;防止多次点击请求 toast是单例 二.前置路由守卫 在Vue.js中&#xff0c;前置路由守卫是指在路由转换实际发生之前执行的钩子函数。这是Vue Router&#xff08;Vue.js官方的路由管理器&#xff09;提供的一种功能&#xff0c;允许开发者在用…