【开发篇】一、处理函数:定时器与定时服务

文章目录

  • 1、基本处理函数
  • 2、定时器和定时服务
  • 3、KeyedProcessFunction下演示定时器
  • 4、process重获取当前watermark

前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在这里插入图片描述

在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑!!!

1、基本处理函数

处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:

stream.process(new MyProcessFunction())
  • ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction

  • ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型

  • ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    ...

}

抽象方法 processElement:

  • 定义处理元素的逻辑
  • 流中的每个元素都会调用一次这个房啊
  • 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据

非抽象方法onTimer:

  • 定时器触发时调用这个方法
  • 注册定时器即设一个闹钟,onTimer则是闹钟响了以后要做的事
  • onTimer是基于时间线的一个回调方法
  • onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)

最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction

关于处理函数的分类:

  • 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
  • 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction

2、定时器和定时服务

ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:

  • 获取当前的处理时间
long currentProcessingTime();
  • 获取当前的水位线(事件时间)
long currentWatermark();
  • 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
  • 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);

注意:

  • 只有在KeyedStream中才支持使用TimerService设置定时器的操作
  • TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次

3、KeyedProcessFunction下演示定时器

事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发

public class KeyedProcessTimerDemo {

    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))  //乱序默认的水位生成器
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)  //时间戳提取
                );


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // TODO Process:keyed
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * 来一条数据调用一次
                     * @param value  每条数据
                     * @param ctx 上下文对象
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取当前数据的key
                        String currentKey = ctx.getCurrentKey();
                        // 获取定时器服务对象
                        TimerService timerService = ctx.timerService();
                        // 数据中提取出来的事件时间
                        Long currentEventTime = ctx.timestamp(); 
                        //注册定时任务,水位线被推到5s时触发
                        timerService.registerEventTimeTimer(5000L);
                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");

                        
                    /**
                     * 时间进展到定时器注册的时间,调用该方法
                     * @param timestamp 当前时间进展,就是定时器被触发时的时间
                     * @param ctx       上下文
                     * @param out       采集器
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        String currentKey = ctx.getCurrentKey();
                        System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");
                    }
                }
        );

        process.print();

        env.execute();
    }
}

运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:

在这里插入图片描述

看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发

在这里插入图片描述

再用处理时间下的定时器:

public class KeyedProcessTimerDemo {

    public static void main(String[] args) throws Exception {
		
		//...重复代码略,同上
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // TODO Process:keyed
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取当前数据的key
                        String currentKey = ctx.getCurrentKey();
                        
                        TimerService timerService = ctx.timerService();

						//当前数据的处理时间
                        long currentTs = timerService.currentProcessingTime();
						//定时器不用水位线为标杆,直接处理时间加5s
                        timerService.registerProcessingTimeTimer(currentTs + 5000L);
                        
                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");

                    }


         //...重复代码略,同上
}

运行:

在这里插入图片描述

4、process重获取当前watermark

还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:

//...重复代码略,同上

@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {     
     // 获取 process的 当前watermark
     long currentWatermark = timerService.currentWatermark();
     System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);
 }


此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个

在这里插入图片描述

在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。

在这里插入图片描述
在这里插入图片描述

上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/105034.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

宏电5G RedCap工业智能网关获首个中国移动5G物联网开放实验室5G及轻量化产品能力认证

10月21日&#xff0c;2023世界物联网博览会——中国移动物联网开发者大会暨物联网产业论坛在无锡圆满举行。宏电股份参与中国移动5G物联网开放实验室5G及轻量化产品能力认证成果授牌仪式&#xff0c;并获得认证证书。 此次认证主要对产品功能、产品性能、RedCap网络兼容性进行测…

DJYROS产品:基于DJYOS的国产自主割草机器人解决方案

基于都江堰泛计算操作系统的国产自主机器人操作系统即将发布…… 1、都江堰机器人操作系统命名&#xff1a;DJYROS 2、机器人算法&#xff1a;联合行业自主机器人厂家&#xff0c;构建机器人算法库。 3、机器人芯片&#xff1a;联合行业机器人AI芯片公司&#xff0c;构建专用…

Windows Server 2019 搭建FTP站点

目录 1.添加IIS及FTP服务角色 2.创建FTP账户&#xff08;用户名和密码&#xff09;和组 3.设置共享文件夹的权限 4.添加及设置FTP站点 5.配置FTP防火墙支持 6.配置安全组策略 7.客户端测试 踩过的坑说明&#xff1a; 1.添加IIS及FTP服务角色 a.选择【开始】→【服务器…

中文编程开发语言工具编程实际案例:台球棋牌混合计时计费软件使用的编程构件说明

中文编程开发语言工具编程实际案例&#xff1a;台球棋牌混合计时计费软件使用的编程构件说明 上图说明&#xff1a;该软件可以用于桌球和棋牌同时计时计费&#xff0c;在没有开台的时候&#xff0c;图片是处于等待状态&#xff0c;这使用编程工具中的固定图像构件&#xff0c;在…

leetcode做题笔记203. 移除链表元素

给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,6,3,4,5,6], val 6 输出&#xff1a;[1,2,3,4,5]示例 2&#xff1a; 输入&#xff1a…

css 两栏布局的实现

目录 前言 1. 浮动布局 用法 代码示例 理解 2. Flex布局 用法 代码示例 理解 3. Grid布局 用法 代码示例 理解 高质量的设计 前言 两栏布局是一种常见的网页设计模式&#xff0c;它将页面分为两个主要区域&#xff1a;主内容区域和侧边栏。这种布局方式不仅能够提…

睿趣科技:抖音小店申请流程

随着移动互联网的发展&#xff0c;越来越多的人开始尝试通过开设网店来创业。抖音作为国内最受欢迎的短视频平台之一&#xff0c;也推出了自己的电商功能——抖音小店。那么&#xff0c;如何申请抖音小店呢?下面就为大家详细介绍一下抖音小店的申请流程。 首先&#xff0c;打开…

Vue3:将表格数据下载为excel文件

需求 将表格数据或者其他形式的数据下载为excel文件 技术栈 Vue3、ElementPlus、 实现 1、安装相关的库 下载xlsx 和 file-saver 库 npm install -S file-saver npm install -S xlsx引入XLSX库和FileSaver库 import XLSX from xlsx; import FileSaver from file-saver;…

linux音频-IIS音频接口

IIS 总线 IIS(Integrate Interface of Sound)即集成音频接口&#xff0c;在上个世纪 80 年代首先被 Philips 公司用于消费产品的音频设备&#xff0c; I2S规范 I2S总线只能用来处理audio data&#xff0c;而别的信号比如控制信号&#xff0c;编码信号则交给别的模块处理。为了…

iOS调试技巧——使用Python 自定义LLDB

一、类介绍 在使用Python 自定义LLDB之前&#xff0c;先了解一下LLDB的一些类型 SBTarget 正在被调试的程序SBProcess 和程序关联的具体的进程SBThread 执行的线程SBFrame 和线程关联的一个栈帧SBVariable 变量&#xff0c;寄存器或是一个表达式 一般情况下&#xff0c;我们…

【从0到1设计一个网关】自研网关的架构搭建

文章目录 项目骨架搭建领域模型与DDD核心上下文模型封装静态配置的加载组件生命周期源码地址: 源码地址 项目骨架搭建 这里我使用的IDE工具是IDEA。 从上文中我们了解到,我们的项目大概有五个模块,Client,Common,Register Center,Config Center,Core这五个模块。 下面…

Linux进程终止

文章目录 进程退出场景进程退出码strerrorerrno浅谈进程异常exit && _exit 进程退出场景 代码运行完毕&#xff0c;结果正确代码运行完毕&#xff0c;结果不正确代码异常 进程退出码 我们写的C/C的代码&#xff0c;main函数每次都需要返回0&#xff0c;而这个return…

比Nginx测试桩更方便,ShenYu网关的Mock插件

有时候为了方便测试&#xff0c;我们需要模拟 HTTP 外部接口的返回结果。通常情况下&#xff0c;我们可以使用 Nginx 测试桩来实现这个目的。然而&#xff0c;Nginx 的使用门槛较高&#xff0c;可能对一些初级开发和测试人员来说有一定的难度。相比之下&#xff0c;Apache Shen…

深度学习_6_实战_点集最优直线解_代码解析

问题描述&#xff1a; 上述题目的意思为&#xff0c;人工造出一些数据点&#xff0c;对我们的模型y Xw b ∈进行训练&#xff0c;其中标准模型如下&#xff1a; 其中W和X都为张量&#xff0c;我们训练的模型越接近题目给出的标准模型越好 训练过程如下&#xff1a; 人造数…

SCSS动态生成类

前言 在项目开发中&#xff0c;为了方便样式的复用和规范化&#xff0c;通常都会统一一些公共的样式类&#xff0c;如果用传统的css来写就会显得很臃肿。 最近看了看接手的项目的公共css文件&#xff0c;发现很多重复的样式声明&#xff0c;还有全局的样式使用不统一问题。 例…

80.每日一练:移除元素(力扣)

问题描述 代码解决以及思想 解法一 class Solution { public:int removeElement(vector<int>& nums, int val) {int len 0; // 初始化一个用于记录非目标值个数的变量// 创建一个迭代器 it&#xff0c;指向 nums 的开头vector<int>::iterator it nums.beg…

Ubuntu22.04 交叉编译阿里oss c-sdk

一、交叉编译openssl Ubuntu20.04 交叉编译openssl 1.0.1f_编译前去除 makefile 中所有的"-m64"字段_qq76211822的博客-CSDN博客文章浏览阅读319次。Ubuntu20.04 交叉编译openssl_编译前去除 makefile 中所有的"-m64"字段https://blog.csdn.net/sz7621182…

【码银送书第九期】《ChatGPT 驱动软件开发:AI 在软件研发全流程中的革新与实践》

计算机技术的发展和互联网的普及&#xff0c;使信息处理和传输变得更加高效&#xff0c;极大地改变了金融、商业、教育、娱乐等领域的运作方式。数据分析、人工智能和云计算等新兴技术&#xff0c;也在不断地影响和改变着各个行业。 如今&#xff0c;我们正在见证人工智能技术的…

Hadoop3教程(三十四):(生产调优篇)MapReduce生产经验汇总

文章目录 &#xff08;164&#xff09;MR跑得慢的原因&#xff08;165&#xff09;MR常用调优参数Map阶段Reduce阶段 &#xff08;166&#xff09;MR数据倾斜问题参考文献 &#xff08;164&#xff09;MR跑得慢的原因 MR程序执行效率的瓶颈&#xff0c;或者说当你觉得你的MR程…

在 Mac M1 上运行 Llama 2 并进行训练

在 Mac M1 上运行 Llama 2 并进行训练 Llama 2 是由领先的人工智能研究公司 Meta &#xff08;前Facebook&#xff09;开发并发布的下一代大型语言模型 (LLM)。 它基于 2 万亿个公共数据 token 进行了预训练&#xff0c;旨在帮助开发人员和企业组织构建基于人工智能的生成工具和…