Flink理论—容错之状态

Flink理论—容错之状态

在 Flink 的框架中,进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。

Flink 使用流重放检查点的组合来实现容错。检查点标记每个输入流中的特定点以及每个运算符的相应状态。通过恢复运算符的状态并从检查点点重放记录,可以从检查点恢复流数据流,

为了使该机制实现其完全保证,数据流源(例如消息队列或代理)需要能够将流倒回到定义的最近点。Apache Kafka具有这种能力,Flink 的 Kafka 连接器利用了这一点。

如果发生程序故障(由于机器、网络或软件故障),Flink 会停止分布式流数据流。然后系统重新启动算子并将其重置为最新的成功检查点。输入流重置为状态快照点。作为重新启动的并行数据流的一部分进行处理的任何记录都保证不会影响之前的检查点状态。

状态

我们在 Flink 的官方博客中找到这样一段话,可以认为这是对状态的定义:

When working with state, it might also be useful to read about Flink’s state backends. Flink provides different state backends that specify how and where state is stored. State can be located on Java’s heap or off-heap. Depending on your state backend, Flink can also manage the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. State backends can be configured without changing your application logic.

这段话告诉我们,所谓的状态指的是,在流处理过程中那些需要记住的数据,而这些数据既可以包括业务数据,也可以包括元数据。Flink 本身提供了不同的状态管理器来管理状态,并且这个状态可以非常大。

有状态操作的一些示例:

  • 当应用程序搜索某些事件模式时,状态将存储迄今为止遇到的事件序列。
  • 当每分钟/小时/天聚合事件时,状态保存待处理的聚合。
  • 当通过数据点流训练机器学习模型时,状态保存模型参数的当前版本。
  • 当需要管理历史数据时,状态允许有效访问过去发生的事件。

Flink 状态分类

我们在之前的课时中提到过 KeyedStream 的概念,并且介绍过 KeyBy 这个算子的使用。在 Flink 中,根据数据集是否按照某一个 Key 进行分区,将状态分为 Keyed StateOperator State(Non-Keyed State)两种类型。

image (4).png

如上图所示,Keyed State 是经过分区后的流上状态,每个 Key 都有自己的状态,图中的八边形、圆形和三角形分别管理各自的状态,并且只有指定的 key 才能访问和更新自己对应的状态。

这里还需要说明这个 Key 是隐含的,也就是不需要我们手动去操作,我们只要定义一个描述符,然后去操作状态即可,也就是说 Key 本身也是托管的

上面那张图还是简化了很多,真实的状态是这样的,我们的一个算子实例还是托管了很多Key 的,其实为了方便理解你可将其理解为一个大的Map ,

image-20240216085912214

对 Keyed State 的访问只能在 Keyed Stream上进行,即在键控/分区数据交换之后,并且仅限于与当前事件的键关联的值。

Keyed State 进一步组织成所谓的键组。Key Groups 是 Flink 可以重新分配 Keyed State 的原子单元;键组的数量与定义的最大并行度完全相同。在执行期间,键控运算符的每个并行实例都使用一个或多个键组的键

与 Keyed State 不同的是,Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。每个算子子任务上的数据共享自己的状态。

Operator State 的实际应用场景不如 Keyed State 多,一般来说它会被用在 Source 或 Sink 等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 Exactly-Once 语义。

但是有一点需要说明的是,无论是 Keyed State 还是 Operator State,Flink 的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。

image (5).png

我们可以看一下 State 的类图,对于 Keyed State,Flink 提供了几种现成的数据结构供我们使用,State 主要有四种实现,分别为 ValueState、MapState、AppendingState 和 ReadOnlyBrodcastState ,其中 AppendingState 又可以细分为ReducingState、AggregatingState 和 ListState。

那么我们怎么访问这些状态呢?Flink 提供了 StateDesciptor 方法专门用来访问不同的 state,类图如下:

image (6).png

状态的使用

状态的基本使用

下面演示一下如何使用 StateDesciptor 和 ValueState,代码如下:

public static void main(String[] args) throws Exception {

   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

   env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 5L), Tuple2.of(1L, 2L))
         .keyBy(0)
         .flatMap(new CountWindowAverage())
         .printToErr();

       env.execute("submit job");

}
 public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

       private transient ValueState<Tuple2<Long, Long>> sum;
       public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

           Tuple2<Long, Long> currentSum;
           // 访问ValueState
           if(sum.value()==null){
               currentSum = Tuple2.of(0L, 0L);
           }else {
               currentSum = sum.value();
           }
           // 更新
           currentSum.f0 += 1;
           // 第二个元素加1
           currentSum.f1 += input.f1;
           // 更新state
           sum.update(currentSum);

           // 如果count的值大于等于2,求知道并清空state
           if (currentSum.f0 >= 2) {
               out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
               sum.clear();
           }
   }

   public void open(Configuration config) {
       ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
               new ValueStateDescriptor<>(
                       "average", // state的名字
                       TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
                       ); // 设置默认值

       StateTtlConfig ttlConfig = StateTtlConfig
               .newBuilder(Time.seconds(10))
               .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
               .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
               .build();

       descriptor.enableTimeToLive(ttlConfig);

       sum = getRuntimeContext().getState(descriptor);
       }
}

在上述例子中,我们通过继承 RichFlatMapFunction 来访问 State,通过 getRuntimeContext().getState(descriptor) 来获取状态的句柄。而真正的访问和更新状态则在 Map 函数中实现。

我们这里的输出条件为,每当第一个元素的和达到二,就把第二个元素的和与第一个元素的和相除,最后输出。我们直接运行,在控制台可以看到结果:

image (7).png

状态管理配置

同样,我们对于任何状态数据还可以设置它们的过期时间。如果一个状态设置了 TTL,并且已经过期,那么我们之前保存的值就会被清理。

想要使用 TTL,我们需要首先构建一个 StateTtlConfig 配置对象;然后,可以通过传递配置在任何状态描述符中启用 TTL 功能。

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
descriptor.enableTimeToLive(ttlConfig);

image (8).png

StateTtlConfig 这个类中有一些配置需要我们注意:

image (9).png

UpdateType 表明了过期时间什么时候更新,而对于那些过期的状态,是否还能被访问则取决于 StateVisibility 的配置。

总结

主要Flink 中的状态分类和使用,并且用实际案例演示了用法;关于状态后端我们可以参考下一节

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/389859.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一周学会Django5 Python Web开发-项目配置settings.py文件-模版配置

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计17条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

OpenAI突然发布首款文生视频模型——Sora;谷歌发布Gemini 1.5,迈向多模态大模型新时代

&#x1f989; AI新闻 &#x1f680; OpenAI突然发布首款文生视频模型——Sora 摘要&#xff1a;OpenAI发布了首个AI视频模型Sora&#xff0c;可以根据文字指令生成神级效果的长视频&#xff0c;引发了广泛关注和震惊。 Sora模型通过深入理解语言和图像&#xff0c;能够创造出…

阿里云香港服务器多少钱一年?288元

阿里云香港服务器2核1G、30M带宽、40GB ESSD系统盘优惠价格24元/月&#xff0c;288元一年&#xff0c;每月流量1024GB&#xff0c;多配置可选&#xff0c;官方优惠活动入口 https://t.aliyun.com/U/bLynLC 阿里云服务器网aliyunfuwuqi.com分享阿里云香港服务器优惠活动、详细配…

前端工程化面试题 | 10.精选前端工程化高频面试题

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

【算法设计与分析】搜索旋转排序数组

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;算法分析与设计 ⛺️稳中求进&#xff0c;晒太阳 题目 整数数组 nums 按升序排列&#xff0c;数组中的值 互不相同 。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff…

Covalent Network(CQT)与卡尔加里大学建立合作,共同推动区块链技术创新

Covalent Network&#xff08;CQT&#xff09;作为领先的 Web3 数据索引器和提供者&#xff0c;宣布已经与卡尔加里大学达成了具备开创性意义的合作&#xff0c;此次合作标志着推动区块链数据研究和可访问性的重要里程碑。卡尔加里大学是首个以验证者的身份加入 Covalent Netwo…

程序全家桶 | 机器学习之心【Python机器学习/深度学习程序全家桶】

理论背景 机器学习&#xff08;Machine Learning&#xff09;是一种人工智能&#xff08;Artificial Intelligence&#xff09;领域的技术和方法&#xff0c;通过使用数据和统计模型&#xff0c;使计算机系统能够自动学习和改进&#xff0c;而无需明确地进行编程。机器学习使计…

RUST入门:如何用vscode调试rust程序

RUST已经流行一阵子了&#xff0c;但是比较系统的IDE介绍还是比较少&#xff0c;这里我简单介绍 一下如何用vscode实现单步调试rust程序&#xff0c;就像我们平时调试c程序一样。 学习资料网站 首先&#xff0c;介绍几个学习rust的好网站&#xff0c; Rust程序设计语言Rust语…

html从零开始8:css3新特性、动画、媒体查询、雪碧图、字体图标【搬代码】

css3新特性 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width, …

分布式事务详解

概述 随着互联网的发展,软件系统由原来的单体应用转变为分布式应用。分布式系统把一个单体应用拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作。这种分布式系统下不同服务之间通过远程协作完成的事务称之为分布式事务,例如用户注册送积分事务…

一、部署Oracle

部署Oracle 一、Docker部署1.Oracle11g1.1 测试环境1.1.1 拉取镜像1.1.2 启动容器1.1.3 配置容器环境变量1.1.4 修改sys、system用户密码1.1.5 创建表空间1.1.6 创建用户并授权1.1.5 使用DBeaver测试连接 二、安装包部署 一、Docker部署 1.Oracle11g 1.1 测试环境 当前只能用…

MATLAB知识点:perms函数(★★★☆☆)用来返回向量v中各元素所有可能的排列情况

讲解视频&#xff1a;可以在bilibili搜索《MATLAB教程新手入门篇——数学建模清风主讲》。​ MATLAB教程新手入门篇&#xff08;数学建模清风主讲&#xff0c;适合零基础同学观看&#xff09;_哔哩哔哩_bilibili 节选自第3章&#xff1a;课后习题讲解中拓展的函数 在讲解第三…

适用于电脑和手机的照片恢复工具指南

这是适用于 Android、iPhone、Mac 和 Windows 的最佳照片恢复应用程序的指南。 如果您不小心删除了一堆珍贵的照片&#xff0c;请不要担心&#xff01; 恢复丢失的照片和数据实际上比您想象的要容易得多。 通过使用照片恢复应用程序&#xff0c;您可以“解锁”存储卡或硬盘驱…

OpenCV Mat实例详解 四

OpenCV Mat实例详解三中详细介绍来了OpenCV Mat类的公有静态成员函数&#xff0c;下面介绍OpenCV Mat类的其他常用成员函数。 OpenCV Mat类常用成员函数 Mat & adjustROI (int dtop, int dbottom, int dleft, int dright)&#xff1b; dtop ROI 上边界移动值&#xff0c;如…

BUGKU-WEB game1

题目描述 题目截图如下&#xff1a; 进入场景看看&#xff1a; 是一个盖楼的游戏&#xff01; 解题思路 先看看源码&#xff0c;好像没发现什么特别的是不是要得到一定的分数才会有对应的flag&#xff1f;查看下F12&#xff0c;请求链接发现&#xff0c;这不就提示了 相…

[计算机网络]---序列化和反序列化

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、再谈协议…

C++动态规划-线性dp算法

莫愁千里路 自有到来风 CSDN 请求进入专栏 X 是否进入《C专栏》? 确定 目录 线性dp简介 斐波那契数列模型 第N个泰波那契数 思路&#xff1a; 代码测试&#xff1a; 三步问题 思路&#xff1a; 代码测试&#xff1a; 最小花费爬楼梯 思路…

Java:如何判断一个链表是否为回文结构?(画图+代码 详解)

一、判断思想 我们设计一个时间复杂度为O(n),额外空间复杂度为O(1)的算法&#xff0c;我们在不创建额外空间的基础上来判断是否为回文结构。 思想&#xff1a; 1、使用快慢指针法&#xff0c;找到链表的中间节点。 2、翻转中间节点的后半部分。 3、分别从头节点和尾节点向中间遍…

LeetCode刷题计划

LeetCode刷题计划 推荐 代码随想录&#xff1a;https://github.com/youngyangyang04/leetcode-master 卡码网 练习ACM模式 https://kamacoder.com/ 01 #include <iostream> using namespace std;int main() {int a ,b;while(cin>>a>>b){cout<<ab<…

从汇编分析C语言可变参数的原理,并实现一个简单的sprintf函数

C语言可变参数 使用printf等函数的时候函数原型是printf(const char* fmt, ...), 这一类参数的个数不限的函数是可变参数 使用 使用一个头文件stdarg.h, 主要使用以下的宏 typedef char * va_list;// 把 n 圆整到 sizeof(int) 的倍数 #define _INTSIZEOF(n) ( (sizeo…