Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析

FLink处理函数简介

在Flink底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的【处理】(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作【处理函数】(process function)。在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

Flink几种处理函数简介

  1. ProcessFunction是用于处理数据流的通用函数。它是一个抽象类,定义了处理数据流的常用方法,如processElement,onTimer等。您可以扩展ProcessFunction类并重写这些方法,以便在Flink程序中执行复杂的数据流处理逻辑。
  2. KeyedProcessFunction是ProcessFunction的特殊类型,用于处理带有键的数据流。它定义了额外的方法,如getKey,context.timerService()等,用于访问数据流中每个元素的键以及在处理函数中安排定时器。
  3. ProcessWindowFunction和ProcessAllWindowFunction是用于处理时间窗口的特殊函数。它们提供了一个process方法,用于在每个窗口中对数据进行处理。ProcessWindowFunction接受带有键的数据流,并且每个窗口都对应于一个键,而ProcessAllWindowFunction接受不带键的数据流,并且每个窗口都包含整个数据流。

这里重点介绍KeyedProcessFunction,KeyedProcessFunction是用来处理KeyedStream的。每有一个数据进入算子,则会触发一次processElement()的处理。它还提供了定时器的功能,在在预警、监控等场景特定场景下,非常适合。
KeyedProcessFunction定时器包分为两种:基于事件时间、基于处理时间。下面以统计计数的方式展示这两种定时器的用法,并附上详细的分析思路。以下用例基于Flink1.14

实例分析

KeyedProcessFunction基于事件时间的定时器

代码:


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * @description:
 *
 * @author pony
 * @date 2024/1/17 20:55
 * @version 1.0
 * nc -l 9999
 */
public class KeyedProcessFunctionOnTimerEventTime {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        return Long.valueOf(element.split(",")[1]);
                    }
                })
                .withIdleness(Duration.ofSeconds(1));

        DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));
                    }
                });

        // apply the process function onto a keyed stream
        DataStream<Tuple2<String, Long>> result = stream0
                .keyBy(value -> value.f0)
                .process(new CountEventTimeWithTimeoutFunction());

        result.print();

        env.execute("KeyedProcessFunction wordCount");
    }

    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    static class CountEventTimeWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        private ValueState<Long> state;
        private static final Integer DELAY = 1000; //1s

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Long> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            Long current = state.value();
            if (current == null) {
                current = 0L;
            }
            current++;
            state.update(current);
            //获取当前数据流的水位线
            long currentWatermark = ctx.timerService().currentWatermark();

//            long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAY
            long timer = currentWatermark + DELAY;//设置定时器的时间为当前水位线+DELAY
            //注册事件时间定时器,与watermark绑定,必须满足条件: watermark >= timer 来触发特定event的定时器
            ctx.timerService().registerEventTimeTimer(timer);

            //删除事件时间定时器
            if (currentWatermark < 0) {
                ctx.timerService().deleteEventTimeTimer(timer);
            }

            System.out.println("last Watermark: " + currentWatermark + ", format: " + time(currentWatermark));

            // 打印信息,用于核对数据
            System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",
                    ctx.getCurrentKey(),
                    current,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timer,
                    time(timer)));

        }

        @Override
        public void onTimer(
                long timestamp, //定时器触发时间,等于以上的timer
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // 取得当前单词
            String currentKey = ctx.getCurrentKey();
            // get the state for the key that scheduled the timer
            Long result = state.value();

            // 打印数据,用于核对是否符合预期
            System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",
                    currentKey,
                    result,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timestamp,
                    time(timestamp)));
            System.out.println("current Watermark: " + ctx.timerService().currentWatermark() + ", format: " + time(ctx.timerService().currentWatermark()));
            
            out.collect(new Tuple2<String, Long>(currentKey, result));

        }

        @Override
        public void close() throws Exception {
            super.close();
            state.clear();
        }
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));
    }
}

测试数据:

nc -l 9999
a1,1704038400000
a1,1704038401000
a1,1704038403000

运行结果:
在这里插入图片描述

KeyedProcessFunction基于处理时间的定时器

代码:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * @description:
 *
 * @author pony
 * @date 2024/1/17 20:55
 * @version 1.0
 * nc -l 9999
 */
public class KeyedProcessFunctionOnTimerProcessTime {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
//                        return System.currentTimeMillis();
                        return Long.valueOf(element.split(",")[1]);
                    }
                })
                .withIdleness(Duration.ofSeconds(1));

        DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));
                    }
                });

        // apply the process function onto a keyed stream
        DataStream<Tuple2<String, Long>> result = stream0
                .keyBy(value -> value.f0)
                .process(new CountProcessTimeWithTimeoutFunction());

        result.print();

        env.execute("KeyedProcessFunction wordCount");
    }

    static class CountProcessTimeWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        private ValueState<Long> state;
        private static final Integer DELAY = 60 * 1000; //1s

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Long> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            Long current = state.value();
            if (current == null) {
                current = 0L;
            }
            current++;
            state.update(current);

            long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAY
            //注册处理时间定时器, 与watermark无关,定时器触发条件:当前系统时间>timer
            ctx.timerService().registerProcessingTimeTimer(timer);
            //删除处理时间定时器
//            ctx.timerService().deleteProcessingTimeTimer(timer);

            System.out.println("processElement currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));
            // 打印所有信息,用于核对数据
            System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",
                    ctx.getCurrentKey(),
                    current,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timer,
                    time(timer)));
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // 取得当前单词
            String currentKey = ctx.getCurrentKey();
            // get the state for the key that scheduled the timer
            Long result = state.value();

            System.out.println("onTimer currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));
            // 打印数据,用于核对是否符合预期
            System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",
                    currentKey,
                    result,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timestamp,
                    time(timestamp)));

            //另外还支持侧流
            OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("single"){};
            if (result < 2) {
                ctx.output(outputTag, new Tuple2<>(currentKey, result));
            } else {
                out.collect(new Tuple2<String, Long>(currentKey, result));
            }

        }

        @Override
        public void close() throws Exception {
            super.close();
            state.clear();
        }
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));
    }
}

测试数据:

nc -l 9999
a,1705568024000    
a,1705568024000

运行结果:
在这里插入图片描述

总结

在真实业务场景中【 KeyedProcessFunction基于处理时间的定时器】用的比较多,比较符合业务场景,即根据事件的时间来指定处理时间去定时触发定时器。因此在此场景中,可以不指定watermarkStrategy,可以获取传输参数的时间时间来定时触发定时器。

参考:
Process Function
Generating Watermarks

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/333212.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

v3的响应式,ref,reactive和生命周期(简写)

vue3笔记 1.两种安装方式 (1)直接创建项目vue3 (2)使用vite 组件传值 1.组件传值的用法 从父组件向子组件传值&#xff1a; <Son :prop1"nmb"></Son> 2.defineprops函数 defineProps函数在setup标签内不需要引入&#xff0c;可直接使用 defineP…

easyui渲染隐藏域<input type=“hidden“ />为textbox可作为分割条使用

最近在修改前端代码的时候&#xff0c;偶然发现使用javascript代码渲染的方式将<input type"hidden" />渲染为textbox时&#xff0c;会显示一个神奇的效果&#xff0c;这个textbox输入框并不会隐藏&#xff0c;而是显示未一个细条&#xff0c;博主发现非常适合…

Spring Boot整合MyBatis-Plus

引言 在现代软件开发中&#xff0c;我们经常需要处理大量的数据。为了有效地管理这些数据&#xff0c;我们需要使用一些强大的框架。其中&#xff0c;Spring Boot和MyBatis-Plus是两个非常流行的框架。Spring Boot是一个基于Spring的开源Java框架&#xff0c;可以用于创建独立…

并查集测试

1.介绍 **本质&#xff1a;**处理一些不相交的集合的合并问题&#xff0c;**比如&#xff1a;**最小生成树&#xff0c;最近公共祖先 操作&#xff1a; 1.初始化init&#xff0c;2.find查询&#xff0c;3.合并union 2.初始化init() 将父节点零散的散开->父节点为本身fa[i…

C++ //练习 1.25 借助网站上的Sales_item.h头文件,编译并运行本节给出的书店程序。

C Primer&#xff08;第5版&#xff09; 练习 1.25 练习 1.25 借助网站上的Sales_item.h头文件&#xff0c;编译并运行本节给出的书店程序。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 代码块 /********************************…

黑马苍穹外卖Day8学习

文章目录 地址簿功能模块需求分析代码开发 用户下单需求分析代码开发 订单支付微信支付介绍微信支付准备工作代码导入 地址簿功能模块 需求分析 代码开发 Controller层 RestController RequestMapping("/user/addressBook") Slf4j Api(tags "C端-地址簿接口…

docker部署Jira+配置MySQL8数据库

写在前面&#xff1a;如果你通过docker安装Jira且启动过&#xff0c;然后你现在又想使用mysql数据库&#xff0c;需要注意 你除了停掉原有容器&#xff0c;还需要删除&#xff1a;/var/lib/docker/volumes/jiraVolume/_data下的文件&#xff0c;否则启动后会无法正常使用。注意…

MySQL基础笔记(7)约束

顾名思义&#xff0c;用来限制表结构中存储的数据~ 一.概述 作用于表中字段上的规则&#xff0c;用于限制存储在表中的数据&#xff0c;目的在于使数据库中的数据正确、有效性和完整性。 大致可以分为如下的几类&#xff1a; &#xff08;重点关注主键和外键约束~&#xff09; …

揭开UI设计的神秘面纱:如何打造一款让用户爱不释手的移动APP

文章目录 一、目标用户分析二、设计风格和色彩搭配三、布局和导航设计四、交互设计五、视觉元素设计六、响应式设计七、测试和优化八、持续更新和迭代九、团队协作和沟通十、学习和成长《移动APP UI设计与制作(微课版)》编辑推荐内容简介目录 《Flutter入门经典&#xff08;移动…

51单片机_电压采集器电压表

实物演示效果&#xff1a; https://www.bilibili.com/video/BV1My4y1F7xY/?vd_source6ff7cd03af95cd504b60511ef9373a1d 一、基本功能 利用51单片机作为主控芯片&#xff0c;3段式电压采集。模拟量经A/D&#xff08;ADC0809&#xff09;模数转换芯片&#xff0c;把模拟量转换…

开发实践5_project

要求&#xff1a; &#xff08;对作业要求的"Student"稍作了变换&#xff0c;表单名称为“Index”。&#xff09;获得后台 Index 数据&#xff0c;作展示&#xff0c;要求使用分页器&#xff0c;包含上一页、下一页、当前页/总页。 结果&#xff1a; ① preparatio…

OpenAI终于发布GPT Store!全网超300万GPTs,开发者分成、个人定制版在路上!

OpenAI终于发布GPT Store&#xff01;全网超300万GPTs&#xff0c;开发者分成、个人定制版在路上&#xff01; 刚刚&#xff0c;OpenAI 官方发布了 GPT Store。自从开发者大会宣布以来&#xff0c;已经过去了两个月时间&#xff0c;OpenAI 官方表示&#xff0c;用户已经创建了…

Hardware-Aware-Transformers开源项目笔记

文章目录 Hardware-Aware-Transformers开源项目笔记开源项目背景知识nas进化算法进化算法代码示例 开源项目Evolutionary Search1 生成延迟的数据集2 训练延迟预测器3 使延时约束运行搜索算法4. 训练搜索得到的subTransformer5. 根据重训练后的submodel 得到BLEU精度值 代码结构…

易点易动有效管理固定资产出入监控,助力企业降低固定资产丢失率

随着智慧化和数字化进程的深入,RFID(射频识别)技术日益广泛应用。作为一种成熟且成本效益较高的自动识别技术,RFID可以实时追踪和监控资产的流向,有效解决了企业固定资产管理过程中的痛点问题,助力企业降低固定资产丢失率。易点易动为客户提供的RFID固定资产管理系统,通过识别固…

【探索C++容器:vector的使用和模拟实现】

【本节目标】 1.vector的介绍及使用 2.vector深度剖析及模拟实现 1.vector的介绍及使用 1.1 vector的介绍 vertor文档介绍 1. vector是表示可变大小数组的序列容器。2. 就像数组一样&#xff0c;vector也采用连续存储空间来存储元素。也就是意味着可以采用下标对vector的元…

c++:基于c语言基础上的语法不同(1)

前言&#xff1a;此篇文章适合学完c语言基础概念的同学&#xff0c;是帮助c向c语言的同学快速掌握基本语法。 基础格式 #include<iostream>using namespace std; int main() {system("pause");return 0; } 输入&#xff1a; cin>>a;//a是输入内容 输出…

文件系统和IO流

目录 ​文件系统和IO流 一:文件的认知 认识文件 树型结构组织和⽬录: 文件路径&#xff08;Path): 文件形式: 二:File的方法 File的概述: File的属性 File的构造方法 File常用的get系列方法 ⽰例一:观察get系列的特点和差异 File常用的增,删方法 示例二:普通文件…

AI小程序添加深度合成类目解决办法

基于文言一心和gpt等大模型做了一个ai助理小程序&#xff0c;在提交“一点AI助理”小程序时&#xff0c;审核如下&#xff1a; 失败原因1 审核失败原因 你好&#xff0c;你的小程序涉及提供提供文本深度合成技术 (如: AI问答) 等相关服务&#xff0c;请补充选择&#xff1a;深度…

JVM:性能监控工具分析和线上问题排查实践

前言 在日常开发过程中&#xff0c;多少都会碰到一些jvm相关的问题&#xff0c;比如&#xff1a;内存溢出、内存泄漏、cpu利用率飙升到100%、线程死锁、应用异常宕机等。 在这个日益内卷的环境&#xff0c;如何运用好工具分析jvm问题&#xff0c;成为每个java攻城狮必备的技能…

记录自己 自学摸索 学习日语:新标日

新标日 几节课合成在一起太乱了 第四课会更新个集合 自学日语吧算是 摸索中&#xff01; 尽量一周一课 第一课 单词 ちゅうごくじん 中国人 中国人 にほんじん 日本人 日本人 あめりかじん アメリカ人 美国人 かんこくじん 韓国人 韩国人 ふらんすじん フラン…