事件时间+时间窗口,最后一个窗口不执行问题踩坑与源码分析
1. 结论
在使用事件时间和时间窗口的过程中,当最后一个事件的事件时间未达到时间窗口的最大时间,窗口不会触发。
举例说明,在按小时的滚动窗口中,假设当前时间是12:05点,按照正常预想13:00时窗口会触发执行,但是在12:00到13:00的时间段内,最后一个事件的时间是12:50,之后再未产生新的事件,那么在13:00的时候,窗口并不会触发执行,只有当后续再产生新的事件,并且事件时间大于13:00时,12-13的窗口才会执行。
在实际开发过程中可能会带来一些问题,当事件不是源源不断的产生时,最后一个窗口不执行,影响结果。
示例代码见末尾。
2. 分析原因
源码基于flink 1.14.4
窗口是否执行是由trigger控制,从trigger中逐步寻找原因,在示例代码中使用了TumblingEventTimeWindows
,其默认trigger是EventTimeTrigger
(通过方法org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows#getDefaultTrigger
可知)。
2.1. 窗口触发
从EventTimeTrigger
实现可知,只有当窗口最大时间小于等于当前水位(每个事件到达时判断) 或 当eventime timer触发时的触发时间戳等于窗口最大时间时才会触发窗口。因此最终决定窗口是否触发的原因是水位值。
flink自身提供的其他和事件时间有关的trigger触发时机类似。
// 返回TriggerResult.FIRE时触发计算
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// 窗口最大时间小于等于水位值时,触发
return TriggerResult.FIRE;
} else {
// 大于水位值时,将窗口最大时间注册timer,注册timer后,当水印时间超过参数指定的时间时,会调用下面的onEvenTime方法
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
// time是timer触发时的事件戳
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
}
因此再了解下水位值如何更新。
2.2. 事件时间和水位
DataStrema
中默认提供了3种分配水位的方式,重点了解下通过WatermarkStrategy
实现指定水位的方法。
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy)
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner)
WatermarkStrategy
的核心是TimestampAssigner
和WatermarkGenerator
(按照字面意思暂且称之为时间戳分配器和水位生成器),水位生成器根据指定的时间戳来生成水位。同时WatermarkStrategy
默认提供了两种快速实现WatermarkStrategy的方法forMonotonousTimestamps
和forBoundedOutOfOrderness
,分别表示顺序递增和允许乱序的水位生成策略。
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
/** 重点,需要在实现WatermarkStrategy时,实现此方法 */
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
@Override
default TimestampAssigner<T> createTimestampAssigner(
TimestampAssignerSupplier.Context context) {
// 提供了默认实现,用于类似于kafka等数据源时
return new RecordTimestampAssigner<>();
}
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
}
AscendingTimestampsWatermarks
是BoundedOutOfOrdernessWatermarks
的特例(outOfOrdernessMillis=0)。
由于WatermarkGenerator负责水位的具体生成,因此从该接口中寻找真相,接口内容如下,包含两个方法。具体作用见注释
public interface WatermarkGenerator<T> {
/**
* 指定每个事件到达后,如何处理
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* 周期调用,调用时处理水位。
* 调用时间间隔取决于ExecutionConfig#getAutoWatermarkInterval(),默认值为200ms
*/
void onPeriodicEmit(WatermarkOutput output);
}
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
private long maxTimestamp;
private final long outOfOrdernessMillis;
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// 默认最大时间是long最小值+允许的延迟时间+1,在允许延迟时间合理时,默认最大时间可以认为时long的最小值
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
从BoundedOutOfOrdernessWatermark的实现中可知,每个事件到达后,仅获取当前最大的时间戳。而水位是通过周期方法来生成的,默认情况下200ms生成一次水位。水位值=当前事件的最大时间戳-允许延迟的事件值-1。
到此真相就找到了,水位是根据当前已到事件的最大时间决定的,当后续无更大的时间的事件到之前,水位将暂停。由于水位值无法增加,当窗口到达自然时间点时,无法触发。
示例代码
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = EnvConf.getEnv();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9999);
DataStream<SensorRecord> inputStream = source.map(line -> {
String[] fields = line.split(",");
return new SensorRecord(fields[0], new Double(fields[1]));
});
WatermarkStrategy<SensorRecord> watermarkStrategy = WatermarkStrategy
// 单调递增的水位生成器
.<SensorRecord>forMonotonousTimestamps()
// 时间戳生成器,时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。
.withTimestampAssigner((event, ts) -> event.getTs());
SingleOutputStreamOperator<SensorRecord> input = inputStream.assignTimestampsAndWatermarks(watermarkStrategy);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = input.keyBy(SensorRecord::getId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new AggregateFunction<SensorRecord, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> createAccumulator() {
return null;
}
@Override
public Tuple2<String, Integer> add(SensorRecord value, Tuple2<String, Integer> accumulator) {
return null;
}
@Override
public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
return null;
}
@Override
public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
return null;
}
});
result.print();
env.execute();
}