在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一 般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的 窗口计算。所以窗口和时间往往是分不开的。
时间语义
- 事件时间(Event Time):每个事件在对应的设备上发生的时间,也就是数据生成的时间。
- 处理时间(Processing Time):执行处理操作的机器的系统时间
- 摄取时间(Ingestion Time):事件进入到flink的时间
哪种 时间语义更重要
一般情况下,业务日志数据中都会记录数据生成的时间戳,它就可以作为事件时间的判断基础。处理时间是我们计算效率的衡量标准,而事件事件更符合我们的业务计算逻辑。而处理时间是我们计算效率的衡量标准,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让流处理延迟降到最低,效率达到最高。
flink1.13
版本开始,将事件时间作为默认的时间语义。
水位线(Watermark)
什么是水位线
在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。
我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark
)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
watermark特点
- 水位线时插入到数据流中的一个标记,可以认为是一个特殊的数据
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
- 水位线时基于数据的时间戳生成的
- 水位线的时间戳必须单调递增,以保证正确处理乱序数据
- 一个水位线表示在当前流中事件时间已经达到了时间戳
t
,t
之前的所有数据都到齐了,之后流中不会出现时间戳t'<t
的数据
总结起来,水位线(watermark)在Flink中的作用是用于处理乱序事件流,确保事件按照正确的顺序进行处理,以便进行准确的窗口计算和延迟处理。也就是, 牺牲掉一定的实时性,为了保证数据的完整性。
水位线的传递
在 Flink 的数据流处理中,水位线是以特定的事件元素形式插入到数据流中的。这个特殊的事件元素被称为水位线事件(Watermark Event),它包含了水位线的时间戳信息。当数据流中的水位线事件到达算子(Operator)时,Flink 会根据其时间戳更新当前的水位线。
在源算子(Source Operator)中,可以通过调用特定的方法(如assignTimestampsAndWatermarks
)来插入水位线事件。这样,在源算子产生的数据流中就会包含水位线事件,以及普通的数据事件。
然后,水位线事件会随着数据流在不同的算子之间进行传递。当算子处理数据时,它会检查接收到的事件的时间戳,并与当前水位线进行比较。如果事件的时间戳大于当前水位线,算子会更新水位线,并触发相应的操作。
在“重分区”的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,这时我们应该以最慢的那个时钟,也就是最小的水位线为准。
水位线在上下游任务之间的传递,非常巧妙的避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。
如何生成水位线
生成水位线的总体原则
如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。
所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
水位线生成策略(Watermark Strategies)
在 Flink 的 DataStream API
中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法 :assignTimestampsAndWatermarks()
,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。
assignTimestampsAndWatermarks()
方法需要传入一个 WatermarkStrategy
作为参数,这就是 所谓的“水位线生成策略”。WatermarkStrategy
中包含了一个“时间戳分配器”TimestampAssigner
和一个“水位线生成器”WatermarkGenerator
。
TimestampAssigner
: 由 WatermarkStrategy
显示地指定从数据里面哪一个字段提取当前的时间戳,然后把它分配到当前的数据上。 就相当于再数据上追加了一个字段,这个字段是真正的 timestamp
。它有可能和之前某个字段一样,也可能基于之前的字段做了一定的改变。
**时间戳的分配是生成水位线的基础。**基于时间戳,我们可以指定水位线生成策略
WatermarkGenerator
。
WatermarkGenerator
: 主要负责按照既定的方式,基于时间戳生成水位线。在 WatermarkGenerator
接口中,主要又有两个方法:onEvent()
和 onPeriodicEmit()
。
onEvent
:基于事件生成 watermark 。onPeriodicEmit
:基于周期性的发射生成watermark
。默认200ms
Flink 内置水位线生成器
有序流
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以 永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用.
val stream = env.addSource(new ClickSource)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[Event]()
.withTimestampAssigner(new SerializableTimestampAssigner[Event]
{override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
element.timestamp
}
}
)
)
乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的 结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy.forBoundedOutOfOrderness()
方法就可以实现。这个方法需要传入一个 maxOutOfOrderness
参 数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序 程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
val stream1 = env.addSource(new ClickSource)
//插入水位线的逻辑
.assignTimestampsAndWatermarks(
//针对乱序流插入水位线,延迟时间设置为 5s
WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
.withTimestampAssigner(
new SerializableTimestampAssigner[Event]
{override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
}
)
)
自定义水位线策略
在 WatermarkStrategy
中,时间戳分配器 TimestampAssigner
都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator
的实现。整体说来,Flink 有两种不同的生成水位线的方式:一种是周期性的(Periodic
),另一种是断点式的(Punctuated
)。
-
周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过
onEvent()
观察判断输入的事件,而在onPeriodicEmit()
里发出水 位线。 -
断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测
onEvent()
中的事件,当发现带有水位线信息的特殊事件时, 就立即发出水位线。一般来说,断点式生成器不会通过onPeriodicEmit()
发出水位线。
窗口window
Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想 要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这 就是所谓的“窗口”(Window)。在 Flink 中, 窗口就是用来处理无界流的核心。
窗口计算中,一般使用半开半闭的时间范围,即左闭右开。也就是说,窗口的开始时间是包含的,而结束时间是不包含的。因此,当一个事件的事件时间正好等于水位线时间时,它会被包含在该窗口的计算范围内。
但是由于有乱序数据的存在,我们需要设置一个延迟时间来等所有数据到齐。比如设置延迟时间为2
秒,这样0~10
秒的窗口会在12
的数据到来之后,才真正关闭计算输出结果。这样就可以包含迟到的数据了。
这里需要注意的是,Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个 窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时, 窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开.
窗口的分类
按照驱动类型分类
窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。
我们最容易想到的就是按照时间段去截取数据,这种窗口就叫作“时间窗口”(Time Window
)。这在实际应用中最常见,之前所举的例子也都是时间窗口。除了由时间驱动之外, 窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这种窗口叫作“计数窗口”(Count Window
)。
时间窗口
flink中有一个时间窗口的类,叫TimeWindow
,有两个私有属性:start
和 end
。表示窗口的开始和结束的时间戳,单位为毫秒
private final long start;
private final long end;
public long maxTimestamp(){
return end - 1;
}
计数窗口
基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。
计数窗口理解简单,只需指定窗口大小,就可以把数据分配到对应的窗口中,Flink内部对应的类来表示计数窗口,底层通过全局窗口(Global Window
)实现。
按照窗口分配数据的规则分类
根据分配数据的规则,窗口的具体实现可以分为 4 类:
- 滚动窗口(Tumbling Window)
- 滑动窗口(Sliding Window)
- 会话窗口(Session Window)
- 以及全局窗口(Global Window)
滚动窗口 (Tumbling Windows)
滚动窗口对数据进行均匀分片。窗口之间没有重叠,也不会有间隔,是首尾相接的状态,如果把多个窗口的创建看作一个窗口的移动,那么他就像在滚动一样。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。
滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多 BI 分析指标都可以用它来实现。
滑动窗口 (Sliding Windows)
与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的, 而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。
既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两 个:除去窗口大小(window size
)之外,还有一个“滑动步长”(window slide
),它其实就代 表了窗口计算的频率。同样,滑动窗口可以基于时间定义,也可以基于数据个数定义。
在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚 至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常 报警。这时滑动窗口无疑就是很好的实现方式。
会话窗口(Session Windows)
会话窗口顾名思义,是基于“会话”(session
)来对数据进行分组的。这里的会话类似 Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。
与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义。对于会话窗口而言,最重要的参数就是会话超时时间的长度(size),也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap
)小于指定的大小(size
),那说明还在保持会话,它们就属于同一个窗口;如果 gap
大于 size,
那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上,我们可以设置静态固定的大小(size),也可以通过一个自定义的 提取器(gap extractor
)动态提取最小间隔 gap 的值。
在一些类似保持会话的场景下,往往可以使用会话窗口来进行数据的处理统计。
全局窗口(Global Windows)
还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认 是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。
Flink 中的计数窗口(Count Window),底层就是用全局窗口实现的。
窗口API
按键分区窗口(Keyed Windows)
经过按键分区 keyBy()操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这 就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时 执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的 处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
在代码实现上,我们需要先对DataStream调用keyBy()
进行按键分区,然后再调用window() 定义窗口。
stream.keyBy(...)
.window(...)
非按键分区(Non-Keyed Windows)
如果没有进行 keyBy(),那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只 能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用 这种方式。
在代码中,直接基于 DataStream 调用 windowAll()定义窗口。
stream.windowAll(...)
这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll
本身就是一个非并行的操作。
代码中窗口 API 的调用
窗口 操作主要有两个部分:
- 窗口分配器(Window Assigners)
- 窗口函数(Window Functions)
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)
其中.window()
方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()
方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。
窗口分配器有各种形式, 而窗口函数的调用方法也不只.aggregate()
一种.
窗口分配器(Window Assigers)
定义窗口分配器(Window Assigners
)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。而窗口分配数据的规则,其实就对应着不同的窗口类型。所以可 以说,窗口分配器其实就是在指定窗口的类型。
窗口分配器最通用的定义方式,就是调用 window()
方法。这个方法需要传入一个 WindowAssigner
作为参数,返回 WindowedStream
。如果是非按键分区窗口,那么直接调用 windowAll()
方法,同样传入一个 WindowAssigner
,返回的是 AllWindowedStream
。
//滚动处理时间窗口 :窗口5秒
TumblingProcessingTimeWindows.of(Time.seconds(5));
//滑动处理时间窗口:长度为 10 秒、滑动步长为 5 秒的滑动窗 口
SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)) ;
//处理时间会话窗口 : 静态会话超时时间为 10 秒的会话窗口
ProcessingTimeSessionWindows.withGap(Time.seconds(10)) ;
ProcessingTimeSessionWindows.withDynamicGap(new
SessionWindowTimeGapExtractor[(String, Long)]
{
override def extract(element: (String, Long)) {
// 提取 session gap 值返回, 单位毫秒
element._1.length * 1000
}
}
) ;
//滚动事件时间窗口
TumblingEventTimeWindows.of(Time.seconds(5));
//滑动事件时间窗口
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5));
//事件时间会话窗口
EventTimeSessionWindows.withGap(Time.seconds(10));
//滚动计数窗口
stream.keyBy(...)
.countWindow(10);
//滑动计数窗口
stream.keyBy(...)
.countWindow(10,3);
//全局窗口
stream.keyBy(...)
.window(GlobalWindows.create())
窗口函数
定义窗 口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
根据处理的方式分为:增量聚合函数(流)、全窗口函数(批)。
增量聚合函数(incremental aggregation functions)
典型的增量聚合函数有两个:ReduceFunction
和 AggregateFunction
。
归约函数(ReduceFunction)
最基本的聚合方式就是归约(reduce)。窗口的归约聚合,就是将窗口中收集到的数据两两进行归约。当我们进行流处 理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了 增量式的聚合。
窗口函数中也提供了 ReduceFunction
:只要基于 WindowedStream
调用.reduce()
方法,然 后传入 ReduceFunction
作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚 合了。这里的 ReduceFunction
其实与简单聚合时用到的 ReduceFunction
是同一个函数类接口, 所以使用方式也是完全一样的。
// 抽取数据中的时间戳.
assignAscendingTimestamps(_.timestamp).map(x => {(x.user, 1)})
// 按照用户名进行分组
.keyBy(_._1)
// 设置5s的滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)) )
// 聚合 保留一个用户名,值相加
.reduce((l, f) => (l._1, l._2 + f._2) )
stream.print()
env.execute()
聚合函数(AggregateFunction)
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
为了更加灵活地处理窗口计算,Flink的Window API
提供了更加一般化的aggregate()
方法。 直接基于 WindowedStream
调用 aggregate()
方法,就可以定义更加灵活的窗口聚合操作。这个 方法需要传入一个 AggregateFunction
的实现类作为参数。
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
{
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}
AggregateFunction
可以看作是 ReduceFunction
的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型; 累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
AggregateFunction
接口中有四个方法:
createAccumulator()
:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。add()
:将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。getResult()
:从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。merge()
:合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
另外,Flink 也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于
WindowedStream
调用。主要包括sum()
/max()/
maxBy()
/min()
/minBy()
,与KeyedStream
的简单 聚合非常相似。它们的底层,其实都是通过AggregateFunction
来实现的。
全窗口函数(Full Window Functions)
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗 口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
在 Flink 中,全窗口函数也有两种:WindowFunction
和 ProcessWindowFunction
。
窗口函数(WindowFunction)
WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream
调用.apply()
方法,传入一个 WindowFunction 的实现类。
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
处理窗口函数(ProcessWindowFunction)
ProcessWindowFunction
是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction
还可以获取到一个“上下文对象” (Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time
)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction
更加灵活、功能更加丰富,可以认为是一个增强版的 WindowFunction。
其他API
对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,让我们可以更加灵活地控制窗口行为。
触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于 WindowedStream
调用 trigger()
方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream
.keyBy(...)
.window(...)
.trigger(new MyTrigger())
Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner
)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间 窗口,默认的触发器都是 EventTimeTrigger
;类似还有 ProcessingTimeTrigger
和 CountTrigger
。所以一般情况下是不需要自定义触发器的。
rigger 是一个抽象类,自定义时必须实现下面四个抽象方法:
onElement()
:窗口中每到来一个元素,都会调用这个方法。onEventTime()
:当注册的事件时间定时器触发时,将调用这个方法。onProcessingTime ()
:当注册的处理时间定时器触发时,将调用这个方法。clear()
:当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。
可以看到,除了 clear()
比较像生命周期方法,其他三个方法其实都是对某种事件的响应。onElement()
是对流中数据元素到来的响应;而另两个则是对时间的响应。这几个方法的参数中都有一个“触发器上下文”(TriggerContext
)对象,可以用来注册定时器回调(callback)。这里提到的“定时器”(Timer),其实就是我们设定的一个“闹钟”,代表未来某个时间点会执行的事件;当时间进展到设定的值时,就会执行定义好的操作。很明显,对于时间窗口(TimeWindow
)而言,就应该是在窗口的结束时间设定了一个定时器,这样到时间就可以触发 窗口的计算输出了。关于定时器的内容,我们在后面讲解处理函数(process function)时还会提到。
上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult
,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。
- CONTINUE(继续):什么都不做
- FIRE(触发):触发计算,输出结果
- PURGE(清除):清空窗口中的所有数据,销毁窗口
- FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
我们可以看到,Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。上面的四种类型,其实也就是这两个操作交叉配对产生的结果。一般我们会认为,到了窗口的结束时间,那么就会触发计算输出结果,然后关闭窗口——似乎这两个操作应该是同时发生的;但 TriggerResult
的定义告诉我们,两者可以分开。
移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream
调用.evictor()
方法,就可以传入一个自定义的移除器(Evictor)。Evictor
是一个接口,不同的窗口类型都有各自预实现的移除器。
stream
.keyBy(...)
.window(...)
.evictor(new MyEvictor())
Evictor 接口定义了两个方法:
evictBefore()
:定义执行窗口函数之前的移除数据操作evictAfter()
:定义执行窗口函数之后的以处数据操作
默认情况下,预实现的移除器都是在执行窗口函数(window fucntions
)之前移除数据的。
允许延迟(Allowed Lateness)
在事件时间语义下,窗口中可能会出现数据迟到的情况。迟到数据默认会被直接丢弃,不会进入窗口进行统计计算。这样可能会导致统计结果不准确。
为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness
)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。
基于 WindowedStream
调用 allowedLateness
()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据。
stream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(1))
从这里我们就可以看到,窗口的触发计算(Fire)和清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。
将迟到的数据放入侧输出流
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”, 这个流中单独放置那些本该被丢弃的数据。
基于 WindowedStream
调用 sideOutputLateData()
方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag
的类型与流中数据类型相同。
val stream = env.addSource(new ClickSource)
val outputTag = new OutputTag[Event]("late")
stream
.keyBy("user")
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用 getSideOutput()
方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。
val winAggStream = stream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction)
val lateStream = winAggStream.getSideOutput(outputTag)
getSideOutput()
是 DataStream 的方法,获取到的侧输出流数据类型应该和OutputTag
指定的类型一致,与窗口聚合之后流中的数据类型可以不同。
窗口的生命周期
1、 窗口的创建
窗口的类型和基本信息由窗口分配器(window assigners)指定,但窗口不会预先创建好,而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。
2、窗口计算的触发
除了窗口分配器,每个窗口还会有自己的窗口函数(window functions)和触发器(trigger)。窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是指定调用窗口函数的条件。
对于不同的窗口类型,触发计算的条件也会不同。Flink 预定义的窗口类型都有对应内置的触发器。
3、 窗口的销毁
一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意,Flink 中只对时间窗口 (TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw) 实现的,而全局窗口不会清除状态,所以就不会被销毁。
在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点,是窗口的结束时间加上用户指定的允许延迟时间。
迟到数据的处理
设置水位线延迟时间
水位线是事件时间的进展,它是我们整个应用的全局逻辑时钟。水位线生成之后,会随着数据在任务间流动,从而给每个任务指明当前的事件时间。所以从这个意义上讲,水位线是一个覆盖万物的存在,它并不只针对事件时间窗口有效。水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟,当然也就是全局时钟的滞后。
既然水位线这么重要,那一般情况就不应该把它的延迟设置得太大,否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒。所以实际应用中,我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”,视需求一般设在毫秒~秒级。
当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是所谓的“迟到数据”。
允许窗口处理迟到数据
除设置水位线延迟外,Flink 的窗口也是可以设置延迟时间,允许继续处理迟到数据的。
这种情况下,由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。这其实就是著名的 Lambda 架构。原先需要两套独立的系统来同时保证实时性和结果的最 终正确性,如今 Flink 一套系统就全部搞定了。
将迟到数据发送到窗口侧输出流
还可以用窗口的侧输出流,来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。
所以总结起来,Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,以及将迟到数据放入窗口侧输出流。