Flink 学习七 Flink 状态(flink state)

Flink 学习七 Flink 状态(flink state)

1.状态简介

流式计算逻辑中,比如sum,max; 需要记录和后面计算使用到一些历史的累计数据,

状态就是:用户在程序逻辑中用于记录信息的变量

在Flink 中 ,状态state 不仅仅是要记录状态;在程序运行中如果失败,是需要重新恢复,所以这个状态也是需要持久化;一遍后续程序继续运行

1.1 row state

我们自定义变量来保存数据

public class _01_status_row {

	public static void main(String[] args) throws Exception {
		// 获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
		DataStream<String> dataStream = dataStreamSource.map(new MapFunction<String, String>() {
            //自己定义的 变量来保存中间值:这里就无法有效的持久化和恢复
            //状态: raw state  状态
			String oldString = "";

            //如何让flink 来托管我们的状态变量,完成持久化和恢复??
			@Override
			public String map(String value) throws Exception {
                oldString = oldString + value;
				return oldString;
			}
		});
		dataStream.print();
		env.execute();
	}
}

1.2 flink state 托管状态

flink 提供了内置的状态数据管理机制,也叫状态机制: 状态一致性维护,状态数据的访问和存储;

1.3 恢复

Flink 任务是一个JOB .JOB 范围很多Task ,Task 对应示例subtask

是subtask 出错的时候,flink 底层会自动的从帮我们恢复task 的运行

如果是Job失败了 从 flink state 恢复,需要在特殊指定一些参数

2.状态分类

算子状态:

  • 每个subtask 自己持有一份独立的状态数据
  • 算子函数实现CheckpointFunction 后,既可使用算子状态
  • 算子状态: 一般是用于source算子中, 其他场景下建议使用keyedState (键控状态)

键控状态 Keyed State

  • 键控状态,只能使用于KeyedStream 的算子中
  • 算子为每一个key绑定一份独立的状态数据

更多的使用场景是键控状态 Keyed State

3.算子状态 Operator State

每个subtask 自己持有一份独立的状态数据;算子状态,在逻辑上,由算子 task下所有subtask共享;

如何理解:正常运行时,subtask自己读写自己的状态数据;而一旦job重启且带状态算子发生了并行度的变化,则之前的状态数据将在新的一批subtask 间均匀分配

在这里插入图片描述

public class _02_operator_flink_status {

    public static void main(String[] args) throws Exception {
        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        
        //=============配置 ===============
        //需要开启 Checkpoint 机制
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        //需要开启持久化的路径  可选hdfs 本地
        env.getCheckpointConfig().setCheckpointStorage("file:///D:/Resource/FrameMiddleware/FlinkNew/sinkout2/");
        //task级别的failover
        //一个task 失败 job 失败 ,有很多重启策略
        //env.setRestartStrategy(RestartStrategies.noRestart());
        //task 失败 重启最多3次 , 失败后1秒重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));
        //=============配置 ===============

        DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        DataStream<String> dataStream = dataStreamSource.map(new StateMapFunction());
        dataStream.print();
        env.execute();
    }
}


class StateMapFunction implements MapFunction<String,String> , CheckpointedFunction {

    ListState<String> listState;

    //正常的处理逻辑
    @Override
    public String map(String value) throws Exception {
        listState.add(value);
        Iterable<String> strings = listState.get();
        StringBuilder sb = new StringBuilder();
        for (String string : strings) {
            sb.append(string);
        }
        //写一个异常
        if(value.length()==5){
            int a = 1/ 0;
        }
        return sb.toString();
    }

    //持久化之前会调用的方法
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        System.out.println("执行快照!!!!!"+ checkpointId);
    }

    //算子的任务在启动之前,会调用下面的方法,为用户的状态初始化
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        //context 获取状态存储器
        OperatorStateStore operatorStateStore = context.getOperatorStateStore();
        //定义一个昨天存储结构的描述器
        ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("保存字符串", String.class);
        //获取状态存储器 中获取容器来存储器
        //getListState 方法还会加载之前存储的状态数据
         listState = operatorStateStore.getListState(listStateDescriptor);
    }
}

3.键控状态 Keyed State

3.1 基础概念

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-981L9koP-1687272668448)(flink7手绘/state_partitioning.svg)]1

不同点:

算子状态中,一个算子有一个状态存储空间

Keyed State:每个Key 都是有自己的状态存储空间

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wq7IOvvT-1687272668448)(flink7手绘/state_keyed.png)]

3.2 示例

public class _03_keyed_flink_status {

    public static void main(String[] args) throws Exception {
        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //需要开启 Checkpoint 机制
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        //需要开启持久化的路径  可选hdfs 本地
        env.getCheckpointConfig().setCheckpointStorage("file:///D:/Resource/FrameMiddleware/FlinkNew/sinkout4/");
        //task级别的failover
        //一个task 失败 job 失败
        env.setRestartStrategy(RestartStrategies.noRestart());
        //task 失败 重启最多3次 , 失败后1秒重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));

        DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        DataStream<String> dataStream = dataStreamSource.keyBy(x -> x)
                .map(new KeyedStateMapFunction()).setParallelism(2);
        dataStream.print("===>").setParallelism(3);
        env.execute();
    }
}

//flink 状态管理 算子需要实现CheckpointedFunction
class KeyedStateMapFunction extends RichMapFunction<String, String>{
    ListState<String> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext runtimeContext = getRuntimeContext();
        ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("保存字符串", String.class);
         listState = runtimeContext.getListState(listStateDescriptor);
    }
    //正常的处理逻辑
    @Override
    public String map(String value) throws Exception {
        listState.add(value);
        Iterable<String> strings = listState.get();
        StringBuilder sb = new StringBuilder();
        for (String string : strings) {
            sb.append(string);
        }
        //写一个异常
        if(value.length()==5){
            int a = 1/ 0;
        }
        return sb.toString();
    }
}

//======
[root@localhost ~]# nc -lk 9000
a
a
a
b
b
b
c
c
c
c
d
d
d
 控制台数据输出为
===>:2> a
===>:3> aa
===>:1> aaa
===>:1> b
===>:2> bb
===>:3> bbb
===>:1> c
===>:2> cc
===>:3> ccc
===>:1> cccc    ========> 每个key 都有一个自己的ListState<String> listState;

3.3 状态API 使用

class KeyedStateMapFunction_2 extends RichMapFunction<String, String>{
    ValueState<String> valueState;
    ListState<String> listState;
    MapState<String, String> mapState;
    ReducingState<Integer> reducingState;
    AggregatingState<Integer, Double> aggState;

    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();

        //单值状态存储器
         valueState = runtimeContext.getState(new ValueStateDescriptor<String>("string", String.class));
         //列表状态存储器
         listState = runtimeContext.getListState(new ListStateDescriptor<>("list", String.class));
         //map 状态存储器
         mapState = runtimeContext.getMapState(new MapStateDescriptor<String, String>("map", String.class, String.class));
         //做累加 reduce
         reducingState = runtimeContext.getReducingState(new ReducingStateDescriptor<Integer>("reduce", new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1+value2;
            }
        }, Integer.class));
         //记录聚合状态  --> 平均值
        AggregatingState<Integer, Double> aggState = runtimeContext.getAggregatingState(new AggregatingStateDescriptor<>("aggState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
            @Override
            public Tuple2<Integer, Integer> createAccumulator() {
                return Tuple2.of(0, 0);
            }

            @Override
            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
            }

            @Override
            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                return Double.valueOf(accumulator.f1 / accumulator.f0);
            }

            //批处理会使用
            @Override
            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                return Tuple2.of(a.f0 + b.f0, b.f0 + b.f1);
            }
        }, TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {
        })));
    }
    //正常的处理逻辑
    @Override
    public String map(String value) throws Exception {
        //valueState
        valueState.update("new value");//更新值
        String value1 = valueState.value();//q取值

        //listState
        listState.add(value); //添加一个数据
        listState.addAll(Arrays.asList("1","2")); //添加多个数据
        listState.update(Arrays.asList("1","2")); //替换原有数据

        //mapState
        Iterable<String> keys = mapState.keys(); 
        boolean contains = mapState.contains("1");
        mapState.put("1","2");  //添加数据
        Map<String,String> map = new HashMap<>();
        map.put("1","2");
        mapState.putAll(map);//批量添加数据


        //reducingState
        //做累加
        reducingState.add(Integer.valueOf(value));
        Integer integer = reducingState.get(); //取值
        //计算平均值
        aggState.add(Integer.valueOf(value));
        Double aDouble = aggState.get();//取值
        return value1;
    }
}

3.4 状态的TTL 管理

        RuntimeContext runtimeContext = getRuntimeContext();
        //单值状态存储器
        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("string", String.class);
        //存活时间和过期 参考
        StateTtlConfig build = StateTtlConfig.newBuilder(Time.milliseconds(5000))  //数据存活时间
                .setTtl(Time.milliseconds(5000)) //数据存活时间 和上面效果一样
                .updateTtlOnCreateAndWrite() //插入和更新时 TTL 重新计算存活时间
                .updateTtlOnReadAndWrite()  //读或者写 TTL 重新计算存活时间  //比如List 是单条数据  Map 则是一个Key value 是一个单独的TTL
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //返回已经过期的数据
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) //没清楚可以返回过期数据
                .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)//TTL处理时间语义
                .useProcessingTime() //效果同上
                .cleanupFullSnapshot()//清理过期状态数据 在checkpoint 的时候
                .cleanupInRocksdbCompactFilter(1000) //只对rocksdb 生效 在rockdb Compact机制在Compact 时过期时间清理
                .build();
        valueStateDescriptor.enableTimeToLive(build);
        valueState = runtimeContext.getState(valueStateDescriptor);

4.状态后端

4.1 基础概念

状态数据的存储管理的实现,状态数据的本地读写,远端快照数据存储

状态后端是可插拔替换的,它对上层屏蔽了底层的差异,因为在更换状态后端时,用户的代码不需要做任何更改

4.2 可用的状态后端

  • HashMapStateBacked

    • heap 堆内存,溢出的话就是本地磁盘,对象的形式存在
    • 大规模数据内存不够会溢出到磁盘
    • 支持大规模数据状态,若有溢出到磁盘,则效率会明显降低
  • EmbeddedRocksDBStateBackend

    • 数据状态交给RocksDb 管理和存储
    • 数据是序列化的KV 字节存储 ,
    • RocksDb 中的数据,会存在内存缓存和磁盘
    • RocksDb 对磁盘数据读取较快,性能不会有较大印象

    两种状态后端策略 生成快照checkpoint 文件是一样的 ,重启后改变StateBacked 可以兼容运行;程序在重启后改变状态后端的方式不影响程序运行;

4.3设置状态后端

// HashMapStateBacked    
env.setStateBackend(new HashMapStateBackend());

//EmbeddedRocksDBStateBackend  
env.setStateBackend(new EmbeddedRocksDBStateBackend());

5.广播状态 broadcast state

前面章节说的流的join 的时候 广播就使用到了 broadcast state

Flink 学习三 Flink 流&process function API ==> 1.7.broadcast

new BroadcastProcessFunction();  

状态后端的方式不影响程序运行;**

4.3设置状态后端

// HashMapStateBacked    
env.setStateBackend(new HashMapStateBackend());

//EmbeddedRocksDBStateBackend  
env.setStateBackend(new EmbeddedRocksDBStateBackend());

5.广播状态 broadcast state

前面章节说的流的join 的时候 广播就使用到了 broadcast state

Flink 学习三 Flink 流&process function API ==> 1.7.broadcast

new BroadcastProcessFunction();  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/30533.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java实训第七天——2023.6.13

文章目录 一、用Visual Studio Code写一个计算器二、同一个js被多个html引用三、js操作css四、DOM对象属性的操作案例五、js解析json 一、用Visual Studio Code写一个计算器 功能&#xff1a;实现简单的加减乘除 <!DOCTYPE html> <html lang"en"> <…

LeetCode 2481. 分割圆的最少切割次数

【LetMeFly】2481.分割圆的最少切割次数 力扣题目链接&#xff1a;https://leetcode.cn/problems/minimum-cuts-to-divide-a-circle/ 圆内一个 有效切割 &#xff0c;符合以下二者之一&#xff1a; 该切割是两个端点在圆上的线段&#xff0c;且该线段经过圆心。该切割是一端…

mapbox-gl 点位编辑功能

文章目录 前言方式一&#xff1a;借助 Marker添加自定义icon添加POI图层&#xff0c;绑定对应事件基于Marker交互创建自定义Marker编辑 / 创建POI 方式二&#xff1a;采用 mapbox-gl-draw 插件总结 前言 矢量在线编辑是gis常用的编辑功能&#xff0c;兴趣点&#xff08;POI&am…

kettle开发-Day38-超好用自定义数据处理组件

目录 前言&#xff1a; 一、半斤八两&#xff0c;都不太行 1、表输入&#xff0c;速度快&#xff0c;但不稳妥 2、稳的一批&#xff0c;但是慢的像蜗牛 二、各诉衷肠&#xff0c;合作共赢 1、表输入&#xff0c;高效数据插入 2、插入更新&#xff0c;一个都不能少 三、表输…

express的使用(四) nodejs转发表单到后台

原文链接 搬砖的林小白-express的使用(四) 个人博客地址&#xff0c;求关注&#xff0c;也希望大家在里面批评我的不足之处 看前提示 本篇所讲述的内容是node端转发前端发送过来的表单到第三方中&#xff0c;应用的场景有很多&#xff0c;如我们经常做的将文件存储到七牛云或…

Scala学习笔记

累了&#xff0c;基础配置不想写了&#xff0c;直接抄了→Scala的环境搭建 这里需要注意的是&#xff0c;创建新项目时&#xff0c;不要用默认的Class类&#xff0c;用Object&#xff0c;原因看→scala中的object为什么可以直接运行 一、Scala简介 1.1 图解Scala和Java的关系 1…

大数据测试基本知识

常用大数据框架结构 1.大数据测试常用到的软件工具 工具推荐&#xff0c;对于测试数据构造工具有&#xff1a;Datafaker、DbSchema、Online test data generator等&#xff1b;ETL测试工具有&#xff1a;RightData、QuerySurge等&#xff1b;数据质量检查工具&#xff1a;great…

MySQL-SQL存储过程/触发器详解(上)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️努力不一定有回报&#xff0c;但一定会有收获加油&#xff01;一起努力&#xff0c;共赴美好人生&#xff01; ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xf…

Three.js--》实现3d地月模型展示

目录 项目搭建 初始化three.js基础代码 创建月球模型 添加地球模型 添加模型标签 今天简单实现一个three.js的小Demo&#xff0c;加强自己对three知识的掌握与学习&#xff0c;只有在项目中才能灵活将所学知识运用起来&#xff0c;话不多说直接开始。 项目搭建 本案例还…

《离散数学》:代数系统和图论导论

一、代数系统 代数系统是数学中的一个重要概念&#xff0c;它涉及一组对象以及定义在这些对象上的运算规则。代数系统可以是抽象的&#xff0c;也可以是具体的。 在抽象代数中&#xff0c;代数系统通常由一组元素和一组操作&#xff08;或称为运算&#xff09;组成。这些操作…

【MySQL新手入门系列四】:手把手教你MySQL数据查询由入门到学徒

SQL语言是与数据库交互的机制&#xff0c;是关系型数据库的标准语言。SQL语言可以用于创建、修改和查询关系数据库。SQL的SELECT语句是最重要的命令之一&#xff0c;用于从指定表中查询数据。在此博客中&#xff0c;我们将进一步了解SELECT语句以及WHERE子句以及它们的重要性。…

vue进阶-vue-route

Vue Router 是 Vue.js 的官方路由。它与 Vue.js 核心深度集成&#xff0c;让用 Vue.js 构建单页应用变得轻而易举。 本章只做学习记录&#xff0c;详尽的内容一定要去官网查看api文档 Vue Router-Vue.js 的官方路由 1. 路由的基本使用 1.1 安装vue-router npm install vue-…

SpringCloud Eureka注册中心高可用集群配置(八)

当注册中心扛不住高并发的时候&#xff0c;这时候 要用集群来扛&#xff1b; 我们再新建两个module microservice-eureka-server-2002 microservice-eureka-server-2003 第一步&#xff1a; pom.xml 把依赖加下&#xff1a; <dependencies> <dependency…

golang 协程的实现原理

核心概念 要理解协程的实现, 首先需要了解go中的三个非常重要的概念, 它们分别是G, M和P, 没有看过golang源代码的可能会对它们感到陌生, 这三项是协程最主要的组成部分, 它们在golang的源代码中无处不在. G (goroutine) G是goroutine的头文字, goroutine可以解释为受管理的…

Prompt 范式产业实践分享!基于飞桨 UIE-X 和 Intel OpenVINO 实现跨模态文档信息抽取

近期 Prompt 范式备受关注&#xff0c;实际上&#xff0c;其思想在产业界已经有了一些成功的应用案例。中科院软件所和百度共同提出了大一统诸多任务的通用信息抽取技术 UIE&#xff08;Universal Information Extraction&#xff09;。截至目前&#xff0c;UIE 系列模型已发布…

Selenium 相对定位

目录 前言&#xff1a; 相对定位 工作原理 可用的相对定位 Above Below Left of Right of Near 链式相对定位 相对于WebElement的相对定位 实例演示 前言&#xff1a; Selenium传统定位基本能解决80%的定位需求&#xff0c;但是还是有一些复杂场景传统定位定不到的…

express框架学习笔记

express简介 express是一个基于Node.js平台的极简的、灵活的WEB应用开发框架。express是一个封装好的工具包&#xff0c;封装了很多功能&#xff0c;便于我们开发WEB应用&#xff08;HTTP服务&#xff09; express使用 新建express文件夹新建文件test01.js&#xff0c;代码如…

深蓝学院C++基础与深度解析笔记 第 5 章 语句

1. 语句基础 ● 语句的常见类别 – 表达式语句&#xff1a;表达式后加分号&#xff0c;对表达式求值后丢弃&#xff0c;可能产生副作用 – 空语句&#xff1a;仅包含一个分号的语句&#xff0c;可能与循环一起工作 – 复合语句&#xff08;语句体&#xff09;&#xff1a;由大…

电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume

1、数据仓库概念 数据仓库&#xff08; Data Warehouse &#xff09;&#xff0c;是为企业制定决策&#xff0c;提供数据支持的。可以帮助企业&#xff0c;改进业务流程、提高产品质量等。 数据仓库的输入数据通常包括&#xff1a;业务数据、用户行为数据和爬虫数据等。 业务数…

流场粒子追踪精度数值实验

在计算流线&#xff0c;拉格朗日拟序结构等流场后处理时&#xff0c;我们常常需要计算无质量的粒子在流场中迁移时的轨迹&#xff0c;无质量意味着粒子的速度为流场当地的速度。此时&#xff0c;求解粒子的位移这个问题是一个非常简单的常微分方程问题。 假设流场中存在 i 个粒…