Flink中StateBackend(工作状态)与Checkpoint(状态快照)的关系

State Backends

由 Flink 管理的 keyed state 是一种分片的键/值存储,每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。

如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。

Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现:

  • 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,将其状态快照持久化到(分布式)文件系统;
  • 另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:
    • FsStateBackend,将其状态快照持久化到(分布式)文件系统;
    • MemoryStateBackend,它使用 JobManager 的堆保存状态快照。

在这里插入图片描述

当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。

Checkpoint

Flink 定期对每个算子的所有状态进行持久化快照,并将这些快照复制到更持久的地方,例如分布式文件系统。 如果发生故障,Flink 可以恢复应用程序的完整状态并恢复处理,就好像没有出现任何问题一样。

这些快照的存储位置是通过作业_checkpoint storage_定义的。 有两种可用检查点存储实现:一种持久保存其状态快照 到一个分布式文件系统,另一种是使用 JobManager 的堆。

在这里插入图片描述

Flink不同版本StateBackend(状态)与Checkpoint Storage(快照) 关系

在Flink1.14之前StateBackend与Checkpoint Storage 耦合在一起,但在Flink1.14之后把StateBackend与Checkpoint Storage 实现了解耦,使逻辑更加清晰。

Flink1.14之前

  • 基于 RocksDB state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/data/rocksdb/ck", true));//true: 增量checkpoint; false:全量checkpoint
env.setStateBackend(new RocksDBStateBackend("file:///data/rocksdb/ck", true));//本地文件系统
  • 基于heap state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/data/fs/ck"));//远程分布式文件系统
env.setStateBackend(new FsStateBackend("file:///data/fs/ck"));//本地文件系统
  • 基于heap state backend,使用 JobManager 的堆保存状态快照。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend());

Flink1.14之后(推荐使用)

  • 基于 RocksDB state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));//true: 增量checkpoint; false:全量checkpoint
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/data/rocksdb/ck");//远程分布式文件系统
env.getCheckpointConfig().setCheckpointStorage("file:///data/rocksdb/ck");//本地文件系统

flink-conf.yaml配置:

 state.backend: rocksdb
 state.checkpoints.dir: hdfs:///checkpoints/
  • 基于heap state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/data/fs/ck");//远程分布式文件系统
env.getCheckpointConfig().setCheckpointStorage("file:///data/fs/ck");//本地文件系统

flink-conf.yaml配置:

 state.backend: hashmap
 state.checkpoints.dir: hdfs:///checkpoints/
  • 基于heap state backend,使用 JobManager 的堆保存状态快照。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

flink-conf.yaml配置:

 state.backend: hashmap
 state.checkpoint-storage: jobmanager

总结

  • 默认情况下 checkpoint 是禁用的,需要手动开启:

    env.enableCheckpointing(long interval, CheckpointingMode mode)

  • 默认情况下,StateBackend是保持在 TaskManagers 的heap内存中,checkpoint 保存在 JobManager 的内存中。
  • 只有基于 RocksDB state backend的状态快照才支持增量checkpoint,基于heap的并不支持
  • Flink状态分为Keyed State和非keyed State:
    • Keyed State,可以使用RocksDB state backend和heap state backend。 所有支持的状态类型如下所示:

      • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

      • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

      • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

      • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

      • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

    • 非keyed State,不使用 RocksDB state backend,需要保存在内存中,包括:

      • 算子状态 (Operator State);
      • 广播状态 (Broadcast State),尤其需要考虑保证充足的内存;
      • 自定义 Operator State:CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

        void snapshotState(FunctionSnapshotContext context) throws Exception;
        void initializeState(FunctionInitializationContext context) throws Exception;

参考:

Fault Tolerance via State Snapshots

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/360233.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python面向对象编程:探索代码的结构之美

文章目录 一、引言二、为什么学习面向对象编程2.1 提高代码的可维护性&#xff1a;通过封装、继承和多态实现模块化设计2.2 提升代码的复用性&#xff1a;通过类和对象的创建实现代码的重用 三、类和对象的基本概念3.1 类和对象的定义和关系&#xff1a;类是对象的模板&#xf…

通过Nacos权重配置,实现微服务金丝雀发布效果(不停机部署)

在微服务项目迭代的过程中&#xff0c;不可避免需要上线&#xff1b;上线对应着部署&#xff0c;或者升级部署&#xff1b;部署对应着修改,修改则意味着风险。 传统的部署都需要先停止旧系统&#xff0c;然后部署新系统&#xff0c;之后需要对新系统进行全面的功能测试&#xf…

力扣hot100 n皇后 满注释版 通俗易懂

Problem: 51. N 皇后 文章目录 思路Code 思路 &#x1f468;‍&#x1f3eb; 参考地址 考虑每一行哪个位置放皇后判断是否合法递归下一行 Code class Solution {int n;char[][] board;List<List<String>> res new ArrayList<>();public List<List&l…

python_蓝桥杯刷题记录_笔记_入门2

前言 现在正式进入蓝桥杯的刷题啦&#xff0c;用python来做算法题&#xff0c;因为我之前其实都是用C来做题的&#xff0c;但是今年的话我打算换python来试试&#xff0c;很明显因为也才这学期接触python 加上之前C做题也比较菜&#xff0c;所以我打算用python重新来做题&#…

常用抓包软件集合(Fiddler、Charles)

1. Fiddler 介绍&#xff1a;Fiddler是一个免费的HTTP和HTTPS调试工具&#xff0c;支持Windows平台。它可以捕获HTTP和HTTPS流量&#xff0c;并提供了丰富的调试和分析功能。优点&#xff1a;易于安装、易于使用、支持多种扩展、可以提高开发效率。缺点&#xff1a;只支持Wind…

向上调整向下调整算法

目录 AdjustUp向上调整 AdjustDown向下调整 AdjustUp向上调整 前提是&#xff1a;插入数据之后&#xff0c;除去插入的数据其他的数据还是为堆 应用&#xff1a;插入数据。 先插入一个10到数组的尾上&#xff0c;再进行向上调整算法&#xff0c;直到满足堆。 性质&#xff1…

SD卡写保护无法格式化怎么办?

一般来说&#xff0c;写保护&#xff08;也称为只读&#xff09;是数据存储设备防止写入新数据或修改旧信息的能力。换句话说&#xff0c;您可以读取存储在磁盘上的信息&#xff0c;但是却不能删除、更改或复制它们&#xff0c;因为访问会被拒绝。那么SD卡有写保护怎么格式化呢…

Vue(十九):ElementUI 扩展实现树形结构表格组件的勾父选子、半勾选、过滤出半勾选节点功能

效果 原理分析 从后端获取数据后,判断当前节点是否勾选,从而判断是否勾选子节点勾选当前节点时,子节点均勾选全勾选与半勾选与不勾选的样式处理全勾选和全取消勾选的逻辑筛选出半勾选的节点定义变量 import {computed, nextTick, reactive, ref} from vue; import {tree} f…

解决打开页面显示源代码和乱码

用系统记事本打开 点击文件》另存为 选择编码&#xff1a;ANSI 保存》要替换它吗?》是 重新打开页面&#xff0c;显示正常&#xff0c;解决问题。

D2025——双通道音频功率放大电路,外接元件少, 通道分离性好,3V 的低压下可正常使用

D2025 为立体声音频功率放大集成电路&#xff0c;适用于各类袖珍或便携式立体声 收录机中作功率放放大器。 D2025 采用 DIP16 封装形式。 主要特点&#xff1a;  适用于立体声或 BTL 工作模式  外接元件少  通道分离性好  电源电压范围宽&#xff08;3V~12V…

Jenkins自动化打包

Jenkins自动化打包 下载安装 我们直接从官网https://www.jenkins.io/download/ 下载所需的Jenkins文件 如上图所示, 选择Windows版本,下面就是一路安装即可,需要注意的是,选择作为系统服务选项, 不要自己设置账号密码登录. Web配置 安装完根据提示在浏览器打开 http://lo…

01、全文检索 ------ 反向索引库 与 Lucene 的介绍

目录 全文检索 ------ 反向索引库 与 LuceneSQL模糊查询的问题反向索引库反向索引库的查询 Lucene&#xff08;全文检索技术&#xff09;Lucene能做什么Lucene存在的问题Solr 和 Elasticsearch 与 Lucene 的关系 全文检索 ------ 反向索引库 与 Lucene MySQL一些索引词汇解释 …

故障诊断 | 一文解决,BP神经网络的故障诊断(Matlab)

文章目录 效果一览文章概述专栏介绍模型描述源码设计参考资料效果一览 文章概述 故障诊断 | 一文解决,BP神经网络的故障诊断(Matlab) 专栏介绍 订阅【故障诊断】专栏,不定期更新机器学习和深度学习在故障诊断中的应用;订阅

52个值得收藏的无代码AI平台【2024】

“无代码”不仅仅是炒作。 这是一场革命。 在无代码之前&#xff0c;如果你想制作一个网站&#xff0c;你需要一名技术网络开发人员。 现在&#xff0c;你可以使用 Bubble、Webflow、Carrd 或无数其他可视化工具。 人工智能领域也发生了同样的情况。 在无代码之前&#xff0c;你…

国网四川宜宾供电公司:基于“RPA+AI”融合技术的电网设备隐患缺陷智能化识别应用

推荐单位&#xff1a;国网四川省电力公司宜宾供电公司 本文作者&#xff1a;杨鑫、唐龙、钟睿、李小航、孙雪冬 摘 要&#xff1a;为推进电力企业生产业务数字化转型&#xff0c;提高基层班组数字化运维水平。本文通过一线班组对变电站视频巡视、设备故障判断应用场景需求分析…

搭建幻兽帕鲁需要什么样的服务器

作为一个开放世界生存制造类游戏《幻兽帕鲁》收获了空前绝后的热度&#xff0c;玩家们在游戏中通过在地图上捕捉收集到的“帕鲁”进行训练&#xff0c;合理利用他们的能力进行战斗&#xff0c;建立自己的家园、开辟新的世界、解锁新的冒险情节&#xff0c;获取更多游戏信息增加…

langchain+xray:prompt控制漏洞扫描

写在前面 xray是长亭推出的一款漏洞扫描工具。 langchain是调用LLM大模型完成自动化任务的框架。 本篇文章是对langchain自定义工具的探索&#xff0c;通过编写一个xray调用的工具&#xff0c;联合ChatGPT对xray进行调用&#xff0c;实现对目标的漏洞扫描。 xray功能分析 …

悄悄话 - 华为OD统一考试

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 100分 题解&#xff1a; Java / Python / C 题目描述 给定一个二叉树&#xff0c;每个节点上站一个人&#xff0c;节点数字表示父节点到该节点传递悄悄话需要花费的时间。 初始时&#xff0c;根节点所在位置的人有一…

vue3.0 + 动态加载组件 + 全局注册组件

首先 vue 动态加载组件使用的是 component 标签&#xff0c;并通过设置组件的is 属性来指定要渲染的组件。例如&#xff1a; <component :is"currentComponent"></component>其中&#xff0c;currentComponent 是一个变量&#xff0c;它的值可以是以下几…

【图论】【状态压缩】【树】【深度优先搜索】1617. 统计子树中城市之间最大距离

作者推荐 【动态规划】【字符串】【行程码】1531. 压缩字符串 本文涉及的知识点 图论 深度优先搜索 状态压缩 树 LeetCode1617. 统计子树中城市之间最大距离 给你 n 个城市&#xff0c;编号为从 1 到 n 。同时给你一个大小为 n-1 的数组 edges &#xff0c;其中 edges[i] …