大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink 并行度
  • Flink 并行度详解
  • Flink 并行度 案例
    在这里插入图片描述

状态类型

Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。

  • 有状态计算:依赖之前或之后的事件
  • 无状态计算:独立

根据数据结构不同,Flink定义了多种State,应用于不同的场景。

  • ValueState:即类型为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
  • ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
  • ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时候,会调用ReduceFunction,最后合并到一个单一的状态值。
  • FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态会在未来的Flink版本当中删除)
  • MapState:即状态值为一个Map,用户通过put和putAll方法添加元素

State按照是否有Key划分为:

  • KeyedState
  • OperatorState

案例1 利用State求平均值

实现思路

  • 读数据源
  • 将数据源根据Key分组
  • 按照Key分组策略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果

编写代码

package icu.wzk;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class FlinkStateTest01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<Long, Long>> data = env
                .fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(1L, 4L),
                        Tuple2.of(1L, 2L)
                );
        KeyedStream<Tuple2<Long, Long>, Long> keyed = data
                .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, Long> value) throws Exception {
                        return value.f0;
                    }
                });
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed
                .flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    private transient ValueState<Tuple2<Long, Long>> sum;

                    @Override
                    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                        Tuple2<Long, Long> currentSum = sum.value();
                        if (currentSum == null) {
                            currentSum = Tuple2.of(0L, 0L);
                        }
                        // 更新
                        currentSum.f0 += 1L;
                        currentSum.f1 += value.f1;
                        System.out.println("currentValue: " + currentSum);
                        // 更新状态值
                        sum.update(currentSum);
                        // 如果 count >= 5 清空状态值 重新计算
                        if (currentSum.f0 >= 5) {
                            out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));
                            sum.clear();
                        }
                    }

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                                "average",
                                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
                        );
                        sum = getRuntimeContext().getState(descriptor);
                    }
                });
        flatMapped.print();
        env.execute("Flink State Test");
    }
}

运行结果

在这里插入图片描述

执行分析

在这里插入图片描述
在这里插入图片描述

Keyed State

表示和Key相关的一种State, 只能用于KeyedStream类型数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。

KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变化时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例可能运行一个或者多个KeyGroups的Keys。

Operator State

与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变化时自动重新分配状态数据。

同时在Flink中KeyedState和OperatorState均具有两种形式,其中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久话到CheckPoints中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发CheckPoint中,当从CheckPoint恢复任务时,算子自己再返序列化出状态的数据结构。

DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要原因是ManagedState能够更好地支持状态数据的重平衡以及更加完善的内存管理。

状态描述

在这里插入图片描述

State既然是暴露给用户的,那么就需要有一些属性需要指定:

  • State名称
  • Value Serializer
  • State Type Info

在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptot)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/873855.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数学建模笔记—— 模糊综合评价

数学建模笔记—— 模糊综合评价 模糊综合评价1. 模糊数学概述2. 经典集合和模糊集合的基本概念2.1 经典集合2.2 模糊集合和隶属函数1. 基本概念2.模糊集合的表示方法3. 模糊集合的分类4. 隶属函数的确定方法 3. 评价问题概述4. 一级模糊综合评价模型典型例题 5. 多层次模糊综合…

SprinBoot+Vue停车场管理系统的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平台Java领域优质…

iOS——GCD再学习

GCD 使用GCD好处&#xff0c;具体如下&#xff1a; GCD 可用于多核的并行运算&#xff1b;GCD 会自动利用更多的 CPU 内核&#xff08;比如双核、四核&#xff09;&#xff1b;GCD 会自动管理线程的生命周期&#xff08;创建线程、调度任务、销毁线程&#xff09;&#xff1b…

❤《实战纪录片 1 》原生开发小程序中遇到的问题和解决方案

《实战纪录片 1 》原生开发小程序中遇到的问题和解决方案 文章目录 《实战纪录片 1 》原生开发小程序中遇到的问题和解决方案1、问题一&#xff1a;原生开发中 request请求中返回 的数据无法 使用this传递给 data{}中怎么办&#xff1f;2、刚登录后如何将token信息保存&#xf…

智汇云舟荣膺国家级专精特新“小巨人”企业称号

近日&#xff0c;北京市经济和信息化局发布了经工业和信息化部审核的第六批专精特新“小巨人”企业名单&#xff0c;智汇云舟凭借其在视频孪生领域的卓越贡献和技术实力成功入选&#xff0c;荣膺国家级专精特新“小巨人”企业称号。 专精特新“小巨人”&#xff0c;是目前全国中…

PDF 全文多语言 AI 摘要 API 数据接口

PDF 全文多语言 AI 摘要 API 数据接口 PDF / 文本摘要 AI 生成 PDF 文档摘要 AI 处理 / 智能摘要。 1. 产品功能 支持多语言摘要生成&#xff1b;支持 formdata 格式 PDF 文件流传参&#xff1b;快速处理大文件&#xff1b;基于 AI 模型&#xff0c;持续迭代优化&#xff1b;…

【鸿蒙开发工具报错】Build task failed. Open the Run window to view details.

Build task failed. Open the Run window to view details. 问题描述 在使用deveco-studio 开发工具进行HarmonyOS第一个应用构建开发时&#xff0c;通过Previewer预览页面时报错&#xff0c;报错信息为&#xff1a;Build task failed. Open the Run window to view details.…

哈希表,算法

一.什么是哈希表 哈希表是一种用于快速数据存取的数据结构。它通过哈希函数将键&#xff08;key&#xff09;映射到表中的一个位置&#xff0c;从而实现高效的插入、删除和查找操作。 二.哈希冲突 哈希冲突发生在多个键通过哈希函数映射到哈希表的同一位置时。由于哈希表的大…

【专题】2024年中国游戏出海洞察报告合集PDF分享(附原数据表)

原文链接&#xff1a;https://tecdat.cn/?p37570 2023 年全球游戏市场规模高达 6205.2 亿美元&#xff0c;且预计未来持续增长&#xff0c;这清晰地展示了该市场的巨大潜力和良好前景。 中国游戏在全球移动游戏市场的份额于 2023 年已达 37%&#xff0c;产业贡献超 30% 的市场…

Redis主从数据同步过程:命令传播、部分重同步、复制偏移量等

请记住胡广一句话&#xff0c;所有的中间件所有的框架都是建立在基础之上&#xff0c;数据结构&#xff0c;计算机网络&#xff0c;计算机原理大伙一定得看透&#xff01;&#xff01;~ 1. Redis数据同步 1.1 数据同步过程 大家有没想过为什么Redis多机要进行数据同步&#…

[数据集][目标检测]水面垃圾检测数据集VOC+YOLO格式2027张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;2027 标注数量(xml文件个数)&#xff1a;2027 标注数量(txt文件个数)&#xff1a;2027 标注…

SAP 免费学习网站推荐

1、https://www.guru99.com/ 可以看到有很多的开发语言可以学习。其中就有SAP。 点击SAP菜单后&#xff0c;可以看到每个模块的操作 每个模块下面都有操作的截图&#xff0c;结合翻译软件看的话很容易看懂 2、https://community.sap.com/ 这个是SAP官方的社区&#xff0c…

[数据结构] 开散列法 闭散列法 模拟实现哈希结构(一)

标题&#xff1a;[数据结构] 开散列法 && 闭散列法 模拟实现哈希结构 个人主页&#xff1a;水墨不写bug 目录 一、闭散列法 核心逻辑的解决 i、为什么要设置位置状态&#xff1f;&#xff08;伪删除法的使用&#xff09; ii、哈希函数的设计 接口的实现 1、插入&a…

STM32 RTC实时时钟

RTC实时时钟 BKP可以在VBAT维持供电时&#xff0c;完成主电源掉电时&#xff0c;保存少量数据的任务。备份寄存器和VBAT引脚同时存在&#xff0c;更多是为了服务RTC的。 目前&#xff0c;Linux、Windows、安卓这些系统&#xff0c;底层的计时系统都是使用的Unix时间戳&#xf…

网络编程(UDP)

UDP编程 UDP&#xff1a;全双工通信、面向无连接、不可靠 UDP&#xff08;User Datagram Protocol&#xff09;用户数据报协议&#xff0c;是不可靠的无连接的协议。在数据发送前&#xff0c;因为不需要进行连接&#xff0c;所以可以进行高效率的数据传输。 适用场景 发送小尺寸…

Anaconda安装和环境配置教程(深度学习准备)

目录 1.下载选择 2.prompt配置 3.虚拟环境配置 4.检查是不是安装成功 5.安装jupter 6.关闭anaconda重新进入 7.总结 1.下载选择 我第一次使用的这个官网上面的邮箱的方式下载的&#xff0c;但是这个方式真的特别慢&#xff0c;于是用了这个清华的镜像网站&#xff0c;网…

基于JAVA+SpringBoot+Vue的工程教育认证的计算机课程管理平台

基于JAVASpringBootVue的工程教育认证的计算机课程管理平台 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接…

简单比较 http https http2,我们要如何把http升级为https

&#x1f9d1;‍&#x1f4bb; 写在开头 点赞 收藏 学会&#x1f923;&#x1f923;&#x1f923; 什么是HTTP 超文本传输​​协议&#xff08;HTTP&#xff09;是用于传输诸如HTML的超媒体文档的应用层协议。它被设计用于Web浏览器和Web服务器之间的通信&#xff0c;但它也…

合宙LuatOS开发板Core_Air780EP使用说明

Core-Air780EP 开发板是合宙通信推出的基于 Air780EP 模组所开发的&#xff0c; 包含电源&#xff0c;SIM卡&#xff0c;USB&#xff0c;天线&#xff0c;音频等必要功能的最小硬件系统。 以方便用户在设计前期对 Air780EP模块进行性能评估&#xff0c;功能调试&#xff0c;软…

精益生产现场管理和改善的实战路径

精益生产&#xff0c;作为制造业的革新利器&#xff0c;不仅能够帮助企业降低成本、提升质量&#xff0c;还能大幅度提高生产效率。但如何将这一理念从理论转化为实际行动&#xff0c;真正落地于生产现场&#xff0c;成为许多管理者面临的难题。今天&#xff0c;就让天行健咨询…