Flink 处理函数(1)—— 基本处理函数

在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权

基本处理函数主要是定义数据流的转换操作,其所对应的函数类为ProcessFunction


处理函数的功能和使用

对于常用的转换算子来说:

  • MapFunction只能获取到当前的数据;
  • AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现);
  • RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext();

但是无论那种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的

与时间相关的操作只能用时间窗口去处理,但如果要求对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了

因此需要使用处理函数

  • 处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数还可以直接将数据输出到侧输出流(side output)中

处理函数的简单使用:基于 DataStream 调用.process()方法就;方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑:

stream.process(new MyProcessFunction())

简单示例:

public class ProcessFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long l) {
                                        return event.timestamp;
                                    }
                                })
                )
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        if (value.user.equals("Mary")) {
                            out.collect(value.user);
                        } else if (value.user.equals("Bob")) {
                            out.collect(value.user);
                            out.collect(value.user);
                        }
                        System.out.println(ctx.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute();
    }
}

ProcessFunction 中重写了.processElement()方法(参数:输入,上下文对象,输出),自定义处理逻辑

ProcessFunction 解析

源码解析

源码如下:

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter and
     * also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
     *     {@link TimerService} for registering timers and querying the time. The context is only
     *     valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
     *     querying the {@link TimeDomain} of the firing timer and getting a {@link TimerService}
     *     for registering timers and querying the time. The context is only valid during the
     *     invocation of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {

        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program is
         * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();

        /** A {@link TimerService} for querying time and registering timers. */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /** The {@link TimeDomain} of the firing timer. */
        public abstract TimeDomain timeDomain();
    }
}

可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:

I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型

其内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()

  • .processElement():用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义
    • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致
    • ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()
    • out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用
  • .onTimer():用于定义定时触发的操作;这个方法只有在注册好的定时器触发的时候才会调用(在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作),而定时器是通过“定时服务”TimerService 来注册的
    • 参数:时间戳(timestamp),上下文(ctx),收集器(out)【这里的时间戳是指设置好的触发时间,在事件时间语义下就是水位线

利用onTimer,可以自定义数据按照时间分组、定时触发计算输出结果,这样就实现了窗口的功能

处理函数分类

  1. ProcessFunction:最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入
  2. KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入(要想使用定时器必须基于 KeyedStream )
  3. ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入
  4. ProcessAllWindowFunction:开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入
  5. CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入
  6. ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入
  7. BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入(这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物)
  8. KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream 调用.process()时作为参数传入(这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物)

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/324340.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

论文笔记(三十九)Learning Human-to-Robot Handovers from Point Clouds

Learning Human-to-Robot Handovers from Point Clouds 文章概括摘要1. 介绍2. 相关工作3. 背景3.1. 强化学习3.2. 移交模拟基准 4. 方法4.1. Handover Environment4.2. 感知4.3. 基于视觉的控制4.4. 师生两阶段培训 (Two-Stage Teacher-Student Training) 5. 实验5.1. 模拟评估…

智能光栅光片显微成像技术的LabVIEW解决方案

智能光栅光片显微成像技术的LabVIEW解决方案 在生物医学研究中&#xff0c;高效的成像技术对于捕捉细胞内罕见和复杂事件至关重要。智能光栅光片显微技术&#xff08;smartLLSM&#xff09;的出现&#xff0c;代表了LabVIEW软件在高端成像领域的革命性应用&#xff0c;这项技术…

P1563 [NOIP2016 提高组] 玩具谜题————C++

目录 [NOIP2016 提高组] 玩具谜题题目背景题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 样例 #2样例输入 #2样例输出 #2 提示 解题思路Code运行结果 [NOIP2016 提高组] 玩具谜题 题目背景 NOIP2016 提高组 D1T1 题目描述 小南有一套可爱的玩具小人&#xff0c;它…

免费学习鸿蒙(HarmonyOS)开发,一些地址分享

HarmonyOS万物互联&#xff0c;从华为一系列的操作来看已经与iOS、Android形成三足鼎立之势了。 根据《澎湃新闻》的报道&#xff0c;已有23所985高校和46所211高校加入了鸿蒙班的行列&#xff0c;合计达到了69所国内一流高校。通过鸿蒙班的设立&#xff0c;高校可以为学生提供…

4 - JdbcTemplate

spring 框架如何处理对数据库的操作呢? 1. 基本介绍 文档&#xff1a;JdbcTemplate APIs : /spring-framework-5.3.8/docs/javadoc-api/index.html JdbcTemplate 是 Spring 提供的访问数据库的技术。可以将 JDBC 的常用操作封装为模板方法 已经提供了特别多的 API 2. 使用…

【ubuntu】docker中如何ping其他ip或外网

docker中如何ping其他ip或外网 示例图&#xff1a; 运行下面命令&#xff1a; docker run -it --namehei busybox看情况需要加权限 sudo&#xff0c;即&#xff1a; sudo docker run -it --namehei busyboxping 外网 ping -c 4 www.baidu.comping 内网 ping -c 4 192.168.…

steam游戏搬砖项目还能火多久?

最近放假回到老家&#xff0c;见了不少亲戚朋友&#xff0c;大家不约而同都在感叹今年大环境不好&#xff0c;工作不顺&#xff0c;生意效益不好&#xff0c;公司状况不佳&#xff0c;反问我们生意如何&#xff1f;为了让他们心里好受一点&#xff0c;我也假装附和道:也不咋地&…

汽车研发测试大全

车研发中需要做的试验&#xff0c;这些试验都是保证我们的车能安全、稳定、可靠行驶的必要条件。主要包含以下内容&#xff1a; 一、整车试验项目 1.1整车可靠性试验 1.2 NVH试验 1.3 HVAC试验 1.4 EMC试验 1.5 化学分析试验 1.6 整车道路性能试验 二、零部件试验项目 …

node的下载、安装、配置

下载&#xff1a; 官网下载&#xff1a;Node.js 左右两个都可以&#xff1a; 往期版本&#xff1a; ​​​​​​https://registry.npmmirror.com/binary.html?pathnode/v16.18.0/ 其中的16.18.0可以修改成你需要的版本 安装&#xff1a; 打开cmd&#xff1a; 输入以下指令…

精品基于Uniapp+springboot车辆充电桩缴费管理系统管理系统App-地图

《[含文档PPT源码等]精品基于Uniappspringboot充电桩管理系统App》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程、包运行成功&#xff01; 软件开发环境及开发工具&#xff1a; 开发语言&#xff1a;Java 后台框架&#xff1a;springboot、ssm 安…

【STM32单片机】迷宫游戏设计

文章目录 一、主要功能二、软件设计三、实验现象联系作者 一、主要功能 本项目使用STM32F103/F407单片机控制器&#xff0c;TFTLCD触摸屏、按键等。 主要功能&#xff1a; 系统运行后&#xff0c;TFTLCD显示游戏界面&#xff0c;可按下KEY_UP键进入游戏&#xff1b; 系统内置…

持久双向通信网络协议-WebSocket-入门案例实现demo

1 介绍 WebSocket 是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工通信——浏览器和服务器只需要完成一次握手&#xff0c;两者之间就可以创建持久性的连接&#xff0c; 并进行双向数据传输。 HTTP协议和WebSocket协议对比&#xff1a; HTTP是短连接&#xff0…

Javaweb之SpringBootWeb案例员工管理分页查询的详细解析

3. 员工管理 完成了部门管理的功能开发之后&#xff0c;我们进入到下一环节员工管理功能的开发。 基于以上原型&#xff0c;我们可以把员工管理功能分为&#xff1a; 分页查询&#xff08;今天完成&#xff09; 带条件的分页查询&#xff08;今天完成&#xff09; 删除员工&…

20240115寻找两数之和

代码 class Solution:def getSumIndex(self, nums: List[int], target: int) -> List[int]:records dict()for index, value in enumerate(nums): if target - value in records: # 遍历当前元素&#xff0c;并在map中寻找是否有匹配的keyreturn [records[target- valu…

postman案例

一、表单接口 基本正向 有效反向 无效反向 JSON接口 基本正向 有效反向 无效反向 文件上传接口 token 获取token值

CNN感受野

在卷积神经网络中&#xff0c;决定某一层输出结果中一个元素所对应的输入层的区域大小&#xff0c;被称为感受野。通俗的解释是&#xff0c;输出feature map上的一个单元对应输入层上的区域大小。 感受野计算公式&#xff1a; F ( i ) ( F ( i 1 ) − 1 ) S t r i d e K s…

“确定要在不复制其属性的情况下复制此文件?”解决方案(将U盘格式由FAT格式转换为NTFS格式)

文章目录 1.问题描述2.问题分析3.问题解决3.1 方法一3.2 方法二3.3 方法三 1.问题描述 从电脑上复制文件到U盘里会出现“确定要在不复制其属性的情况下复制此文件&#xff1f;”提示。 2.问题分析 如果这个文件在NTFS分区上&#xff0c;且存在特殊的安全属性。那么把它从NT…

CodeFuse开源这半年

2023 年可以称得上是大模型元年&#xff0c;在过去的这一年里&#xff0c;大模型领域飞速发展&#xff0c;新的大模型纷纷涌现&#xff0c;基于大模型的新产品也吸引着大家的眼球&#xff0c;未来&#xff0c;这个领域又会给大家带来多少惊喜&#xff1f; 蚂蚁也推出了自己的百…

STM32F103标准外设库——寄存器 (二)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

9.1、加密技术原理详解

目录 一、加解密定义 二、加密技术分类 2.1、对称加密 2.2、非对称加密 三、对称加密算法 四、非对称加密算法 五、对称加密和非对称加密比较 六、对称加密和非对称加密结合 一、加解密定义 数据加密&#xff1a;对原来为明文的文件或数据按某种算法进行处理&#x…