Flink学习——状态编程

目录

一、Flink中的状态

二、状态编程

(一)ValueState案例——判断传感器的数据

1.代码实现

2.端口进行传输数据

3.运行结果

(二)ListState

(三)MapState案例——比较学生每次考试成绩

1.代码实现

2.端口传输学生成绩

3.运行结果

(四)ReducingState


一、Flink中的状态

        在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

        例如,监测温度的变化趋势的时候,如果现在的温度与上一秒的温度不一样,就说明处于不同的状态。

二、状态编程

(一)ValueState案例——判断传感器的数据

1.代码实现

import source.SensorReading
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("ant168", 7777) //加载集合数据源*/

    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    val alterStream: DataStream[(String, Double, Double)] = dataStream.keyBy(_.id)
      .flatMap(new ChangeAlert)
    alterStream.print()

    env.execute()
  }
}

class ChangeAlert extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  // TODO 定义状态对象,保存上一次的温度值
  // TODO "last-temp"是指给当前状态在运行程序的上下文中起名  后面的classOf[Double]是要指明last-temp的类型
  // TODO 关键字要改为lazy,等到使用的时候才创建对象
  lazy val lastState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))

  // TODO 判断机器是否为第一次开启
  lazy val firstState: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("first-start", classOf[Int]))

  // lastState中有value()——取数和update()——更新 两个方法
  override def flatMap(value: SensorReading, out: Collector[
    (String, Double, Double)]): Unit = {
    // 首先,机器开启,判断是否为第一次开启,如果不是第一次开启,就进行温度判断,如果是第一次开启,就默认温度为0.0
    val firstValue: Int = firstState.value()
    val lastValue: Double = lastState.value()
    val dif: Double = (lastValue - value.temperature).abs
    if (firstValue != 0) {
      // 取到上一次状态中的值
      // TODO 对比这一次的值与上一次的温度的差
      if (dif > 10)
      // 如果差值>10,就输出
        out.collect((value.id, lastValue, value.temperature))
    } else {
      // 如果机器第一次开启,就更新机器状态
      firstState.update(1)
      if (dif > 10)
        out.collect((value.id, lastValue, value.temperature))
    }
    // 每次新来一个温度值,原有的温度状态就要更新
    lastState.update(value.temperature)
  }
}

2.端口进行传输数据

3.运行结果

(二)ListState

class MyRichFunction extends RichFlatMapFunction[SensorReading, String] {
  lazy val listState: ListState[String] = getRuntimeContext.getListState(new ListStateDescriptor[String]("liststate", classOf[String]))

  override def flatMap(value: SensorReading, out: Collector[String]): Unit = {
    val strings: lang.Iterable[String] = listState.get()
    listState.add("hello")
    val ls = new util.ArrayList[String]()
    ls.add("html")
    ls.add("flink")
    listState.addAll(ls)
    listState.update(ls)
  }
}

(三)MapState案例——比较学生每次考试成绩

1.代码实现

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

import java.util.Map
import java.util

/**
 * 状态编程判断学生成绩
 */
object StateTest2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("ant168", 7777)
    // TODO 对成绩进行清洗
    val dataStream: DataStream[Test] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      Test(arr(0).trim, arr(1).trim, arr(2).trim.toDouble)
    })

    val scoreStream: DataStream[(String, String, Double, Double)] = dataStream
.keyBy(data => (data.id, data.subject))
      .flatMap(new MyTest)

    scoreStream.print()
    env.execute("state test")
  }
}

// 定义一个样例类,表示测试结果
case class Test(id: String, subject: String, score: Double)

class MyTest extends RichFlatMapFunction[Test, (String, String, Double, Double)] {
  lazy val mapState: MapState[(String, String), Double] = getRuntimeContext
    .getMapState(new MapStateDescriptor[(String, String), Double]("map-temp", classOf[(String, String)], classOf[Double]))

  override def flatMap(value: Test, 
                       out: Collector[(String, String, Double, Double)]): Unit = {
    // 放入第一次的成绩
    val previousScore: Double = Option(mapState.get(value.id, value.subject)).getOrElse(0.0)
    mapState.put((value.id, value.subject), previousScore)
    // 获取第二次的成绩
    val iter: util.Iterator[Map.Entry[(String, String), Double]] = mapState.iterator()
    while (iter.hasNext) {
      val unit: Map.Entry[(String, String), Double] = iter.next()
      val key: (String, String) = unit.getKey// 第一次考试的id,subject
      val value1: Double = unit.getValue// 第一次开始的score
      val dif: Double = (previousScore - value.score).abs
      if (dif >= 10) {
        out.collect((key._1, key._2, previousScore, value.score))
      }
      mapState.put((key._1, key._2), value.score)
    }
  }
}

2.端口传输学生成绩

3.运行结果

(四)ReducingState

class MyRichFunction extends RichFlatMapFunction[SensorReading, String] {
  lazy val reducingState: ReducingState[SensorReading] = getRuntimeContext
.getReducingState(new ReducingStateDescriptor[SensorReading]("reducestate", 
new MyReduceFunction2, classOf[SensorReading]))

  override def flatMap(value: SensorReading, out: Collector[String]): Unit = {
    val reading: SensorReading = reducingState.get()
    reducingState.add(reading)
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/21989.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

DETR3D 论文学习

1. 解决了什么问题? 对于低成本自动驾驶系统,仅凭视觉信息进行 3D 目标检测是非常有挑战性的。目前的多相机 3D 目标检测方法有两类,一类直接对单目图像做预测,没有考虑 3D 场景的结构或传感器配置。这类方法需要多步后处理&…

C语言小游戏——扫雷

前言 结合前边我们所学的C语言知识,本期我们将使用C语言实现一个简单的小游戏——扫雷 目录 前言 总体框架设计 多文件分装程序 各功能模块化实现 初始化棋盘 棋盘打印 埋雷 判赢与排雷 游戏逻辑安排 总结 总体框架设计 和三子棋相同,游戏开始时…

Linux安装MySQL后无法通过IP地址访问处理方法

本文主要总结Linux安装Mysql后,其他主机访问不了MySQL数据库的原因和解决方法 环境说明: MySQL 5.7.30CentOS Linux release 7.6.1810 (Core) 创建完Mysql数据库后可以查看mysql 日志获取root 用户登录密码 [rootlocalhost mysql-5.7.30]# cat /var/l…

spring源码学习

1.xmlBeanFactory对defaultListableBeanFactory类进行扩展,主要用于从XML文档中获取BeanDefinition,对于注册及获取bean都是使用从父类DefaultListableBeanFactory继承的方法去实现。 xmlBeanFactory 主要是使用reader属性对资源文件进行读取和注册。 2.…

Maven属性与版本管理

文章目录 1 属性1.1 问题分析1.2 解决步骤步骤1:父工程中定义属性步骤2:修改依赖的version 2 配置文件加载属性步骤1:父工程定义属性步骤2:jdbc.properties文件中引用属性步骤3:设置maven过滤文件范围步骤4:测试是否生效 3 版本管理 在这一章节内容中,我们将学习两个…

cpp11实现线程池(一)——项目介绍

项目介绍 线程池是库的形式提供给用户,是必须放到代码中,不能单独运行,亦称为基础组件 第一版线程池任务对象使用继承技术,提供一个抽象基类Task,里面有一个纯虚函数run(),使用时继承该类,并重…

c++综合学习

1.函数调用 传值调用:在函数内部修改形式参数,不改编实际参数的值;引用调用:即指针调用,传入的是变量的指针,则在函数内部修改形式参数,实际参数跟着改变。 2. 数组 数组名即该数组的首地址&a…

CSPM 未来发展的思考

由于数据泄露的持续威胁以及云的短暂和快节奏的特性,只有在最基础的层面上保护您的云才有意义。组织已经转向 CSPM 解决方案来锁定他们的平台。 今天我们来聊聊什么是CSPM,它如何去产生有有效的帮助,未来会向哪发展。 什么是 CSPM&#xff1…

阿拉德手游服务端Centos搭建教程

阿拉德手游服务端Centos搭建教程 大家好我是艾西,又有几天没有更新文章了。这几天看了看还是有不少人对手游感兴趣,今天给大家分享一款早些年大火的pc游戏,现在也有手游了“阿拉德”。 你是否还记得DNF,一天你不小心救了赛丽亚&a…

Win10系统电脑开机黑屏一直转圈无法进入桌面怎么办?

Win10系统电脑开机黑屏一直转圈无法进入桌面怎么办?有用户电脑开机了之后无法进入到桌面中,开机了之后,电脑桌面只有显示一个黑屏和转圈的图标,一直都无法进入到桌面中。强制重启电脑之后依然是这样,那么这个情况怎么去…

今天公司来了个拿 30K 出来的测试,算是见识到了基础的天花板

今天上班开早会就是新人见面仪式,听说来了个很厉害的大佬,年纪还不大,是上家公司离职过来的,薪资已经达到中高等水平,很多人都好奇不已,能拿到这个薪资应该人不简单,果然,自我介绍的…

Mysql-存储过程简单入门

定义: 存储过程的英文是 Stored Procedure 。它的思想很简单,就是一组经过 预先编译 的 SQL 语句 的封装。 执行过程:存储过程预先存储在 MySQL 服务器上,需要执行的时候,客户端只需要向服务器端发出调用 存储过程的命…

Godot引擎 4.0 文档 - 循序渐进教程 - 监听玩家输入

本文为Google Translate英译中结果,DrGraph在此基础上加了一些校正。英文原版页面: Listening to player input — Godot Engine (stable) documentation in English 监听玩家输入 在上一课创建您的第一个脚本的基础上,让我们看看任何游戏…

SpringBoot集成SpringSecurity从0到1搭建权限管理详细过程(认证+授权)

前言 最近工作需要给一个老系统搭建一套权限管理,选用的安全框架是SpringSecurity,基本上是结合业务从0到1搭建了一套权限管理,然后想着可以将一些核心逻辑抽取出来写一个权限通用Demo,特此记录下。 文章目录 前言1、SpringSecuri…

Elastic Stack

一、简介 ELK是一个免费开源的日志分析架构技术栈总称,官网https://www.elastic.co/cn。包含三大基础组件,分别是Elasticsearch、Logstash、Kibana。但实际上ELK不仅仅适用于日志分析,它还可以支持其它任何数据搜索、分析和收集的场景&#…

接口测试:Eolink Apikit 和 Postman 哪个更好用?

接口测试:Eolink Apikit 和 Postman 哪个更好用? 很多做服务端开发的同学,应该基本都用过 Postman 来测试接口,虽然 Postman 能支撑日常工作,但是总感觉还是少了点什么,比如需要 Swagger 来维护接口文档&am…

nginx压测记录

nginx压测记录 1 概述2 原理3 环境3.1 设备与部署3.2 nginx配置/服务器配置 4 netty服务5 步骤6 结果7 写在最后 1 概述 都说nginx的负载均衡能力很强,最近出于好奇对nginx的实际并发能力进行了简单的测试,主要测试了TCP/IP层的长链接负载均衡 2 原理 …

YOLOv5区域检测+声音警报

YOLOv5区域检测声音警报 1. 相关配置2. 检测区域设置3. 画检测区域线(不想显示也可以不画)4. 报警模块5. 代码修改5.1 主代码5.2 细节修改(可忽略) 6. 实验效果 本篇博文工程源码下载 链接1:https://github.com/up-up-…

远程桌面连接工具在哪里下载?

在市场上,有很多种不同的工具可用。一些远程桌面连接工具(如RayLink)具有高清流畅、操作简单和连接速度快的特点。而其他一些连接工具则更注重保护安全和数据保密性。不同的远程桌面连接工具各有特点,需要根据不同的需求进行选择。…

[AI图片生成]自己搭建StableDiffusion安装过程

前言 最近尝试玩玩AI图片生成,安装一路坑 出个一路安装成功的记录 开始 找个空间大的盘符,这玩意将来会很占空间.一个模型大约5g左右,你可能还会装很多模型创建个目录,路径不要有中文安装git 下载地址 详细教程 (如果有忽略)下载 Python3.10.0,记得勾选添加到环境变量选项,安…