【Flink-scala】DataStream编程模型之状态编程

DataStream编程模型之状态编程

参考:
1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出
2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器
4.【Flink-scala】DataStream编程模型之水位线
5.【Flink-scala】DataStream编程模型之延迟数据处理


文章目录

  • DataStream编程模型之状态编程
  • 前言
  • 一、状态编程相关概念
    • 1.1Flink中状态始终与特定算子相关联
    • 1.2 演示代码
    • 1.3 状态编程程序输入输出


前言

流计算分为无状态和有状态两种,无状态是观察每个独立事件,根据最后一个事件输出结果。比如传感器只关注当前的水位量,超出水位量就发生报警事件。
有状态计算则会基于多个事件输出结果。比如计算过去1小时的水位平均值,那就是状态的计算。

一、状态编程相关概念

流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态计算。

在传统的批处理中,数据的划分为块分片去完成的,每个task处理一个分片,执行完成后,把结果聚合起来就是最终的结果,这个过程中,对状态的需求还是较少的。

但对于流计算而言,它对状态有着非常高的要求,因为在流系统中,输入是一个无限制的流,会运行很长一段时间,甚至运行几天或者几个月都不会停机。在这个过程当中,就需要把状态数据很好地管理起来

1.1Flink中状态始终与特定算子相关联

分为算子状态和键控状态
在这里插入图片描述
算子状态的作用范围限定为算子任务,这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。

算子状态不能由相同或不同算子的另一个任务访问

键控状态是根据输入数据流中定义的键来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。因此,具有相同键的所有数据都会访问相同状态

在这里插入图片描述

1.2 演示代码

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
 
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object StateTest {
  def main(args: Array[String]): Unit = {
    //设定执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment   
//设定程序并行度
    env.setParallelism(1) 
    //创建数据源
    val source = env.socketTextStream("localhost", 9999) 
    //指定针对数据流的转换操作逻辑
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))
    val alertStream = stockDataStream
      .keyBy(_.stockId)
      .flatMap(new PriceChangeAlert(10))//新建了一个PriceChangeAlert类  这里重新了flatmap方法
     // 打印输出
    alertStream.print() 
    //触发程序执行
    env.execute("state test")
  }

class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{
    //定义状态保存上一次的价格
    lazy val lastPriceState: ValueState[Double] = getRuntimeContext
      .getState(new ValueStateDescriptor[Double]("last-price",classOf[Double]))
    override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit = {
      // 获取上次的价格
val lastPrice = lastPriceState.value()
//跟最新的价格求差值做比较
      val diff = (value.price-lastPrice).abs
      if( diff > threshold)
        out.collect((value.stockId,lastPrice,value.price))
      //更新状态
      lastPriceState.update(value.price)
    }
  }
}

代码分析:
1.传入参数,阈值
2.继承里接受一个stockPrice类型的输入,一个(String,Double,Double)三元组的输出。

String,Double,Double
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

有什么不同呢,两个double代表了两个价格:分别代表股票ID、上次价格、当前价格。

3.ValueState是Flink中用于保存单个值的状态。这里它被用来保存上一次处理的股票价格。lazy关键字意味着这个状态变量只有在第一次被使用时才会被初始化
4…getState(new ValueStateDescriptor[Double](“last-price”, classOf[Double])): 这个方法尝试从运行时上下文中检索一个名为 “last-price” 的 ValueState,如果状态不存在,它将根据提供的 ValueStateDescriptor 创建一个新的状态。

ValueStateDescriptor 包含了状态的名称(代码中是 “last-price”)和状态的值的类型(这个代码中是 Double)。
5. classOf[Double] 提供了状态的值的类型信息。
6. 重写的flatmap应该能看懂,主要是当当前价格超出阈值(代码中是10),就打印。

1.3 状态编程程序输入输出

输入:

stock_4,1602031562148,43.4
stock_1,1602036130952,39.7
stock_4,1602036131741,59.9
stock_2,1602036132184,30.1
stock_3,1602036133154,79.8
stock_0,1602036133919,9.9
stock_1,1602036134385,21.7

输出:

(stock_4,0.0,43.4)
(stock_1,0.0,39.7)
(stock_4,43.4,59.9)
(stock_2,0.0,30.1)
(stock_3,0.0,79.8)
(stock_1,39.7,21.7)

其中根据stock_id分类。

初始状态:所有stockId的最近价格都是未定义的(即null或None,在代码中表现为Double的默认值0.0,因为ValueState在初始化时未设置值)。

处理第一条记录:stock_4,1602031562148,43.4。由于没有先前的价格,不会触发输出。最近价格更新为43.4。
处理第二条记录:stock_1,1602036130952,39.7。同样,没有先前的价格,不会触发输出。最近价格更新为39.7。
处理第三条记录:stock_4,1602036131741,59.9。价格从43.4变为59.9,差异为16.5,超过阈值10,因此输出(stock_4, 43.4, 59.9)。最近价格更新为59.9。
后续记录:对于stock_2、stock_3、stock_0,由于没有先前的价格,30.1 和79.8直接列出,
但是9.9这个价格要注意
stock_0,默认值为0,这里变为9.9,没有超出阈值10,那么输出就没有。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/938324.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux实操篇-远程登录/Vim/开机重启

目录 传送门前言一、远程登录1、概念2、ifconfig3、实战3.1、SSH(Secure Shell)3.2、VNC(Virtual Network Computing)3.3、RDP(Remote Desktop Protocol)3.4、Telnet(不推荐)3.5、FT…

【计算机网络】期末考试预习复习|上

作业讲解 物理层作业 共有4个用户进行CDMA通信。这4个用户的码片序列为: A: (–1 –1 –1 1 1 –1 1 1);B: (–1 –1 1 –1 1 1 1 –1) C: (–1 1 –1 1 1 1 –1 –1);D: (–1 1 –1 –1 –1 –1 1 –1) 现收到码片序列:(–1 1 –…

CTFHub-ssrf

技能树--Web--SSRF 内网访问 开启题目 尝试访问位于127.0.0.1的flag.php吧 进入环境 根据提示输入即可 127.0.0.1/flag.php 伪协议读取文件 开启题目 尝试去读取一下Web目录下的flag.php吧 进入环境,根据提示输入 file:///var/www/html/flag.php 鼠标右键查看…

解决PyTorch模型推理时显存占用问题的策略与优化

在将深度学习模型部署到生产环境时,显存占用逐渐增大是一个常见问题。这不仅可能导致性能下降,还可能引发内存溢出错误,从而影响服务的稳定性和可用性。本文旨在探讨这一问题的成因,并提供一系列解决方案和优化策略,以…

Java从入门到工作3 - 框架/工具

3.1、SpringBoot框架结构 在 Spring Boot 或微服务架构中,每个服务的文件目录结构通常遵循一定的约定。以下是一个常见的 Spring Boot 服务目录结构示例,以及各个文件和目录的简要说明: my-service │ ├── src │ ├── main │ │…

电子应用设计方案-56:智能书柜系统方案设计

智能书柜系统方案设计 一、引言 随着数字化时代的发展和人们对知识获取的需求增加,智能书柜作为一种创新的图书管理和存储解决方案,能够提供更高效、便捷和个性化的服务。本方案旨在设计一款功能齐全、智能化程度高的智能书柜系统。 二、系统概述 1. 系…

2024 年贵州技能大赛暨全省第二届数字技术应用职业技能竞赛“信息通信网络运行管理员”赛项--linux安全题

Linux操作系统渗透测试 Nmap -sS -p- ip 扫描 这题有俩种做法,一种用3306端口,另一种用48119端口 用48119端口是最简单的做法 nc 连接这个端口如何修改root密码 ssh连接 这样我们就成功的拿到root权限 1.通过本地PC中渗透测试平台Kali对服务器场景进…

网格剖分算法 铺装填充算法效果

1.原图 图:原图 2.OpenCV提取轮廓 图:提取轮廓线 3.计算凸包和最小外围轮廓 图:计算凸包和最小包围轮廓 4.网格剖分效果 图:网格剖分效果 5.铺装填充效果 图:铺装算法效果 原图--》提取轮廓线--》计算最小外包轮廓--》…

JMeter配置原件-计数器

一、面临的问题: 由于本人的【函数助手对话框】中counter计数器每次加2,且只显示偶数(如下图所示),因此借助【配置原件-计数器】来实现计数功能。 如果有大佬知道解决方式,麻烦评论区解答一下,谢谢。 二、配置原件-c…

旋转花键VS传统花键:传动效率的革新

旋转花键与传统花键都是一种传动装置,用于将转动力传递给另一个轴。主要区别在于其结合了花键轴和滚珠丝杆的功能特点,通过滚珠在花键轴和花键套之间的滚动来实现旋转运动和直线运动的传递,以下是几个关键的差异点: 1、结构设计&a…

C++类模板的应用

template <class T> class mylist{ public: // 这是一个链表的节点 struct Link{ T val; Link* next; } 增 &#xff1a;insert(T val) 在链表中创建新节点&#xff0c;节点上保存的数据为 val 删&#xff1a;remove(T val) 移除链表中数据为 val 的节点 改: operator[](…

python学opencv|读取图像(十二)BGR图像转HSV图像

【1】引言 前述已经学习了opencv中图像BGR相关知识&#xff0c;文章链接包括且不限于下述&#xff1a; python学opencv|读取图像&#xff08;六&#xff09;读取图像像素RGB值_opencv读取灰度图-CSDN博客 python学opencv|读取图像&#xff08;七&#xff09;抓取像素数据顺利…

基于 mzt-biz-log 实现接口调用日志记录

&#x1f3af;导读&#xff1a;mzt-biz-log 是一个用于记录操作日志的通用组件&#xff0c;旨在追踪系统中“谁”在“何时”对“何事”执行了“何种操作”。该组件通过简单的注解配置&#xff0c;如 LogRecord&#xff0c;即可实现接口调用的日志记录&#xff0c;支持成功与失败…

如何在繁忙的生活中找到自己的节奏?

目录 一、理解生活节奏的重要性 二、分析当前生活节奏 1. 时间分配 2. 心理状态 3. 身体状况 4. 生活习惯 1. 快慢适中 2. 张弛结合 3. 与目标相符 三、掌握调整生活节奏的策略 1. 设定优先级 2. 合理规划时间 3. 学会拒绝与取舍 4. 保持健康的生活方式 5. 留出…

Docker:目录挂载、数据卷(补充二)

Docker&#xff1a;目录挂载、数据卷 1. 挂载2. 卷映射 1. 挂载 -v /app/nghtml:/usr/share/nginx/html /app/nghtml 是外部主机的地址 /usr/share/nginx/html 是内部容器的地址这里启动一个nginx&#xff0c;然后在后台运行时其命令为 (base) ➜ ~ docker run -d -p 80:80 …

新能源汽车大屏可视化第三次数据存储

任务&#xff1a; 将数据存放到temp.csv 链接&#xff1a; 1.排行页面 https://www.dongchedi.com/sales 2.参数页面 https://www.dongchedi.com/auto/params-carIds-x-9824 完善打印&#xff1a; 1. [{‘series_id’: 5952, ‘series_name’: ‘海鸥’, ‘image’: ‘https://…

Three.js资源-模型下载网站

在使用 Three.js 进行 3D 开发时&#xff0c;拥有丰富的模型资源库可以大大提升开发效率和作品质量。以下是一些推荐的 Three.js 模型下载网站&#xff0c;它们提供了各种类型的 3D 模型&#xff0c;适合不同项目需求。无论你是需要逼真的建筑模型&#xff0c;还是简单的几何体…

无人机故障安全模式设计逻辑与技术!

一、设计逻辑 故障检测与识别&#xff1a; 无人机系统需具备实时监测各项关键参数的能力&#xff0c;如电池电量、电机状态、传感器数据等。 当检测到参数异常或超出预设阈值时&#xff0c;系统应能迅速识别故障类型及其严重程度。 故障处理策略&#xff1a; 根据故障类型…

洞察:OpenAI 全球宕机,企业应该如何应对 LLM 的不稳定性?

北京时间12月12日上午&#xff0c;OpenAI证实其聊天机器人ChatGPT正经历全球范围的宕机&#xff0c;ChatGPT、Sora及API受到影响。 OpenAI 更新事故报告称&#xff0c;已查明宕机原因&#xff0c;正努力以最快速度恢复正常服务&#xff0c;并对宕机表示歉意。 此次 OpenAI 故障…

STM32F407ZGT6-UCOSIII笔记2:UCOSIII任务创建实验-Printf 函数卡住 UCOSIII 系统问题解决

今日简单编写熟悉一下UCOSIII系统的任务创建代码&#xff0c;理解一下OS系统&#xff1a; 并发现以及解决了 Printf 函数卡住 UCOSIII 系统问题解决 文章提供测试代码讲解、完整工程下载、测试效果图 目录 文件结构解释&#xff1a; 任务函数文件&#xff1a; 目前各个文件任…