Flink状态State | 大数据技术

简单说两句

✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩‍💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~

作者:小叮当撩代码CSDN后端领域新星创作者 |阿里云专家博主

CSDN个人主页:小叮当撩代码

🔎GZH哆啦A梦撩代码

🎉欢迎关注🔎点赞👍收藏⭐️留言📝

Flink状态

image-20240602200907231

Flink中的State

image-20240602192616430

State概念

在 Flink 中,状态是流处理程序中非常重要的一部分,它允许你保存和访问数据,以实现复杂的计算逻辑。

可以简单理解为: 历史计算结果

Flink中的算子任务的State分类通常分为两类

1️⃣ 有状态

有状态需要考虑历史的数据,相同的输入可能会得到不同的输出

比如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)

2️⃣ 无状态

无状态简单说就是不需要考虑历史的数据,相同的输入得到相同的结果

比如map、filter、flatmap算子都属于无状态,不需要依赖其他数据

Flink默认已经支持了无状态和有状态计算!

状态分类

Flink中有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)

Managed State是由Flink管理的,Flink帮忙存储、恢复和优化

Raw State是开发者自己管理的,需要自己序列化

❇️通常情况下,我们采用托管状态来实现我们的需求!!!

托管状态

​ Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以Flink 能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效

​ 很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。

🎨所以:我们又可以将托管状态分为两类:算子状态按键分区状态

键控状态Keyed State

详细内容可以瞅瞅官网:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/fault-tolerance/state/

Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。

需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(…) 来得到 KeyedStream 。

img

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。

ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。

ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。

AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。

FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。

MapState:维护 Map 类型的状态。


Code实操

例子1

使用KeyState中的ValueState来模拟实现maxBy

代码清单


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author tiancx
 */
public class StateMaxByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //加载数据
        DataStream<Tuple2<String, Integer>> source = env.fromElements(
                        Tuple2.of("北京", 1),
                        Tuple2.of("上海", 2),
                        Tuple2.of("广州", 3),
                        Tuple2.of("北京", 4),
                        Tuple2.of("上海", 5),
                        Tuple2.of("广州", 6),
                        Tuple2.of("北京", 3))
                .keyBy(t -> t.f0);
        source.map(new RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
            //定义状态,用于存储最大值
            ValueState<Integer> maxValueState = null;

            //进行初始化
            @Override
            public void open(Configuration parameters) throws Exception {
                //创建状态描述器
                ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("maxValueState", Integer.class);
                maxValueState = getRuntimeContext().getState(descriptor);
            }

            @Override
            public Tuple3<String, Integer, Integer> map(Tuple2<String, Integer> value) throws Exception {
                //获取当前值
                Integer currentVal = value.f1;
                Integer currentMax = maxValueState.value();
                if (currentMax == null || currentVal > currentMax) {
                    maxValueState.update(currentVal);
                }
                return Tuple3.of(value.f0, value.f1, maxValueState.value());
            }
        }).print();

        env.execute();
    }
}

运行看结果

5c1eb573f51d5a9cec2032e503b0dee3

例子2

如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]

代码清单


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * @author tiancx
 */
public class StateDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);
        DataStream<Tuple2<String, Integer>> source = stream.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] split = value.split(" ");
                        return Tuple2.of(split[0], Integer.parseInt(split[1]));
                    }
                })
                .keyBy(t -> t.f0);
        source.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, List<Integer>>>() {


            ListState<Integer> listState = null;
            //存放超过38度的次数
            ValueState<Integer> valueState = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<Integer>("listState", Integer.class);
                ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("valueState", Integer.class);
                listState = getRuntimeContext().getListState(listStateDescriptor);
                valueState = getRuntimeContext().getState(descriptor);

            }

            @Override
            public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, List<Integer>>> out) throws Exception {
                System.out.println("进入flatMap");
                Integer val = value.f1;
                if (valueState.value() == null) {
                    valueState.update(0);
                }
                if (val > 38) {
                    listState.add(val);
                    valueState.update(valueState.value() + 1);
                }
                if (valueState.value() >= 3) {
                    List<Integer> list = (List<Integer>) listState.get();
                    out.collect(Tuple2.of(value.f0, list));
                    listState.clear();
                    valueState.clear();
                }
            }
        }).print();

        env.execute();
    }
}

输入

image-20240602100424957

运行结果

image-20240602100441746

算子状态OperatorState

​ 算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。

​ 算 子 状 态 也 支 持 不 同 的 结 构 类 型 , 主 要 有 三 种 : ListState 、 UnionListState 和BroadcastState。


code实操

例子1:

在 map 算子中计算数据的个数

代码清单


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author tiancx
 */
public class OperatorListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);
        stream.map(new MyCountMapFunction())
                .print();
        env.execute();

    }

    public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {

        private Long count = 0L;
        private ListState<Long> listState;

        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

        /**
         * 本地变量持久化:将 本地变量拷贝到算子状态中,开启checkpoint 时才会调用 snapshotState 方法
         *
         * @param context the context for drawing a snapshot of the operator
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("MyCountMapFunction.snapshotState");
            listState.clear();
            listState.add(count);
        }

        /**
         * 初始化本地变量:程序启动和恢复时,从状态中把数据添加到本地变量,每个子任务调用一次
         *
         * @param context the context for initializing the operator
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("MyCountMapFunction.initializeState");
            //从上下文初始化状态
            listState = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("listState", Types.LONG()));
            //从算子状态中把数据拷贝到本地变量
            if (context.isRestored()) {
                for (Long aLong : listState.get()) {
                    count += aLong;
                }
            }
        }
    }
}

输入

image-20240602110341359

运行结果

image-20240602110403448

【都看到这了,点点赞点点关注呗,爱你们】😚😚

蓝白色微信公众号大学生校园清新简单纸飞机动态引导关注简洁新媒体分享中文动态引导关注

💬

✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩‍💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~

作者:小叮当撩代码CSDN后端领域新星创作者 |阿里云专家博主

CSDN个人主页:小叮当撩代码

🔎GZH哆啦A梦撩代码

🎉欢迎关注🔎点赞👍收藏⭐️留言📝

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/675667.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用J-LINK COMMANDER检查极海APM32F072烧录

键入 connect: 此时会显示默认设备&#xff0c;如果之前设置过会有&#xff0c;为了演示&#xff0c;我不选 键入 &#xff1f; 然后会弹出设备选择界面&#xff1a; 根据自己的设备搜索型号&#xff1a; 我这里搜索“APM32F072VB”,点击OK: 选择接口类型&#xff1a; 如果要…

用Python优雅地写LaTeX

latexify用于生成 LaTeX 数学公式的 Python 库。LaTeX 是一种基于 ΤΕΧ 的排版系统&#xff0c;对于展示复杂的数学公式表现极为出色。该项目可以用 Python 函数&#xff0c;轻松生成复杂的 LaTeX 数学公式描述。 安装库 查看版本号 0.4.2 案例演示 我们需要以装饰器的形式…

jquery发ajax自动302、xhrredirect,莫名弹出登录窗口。tomcat部署情况下

效果如下&#xff1a; 原因如下&#xff1a; 跟tomcat自带的一个项目同名了&#xff0c;只要前缀跟那个项目同名 都被拦截。 解决方案&#xff1a; 我直接改了一个接口名字&#xff0c;只要不和tomcat自带项目名字一样即可

Linux基础 (十二):Linux 线程的创建与同步

本篇博客详细介绍与线程有关的内容&#xff0c;这部分也是笔试面试的重点&#xff0c;需要我们对线程有深刻的理解&#xff0c;尤其是线程的并发运行以及线程同步的控制&#xff01;接下来&#xff0c;让我们走进线程的世界&#xff0c;去理解线程&#xff0c;使用线程&#xf…

RocketMQ学习(3) 秒杀实战

学习完RocketMQ的用法,现在用它来做一个简单的秒杀项目练练手。 关于秒杀,我之前其实有专门的学习过其中的一些业务逻辑和常见问题,我在这篇博客中有写过多并发场景下的秒杀场景,需要考虑哪些问题?也可以学习一下 除了RocketMQ,本文还需要会springBoot + Redis + Mysql…

隐藏 IP 地址的重要性是什么?

在当今的数字时代&#xff0c;保护我们的在线身份至关重要。从保护个人信息到保护隐私&#xff0c;互联网用户越来越多地寻求增强在线安全性的方法。保持匿名和保护敏感数据的一个关键方面是隐藏您的 IP 地址。在这篇博文中&#xff0c;我们将深入探讨隐藏 IP 地址的重要性&…

项目:消息队列的前置知识

文章目录 写在前面环境安装 Protubuf基本介绍 Muduo基本介绍 SQLite3异步操作实现线程池 本篇是对于一个仿RabbitMQ实现的消息队列项目的前置知识的说明文档 写在前面 环境安装 Protubuf 基本介绍 项目所需要的比较重要模块有Protubuf模块&#xff0c;那么下面先对于这个模…

28 _ WebComponent:像搭积木一样构建Web应用

在上一篇文章中我们从技术演变的角度介绍了PWA&#xff0c;这是一套集合了多种技术的理念&#xff0c;让浏览器渐进式适应设备端。今天我们要站在开发者和项目角度来聊聊WebComponent&#xff0c;同样它也是一套技术的组合&#xff0c;能提供给开发者组件化开发的能力。 那什么…

微信支付(可复用)

3.1微信支付 本项目选择小程序支付 参考&#xff1a;产品中心 - 微信支付商户平台微信支付商户平台提供各类支付产品满足商家通过微信支付收款的需求&#xff1b;平台提供智慧经营&#xff0c;现金红包&#xff0c;代金券等运营工具&#xff0c;助力商家更好的玩转营销&#x…

重生奇迹mu格斗家介绍

出生地&#xff1a;勇者大陆 性 别&#xff1a;男 擅 长&#xff1a;近距离攻击、技能以PVP为主战斗风格 转 职&#xff1a;格斗大师&#xff08;3转&#xff09; 介 绍&#xff1a;以PVP战斗模式为主的格斗家&#xff0c;依角色养成配点不同&#xff0c;可发展成以力量体力…

恒创科技:无法与服务器建立安全连接怎么解决?

在使用互联网服务时&#xff0c;有时会出现无法与服务器建立安全连接的问题&#xff0c;此错误消息通常出现在尝试访问需要安全连接的网站(例如使用 HTTPS 的网站)时&#xff0c;这可能是由于多种原因造成的&#xff0c;以下是一些常见的解决方法&#xff0c;帮助你解决问题。 …

AI来了,产品经理该怎样面对它?

AI终于来了&#xff0c;我们一方面期待着它可能给我们生活带来的变化&#xff0c;另一方面又担忧它可能带给我们巨大的风险和挑战。 AI带来的影响 AI不确定性的风险有很多&#xff0c;例如有人关注它是否成为“奥创”&#xff0c;但对我们大多数人来说这样的风险还很遥远&#…

Java1.8+ idea hbuilder+ uniapp、vue上门家政小程序APP源码开发

Java1.8 idea hbuilder uniapp、vue上门家政小程序APP源码开发 家政服务系统是一种专为家庭提供全方位服务的综合性系统。该系统通过整合多种服务功能和智能化管理&#xff0c;旨在提高家庭生活的质量和效率。 家政服务系统技术开发环境&#xff1a; 技术架构&#xff1a;spri…

怎么制作在线研学活动报名系统?教你快速搞定

易查分小程序&#xff1a;提升研学活动体验&#xff0c;智慧管理新选择 在教育多元化的今天&#xff0c;学校组织的研学活动可以为学生提供更多实践学习、探索世界的机会。不过&#xff0c;对于老师来说&#xff0c;活动的报名和管理常常比较复杂&#xff0c;导致工作量增加。…

工业相机识别电路板元器件:彩色与黑白的区别

工业相机用于识别电路板上的元器件时&#xff0c;选择彩色相机或黑白相机取决于具体应用需求和条件。彩色相机能提供更丰富的信息&#xff0c;但处理复杂度较高&#xff1b;黑白相机则在处理速度和精度上具有优势。理解它们的区别和各自的优缺点&#xff0c;有助于在具体项目中…

软件功能测试内容简析,第三方软件测试机构进行功能测试的好处

软件功能测试是指对软件产品的各项功能进行验证和确认的过程。它是软件开发过程中非常重要的一环&#xff0c;通过对软件的功能进行全面测试&#xff0c;可以确保软件在交付给用户之前达到预期的质量要求。 在进行功能测试时&#xff0c;需要包括以下几个方面的测试内容&#…

docker运行centos提示Operation not permitted

1、在docker中运行了centos7镜像 2、进入到centos容器中使用systemctl命令时提示 systemctl Failed to get D-Bus connection: Operation not permitted 3、解决办法 在运行centos镜像的时候加上--privileged参数 4、附上docker官网命令说明截图

驱动芯片退饱和保护(DESAT)

驱动芯片退饱和保护&#xff08;DESAT&#xff09; 1.概述2.短路能力评估3.驱动芯片的退饱和保护功能介绍3.1 退饱和工作原理3.2 退饱和电路的关键组成和影响因素 4.驱动芯片的退饱和保护功能的调试4.1 如何增加 DESAT 充电电流4.2 如何调整 DESAT 阈值电压4.3 如何使用 OC 功能…

Chrome 调试技巧

1. alert 在最早的时候&#xff0c;javascript 程序员调试代码都是通过 alert 进行&#xff0c;但 alert 会让整个程序被打断&#xff0c;并且还有一个很大的缺点&#xff0c;调试完成之后&#xff0c;如果忘记将 alert 删除 or 注释掉&#xff0c;导致别人访问该页面时会莫名…

基于System-Verilog实现DE2-115开发板驱动HC_SR04超声波测距

目录 前言 一、SystemVerilog——下一代硬件设计语言 与Verilog关系 与SystemC关系 二、实验原理 2.1 传感器概述&#xff1a; 2.2 传感器引脚 2.3 传感器工作原理 2.4 整体测距原理及编写思路 三、System-Verilog文件 3.1 时钟分频 3.2 超声波测距 3.3 数码管驱动…