23、Flink 的 Savepoints 详解

Savepoints
1.什么是 Savepoints

Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的镜像,可以使用 Savepoint 进行 Flink 作业的停止、重启或更新。

Savepoint 由两部分组成:稳定存储(例如 HDFS,S3,…) 上包含二进制文件的目录(通常很大)和元数据文件(相对较小)。

  • 稳定存储上的文件表示作业执行状态的数据镜像。
  • Savepoint 的元数据文件以(相对路径)的形式主要包含指向作为 Savepoint 的稳定存储上的所有文件的指针。
2.分配算子 ID
a)概述

建议通过 uid(String) 方法手动指定算子 ID ,这些 ID 将用于恢复每个算子的状态。

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

如果不手动指定 ID ,则会自动生成 ID ;只要这些 ID 不变,就可以从 Savepoint 自动恢复;生成的 ID 取决于程序的结构,并且对程序更改很敏感;强烈建议手动分配这些 ID 。

b)Savepoint 状态

可以将 Savepoint 想象为,每个有状态的算子保存一个映射 “算子 ID ->状态”:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分;默认情况下,尝试将 Savepoint 的每个条目映射回新程序。

3.算子
a)概述

可以使用 命令行客户端触发 Savepoint触发 Savepoint 并取消作业从 Savepoint 恢复,以及删除 Savepoint

从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复

b)触发 Savepoint

当触发 Savepoint 时,将创建一个新的 Savepoint 目录,用于存储数据和元数据;可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制该目录的位置。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统(或者对象存储系统)上的位置。

FsStateBackendRocksDBStateBackend 为例:

# Savepoint 目标目录
/savepoint/

# Savepoint 目录
/savepoint/savepoint-:shortjobid-:savepointid/

# Savepoint 文件包含 Checkpoint元数据
/savepoint/savepoint-:shortjobid-:savepointid/_metadata

# Savepoint 状态
/savepoint/savepoint-:shortjobid-:savepointid/...

从 1.11.0 开始,可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。

如下两种情况不支持 savepoint 目录的移动:

1)启用了 entropy injection :此时 savepoint 目录不包含所有的数据文件,因为注入的路径会分散在各个路径中;由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。

2)作业包含了 task-owned state(比如 GenericWriteAhreadLog sink)。

和 savepoint 不同,checkpoint 不支持任意移动文件,因为 checkpoint 可能包含一些文件的绝对路径。

如果使用 MemoryStateBackend 的话,metadata 和 savepoint 的数据都会保存在 _metadata 文件中。

Savepoint 格式

可以在 savepoint 的两种二进制格式之间进行选择:

  • 标准格式 - 一种在所有 state backends 间统一的格式,允许使用一种状态后端创建 savepoint 后,使用另一种状态后端恢复这个 savepoint。这是最稳定的格式,旨在与之前的版本、模式、修改等保持最大兼容性。
  • 原生格式 - 标准格式的缺点是它的创建和恢复速度通常很慢。原生格式以特定于使用的状态后端的格式创建快照(例如 RocksDB 的 SST 文件)。

以原生格式创建 savepoint 的能力在 Flink 1.15 中引入,在那之前 savepoint 都是以标准格式创建的。

触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径,此路径可用来恢复和删除 Savepoint ;也可以指定创建 Savepoint 的格式,如果没有指定,会采用标准格式创建 Savepoint。

$ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]

使用上述命令触发 savepoint 时,client 需要等待 savepoint 制作完成,因此当任务的状态较大时,可能会导致 client 出现超时的情况,可以使用 detach 模式来触发savepoint。

$ bin/flink savepoint :jobId [:targetDirectory] -detached

使用该命令时,client 拿到本次 savepoint 的 trigger id 后立即返回,可以通过 REST API 来监控本次 savepoint 的制作情况。

使用 YARN 触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。

使用 Savepoint 停止作业

$ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId

将自动触发 ID 为 :jobid 的作业的 Savepoint,并停止该作业;可以指定一个目标文件系统目录来存储 Savepoint ,该目录需要能被 JobManager(s) 和 TaskManager(s) 访问;可以指定创建 Savepoint 的格式,如果没有指定,会采用标准格式创建 Savepoint。

如果想使用 detach 模式触发 Savepoint,在命令行后添加选项-detached即可。

c)从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]

将提交作业并指定要从中恢复的 Savepoint ,可以给出 Savepoint 目录或 _metadata 文件的路径。

跳过无法映射的状态恢复

默认,resume 操作将尝试将 Savepoint 的所有状态映射回要还原的程序,如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态:

Restore 模式

Restore 模式 决定了在 restore 之后谁拥有 Savepoint 或者 externalized checkpoint 的文件的所有权。

快照可被用户或者 Flink 自身拥有,如果快照归用户所有,Flink 不会删除其中的文件,而且 Flink 不能依赖该快照中文件的存在,因为它可能在 Flink 的控制之外被删除。

每种 restore 模式都有特定的用途,默认的 NO_CLAIM 模式在大多数情况下是一个很好的折中方案,因为它在提供明确的所有权归属的同时只给恢复后第一个 checkpoint 带来较小的代价。

可以通过如下方式指定 restore 模式:

$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]

NO_CLAIM (默认的)

NO_CLAIM 模式下,Flink 不会接管快照的所有权,它会将快照的文件置于用户的控制之中,并且永远不会删除其中的任何文件,该模式下可以从同一个快照上启动多个作业。

为保证 Flink 不会依赖于该快照的任何文件,它会强制第一个(成功的) checkpoint 为全量 checkpoint 而不是增量的,仅对state.backend: rocksdb 有影响,因为其他 backend 总是创建全量 checkpoint。

一旦第一个全量的 checkpoint 完成后,所有后续的 checkpoint 会照常创建,当第一个 checkpoint 成功制作,就可以删除原快照;在此之前不能删除原快照,因为没有任何完成的 checkpoint,Flink 会在故障时尝试从初始的快照恢复。

在这里插入图片描述

CLAIM

CLAIM 模式下 Flink 将声称拥有快照的所有权,并且本质上将其作为 checkpoint 对待:控制其生命周期并且可能会在其永远不会被用于恢复的时候删除它;手动删除快照和从同一个快照上启动两个作业都是不安全的,Flink 会保持配置数量的 checkpoint。

在这里插入图片描述

注意:

  1. Retained checkpoints 被存储在 //chk- 的目录中。Flink 不会接管 / 目录的所有权,而只会接管 chk- 的所有权。Flink 不会删除旧作业的目录。
  2. Native 格式支持增量的 RocksDB savepoints。对于这些 savepoints,Flink 将所有 SST 存储在 savepoints 目录中。即这些 savepoints 是自包含和目录可移动的。然而,在 CLAIM 模式下恢复时,后续的 checkpoints 可能会复用一些 SST 文件,这反过来会阻止在 savepoints 被清理时删除 savepoints 目录。 Flink 之后运行期间可能会删除复用的SST 文件,但不会删除 savepoints 目录。因此,如果在 CLAIM 模式下恢复,Flink 可能会留下一个空的 savepoints 目录。

LEGACY (已废弃)

Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除初始恢复的 checkpoint。同时,用户也不清楚是否可以删除它,原因是 Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint,因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint,即恢复的 checkpoint 的所有权没有明确的界定。

在这里插入图片描述

注意: LEGACY 模式已经被废弃,在 Flink 2.0 版本将会被移除。请使用 CLAIM 或 NO_CLAIM 模式。

d)删除 Savepoint
$ bin/flink savepoint -d :savepointPath

删除存储在 :savepointPath 中的 Savepoint。

注意:还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint。

e)配置

可以通过 state.savepoints.dir 配置 savepoint 的默认目录,触发 savepoint 时,将使用此目录来存储 savepoint;可以通过使用触发器命令指定自定义目标目录来覆盖缺省值。

# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。

4.F.A.Q

应该为作业中的所有算子分配 ID 吗?

根据经验,是的。 严格来说,仅通过 uid 方法给有状态算子分配 ID 就足够了。Savepoint 仅包含这些有状态算子的状态,无状态算子不是 Savepoint 的一部分。

在实践中,建议给所有算子分配 ID,因为 Flink 的一些内置算子(如 Window 算子)也是有状态的,而内置算子是否有状态并不很明显。 如果完全确定算子是无状态的,则可以跳过 uid 方法。

如果在作业中添加一个需要状态的新算子,会发生什么?

当向作业添加新算子时,它将在没有任何状态的情况下进行初始化。 Savepoint 包含每个有状态算子的状态。 无状态算子根本不是 Savepoint 的一部分。 新算子的行为类似于无状态算子。

如果从作业中删除有状态的算子会发生什么?

默认情况下,从 Savepoint 恢复时将尝试将所有状态分配给新作业。如果有状态算子被删除,则无法从 Savepoint 恢复。

可以通过使用 run 命令设置 --allowNonRestoredState (简称:-n )来允许删除有状态算子:

$ bin/flink run -s :savepointPath -n [:runArgs]

如果在作业中重新排序有状态算子,会发生什么?

如果给这些算子分配了 ID,它们将像往常一样恢复。

如果没有分配 ID ,则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致无法从以前的 Savepoint 恢复。

如果我添加、删除或重新排序作业中没有状态的算子,会发生什么?

如果将 ID 分配给有状态操作符,则无状态操作符不会影响 Savepoint 恢复。

如果没有分配 ID ,则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致无法从以前的Savepoint 恢复。

当在恢复时改变程序的并行度时会发生什么?

如果 Savepoint 是用 Flink >= 1.2.0 触发的,并且没有使用像 Checkpointed 这样不推荐的状态 API,那么可以简单地从 Savepoint 恢复程序并指定新的并行度。

如果正在从 Flink < 1.2.0 触发的 Savepoint 恢复,或者使用现在已经废弃的 api,那么首先必须将作业和 Savepoint 迁移到 Flink >= 1.2.0,然后才能更改并行度。

可以将 savepoint 文件移动到稳定存储上吗?

从 Flink 1.11.0 版本开始,savepoint 是自包含的,可以按需迁移 savepoint 文件后进行恢复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/624789.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深度践行“IaaS on DPU”理念,中科驭数正式发布“驭云”高性能云异构算力解决方案

5月10日至14日&#xff0c;由国家发展改革委联合国务院国资委、市场监管总局、国家知识产权局共同主办的第八届中国品牌日活动在上海世博展览馆举行。中科驭数高级副总裁张宇在中国品牌日新品首发首秀环节正式发布驭云高性能云异构算力解决方案&#xff0c;为企业提供更快部署、…

干部谈话考察:精准洞悉,助推成长

在组织人事管理的精细布局中&#xff0c;干部谈话考察扮演着举足轻重的角色。它不仅是组织深度了解干部、精准评价其表现的重要窗口&#xff0c;更是推动干部个人成长、优化组织人才配置的关键一环。通过深入的谈话考察&#xff0c;我们能够全面把握干部的思想脉搏、工作能力、…

Vmware ESXi无法创建虚拟机

点击创建虚拟机&#xff0c;没有反应 esxi在网页端无法创建虚拟机&#xff0c;与浏览器插件supercopy超级复制有关。 关闭插件在此页面运行&#xff0c;即可解决问题。 这个插件严重影响虚拟机正常的操作&#xff0c; 我还以为我的虚拟机炸了&#xff0c;格式化后&#xff0c;又…

冯喜运:5.14黄金大幅度修正?原油价格下跌成拖累?

【黄金消息面分析】&#xff1a;本周重要的美国数据的发布可能会对美元以及黄金产生重大影响。周四将公布更多经济指标&#xff0c;包括新屋开工和许可证、费城联储指数、工业生产数据和每周初请失业金人数。对于黄金而言&#xff0c;人们的注意力集中在经济和劳动力市场疲软对…

【回溯 代数系统】679. 24 点游戏

本文涉及知识点 回溯 代数系统 LeetCode679. 24 点游戏 给定一个长度为4的整数数组 cards 。你有 4 张卡片&#xff0c;每张卡片上都包含一个范围在 [1,9] 的数字。您应该使用运算符 [‘’, ‘-’, ‘*’, ‘/’] 和括号 ‘(’ 和 ‘)’ 将这些卡片上的数字排列成数学表达式…

Java面试八股之Java中的IO流分为几种

Java中的IO流分为几种 按数据单位分类&#xff1a; 字节流&#xff08;Byte Stream&#xff09;&#xff1a;以字节&#xff08;8位二进制数&#xff09;为基本单位进行数据读写。字节流适合处理所有类型的数据&#xff0c;包括文本、图像、音频、视频等二进制文件。抽象基类…

小米打印机Mi All-in-One Inkjet Printer进行扫描

1&#xff0c;打开电脑的控制面板&#xff0c;找到打印机 2&#xff0c;&#xff0c;选择小米打印机【Mi All-in-One Inkjet Printer】&#xff1b;右键&#xff0c;选择开始扫描 3&#xff0c;可以预览&#xff0c;或者直接扫描 4&#xff0c;点击下一步&#xff0c;导入图片 …

redis深入理解之实战

1、SpringBoot整合redis 1.1 导入相关依赖 <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId&g…

高校普法|基于SSM+vue的高校普法系统的设计与实现(源码+数据库+文档)

高校普法系统 目录 基于SSM&#xff0b;vue的高校普法系统的设计与实现 一、前言 二、系统设计 三、系统功能设计 1系统功能模块 2管理员功能模块 3律师功能模块 4学生功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获…

sklearn中多分类和多标签分类评估方法总结

一、任务区分 多分类分类任务&#xff1a;在多分类任务中&#xff0c;每个样本只能被分配到一个类别中。换句话说&#xff0c;每个样本只有一个正确的标签。例如&#xff0c;将图像分为不同的物体类别&#xff0c;如猫、狗、汽车等。 多标签分类任务&#xff1a;在多标签分类任…

抖音小店无货源怎么做?超详细教程,新手看这一篇就够了!

哈喽~我是电商月月 实体店做生意开店或是卖菜摆地摊&#xff0c;商家大部分都不是厂家自销&#xff0c;基本都是从批发厂里进货&#xff0c;然后加价卖出去的 “中间商赚差价”的行为在网上做店也是一种合理的行为 抖音小店里面的商家大部分选择的都是无货源模式运营 无货源…

Jmeter接口测试之参数化

在接口测试中&#xff0c;某些时候一些场景会使用到参数化的场景&#xff0c;参数化简单的说就是同一个请求需要不同的数据&#xff0c;比如在性能测试中需要并发多个用户的场景&#xff0c;这样的目的是为了模拟真实的用户场景&#xff0c;需要模拟不同的账号&#xff0c;这里…

2. C++入门:缺省参数及函数重载

缺省参数 缺省参数是声明或定义函数时为函数的参数指定一个缺省值。在调用该函数时&#xff0c;如果没有指定实参则采用该形参的缺省值&#xff0c;否则使用指定的实参。 void Func(int a 0) {cout << a << endl; }int main() {Func();Func(10);return 0; }在形…

【simulink】Scrambling 加扰

https://ww2.mathworks.cn/help/comm/ug/additive-scrambling-of-input-data-in-simulink.html 草图 simulink 代码图

GPT-4o正式发布;零一万物发布千亿参数模型;英国推出AI评估平台

OpenAI 正式发布 GPT-4o 今天凌晨&#xff0c;OpenAI 正式发布 GPT-4o&#xff0c;其中的「o」代表「omni」&#xff08;即全面、全能的意思&#xff09;&#xff0c;这个模型同时具备文本、图片、视频和语音方面的能力&#xff0c;甚至就是 GPT-5 的一个未完成版。 并且&…

mysql 一次删除多个备份表

show tables时&#xff0c;发现备份的表有点多&#xff0c;想要一个sql就删除 总不能drop table xx ; 写多次吧。 方式一 1.生成删除某个数据库下所有的表SQL -- 查询构建批量删除表语句&#xff08;根据数据库名称&#xff09; select concat(drop table , TABLE_NAME, ;)…

Axure10_win安装教程(安装、汉化、授权码,去弹窗)

1.下载Axure10 链接&#xff1a;https://pan.baidu.com/s/1fc8Bgyic8Ct__1IOv-afUg 提取码&#xff1a;9qew 2.安装Axure10 因为我的电脑是Windows操作系统&#xff0c;所以我下载的AxureRP-Setup-Beta v10.0.0.3816 (Win).exe 一直点下一步就行 3.Axure10中文 打开Axure…

fdatool中的幅值响应 怎么计算

设计一个幅值是100&#xff0c;45hz的正弦波&#xff0c;设计一个滤波器&#xff0c;观察滤波器的幅值响应&#xff1a; 滤波前的数据的峰峰值是100*2&#xff1b; 假设滤波后的数据峰峰值变成95*2&#xff1b;&#xff08;95*2是滤波后的数据的峰峰值&#xff0c;不是fft后的值…

豆浆机缺水检测功能如何实现的

豆浆机缺水检测功能的实现是通过光学液位传感器来完成的。这种传感器具有多种优势&#xff0c;如内部所有元器件经过树脂胶封处理&#xff0c;没有任何机械活动部件&#xff0c;免调试、免检验、免维护等特点。它采用了光电液位传感器内置的光学电子元件&#xff0c;体积小、功…

自动化神器Autolt,让你不再重复工作!

随着互联网不断发展&#xff0c;它给我们带来便利的同时&#xff0c;也带来了枯燥、重复、机械的重复工作。今天&#xff0c;我要和大家分享一款老牌实用的自动化工具&#xff1a;AutoIt&#xff0c;它能够让你告别繁琐的重复性工作&#xff0c;提高工作效率。 这里透露一下&am…