Flink checkpoint 源码分析

序言

最近因为工作需要在阅读flink checkpoint处理机制,学习的过程中记录下来,并分享给大家。也算是学习并记录。

目前公司使用的flink版本为1.11。因此以下的分析都是基于1.11版本来的。

在分享前可以简单对flink checkpoint机制做一个大致的了解。

Flink checkpoint 机制介绍

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

[1] 引用:Flink Checkpoint原理解析 - 知乎

代码分析

Flink checkpoint 的触发是通过CheckpointCoordinator 的定时线程完后。

	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
		return timer.scheduleAtFixedRate(
			new ScheduledTrigger(),
			initDelay, baseInterval, TimeUnit.MILLISECONDS);
	}

之后通过snapshotTaskState RPC的调用来实现触发checkpoint的

代码中遍历executions 来触发checkpoint,那么executions是什么东西呢?

Flink 代码中维护了一个叫tasksToTrigger的数组。

这个地方向前追溯,可以一直到jobgrap的生成。从名字和代码就可以看出,这个里面存的是没有inputchannel的节点,source节点没有inputchannel,所以回答上面的问题,executions 中是source节点,也就是做checkpoint 时 checkpointcoordinate 会给source节点发送rpc。

通过一个很长亮度的调用,最后到了SubtaskCheckpointCoordinatorImpl 中的

public void checkpointState(
			CheckpointMetaData metadata,
			CheckpointOptions options,
			CheckpointMetricsBuilder metrics,
			OperatorChain<?, ?> operatorChain,
			Supplier<Boolean> isCanceled) throws Exception {

		checkNotNull(options);
		checkNotNull(metrics);

		// All of the following steps happen as an atomic step from the perspective of barriers and
		// records/watermarks/timers/callbacks.
		// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
		// checkpoint alignments

		if (lastCheckpointId >= metadata.getCheckpointId()) {
			LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
			channelStateWriter.abort(
				metadata.getCheckpointId(),
				new CancellationException("checkpoint aborted via notification"),
				true);
			checkAndClearAbortedStatus(metadata.getCheckpointId());
			return;
		}

		// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
		lastCheckpointId = metadata.getCheckpointId();
		if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
			// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
			operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
			LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
			return;
		}

        // if checkpoint has been previously unaligned, but was forced to be aligned (pointwise
        // connection), revert it here so that it can jump over output data
        if (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {
            options = options.withUnalignedSupported();
            initInputsCheckpoint(metadata.getCheckpointId(), options);
        }

		// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
		//           The pre-barrier work should be nothing or minimal in the common case.
		operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

		// Step (2): Send the checkpoint barrier downstream
        LOG.debug(
                "Task {} broadcastEvent at {}, triggerTime {}, passed time {}",
                taskName,
                System.currentTimeMillis(),
                metadata.getTimestamp(),
                System.currentTimeMillis() - metadata.getTimestamp());
        CheckpointBarrier checkpointBarrier =
                new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);
        operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

        // Step (3): Register alignment timer to timeout aligned barrier to unaligned barrier
        registerAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);

        // Step (4): Prepare to spill the in-flight buffers for input and output
        if (options.needsChannelState()) {
			// output data already written while broadcasting event
			channelStateWriter.finishOutput(metadata.getCheckpointId());
		}

        // Step (5): Take the state snapshot. This should be largely asynchronous, to not impact
        // progress of the
		// streaming topology

		Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
		try {
			if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
				finishAndReportAsync(snapshotFutures, metadata, metrics, options);
			} else {
				cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
			}
		} catch (Exception ex) {
			cleanup(snapshotFutures, metadata, metrics, ex);
			throw ex;
		}
	}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/584193.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

mysql-sql-练习题-4-标记

标记 连续登录2-7天用户建表排名找规律 最大连胜次数建表只输出连胜结果输出所有连续结果 连续登录2-7天用户 建表 create table continuous_login(user_id1 integer comment 用户id,date_login date comment 登陆日期 ) comment 用户登录表;insert into continuous_login val…

这是一个简单网站,后续还会更新

1、首页效果图 代码 <!DOCTYPE html> <html> <head> <meta charset"utf-8" /> <title>爱德照明网站首页</title> <style> /*外部样式*/ charset "utf-8"…

【计算机毕业设计】基于SSM++jsp的社区管理与服务系统【源码+lw+部署文档+讲解】

目录 摘 要 Abstract 第一章 绪论 第二章 系统关键技术 第三章 系统分析 3.1.1技术可行性 3.1.2经济可行性 3.1.3运行可行性 3.1.4法律可行性 3.4.1注册流程 3.4.2登录流程 3.4.3活动报名流程 第四章 系统设计 4.3.1登录模块顺序图 4.3.2添加信息模块顺序图 4.4.1 数据库E-…

使用STM32CubeMX对STM32F4进行串口配置

目录 1. 配置1.1 Pin脚1.2 RCC开启外部晶振1.3 时钟1.4 串口配置 2. 代码2.1 默认生成代码2.1 开启串口中断函数2.3 接收中断2.4 接收回调函数2.5 增加Printf 的使用 1. 配置 1.1 Pin脚 1.2 RCC开启外部晶振 1.3 时钟 外部使用8MHz晶振 开启内部16MHz晶振 使用锁相环 开启最高…

动手写一个简单的Android 表格控件支持固定列

Android 动手写一个简洁版表格控件 简介 源码已放到 gitee 作为在测绘地理信息行业中穿梭的打工人&#xff0c;遇到各种数据采集需求&#xff0c;既然有数据采集需求&#xff0c;那当然少不了数据展示功能&#xff0c;最常见的如表格方式展示。 当然&#xff0c;类似表格这些…

【消息队列】MQ介绍

MQ MQ&#xff08;MessageQueue&#xff09;&#xff0c;中文是消息队列&#xff0c;就是存放消息的队列&#xff0c;也是下面提到的事件驱动架构中的Broker 同步调用的优点&#xff1a; 时效性强&#xff0c;可以立即得到结果 同步调用的问题&#xff1a; 耦合度高性能和吞吐…

汽车信息安全入门总结(2)

目录 1.引入 2.汽车信息安全技术 3.密码学基础知识 4.小结 1.引入 上篇汽车信息安全入门总结(1)-CSDN博客主要讲述了汽车信息安全应该关注的点&#xff0c;以及相关法规和标准&#xff0c;限于篇幅&#xff0c;继续聊信息安全相关技术以及需要掌握的密码学基础知识。 2.汽…

Costas-Barker序列模糊函数仿真

文章目录 前言一、Costas 序列二、Barker 码三、Costas-Barker 序列模糊函数仿真1、MATLAB 核心代码2、仿真结果①、Costas-Barker 模糊函数图②、Costas-Barker 距离模糊函数图③、Costas-Barker 速度模糊函数图 四、资源自取 前言 Costas 码是一种用于载波同步的频率调制序列…

基于SpringBoot+Vue高校竞赛管理系统的设计与实现

项目介绍&#xff1a; 高校竞赛管理系统管理系统按照操作主体分为管理员和用户。管理员的功能包括字典管理、论坛管理、竞赛公告管理、获奖管理、老师管理、评审管理、评审分配管理、评审打分管理、赛事管理、赛事提交管理、赛事报名管理、用户管理、专家管理、管理员管理。用…

万兴PDF专家 PDFelement Pro v10.3.8 破姐版!

&#x1f9d1;‍&#x1f4bb;万兴PDF专家 PDFelement Pro v10.3.8 破姐版 (https://docs.qq.com/sheet/DRVVxTHJ3RXJFVHVr)

FreeRTOS软件定时器

说明本文章基于百问网RTOS教程文档 1.硬件定时器 什么是硬件定时器&#xff0c;由硬件电路构成的定时器。在学习STM32时我们都会学到定时器&#xff0c;这个就是硬件定时器。硬件定时器不单单可以定时&#xff0c;它还可以进行PWM输出等等。硬件定时器每隔一段固定的时间会进…

近几年视频取证、视频篡改检测技术发展现状及挑战

前言 本文主要搜集了视频取证各个子领域近几年的高影响因子/引用数的文章及其主要思想和做法&#xff0c;旨在分析目前视频篡改检测的发展现状与热点领域&#xff0c;文章中也融合了自己的一点看法和展望&#xff0c;欢迎感兴趣的同学和我多多沟通。 本文无论是文献搜集还是方…

基于Spring Boot的外卖点餐系统设计与实现

基于Spring Boot的外卖点餐系统设计与实现 开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/idea 系统部分展示 网站首页界面图&#xff0c;通过进入网站可以查看首页、…

并查集应用-连通块中点的数量and食物链

文章目录 连通块中点的数量思路代码javaC 代码 食物链带扩展域的并查集代码带边权的并查集数组d的真正含义以及find()函数调用过程核心代码注意事项&#xff0c;即明白 d[i] 的含义 代码C Java 连通块中点的数量 给定一个包含 n 个点&#xff08;编号为 1∼n &#xff09;的无向…

Leetcode—860. 柠檬水找零【简单】

2024每日刷题&#xff08;122&#xff09; Leetcode—860. 柠檬水找零 实现代码 class Solution { public:bool lemonadeChange(vector<int>& bills) {int count5 0;int count10 0;for(int i 0; i < bills.size(); i) {if(bills[i] 5) {count5;} else if(bi…

【自然语言处理】Word2VecTranE的实现

作业一 Word2Vec&TranE的实现 1 任务目标 1.1 案例简介 Word2Vec是词嵌入的经典模型&#xff0c;它通过词之间的上下文信息来建模词的相似度。TransE是知识表示学习领域的经典模型&#xff0c;它借鉴了Word2Vec的思路&#xff0c;用“头实体关系尾实体”这一简单的训练目…

【matplot】【matlab】绘制简洁美观二维坐标系的一个例子

觉得下图不错美观大方&#xff0c;现仿制下图&#xff1a; import numpy as np import matplotlib.pyplot as pltdef sigmoid(x):return 1 / (1 np.exp(-x))def sigmoid_derivative(x):return sigmoid(x) * (1 - sigmoid(x))# 设置中文字体 plt.rcParams[font.family] [Tim…

如何使用Go语言的标准库和第三方库?

文章目录 一、如何使用Go语言的标准库示例&#xff1a;使用标准库中的fmt包打印输出 二、如何使用Go语言的第三方库示例&#xff1a;使用第三方库github.com/gin-gonic/gin创建Web服务器 总结 在Go语言中&#xff0c;标准库和第三方库的使用是日常编程中不可或缺的一部分。标准…

Facebook的声音:听见社交媒体的心跳

社交媒体如今已经成为人们日常生活中不可或缺的一部分&#xff0c;而Facebook作为其中的佼佼者&#xff0c;承载着数以亿计的用户的交流、分享和连接。在这个信息爆炸的时代&#xff0c;Facebook的声音就像是社交媒体的心跳&#xff0c;传递着无数个体的情感、思想和生活。本文…

NASA数据集——NASA 标准三级(L3)每月深蓝气溶胶产品提供了全球陆地和海洋上空气溶胶光学厚度(AOT)

VIIRS/NOAA20 Deep Blue Level 3 monthly aerosol data, 1x1 degree grid 简介 联合极地卫星系统&#xff08;JPSS&#xff09;系列 NOAA-20 仪器中的可见红外成像辐射计套件&#xff08;VIIRS&#xff09;NASA 标准三级&#xff08;L3&#xff09;每月深蓝气溶胶产品提供了全…