【Flink状态管理五】Checkpoint的设计与实现

文章目录

  • 1. Checkpoint的整体设计
  • 2. Checkpoint创建源码解析
    • 2.1. DefaultExecutionGraphBuilder.buildGraph
    • 2.2. ExecutionGraph.enableCheckpointing

由于系统原因导致Flink作业无法正常运行的情况非常多,且很多时候都是无法避免的。对于Flink集群来讲,能够快速从异常状态中恢复,同时保证处理数据的正确性和一致性非常重要。Flink主要借助Checkpoint的方式保障整个系统状态数据的一致性,也就是基于ABS算法实现轻量级快照服务。

本节我们详细了解Checkpoint的设计与实现。

 

1. Checkpoint的整体设计

Checkpoint的执行过程分为三个阶段:启动、执行以及确认完成。其中Checkpoint的启动过程由JobManager管理节点中的CheckpointCoordinator组件控制,该组件会周期性地向数据源节点发送执行Checkpoint的请求,执行频率取决于用户配置的CheckpointInterval参数。

执行过程:

  1. 在JobManager管理节点通过CheckpointCoordinator组件向每个数据源节点发送Checkpoint执行请求,此时数据源节点中的算子会将消费数据对应的Position发送到JobManager管理节点中。
  2. JobManager节点会存储Checkpoint元数据,用于记录每次执行Checkpoint操作过程中算子的元数据信息,例如在FlinkKafkaConsumer中会记录消费Kafka主题的偏移量,用于确认从Kafka主题中读取数据的位置。
  3. 在数据源节点执行完Checkpoint操作后,继续向下游节点发送CheckpointBarrier事件,下游算子通过对齐Barrier事件,触发该算子的Checkpoint操作。
    当下游的map算子接收到数据源节点的Checkpoint
    Barrier事件后,首先对当前算子的数据进行处理,并等待其他上游数据源节点的Barrier事件到达。该过程就是Checkpoint
    Barrier对齐,目的是确保属于同一Checkpoint的数据能够全部到达当前节点。

在这里插入图片描述

Barrier事件的作用就是切分不同Checkpoint批次的数据。

  • 当map算子接收到所有上游的Barrier事件后,就会触发当前算子的Checkpoint操作,并将状态数据快照到指定的外部持久化介质中,该操作主要借助状态后端存储实现。

  • 当状态数据执行完毕后,继续将Barrier事件发送至下游的算子,进行后续算子的Checkpoint操作。

  • 另外,在map算子中执行完Checkpoint操作后,也会向JobManager管理节点发送Ack消息,确认当前算子的Checkpoint操作正常执行。此时Checkpoint数据会存储该算子对应的状态数据,如果StateBackend为MemoryStateBackend,则主要会将状态数据存储在JobManager的堆内存中

sink节点的ack

像map算子节点一样,当Barrier事件到达sink类型的节点后,sink节点也会进行Barrier对齐操作,确认上游节点的数据全部接入。然后对接入的数据进行处理,将结果输出到外部系统中。完成以上步骤后,sink节点会向JobManager管理节点发送Ack确认消息,确认当前Checkpoint中的状态数据都正常进行了持久化操作。(之后呢?当任务结束之后,cp会消失还是?)

 

2. Checkpoint创建源码解析

通过调用StreamExecutionEnvironment.enableCheckpointing(),开启Checkpoint。
此时Checkpoint的配置会被存储在StreamGraph中,然后将StreamGraph中的CheckpointConfig转换为JobCheckpointingSettings数据结构存储在JobGraph对象中,并伴随JobGraph提交到集群运行。启动JobMaster服务后,JobMaster调度和执行Checkpoint操作。

2.1. DefaultExecutionGraphBuilder.buildGraph

如下代码,通过JobGraph构建ExecutionGraph的过程中,获取JobGraph中存储的JobCheckpointingSettings配置,然后创建ExecutionGraph。

1)根据snapshotSettings配置获取triggerVertices、ackVertices以及confirmVertices节点集合,并转换为对应的ExecutionJobVertex集合。

  • 其中triggerVertices集合存储了所有SourceOperator节点,这些节点通过CheckpointCoordinator主动触发Checkpoint操作。
  • ackVertices和confirmVertices集合存储了StreamGraph中的全部节点,代表所有节点都需要返回Ack确认信息并确认Checkpoint执行成功。

2)创建CompletedCheckpointStore组件,用于存储Checkpoint过程中的元数据。

  • 当对作业进行恢复操作时会在CompletedCheckpointStore中检索最新完成的Checkpoint元数据信息,然后基于元数据信息恢复Checkpoint中存储的状态数据。CompletedCheckpointStore有两种实现,分别为StandaloneCompletedCheckpointStore和ZooKeeperCompletedCheckpointStore。
  • 在CompletedCheckpointStore中通过maxNumberOfCheckpointsToRetain参数配置以及结合checkpointIdCounter计数器保证只会存储固定数量的CompletedCheckpoint。

3)创建CheckpointStatsTracker实例
用于监控和追踪Checkpoint执行和更新的情况,包括Checkpoint执行的统计信息以及执行状况,WebUI中显示的Checkpoint监控数据主要来自CheckpointStatsTracker。

4)创建StateBackend,从UserClassLoader中反序列化出应用指定的StateBackend并设定为applicationConfiguredBackend。

5)初始化用户自定义的Checkpoint Hook函数

6)最终调用executionGraph.enableCheckpointing()方法,在作业的执行和调度过程中开启Checkpoint。

// 配置状态数据checkpointing
// 从jobGraph中获取JobCheckpointingSettings
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
//如果snapshotSettings不为空,则开启checkpoint功能
if (snapshotSettings != null) {
   List<ExecutionJobVertex> triggerVertices =
         idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
   List<ExecutionJobVertex> ackVertices =
         idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
   List<ExecutionJobVertex> confirmVertices =
         idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
   //创建CompletedCheckpointStore
   CompletedCheckpointStore completedCheckpoints;
   CheckpointIDCounter checkpointIdCounter;
   try {
      int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
          CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
      if (maxNumberOfCheckpointsToRetain <= 0) {
         maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_
            CHECKPOINTS.defaultValue();
      }
      // 通过recoveryFactory创建CheckpointStore
      completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, 
         maxNumberOfCheckpointsToRetain, classLoader);   
      // 通过recoveryFactory创建CheckpointIDCounter
      checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
   }
   catch (Exception e) {
      throw new JobExecutionException(jobId, "Failed to initialize high-
         availability checkpoint handler", e);
   }
   // 获取checkpoints最长的记录次数
   int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
   // 创建CheckpointStatsTracker实例
   CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
         historySize,
         ackVertices,
         snapshotSettings.getCheckpointCoordinatorConfiguration(),
         metrics);
   // 从application中获取StateBackend
   final StateBackend applicationConfiguredBackend;
   final SerializedValue<StateBackend> serializedAppConfigured = 
      snapshotSettings.getDefaultStateBackend();
   if (serializedAppConfigured == null) {
      applicationConfiguredBackend = null;
   }
   else {
      try {
         applicationConfiguredBackend = serializedAppConfigured.
            deserializeValue(classLoader);
      } catch (IOException | ClassNotFoundException e) {
         throw new JobExecutionException(jobId,
            "Could not deserialize application-defined state backend.", e);
      }
   }
   // 获取最终的rootBackend
   final StateBackend rootBackend;
   try {
      rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
         applicationConfiguredBackend, jobManagerConfig, classLoader, log);
   }
   catch (IllegalConfigurationException | IOException | 
      DynamicCodeLoadingException e) {
         throw new JobExecutionException(jobId, 
            "Could not instantiate configured state backend", e);
   }
   // 初始化用户自定义的checkpoint Hooks函数
   final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = 
      snapshotSettings.getMasterHooks();
   final List<MasterTriggerRestoreHook<?>> hooks;
   // 如果serializedHooks为空,则hooks为空
   if (serializedHooks == null) {
      hooks = Collections.emptyList();
   }
   else {
   // 加载MasterTriggerRestoreHook
      final MasterTriggerRestoreHook.Factory[] hookFactories;
      try {
         hookFactories = serializedHooks.deserializeValue(classLoader);
      }
      catch (IOException | ClassNotFoundException e) {
         throw new JobExecutionException(jobId, 
            "Could not instantiate user-defined checkpoint hooks", e);
      }
      // 设定ClassLoader为UserClassLoader
      final Thread thread = Thread.currentThread();
      final ClassLoader originalClassLoader = thread.getContextClassLoader();
      thread.setContextClassLoader(classLoader);
      // 创建hooks函数
      try {
         hooks = new ArrayList<>(hookFactories.length);
         for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
            hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
         }
      }
      // 将thread的ContextClassLoader设定为originalClassLoader
      finally {
         thread.setContextClassLoader(originalClassLoader);
      }
   }
   // 获取CheckpointCoordinatorConfiguration
   final CheckpointCoordinatorConfiguration chkConfig = 
      snapshotSettings.getCheckpointCoordinatorConfiguration();
   // 开启executionGraph中的Checkpoint功能
   executionGraph.enableCheckpointing(
      chkConfig,
      triggerVertices,
      ackVertices,
      confirmVertices,
      hooks,
      checkpointIdCounter,
      completedCheckpoints,
      rootBackend,
      checkpointStatsTracker);
}

 

2.2. ExecutionGraph.enableCheckpointing

继续看ExecutionGraph.enableCheckpointing()方法的实现,包含如下逻辑。

  1. 将tasksToTrigger、tasksToWaitFor以及tasksToCommitTo三个ExecutionJobVertex集合转换为ExecutionVertex[]数组,每个ExecutionVertex代表ExecutionJobVertex中的一个SubTask节点。
  2. 容错管理:创建CheckpointFailureManager,用于Checkpoint执行过程中的容错管理,包含failJob和failJobDueToTaskFailure两个处理方法。
  3. 定时调度和执行:创建checkpointCoordinatorTimer,用于Checkpoint异步线程的定时调度和执行
  4. 协调和管理作业中的Checkpoint:创建CheckpointCoordinator组件,通过CheckpointCoordinator协调和管理作业中的Checkpoint,同时收集各Task节点中Checkpoint的执行状况等信息。
  5. Hook:将Master Hook注册到CheckpointCoordinator中,实现用户自定义Hook代码的调用。
  6. 控制CheckpointCoordinator的启停:将JobStatusListener的实现类CheckpointCoordinatorDeActivator注册到JobManager中,此时系统会根据作业的运行状态控制CheckpointCoordinator的启停,当作业的状态为Running时会触发启动CheckpointCoordinator组件。
public void enableCheckpointing(
      CheckpointCoordinatorConfiguration chkConfig,
      List<ExecutionJobVertex> verticesToTrigger,
      List<ExecutionJobVertex> verticesToWaitFor,
      List<ExecutionJobVertex> verticesToCommitTo,
      List<MasterTriggerRestoreHook<?>> masterHooks,
      CheckpointIDCounter checkpointIDCounter,
      CompletedCheckpointStore checkpointStore,
      StateBackend checkpointStateBackend,
      CheckpointStatsTracker statsTracker) {
   checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
   checkState(checkpointCoordinator == null, "checkpointing already enabled");
   ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
   ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
   ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
   checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
   // 创建CheckpointFailureManager
   CheckpointFailureManager failureManager = new CheckpointFailureManager(
      chkConfig.getTolerableCheckpointFailureNumber(),
      new CheckpointFailureManager.FailJobCallback() {
         @Override
         public void failJob(Throwable cause) {
            getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
         }
         @Override
         public void failJobDueToTaskFailure(Throwable cause, 
                                             ExecutionAttemptID failingTask) {
            getJobMasterMainThreadExecutor()
               .execute(()  -> failGlobalIfExecutionIsStillRunning(cause, 
                  failingTask));
         }
      }
   );
   // 创建checkpointCoordinatorTimer
   checkState(checkpointCoordinatorTimer == null);
   checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
      new DispatcherThreadFactory(
         Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
   // 创建checkpointCoordinator
   checkpointCoordinator = new CheckpointCoordinator(
      jobInformation.getJobId(),
      chkConfig,
      tasksToTrigger,
      tasksToWaitFor,
      tasksToCommitTo,
      checkpointIDCounter,
      checkpointStore,
      checkpointStateBackend,
      ioExecutor,
      new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
      SharedStateRegistry.DEFAULT_FACTORY,
      failureManager);
   // 向checkpoint Coordinator中注册master Hooks
   for (MasterTriggerRestoreHook<?> hook : masterHooks) {
      if (!checkpointCoordinator.addMasterHook(hook)) {
         LOG.warn("Trying to register multiple checkpoint hooks with the name: {}",
                  hook.getIdentifier());
      }
   }
   //向checkpointCoordinator中设定checkpointStatsTracker
   checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
     // 注册JobStatusListener,用于自动启动CheckpointCoordinator
   if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
      registerJobStatusListener(checkpointCoordinator.
         createActivatorDeactivator());
   }
   this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
}

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/398971.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何在同一个module里面集成多个数据库的多张表数据

确保本公司数据安全&#xff0c;通常对数据的管理采取很多措施进行隔离访问。 但是&#xff0c;Mendix应怎样访问散布于异地的多个数据库呢&#xff1f; 前几期我们介绍过出海跨境的大企业对于Mendix的技术、人才的诉求后&#xff0c;陆陆续续有其他客户希望更聚焦具体的实际场…

springboot+vue的飘香水果购物网站(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

ip https证书推荐

公网IP地址是每个连接到互联网的设备所必需的标识。公网IP地址是用于在互联网上唯一标识一个设备的IP地址&#xff0c;它由一组由四个数字组成的字符串组成&#xff0c;每个数字在0到255之间。随着互联网的发展&#xff0c;只有公网IP地址的站点也开始重视传输信息安全&#xf…

小脑萎缩怎么办?我们如何战胜这个疾病?

小脑萎缩是一种神经系统疾病&#xff0c;主要表现为小脑功能的进行性退化。这种疾病可能导致患者出现行走困难、平衡障碍、言语不清等症状。近年来&#xff0c;中医治疗小脑萎缩的研究逐渐受到关注&#xff0c;其中刘家峰中医使用中草药治疗小脑萎缩取得了一定的成果。 刘家峰中…

springboot+vue的宠物咖啡馆平台(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

观察者模式和发布订阅模式的区别

从下图中可以看出&#xff0c;观察者模式中观察者和目标直接进行交互&#xff0c;而发布订阅模式中统一由调度中心进行处理&#xff0c;订阅者和发布者互不干扰。这样一方面实现了解耦&#xff0c;还有就是可以实现更细粒度的一些控制。比如发布者发布了很多消息&#xff0c;但…

【探究大语言模型中G、P、T各自的作用】

文章目录 前言一、GPT全称二、Generative&#xff1a;生成式三、Pre-trained&#xff1a;预训练四、Transformer&#xff1a;变换模型 前言 偷偷告诉你们&#xff0c;在写这篇文章时&#xff0c;标题就是用chatGPT生成的 一、GPT全称 大语言模型的全称是Generative Pre-train…

[嵌入式AI从0开始到入土]15_orangepi_aipro欢迎界面、ATC bug修复、镜像导出备份

[嵌入式AI从0开始到入土]嵌入式AI系列教程 注&#xff1a;等我摸完鱼再把链接补上 可以关注我的B站号工具人呵呵的个人空间&#xff0c;后期会考虑出视频教程&#xff0c;务必催更&#xff0c;以防我变身鸽王。 第1期 昇腾Altas 200 DK上手 第2期 下载昇腾案例并运行 第3期 官…

同城系统源码_城市o2o系统源码OctShop

同城系统源码O2O的模式就是一种将线下商品或服务与线上互联网相结合&#xff0c;让互联网成为线下交易的前台。比如&#xff1a;商家企业可以通过线上的方式展示自己商品或服务的详细信息&#xff0c;以及各种营销活动的宣传&#xff0c;引导买家消费者下单&#xff0c;通过线上…

Java的String类

目录 String类的常用方法 1.1 字符串构造 1.2 String对象的比较 1.3 字符串查找 1.4 转换 1.5 字符串替换 1.6字符串拆分 1.7 字符串截取 1.8 其他操作方法 1.9 字符串的不可变性 1.10 字符串修改 String类的常用方法 1.1 字符串构造 String类常用的构造方法有很多…

优化|非强凸问题的一阶算法线性收敛条件(一)

原文信息&#xff08;包括题目、发表期刊、原文链接等&#xff09;&#xff1a;Linear convergence of first order methods for non-strongly convex optimization 原文作者&#xff1a;I. Necoara, Yu. Nesterov, F. Glineur 论文解读者&#xff1a;陈宇文 编者按&#xf…

LabVIEW压电驱动迟滞补偿控制

LabVIEW压电驱动迟滞补偿控制 随着精密控制技术的迅速发展&#xff0c;压电陶瓷驱动器因其高精度和快速响应特性&#xff0c;在微纳精密定位系统中得到了广泛应用。然而&#xff0c;压电材料固有的迟滞非线性特性严重影响了其定位精度和重复性。开发了一种基于LabVIEWFPGA的压…

WSL里的Ubuntu 登录密码忘了怎么更改

环境&#xff1a; Win10 专业版 WSL2 如何 Ubuntu22.04 问题描述&#xff1a; WSL里的Ubuntu 登录密码忘了怎么更改 解决方案&#xff1a; 在WSL中的Ubuntu系统中&#xff0c;忘记了密码&#xff0c;可以通过以下步骤重置密码&#xff1a; 1.打开命令提示符或PowerShel…

HTTP的详细介绍

目录 一、HTTP 相关概念 二、HTTP请求访问的完整过程 1、 建立连接 2、 接收请求 3、 处理请求 3.1 常见的HTTP方法 3.2 GET和POST比较 4、访问资源 5、构建响应报文 6、发送响应报文 7、记录日志 三、HTTP安装组成 1、常见http 服务器程序 2、apache介绍和特点 …

Idea中使用git将多次提交记录合并成一次提交记录

一、查看Idea中的提交记录 查看Idea中的提交记录&#xff0c;我们希望将新增了bbb.txt、新增了ccc.txt、新增了ddd.txt,这三次提交记录合并成一次提交记录。 二、使用Interactively Rebase from Here进行合并 2.1、把鼠标放在新增了bbb.txt这次提交记录上并右键单击 把鼠标放…

文件上传漏洞--Upload-labs--Pass17--条件竞争

一、条件竞争原理&#xff08;结合代码审计&#xff09; 1、首先进行代码审计&#xff0c;查看源代码。 我们可知&#xff0c;将文件上传至服务器后&#xff0c;不会被立即删除&#xff0c;而是做短暂的停留&#xff0c;中间会有一小部分时间差&#xff0c;这部分时间差是代码…

今日早报 每日精选15条新闻简报 每天一分钟 知晓天下事 2月21日,星期三

每天一分钟&#xff0c;知晓天下事&#xff01; 2024年2月21日 星期三 农历正月十二 1、 央行&#xff1a;5年期LPR下调25个基点至3.95%。100万元房贷30年少还5.2万元。 2、 民航局等四部门明确&#xff1a;到2025年&#xff0c;机场噪声污染防控标准体系基本建成。 3、 应急…

云数据库 Redis 性能深度评测(阿里云、华为云、腾讯云、百度智能云)

在当今的云服务市场中&#xff0c;阿里云、腾讯云、华为云和百度智能云都是领先的云服务提供商&#xff0c;他们都提供了全套的云数据库服务&#xff0c;其中 Redis属于RDS 之后第二被广泛应用的服务&#xff0c;本次测试旨在深入比较这四家云服务巨头在Redis云数据库性能方面的…

相机图像质量研究(25)常见问题总结:CMOS期间对成像的影响--过曝、欠曝

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

欲速则不达,慢就是快!

引言 随着生活水平的提高&#xff0c;不少人的目标从原先的解决温饱转变为追求内心充实&#xff0c;但由于现在的时间过得越来越快以及其他外部因素&#xff0c;我们对很多东西的获取越来越没耐心&#xff0c;例如书店经常会看到《7天精通Java》、《3天掌握XXX》等等之类的书籍…