【Flink】Process Function

目录

1、ProcessFunction解析

1.1 抽象方法.processElement()

1.2 非抽象方法.onTimer()

2、Flink中8个不同的处理函数

2.1 ProcessFunction

2.2 KeyedProcessFunction

2.3 ProcessWindowFunction

2.4 ProcessAllWindowFunction

2.5 CoProcessFunction

2.6 ProcessJoinFunction

2.7 BroadcastProcessFunction

2.8 KeyedBroadcastProcessFunction

3、按键分区处理函数

3.1 定时器(Timer)和定时服务(TimerService)

4、窗口处理函数

4.1 窗口函数使用

4.2 ProcessWindowFunction解析


它是底层提炼的一个可以自定义处理逻辑的操作,被叫作“处理函数”(process function)

1、ProcessFunction解析

在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。

内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    ...

}

1.1 抽象方法.processElement()

用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器out来定义的。

  1. value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
  2. ctx:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。
  3. out:“收集器”(类型为Collector),用于返回输出数据。使用方式与flatMap算子中的收集器完全一样,直接调用out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

通过几个参数的分析不难发现,ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能;而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。

1.2 非抽象方法.onTimer()

定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

注意:在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作。

2、Flink中8个不同的处理函数

2.1 ProcessFunction

最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。

2.2 KeyedProcessFunction

对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

2.3 ProcessWindowFunction

开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。

2.4 ProcessAllWindowFunction

同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。

2.5 CoProcessFunction

合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。

2.6 ProcessJoinFunction

间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。

2.7 BroadcastProcessFunction

广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。

2.8 KeyedBroadcastProcessFunction

按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。

3、按键分区处理函数

只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

3.1 定时器(Timer)和定时服务(TimerService

定时服务与当前运行的环境有关。ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法:

// 获取当前的处理时间
long currentProcessingTime();

// 获取当前的水位线(事件时间)
long currentWatermark();

// 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);

// 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);

// 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);

// 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);

TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // TODO Process:keyed
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * 来一条数据调用一次
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取当前数据的key
                        String currentKey = ctx.getCurrentKey();

                        // TODO 1.定时器注册
                        TimerService timerService = ctx.timerService();

                        // 1、事件时间的案例
                        Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间
                        timerService.registerEventTimeTimer(5000L);
                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");

                        // 2、处理时间的案例
//                        long currentTs = timerService.currentProcessingTime();
//                        timerService.registerProcessingTimeTimer(currentTs + 5000L);
//                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");


                        // 3、获取 process的 当前watermark
//                        long currentWatermark = timerService.currentWatermark();
//                        System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);


                        
                        // 注册定时器: 处理时间、事件时间
//                        timerService.registerProcessingTimeTimer();
//                        timerService.registerEventTimeTimer();
                        // 删除定时器: 处理时间、事件时间
//                        timerService.deleteEventTimeTimer();
//                        timerService.deleteProcessingTimeTimer();

                        // 获取当前时间进展: 处理时间-当前系统时间,  事件时间-当前watermark
//                        long currentTs = timerService.currentProcessingTime();
//                        long wm = timerService.currentWatermark();
                    }


                    /**
                     * TODO 2.时间进展到定时器注册的时间,调用该方法
                     * @param timestamp 当前时间进展,就是定时器被触发时的时间
                     * @param ctx       上下文
                     * @param out       采集器
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        String currentKey = ctx.getCurrentKey();

                        System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");
                    }
                }
        );

        process.print();

        env.execute();
    }
}

4、窗口处理函数

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。

4.1 窗口函数使用

stream.keyBy( t -> t.f0 )
        .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
        .process(new MyProcessWindowFunction())

4.2 ProcessWindowFunction解析

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    ...

    public abstract void process(
            KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

    public void clear(Context context) throws Exception {}

    public abstract class Context implements java.io.Serializable {...}
}

ProcessWindowFunction依然是一个继承了AbstractRichFunction的抽象类,它有四个类型参数:

  • IN:input,数据流中窗口任务的输入数据类型。
  • OUT:output,窗口任务进行计算之后的输出数据类型。
  • KEY:数据中键key的类型。
  • W:窗口的类型,是Window的子类型。一般情况下我们定义时间窗口,W就是TimeWindow。

ProcessWindowFunction里面处理数据的核心方法.process()。方法包含四个参数。

  • key:窗口做统计计算基于的键,也就是之前keyBy用来分区的字段。
  • context:当前窗口进行计算的上下文,它的类型就是ProcessWindowFunction内部定义的抽象类Context。
  • elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型。
  • out:用来发送数据输出计算结果的收集器,类型为Collector。

可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合。而上下文context所包含的内容也跟其他处理函数有所差别:

public abstract class Context implements java.io.Serializable {

    public abstract W window();

    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
    public abstract <X> void output(OutputTag<X> outputTag, X value);

}

除了可以通过.output()方法定义侧输出流不变外,其他部分都有所变化。这里不再持有TimerService对象,只能通过currentProcessingTime()和currentWatermark()来获取当前时间,所以失去了设置定时器的功能;另外由于当前不是只处理一个数据,所以也不再提供.timestamp()方法。与此同时,也增加了一些获取其他信息的方法:比如可以通过.window()直接获取到当前的窗口对象,也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。注意这里的“窗口状态”是自定义的,不包括窗口本身已经有的状态,针对当前key、当前窗口有效;而“全局状态”同样是自定义的状态,针对当前key的所有窗口有效。

所以我们会发现,ProcessWindowFunction中除了.process()方法外,并没有.onTimer()方法,而是多出了一个.clear()方法。从名字就可以看出,这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出。

至于另一种窗口处理函数ProcessAllWindowFunction,它的用法非常类似。区别在于它基于的是AllWindowedStream,相当于对没有keyBy的数据流直接开窗并调用.process()方法:

stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
    .process(new MyProcessAllWindowFunction())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/174327.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CentOS7安装Docker遇到的问题笔记

笔记/朱季谦 以下是笔者本人学习搭建docker过程当中记录的一些实践笔记&#xff0c;过程当中也遇到了一些坑&#xff0c;但都解决了&#xff0c;就此记录&#xff0c;留作以后再次搭建时可以直接参考。 一、首先&#xff0c;先检查CentOS版本&#xff0c;保证在CentOS7版本以…

智能座舱架构与芯片 - (3) 硬件篇 上

一、介绍 在了解智能座舱的基本架构之后&#xff0c;我们有必要针对智能座舱域的硬件平台&#xff0c;软件平台&#xff0c;SOC等进行逐一介绍。从它们的整体结构中去认识最新的智能座舱组成部件&#xff0c;以及主要功能等。 如上图&#xff0c;是中央计算-区域控制架构下的智…

《白帽子讲web安全》

第十四章 PHP安全 文件包含漏洞是“代码注入”的一种。“代码注入”这种攻击&#xff0c;其原理就是注入一段用户能控制的脚本或代码&#xff0c;并让服务器端执行。“代码注入”的典型代表就是文件包含&#xff08;File Inclusion&#xff09;。文件包含可能会出现在JSP、PHP…

基于霍克斯过程的限价订单簿模型下的深度强化学习做市策略

数量技术宅团队在CSDN学院推出了量化投资系列课程 欢迎有兴趣系统学习量化投资的同学&#xff0c;点击下方链接报名&#xff1a; 量化投资速成营&#xff08;入门课程&#xff09; Python股票量化投资 Python期货量化投资 Python数字货币量化投资 C语言CTP期货交易系统开…

import.meta.glob() 如何导入多个目录下的资源

import.meta.glob() 如何导入多个目录下的资源 刚开始用 vite&#xff0c;在做动态路由的时候遇到了这个问题&#xff0c;看到其它教程上都是只引用了一个目录层级的内容&#xff0c;比如这样&#xff1a; let RouterModules import.meta.glob("/src/view/*/*.vue"…

网络运维与网络安全 学习笔记2023.11.21

网络运维与网络安全 学习笔记 第二十二天 今日目标 端口隔离原理与配置、路由原理和配置、配置多路由器静态路由 配置默认路由、VLAN间通信之路由器 端口隔离原理与配置 端口隔离概述 实现报文之间的2层隔离&#xff0c;除了使用VLAN技术以后&#xff0c;还可以使用端口隔…

蓝桥杯每日一题2023.11.21

题目描述 “蓝桥杯”练习系统 (lanqiao.cn) 题目分析 思路&#xff1a; 1.去重排序将其进行预处理 2.用gcd得到最简比值 3.用gcd_sub分别计算分子、分母的指数最大公约数 #include<bits/stdc.h> using namespace std; const int N 110; typedef long long ll; ll…

图Graph的存储、图的广度优先搜索和深度优先搜索(待更新)

目录 一、图的两种存储方式 1.邻接矩阵 2.邻接表 生活中处处有图Graph的影子&#xff0c;例如交通图&#xff0c;地图&#xff0c;电路图等&#xff0c;形象的表示点与点之间的联系。 首先简单介绍一下图的概念和类型&#xff1a; 图的的定义&#xff1a;图是由一组顶点和一…

11.21序列检测,状态机比较与代码,按键消抖原理

序列检测 用一个atemp存储之前的所有状态&#xff0c;即之前出现的七位 含无关项检测 要检测011XXX110 对于暂时变量的高位&#xff0c;位数越高就是越早出现的数字&#xff0c;因为新的数字存储在TEMP的最低位 不重叠序列检测 &#xff0c;一组一组 011100 timescale 1ns…

合肥中科深谷嵌入式项目实战——基于ARM语音识别的智能家居系统(三)

基于ARM语音识别的智能家居系统 我们上一篇&#xff0c;我们实现在Linux系统下编译程序&#xff0c;我们首先通过两个小练习来熟悉一下如何去编译。今天&#xff0c;我们来介绍一下LCD屏幕基本使用。 一、LCD屏幕基本使用 如何使用LCD屏幕&#xff1f; 1、打开开发板LCD设…

JSP编写自己的第一个WebServlet实现客户端与服务端交互

我们在项目中找到java目录 下面有一个包路径 然后 我们在下面创建一个类 我这里叫 TransmissionTest 当然 名字是顺便取的 参考代码如下 package com.example.dom;import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet…

【精选】OpenCV多视角摄像头融合的目标检测系统:全面部署指南&源代码

1.研究背景与意义 随着计算机视觉和图像处理技术的快速发展&#xff0c;人们对于多摄像头拼接行人检测系统的需求日益增加。这种系统可以利用多个摄像头的视角&#xff0c;实时监测和跟踪行人的活动&#xff0c;为公共安全、交通管理、视频监控等领域提供重要的支持和帮助。 …

宏集新闻 | 虹科传感器事业部正式更名为宏集科技

致一直支持“虹科传感器”的朋友们&#xff1a; 为进一步整合资源&#xff0c;给您带来更全面、更优质的服务&#xff0c;我们非常荣幸地宣布&#xff0c;虹科传感器事业部已正式更名为宏集科技。这一重要的改变代表了虹科持续发展进程中的新里程碑&#xff0c;也体现了我们在传…

【brpc学习实践四】异步请求案例详解

注意 使用的还是源码的案例&#xff0c;添加个人注解。在前面的篇章我们讲解了客户端、服务端rpc构造的基本流程及同步、异步的案例基础之后&#xff0c;再理解此案例就容易了。 想直接看案例实现请看&#xff1a; server端实现 client端实现 服务端要点概览 controller ser…

同为科技(TOWE)智能机柜PDU助力上海华为数据中心完善机房末端配电

智能时代加速而来&#xff0c;最大的需求是算力&#xff0c;最关键的基础设施是数据中心。作为一家在信息通信领域拥有多年经验和技术积累的公司&#xff0c;华为在全国多个地区都设有数据中心&#xff0c;如知名的贵州贵安华为云全球总部、内蒙古乌兰察布华为数据中心等&#…

git -1

1.创建第一个仓库并配置local用户信息 git config git config --global 对当前用户所有仓库有效 git config --system 对系统所有登录的用户有效 git config --local 只对某个仓库有效 git config --list 显示配置 git config --list --global 所有仓库 git config --list…

机器视觉兄弟们,新工作之前,不要过度准备

大家对工作的渴望我感同身受&#xff0c;有人去机器视觉培训机构培训&#xff0c;有人默默无闻地努力学习&#xff0c;不都是为了一份高新好工作吗&#xff1f; 实际上是&#xff1a; 技术高的人&#xff0c;劳动力贬值。 技术低的人&#xff0c;没有生存空间。 你有野心&…

HarmonyOS从基础到实战-高性能华为在线答题元服务

最近看到美团、新浪、去哪儿多家互联网企业启动鸿蒙原生应用开发&#xff0c;这个HarmonyOS NEXT越来越引人关注。奈何当前不面向个人开发者开放&#xff0c;但是我们可以尝试下鸿蒙新的应用形态——元服务的开发。 元服务是基于HarmonyOS提供的一种面向未来的服务提供方式&…

『亚马逊云科技产品测评』活动征文|搭建Squoosh图片在线压缩工具

搭建Squoosh图片在线压缩工具 前言一、Squoosh是什么&#xff1f;二、准备一台Lightsail实例1.进入控制台2.创建实例3.开放端口4.部署Squoosh5.预览 三、搭建反向代理1. 安装宝塔2. 配置反向代理3. 预览代理效果 提示&#xff1a;授权声明&#xff1a;本篇文章授权活动官方亚马…

Spark---核心介绍

一、Spark核心 1、RDD 1&#xff09;、概念&#xff1a; RDD&#xff08;Resilient Distributed Datest&#xff09;&#xff0c;弹性分布式数据集。 2&#xff09;、RDD的五大特性&#xff1a; 1、RDD是由一系列的partition组成的 2、函数是作用在每一个partition(split…