一文弄明白KeyedProcessFunction函数

引言

KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧

正文

了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下

/**
 * A keyed function that processes elements of a stream.
 *
 * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is
 * invoked. This can produce zero or more elements as output. Implementations can also query the
 * time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,
 * OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as
 * output and register further timers.
 *
 * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
 * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.
 *
 * <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link
 * org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link
 * org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown
 * methods can be implemented. See {@link
 * org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
 * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
 */

上面简单来说就是以下四点

  1. Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
  2. 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
  3. 由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
  4. 需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面

processElement方法解析

  1. Flink会调用processElement方法处理输入流中的每一条数据
  2. KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state
  3. 这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据

onTimer方法解析:在启用TimerService服务时会定时触发此方法,一般会在processElement方法中开启TimerService服务

以上就是这个函数的基本知识,接下来就通过实战来熟悉下它的使用

实战简介

本次实战的目标是学习KeyedProcessFunction,内容如下:

  1. 监听本机7777端口读取字符串
  2. 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1
  3. 将Tuple2实例集合通过f0字段分区,得到KeyedStream
  4. KeyedSteam通过自定义KeyedProcessFunction处理
  5. 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器进行触发

使用代码例子

首先定义pojo类

public class CountWithTimestampNew {

    private String key;

    private long count;

    private long lastQuestTimestamp;

    public long getAndIncrementCount() {
        return ++count;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public long getLastQuestTimestamp() {
        return lastQuestTimestamp;
    }

    public void setLastQuestTimestamp(long lastQuestTimestamp) {
        this.lastQuestTimestamp = lastQuestTimestamp;
    }
}

接着实现KeyedProcessFunction类

public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {

    private ValueState<CountWithTimestampNew> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));
    }

    // 实现数据处理逻辑的地方
    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Tuple currentKey = ctx.getCurrentKey();

        CountWithTimestampNew countWithTimestampNew = state.value();
        if (countWithTimestampNew == null) {
            countWithTimestampNew = new CountWithTimestampNew();
            countWithTimestampNew.setKey(value.f0);
        }

        countWithTimestampNew.getAndIncrementCount();

        //更新这个单词最后一次出现的时间
        countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());

        //单词之间不会互相覆盖吗?推测state对象是跟key绑定,针对每一个不同的key KeyedProcessFunction会创建其对应的state对象
        state.update(countWithTimestampNew);

        //给当前单词创建定时器,十秒后触发
        long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;

        //尝试注释掉看看是否还会触发onTimer方法
        ctx.timerService().registerProcessingTimeTimer(timer);

        //打印所有信息,用于确保数据准确性
        System.out.println(String.format(" 触发processElement方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s",
                    currentKey.getField(0),
                    countWithTimestampNew.getCount(),
                    time(countWithTimestampNew.getLastQuestTimestamp()),
                    time(timer)
                ));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {

        Tuple currentKey = ctx.getCurrentKey();

        CountWithTimestampNew countWithTimestampNew = state.value();

        //标记当前元素是否已经连续10s未出现
        boolean isTimeout = false;

        if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {
            //out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));
            isTimeout = true;
        }

        //打印所有信息,用于确保数据准确性
        System.out.println(String.format(" 触发onTimer方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s",
                currentKey.getField(0),
                countWithTimestampNew.getCount(),
                time(countWithTimestampNew.getLastQuestTimestamp()),
                time(timestamp),
                String.valueOf(isTimeout)
        ));
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }
}

最后是启动类

public class KeyedProcessFunctionDemo2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度1
        env.setParallelism(1);

        // 处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // 监听本地9999端口,读取字符串
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);

        // 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到
        DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
                // 对收到的字符串用空格做分割,得到多个单词
                .flatMap(new SplitterFlatMapFunction())
                // 设置时间戳分配器,用当前时间作为时间戳
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        // 使用当前系统时间作为时间戳
                        return System.currentTimeMillis();
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        // 本例不需要watermark,返回null
                        return null;
                    }
                })
                // 将单词作为key分区
                .keyBy(0)
                // 按单词分区后的数据,交给自定义KeyedProcessFunction处理
                .process(new CountWithTimeoutKeyProcessFunctionNew());

        // 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来
        timeOutWord.print();

        env.execute("ProcessFunction demo : KeyedProcessFunction");
    }
}

演示

在启动服务前,先通过linux指令监听端口 nc -lk 7777

  1. 启动Flink服务后,往7777端口里面发送数据
    在这里插入图片描述

  2. 通过IDEA的终端可以看到有日志输出,可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志
    在这里插入图片描述

  3. 那么咱们尝试连续发送两条Hello呢,可以看到累加器会持续累加,并且会触发两次onTimer方法,也就是每一条消息都会触发一次。由于连续发送两条,因此可以看得到第三行日志的末尾是false,说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来
    在这里插入图片描述

  4. 再输入点其他的试试
    在这里插入图片描述

  5. 通过输出可以看到这些单词的计数器又从0开始,说明每一个Key都对应一个状态
    在这里插入图片描述

思考题

  1. open方法会在哪里进行调用,KeyedProcessFunction整个类的完整调用逻辑是怎么样的
  2. registerProcessingTimeTimer和registerEventTimeTimer的差异是什么

参考资料

  1. https://blog.csdn.net/boling_cavalry/article/details/106299167
  2. https://blog.csdn.net/lujisen/article/details/105510532
  3. https://blog.csdn.net/qq_31866793/article/details/102831731

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/402485.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Observability:使用 OpenTelemetry 和 Elastic 监控 OpenAI API 和 GPT 模型

作者&#xff1a; 来自 Elastic David Hope ChatGPT 现在非常火爆&#xff0c;甚至席卷了整个互联网。 作为 ChatGPT 的狂热用户和 ChatGPT 应用程序的开发人员&#xff0c;我对这项技术的可能性感到非常兴奋。 我看到的情况是&#xff0c;基于 ChatGPT 的解决方案将会呈指数级…

C++实现归并排序题目

目录 例1 例2 例3 例4 例1 912. 排序数组 参考代码 class Solution { public:vector<int> tmpnums;vector<int> sortArray(vector<int>& nums) {tmpnums.resize(nums.size());mergeSort(nums, 0, nums.size() - 1);return nums;}void mergeSort(vector…

如何使用rocketmq实现分布式事务?

什么是rocketmq事务消息 事务消息是 Apache RocketMQ 提供的一种高级消息类型&#xff0c;支持在分布式场景下保障消息生产和本地事务的最终一致性。 RocketMQ的分布式事务又称为“半消息事务”。 事务消息处理流程 RocketMQ是靠半消息机制实现分布式事务 事务消息&#x…

OpenAI 的 GPTs 提示词泄露攻击与防护实战:防御卷(一)

前面的OpenAI DevDay活动上&#xff0c;GPTs技术的亮相引起了广泛关注。随着GPTs的创建权限开放给Plus用户&#xff0c;社区里迅速涌现了各种有趣的GPT应用&#xff0c;这些都是利用了Prompt提示词的灵活性。这不仅展示了技术的创新潜力&#xff0c;也让人们开始思考如何获取他…

C++学习Day09之系统标准异常

目录 一、程序及输出1.1 系统标准异常示例1.2 标准异常表格 二、分析与总结 一、程序及输出 1.1 系统标准异常示例 #include<iostream> using namespace std; #include <stdexcept> // std 标准 except 异常class Person { public:Person(int age){if (age <…

短小精悍的npm入门级保姆教程,一篇包会

npm是什么&#xff1f; npm是一个强大的包管理工具&#xff0c;它使开发人员能够轻松地安装、更新和管理项目依赖的包。通过初始化一个package.json 文件&#xff0c;我们可以描述你的项目并记录其依赖关系。使用npm install命令&#xff0c;我们可以安装和管理包。使用npm pu…

SQL注入漏洞解析

什么是SQL注入 原理&#xff1a; SQL注入即是指web应用程序对用户输入数据的合法性没有判断或过滤不严&#xff0c;攻击者可以在web应用程序中事先定义好的查询语句的结尾上添加额外的SQL语句&#xff0c;在管理员不知情的情况下实现非法操作&#xff0c;以此来实现欺骗数据库服…

消息中间件之RocketMQ源码分析(十三)

Broker消息存储机制 RocketMQ首先将消息数据写入操作系统PageCache&#xff0c;然后定时将数据刷入磁盘。接下来主要分析RocketMQ是如何接收发送消息请求并将消息写入PageCache的&#xff0c;整个过程如图 Commit目录下有多个CommitLog文件&#xff0c;其实CommitLog只有一个…

前端构建效率优化之路

项目背景 我们的系统&#xff08;一个 ToB 的 Web 单页应用&#xff09;前端单页应用经过多年的迭代&#xff0c;目前已经累积有大几十万行的业务代码&#xff0c;30 路由模块&#xff0c;整体的代码量和复杂度还是比较高的。 项目整体是基于 Vue TypeScirpt&#xff0c;而构…

PostgreSQL与MySQL,谁更胜一筹

前言 PostgreSQL与MySQL都是优秀的开源数据库。在日常学习中&#xff0c;新手可能接触最多的是MySql,但是实际工作中&#xff0c;两者的应用场景其实都很广。我之前的做过上网流量销售业务&#xff0c;用的是MySQL,现在接触广告业务&#xff0c;用的是pg数据库&#xff0c;每天…

C语言:指针(一)

目录 1.内存和地址2. 指针变量和地址2.1 取地址操作符&#xff08;&&#xff09;2.2 指针变量和解引用操作符&#xff08;*&#xff09;2.2.1 指针变量2.2.2 解引用操作符&#xff08;*&#xff09; 2.3 指针变量的大小 3.指针变量的类型和意义3.1 指针的解引用3.2 指针 -指…

SQL注入漏洞解析--less-3

1.首先我们打开第三关看一下 2.这个和之前1.2关提示都是一样&#xff0c;所以我们先输入id数字看一下显示什么 3.这里正常回显&#xff0c;当我们后边加上时可以看到页面报错信息。可推断sql语句是单引号字符型且有括号&#xff0c;所以我们需要闭合单引号且也要考虑括号。 4…

FISCO BCOS(十七)利用脚本进行区块链系统监控

要利用脚本进行区块链系统监控&#xff0c;你可以使用各种编程语言编写脚本&#xff0c;如Python、Shell等 利用脚本进行区块链系统监控可以提高系统的稳定性、可靠性&#xff0c;并帮助及时发现和解决潜在问题&#xff0c;从而确保区块链网络的正常运行。本文可以利用脚本来解…

java使用File创建空文件和创建单级文件、多级文件、删除、获得文件夹下的文件和文件夹

1、使用createNewFile创建文件 package com.controller;import org.springframework.web.bind.annotation.*;import java.io.File; import java.io.IOException; import java.util.LinkedList;RestController CrossOrigin RequestMapping("/admin") public class Ad…

IO进程线程作业day5

1> 将互斥机制的代码实现重新敲一遍 #include <myhead.h> int num520;//定义一个全局变量 pthread_mutex_t mutex;//创建锁 //线程1任务 void *task1(void *arg) {puts("任务1");pthread_mutex_lock(&mutex);//上锁num1314;sleep(1);printf("tas…

Liunx使用nginx和http搭建yum-server仓库

文章目录 1. yum-server的搭建方式2. nginx搭建yum-server仓库2.1. 安装配置nginx2.2 配置yum-server的rpm2.3. 同步yum源相关包2.3.1 rsync同步源3.3.1 reposync同步源 2.4. 配置客户端访问yum配置2.5. 验证测试 3. http服务搭建yum-server仓库3.1. 安装配置http3.2 配置yum-s…

代码随想录算法训练营第一天

● 今日学习的文章链接和视频链接 ● 自己看到题目的第一想法 1. 704二分法&#xff1a; 方法一&#xff1a; 整个数组是 左闭右闭区间 [ ] left指针指向数组开始下标&#xff0c; right 指针指向数组最后下表nums.size()-1, mid为 (leftright) /2循环条件 left<rightnu…

论文精读--Noisy Student

一个 EfficientNet 模型首先作为教师模型在标记图像上进行训练&#xff0c;为 300M 未标记图像生成伪标签。然后将相同或更大的 EfficientNet 作为学生模型并结合标记图像和伪标签图像进行训练。学生网络训练完成后变为教师再次训练下一个学生网络&#xff0c;并迭代重复此过程…

unity学习(34)——角色选取界面(跨场景坑多)

先把SelectMenu中的camera的audio listener去掉。 现在还是平面&#xff0c;直接在camera下面添加两个panel即可&#xff0c;应该是用不到canvas了&#xff0c;都是2D的UI。 加完以后问题来了&#xff0c;角色选择界面的按钮跑到主界面上边了&#xff0c;而且现在账号密码都输…

国外创意品牌案例:英国北方铁路公司发布“Try the train”活动

近期&#xff0c;英国北方铁路公司&#xff08;Northern Trains&#xff09;发起了一项名为“Try the train” 的活动&#xff0c;旨在帮助那些对火车感到恐惧的人在搭乘火车时感到更舒适&#xff0c;以解锁公司业务新的增长领域&#xff0c;吸引更多的人在通勤、上学、出游、参…