Flink状态的理解

Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;

1. 状态

所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据,默认保存在 Java 的堆内存中。
在这里插入图片描述
根据算子是否存在按照Key进行分区,State可以划分为keyed state 和 Non-keyed state(Operator State、算子状态)

  • operator state是task级别的state,说白了就是每个task对应一个state, 在逻辑上,由算子task下所有subtask共享
    在这里插入图片描述
    Operator State的经常被用在Source或Sink算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。
  • keyed state 是基于KeyedStream上的状态,这个状态是跟特定的Key 绑定的。KeyedStream流上的每一个Key,都对应一个State
    在这里插入图片描述

2. 状态数据结构

状态数据由Flink内置状态机制管理。keyed state提供了5种数据结构

2.1 keyed state 数据结构

状态状态描述
ValueState保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的key
ListState保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索
MapState维护了一个映射列表
ReducingState保存一个单值,表示添加到状态的所有值的聚合
AggregateState保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与添加到状态的元素的类型不同

2.2 operator state

状态状态描述
ListStateListState的快照存储数据,系统重启后,list数据的重分配模式为: round-robin 轮询平均分配
UnionListStateUnionListState的快照存储数据,在系统重启后,list数据的重分配模式为: 广播模式; 在每个subtask上都拥有一份完整的数据;

3. 状态后端

默认情况下,state会保存在taskmanager的JVM堆内存,checkpoint会存储在JobManager的内存中。然而,状态数据的存储和checkpoint的存储位置可以改变,由state Backend(状态后端)配置实现

老版本(flink-1.12版及以前) Fsstatebackend MemoryStatebackend RocksdbStateBackend

flink1.12版本之后,可用的状态后端类型有两种
HashMapStateBackend、EmbeddedRocksDBStateBackend
而且其所生成的快照文件也统一了格式,因而在job重新部署或者版本升级时,可以任意替换statebackend

  • HashMapStateBackend
    ※ 状态数据是以java对象形式存储在heap内存中;
    ※ 内存空间不够时,也会溢出部分数据到本地磁盘文件;
    ※ 可以支撑大规模的状态数据;(只不过在状态数据规模超出内存空间时,读写效率就会明显降低)

  • EmbeddedRocksDBStateBackend

    ※ RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器, 为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。

    ※ 它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地, RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

    ※ 使用RocksDB + HDFS进行state存储:首先state先在taskManger的本地存储到RocksDB,然后异步写入到HDFS中,状态数量仅仅受限于本地磁盘容量限制
    在这里插入图片描述

4. 状态数据容错

Flink是一个stateful(带状态)的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;

一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的接续处理;因此,必须要一种机制能对系统内的各种状态进行持久化容错;Flink用checkpoint机制实现状态数据的容错

4.1 checkpoint

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

  • checkpoint默认关闭,需要手工开启。开启后,默认Exactly-once快照模式。还有一种快照模式为At-least-once

  • checkPoint的位置设置flink-conf.yaml#state.checkpoints.dir

  • Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略。配置参数flink-conf.yaml#restart-strategy

♦ 如果没有启用 checkpointing,则使用无重启 (no restart) 策略 ♦ 如果启用了 checkpointing,但没有配置重启策略,则使用固定 间隔 (fixed-delay) 策略,尝试重启次数默认值是:Integer.MAX_VALUE。
♦ 另一种重启策略为Failure rate,某时间段内失败了N次就重启

全局配置

# 每隔3s重启一次,重试间隔为10s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
#5分钟内若失败了3次则认为该job失败,重试间隔为10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
# 不重启
restart-strategy: none

单个JOB内配置

Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//开启状态检查点机制(它将会定期对整个系统中各个task的状态进行快照持久化,以便失败重启后还能从失败之前的状态恢复)
 env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE);
// checkpoint机制触发后,持久化保存各task状态数据的存储位置(生产中用hdfs
env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/tmp/flink/state");
// 指定状态后端存储(内存)
env.setStateBackend(new HashMapStateBackend());
// 开启自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.milliseconds(2000)));
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)));
// env.setRestartStrategy(RestartStrategies.noRestart())
  • checkpoint个数默认保留最近成功生成的一个,支持保留多个,通过参数flink-conf.yaml#state.checkpoints.num-retained控制,如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现
flink run -t yarn-per-job  -yjm 1024 -ytm 1024 -s hdfs://node01:8020/tmp/flink/state/715b120fe8736a3af7842ea0a5264c46/chk-6/_metadata

4.2 savePoint

savePoint是检查点一种特殊实现,底层其实也是使用Checkpoint的机制。

savePoint是用户以手工命令的方式触发checkpoint,并将结果持久化到指定的存储目录中。

作业升级、代码修改、任务迁移和维护,都可以使用savePoint

  1. savePoint的存储位置
    savePoint的存储位置flink-conf.yaml#state.savepoints.dir,不是必须设置,但设置了后, 后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置。

  2. savePoint的手动触发:

#【针对on standAlone模式】
bin/flink savepoint jobId [targetDirectory]

#【针对on yarn模式需要指定-yid参数】
bin/flink savepoint jobId [targetDirectory] -yid yarnAppId

jobId 需要触发savepoint的jobId编号
targetDirectory 指定savepoint存储数据目录
-yid 指定yarnAppId
例如: flink savepoint 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001

  1. 查看HDFS上savepoint目录

#Savepoint directory /flink/savepoints/savepoint-:shortjobid-:savepointid/

#Savepoint file contains the checkpoint meta data /savepoints/savepoint-:shortjobid-:savepointid/_metadata

在这里插入图片描述
4. 触发savepoint并且停止作业

##语法: bin/flink stop jobId -yid yarnAppId

##例如: flink stop 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001

  1. 从指定的savepoint启动job

##语法: bin/flink run -s savepointPath [runArgs]

##例如: flink run -t yarn-per-job -yjm 1024 -ytm 1024
-s hdfs://node01:8020/flink/savepoints/savepoint-84e766-0591f3377ad0
-c com.loess.checkpoint.TestCheckPoint flink-study-1.0-SNAPSHOT.jar

  1. 清除savepoint数据

bin/flink savepoint -d savepointPath

##也可以手动删除某个savepoint,这通过常规的文件系统操作就可以做到,不影响其它的savepoints和checkpoints

  1. savePoint使用建议

为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,推荐通过 uid(String) 方法手动的给算子赋予 ID,这些
ID 将用于确定每一个算子的状态范围。

不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点
(savepoint)将程序恢复回来。而这些自动生成的 ID
依赖于程序的结构,并且对代码的更改敏感。当程序改变时,ID会随之变化,所以建议用户手动设置 ID

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

4.3 savePoint与checkPoint的区别

  • checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从checkpoint来恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
  • savepoint是通过checkpoint机制创建的,所以savepoint本质上是特殊的checkpoint。
  • checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
  • checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。
    在这里插入图片描述

5. checkpoint机制原理

checkPoint是所有 Operator / Task 的状态在某个时间点的一份拷贝(一份快照), 这个时间点应该是所有 Operator / Task 任务都恰好处理完一个相同的输入数据的时候。
在这里插入图片描述
若某个subTask挂了,则此时的状态都被清空,从checkpoint恢复最近一次的状态,重新启动应用程序,计算输入流

5.1 Barrier机制

Barrier是一种特殊事件,用来作为快照信号,由checkpoint 协调器向数据流中注入该信号,subtask任务收到该信号后,就会执行状态的快照。
在这里插入图片描述

  1. 首先是JobManager中的checkpoint Coordinator(协调器) 向任务中的所有source Task周期性发送barrier(栅栏)进行快照请求。
  2. source Task接受到barrier后, 会把当前自己的状态进行snapshot(可以保存在HDFS上)。
  3. source向checkpoint coordinator确认snapshot已经完成。
  4. source继续向下游transformation operator发送 barrier。
  5. transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成。
  6. coordinator确认完成本周期的snapshot已经完成。

5.2 Barrier对齐

对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边 称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier,而且 不同输入里Checkpoint Barrier的流入进度可能不同。因此Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment)
在这里插入图片描述
算子对齐分为四部:
(1). 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐

(2). 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。

(3). 第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。

(4). 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。
在这里插入图片描述

6.快照性能优化方案

每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。

Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。

两种优化方案
① Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。通过基于 Chandy-Lamport 算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用。
② Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/46359.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何判断某个视频是深度伪造的?

目录 一、前言 二、仔细检查面部动作 三、声音可以提供线索 四、观察视频中人物的身体姿势 五、小心无意义的词语 深造伪造危险吗&#xff1f; 一、前言 制作深度伪造视频就像在Word文档中编辑文本一样简单。换句话说&#xff0c;您可以拍下任何人的视频&#xff0c;让他…

HEVC网络适配层介绍

h265 的分层结构 分层结构的目的 ○ 网络类型多种多样&#xff0c;不同的网络环境具有不同的特性&#xff0c;压缩视频在其中进行传输必然会受到影响&#xff1b;比如不同网络的 MTU 有所不同&#xff1b; ○ 不同的应用场景对视频有不同的需求&#xff0c;视频业务会喜用不…

html2Canvas+jsPDF 下载PDF 遇到跨域的对象存储的图片无法显示

一、问题原因 对象存储的域名和你网址的域名不一样&#xff0c;此时用Canvas相关插件 将DOM元素转化为PDF&#xff0c;就会出现跨域错误。 二、解决办法 两步 1. 图片元素上设置属性 crossorigin"anonymous" 支持原生img和eleme组件 2. 存储桶设置资源跨域访问…

Redis多级缓存

文章目录 多级缓存背景JVM进程缓存Caffeine案例分析安装MySQL导入SQL Lua语法变量与循环数据类型声明变量循环 函数与条件控制函数条件控制 实现多级缓存安装OpenResty安装opm工具目录结构配置Nginx的环境变量运行启动 快速入门反向代理流程OpenResty监听请求编写item.lua 请求…

2023河南萌新联赛第(三)场:郑州大学 F

把题中所给的式子进行展开&#xff0c;最终可以得到一个等比数列。运用等比数列求和公式即可。 相关知识点&#xff1a;欧拉降幂&#xff0c;逆元。 逆元的用处&#xff1a;因为求和公式需要去除分子&#xff0c;而大数除法去取模会丢失精度&#xff0c;所以可以采用求出分子…

【设计模式——学习笔记】23种设计模式——桥接模式Bridge(原理讲解+应用场景介绍+案例介绍+Java代码实现)

问题引入 现在对不同手机类型的不同品牌实现操作编程(比如:开机、关机、上网&#xff0c;打电话等)&#xff0c;如图 【对应类图】 【分析】 扩展性问题(类爆炸)&#xff0c;如果我们再增加手机的样式(旋转式)&#xff0c;就需要增加各个品牌手机的类&#xff0c;同样如果我们…

C++第三方开发库matplotlib-cpp

Matplotlib-cpp是一个用于在C中绘制图表的开源库。它提供了与Python的Matplotlib库类似的功能&#xff0c;使得在C环境下进行数据可视化变得更加便捷。基于Matplotlib-cpp&#xff0c;我们可以使用各种绘图函数和样式选项来创建各种类型的图表&#xff0c;包括折线图、散点图、…

Qtday2作业

1. 头文件 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QDebug> #include<QIcon> #include<QPushButton> #include<QLineEdit> #include<QLabel> #include<QMovie> class Widget : public QWidget…

【多线程带来的的风险-线程安全的问题的简单实例-线程不安全的原因】

文章目录 前言线程不安全的5大原因1. 抢占式执行和随机调度2. 多个线程同时修改一个变量(共享数据&#xff09;3. 修改操作不是原子性的4. 内存可见性5. 指令重排序 前言 什么是线程安全&#xff1f; 简单来说&#xff0c;如果多线程环境下代码运行的结果是符合我们预期的&am…

【SpringCloud Alibaba】(一)微服务介绍

此专栏内容皆来自于【冰河】的《SpringCloud Alibaba 实战》文档。 1. 专栏介绍 我们先来看看《SpringCloud Alibaba实战》专栏的整体结构吧&#xff0c;先上图 从上图&#xff0c;大家可以看到&#xff0c;专栏从整体上分为十个大的篇章&#xff0c;分别为 专栏设计、微服务…

macOS Ventura 13.5 (22G74) 正式版发布,ISO、IPSW、PKG 下载

macOS Ventura 13.5 (22G74) 正式版发布&#xff0c;ISO、IPSW、PKG 下载 本站下载的 macOS Ventura 软件包&#xff0c;既可以拖拽到 Applications&#xff08;应用程序&#xff09;下直接安装&#xff0c;也可以制作启动 U 盘安装&#xff0c;或者在虚拟机中启动安装。另外也…

全志F1C200S嵌入式驱动开发(GPIO输出)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 和v3s一样,f1c200s本身的外部引脚比较少。所以这个时候,不可避免地,很多引脚的功能就会重叠在一起。这种情况下,我们就要学会取舍了。比如说,如果是学习sd卡的时候,那么spi的…

阿里云NVIDIA A100 GPU云服务器性能详解及租用费用

阿里云GPU服务器租用费用表包括包年包月、一个小时收费以及学生GPU服务器租用费用&#xff0c;阿里云GPU计算卡包括NVIDIA V100计算卡、T4计算卡、A10计算卡和A100计算卡&#xff0c;GPU云服务器gn6i可享受3折&#xff0c;阿里云百科分享阿里云GPU服务器租用表、GPU一个小时多少…

拦截Bean使用之前各个时机的Spring组件

拦截Bean使用之前各个时机的Spring组件 之前使用过的BeanPostProcessor就是在Bean实例化之后&#xff0c;注入属性值之前的时机。 Spring Bean的生命周期本次演示的是在Bean实例化之前的时机&#xff0c;使用BeanFactoryPostProcessor进行验证&#xff0c;以及在加载Bean之前进…

最新Ai创作源码ChatGPT商用运营源码/支持GPT4.0+支持ai绘画+支持Mind思维导图生成

本系统使用Nestjs和Vue3框架技术&#xff0c;持续集成AI能力到本系统&#xff01; 支持GPT3模型、GPT4模型Midjourney专业绘画&#xff08;全自定义调参&#xff09;、Midjourney以图生图、Dall-E2绘画Mind思维导图生成应用工作台&#xff08;Prompt&#xff09;AI绘画广场自定…

STM32—CAN通信

文章目录 一、CAN通信简介1.1 CAN简介1.2 CAN协议特点1.3 CAN通信的帧类型1.4 数据帧结构1.5 CAN的位时序1.6 CAN的仲裁功能 二、STM32F1的CAN2.1 bxCAN简介2.2 bxCAN工作模式2.2.1 初始化模式2.2.2 正常模式2.2.3 睡眠模式2.2.4 静默模式2.2.5 环回模式 2.3 位时序和波特率 三…

leetCode刷题记录3-面试经典150题

文章目录 不要摆&#xff0c;没事干就刷题&#xff0c;只有好处&#xff0c;没有坏处&#xff0c;实在不行&#xff0c;看看竞赛题面试经典 150 题80. 删除有序数组中的重复项 II189. 轮转数组122. 买卖股票的最佳时机 II 不要摆&#xff0c;没事干就刷题&#xff0c;只有好处&…

阿里云部署 ChatGLM2-6B 与 langchain+ChatGLM

1.ChatGLM2-6B 部署 更新系统 apt-get update 安装git apt-get install git-lfs git init git lfs install 克隆 ChatGLM2-6B 源码 git clone https://github.com/THUDM/ChatGLM2-6B.git 克隆 chatglm2-6b 模型 #进入目录 cd ChatGLM2-6B #创建目录 mkdir model #进入目录 cd m…

安全攻击 --- XSS攻击

XSS跨站脚本攻击 &#xff08;1&#xff09;简介 OWASP TOP 10 之一&#xff0c;XSS被称为跨站脚本攻击&#xff08;Cross-Site-Scripting&#xff09;主要基于JavaScript&#xff08;JS&#xff09;完成攻击行为XSS通过精心构造JS代码注入到网页中&#xff0c;并由浏览器解释…

在nginx上部署nuxt项目

先安装Node.js 我安的18.17.0。 安装完成后&#xff0c;可以使用cmd&#xff0c;winr然cmd进入&#xff0c;测试是否安装成功。安装在哪个盘都可以测试。 测试 输入node -v 和 npm -v&#xff0c;&#xff08;中间有空格&#xff09;出现下图版本提示就是完成了NodeJS的安装…