flink反压

flink反压(backpressure),简单来说就是当接收方的接收速率低于发送方的发送速率,这时如果不做处理就会导致接收方的数据积压越来越多直到内存溢出,所以此时需要一个机制来根据接收方的状态反过来限制发送方的发送速率,来达到一个两者速率匹配的状态。

TCP-based 反压

在flink1.5之前的版本,使用的是TCP-based 的反压机制,不过这种机制缺点明显。

(1)在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。(2)依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。

所以这里不再介绍TCP-based 的反压机制。

在flink1.5以后引入了Credit-based 反压,可以理解为就是在 Flink 层面实现类似 TCP 流控的反压机制来解决上述的弊端,Credit 可以类比为 TCP 的 Window 机制。

Credit-based 反压

在 Flink 层面实现反压机制,通过 ResultPartition 和 InputGate 传输 feedback 。Credit-base 的 feedback 步骤:(1)每一次 ResultPartition 向 InputGate 发送数据的时候,都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息。(backlog 的作用是为了让消费端感知到我们生产端的情况)(2)如果下游有充足的 Buffer ,就会返还给上游 Credit (表示剩余 buffer 数量),告知发送消息(图上两个虚线是还是采用 Netty 和 Socket 进行通信)。下面用图来解释:

如图所示在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信),下面我们看一个具体示例。

假设我们上下游的速度不匹配,上游发送速率为 2,下游接收速率为 1,可以看到图上在 ResultSubPartition 中累积了两条消息,10 和 11, backlog 就为 2,这时就会将发送的数据 <8,9> 和 backlog = 2 一同发送给下游。下游收到了之后就会去计算是否有 2 个 Buffer 去接收,可以看到 InputChannel 中已经不足了这时就会从 Local BufferPool 和 Network BufferPool 申请,好在这个时候 Buffer 还是可以申请到的。

过了一段时间后由于上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到之后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压,不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题。

总结:

1. 网络流控是为了在上下游速度不匹配的情况下,防止下游出现过载。

2. 网络流控有静态限速和动态反压两种手段

3. Flink 1.5 之前是基于 TCP 流控 + bounded buffer 实现反压

4. Flink 1.5 之后实现了自己托管的 credit – based 流控机制,在应用层模拟 TCP 的流控机制

是否有了动态反压,静态限速就没用了?不是的。实际上动态反压不是万能的,我们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不一定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件可以通过协议将反压反馈给 Sink 端,但是像 ES 无法将反压进行传播反馈给 Sink 端,这种情况下为了防止外部存储在大的数据量下被打爆,我们就可以通过静态限速的方式在 Source 端去做限流。所以说动态反压并不能完全替代静态限速的,需要根据合适的场景去选择处理方案。

反压影响

反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不高或者数据量较少的应用,反压的影响可能并不明显。然而对于规模比较大的 Flink 作业,反压可能会导致严重的问题。

反压会影响checkpoint
(1)checkpoint时长:checkpoint barrier跟随普通数据流动,如果数据处理被阻塞,使得checkpoint barrier流经整个数据管道的时长变长,导致checkpoint 总体时间变长。
(2)state大小:为保证Exactly-Once准确一次,对于有两个以上输入管道的 Operator,checkpoint barrier需要对齐,即接受到较快的输入管道的barrier后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的barrier也到达。这些被缓存的数据会被放到state 里面,导致checkpoint变大。
checkpoint是保证准确一次的关键,checkpoint时间变长有可能导致checkpoint超时失败,而state大小可能拖慢checkpoint甚至导致OOM。

反压监控

1. flink web ui自带反压监控

该页面提供了 SubTask 级别的反压监控,1.13 版本以前是通过周期性对 Task 线程的栈信息采样,得到线程被阻塞在请求 Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。Flink 1.13 优化了反压检测的逻辑(使用基于任务 Mailbox 计时,而不在再于堆栈采 样),并且重新实现了作业图的 UI 展示:Flink 现在在 UI 上通过颜色和数值来展示繁忙和反压的程度。黑色表示反压严重,红色表示非常繁忙,蓝色表示比较空闲。Backpressure Status值有OK/LOW/HIGH。

OK: 0% <= back pressured <= 10%
LOW: 10% < back pressured <= 50%
HIGH: 50% < back pressured <= 100%

假如通过web ui 查看到某个算子处于反压状态,可以分析该算子瓶颈:

如果处于反压状态,那么有两种可能性:
(1)该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的 Operator(比如 flatmap)。这种情况,该节点是反压的根源节点,它是从 Source Task 到 Sink Task 的第一个出现反压的节点。
(2)下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。这种情况, 需要继续排查下游节点,一直找到第一个为 OK 的一般就是根源节点。 总体来看,如果我们找到第一个出现反压的节点,反压根源要么是就这个节点,要么是 它紧接着的下游节点。 通常来讲,第二种情况更常见。如果无法确定,还需要结合 Metrics 进一步判断。

2. 利用 Metrics 定位

监控反压时会用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有关,最为有用的是以下几个 Metrics:

inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage。

(1)采用 Metrics 分析反压的思路

如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它 被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。

(2)将inPoolUsage分为floatingBuffersUsage + exclusiveBuffersUsage进一步分析

Flink 1.9及以上版本,还可以根据 floatingBuffersUsage/exclusiveBuffersUsage 以 及其上游 Task 的 outPoolUsage 来进行进一步的分析一个 Subtask 和其上游 Subtask 的数据传输。
在流量较大时,Channel 的 Exclusive Buffer 可能会被写满,此时 Flink 会向 Buffer Pool 申请剩余的 Floating Buffer。这些 Floating Buffer 属于备用 Buffer。

解析:
1)floatingBuffersUsage 为高则表明反压正在传导至上游。
2)exclusiveBuffersUsage 则表明了反压可能存在倾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,则存在倾斜。因为少数 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,当 exclusive buffer 消耗完,就会使用floating Buffer)。

如何分析反压

(1)数据倾斜
通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。解决方式把数据分组的 key 进行本地/预聚合来消除/减少数据倾斜。
(2)用户代码的执行效率
对 TaskManager 进行 CPU profile,分析 TaskThread 是否跑满一个 CPU 核:如果没有跑满,需要分析 CPU 主要花费在哪些函数里面,比如生产环境中偶尔会卡在 Regex 的用户函数(ReDoS);如果没有跑满,需要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是 checkpoint 或者 GC 等系统活动。
(3)TaskManager 的内存以及 GC
TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。可以加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。推荐TaskManager 启用 G1 垃圾回收器来优化 GC。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/401469.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

精英ECS Z97-MACHINE V1.0 BIOS MX25L6406E

官网上的两个BIOS我都无法亮机&#xff0c;这是我保存出来的BIOS&#xff0c;不知道是否能使用五代的处理器 官网&#xff1a;Z97-MACHINE&#xff5c;Motherboard&#xff5c;产品&#xff5c;ECS 精英电脑 国外老哥的看法&#xff1a;ECS Z97-MACHINE Closer Look: The BIO…

手拉手助成长社会融合实践活动之新春送温暖

2月21日上午来自合肥市第四十五中学橡树湾校区七(15)中队、共青团合肥市第六中学2023级36班委员会的40多名同学&#xff0c;带来了龙年的礼物看望陪伴合肥市庐阳区为民社会工作服务中心幸福小院的兄弟姐妹。 大家详细地了解了幸福小院孩子们学习、康复和社会实践及能力提升情况…

CSS列表学习2

之前学习了列表&#xff1b;继续熟悉&#xff1b; <!DOCTYPE html> <html> <head> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/><title></title><meta charset"utf-8" /><…

通用性技术底座AI大模型与各行业专用性AI小模型搭建(第二篇)

五、小模型架构选择问题 在选择行业专用AI小模型的架构时&#xff0c;需要考虑以下几个关键因素&#xff1a; 1. **任务类型**&#xff1a; - 不同的任务类型&#xff08;如分类、回归、序列生成、图像识别等&#xff09;对应着不同的模型结构。例如&#xff0c;文本分类问…

Ansible cron模块 适用于管理计划任务 设置多个计划任务

目录 选项添加一个计划任务检查是否添加成功删除计划任务检查是否执行成功 选项 其使用的语法跟我们的crontab文件中的语法一致&#xff0c;同时&#xff0c;可以指定以下选项&#xff1a; day #日应该运行的工作( 1-31, , /2, ) hour # 小时 ( 0-23, , /2, ) minute #分钟( 0…

Leetcode 26-30题

删除有序数组中的重复项 给定一个有序数组&#xff0c;要求原地删除重复出现的元素&#xff0c;返回删除后的数组的长度。 这里的原地删除其实可以这样表示&#xff0c;用双指针从前往后扫一遍&#xff0c;遇到新的没出现过的元素就放到前面去&#xff0c;就可以实现删除后的数…

【杭州游戏业:创业热土,政策先行】

在前面的文章中&#xff0c;我们探讨了上海、北京、广州、深圳等城市的游戏产业现状。现在&#xff0c;我们切换视角&#xff0c;来看看另一个游戏创业热土——杭州的发展情况 最近第19届亚运会在杭州举办&#xff0c;本次亚运会上&#xff0c;电子竞技首次获准列为正式比赛项…

了解 Kubernetes

1 Kubernetes概述 1.1 k8s是什么 K8S 的全称为 Kubernetes (K12345678S)&#xff0c;PS&#xff1a;“嘛&#xff0c;写全称也太累了吧&#xff0c;不如整个缩写” 作用&#xff1a; 用于自动部署、扩展和管理“容器化&#xff08;containerized&#xff09;应用程序”的开…

Manacher算法和扩展kmp

Manacher算法 a情况 b情况 具体例子 c情况 总结 代码 #include<iostream> #include<algorithm> #include<string> #include<cmath>using namespace std; const int N 1.1e7 1; char ss[N << 1]; int p[N << 1]; int n; void manacherss…

vue中使用AraleQRCode生成二维码

vue中使用AraleQRCode生成二维码 问题背景 本文介绍vue中生成二维码的一种方案&#xff0c;使用AraleQRCode来实现。 问题分析 &#xff08;1&#xff09;安装对应的依赖包 npm i arale-qrcode --save &#xff08;2&#xff09;完整代码如下: <template><!-…

深入解析SDRAM:从工作原理到实际应用

深入解析SDRAM&#xff1a;从工作原理到实际应用 在众多内存技术中&#xff0c;同步动态随机访问存储器&#xff08;SDRAM&#xff09;因其出色的性能和广泛的应用而备受关注。本文将从SDRAM的工作原理入手&#xff0c;探讨其性能优化策略和在现代电子设备中的应用。 SDRAM工作…

多系统集成分析——WMS系统与PLM、ERP、MES、智库、WCS、AGV、OA系统的关联

多系统集成分析——WMS系统与PLM、ERP、MES、智库、WCS、AGV、OA系统的关联 原创 西游暖暖 白话聊IT 2024-02-19 00:06 天津 首先分享一个已上线的智能工厂架构图&#xff1a;智能制造全场景下&#xff0c;将WMS定位于不仅是仓储执行管理系统&#xff0c;更作为连接全方案的“…

企业数据资产入表路径及方法完整解析

源自&#xff1a;架构老人 “人工智能技术与咨询” 发布 01政策、背景、趋势 一、数字资源如表国家政策 为规范企业数据资源相关会计处理&#xff0c;强化相关会计信息披露&#xff0c;财政部制定印发了《企业数据资源相关会计处理暂行规定》&#xff08;以下简称《暂行规…

JVM工作原理与实战(三十九):G1垃圾回收器原理

专栏导航 JVM工作原理与实战 RabbitMQ入门指南 从零开始了解大数据 目录 专栏导航 前言 一、G1垃圾回收器 1.G1垃圾回收器执行流程 二、年轻代回收 1.年轻代回收原理 2.卡表(Card Table) 3.记忆集的生成流程 4.年轻代回收的详细步骤 5.G1年轻代回收核心技术总结 三、…

超声波清洗机应该怎么选?这几款超声波清洗机错后悔!

科技让我们的生活变得方便了许多&#xff0c;比如&#xff0c;自从有了超声波清洗机之后&#xff0c;有些人就改变了眼镜必须要手洗的想法&#xff0c;许多研究也证明&#xff0c;单靠手洗是无法眼镜内缝隙中的污渍彻底清洗干净的&#xff0c;一台专门的超声波清洗机就可以减轻…

JavaSprintBoot中一些运维方面的知识

1.配置文件四级分类 例如以下yml配置文件&#xff0c;权限一共有四级&#xff0c;高等级覆盖低等级并叠加&#xff08;权限向下兼容&#xff09; 2.自定义配置文件 可以自定义配置文件的名称&#xff0c;因为实际开发环境中可能不会就简单的叫做application.yml之类的&#x…

48 slab 的实现

前言 这里说的是 内核中分配小对象的一种内存分配方式 slab 呵呵 经典程度不必多说了, 内核使用的大多数数据结构 基本上是基于 slab 进行内存分配的 这里 我们来看一下 slab 如何分配对象 几个分配层级, c->free_list, c->page, c->partial, new_slab 1. 先…

C#上位机与三菱PLC的通信08---开发自己的通讯库(A-1E版)

1、A-1E报文回顾 具体细节请看&#xff1a; C#上位机与三菱PLC的通信03--MC协议之A-1E报文解析 C#上位机与三菱PLC的通信04--MC协议之A-1E报文测试 2、为何要开发自己的通讯库 前面使用了第3方的通讯库实现了与三菱PLC的通讯&#xff0c;实现了数据的读写&#xff0c;对于通…

Maven私服搭建Nexus3

第一部分&#xff1a;仓库部署 下载地址&#xff1a;https://help.sonatype.com/en/download.html 备用下载链接&#xff0c;部分已经失效了 解压后会有两个文件夹&#xff1a; nexus-3.20.1-01 sonatype-work 访问地址配置路径 \nexus-3.20.1-01\bin\nexus.vmoptions -Xms1…

Vue思维导图,复习+预习,其中有些已经弃用了,下期总结下

1、学前了解 2、基础知识 3、组件相关语法 4、高级语法 5、compositionAPI 6、配套工具