文章目录
- 1、基本处理函数
- 2、定时器和定时服务
- 3、KeyedProcessFunction下演示定时器
- 4、process重获取当前watermark
前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作
,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑
!!!
1、基本处理函数
处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:
stream.process(new MyProcessFunction())
-
ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction
-
ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型
-
ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
...
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
...
}
抽象方法 processElement:
- 定义处理元素的逻辑
- 流中的每个元素都会调用一次这个房啊
- 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据
非抽象方法onTimer:
- 定时器触发时调用这个方法
- 注册定时器即设一个闹钟,
onTimer则是闹钟响了以后要做的事
- onTimer是基于时间线的一个回调方法
- onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)
最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction
关于处理函数的分类:
- 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
- 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction
2、定时器和定时服务
ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:
- 获取当前的处理时间
long currentProcessingTime();
- 获取当前的水位线(事件时间)
long currentWatermark();
- 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
- 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
- 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
- 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);
注意:
- 只有在KeyedStream中才支持使用TimerService设置定时器的操作
- TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次
3、KeyedProcessFunction下演示定时器
事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发
public class KeyedProcessTimerDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("node01", 9527)
.map(new WaterSensorMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //乱序默认的水位生成器
.withTimestampAssigner((element, ts) -> element.getTs() * 1000L) //时间戳提取
);
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
// TODO Process:keyed
SingleOutputStreamOperator<String> process = sensorKS.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
/**
* 来一条数据调用一次
* @param value 每条数据
* @param ctx 上下文对象
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//获取当前数据的key
String currentKey = ctx.getCurrentKey();
// 获取定时器服务对象
TimerService timerService = ctx.timerService();
// 数据中提取出来的事件时间
Long currentEventTime = ctx.timestamp();
//注册定时任务,水位线被推到5s时触发
timerService.registerEventTimeTimer(5000L);
System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");
/**
* 时间进展到定时器注册的时间,调用该方法
* @param timestamp 当前时间进展,就是定时器被触发时的时间
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
String currentKey = ctx.getCurrentKey();
System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");
}
}
);
process.print();
env.execute();
}
}
运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms
,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:
看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发
再用处理时间下的定时器:
public class KeyedProcessTimerDemo {
public static void main(String[] args) throws Exception {
//...重复代码略,同上
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
// TODO Process:keyed
SingleOutputStreamOperator<String> process = sensorKS.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//获取当前数据的key
String currentKey = ctx.getCurrentKey();
TimerService timerService = ctx.timerService();
//当前数据的处理时间
long currentTs = timerService.currentProcessingTime();
//定时器不用水位线为标杆,直接处理时间加5s
timerService.registerProcessingTimeTimer(currentTs + 5000L);
System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");
}
//...重复代码略,同上
}
运行:
4、process重获取当前watermark
还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:
//...重复代码略,同上
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// 获取 process的 当前watermark
long currentWatermark = timerService.currentWatermark();
System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);
}
此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个
在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。
上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)