【Flink快速入门-7.Flink 状态管理】

Flink 状态管理

实验介绍

在批计算中,我们对某个特定 Batch 的数据通过一系列计算之后输出一个最终结果,你会发现我们并没有提到过数据的状态,或者说我们对数据状态并不关心。但是在流计算中,有状态的计算是流处理框架中的重要功能之一,因为很多复杂的业务场景都会涉及到数据前后状态。Flink 本身也是带状态的流处理引擎,如果你在此之前有看过 Flink 的官方文档,应该有注意到Stateful这个关键词。本节实验我们就重点学习 Flink 中的状态(State)相关知识点。

知识点
  • State 分类
    • Keyed State
    • Operator State
  • Checkpoint
  • StateBackend

State 分类

在 Flink 中,State 总的来说可以分为两种类型,Keyed State(键控状态)和 Operator State(算子状态)。

  • Operator State:Operator State 可以作用在所有算子上,每个算子中并行的 Task 都可以共享一个状态,或者说同⼀个算⼦中的多个 Task 的状态是相同的。但是请注意,算子状态不能由相同或不同算子的另一个实例访问。Operator State 支持三种基本数据结构,分别是:

    • ListState:存储列表类型的状态。
    • UnionListState:存储列表类型的状态。和 ListState 的区别是,如果发生故障,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。
    • BroadcastState:用于广播的算子状态。如果一个算子有多项任务,并且它的每项任务状态又都相同,这种情况就可以使用广播状态。
  • Keyed State:Keyed State 是作用在 KeyedStream 上的。从名称中就可以看出来,它的特点是和 Key 强相关的。在任务处理中,Flink 为每个 Key 维护一个状态实例,而且相同 Key 所对应的数据都会被分配到同一个任务中执行。Keyed State 支持五种基本数据结构,分别是:

    • ValueState:保存单个 Value,可以针对该 Value 进行 get/set 操作。
    • ListState:保存一个列表,列表中可以存储多个 Value。可以针对列表进行 add、get、update 操作。
    • MapState:保存 Key-Value 类型的值。可以针对其进行 get、put、remove 操作,还可以使用 contains 判断某个 key 是否存在。
    • ReducingState:保存一个单一值,该值是添加到状态的所有值聚合的结果。
    • AggregatingState:保存一个单一值,该值是添加到状态的所有值聚合的结果。与 ReducingState 有些不同,聚合类型可能不同于添加到状态的元素的类型。接口和 ListState 相同,但是使用 add(IN)添加的元素本质是通过使用指定的 AggregateFunction 进行聚合。

接下来我们以 KeyedState 为例进行实验。假设某网站的用户访问日志样例如下:

20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006

从左往右依次表示用户访问时间、用户触发的 action(登录、登出等)、城市、IP 地址、用户 ID。在实验代码中用 UserLog 样例类来表示该日志:

case class UserLog(time: Long, action: String, city: String, ip: String, user_id: String)

现在的业务需求是:现在的业务需求是:统计同城市两个⽤户登录的时间差。特殊的是第⼀个⽤户登录的时 候,它前⾯没有⽤户登录,我们需要在代码⾥特殊处理。在 com.vlab.state 包下创建 KeyedState 的 object。完整代码如下:

package com.vlab.state

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * @projectName FlinkLearning  
 * @package com.vlab.state  
 * @className com.vlab.state.KeyedState  
 * @description ${description}  
 * @author pblh123
 * @date 2025/2/8 13:15
 * @version 1.0
 *
 */
    
object KeyedState {

  /**
   * 用户日志案例类
   * 用于表示用户在特定时间点进行的特定操作
   * 主要解决的问题是封装用户日志数据,以便于日志的处理和分析
   *
   * @param time 日志记录的时间戳,表示日志发生的精确时间
   * @param action 用户的具体操作,如登录、浏览商品等
   * @param city 用户进行操作时所在的城市,用于地理位置分析
   * @param ip 用户的IP地址,用于追踪用户来源和进行安全检查
   * @param userid 用户的唯一标识符,用于关联用户的所有操作
   */
  case class UserLog(time: Long, action: String, city: String, ip:String, userid:String)

  def main(args: Array[String]): Unit = {
    // 参数数量判断,输入IP
    if (args.length != 1) {
      System.err.println("Usage: KeyedState <input ip>")
      System.exit(5)
    }
    val inputIp = args(0)
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String] = env.socketTextStream(inputIp, 9999)

    import org.apache.flink.api.scala._

    // 数据映射为UserLog案例类
    val datauserlog = data.map(line => {
      val fields = line.split(",")
      UserLog(fields(0).toLong, fields(1), fields(2), fields(3), fields(4))
    })

    // 进行 keyBy 操作,基于 city 来分组
    datauserlog.keyBy(_.city)
      .mapWithState[(String, Long), UserLog] {
        case (curr: UserLog, None) =>
          // 没有前一个状态,返回当前城市和初始时间差(0)
          // 输出格式 (城市名, 0),// 保存当前的 UserLog
          ((curr.city, 0L),Some(curr))

        case (curr: UserLog, last: Some[UserLog]) =>
          // 有前一个状态,计算时间差
          val diff = curr.time - last.get.time
          // 返回城市名和时间差 // 更新状态
          ((curr.city, Math.abs(diff)) ,Some(curr))
      }
      .print() // 打印输出

    env.execute("KeyedState")
  }
}

上面代码的 main 函数中,我们监听 localhost 的 9999 端口并将接收到的日志通过 map 算子转换为 UserLog 类型。重点是接下来的这段代码:

    // 进行 keyBy 操作,基于 city 来分组
    datauserlog.keyBy(_.city)
      .mapWithState[(String, Long), UserLog] {
        case (curr: UserLog, None) =>
          // 没有前一个状态,返回当前城市和初始时间差(0)
          // 输出格式 (城市名, 0),// 保存当前的 UserLog
          ((curr.city, 0L),Some(curr))

        case (curr: UserLog, last: Some[UserLog]) =>
          // 有前一个状态,计算时间差
          val diff = curr.time - last.get.time
          // 返回城市名和时间差 // 更新状态
          ((curr.city, Math.abs(diff)) ,Some(curr))
      }
      .print() // 打印输出

在根据城市 keyBy 之后,使用了 mapWithState 方法,mapWithState 算子是 map 算子对应的带状态的方法。Flink 中基本上所有的基础算子都有一个对应的带状态的算子,且以 WithState 结尾。在 mapWithState 中,curr 变量和 last 变量分别表示当前登录用户和前一个登录用户,但是由于第一个用户登录的时候,它前面没有用户登录,所以用 case (curr:UserLog, None) => ((curr.city, 0), Some(curr)) 来处理,注意第二个输入参数为 None。由于数据在传输过程中可能存在乱序的情况(输出的顺序和输入的顺序不一致),为了避免时间差为负值的情况,所以用了 Math.abs() 取绝对值。

在终端中执行 nc -l -p 9999,运行以上代码,并在终端中输入以下日志:

20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006

输出如下:
在这里插入图片描述

Checkpoint

在开始认识 Checkpoint 之前,我们先认识三个分布式中表示状态一致性的术语,分别是 at-most-once、at-least-once、exactly-once。

  • At-most-once:最多一次。数据最多被处理一次,这意味着当程序发生故障的时候,有可能会造成数据丢失。
  • At-least-once:最少一次。数据至少会被处理一次,这意味着当程序发生故障的时候,可能会存在同一条数据被重复多次计算的情况。
  • Exactly-once:精准一次。数据不多不少刚好被使用了一次,即使程序发生故障,但是任务恢复之后会和发生故障之前的状态保持一致,已经处理过的数据不会被重复处理,没有被处理的数据也不会丢失。

这三个术语并不是 Flink 中特有的,在其它分布式框架中也会涉及到,特别是在 Kafka 中尤为频繁。

既然 Flink 提供了 State 的支持,那么在程序出现问题或者机器宕机的时候,如何保证整个程序都能够顺利恢复到正确的 State?或者说如何保证 Exactly-Once 语义?为了解决这个问题,Flink 引入了 Checkpoint(检查点)机制。Flink Checkpoint 是一种容错恢复机制,这种机制保证了实时程序运行时,即使突然遇到异常也可以自我进行恢复。Checkpoint 示意图如下所示,其中一个非常重要的角色就是 barrier(栅栏,可以理解为一个个标记)。在数据流入 Flink 的过程中,会从源头开始就往数据流中注入 barrier。作为数据流数据的一部分,barrier 不会干扰正常的数据流,一个 barrier 会把数据分割成两个部分,一部分进入当前快照,另一部分进入下一个快照。每个 barrier 都带有快照的 id,并且 barrier 之前的数据都进入了此快照。多个 barrier 会出现在数据流中,也就是会产生多个快照。当 barrier 在 source 源头被注入时,系统就会记录当前快照的位置。
在这里插入图片描述

Flink 中默认没有开启 Checkpoint,需要我们通过 env.enableCheckpointing() 方法去指定。env.enableCheckpointing() 方法接受一个 Long 类型的参数,表示设定 Checkpoint 的时间间隔,单位为毫秒。如设置时间间隔为 10 秒:

env.enableCheckpointing(10 * 1000)

StateBackends

当 Checkpoint 机制启动时,State 将在检查点中持久化来应对数据丢失以及恢复。而状态在内部是如何表示的、状态是如何持久化到检查点中以及持久化到哪里都取决于选定的 StateBackends(状态后端)。Flink 提供了三种 StateBackends,分别是 MemoryStateBackend、

FsStateBackend 和 RocksDBStateBackend。

  • MemoryStateBackend:基于内存的状态管理。特点是速度快、性能高。缺点是一旦机器发生故障,整个内存中存储的状态数据都会丢失,而且内存由于内存大小的限制,这种方式并不会被用在生产环境中。MemoryStateBackend 可以通过 env.setStateBackend(new MemoryStateBackend(maxStateSize)),maxStateSize 是一个 Int 类型的参数,用来设置保存 State 可占用的最大内存。

  • FsStateBackend:基于文件系统的状态管理。这里的文件系统可以是本地文件系统,也可以是分布式文件系统,如:HDFS、OSS、S3 等。相对于 MemoryStateBackend 来说,基于文件的方式数据安全性更加得到了保障,可以保存的 State 也不会受限于磁盘大小。FsStateBackend 可以使用 setStateBackend(new FsStateBackend(path)) 来指定,参数 path 表示存储路径。生产环境一般都用分布式文件系统进存储,如:hdfs://node1:9000/flink/checkpoint/

  • RocksDBStateBackend:用 RocksDB 来存储 State。RocksDB 是一个 Key-Value 型的数据库,这种方式大家了解即可。

实验总结

本节实验我们介绍了 Flink 中的状态管理,包括 State 分类、Checkpoint 机制和 StateBackends。其中 State 分类包括 Keyed State 和 Operator State。在 Flink 状态管理中,使用相对来说比较简单,重点是概念理解。如果你学习过 Spark,请不要用 Spark 中的 Checkpoint 来类比 Flink 中的 Checkpoint,这是两种完全不同的机制。另外,在 Keyed State 案例中的 mapWithStat 参数如果理解起来比较困难,可以查阅资料补充一下 Scala 基础知识。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/972338.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【JavaEE进阶】数据库连接池

目录 &#x1f334;数据库连接池 &#x1f38b;数据库连接池的使用 &#x1f332;MySQL企业开发规范 &#x1f334;数据库连接池 数据库连接池负责分配、管理和释放数据库连接&#xff0c;它允许应⽤程序重复使⽤⼀个现有的数据库连接&#xff0c;⽽不是再重新建⽴⼀个. 没…

剑指 Offer II 025. 链表中的两数相加

comments: true edit_url: https://github.com/doocs/leetcode/edit/main/lcof2/%E5%89%91%E6%8C%87%20Offer%20II%20025.%20%E9%93%BE%E8%A1%A8%E4%B8%AD%E7%9A%84%E4%B8%A4%E6%95%B0%E7%9B%B8%E5%8A%A0/README.md 剑指 Offer II 025. 链表中的两数相加 题目描述 给定两个 非…

数据分析--数据清洗

一、数据清洗的重要性&#xff1a;数据质量决定分析成败 1.1 真实案例警示 电商平台事故&#xff1a;2019年某电商大促期间&#xff0c;因价格数据未清洗导致错误标价&#xff0c;产生3000万元损失医疗数据分析&#xff1a;未清洗的异常血压值&#xff08;如300mmHg&#xff…

[算法学习笔记]1. 枚举与暴力

一、枚举算法 定义 枚举是基于已有知识来猜测答案的问题求解策略。即在已知可能答案的范围内&#xff0c;通过逐一尝试寻找符合条件的解。 2. 核心思想 穷举验证&#xff1a;对可能答案集合中的每一个元素进行尝试终止条件&#xff1a;找到满足条件的解&#xff0c;或遍历完…

基于Flask的广西高校舆情分析系统的设计与实现

【Flask】基于Flask的广西高校舆情分析系统的设计与实现&#xff08;完整系统源码开发笔记详细部署教程&#xff09;✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 该系统综合运用Python、Flask框架及多种数据处理与可视化工具开发&#xff0c;结合Boot…

还在为AI模型部署发愁?VSCode插件让你轻松拥有DeepSeek和近百种AI模型!

1 还在为AI模型部署发愁&#xff1f;VSCode插件让你轻松拥有DeepSeek和近百种AI模型&#xff01; 1.1 背景 DeepSeek在春节期间突然大行其道&#xff0c;欣喜国力大增的同时&#xff0c;对于普通IT工作者&#xff0c;如何才能享受这一波AI红利&#xff0c;让自己的工作更出彩呢…

sourcetree gitee 详细使用

SSH 公钥设置 | Gitee 帮助中心 先配置公钥&#xff0c;输入gitee密码完成验证 gitee仓库创建完成 打开sourcetree 如果你本地有项目&#xff08;vite &#xff09;需要 git init 在设置中完成远程仓库的添加 &#xff08;ssh ,https) 直接提交推送&#xff0c;完成后&#xf…

ios苹果手机使用AScript应用程序实现UI自动化操作,非常简单的一种方式

现在要想实现ios的ui自动化还是非常简单的&#xff0c;只需要安装AScript这个自动化工具就可以了&#xff0c;而且安卓&#xff0c;iso还有windows都支持&#xff0c;非常好用。 在ios端安装之后&#xff0c;需要使用mac电脑或者windows电脑激活一下 使用Windows电脑激活​ 激…

【触想智能】工业显示器和普通显示器的区别以及工业显示器的主要应用领域分析

在现代工业中&#xff0c;工业显示器被广泛应用于各种场景&#xff0c;从监控系统到生产控制&#xff0c;它们在实时数据显示、操作界面和信息传递方面发挥着重要作用。与普通显示器相比&#xff0c;工业显示器在耐用性、可靠性和适应特殊环境的能力上有着显著的差异。 触想工业…

HarmonyNext上传用户相册图片到服务器

图片选择就不用说了&#xff0c;直接用 无须申请权限 。 上传图片&#xff0c;步骤和android对比稍微有点复杂&#xff0c;可能是为了安全性考虑&#xff0c;需要将图片先拷贝到缓存目录下面&#xff0c;然后再上传&#xff0c;当然你也可以转成Base64&#xff0c;然后和服务…

.NET SixLabors.ImageSharp v1.0 图像实用程序控制台示例

使用 C# 控制台应用程序示例在 Windows、Linux 和 MacOS 机器上处理图像&#xff0c;包括创建散点图和直方图&#xff0c;以及根据需要旋转图像以便正确显示。 这个小型实用程序库需要将 NuGet SixLabors.ImageSharp包&#xff08;版本 1.0.4&#xff09;添加到.NET Core 3.1/ …

第1章大型互联网公司的基础架构——1.2 客户端连接机房的技术1:DNS

客户端启动时要做的第一件事情就是通过互联网与机房建立连接&#xff0c;然后用户才可以在客户端与后台服务器进行网络通信。目前在计算机网络中应用较为广泛的网络通信协议是TCP/IP&#xff0c;它的通信基础是IP地址&#xff0c;因为IP地址有如下两个主要功能。 标识设备&…

【旋转框目标检测】基于YOLO11/v8深度学习的遥感视角船只智能检测系统设计与实现【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

Python|Windows 安装 DeepSpeed 安装方法及报错 Unable to pre-compile async_io 处理

前置文档&#xff1a;Python&#xff5c;Windows 安装 DeepSpeed 报错 Unable to pre-compile async_io 处理 直接 pip 安装 deepspeed 的报错信息 如果直接使用 pip install DeepSpeed 安装&#xff0c;会触发如下报错信息。出现后&#xff0c;需使用如下方法完成安装。 Co…

PHP支付宝--转账到支付宝账户

官方参考文档&#xff1a; ​https://opendocs.alipay.com/open/62987723_alipay.fund.trans.uni.transfer?sceneca56bca529e64125a2786703c6192d41&pathHash66064890​ 可以使用默认应用&#xff0c;也可以自建新应用&#xff0c;此处以默认应用来讲解【默认应用默认支持…

百度搜索融合 DeepSeek 满血版,开启智能搜索新篇

百度搜索融合 DeepSeek 满血版&#xff0c;开启智能搜索新篇 &#x1f680; &#x1f539; 一、百度搜索全量接入 DeepSeek &#x1f539; 百度搜索迎来重要升级&#xff0c;DeepSeek 满血版全面上线&#xff01;&#x1f389; 用户在百度 APP 搜索后&#xff0c;点击「AI」即…

【Prometheus】prometheus结合pushgateway实现脚本运行状态监控

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…

【R语言】回归分析与判别分析

一、线性回归分析 1、lm()函数 lm()函数是用于拟合线性模型&#xff08;Linear Models&#xff09;的主要函数。线性模型是一种统计方法&#xff0c;用于描述一个或多个自变量&#xff08;预测变量、解释变量&#xff09;与因变量&#xff08;响应变量&#xff09;之间的关系…

黑马JS教程笔记(JavaScript教程)——JS基础

黑马pink老师-JavaScript基础语法 黑马程序员前端JavaScript入门到精通全套视频教程&#xff0c;javascript核心进阶ES6语法、API、js高级等基础知识和实战教程 文章目录 ~~黑马pink老师-JavaScript基础语法~~001-计算机编程基础002-计算机编程基础编程语言和标记语言区别 00…

CHARMM-GUI EnzyDocker: 一个基于网络的用于酶中多个反应状态的蛋白质 - 配体对接的计算平台

❝ "CHARMM-GUI EnzyDocker for Protein−Ligand Docking of Multiple Reactive States along a Reaction Coordinate in Enzymes"介绍了 CHARMM-GUI EnzyDocker&#xff0c;这是一个基于网络的计算平台&#xff0c;旨在简化和加速 EnzyDock 对接模拟的设置过程&…