Flink系列之:背压下的检查点

Flink系列之:背压下的检查点

  • 一、Checkpointing under backpressure
  • 二、缓冲区 Debloating
  • 三、非对齐 Checkpoints
  • 四、对齐 Checkpoint 的超时
  • 五、限制
  • 六、故障排除

一、Checkpointing under backpressure

通常情况下,对齐 Checkpoint 的时长主要受 Checkpointing 过程中的同步和异步两个部分的影响。 然而,当 Flink 作业正运行在严重的背压下时,Checkpoint 端到端延迟的主要影响因子将会是传递 Checkpoint Barrier 到 所有的算子/子任务的时间。这在 checkpointing process) 的概述中有说明原因。并且可以通过高 alignment time and start delay metrics 观察到。 当这种情况发生并成为一个问题时,有三种方法可以解决这个问题:

  • 消除背压源头,通过优化 Flink 作业,通过调整 Flink 或 JVM 参数,抑或是通过扩容。
  • 减少 Flink 作业中缓冲在 In-flight 数据的数据量。
  • 启用非对齐 Checkpoints。 这些选项并不是互斥的,可以组合在一起。本文档重点介绍后两个选项。

二、缓冲区 Debloating

Flink 1.14 引入了一个新的工具,用于自动控制在 Flink 算子/子任务之间缓冲的 In-flight 数据的数据量。缓冲区 Debloating 机 制可以通过将属性taskmanager.network.memory.buffer-debloat.enabled设置为true来启用。

此特性对对齐和非对齐 Checkpoint 都生效,并且在这两种情况下都能缩短 Checkpointing 的时间,不过 Debloating 的效果对于 对齐 Checkpoint 最明显。 当在非对齐 Checkpoint 情况下使用缓冲区 Debloating 时,额外的好处是 Checkpoint 大小会更小,并且恢复时间更快 (需要保存 和恢复的 In-flight 数据更少)。

有关缓冲区 Debloating 功能如何工作以及如何配置的更多信息,可以参考 network memory tuning guide。 请注意,您仍然可以继续使用在前面调优指南中介绍过的方式来手动减少缓冲在 In-flight 数据的数据量。

三、非对齐 Checkpoints

从Flink 1.11开始,Checkpoint 可以是非对齐的。 Unaligned checkpoints 包含 In-flight 数据(例如,存储在缓冲区中的数据)作为 Checkpoint State的一部分,允许 Checkpoint Barrier 跨越这些缓冲区。因此, Checkpoint 时长变得与当前吞吐量无关,因为 Checkpoint Barrier 实际上已经不再嵌入到数据流当中。

如果您的 Checkpointing 由于背压导致周期非常的长,您应该使用非对齐 Checkpoint。这样,Checkpointing 时间基本上就与 端到端延迟无关。请注意,非对齐 Checkpointing 会增加状态存储的 I/O,因此当状态存储的 I/O 是 整个 Checkpointing 过程当中真 正的瓶颈时,您不应当使用非对齐 Checkpointing。

为了启用非对齐 Checkpoint,您可以:

Java代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用非对齐 Checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();

Scala代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 启用非对齐 Checkpoint
env.getCheckpointConfig.enableUnalignedCheckpoints()

Python代码:

env = StreamExecutionEnvironment.get_execution_environment()

# 启用非对齐 Checkpoint
env.get_checkpoint_config().enable_unaligned_checkpoints()

或者在 flink-conf.yml 配置文件中增加配置:

execution.checkpointing.unaligned: true

四、对齐 Checkpoint 的超时

在启用非对齐 Checkpoint 后,你依然可以通过编程的方式指定对齐 Checkpoint 的超时:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));

或是在 flink-conf.yml 配置文件中配置:

execution.checkpointing.aligned-checkpoint-timeout: 30 s

在启动时,每个 Checkpoint 仍然是 aligned checkpoint,但是当全局 Checkpoint 持续时间超过 aligned-checkpoint-timeout 时, 如果 aligned checkpoint 还没完成,那么 Checkpoint 将会转换为 Unaligned Checkpoint。

五、限制

并发 Checkpoint

Flink 当前并不支持并发的非对齐 Checkpoint。然而,由于更可预测的和更短的 Checkpointing 时长,可能也根本就不需要并发的 Checkpoint。此外,Savepoint 也不能与非对齐 Checkpoint 同时发生,因此它们将会花费稍长的时间。

与 Watermark 的相互影响

非对齐 Checkpoint 在恢复的过程中改变了关于 Watermark 的一个隐式保证。目前,Flink 确保了 Watermark 作为恢复的第一步, 而不是将最近的 Watermark 存放在 Operator 中,以方便扩缩容。在非对齐 Checkpoint 中,这意味着当恢复时,Flink 会在恢复 In-flight 数据后再生成 Watermark。如果您的 Pipeline 中使用了对每条记录都应用最新的 Watermark 的算子将会相对于 使用对齐 Checkpoint产生不同的结果。如果您的 Operator 依赖于最新的 Watermark 始终可用,解决办法是将 Watermark 存放在 OperatorState 中。在这种情况下,Watermark 应该使用单键 group 存放在 UnionState 以方便扩缩容。

与长时间运行的记录处理的相互作用

尽管未对齐的检查点障碍仍然能够超越队列中的所有其他记录。如果当前记录需要花费大量时间来处理,则此屏障的处理仍然可能会被延迟。当同时触发多个计时器时(例如在窗口操作中),可能会发生这种情况。当系统在处理单个输入记录时被阻塞等待多个网络缓冲区可用性时,可能会出现第二种有问题的情况。 Flink 无法中断单个输入记录的处理,未对齐的检查点必须等待当前处理的记录被完全处理。这可能会在两种情况下导致问题。由于不适合单个网络缓冲区的大记录的序列化或在 flatMap 操作中,会为一个输入记录生成许多输出记录。在这种情况下,背压可能会阻止未对齐的检查点,直到处理单个输入记录所需的所有网络缓冲区都可用。当处理单个记录需要一段时间时,它也可能发生在任何其他情况下。因此,检查点的时间可能会比预期的时间长,或者可能会有所不同。

某些数据分布模式没有检查点

有些属性包含的连接无法与 Channel 中的数据一样保存在 Checkpoint 中。为了保留这些功能并确保没有状态冲突或非预期的行为,非同一 Checkpoint 对于这些类型的连接是禁用的。所有其他的交换仍然执行非单色检查点。

点对点连接

我们目前没有任何对于点对点连接中有关数据有序性的强保证。然而,由于数据已经被以前置的 Source 或是 KeyBy 相同的方式隐式 组织,一些用户会依靠这种特性在提供的有序性保证的同时将计算敏感型的任务划分为更小的块。

只要并行度不变,非对齐 Checkpoint(UC) 将会保留这些特性。但是如果加上UC的伸缩容,这些特性将会被改变。

针对如下任务

在这里插入图片描述
如果我们想将并行度从 p=2 扩容到 p=3,那么需要根据 KeyGroup 将 KeyBy 的 Channel 中的数据突然的划分到3个 Channel 中去。这 很容易做到,通过使用 Operator 的 KeyGroup 范围和确定记录属于某个 Key(group) 的方法(不管实际使用的是什么方法)。对于 Forward 的 Channel,我们根本没有 KeyContext。Forward Channel 里也没有任何记录被分配了任何 KeyGroup;也无法计算它,因为无法保证 Key仍然存在。

广播 Connections

广播 Connection 带来了另一个问题。无法保证所有 Channel 中的记录都以相同的速率被消费。这可能导致某些 Task 已经应用了与 特定广播事件对应的状态变更,而其他任务则没有,如图所示。

在这里插入图片描述
广播分区通常用于实现广播状态,它应该跨所有 Operator 都相同。Flink 实现广播状态,通过仅 Checkpointing 有状态算子的 SubTask 0 中状态的单份副本。在恢复时,我们将该份副本发往所有的 Operator。因此,可能会发生以下情况:某个算子将很快从它的 Checkpointed Channel 消费数据并将修改应有于记录来获得状态。

六、故障排除

Corrupted in-flight data

以下描述的操作是最后采取的手段,因为它们将会导致数据的丢失。

为了防止 In-flight 数据损坏,或者由于其他原因导致作业应该在没有 In-flight 数据的情况下恢复,可以使用 recover-without-channel-state.checkpoint-id 属性。该属性需要指定一个 Checkpoint Id,对它来说 In-flight 中的数据将会被忽略。除非已经持久化的 In-flight 数据内部的损坏导致无 法恢复的情况,否则不要设置该属性。只有在重新部署作业后该属性才会生效,这就意味着只有启用 externalized checkpoint时,此操作才有意义。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/262305.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用Pycharm一键将.ui文件生成.py文件配置教程、一键打开QTDesigner教程

2df3621a-7ffd-4f18-9735-b86464b83a5b 前言 我痛恨所有将白嫖归为理所应当的猪🐖。 教程 打开pycharm之后,依次点击File->Settings->Tools->External Tools,进入如下界面: 1、配置快捷打开Qt Designer 点击号&…

基于深度学习的森林火焰烟雾检测系统(含UI界面,yolov8、Python代码,数据集)

项目介绍 项目中所用到的算法模型和数据集等信息如下: 算法模型:     yolov8 yolov8主要包含以下几种创新:         1. 添加注意力机制(SE、CBAM等)         2. 修改可变形卷积(DySnake-主干c…

gem5 RubyPort: mem_request_port作用与连接 simple-MI_example.py

简介 回答这个问题:RubyPort的口下,一共定义了六个口,分别是mem_request_port,mem_response_port,pio_request_port,pio_response_port,in_ports, interrupt_out_ports,他们分别有什…

YOLOv8改进 | 主干篇 | 利用MobileNetV2替换Backbone(轻量化网络结构)

一、本文介绍 本文给大家带来的改进机制是MobileNetV2,其是专为移动和嵌入式视觉应用设计的轻量化网络结构。其在MobilNetV1的基础上采用反转残差结构和线性瓶颈层。这种结构通过轻量级的深度卷积和线性卷积过滤特征,同时去除狭窄层中的非线性&#xff…

Circulation:室性早搏会增加不良心血管事件|UK Biobank周报(12.14)

欢迎报名2023年郑老师团队课程! 郑老师科研统计培训,包括临床数据、公共数据分析课程等,欢迎报名 英国生物银行(UK Biobank,UKB)是英国迄今以来规模最大的有关致病或预防疾病的基因和环境因子的信息资源库。…

【案例】图片预览

效果图 如何让图片放大,大多数的UI组件都带有这种功能,今天给大家介绍的这个插件除了放大之外,还可以旋转、移动、翻转、旋转、二次放大(全屏) 实现 npm i v-viewer -Smain.js 中引入 import viewerjs/dist/viewer.c…

java并发编程六 共享模型之内存

文章目录 Java 内存模型可见性解决方法 有序性解决方法 Java 内存模型 JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。 JMM 体现在以下几个方面 原子性 - 保证指令不会受到线程上…

前端ICON库

前端ICON库 1.mingcute mingcute 2.lordicon lordicon 3.字节iconpark(推荐) 字节iconpark 4.iconbuddy iconbuddy.app/ 5.商标寻找youicons 免费下载数百万个徽标以获得设计灵感 | YouIcons.com 还有一堆工具

黑盒测试中的完整性测试:确保系统的功能完整性

在软件开发过程中,为了保证系统的质量和可靠性,测试是一个不可或缺的环节。而黑盒测试作为常用的测试方法之一,以用户的角度出发,测试系统在不知道内部工作原理的情况下,对输入数据的处理和输出结果的正确性进行验证。…

如何直接使用别人的Python项目的虚拟环境

Cannot set up a python SDK at Python 3.10 (flaskTest) (2) (H:\WorkPlace\PyWorkPlace\flaskTest\flaskTest\venv\Scripts\python.exe). The SDK seems invalid 如何复制别人的虚拟环境 修改步骤 1. 修改pyvenv.cfg文件里的home和version 2. Scripts\activate以及Scripts\a…

助力工业产品质检,基于YOLOv8开发构建智能PCB电路板质检分析系统

AI助力工业质检智能生产制造已经有很多成功的实践应用了,在我们前面的系列博文中也有很多对应的实践,感兴趣的话可以自行移步阅读前面的博文即可: 《助力质量生产,基于目标检测模型MobileNetV2-YOLOv3-Lite实现PCB电路板缺陷检测…

【算法】算法题-20231221

这里写目录标题 一、830. 较大分组的位置二、657. 机器人能否返回原点三、771. 宝石与石头 一、830. 较大分组的位置 在一个由小写字母构成的字符串 s 中,包含由一些连续的相同字符所构成的分组。 例如,在字符串 s "abbxxxxzyy"中&#xff0…

九、W5100S/W5500+RP2040之MicroPython开发<HTTPOneNET示例>

文章目录 1. 前言2. 平台操作流程2.1 创建设备2.2 创建数据流模板 3. WIZnet以太网芯片4. 示例讲解以及使用4.1 程序流程图4.2 测试准备4.3 连接方式4.4 相关代码4.5 烧录验证 5. 注意事项6. 相关链接 1. 前言 在这个智能硬件和物联网时代,MicroPython和树莓派PICO正…

依托亚马逊云科技构建韧性应用

背景 现代业务系统受到越来越多的韧性相关的挑战,特别是客户要求他们的业务系统 724 不间断的运行。因此,韧性对于云的基础设施和应用系统有着至关重要的作用。 亚马逊云科技把韧性视为一项最基本的工作,为了让我们的业务系统能持续优雅地提供…

LLM之RAG实战(七)| 使用llama_index实现多模态RAG

一、多模态RAG OpenAI开发日上最令人兴奋的发布之一是GPT-4V API(https://platform.openai.com/docs/guides/vision)的发布。GPT-4V是一个多模态模型,可以接收文本/图像,并可以输出文本响应。最近还有一些其他的多模态模型&#x…

【大数据存储与处理】实验二 HBase 过滤器操作

实验二 HBase 过滤器操作 【实验目的】: 1.掌握使用 HBase 过滤器进行全表扫描。 【实验内容与要求】: 在 HBase 中,Get 和 Scan 操作都可以使用过滤器来设置输出的范围,类似于 SQL 里面 的 Where 查询条件。使用 show_filte…

中国自动驾驶行业:迈向无限可能

中国自动驾驶行业正在经历蓬勃发展,取得了令人瞩目的成果。这一行业在技术创新、政策支持和市场需求等方面展现出巨大潜力。本文将从技术创新、产业生态和前景发展等角度,探讨中国自动驾驶行业的现状和未来前景。 中国自动驾驶行业正处于一个令人瞩目的快…

Codeforces Round 638 (Div. 2)B. Phoenix and Beauty(思维构造)

B. Phoenix and Beauty 这道题目学到的东西: 从给出的数据范围观察,得到一些有用信息(峰哥教的)考虑无解的情况‘ 其实这题考虑怎么操作是比较难的,如果能想出来满足条件的结果就比较好了(我在说什么我自…

ASP.NET Core基础之定时任务(二)-Quartz.NET入门

阅读本文你的收获 了解任务调度框架QuartZ.NET的核心构成学会在ASP.NET Core 中使用QuartZ.NET 在项目的开发过程中,难免会遇见需要后台处理的任务,例如定时发送邮件通知、后台处理耗时的数据处理等,上次分享了ASP.NET Core中实现定时任务的…

vitepress项目使用github的action自动部署到github-pages中,理论上可以通用所有

使用github的action自动部署到github-pages中 创建部署的deploy.yml文件,在项目的根目录下面 .github\workflows\deploy.yml 完整的代码:使用的是pnpm进行依赖安装。 name: 部署VitePresson:push:branches:- docs # 这段是在推送到 docs 分支时触发该…