flink的带状态的RichFlatMapFunction函数使用

背景

使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法

RichFlatMapFunction使用示例

下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:

package wikiedits.func.state;

import java.util.Objects;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型
 */
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {

    // 键值分区状态,对应每个name一个值
    ValueState<StateEntity> nameState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 创建一个键值分区状态
        ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);
        nameState = getRuntimeContext().getState(state);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {
        // 判断状态值是否为空(状态默认值是空)
        if (Objects.isNull(nameState.value())) {
            StateEntity sFalg = new StateEntity(input.f0, input.f1);
            nameState.update(sFalg);
            return;
        }
        // 和上一次的状态值比较
        StateEntity value = nameState.value();
        if (Math.abs(value.count - input.f1) > 100) {
            collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));
        }
        value.setName(input.f0);
        value.setCount(input.f1);
        // 更新状态值
        nameState.update(value);
    }


}

package wikiedits.func.state;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class RichFlatMapFunctionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置数据源,一共三个元素
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // 只有XXX,YYY,ZZZ三种name
                    String name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");
                    int count = RandomUtils.nextInt(0, 1000);
                    // 使用当前时间作为时间戳
                    long timeStamp = System.currentTimeMillis();
                    // 发射一个元素,并且戴上了时间戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);
                    // 每发射一次就延时1秒
                    Thread.sleep(5000);
                }
            }

            @Override
            public void cancel() {}
        });
        dataStream.keyBy((f) -> {
            return f.f0;
        }).flatMap(new MyRichFlatMapFunction()).print();

        env.execute();
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }

}

结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/123270.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

智慧工地综合管理平台-项目整体需求规格说明书

引言 定位与目标 智慧工地是一种现代化的管理方式,目标是通过应用现代科技手段,对施工现场的设备、人员、物资等信息全面掌控,减少工地事故的发生,提高施工质量和安全性,同时也能够降低成本,提高效益,实现建筑施工的数字化、智能化和可持续发展,为城市建设和社会发展…

【STM32 PWM输出+串口调整PWM周期和占空比】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、PWM是什么&#xff1f;1. PWM 图解二、认识STM32的PWM功能1.哪些定时器有PWM输出功能1.1 高级定时器&#xff0c;7路PWM输出&#xff0c;3组是互补输出&…

0基础两小时建网站

​作者主页 &#x1f4da;lovewold少个r博客主页 ⚠️本文重点&#xff1a;0基础2小时搭建个人网站 &#x1f449;【C-C入门系列专栏】&#xff1a;博客文章专栏传送门 &#x1f604;每日一言&#xff1a;宁静是一片强大而治愈的神奇海洋&#xff01; 目录 前言 第一步 环境…

第12章 PyTorch图像分割代码框架-2

模型模块 本书的第5-9章重点介绍了各种2D和3D的语义分割和实例分割网络模型&#xff0c;所以在模型模块中&#xff0c;我们需要做的事情就是将要实验的分割网络写在该目录下。有时候我们可能想尝试不同的分割网络结构&#xff0c;所以在该目录下可以存在多个想要实验的网络模型…

11 # 手写 reduce 方法

reduce 使用 reduce() 方法对数组中的每个元素按序执行一个提供的 reducer 函数&#xff0c;每一次运行 reducer 会将先前元素的计算结果作为参数传入&#xff0c;最后将其结果汇总为单个返回值。 第一次执行回调函数时&#xff0c;不存在“上一次的计算结果”。如果需要回调…

运行obotframework-ride控制台报错module ‘urllib‘ has no attribute ‘Request‘

背景&#xff1a;Python3.8robotframework-ride1.7.3.1&#xff0c;运行报错module urllib has no attribute Request 原因&#xff1a; 解决&#xff1a;升级robotframework-ride到2.0以上。或者降级python到3.7。

基于SSM的演唱会购票系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue、HTML 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是…

大数据学习之一文学会Spark【Spark知识点总结】

文章目录 什么是SparkSpark的特点Spark vs HadoopSparkHadoopSpark集群安装部署Spark集群安装部署StandaloneON YARN Spark的工作原理什么是RDDRDD的特点Spark架构相关进程Spark架构原理 Spark实战&#xff1a;单词统计Scala代码开发java代码开发任务提交 Transformation与Acti…

自动控制原理--面试问答题

以下文中的&#xff0c;例如 s_1 为 s下角标1。面试加油&#xff01; 控制系统的三要素&#xff1a;稳准快。稳&#xff0c;系统最后不能震荡、发散&#xff0c;一定要收敛于某一个值&#xff1b;快&#xff0c;能够迅速达到系统的预设值&#xff1b;准&#xff0c;最后稳态值…

清凉油市场现状及未来发展趋势

清凉油市场一直以其庞大的规模和快速增长的势头受到人们的关注。无论是消费者对健康生活方式的追求&#xff0c;还是中国作为全球最大市场的地位&#xff0c;都为清凉油市场的持续发展注入了强大的动力。随着人们对健康意识的提升和对保健产品需求的增加&#xff0c;清凉油市场…

算法?认识一下啦

一、什么是算法&#xff1f; 算法 &#xff0c;是对特定问题求解方法和步骤的一种描述。它是有限指令的有限序列&#xff0c;其中每个指令表示一个或多个操作。 算法和程序的关系 算法​是解决问题的一种方法或一个过程&#xff0c;考虑如何将输入转换成输出&#xff0c;一个…

功能更新|Leangoo领歌免费敏捷工具支持SAFe大规模敏捷框架

Leangoo领歌是一款永久免费的专业的敏捷开发管理工具&#xff0c;提供端到端敏捷研发管理解决方案&#xff0c;涵盖敏捷需求管理、任务协同、进展跟踪、统计度量等。 
 Leangoo可以支持敏捷研发管理全流程&#xff0c;包括小型团队敏捷开发&#xff0c;规模化敏捷SAFe&#xf…

SpringBoot测试类启动web环境-下篇

一、响应状态 1.MockMvcResultMatchers 说明&#xff1a;模拟结果匹配。 package com.forever;import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoC…

软件测试|MySQL LIKE:深入了解模糊查询

简介 在数据库查询中&#xff0c;模糊查询是一种强大的技术&#xff0c;可以用来搜索与指定模式匹配的数据。MySQL数据库提供了一个灵活而强大的LIKE操作符&#xff0c;使得模糊查询变得简单和高效。本文将详细介绍MySQL中的LIKE操作符以及它的用法&#xff0c;并通过示例演示…

软件测试/测试开发丨接口测试Mock实战练习学习笔记

点此获取更多相关资料 本文为霍格沃兹测试开发学社学员学习笔记分享 原文链接&#xff1a;https://ceshiren.com/t/topic/27857 一、Rewrite 1.1、Rewrite 原理 1.2、Rewrite 实战 Tools → Rewrite 勾选 Enable Rewrite 点击下方 Add 按钮新建一个重写的规则 在右侧编辑重…

JVM之jinfo虚拟机配置信息工具

jinfo虚拟机配置信息工具 1、jinfo jinfo&#xff08;Configuration Info for Java&#xff09;的作用是实时地查看和调整虚拟机的各项参数。 使用jps -v 可以查看虚拟机启动时显示指定的参数列表&#xff0c;但是如果想知道未被显示指定的参数的系统默认值&#xff0c;除 …

blender动画制作全流程软件

blender官网下载地址 Download — blender.org blender菜单中英文对照表 blender常用快捷键&#xff1a; ~切换视图 z切换着色模式 shiftA新建物体 tab进入编辑模式 在编辑模式下: 1编辑点 2编辑线 3编辑面 shfit空格弹出所有快捷键 游标一般配合标注使用 常用:G移动物体…

1214. 波动数列

题目&#xff1a; 1214. 波动数列 - AcWing题库 思路&#xff1a;dp dp划分递归 转自&#xff1a; AcWing 1214. 波动数列&#xff08;有公式详细推导&#xff09; - AcWing 代码&#xff1a; #include <iostream> #include <cstring> #include <algori…

Stable Diffusion WebUI扩展sd-webui-controlnet之Canny

什么是Canny? 简单来说,Canny是计算机视觉领域的一种边缘检测算法。 关于Canny算法大家可以去看我下面这篇博客,里面详细介绍了Canny算法的原理以及代码演示。 OpenCV竟然可以这样学!成神之路终将不远(十五)_maxminval opencv-CSDN博客文章浏览阅读111次。14 图像梯度…

【Orangepi Zero2 全志H616】驱动串口实现Tik Tok—VUI(语音交互)

一、编程实现语音和开发板通信 wiringpi库源码demo.c 二、基于前面串口的代码修改实现 uartTool.huartTool.cuartTest.c 三、ADB adb控制指令 四、手机接入Linux热拔插相关 a. 把手机接入开发板 b. 安装adb工具&#xff0c;在终端输入adb安装指令&#xff1a; sudo apt-g…