StructStreaming Batch mode和Continuous mode

StructStreaming Batch mode和Continuous mode

让我们把目光集中到 Structured Streaming,也就是流处理引擎本身。Structured Streaming 与 Spark MLlib 并列,是 Spark 重要的子框架之一。值得一提的是,Structured Streaming 天然能够享受 Spark SQL 提供的处理能力与执行性能,同时也能与其他子框架无缝衔接。因此,基于 Structured Streaming 这个新一代框架开发的流处理应用,天然具备优良的执行性能与良好的扩展性。

知己知彼,百战百胜。想要灵活应对不同的实时计算需求,我们就要先了解 Structured Streaming 的计算模型长啥样,搞清楚它如何应对容错、保持数据一致性。我们先从计算模型说起。

计算模型

当数据像水流一样,源源不断地流进 Structured Streaming 引擎的时候,引擎并不会自动地依次消费并处理这些数据,它需要一种叫做 Trigger 的机制,来触发数据在引擎中的计算。

换句话说,Trigger 机制,决定了引擎在什么时候、以怎样的方式和频率去处理接收到的数据流。Structured Streaming 支持 4 种 Trigger,如下表所示。

image-20211218133604129

要为流处理设置 Trigger,我们只需基于 writeStream API,调用 trigger 函数即可。Trigger 的种类比较多,一下子深入细节,容易让你难以把握重点,所以现在你只需要知道 Structured Streaming 支持种类繁多的 Trigger 即可。

我们先把注意力,放在计算模型上面。对于流数据,Structured Streaming 支持两种计算模型,分别是 Batch mode 和 Continuous mode。所谓计算模型,本质上,它要解决的问题,就是 Spark 以怎样的方式,来对待并处理流数据

这是什么意思呢?没有对比就没有鉴别,咱们不妨通过对比讲解 Batch mode 和 Continuous mode,来深入理解计算模型的含义。

Batch mode

我们先来说说 Batch mode,所谓 Batch mode,它指的是 Spark 将连续的数据流,切割为离散的数据微批(Micro-batch),也即小份的数据集

形象一点说,Batch mode 就像是“抽刀断水”,两刀之间的水量,就是一个 Micro-batch。而每一份 Micro-batch,都会触发一个 Spark Job,每一个 Job 会包含若干个 Tasks。学习过基础知识与 Spark SQL 模块之后,我们知道,这些 Tasks 最终会交由 Spark SQL 与 Spark Core 去做优化与执行。

image-20211218213139033

在这样的计算模型下,不同种类的 Trigger,如 Default、Fixed interval 以及 One-time,无非是在以不同的方式控制 Micro-batch 切割的粒度罢了。

比方说,在 Default Trigger 下,Spark 会根据数据流的流入速率,自行决定切割粒度,无需开发者关心。而如果开发者想要对切割粒度进行人为的干预,则可以使用 Fixed interval Trigger,来明确定义 Micro-batch 切割的时间周期。例如,Trigger.ProcessingTime(“5 seconds”),表示的是,每隔 5 秒钟,切割一个 Micro-batch。

Continuous mode

与 Batch mode 不同,Continuous mode 并不切割数据流,而是以事件 / 消息(Event / Message)为粒度,用连续的方式来处理数据。这里的事件或是消息,指代的是原始数据流中最细粒度的数据形式,它可以是一个单词、一行文本,或是一个画面帧

image-20211218213245717

一图胜千言,对比两种计算模型的示意图,我们可以轻松地发现它们之间的差异所在。在 Continuous mode 下,Structured Streaming 使用一个常驻作业(Long running job)来处理数据流(或者说服务)中的每一条消息。

那么问题来了,相比每个 Micro-batch 触发一个作业,Continuous mode 选择采用常驻作业来进行服务,有什么特别的收益吗?或者换句话说,这两种不同的计算模型,各自都有哪些优劣势呢?

用一句话来概括,Batch mode 吞吐量大、延迟高(秒级),而 Continuous mode 吞吐量低、延迟也更低(毫秒级)。吞吐量指的是单位时间引擎处理的消息数量,批量数据能够更好地利用 Spark 分布式计算引擎的优势,因此 Batch mode 在吞吐量自然更胜一筹。

而要回答为什么 Continuous mode 能够在延迟方面表现得更加出色,我们还得从 Structured Streaming 的容错机制说起。

容错机制

对于任何一个流处理引擎来说,容错都是一项必备的能力。所谓容错,它指的是,在计算过程中出现错误(作业层面、或是任务层面,等等)的时候,流处理引擎有能力恢复被中断的计算过程,同时保证数据上的不重不漏,也即保证数据处理的一致性。

从数据一致性的角度出发,这种容错的能力,可以划分为 3 种水平:

  1. At most once:最多交付一次,数据存在丢失的风险;
  2. At least once:最少交付一次,数据存在重复的可能;
  3. Exactly once:交付且仅交付一次,数据不重不漏。

image-20211218213425640

这里的交付,指的是数据从 Source 到 Sink 的整个过程。对于同一条数据,它可能会被引擎处理一次或(在有作业或是任务失败的情况下)多次,但根据容错能力的不同,计算结果最终可能会交付给 Sink 零次、一次或是多次。

聊完基本的容错概念之后,我们再说回 Structured Streaming。就 Structured Streaming 的容错能力来说,Spark 社区官方的说法是:“结合幂等的 Sink,Structured Streaming 能够提供 Exactly once 的容错能力”。

实际上,这句话应该拆解为两部分。在数据处理上,结合容错机制,Structured Streaming 本身能够提供“At least once”的处理能力。而结合幂等的 Sink,Structured Streaming 可以实现端到端的“Exactly once”容错水平

比方说,应用广泛的 Kafka,在 Producer 级别提供跨会话、跨分区的幂等性。结合 Kafka 这样的 Sink,在端到端的处理过程中,Structured Streaming 可以实现“Exactly once”,保证数据的不重不漏。

不过,在 Structured Streaming 自身的容错机制中,为了在数据处理上做到“At least once”,Batch mode 与 Continuous mode 这两种不同的计算模型,分别采用了不同的实现方式。而容错实现的不同,正是导致两种计算模型在延迟方面差异巨大的重要因素之一。接下来,我们就来说一说,Batch mode 与 Continuous mode 分别如何做容错。

Batch mode 容错

在 Batch mode 下,Structured Streaming 利用 Checkpoint 机制来实现容错。在实际处理数据流中的 Micro-batch 之前,Checkpoint 机制会把该 Micro-batch 的元信息全部存储到开发者指定的文件系统路径,比如 HDFS 或是 Amazon S3。这样一来,当出现作业或是任务失败时,引擎只需要读取这些事先记录好的元信息,就可以恢复数据流的“断点续传”。

要指定 Checkpoint 目录,只需要在 writeStream API 的 option 选项中配置 checkpointLocation 即可。我们以上一讲的“流动的 Word Count”为例,代码只需要做如下修改即可。

df.writeStream
// 指定Sink为终端(Console)
.format("console")
 
// 指定输出选项
.option("truncate", false)
 
// 指定Checkpoint存储地址
.option("checkpointLocation", "path/to/HDFS")
 
// 指定输出模式
.outputMode("complete")
//.outputMode("update")
 
// 启动流处理应用
.start()
// 等待中断指令
.awaitTermination()

在 Checkpoint 存储目录下,有几个子目录,分别是 offsets、sources、commits 和 state,它们所存储的内容,就是各个 Micro-batch 的元信息日志。对于不同子目录所记录的实际内容,我把它们整理到了下面的图解中,供你随时参考。

image-20211218214926473

对于每一个 Micro-batch 来说,在它被 Structured Streaming 引擎实际处理之前,Checkpoint 机制会先把它的元信息记录到日志文件,因此,这些日志文件又被称为 Write Ahead Log(WAL 日志)。

换句话说,当源数据流进 Source 之后,它需要先到 Checkpoint 目录下进行“报道”,然后才会被 Structured Streaming 引擎处理。毫无疑问,“报道”这一步耽搁了端到端的处理延迟,如下图所示。

image-20211218215011696

除此之外,由于每个 Micro-batch 都会触发一个 Spark 作业,我们知道,作业与任务的频繁调度会引入计算开销,因此也会带来不同程度的延迟。在运行模式与容错机制的双重加持下,Batch mode 的延迟水平往往维持在秒这个量级,在最好的情况下能达到几百毫秒左右

Continuous mode 容错

相比 Batch mode,Continuous mode 下的容错没那么复杂。在 Continuous mode 下,Structured Streaming 利用 Epoch Marker 机制,来实现容错。

因为 Continuous mode 天然没有微批,所以不会涉及到微批中的延迟,到达 Source 中的消息可以立即被 Structured Streaming 引擎消费并处理。但这同时也带来一个问题,那就是引擎如何把当前的处理进度做持久化,从而为失败重试提供可能。

为了解决这个问题,Spark 引入了 Epoch Marker 机制。所谓 Epoch Marker,你可以把它理解成是水流中的“游标”,这些“游标”随着水流一起流动。每个游标都是一个 Epoch Marker,而游标与游标之间的水量,就是一个 Epoch,开发者可以通过如下语句来指定 Epoch 间隔。

writeStream.trigger(continuous = "1 second")

以表格中的代码为例,对于 Source 中的数据流,Structured Streaming 每隔 1 秒,就会安插一个 Epoch Marker,而两个 Epoch Marker 之间的数据,就称为一个 Epoch。你可能会问:“Epoch Marker 的概念倒是不难理解,不过它有什么用呢?”

在引擎处理并交付数据的过程中,每当遇到 Epoch Marker 的时候,引擎都会把对应 Epoch 中最后一条消息的 Offset 写入日志,从而实现容错。需要指出的是,日志的写入是异步的,因此这个过程不会对数据的处理造成延迟。

有意思的是,对于这个日志的称呼,网上往往也把它叫作 Write Ahead Log。不过我觉得这么叫可能不太妥当,原因在于,准备写入日志的消息,都已经被引擎消费并处理过了。Batch mode 会先写日志、后处理数据,而 Continuous mode 不一样,它是先处理数据、然后再写日志。所以,把 Continuous mode 的日志称作是“Write After Log”,也许更合适一些

我们还是用对比的方法来加深理解,接下来,我们同样通过消息到达 Source 与 Structured Streaming 引擎的时间线,来示意 Continuous mode 下的处理延迟。

image-20211218215228788

可以看到,消息从 Source 产生之后,可以立即被 Structured Streaming 引擎消费并处理,因而在延迟性方面,能够得到更好的保障。而 Epoch Marker 则会帮助引擎识别当前最新处理的消息,从而把相应的 Offset 记录到日志中,以备失败重试。

总结

今天这一讲,我们学习了 Structured Streaming 中两种不同的计算模型——Batch mode 与 Continuous mode。只有了解了它们各自在吞吐量、延迟性和容错等方面的特点,在面对日常工作中不同的流计算场景时,我们才能更好地做出选择。

在 Batch mode 下,Structured Streaming 会将数据流切割为一个个的 Micro-batch。对于每一个 Micro-batch,引擎都会创建一个与之对应的作业,并将作业交付给 Spark SQL 与 Spark Core 付诸优化与执行。

Batch mode 的特点是吞吐量大,但是端到端的延迟也比较高,延迟往往维持在秒的量级。Batch mode 的高延迟,一方面来自作业调度本身,一方面来自它的容错机制,也就是 Checkpoint 机制需要预写 WAL(Write Ahead Log)日志。

要想获得更低的处理延迟,你可以采用 Structured Streaming 的 Continuous mode 计算模型。在 Continuous mode 下,引擎会创建一个 Long running job,来负责消费并服务来自 Source 的所有消息。

在这种情况下,Continuous mode 天然地避开了频繁生成、调度作业而引入的计算开销。与此同时,利用 Epoch Marker,通过先处理数据、后记录日志的方式,Continuous mode 进一步消除了容错带来的延迟影响。

尺有所短、寸有所长,Batch mode 在吞吐量上更胜一筹,而 Continuous mode 在延迟性方面则能达到毫秒级。不过,需要特别指出的是,到目前为止,在 Continuous mode 下,Structured Streaming 仅支持非聚合(Aggregation)类操作,比如 map、filter、flatMap,等等。而聚合类的操作,比如“流动的 Word Count”中的分组计数,Continuous mode 暂时是不支持的,这一点难免会限制 Continuous mode 的应用范围,需要你特别注意

Flink的Kappa架构,天然对流处理友好,尤其是对于实时性的支持。因为出发点就是流计算,因此随着Flink的发展、迭代,开发API也越来越丰富,功能也越来越完善。

而Spark不同,Spark实际上是Lambda架构,天然以批处理为导向,最初的流处理,也是微批模式,也就是Micro-batch,咱们分析了,微批模式,没法保证实时性,不过对于高吞吐,倒是比较友好。尽管Spark官方推出了Continuous mode,但是目前功能、API各方面还没有那么完善,至少现在为止,Continuous mode还不支持聚合操作,仅这一点,限制就太大了,试问现在的数据处理,有多少逻辑是不需要聚合操作的呢?太少了。因此,Continuous mode,在我看来,更多的是一种尝试和探索,至于后续能否提供更多的功能与能力,还要看Spark社区对于这方面的发力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/504783.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言刷题总结

1.网购问题 (1)这道题目需要分多种情况进行考虑:双11还是双12,有无优惠券,是否会出现优惠券的面值大于购买商品的单价(这个时候直接按0元进行处理); (2)在对于优惠券的分…

Ansible-3

block任务块:可以通过block关键字,将多个任务组合到一起;将整个block任务组一起控制是否要执行。 如果test组中的主机系统发行版是Redhat则安装并启动httpd。原本这是需要两个任务安装和执行,通过block整合为一个任务执行。 rescu…

PyLMKit(9):ChatTable与你的表格聊天,表格问答

功能介绍 与你的结构化数据聊天:支持主流数据库、表格型excel等数据! ChatDB:支持数据库问答ChatTable:支持txt,excel,csv等pandas dataframe表格的问答 1.下载安装 pip install pylmkit -U pip install pandasql2.ChatTable实…

阿里云ECS服务器经济型e实例详解,CPU性能测评

阿里云服务器ECS经济型e系列是阿里云面向个人开发者、学生、小微企业,在中小型网站建设、开发测试、轻量级应用等场景推出的全新入门级云服务器,CPU处理器采用Intel Xeon Platinum架构处理器,支持1:1、1:2、1:4多种处理器内存配比&#xff0c…

STM32——超声测距HC_SR04记录

一、HC_SR04简述 HC-SR04超声波测距模块可提供 2cm-400cm的非接触式距离感测功能,测距精度可达高到 3mm;模块包括超声波发射器、接收器与控制电路。 基本工作原理: (1)采用IO 口TRIG 触发测距,给最少10us 的高电平信呈。 (2)模块…

C/C++函数字符串和数字的互相转化(比赛超实用)

字符串和数字相互转化: 1 数字转字符串: 实现方法:to_string函数 存在头文件: string 实现代码: #include<iostream> #include<string> using namespace std; int main() {int a 114514;string s to_string(a);cout << s[0] << endl;cout <<…

Spring Web MVC的入门学习(一)

目录 一、什么是 Spring Web MVC 1、MVC 定义 二、学习Spring MVC 1、项目准备 2、建立连接 2.1 RequestMapping 注解的学习 2.2 RequestMapping 使用 3、请求 3.1 传递单个参数 3.2 传递多个参数 3.3 传递对象 3.4 后端参数重命名&#xff08;后端参数映射&#xf…

【LeetCode: 331. 验证二叉树的前序序列化 + DFS】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

我的创作纪念日-一周年

&#x1f600;前言 不知不觉过了一年了很感慨也很期待未来 &#x1f3e0;个人主页&#xff1a;尘觉主页 文章目录 我的创作纪念日-一周年机缘收获(荣誉)日常成就憧憬 我的创作纪念日-一周年 机缘 在我开始在 CSDN 上发布文章之前&#xff0c;我对这个平台几乎一无所知。随着时…

[创业之路-102] :结构化思考:产学研人才联合创业公司的特点、优点与困境

目录 前言&#xff1a; 一、什么是产学研 1.1 什么是产学研 1.2 什么是产学研人才联合创业 二、产、学、研的区别、各自的特点 2.1 产业&#xff08;产&#xff09;特点 2.2 其次&#xff0c;学术&#xff08;学&#xff09;特点 2.3 科学研究&#xff08;研&#xff0…

VS Code常用前端开发插件和基础配置

VS Code插件安装 VS Code提供了非常丰富的插件功能&#xff0c;根据你的需要&#xff0c;安装对应的插件可以大大提高开发效率。 完成前端开发&#xff0c;常见插件介绍&#xff1a; 1、Chinese (Simplified) Language Pack 适用于 VS Code 的中文&#xff08;简体&#xff…

PostCSS的安装及使用 (2):使用及问题解决

PostCSS是一个CSS处理器&#xff0c;它通过插件系统可以转换CSS代码&#xff0c;使其具备更多的功能或符合特定规范。 PostCSS的使用&#xff1a; 安装PostCSS CLI和插件 # 全局安装PostCSS CLI&#xff08;可选&#xff0c;一般在项目内安装&#xff09;npm install -g postc…

Kerberos 认证 javax.security.auth.logon.LoginException:拒绝链接 (Connection refused)

kerberos 服务重启之后异常 项目中用到了hive 和hdfs &#xff0c;权限认证使用了Kerberos&#xff0c;因为机房异常&#xff0c;导致了Kerberos 服务重启&#xff0c;结果发现本来运行正常的应用服务hive 和hdfs 认证失败&#xff0c;报错信息是 典型的网络连接异常 排查思路…

鸿蒙开发之ArkUI组件常用组件图片和文本

ArkUI即方舟开发框架是HarmonyOS应用的UI开发提供了完整的基础设施&#xff0c;包括简洁的UI语法、丰富的UI功能&#xff08;组件、布局、动画以及交互事件&#xff09;&#xff0c;以及实时界面预览工具等&#xff0c;可以支持开发者进行可视化界面开发。 开发文档地址 &…

AI大模型学习在医疗领域的应用与挑战

AI大模型学习在医疗领域的应用与挑战 文章正文&#xff1a;一、AI大模型学习在医疗领域的应用二、AI大模型学习在医疗领域面临的挑战总结&#xff1a; 文章正文&#xff1a; 随着人工智能技术的飞速发展&#xff0c;AI大模型学习在各个领域都取得了显著的成果。本文将探讨AI大…

使用Sui CLI在Sui上创建和执行PTBs

Sui命令行界面&#xff08;CLI&#xff09;中的新命令允许用户直接从终端或Bash脚本创建和执行可编程交易区块&#xff08;PTB&#xff09;。这个新命令为开发人员在实现和执行PTB方面提供了更大的灵活性。 PTB为开发人员提供了一种非常强大的编程工具&#xff0c;这在其他区块…

商务口语每天学习,柯桥英语培训

今天天气好 Its a nice day today. 今天天气真好。(搭讪) Ive heard too much about you. 久仰大名。(恭维) Remember me to... 请代我向……问好。(问候) 半个句型要记牢 Its a ~(nice/good/bad) day today. Tip: 如果你想和某人搭讪而找不着好的借口时&#xff0c;就说天气…

Macs Fan Control Pro--精准掌控Mac风扇,优化散热新选择

Macs Fan Control Pro是一款专为Mac电脑设计的高级风扇控制工具。它具备强大的温度监测能力&#xff0c;可以实时监测Mac电脑各个核心组件的温度&#xff0c;并通过直观的界面展示给用户。同时&#xff0c;用户可以根据个人需求自定义风扇速度&#xff0c;或者选择预设的自动风…

【STL】vector的模拟实现

目录 前言 vector概述 vector的数据结构 vector迭代器的运用 vector的构造和析构 vector的拷贝构造与赋值 拷贝构造 传统写法 现代写法 赋值重载 vector的扩容 reserve() resize() vector的元素操作 push_back() pop_back() insert() erase() 迭代器…

[linux初阶][vim-gcc-gdb] TwoCharter: gcc编译器

目录 一.Linux中gcc编译器的下载与安装 二.使用gcc编译器来翻译 C语言程序 ①.编写C语言代码 ②翻译C语言代码 a.预处理 b.编译 c.汇编 d.链接 ③.执行Main 二进制可执行程序(.exe文件) 三.总结 一.Linux中gcc编译器的下载与安装 使用yum命令(相当于手机上的应用…