Flink 状态管理与容错机制(CheckPoint SavePoint)的关系

一、什么是状态

无状态计算的例子: 例如一个加法算子,第一次输入2+3=5那么以后我多次数据2+3的时候得到的结果都是5。得出的结论就是,相同的输入都会得到相同的结果,与次数无关。
有状态计算的例子: 访问量的统计,我们都知道Nginx的访问日志一个请求一条日志,基于此我们就可以统计访问量。如下,/api/a这个url第一此访问的时候,返回的结果就是 count1,但当第二次访问的时候,返回的结果变成了2。为什么Flink知道之前已经处理过一次 hello world,这就是state发挥作用了,这里是被称为keyed state存储了之前需要统计的数据,keyby接口的调用会创建keyed streamkey进行划分,这是使用keyed state的前提。得出的结论就是,相同的输入得到不同的结果,与次数有关。这就是有状态的数据。
[点击并拖拽以移动] ​

什么场景下会大量使用到这种状态数据啦?简单举几个例子:
【1】去重的需求中,比如说我们只想知道这100个同事都属于那几个部门的等等。
【2】窗口计算,已进入未触发的数据。比如,我们一分钟统计一次,1-2之间的1.5这个时候的数据对于2来说就是一个有状态的数据,因为2的结果与1.5有关。
【3】机器学习/深度学习,训练的模型及参数。这对于机器学习的同学深入感触。比如,第一次输入hello,机器会给我一个反馈,那么下次会基于这个反馈做进一步的学习处理。那么上一步的结果对于我而言就是一种有状态的输入。
【4】访问历史数据,需要与昨日进行对比。昨日的数据对于今日而言也属于一种状态。你品,你细品。

为什么要管理状态,用内存不香吗?首先流失作业是有它的标准的,不是什么东西随随便便就说自己这个是流失处理。首先,7*24小时运行,高可靠,你内存不行吧,你的容量总有用完的时候吧。其次,数据不丢失不重,恰好计算一次,你内存要实现需要备份和恢复,你还总伴随着小部分数据的丢失吧。最后,数据实时产生,不延迟,你内存不够横向扩展时,你需要延迟吧。

理想的状态管理就是下面描述的样子,Flink也都帮我们实现了。
[点击并拖拽以移动] ​

二、状态的类型

Managed State & Raw State

Managed StateRaw State
状态管理方式Flink Runtime 管理 —自动存储,自动恢复 —内存管理上有优化用户自己管理(Flink不知道你在State中存储的数据结构的) —要自己实例化
状态数据结构已知的数据结构 —value,list,map…字节数据 —byte[]
推荐使用场景大多数情况下均可使用自定义 Operator 时可以使用(当Managed State 不够时使用)

Managed Stated 分为: Keyed StatedOperator State
【1】Keyed Stated: 只能用于keyBy生成的KeyedStream上的算子。每一个key对应一个State,一个Operator实例处理多个Key,访问相应的多个State。相同Key会在相同的实例中处理。整个过程如果没有keyBy操作,它是没有KeyedStream的,而Keyed Stated只能应用在KeyedStream 上。

并发改变: State随着Key在实例间迁移。例如:实例A中之前处理KeyAKeyB,后面我扩展了实例B,那么 实例A就只需要处理KeyAKeyB就交给 实例B进行处理。安装状态进行分离,可以理解为分布式。

通过 RuntimeContext 访问,说明Operator是一个Rich Function,否则是拿不到RuntimeContext

支持的数据结构: ValueStateListStateReducingStateAggregatingStateMapState

【2】Operator State: 可以用于所有的算子,常用于source上,例如FlinkKafkaConsumer。一个Operator实例对应一个State,所以一个Operator中会处理多个key,可以理解为集群。

并发改变: Operator State没有key,并发改变的时候就需要重新分配。内置了两种方案:均匀分配和合并后每个得到全量。

访问方式: 实现CheckpointedFunctionListCheckpointed接口。

支持的数据结构: ListState

三、Keyed State 使用示例

什么是 keyed state: 对于keyed state,有两个特点:
【1】只能应用于KeyedStream 的函数与操作中,例如Keyed UDF, window state
【2】keyed state是已经分区 / 划分好的,每一个 key 只能属于某一个 keyed state
对于如何理解已经分区的概念,我们需要看一下keyby的语义,大家可以看到下图左边有三个并发,右边也是三个并发,左边的词进来之后,通过keyby会进行相应的分发。例如对于hello wordhello这个词通过hash运算永远只会到右下方并发的task上面去。
[点击并拖拽以移动] ​

什么是 operator state
【1】又称为non-keyed state,每一个operator state都仅与一个operator的实例绑定。
【2】常见的operator statesource state,例如记录当前sourceoffset再看一段使用operator stateword count代码:
[点击并拖拽以移动] ​

这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为list stateoperator state。如下几种Keyed State之间的依赖关系,都是state的子类。它们的访问方式和数据结构都有一定的区别。
[点击并拖拽以移动] ​

状态数据类型访问接口备注
ValueState单个值[update(T) 修改/T value 获取]例如 WordCount 用 word 做 key,state就是单个的数值。这个单个也可以是字符串、对象等都有可能。访问方式只有上面两种。
MapStateMapput(UK key, UV value) putAll(Map<UK,UV> map) remove(UK key) boolean contains(UK key) UV get(UK key) Iterable<Map.Entry> entries() Iterable<Map.Entry> iterator() Iterable keys() Iterable values()能够操作具体的对象的key
ListStateListadd/ addAll(List) update(List) Iterable get()
ReducingState单个值add/ addAll(List) update(List) T get()与 List 是同一个父类,这个add是直接将数据更新进了 Reducing的结果里面。举个例子,例如我们统计1分钟的结果,list是先将数据添加到list中,等到1分钟的时候全来出来统计。而 Reducing是来一条就统计一条结果。好处是节省内存。
AggregatingState单个值add(IN)/OUT get()与 List 是同一个父类,与Reducing的不同是,Reducing输入和输出的类型都是相同的。而Aggregating 是可以不同的。例如,我要计算一个平局值,Reducing是算好返回,而Aggregating会返回总和和个数。

举个ValueState的案例

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据流
DataStream<Event> events = env.addSource(source);

DataStream<Alert> alerts = events
        // 生成 keyedStata 通过 sourceAddress
        .keyBy(Event::sourceAddress)
        // StateMachineMapper 状态机
        .flatMap(new StateMachineMapper());


//我么看下状态机怎么写   实现 RichFlatMapFunction
@SuppressWarnings("serial")
static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> {

    private ValueState<LeaderLatch.State> currentState;

    @Override
    public void open(Configuration conf) {
        // 获取一个 valueState
        currentState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state", State.class));
    }

    //来一条数据处理一条
    @Override
    public void flatMap(Event evt, Collector<Alert> out) throws Exception {
        // 获取 value
        State state = currentState.value();
        if (state == null) {
            state = State.Initial;//State 是本地的变量
        }

        // 把事件对状态的影响加上去,得到一个状态
        State nextState = state.transition(evt.type());

        //判断状态是否合法
        if (nextState == State.InvalidTransition) {
            //扔出去
            out.collect(new Alert(evt.sourceAddress(), state, evt.type()));
        }
        //是否不能继续转化了,例如取消的订单
        else if (nextState.isTerminal()) {
            // 从 state 中清楚掉
            currentState.clear();
        }
        else {
            // 修改状态
            currentState.update(nextState);
        }
    }
}

四、CheckPoint 与 state 的关系

Checkpoint是从source触发到下游所有节点完成的一次全局操作。下图可以有一个对Checkpoint的直观感受,红框里面可以看到一共触发了 569KCheckpoint,然后全部都成功完成,没有fail的。
[点击并拖拽以移动] ​

**state 其实就是 Checkpoint 所做的主要持久化备份的主要数据,**看下图的具体数据统计,其state也就9kb大小 。
[点击并拖拽以移动] ​

五、状态如何保存和恢复

Checkpoint定时制作分布式快照,对程序的状态进行备份。发生故障时,将整个作业的Task都回滚到最后一次成功Checkpoint中的状态,然后从保存的点继续处理。

必要条件: 数据源支持重发(如果不重发,丢失的消息就真的丢了)

一致性语义: 恰好一次(如果p相同,单线程,多个线程时,可能有的算子对其已经计算了一次了,有的没有就需要注意),至少一次。

//  获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//状态数据
//两个checkpoint 触发间隔设置1S,越频繁追的数据就越少,io消耗也越大
env.enableCheckpointing(1000);
//EXACTLY_ONCE语义说明 Checkpoint是要对替的,这样消息不会重复,也不会对丢。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//两个checkpoint 最少等待500ms 例如第一个checkpoint做了700ms按理300ms后就要做下一个checkpoint。但是它们之间的等待时间300ms<500ms 此时,就会延长200ms减少checkpoint过于频繁,影响业务。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoint多久超时,如果这个checkpoint在1分钟内还没做完,那就失败了
env.getCheckpointConfig().setCheckpointTimeout(60000);
//同时最多有多少个checkpoint进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//当重新分配并发度,拆分task时,是否保存checkpoint。如果不保存就需要使用savepoint来保存数据,放到外部的介质中。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

Checkpoint vs Savepoint

CheckpointSavepoint
触发管理方式由Flink自动触发并管理由用户手动触发并管理
主要用途在 Task 发生异常时快速恢复,例如网络抖动导致的超时异常有计划的进行备份,使作业能停止后再恢复,例如修改代码、调整并发。
特点轻量、自动从故障中服务、在作业停止后默认清除持久、以标准格式存储,允许代码或配置发生变化、手动触发 savepoint 恢复。

可选的状态存储方式:
【1】MemoryStateBackend:构造方法:

MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots)

存储方式: StateTaskManager内存。CheckpointJobManager内存。
容量限制: 单个State maxStateSize默认5MmaxStateSize <= akka.framesize默认10M。总大小不超过JobManager内存。
推荐使用场景: 本地测试,几乎无状态的作业,比如ETL/JobManager不容易挂,或影响不大的情况。不推荐在生产场景使用。

【2】FsStateBackend: 构造方法:

FsStateBackend(URL checkpointDataUri, boolean asynchronousSnapshots)

存储方式: StateTaskManager内存。Checkpoint:外部文件系统(本地或HDFS)。
容量限制: 单个TaskManagerState总量不超过它的内存。总大小不超过配置的文件系统容量(会定期清理)。
推荐使用场景: 常规使用状态的作业,例如分钟级窗口聚合、join。需要开启HA的作业。可以在生产环境使用。

【3】RocksDBStateBackend: 构造方法:

RocksDBStateBackend(URL checkpointDataUri, boolean enableIncrementalCheckpointing)

存储方式: StateTaskManager上的KV数据库(实际使用内存+磁盘)。Checkpoint:外部文件系统(本地或HDFS)。
容量限制: 单个TaskManagerState总量不超过它的内存+磁盘,单个key 最大2G。总大小不超过配置的文件系统容量。
推荐使用场景: 超大状态的作业,例如天级窗口聚合。需要开启HA的作业。对状态读写性能要求比较高的作业。可以在生产环境使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/262147.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【科学计算语言】实验三 Python复杂数据类型

【目的和要求】 &#xff08;1&#xff09;掌握Python语言中的组合数据类型 &#xff08;2&#xff09;掌握列表、元组、字典、集合及字符串的基本应用 &#xff08;3&#xff09;熟练运用有关序列操作的Python内置函数 【实验准备】 【实验内容】 1. 实验练习&#xff1a;掌握…

HTML5的完整学习笔记

HTML 什么是HTML&#xff1a; 作为前端三件套之一&#xff0c;HTML的全称是超文本标记语言&#xff08;Hypertext Markup Language&#xff09;。HTML是一种标记语言&#xff0c;用于创建网页。它由一系列标签组成&#xff0c;这些标签用于定义网页的结构和内容。HTML标签告诉…

[XR806开发板试用] XR806——基于FreeRTOS下部署竞技机器人先进模糊控制器

前言 很荣幸参与到由“极术社区和全志在线联合组织”举办的XR806开发板试用活动。本人热衷于各种的开发板的开发&#xff0c;同时更愿意将其实现到具体项目中。秉承以上原则&#xff0c;发现大家的重心都放在开发中的环境构建过程&#xff0c;缺少了不少实际应用场景的运用&am…

谷歌Gemini造假始末

&#x1f4a1;大家好&#xff0c;我是可夫小子&#xff0c;《小白玩转ChatGPT》专栏作者&#xff0c;关注AIGC、读书和自媒体。 在过去一年中&#xff0c;OpenAI ChatGPT引发了一股AI新浪潮&#xff0c;而谷歌则一直处于被压制的状态&#xff0c;迫切需要一款现象级的AI产品来…

【UML】第10篇 类图(属性、操作和接口)(2/3)

目录 3.3 类的属性&#xff08;Attribute&#xff09; 3.3.1 可见性&#xff08;Visibility&#xff09; 3.3.2 属性的名称 3.3.3 数据类型 3.3.4 初始值 3.3.5 属性字符串 3.4 类的操作&#xff08;Operations&#xff09; 3.4.1 参数表 3.4.2 返回类型 3.5 类的职责…

浅述无人机技术在地质灾害应急救援场景中的应用

12月18日23时&#xff0c;甘肃临夏州积石山县发生6.2级地震&#xff0c;震源深度10千米&#xff0c;灾区电力、通信受到影响。地震发生后&#xff0c;无人机技术也火速应用在灾区的应急抢险中。目前&#xff0c;根据受灾地区实际情况&#xff0c;翼龙-2H应急救灾型无人机已出动…

Kafka集群架构原理(待完善)

kafka在zookeeper数据结构 controller选举 客户端同时往zookeeper写入, 第一个写入成功(临时节点), 成为leader, 当leader挂掉, 临时节点被移除, 监听机制监听下线,重新竞争leader, 客户端也能监听最新leader leader partition自平衡 leader不均匀时, 造成某个节点压力过大, …

一套rk3588 rtsp服务器推流的 github 方案及记录 -03(完结)

opencv 解码记录 解码库使用的时候发现瑞芯微以前做过解码库对ffmpeg和gstreamer的支持 然后最近实在不想再调试Rtsp浪费时间了&#xff0c;就从这中间找了一个比较快的方案 ffmpeg 带硬解码库编译 编译流程参考文献 https://blog.csdn.net/T__zxt/article/details/12342435…

opencv静态链接error LNK2019

opencv 3.1.0 静态库&#xff0c;包括以下文件 只链接opencv_world310d.lib&#xff0c;报错 opencv_world310d.lib(matrix.obj) : error LNK2019: 无法解析的外部符号 _ippicvsFlip_16u_I8&#xff0c;该符号在函数 "enum IppStatus (__stdcall*__cdecl cv::getFlipFu…

鸿蒙系列--组件介绍之基础组件

一、通用属性和文本样式 针对包含文本元素的组件&#xff08;比如&#xff1a;Text、Span、Button、TextInput等&#xff09;&#xff0c;可以设置一些通用的文本样式&#xff0c;比如颜色&#xff1a;fontColor、大小&#xff1a;fontSize、样式&#xff1a;fontStyle、 粗细…

Spring(1)Spring从零到入门 - Spring特点,系统架构简介,两个核心概念IoC与DI(涉及管理第三方bean)

Spring&#xff08;1&#xff09;Spring从零到入门 - Spring特点&#xff0c;系统架构简介&#xff0c;两个核心概念IoC与DI&#xff08;涉及管理第三方bean&#xff09; 引入&#xff1a;单体服务器 "单体服务器的开发"通常指的是在一个单一的服务器上构建和部署整个…

微信小程序 动态设置状态栏样式

onLoad(options) {//修改状态栏标题wx.setNavigationBarTitle({title: 页面标题, //页面标题success: () > {}, //接口调用成功的回调函数fail: () > {}, //接口调用失败的回调函数complete: () > {} //接口调用结束的回调函数&#xff08;调用成功、失败…

C# SixLabors.ImageSharp.Drawing的多种用途

生成验证码 /// <summary> /// 生成二维码 /// </summary> /// <param name"webRootPath">wwwroot目录</param> /// <param name"verifyCode">验证码</param> /// <param name"width">图片宽度</…

互联网加竞赛 python+大数据校园卡数据分析

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于yolov5的深度学习车牌识别系统实现 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;4分工作量&#xff1a;4分创新点&#xff1a;3分 该项目较为新颖&am…

德人合科技 | 设计公司文件加密系统——天锐绿盾自动智能透明加密防泄密系统

设计公司文件加密系统——天锐绿盾自动智能透明加密防泄密系统 PC端访问地址&#xff1a; www.drhchina.com 一、背景介绍 设计公司通常涉及到大量的创意作品、设计方案、客户资料等重要文件&#xff0c;这些文件往往包含公司的核心价值和商业机密。因此&#xff0c;如何确保…

@vue/cli脚手架

0_vue/cli 脚手架介绍 目标: webpack自己配置环境很麻烦, 下载vue/cli包,用vue命令创建脚手架项目 vue/cli是Vue官方提供的一个全局模块包(得到vue命令), 此包用于创建脚手架项目 脚手架是为了保证各施工过程顺利进行而搭设的工作平 vue/cli的好处 开箱即用 0配置webpack babe…

个人财务工具、密钥管理平台、在线会计软件、稍后阅读方案 | 开源专题 No.51

gethomepage/homepage Stars: 10.1k License: GPL-3.0 这个项目是一个现代化、完全静态的、快速且安全的应用程序仪表盘&#xff0c;具有超过 100 种服务和多语言翻译的集成。 快速&#xff1a;网站在构建时以静态方式生成&#xff0c;加载时间飞快。安全&#xff1a;所有对后…

全面掌握XSS漏洞攻击,实战案例从Self-XSS到账户接管,以及通过参数污染的XSS实现攻击

全面掌握XSS漏洞攻击,实战案例从Self-XSS到账户接管。 什么是跨站脚本攻击 (XSS)? 跨站脚本攻击(XSS)是一种网络安全漏洞,允许攻击者破坏用户与易受攻击的应用程序之间的交互。它允许攻击者绕过同源策略,该策略旨在将不同的网站隔离开来。XSS漏洞通常允许攻击者伪装成受…

Unity中Shader缩放矩阵

文章目录 前言一、直接相乘缩放1、在属性面板定义一个四维变量&#xff0c;用xyz分别控制在xyz轴上的缩放2、在常量缓存区申明该变量3、在顶点着色器对其进行相乘&#xff0c;来缩放变换4、我们来看看效果 二、使用矩阵乘法代替直接相乘缩放的原理1、我们按如下格式得到缩放矩阵…

【CentOS 7.9 分区】挂载硬盘为LVM操作实例

LVM与标准分区有何区别&#xff0c;如何选择 目录 1 小系统使用LVM的益处&#xff1a;2 大系统使用LVM的益处&#xff1a;3 优点&#xff1a;CentOS 7.9 挂载硬盘为LVM操作实例查看硬盘情况格式化硬盘创建PV创建VG创建LV创建文件系统并挂载自动挂载添加&#xff1a;注意用空格间…