详解 Flink 的状态管理

一、Flink 状态介绍

1. 流处理的无状态和有状态

  • 无状态的流处理:根据每一次当前输入的数据直接转换输出结果的过程,在处理中只需要观察每个输入的独立事件。例如, 将一个字符串类型的数据拆分开作为元组输出或将每个输入的数值加 1 后输出。Flink 中的基本转换算子 (map、filter、flatMap 等) 在计算时不依赖其他数据,所以都属于无状态的算子。

在这里插入图片描述

  • 有状态的流处理:根据每一次当前输入的数据和一些其他已处理的数据共同转换输出结果的过程,这些其他已处理的数据就称之为状态(state),状态由任务维护,可以被任务的业务逻辑访问。例如,做求和(sum)计算时,需要当前输入的数据和保存的之前所有输入数据的和共同计算;窗口操作中会将当前达到的数据和保存的之前已经到达的所有数据共同处理。Flink 中的聚合算子和窗口算子都属于有状态的算子。

    在这里插入图片描述

2. Flink 的状态管理

  • 在传统的事务型处理架构中,状态数据一般是保存在数据库中的,在业务处理过程中与数据库交互进行状态的读取和更新;但对于大数据实时处理架构来说,在业务处理时频繁地读写外部数据库会造成性能达不到要求,因此不能使用数据库进行状态管理
  • 在实时流处理中一般将状态直接保存在内存中来保证性能,但必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题随之产生
  • Flink 拥有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态一致性、状态的高效存储和访问、持久化保存和故障恢复以及资源扩展时的调整。开发者只需要调用相应的 API 就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上

二、Flink 状态分类

1. 托管状态

Managed State,所有的托管状态都由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现

1.1 算子状态

Operator State,状态作用范围限定为当前的算子任务实例,只对当前的并行子任务实例有效;使用较少

在这里插入图片描述

  • 由同一并行任务所处理的所有数据都可以访问到相同的算子状态
  • 算子状态对于同一任务而言是共享的
  • 算子状态不能由相同或不同算子的另一个任务访问
1.1.1 算子状态数据结构
  • 列表状态(List state):将状态表示为一组数据的列表
  • 联合列表状态(Union list state):也是将状态表示为一组数据的列表。与列表状态的区别在于,在发生故障时或者从保存点(savepoint)启动应用程序时恢复的方式不同
  • 广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
1.1.2 案例
public class TestFlinkOperatorState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //定义一个有状态的map算子,用于统计输入数据个数
        DataStream<Integer> resultStream = dataStream.map(new MyCountMapper());
        
        resultStream.print();
        
        env.execute();
        
    }
    
    //定义有状态的 map 操作
    //实现 ListCheckpointed 接口,泛型为状态数据类型
    public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
        //定义一个本地变量作为状态
        private Integer count = 0;
        
        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }
        
        //对状态做快照
        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }
        
        //容错恢复状态
        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for(Integer num : state) {
                count += num;
            }
        }
        
    }
    
}
1.2 按键分区状态

Keyed State,状态的作用范围以 key 来隔离,是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,即 keyBy 之后才可以使用

在这里插入图片描述

  • 在进行按键分区(keyBy)之后,具有相同 key 的所有数据,都会分配到同一个并行子任务中,这个任务会维护和处理这个 key 对应的状态实例
  • 一个并行子任务可能会处理多个 key 的数据,所以该任务会为每个 key 都维护一个状态实例
  • 在底层,同一个并行子任务的所有 KeyedState 会根据 key 保存成键值对(key-value)的形式,当一条数据到来时,任务会自动将状态的访问范围限定为当前数据的 key,并从键值对(key-value)存储中读取出对应的状态值
  • 具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的
  • 在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State 可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同
1.2.1 按键分区状态数据结构
//按键分区状态的实例化方法:在富函数中,调用 getRuntimeContext() 方法获取到运行时上下文之后
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN,  OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • 值状态:ValueState<T>,将状态表示为单个的值,值的类型为 T
    • ValueState.value():获取状态值
    • ValueState.update(T value):添加或更新状态值
    • ValueState.clear():清空操作
  • 列表状态:ListState<T>,将状态表示为一组数据的列表,列表里的元素的数据类型为 T
    • ListState.add(T value):追加状态值
    • ListState.addAll(List<T> values):追加状态值列表
    • ListState.get():获取状态值的 Iterable<T>
    • ListState.update(List<T> values):更新状态值列表
    • ListState.clear():清空操作
  • 映射状态:MapState<K, V>,将状态表示为一组 Key-Value 对
    • MapState.get(UK key):获取状态值
    • MapState.put(UK key , UV value):添加或更新状态值
    • MapState.contains(UK key):判断状态值是否存在
    • MapState.remove(UK key):删除状态值
    • MapState.clear():清空操作
  • 聚合状态:ReducingState<T>AggregatingState<I, O>,将状态表示为一个用于聚合操作的列表
    • ReducingState.add():聚合状态值,调用实例化 ReducingState 时自定义 ReduceFunction 中的方法;AggregatingState 同理
    • ReducingState.clear():清空操作,AggregatingState 同理
1.2.2 案例
/**
	按键分区状态的使用步骤:
		1. 在自定义算子Function中声明一个按键分区数据结构,由于声明时需要使用 getRuntimeContext(),因此要使用继承富函数类的方式自定义算子Function
		2. 在自定义算子Function的对应算子方法中进行状态的读写等相关操作
*/
public class TestFlinkKeyedState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        
        /*
        	需求:自定义有状态的map算子,按sensor_id统计个数
        */
        //使用按键分区状态必须先进行keyBy
        DataStream<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());
        
        resultStream.print();
        
        env.execute();
    }
    
    //使用继承富函数类的方式自定义MapFunction
    public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {
        
        //定义一个值状态属性
        private ValueState<Integer> myValueState;
        
        //在open方法中实例化值状态
        @Override
        public void open(Configuration parameters) throws Exception {
            myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("value-state", Integer.class));
        }
        
        @Override
        public Integer map(SensorReading value) throws Exception {
            //获取状态值
            Integer count = myValueState.value();
            if(count == null) {
                count = 0;
            }
            
            count++;
                
            //更新状态值
            myValueState.update(count);
            
            return count;
        }
                
    }
}

2. 原始状态

Raw State,原始状态是自定义的,相当于开辟了一块内存,需要开发者自己管理,实现状态的序列化和故障恢复

  • Flink 不会对原始状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储
  • 只有在遇到托管状态无法实现的特殊需求时,才考虑使用原始状态;一般情况下不推荐使用

三、Flink 状态编程案例

/**
	需求:检测同一个传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警信息
*/
public class FlinkKeyedStateCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //定义一个有状态的 flatMap 操作,若同一个传感器连续两个温度的差值超过 10 度,则输出报警
        //报警信息:sensor_id,前一次温度值,当前温度值
        DataStream<Tuple3<String, Double, Double>> warningStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));
        
        warningStream.print();
        
        env.execute();
    }
    
    //使用继承富函数类的方式自定义FlatMapFunction
    public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {
        //定义温度差阈值属性
        private Double threshold;
        //定义值状态属性,保存上一次的温度值
        private ValueState<Double> lastTempState;
        
        public TempChangeWarning(Double threshold) {
            this.threshold = threshold;
        }
        
        //在open方法中实例化值状态
        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));
        }
        
        //重写flatMap方法
        @Override
        public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
            //获取上一次温度状态值
            Double lastTemp = lastTempState.value();
            
            //如果状态值不为null,则进行差值判断
            if(lastTemp != null) {
                Double diff = Math.abs(lastTemp - value.getTemperature());
                //差值超过阈值,则输出报警信息
                if(diff >= threshold) {
                    out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));
                }
            }
            
            //更新状态值
            lastTempState.update(value.getTemperature());
        }
        
        //在close方法中清空状态
        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}

四、Flink 状态后端

State Backends,一个可插入的决定状态的存储、访问以及维护等工作的组件

1. 介绍

​ 在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backends)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

2. 分类

  • MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储
    在 TaskManager 的 JVM 堆上;而将 checkpoint 存储在 JobManager 的内存中。

  • FsStateBackend:文件系统级的状态后端,对于本地状态,跟 MemoryStateBackend 一样,也会存储在 TaskManager 的 JVM 堆上,但会将 checkpoint 存储到远程的持久化文件系统(FileSystem)中,如 HDFS。

  • RocksDBStateBackend:将所有状态和 checkpoint 序列化后,存入本地的 RocksDB 中存储。RocksDBStateBackend 的支持并不直接包含在 flink 中,需要引入依赖。

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    

3. 配置

3.1 配置文件配置
  • 进入 flink 安装目录下的 conf 目录,打开 flink-conf.yaml 文件

    cd /opt/module/flink/conf
    vim flink-conf.yaml
    
  • 在文件中的 Fault tolerance and checkpointing 部分进行配置

    #Fault tolerance and checkpointing
    #============================================================
    state.backend: filesystem #默认值为 filesystem,可选值为 jobmanager/filesystem/rocksdb
    
    #state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints
    
    jobmanager.execution.failover-strategy: region #容错恢复策略,默认是按区域恢复
    
3.2 代码配置

在代码中为每个作业单独配置状态后端

public class TestStatebackend {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //配置状态后端
        //1.MemoryStateBackend
        env.setStateBackend(new MemoryStateBackend());
        
        //2.FsStateBackend
        env.setStateBackend(new FsStateBackend("hdfs://......"));
        
        //3.RocksDBStateBackend,需要先引入依赖
        env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
    
        dataStream.print();
        
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/694000.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Pycharm中import torch报错解决方案(Python+Pycharm+Pytorch cpu版)

pycharm环境搭建完毕后&#xff0c;编写一个py文件demo&#xff0c;import torch报错&#xff0c;提示没有。设置python解释器&#xff1a; 选择conda环境&#xff0c;使用现有环境&#xff0c;conda执行文件找到Anaconda安装路径下Scripts文件夹内的conda.exe&#xff0c;最后…

程控直流电源:助力企业实现绿色转型与可持续发展

一、程控直流电源发展趋势 1. 程控直流电源的高效节能 在全球能源紧张、环境污染严重的背景下&#xff0c;高效节能成为电源行业的重要发展方向。程控直流电源采用先进的电源管理技术&#xff0c;实现了高效率、低功耗的目标。未来&#xff0c;随着技术的不断进步&#xff0c…

英语学习笔记33——A fine day

A fine day 风和日丽 词汇 Vocabulary day n. 日子&#xff0c;白天 复数&#xff1a;days 常见节日&#xff1a;Mothers’ Day 母亲节      Fathers’ Day 父亲节      Teachers’ Day 教师节      Children’s Day 儿童节      Women’s Day 妇女节 c…

IO流字符流(FileReader与FileWriter)

目录 FileReader 空参read方法 带参read方法&#x1f447; FileWriter void write(intc) 写出一个字符 void write(string str) 写出一个字符串 void write(string str,int off,int len) 写出一个字符串的一部分 void write(char[] cbuf) …

入门matlab

常识 如何建一个新文件 创建新文件&#xff0c;点击新建&#xff0c;我们就可以开始写代码了 为什么要在代码开头加入clear 假如我们有2个文件&#xff0c;第一个文件里面给x赋值100&#xff0c;第二个文件为输出x 依次运行&#xff1a; 结果输出100&#xff0c;这是因为它们…

less---20-28

less-20 这关登陆成功会显示cookie,所以抓包在cookie处注入 less-21 这关登陆成功会显示cookie,所以抓包在cookie处注入&#xff0c;发现不成功&#xff0c;查看代码发现被编码 先对注入语句进行base64编码再注入 less-22 闭合字符",同21关 less-23 这关查看代码发现…

高并发ping多台主机IP

简介 社区或者是大型公司往往有成千上万或者几百台设备&#xff0c;保持设备始终在线对网络运维人员来说至关重要&#xff0c;然而一个一个登录检查&#xff0c;或者一个一个ping并不明智&#xff0c;累人且效率极低&#xff0c;并出错率高。花钱买检测服务当我没说。 shell编…

C++【STL】改造红黑树简单模拟实现set map(带你了解set map的底层实现结构)

目录 一、学前铺垫&#xff08;泛型编程&#xff09; 二、改造红黑树 1.红黑树节点的改造 2.insert的改造 3.迭代器的实现 4.完整改造代码 三、set的模拟实现封装 四、map的模拟实现封装 五、完结撒❀ 前言&#xff1a; 下面为了简单模拟实现set map所出现的代码是以…

【诈骗离你我很近】中国同胞进来看看国外诈骗新套路。

前几天一个老外经常在CSDN给我发消息&#xff0c;我最开始很警惕&#xff0c;不过聊了大概半个月&#xff0c;我就没怎么怀疑他了&#xff0c;而且还很高兴认识了一个外国朋友。这半个月聊天内容很正常&#xff0c;就聊些中国的小习惯&#xff0c;让我教他用筷子。还问我有哪些…

算法家族之一——二分法

目录 算法算法的打印效果如果算法里的整型“i”为1如果算法里的整型“i”为11 算法的流程图算法的实际应用总结 大家好&#xff0c;我叫 这是我58&#xff0c;现在&#xff0c;请看下面的算法。 算法 #define _CRT_SECURE_NO_WARNINGS 1//<--预处理指令 #include <stdi…

实现手机空号过滤或手机号码有效性验证

手机空号过滤或手机号码有效性验证通常涉及使用专门的API接口来查询手机号码的状态。这些API接口通常由第三方服务提供商提供&#xff0c;它们会与电信运营商合作或利用自己的数据库来验证手机号码是否真实存在、是否已被分配、是否处于空号状态等。 以下是一些步骤和考虑因素…

Java:111-SpringMVC的底层原理(中篇)

这里续写上一章博客&#xff08;110章博客&#xff09;&#xff1a; 现在我们来学习一下高级的技术&#xff0c;前面的mvc知识&#xff0c;我们基本可以在67章博客及其后面相关的博客可以学习到&#xff0c;现在开始学习精髓&#xff1a; Spring MVC 高级技术&#xff1a; …

黑马程序员——Spring框架——day07——SpringBoot高级

目录&#xff1a; SpringBoot自动化配置原理 starter依赖管理机制自动化配置初体验Configuration配置注解Import注解使用1Import注解使用2Conditional衍生条件装配ConfigurationProperties配置绑定SpringBootApplication入口分析EnableAutoConfiguration自动配置注解按条件开启…

指针(初阶2)“野指针以及指针运算”

目录 一.野指针 二.如何避免野指针 三.指针运算 1、指针&#xff08;-&#xff09;整数 2、指针 - 指针 3、指针关系运算 小编在这里声明一下&#xff0c;将某一块的知识点分为上中下或者1&#xff0c;2&#xff0c;3来编写不是为了增加小编的文章总量&#xff0c;也不是故意这…

【大模型】Ollama+open-webui/Anything LLM部署本地大模型构建RAG个人知识库教程(Mac)

目录 一、Ollama是什么&#xff1f; 二、如何在Mac上安装Ollama 1. 准备工作 2. 下载并安装Ollama 3. 运行Ollama 4. 安装和配置大型语言模型 5. 使用Ollama 三、安装open-webui 1. 准备工作 2. Open WebUI ⭐的主要特点 3. Docker安装OpenWebUI&#xff0c;拉去太慢…

quick4 - hackmyvm

简介 靶机名称&#xff1a;quick4 难度&#xff1a;简单 靶场地址&#xff1a;https://hackmyvm.eu/machines/machine.php?vmQuick4 本地环境 虚拟机&#xff1a;vitual box 靶场IP&#xff08;quick4&#xff09;&#xff1a;192.168.56.104 跳板机IP(windows 11)&…

【OpenHarmony】ArkTS 语法基础 ⑤ ( ArkTS 状态管理 | @State 装饰器定义状态数据 | 使用状态数据渲染组件 )

文章目录 一、ArkTS 状态管理 - State 装饰器1、State 装饰器定义状态数据2、State 装饰器定义状态数据 - 示例分析3、使用 State 装饰器定义的状态数据渲染组件 - 示例分析 二、完整代码示例1、完整自定义组件代码示例2、展示效果 参考文档 : <HarmonyOS第一课>ArkTS开发…

2024年6月8日:极工度玩公司全球首发“竞技智慧,渔护海洋”竞渔棋,世界海洋日直播发布会圆满落幕

在纪念2024年世界海洋日之际&#xff0c;极工度玩公司举办了一场别具一格的全球益智游戏首发发布会&#xff0c;向世界彰显了其对海洋生态保护的坚定承诺与热忱。这场以“竞技智慧&#xff0c;渔护海洋”为主题的盛会&#xff0c;旨在为参与者带来创新的游戏体验&#xff0c;同…

归并排序法

归并排序法是典型的分治算法应用&#xff0c;1946年由冯.诺伊曼发明。 算法思路&#xff1a;归并排序算法有两个基本操作&#xff0c;一是分&#xff0c;也就是把原数组划分成两个子数组的过程&#xff0c;另一个是治&#xff0c;它将两个有序数组合并成一个更大的有序数组。 …

【SpringCloud学习笔记】Docker(中篇)

Docker 1. 自定义镜像 前面我们都是使用docker pull拉取仓库中现成的镜像&#xff0c;但是如果我们想要将一个Java应用程序构建成镜像然后部署应该怎么做呢&#xff1f;这个时候我们就需要自定义镜像了 **镜像&#xff1a;**本质上就是一堆文件的集合&#xff0c;包含了应用程…