详解 Flink 的 ProcessFunction API

一、Flink 不同级别的 API

在这里插入图片描述

  • Flink 拥有易于使用的不同级别分层 API 使得它是一个非常易于开发的框架
  • 最底层的 API 仅仅提供了有状态流处理,它将处理函数(Process Function )嵌入到了 DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某些操作进行抽象,允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
  • 核心 API(Core APIs),比如 DataStream API (用于处理有界或无界流数据)以及 DataSet API (用于处理有界数据集)在实际生产中一般使用较多。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。
  • Table API 是以表为中心的声明式编程,其中表在表达流数据时会动态变化。 Table API 遵循关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比较的操作,例如 select、join、group-by、aggregate 等。
  • Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

二、ProcessFunction 介绍

  • 相较于 map、filter 和 window 等特定的具体的操作而言,Flink 在底层 API 中提炼出一个统一通用的 process 操作,它是所有转换算子的一个概括性的表达,可以在对应的接口中自定义处理逻辑,而这一层接口就被叫作“处理函数”(ProcessFunction)
  • 处理函数 (ProcessFunction) 提供了一个“定时服务”(TimerService),可以通过它访问流中的事件(event )、时间戳(timestamp )、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数 (ProcessFunction) 继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数 (ProcessFunction) 可以直接将数据输出到侧输出流(side output)中
  • 所以,处理函数 (ProcessFunction) 是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础

三、常见的 ProcessFunction 类

  • ProcessFunction:最基本的处理函数,基于 DataStream 直接调用 process() 时作为参数传入
  • KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用 process() 时作为参数传入
  • CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process() 时作为参数传入
  • ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process() 时作为参数传入
  • BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。“广播连接流” BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物
  • KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流, 是一个 KeyedStream 与广播流(BroadcastStream)做连接之后的产物
  • ProcessWindowFunction:KeyedStream 开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process() 时作为参数传入
  • ProcessAllWindowFunction:DataStream 开窗之后的处理函数,基于 AllWindowedStream 调用 process() 时作为参数传入

四、ProcessFunction API 实战

1. KeyedProcessFunction

1.1 解析
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    //1.两个核心方法:
    //1.1 流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs) 
    public abstract void processElement(I value, Context ctx, Collector<O> out);
    
    //1.2 一个回调函数。当processElement中注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)
    public abstract void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out);
    
    //2.富函数的以下方法:open()/close()/getRuntimeContext()
}
1.2 ProcessFunction 的 Context
//Context的常用方法
context.timestamp(); //获取当前数据的时间戳
context.getCurrentKey(); //获取当前数据的 key
context.output(OutputTag<X> outputTag, X value); //输出侧输出流
context.timerService(); //获取 TimerService 对象
1.3 Timer 和 TimerService

ProcessFunction 的 Context 对象调用 timerService() 方法可以直接返回一个 TimerService 对象;定时器 Timer 只能在 KeyedStream 上面使用

//TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
//获取当前的处理时间
long currentProcessingTime();

//获取当前的水位线(事件时间)
long currentWatermark();

//注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);

//注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);

//删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);

//删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);
1.4 案例

需求:监控温度传感器的温度值,如果温度值在 10 秒钟之内 (processing time) 连续上升,则报警

public class ProcessFunctionCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        dataStream.keyBy("id").process(new TempContIncreWarning(10)).print();
        
        env.execute();
    }
    
    //自定义处理函数,用于监测一段时间内某个传感器温度值是否连续上升,输出报警信息
    public static class TempContIncreWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {
        //定义私有属性:监测的时间间隔
        private Integer interval;
        
        public TempContIncreWarning(Integer interval) {
            this.interval = interval;
        }
        
        //定义两个值状态属性,分别保存上一次的温度值和定时器的时间戳
        private ValueState<Double> lastTempState;
        private ValueState<Long> timerTsState;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
        }
        
        @Override
        public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
            //获取状态值
            Double lastTemp = lastTempState.value();
            Long timerTs = timerTsState.value();
            
            //如果上一次的温度值为null或者上一次的温度值小于当前温度值并且定时器为null则注册定时器
            if(lastTemp == null || (lastTemp != null && value.getTemperature() > lastTemp && timerTs == null)) {
                Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
                ctx.timerService().registerProcessingTimeTimer(ts);
                timerTsState.update(ts);
            } else if(value.getTemperature() < lastTemp && timerTs != null) {//如果上一次的温度值大于当前温度值且定时器不为null则删除定时器,清空定时器值状态
                ctx.timerService().deleteProcessingTimeTimer(timerTs);
                timerTsState.clear();
            }
            
            //更新温度值状态
            lastTempState.update(value.getTemperature());
        }
        
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            //定时器触发则输出报警信息
            out.collect("传感器" + ctx.getCurrentKey().getField(0) + "的温度在" + interval + "s内连续上升");
            timerTsState.clear();
        }
        
        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}

2. 侧输出流

监控传感器温度值,将温度值低于 30 度的数据输出到 side output

/**
	核心方法:ProcessFunction中的 Context 对象的 output(OutputTag<X> outputTag, X value)
*/
public class SideOutputCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //定义OutputTag,用来标记侧输出流的低温流
        OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp"){};
        
        //DataStream不做keyBy,使用ProcessFunction的侧输出流进行高低温分流
       SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>(){
            @Override
           public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
               if(value.getTemperature() > 30) {//高温流,输出到主流
                   out.collect(value);
               } else {//低温流,输出到侧输出流
                   ctx.output(lowTempTag, value);
               }
           }
        });
        
        //高温流
        highTempStream.print("high-temp");
        
        //低温流
        highTempStream.getSideOutput(lowTempTag).print("low-temp");
        
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/693565.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BERT+PET方式数据处理

基于BERTPET方式数据预处理介绍 BERTPET方式数据预处理&#x1f43e; 本项目中对数据部分的预处理步骤如下: 查看项目数据集编写Config类项目文件配置代码编写数据处理相关代码 1 查看项目数据集&#x1f43e; 数据存放位置&#xff1a;/Users/***/PycharmProjects/llm/prom…

如何有效管理低绩效人员:CARES 框架

本文主要介绍了如何通过CARES框架有效管理低绩效员工&#xff0c;帮助他们提升绩效和积极性。原文: How to Effectively Manage Low Performers: The CARES Framework 作为管理者&#xff0c;最具挑战性的任务之一就是帮助表现不佳的团队成员提高积极性和技能水平。必须认识到&…

Linux磁盘分区使用情况查询

一、磁盘分区使用情况查询 1. 查询磁盘整体使用情况使用 df -h进行查询 如图我们可以了解到磁盘的一些大致的使用情况&#xff0c;注意当已用部分有超过80%使用的分区就意味着你需要进行磁盘的清理了。 2.查询指定的磁盘使用情况 使用指令 du -h 当不指定目录时&#xff0c;默…

“深入探讨Java中的对象拷贝:浅拷贝与深拷贝的差异与应用“

前言&#xff1a;在Java编程中&#xff0c;深拷贝&#xff08;Deep Copy&#xff09;与浅拷贝&#xff08;Shallow Copy&#xff09;是两个非常重要的概念。它们涉及到对象在内存中的复制方式&#xff0c;对于理解对象的引用、内存管理以及数据安全都至关重要。 ✨✨✨这里是秋…

秒解-今年高考数学压轴题,你不知道有多爽!附带:计算机程序验证结果

同步的公众号文章在此&#xff0c;今年高考数学-压轴题 原来可以秒解啊&#xff01;附带&#xff1a;计算机程序验证结果没错&#xff0c;其实高考数学-压轴题其实可以秒解的呀~https://mp.weixin.qq.com/s/4M50qP9MFwJOS9OpeyxvSg 没错&#xff0c;其实新课标I数学-压轴题其实…

里卡提方程(Riccati Equation)例子

里卡提方程(Riccati Equation) 里卡提方程(Riccati Equation)在人形机器人控制中有重要的应用,特别是在最优控制和估计问题中。里卡提方程主要用于求解线性二次型调节器(LQR, Linear Quadratic Regulator)和卡尔曼滤波器(Kalman Filter)。这些方法有助于提高机器人控…

RISCV中CLINT和PLIC解析

中断这个东西理论上属于CPU核心的东西。一般来说并不需要重新设计。实际的实现中是比较繁琐的&#xff0c;此处只介绍原理。ARM基本上会用NVIC(Nested Vectored Interrupt Controller) 的东西&#xff0c;RISC-V目前实现了一个比较简单的东西&#xff08;有人称之为简洁高效&am…

Dubbo 3.x源码(20)—Dubbo服务引用源码(3)

基于Dubbo 3.1&#xff0c;详细介绍了Dubbo服务的发布与引用的源码。 此前我们学习了调用createProxy方法&#xff0c;根据服务引用参数map创建服务接口代理引用对象的整体流程&#xff0c;我们知道会调用createInvokerForRemote方法创建远程引用Invoker&#xff0c;这是Dubbo …

QT系列教程(10) QTextEdit学习

简介 QTextEdit是文本编辑器&#xff0c;支持富文本功能。接下来我们创建一个Qt Application 应用&#xff0c;然后在ui中添加一个QTextEdit插件。 运行程序后&#xff0c;可以在QTextEdit中输入任何文字也包括富文本。 文本块 我们在MainWindow的ui文件中添加了textedit插件…

算法刷题【二分法】

题目&#xff1a; 注意题目中说明了数据时非递减的&#xff0c;那么这样就存在二分性&#xff0c;能够实现logn的复杂度。二分法每次只能取寻找特定的某一个值&#xff0c;所以我们要分别求左端点和有端点。 分析第一组用例得到结果如下: 成功找到左端点8 由此可知&#xff0…

Autosar Dem配置-Condition(TRC)的使用-基于ETAS软件

文章目录 前言Dem配置DemEnableConditionDemEnableConditionIdDemEnableConditionStatus DemEnableConditionGroupDemEventParameter 接口配置代码实现总结 前言 在车辆工作状态下&#xff0c;每个DTC检测可能都需要一个前提条件&#xff0c;否则如果任何条件下都可以进行DTC检…

C# WPF入门学习主线篇(十七)—— UniformGrid布局容器

C# WPF入门学习主线篇&#xff08;十七&#xff09;—— UniformGrid布局容器 欢迎来到C# WPF入门学习系列的第十七篇。在前几篇文章中&#xff0c;我们已经探讨了 Canvas、StackPanel、WrapPanel、DockPanel 和 Grid 布局容器及其使用方法。本篇博客将介绍另一种非常实用且简单…

coap-emqx:使用libcoap与emqx通信

# emqx开启CoAP网关 请参考【https://blog.csdn.net/chenhz2284/article/details/139562749?spm1001.2014.3001.5502】 # 写一个emqx的客户端程序&#xff0c;不断地往topic【server/1】发消息 【pom.xml】 <dependency><groupId>org.springframework.boot<…

一分钟学习数据安全—自主管理身份SSI加密技术

上篇介绍了SSI的架构。架构之后&#xff0c;我们要了解一下SSI发展的驱动力&#xff1a;加密技术。现代数字通信离不开数学和计算机科学&#xff0c;加密技术也源于此。加密技术使区块链和分布式账本得以实现&#xff0c;也使SSI成为可能。 以下我们就概览一下SSI基础架构中涉及…

Python实现半双工的实时通信SSE(Server-Sent Events)

Python实现半双工的实时通信SSE&#xff08;Server-Sent Events&#xff09; 1 简介 实现实时通信一般有WebSocket、Socket.IO和SSE&#xff08;Server-Sent Events&#xff09;三种方法。WebSocket和Socket.IO是全双工的实时双向通信技术&#xff0c;适合用于聊天和会话等&a…

微信小程序学习笔记(1)

文章目录 一、文件作用app.json&#xff1a;project.config.json:sitemap.json页面中.json 二、项目首页三、语法**WXML**和**HTML**WXSS 和CSS的区别小程序中.js文件的分类 一、文件作用 app.json&#xff1a; 当前小程序的全局配置&#xff0c;包括所有页面路径、窗口外观、…

kv视频如何转码mp4格式,kv转换mp4最简单方法

在数字化时代&#xff0c;视频格式转换成为了一项日常需求。有时候我们需要把kv格式转换为MP4格式。下面将详细介绍kv转MP4的方法 方法一、 1、使用 "小白兔视频格式在线转换网站" 2、地址发给"小白兔视频格式在线转换网站"的客服&#xff0c;客服下载即可…

基于小波脊线的一维时间序列信号分解方法(MATLAB R2018A)

信号分解技术是把一个复杂信号分解为若干含有时频信息的简单信号&#xff0c;研可通过分解后的简单信号来读取和分析复杂信号的有效特征。因此&#xff0c;信号分解技术对分析结果的影响是不言而喻的。 傅里叶分解是早期常用的信号分解方法&#xff0c;最初被用于分析热过程&a…

树状数组的基础

树状数组1 树状数组可以解决什么问题呢&#xff1f; 可以解决大部分区间上面的修改以及查询的问题&#xff0c;例如1.单点修改&#xff0c;单点查询&#xff0c;2.区间修改&#xff0c;单点查询&#xff0c;3.区间查询&#xff0c;区间修改&#xff0c;换言之&#xff0c;线段…

分享一个用python写的本地WIFI密码查看器

本章教程&#xff0c;主要分享一个本地wifi密码查看器&#xff0c;用python实现的&#xff0c;感兴趣的可以试一试。 具体代码 import subprocess # 导入 subprocess 模块&#xff0c;用于执行系统命令 import tkinter as tk # 导入 tkinter 模块&#xff0c;用于创建图形用…