Flink Checkpoint 超时问题详解

第一种、计算量大,CPU密集性,导致TM内线程一直在processElement,而没有时间做CP【过滤掉部分数据;增大并行度】

代表性作业为算法指标-用户偏好的计算,需要对用户在商城的曝光、点击、订单、出价、上下滑等所有事件进行比例计算,并且对各个偏好值进行比例计算,事件时间范围为近24小时。等于说每来一条数据,都需要对用户近24小时内所有的行为事件进行分类汇总,求比例,再汇总,再求比例,而QPS是1500,24小时1.5亿的累积数据,逻辑处理的算子根本无法将接收到的数据在合适的时间内计算完毕,这里还有个有趣的现象,为了提高处理性能,我将并行度翻倍,结果checkpoint的时间反而更长了,原因是Source的并行度也增加后,读取源数据的速度更快了~
从图片中可以看到source、sink的cp时间都很快,只有处理节点的‘End to End Duration’时间特别长,其他的‘Checkpoint Duration (Sync)’、‘Checkpoint Duration (Async)’时间都很短,都为几百毫秒。

那么怎么办呢?这里我也反思了自己的实现逻辑,实时计算中,flink是流引擎,正确的使用姿势应该是对每一条数据进行实时处理,而不应该对较长历史时间范围内的历史数据进行批处理,如果每条数据来还需要对历史数据重新计算计算,那么就不符合flink的定位。所以和算法同学商议后,将实现逻辑进行修改,进行批流分开计算,比如离线数据每半个小时进行一次计算,而实时计算只需要计算最近半小时内的数据即可。总之两个方法,一、减少源数据量,过滤黑名单或者非法ID;window聚合; 二、简化处理逻辑,特别是减少遍历。

第二种、数据倾斜 解决方法

代表性作业对手机的uuid(设备编号)进行keyby,结果导致subtask的state大小差异一倍,两种方法,第一,两阶段聚合;第二,重新设置并行度,改变KeyGroup的分布

在这里插入图片描述

第三种 频繁FULL GC【减少key数量;增大TM内存】

当StateSize达到200M以上,Async的时间会超过1min。
这种情况特别少见,因为RocksDb State的异步阶段做的事情主要是将本地KV数据库里的增量State写到HDFS上,如果flink配置了增量chekcPoint是不太可能出现单个作业异步处理特别慢的现象。因此猜测是由于TM出现频繁FGC,导致线程根本没有足够的时间片去处理。
结果也确实如此,jstat -gcutil pid 1s,发现每4秒一个fgc。
dump分析

jmap -dump:format=b,file=jconsole.dump PID
./ParseHeapDump.sh jconsole.dump org.eclipse.mat.api:suspects
org.eclipse.mat.api:overview org.eclipse.mat.api:top_components

还有个有趣的现象是出现FGC时,反压机制会无法生效,在‘BackPressure’界面会一片空白~
通过Dump分析,CopyOnWriteStateTable/CopyOnWriteStateMap占用绝大多数堆内存,也就是flink内部用于存储keyedState,CopyOnWriteStateTable 中保存多个 KeyGroup 的状态,每个 KeyGroup 对应一个 CopyOnWriteStateMap。
解决方法,keyby的key过多,要么减少key的数量,要么加大TM的内存。

如上,key的Selector定义中有日期,那么就导致key的数量会按天暴涨,也解释了为什么CopyOnWriteMapState对象会这么多了,因为即使KeyedProcessFunction中设置了StateTtl,State会过期,但是Key不会过期。

第四种 出现反压

还有一种情况是当一个作业出现反压时,也会导致超时,表现的形式就是 AcknowledgeTime 都无法拿到,或者 E2E 时间很长,等反压降才去就好了,如降不下去,阅读后面还有解决办法。

下面我们详细介绍一下老版本flink检查点异常的情况:

1. Checkpoint 流程简介

首先我们需要了解 Flink 中 Checkpoint 的整个流程是怎样的,在了解整个流程之后,我们才能在出问题的时候,更好的进行定位分析。

从上图我们可以知道,Flink 的 Checkpoint 包括如下几个部分:

  • JM trigger checkpoint

  • Source 收到 trigger checkpoint 的 PRC,自己开始做 snapshot,并往下游发送 barrier

  • 下游接收 barrier(需要 barrier 都到齐才会开始做 checkpoint)

  • Task 开始同步阶段 snapshot

  • Task 开始异步阶段 snapshot

  • Task snapshot 完成,汇报给 JM

上面的任何一个步骤不成功,整个 checkpoint 都会失败。

2 Checkpoint 异常情况排查

2.1 Checkpoint 失败

可以在 Checkpoint 界面看到如下图所示,下图中 Checkpoint 10423 失败了。

点击 Checkpoint 10423 的详情,我们可以看到类系下图所示的表格(下图中将 operator 名字截取掉了)。

图中我们看到三行,表示三个 operator,其中每一列的含义分别如下:

  • 其中 Acknowledged 一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack,从图中我们可以知道第三个 operator 总共有 5 个 subtask,但是只有 4 个进行了 ack;

  • 第二列 Latest Acknowledgement 表示该 operator 的所有 subtask 最后 ack 的时间;

  • End to End Duration 表示整个 operator 的所有 subtask 中完成 snapshot 的最长时间;

  • State Size 表示当前 Checkpoint 的 state 大小 -- 主要这里如果是增量 checkpoint 的话,则表示增量大小;

  • Buffered During Alignment 表示在 barrier 对齐阶段积攒了多少数据,如果这个数据过大也间接表示对齐比较慢);

Checkpoint 失败大致分为两种情况:Checkpoint Decline 和 Checkpoint Expire。

2.1.1 Checkpoint Decline

我们能从 jobmanager.log 中看到类似下面的日志 Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178. 其中 10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1 是 execution id,85d268e6fbc19411185f7e4868a44178 是 job id,我们可以在 中查找 execution id,找到被调度到哪个 taskmanager 上,类似如下所示:

2019-09-02 16:26:20,972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph -XXXXXXXXXXX (100/289) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to DEPLOYING.
2019-09-02 16:26:20,972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph -Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

 

从上面的日志我们知道该 execution 被调度到 hostnameABCDE 的 slot 上,接下来我们就可以到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失败的具体原因了。

另外对于 Checkpoint Decline 的情况,有一种情况我们在这里单独抽取出来进行介绍:Checkpoint Cancel。

当前 Flink 中如果较小的 Checkpoint 还没有对齐的情况下,收到了更大的 Checkpoint,则会把较小的 Checkpoint 给取消掉。我们可以看到类似下面的日志:

$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

这个日志表示,当前 Checkpoint 19 还在对齐阶段,我们收到了 Checkpoint 20 的 barrier。然后会逐级通知到下游的 task checkpoint 19 被取消了,同时也会通知 JM 当前 Checkpoint 被 decline 掉了。

在下游 task 收到被 cancelBarrier 的时候,会打印类似如下的日志:

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.

或者

DEBUG
19 canceled, skipping alignment.

或者

WARN
: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

上面三种日志都表示当前 task 接收到上游发送过来的 barrierCancel 消息,从而取消了对应的 Checkpoint。

2.1.2 Checkpoint Expire

如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint 也会失败。当一个 Checkpoint 由于超时而失败是,会在 中看到如下的日志:

Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178 expired before completing.

表示 Chekpoint 1 由于超时而失败,这个时候可以可以看这个日志后面是否有类似下面的日志:

Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job85d268e6fbc19411185f7e4868a44178.

我们按照下面的日志把 TM 端的 snapshot 分为三个阶段,开始做 snapshot 前,同步阶段,异步阶段:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

这个日志表示 TM 端 barrier 对齐后,准备开始做 Checkpoint。

DEBUG
2019-08-06 13:43:02,613 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot(FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx
_source -> Filter (27/70),5,Flink Task Threads] took 0 ms.

上面的日志表示当前这个 backend 的同步阶段完成,共使用了 0 ms。

DEBUG
snapshot ({fileSystem=org.apache.flink.core.fs.@7908affe, checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx, =xxxxx, metadataFilePath=xxxxxx, reference=(default), =1024}, asynchronous part) in thread Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms

上面的日志表示异步阶段完成,异步阶段使用了 369 ms

在现有的日志情况下,我们通过上面三个日志,定位 snapshot 是开始晚,同步阶段做的慢,还是异步阶段做的慢。然后再按照情况继续进一步排查问题。

2.2 Checkpoint 慢

在 2.1 节中,我们介绍了 Checkpoint 失败的排查思路,本节会分情况介绍 Checkpoint 慢的情况。

Checkpoint 慢的情况如下:比如 Checkpoint interval 1 分钟,超时 10 分钟,Checkpoint 经常需要做 9 分钟(我们希望 1 分钟左右就能够做完),而且我们预期 state size 不是非常大。

对于 Checkpoint 慢的情况,我们可以按照下面的顺序逐一检查。

2.2.0 Source Trigger Checkpoint 慢

这个一般发生较少,但是也有可能,因为 source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁(这个现在社区正在进行用 mailBox 的方式替代当前抢锁的方式,详情参考[1])。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 taskmanager.log 中找不到开始做 Checkpoint 的 log,则可以考虑是否属于这种情况,可以通过 jstack 进行进一步确认锁的持有情况。

2.2.1 使用增量 Checkpoint

现在 Flink 中 Checkpoint 有两种模式,全量 Checkpoint 和 增量 Checkpoint,其中全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优势。

现在 Flink 中仅在 RocksDBStateBackend 中支持增量 Checkpoint,如果你已经使用 RocksDBStateBackend,可以通过开启增量 Checkpoint 来加速。

不熟悉的RocksDBStateBackend相关的可以参考官方文档:Stateful Stream Processing | Apache Flink

2.2.2 作业存在反压或者数据倾斜

我们知道 task 仅在接受到所有的 barrier 之后才会进行 snapshot,如果作业存在反压,或者有数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间,这两个可以通过如下的页面进行检查:

上图中我们选择了一个 task,查看所有 subtask 的反压情况,发现都是 high,表示反压情况严重,这种情况下会导致下游接收 barrier 比较晚。

上图中我们选择其中一个 operator,点击所有的 subtask,然后按照 Records Received/Bytes Received/TPS 从大到小进行排序,能看到前面几个 subtask 会比其他的 subtask 要处理的数据多。

如果存在反压或者数据倾斜的情况,我们需要首先解决反压或者数据倾斜问题之后,再查看 Checkpoint 的时间是否符合预期。

2.2.2 Barrier 对齐慢

从前面我们知道 Checkpoint 在 task 端分为 barrier 对齐(收齐所有上游发送过来的 barrier),然后开始同步阶段,再做异步阶段。如果 barrier 一直对不齐的话,就不会开始做 snapshot。

barrier 对齐之后会有如下日志打印:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

如果 中没有这个日志,则表示 barrier 一直没有对齐,接下来我们需要了解哪些上游的 barrier 没有发送下来,如果你使用 At Least Once 的话,可以观察下面的日志:

DEBUG
Received barrier for checkpoint 96508 from channel 5

表示该 task 收到了 channel 5 来的 barrier,然后看对应 Checkpoint,再查看还剩哪些上游的 barrier 没有接受到,对于 ExactlyOnce 暂时没有类似的日志,可以考虑自己添加,或者 jmap 查看。

2.2.3 主线程太忙,导致没机会做 snapshot

在 task 端,所有的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,如果主线程在处理太慢(比如使用 RocksDBBackend,state 操作慢导致整体处理慢),导致 barrier 处理的慢,也会影响整体 Checkpoint 的进度,在这一步我们需要能够查看某个 PID 对应 hotmethod,这里推荐两个方法:

  1. 多次连续 jstack,查看一直处于 RUNNABLE 状态的线程有哪些;

  2. 使用工具 AsyncProfile dump 一份火焰图,查看占用 CPU 最多的栈;

如果有其他更方便的方法当然更好,也欢迎推荐。

2.2.4 同步阶段做的慢

同步阶段一般不会太慢,但是如果我们通过日志发现同步阶段比较慢的话,对于非 RocksDBBackend 我们可以考虑查看是否开启了异步 snapshot,如果开启了异步 snapshot 还是慢,需要看整个 JVM 在干嘛,也可以使用前一节中的工具。对于 RocksDBBackend 来说,我们可以用 iostate 查看磁盘的压力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的时间总共开销多少。

RocksDB 开始 snapshot 的日志如下:

2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] Started the snapshot process -- creating snapshot in directory /tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729

snapshot 结束的日志如下:

2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] Snapshot DONE. All is good

2.2.6 异步阶段做的慢

对于异步阶段来说,tm 端主要将 state 备份到持久化存储上,对于非 RocksDBBackend 来说,主要瓶颈来自于网络,这个阶段可以考虑观察网络的 metric,或者对应机器上能够观察到网络流量的情况(比如 iftop)。

对于 RocksDB 来说,则需要从本地读取文件,写入到远程的持久化存储上,所以不仅需要考虑网络的瓶颈,还需要考虑本地磁盘的性能。另外对于 RocksDBBackend 来说,如果觉得网络流量不是瓶颈,但是上传比较慢的话,还可以尝试考虑开启多线程上传功能[3]。

3 总结

在第二部分内容中,我们介绍了官方编译的包的情况下排查一些 Checkpoint 异常情况的主要场景,以及相应的排查方法,如果排查了上面所有的情况,还是没有发现瓶颈所在,则可以考虑添加更详细的日志,逐步将范围缩小,然后最终定位原因。

如果还要继续深入该内容,可参考文章:读Flink源码谈设计:Exactly Once - 知乎 (zhihu.com)

或自行阅读flink源码; 

还有一种常见的解决超时的思路,可以参考:Flink Checkpoint超时问题_execution.checkpointing.tolerable-failed-checkpoin-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/356274.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

监听项目中指定属性数据,点击或模块显示时

当项目中,需要获取某个页面上、某个标签上、有指定自定义属性时,需要在点击该元素时进行公共逻辑处理,或该元素在显示的时候进行逻辑处理,这时可以定义一个公共的方法,在每个页面引用,并写入数据即可 &…

SOME/IP SD 协议介绍(一)

概述 服务发现用于定位服务实例并检测服务实例是否正在运行。在车载网络中,服务实例的位置通常是已知的;因此,服务实例的状态是首要关注的。服务的位置(即IP地址、传输协议和端口号)是次要关注的内容。 术语和定义 S…

防御保护--防火墙的可靠性

目录 前提: VGMP 接口故障切换场景 状态切换备份的过程 HRP 第一种备份方式 --- 自动备份 第二种备份方式 --- 手工备份 第三种备份方式 --- 快速备份 各备份场景过程分析 1,主备形成场景 2,主备模式下,接口故障切…

防火墙用户认证、NAT、策略路由、DNS透明代理以及双机热备笔记

用户认证 防火墙管理员登录认证 --- 检验身份的合法性,划分身份权限 用户认证 --- 上网行为管理的一部分 用户,行为,流量 --- 上网行为管理三要素 用户认证的分类 上网用户认证 --- 三层认证 --- 所有的跨网段的通信都可以属于上网行为。…

redis-主从复制

1.主从复制 1.1简介 主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主 1.2作用 1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。 2、故…

群辉开启WebDav服务+cpolar内网穿透实现移动端ES文件浏览器远程访问本地NAS文件

文章目录 1. 安装启用WebDAV2. 安装cpolar3. 配置公网访问地址4. 公网测试连接5. 固定连接公网地址6. 使用固定地址测试连接 本文主要介绍如何在群辉中开启WebDav服务,并结合cpolar内网穿透工具生成的公网地址,通过移动客户端ES文件浏览器即可实现移动设…

如何搭建开源笔记Joplin服务并实现远程访问本地数据

文章目录 1. 安装Docker2. 自建Joplin服务器3. 搭建Joplin Sever4. 安装cpolar内网穿透5. 创建远程连接的固定公网地址 Joplin 是一个开源的笔记工具,拥有 Windows/macOS/Linux/iOS/Android/Terminal 版本的客户端。多端同步功能是笔记工具最重要的功能,…

API:低代码平台的强大秘诀与无限可能

应用编程接口 (API) 是应用程序以可编程格式访问其关键能力和功能的一种方式,从而其他应用程序可以利用它们。API 本质上支持应用程序之间的无缝数据流,使开发人员能够在应用程序中添加更多功能,而无需依赖大量编码。 举一个简单的例子。 您…

阿里云如何找回域名,进行添加或删除?

权威域名管理介绍说明,包含添加域名、删除域名、找回域名、域名分组等操作介绍。 一、添加域名 非阿里云注册域名或子域名如需使用云解析DNS,需要通过添加域名功能,将主域名或子域名添加到云解析控制台,才可以启用域名解析服务。…

基于springboot+vue的医院管理系统(前后端分离)

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 研究背景…

如何发现帕鲁私服漏洞

白天当帕鲁、晚上抓帕鲁 相信所有的帕鲁玩家都不希望辛辛苦苦肝了几百小时抓的帕鲁因为网络入侵消失,除了抵御游戏内的强盗入侵,还要抵御现实世界的网络入侵,原本单纯的帕鲁变的复杂无比。 服务器弱口令、服务漏洞、未授权访问等入侵手段&a…

计算机网络——网络层(2)

计算机网络——网络层(2) 小程一言专栏链接: [link](http://t.csdnimg.cn/ZUTXU) 网络层——控制平面概述路由选择转发表路由协议路由信息的交换小结 路由选择算法常见的路由选择算法距离矢量路由算法工作原理优缺点分析 链路状态路由算法基本工作原理优…

D35XB100-ASEMI整流桥D35XB100参数、封装、规格

编辑:ll D35XB100-ASEMI整流桥D35XB100参数、封装、规格 型号:D35XB100 品牌:ASEMI 正向电流(Id):35A 反向耐压(VRRM):1000V 正向浪涌电流:550A 正向…

JavaScript定义变量及赋值

定义变量及赋值 ☞ 定义变量,未赋值var 变量名; 默认值是undefined ☞ 定义变量,且赋值var 变量名 数据;☞ 总结:1. 一个变量一次只能保存一个值;2. 以最后一次赋值为准3. JS变量区分大小写变量命名规范 ☞ 规则 必须遵守的,不遵守的话 JS引擎 发…

PC电脑端的小程序顶部自定义标题失效的原因

windows客户端不被支持:navigationStyle:custom!! navigationStylestringdefault导航栏样式,仅支持以下值: default 默认样式 custom 自定义导航栏,只保留右上角胶囊按钮。iOS/Android 微信客户端 7.0.0,Windows 微信客户端不支…

一文读懂Python中的映射

python中的反射功能是由以下四个内置函数提供:hasattr、getattr、setattr、delattr,改四个函数分别用于对对象内部执行:检查是否含有某成员、获取成员、设置成员、删除成员。 获取成员: getattr class Foo:def __init__(self, name, age):se…

c语言实战之贪吃蛇

文章目录 前言效果展示游戏用到的图片游戏思路一览游戏前准备一、贪吃蛇、食物、障碍物节点坐标的结构体二、枚举游戏状态、和贪吃蛇的方向三、维护运行的结构体 游戏开始前的初始化一、学习图形库相关知识二、设置背景三、欢迎界面四、初始化贪吃蛇五、生成障碍物六、生成食物…

【Uni-App】Vue3如何使用pinia状态管理库与持久化

安装插件 pinia-plugin-unistorage 引入 // main.js import { createSSRApp } from "vue"; import * as Pinia from "pinia"; import { createUnistorage } from "pinia-plugin-unistorage";export function createApp() {const app create…

Backtrader 文档学习-Order StopTrail(Limit)

Backtrader 文档学习-Order StopTrail(Limit) 1.概述 版本1.9.36.116之后支持[StopTrail, StopTrailLimit and OCO]的订单类型,并支持broker的实时交互 。 StopTrail订单,它是一种追踪止损订单。当市场价格朝定义的交易方向移动时,StopTrai…

国考省考行测:分析推理,形式逻辑,所有有的分析

国考省考行测: 2022找工作是学历、能力和运气的超强结合体! 公务员特招重点就是专业技能,附带行测和申论,而常规国考省考最重要的还是申论和行测,所以大家认真准备吧,我讲一起屡屡申论和行测的重要知识点 遇到寒冬&am…