Flink State 状态原理解析 | 京东物流技术团队

一、Flink State 概念

State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。

Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBackend 实现将相关数据存储到 FS 文件系统或者 RocksDB 数据库中。在Flink应用运行过程中,通过 checkpoint 快照定期地保存状态数据。并在 Flink 应用重启时加载checkpoint/savepoint 来实现状态的恢复,从而让 Flink 应用继续完成之前的数据计算,实现数据精确一次向下游传递。

1.1 Apache Flink 中 State 的存储实现 StateBackend 分类

分为以下3类:

  • 基于内存的 HeapStateBackend。状态存储在内存中。
  • 基于 HDFS 或 OSS 的 FsStateBackend。状态存储在内存,并在做 cp(checkpoint)时存到远端。
  • 基于 RocksDB 的 RocksDBStateBackend。将对象序列化成二进制存在内存和本地磁盘的 RocksDB 数据中,并在 cp 时存到远端。

HeapStateBackend 和 RocksDBStateBackend 分别对应在 TaskManager 内存模型中的位置:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

RocksDBStateBackend 中存储结构:

namespace: 在不同的 namespace 下存在相同名称的状态。

1.1.1 State 状态持久化

通过 Chandy-Lamport 分布式快照算法进行 checkpoint 完成状态数据的持久化。然后在 Flink 应用重启时读取 State 状态数据,进行运行现场的还原。

chekcpoint 分类:

  • 基于内存的全量 checkpoint
  • HDFS 全量 checkpoint
  • RocksDB 全量 checkpoint/增量 checkpoint

1.2 State 基于算子和数据分组的分类

State 可分为 Operator State 和 Keyed State 两类。

  • Operator State(称为 non-keyed state)

常常存在于Source, Sink中。具体实现类例如:

  • BroadcastState

例:Kafka Source 中用 OperatorState 记录 offset。

  • Keyed State

任何类型的 keyed state 都可以有有效期(TTL),所有状态类型都支持单元素的 TTL。 这意味着 List 元素和 Map 映射元素将独立到期。

例:SQL GroupBy/PartitionBy 后的窗口中的数据,每个 key 都有对应的 State。key 与 key 之间的 State 数据不可见。

keyed state 的具体实现类:

  • ValueState
  • MapState
  • ListState
  • AggregatingState
  • ReducingState
  • 。。。。。

Flink State思维导图:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Keyed StateOperator State
适用算子类型只适用于KeyedStream上的算子可用于所有算子
状态分配每个Key对应一个状态一个算子子任务对应一个状态
横向扩展状态随着keyBy的分组KeyGroup自动在多个算子子任务上迁移有多种状态重新分配的方式
创建和访问方式自定义算子(重写RichFunction,通过State 名称从 getRuntimeContext方法创建或获得 State )实现 CheckpointedFunction 等接口
支持数据结构ValueState、ListState、MapState等ListState、BroadcastState等

二、常见状态相关处理流程

2.1 Flink 应用中状态是如何存储的?

1. Kafka Source 如何存储 OperatorState?

class FlinkKafkaConsumerBase {
 private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; // state名称:"topic-partition-offset-states"
// 特殊的State类型:Union State 
}


unionOffsetStates这个变量就是 OperatorState类型的。

2. Map算子如何存储需要累计的数据?

  • ValueState/MapState/ListState/…

思考:keyby 后的数据分发与多并行度 subtask 之间的关系是怎样的?

首先,datastream 中数据经过 keyby 之后,会划分到各个 KeyedStream 中。每个 KeyedStream 有自己的 KeyedState(如ValueState/ListState/MapState)。

其次,KeyedStream 中的数据会以 KeyGroup 方式组织在一起。KeyGroup 是 Flink 重新分发 key state 的最小单元。

最后,KeyGroup 中的数据会通过取模最大并行度的方式分散到各个 subtask 中。以下是关键源码:

KeyGroupStreamPartitioner#selectChannel(record)
{
    K key;
    key = keySelector.getKey(record.getInstance().getValue());
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
            key, maxParallelism, numberOfChannels);
}
--KeyGroupRangeAssignment#assignKeyToParallelOperator()
    {
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }
    --KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup()
      公式:OperatorIndex = keyGroupId * parallelism / maxParallelism
    --KeyGroupRangeAssignment#assignToKeyGroup()
      {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
       }




2.2 修改并行度场景时 State 状态存储的变化

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

2.3 State 与 Checkpoint 关系

分布式快照 Checkpoint 的概念,定期将 State 持久化到 外部存储系统(HDFS/OSS) 上。用户可以通过实现 CheckpointedFunction 接口来使用 operator state。通过 barrier 来对齐 checkpoint,等待 State 持久化完成(此过程参数不同也可能是异步的)。

常见 State 与 CP 相关的问题

  • State 状态过大。现象为多个算子或单个算子多个 subtask 做 checkpoint 慢,可导致 CP 对齐时间长,严重时会导致 CP 超时。
  • 数据倾斜导致某个 subtask 处理不及时。现象为单个算子少数几个 subtask 做 checkpoint 慢,导致 CP 对齐时间长。严重时会导致 CP 超时。
  • 大作业(并行度搞)频繁做 CP,会频繁上传小文件,导致 HDFS 集群小文件过多。

常用解决措施:调大托管内存大小。

三、参考文档:

  • Flink State 官方文档:Flink 状态与容错
  • https://cloud.tencent.com/developer/article/1403939
  • https://www.modb.pro/db/81206

作者:京东物流 吴云涛

来源:京东云开发者社区 自猿其说Tech 转载请注明来源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/224314.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

清新脱俗的Notes主页

大家好&#xff0c;才是真的好。 作为Notes客户端重度用户&#xff0c;我个人非常喜欢Notes客户机&#xff0c;平时都在使用。对于另一些Notes用户&#xff0c;喜欢Notes的人非常喜欢&#xff0c;而且还知道它非常强大&#xff0c;可以进行很多定制化。 今天我们来讲的就是No…

【MATLAB】SSA+FFT+HHT组合算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 SSAFFTHHT组合算法是一种基于奇异谱分析&#xff08;SSA&#xff09;、快速傅里叶变换&#xff08;FFT&#xff09;和希尔伯特-黄变换&#xff08;HHT&#xff09;的组合算法。 其中&am…

【CentOS8】使用 Tomcat 部署 Java Web 项目(使用 sdkman)

文章目录 配置 Tomcat将 Tomcat 启动命令设置为 Linux 自定义服务给 Tomcat 设置管理员账号密码IDEA 打包 Java web 项目 我是使用 sdkman 下载的 jdk 和 tomcat&#xff0c;所以接下来的部署配置都是在 sdkman 构建的环境的。想要知道如何下载 sdkman 可以看看这篇文章 —…

操作符(原码反码补码)

目录 前言&#xff1a; 原反补码&#xff1a; 位操作符&#xff1a; &#xff06; &#xff5c; &#xff3e; &#xff5e; >> << 总结&#xff1a; 逻辑操作符 && || 其他操作符&#xff1a; sizeof -- () &#xff1f;&#xf…

数据仓库与数据挖掘复习资料

一、题型与考点[第一种] 1、解释基本概念(中英互译解释简单的含义)&#xff1b; 2、简答题(每个10分有两个一定要记住)&#xff1a; ① 考时间序列Time series(第六章)的基本概念含义解释作用&#xff08;序列模式挖掘的作用&#xff09;&#xff1b; ② 考聚类(第五章)重点考…

Redis7--基础篇7(哨兵sentinel)

1. 关于哨兵的介绍 1、监控redis运行状态&#xff0c;包括master和slave&#xff08;主从监控&#xff09; 2、哨兵可以将故障转移的结果发送给客户端&#xff08;消息通知&#xff09; 3、当master down机&#xff0c;能自动将slave切换成新master&#xff08;故障转移&#…

STM32——电动车报警器

项目设计 // 如果检测到 PA4 被拉低&#xff08;小偷偷车&#xff09;&#xff0c;并且警报模式打开 // 则将 PB7 拉低&#xff0c;继电器通电&#xff0c;喇叭一直响 // 如果检测到 PA5 被拉高&#xff08;按键 A 按下&#xff09;&#xff0c;设定为开启警报模式 // 则将…

Python计算方差

方差可以反应变量的离散程度&#xff0c;是因为它度量了数据点与均值的差异。方差是每个数据点与均值的差的平方和的平均值&#xff0c;它可以反映数据点在均值附近的分布情况。如果方差较小&#xff0c;说明数据点更加集中在均值附近&#xff0c;离散程度较小&#xff1b;如果…

什么是神经网络的超参数

1 引言 超参数在神经网络的设计和训练中起着至关重要的作用。它们是在开始训练之前设置的参数&#xff0c;与网络的结构、训练过程和优化算法有关。正确的超参数选择对于达到最优模型性能至关重要。 2 神经网络结构的超参数 层数&#xff08;Layers&#xff09;&#xff1a; 决…

基于STM32 IAP技术的物联网设备固件更新应用研究

本文将深入研究基于STM32 IAP技术的物联网设备固件更新应用。首先&#xff0c;我们会介绍物联网设备固件更新的重要性和挑战。然后&#xff0c;我们将详细讲解STM32 IAP技术的原理和实现方式。接下来&#xff0c;我们会通过一个代码示例演示如何使用STM32 IAP技术实现物联网设备…

搭建个人网盘应用Nextcloud

使用DNF管理软件包 1 使用winscp工具将openeuler-20.03-LTS-x86_64-dvd.iso上传至openeuler虚拟机的/root目录下&#xff0c;然后执行如下命令挂载ISO [rootopenEuler ~]# mount -o loop /root/openEuler-20.03-LTS-everything-x86_64-dvd.iso /mnt/2 添加软件源 [rootope…

联合基于信息论的安全和隐蔽通信的框架

这个标题很帅 abstractintroductionsystem modelPROPOSED JOINT OPTIMIZATION OF ITS AND COVERT TRANSMISSION RATE信息论安全 &#xff08;ITS&#xff09;隐蔽通信需要&#xff08;CC&#xff09; Joint Information-Theoretic Secrecy and Covert Communication in the Pre…

AWS Remote Control ( Wi-Fi ) on i.MX RT1060 EVK - 2 “架构 AWS”

接续上一章节&#xff0c;我们把开发环境架设好之后&#xff0c;此章节叙述如何建立 AWS IoT 环境&#xff0c;请务必已经有 AWS Account&#xff0c;申请 AWS Account 之流程将不在此说明。 III-1. 登入AWS IoT&#xff0c; 在“管理”>“所有装置”>“实物”下点击“建…

VS2019 下配置 OpenCV4.6.0 库

一、编辑电脑系统环境变量。 二、打开 VS2019 新建一个C项目。 1.进行Debug和Release的配置 X64 平台。 2.属性配置&#xff1a;VC目录 -> 包含目录 3.属性配置&#xff1a;VC目录 -> 库目录 4.属性配置&#xff1a;链接器 -> 输入 -> 附加依赖项 带 d 的 .lib 为 …

Web开发学习HTTP协议、通过浏览器控制台学习HTTP协议。

文章目录 HTTP协议1.HTTP协议是什么&#xff1f;2.HTTP协议的特点3.什么是URL?4.通过浏览器控制台学习HTTP协议Request Headers请求数据格式说明Response Headers请求数据格式说明 5.HTTP工作原理 HTTP协议 1.HTTP协议是什么&#xff1f; HTTP协议是一种超文本传输协议&…

合并PDF(将多个pdf文件整合成一个pdf文件)

推荐使用下面这个免费在线的PDF文件合并工具&#xff0c;简单且易操作。 合并PDF - 在线上免费合并PDF文件 (smallpdf.com) 还有其他功能&#xff0c;不过现在我尚未使用其他功能&#xff1a; 关于费用&#xff1a;

Python 云服务器应用,Https,定时重启

Python 云服务器应用,Https,定时重启 环境搭建Python模块模块导入生成Flask实例GET处理启动服务器打开网页验证 GET接入证书 支持https申请证书下载证书保留 xxx.crt 和 xxx.key文件就可以了 copy到python项目目录ssl_context 配置 宝塔面板操作在www目录下新建python工作目录在…

Python实现FA萤火虫优化算法优化LightGBM回归模型(LGBMRegressor算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 萤火虫算法&#xff08;Fire-fly algorithm&#xff0c;FA&#xff09;由剑桥大学Yang于2009年提出 , …

【力扣】240.搜索二维矩阵

题目意思是从该矩阵之中查找出是否有和target一样的值&#xff0c;若有则返回true&#xff0c;无则返回false。这里我用的是java。总共有三种方法&#xff0c;分别是暴力解题法&#xff08;能过&#xff09;&#xff0c;二分查找法&#xff08;就是将二维数组拆分成m个二维数组…

NDIS协议驱动开发指南

文章目录 NDIS协议驱动开发指南1. 技术概览2. NDIS协议驱动2.1 BindAdapterHandlerEx2.2 SendNetBufferListsCompleteHandler2.3 ReceiveNetBufferListsHandler2.4 ProtocolNetPnpEvent 3. NET_BUFFER_LIST4. ndisprot实例5. 总结 NDIS协议驱动开发指南 我们知道&#xff0c;在…