Flink学习-时间和窗口

在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一 般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的 窗口计算。所以窗口和时间往往是分不开的。

时间语义

image-20231231194655718

  • 事件时间(Event Time):每个事件在对应的设备上发生的时间,也就是数据生成的时间。
  • 处理时间(Processing Time):执行处理操作的机器的系统时间
  • 摄取时间(Ingestion Time):事件进入到flink的时间

哪种 时间语义更重要

image-20231231194959522

一般情况下,业务日志数据中都会记录数据生成的时间戳,它就可以作为事件时间的判断基础。处理时间是我们计算效率的衡量标准,而事件事件更符合我们的业务计算逻辑。而处理时间是我们计算效率的衡量标准,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让流处理延迟降到最低,效率达到最高。

flink1.13版本开始,将事件时间作为默认的时间语义。

水位线(Watermark)

什么是水位线

事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。

我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

watermark特点

image-20231231195431317

  • 水位线时插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线时基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以保证正确处理乱序数据
  • 一个水位线表示在当前流中事件时间已经达到了时间戳tt之前的所有数据都到齐了,之后流中不会出现时间戳t'<t的数据

总结起来,水位线(watermark)在Flink中的作用是用于处理乱序事件流,确保事件按照正确的顺序进行处理,以便进行准确的窗口计算和延迟处理。也就是, 牺牲掉一定的实时性,为了保证数据的完整性。

水位线的传递

image-20231231200028619

在 Flink 的数据流处理中,水位线是以特定的事件元素形式插入到数据流中的。这个特殊的事件元素被称为水位线事件(Watermark Event),它包含了水位线的时间戳信息。当数据流中的水位线事件到达算子(Operator)时,Flink 会根据其时间戳更新当前的水位线

​ 在源算子(Source Operator)中,可以通过调用特定的方法(如assignTimestampsAndWatermarks)来插入水位线事件。这样,在源算子产生的数据流中就会包含水位线事件,以及普通的数据事件。

​ 然后,水位线事件会随着数据流在不同的算子之间进行传递。当算子处理数据时,它会检查接收到的事件的时间戳,并与当前水位线进行比较。如果事件的时间戳大于当前水位线,算子会更新水位线,并触发相应的操作。

在“重分区”的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,这时我们应该以最慢的那个时钟,也就是最小的水位线为准。

水位线在上下游任务之间的传递,非常巧妙的避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。

如何生成水位线

生成水位线的总体原则

如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。

所以 Flink 中的水位线,其实是流处理中对低延迟结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

水位线生成策略(Watermark Strategies)

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法 :assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是 所谓的“水位线生成策略”。WatermarkStrategy 中包含了一个“时间戳分配器TimestampAssigner 和一个“水位线生成器WatermarkGenerator

TimestampAssigner: 由 WatermarkStrategy 显示地指定从数据里面哪一个字段提取当前的时间戳,然后把它分配到当前的数据上。 就相当于再数据上追加了一个字段,这个字段是真正的 timestamp。它有可能和之前某个字段一样,也可能基于之前的字段做了一定的改变。

**时间戳的分配是生成水位线的基础。**基于时间戳,我们可以指定水位线生成策略WatermarkGenerator

WatermarkGenerator: 主要负责按照既定的方式,基于时间戳生成水位线。在 WatermarkGenerator 接口中,主要又有两个方法:onEvent()onPeriodicEmit()

  • onEvent:基于事件生成 watermark 。
  • onPeriodicEmit:基于周期性的发射生成 watermark 。默认200ms

Flink 内置水位线生成器

有序流

对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以 永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用.

val stream = env.addSource(new ClickSource)
.assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps[Event]()
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] 
    {override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
                               element.timestamp
                           }
    }
    )
)

乱序流

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的 结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy.forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参 数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序 程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

val stream1 = env.addSource(new ClickSource)
//插入水位线的逻辑
.assignTimestampsAndWatermarks(
//针对乱序流插入水位线,延迟时间设置为 5s
WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
    .withTimestampAssigner(
        new SerializableTimestampAssigner[Event] 
        {override def extractTimestamp(element: Event, recordTimestamp: Long): Long = 			element.timestamp
        }
    )
)

自定义水位线策略

WatermarkStrategy 中,时间戳分配器 TimestampAssigner 都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来,Flink 有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。

  • 周期性水位线生成器(Periodic Generator)

    周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水 位线。

  • 断点式水位线生成器(Punctuated Generator)

    断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时, 就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

窗口window

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想 要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这 就是所谓的“窗口”(Window)。在 Flink 中, 窗口就是用来处理无界流的核心

窗口计算中,一般使用半开半闭的时间范围,即左闭右开。也就是说,窗口的开始时间是包含的,而结束时间是不包含的。因此,当一个事件的事件时间正好等于水位线时间时,它会被包含在该窗口的计算范围内。

​ 但是由于有乱序数据的存在,我们需要设置一个延迟时间来等所有数据到齐。比如设置延迟时间为2秒,这样0~10秒的窗口会在12的数据到来之后,才真正关闭计算输出结果。这样就可以包含迟到的数据了。

这里需要注意的是,Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个 窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时, 窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开.

窗口的分类

按照驱动类型分类

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。

我们最容易想到的就是按照时间段去截取数据,这种窗口就叫作“时间窗口”(Time Window)。这在实际应用中最常见,之前所举的例子也都是时间窗口。除了由时间驱动之外, 窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这种窗口叫作“计数窗口”(Count Window)。

时间窗口

flink中有一个时间窗口的类,叫TimeWindow,有两个私有属性:startend。表示窗口的开始和结束的时间戳,单位为毫秒

private final long start;
private final long end;
public long maxTimestamp(){
    return end - 1;
}
计数窗口

基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。

计数窗口理解简单,只需指定窗口大小,就可以把数据分配到对应的窗口中,Flink内部对应的类来表示计数窗口,底层通过全局窗口Global Window)实现。

按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为 4 类:

  • 滚动窗口(Tumbling Window)
  • 滑动窗口(Sliding Window)
  • 会话窗口(Session Window)
  • 以及全局窗口(Global Window)
滚动窗口 (Tumbling Windows)

滚动窗口对数据进行均匀分片。窗口之间没有重叠,也不会有间隔是首尾相接的状态,如果把多个窗口的创建看作一个窗口的移动,那么他就像在滚动一样。

滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。

滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多 BI 分析指标都可以用它来实现。

滑动窗口 (Sliding Windows)

与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的, 而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。

既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两 个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代 表了窗口计算的频率。同样,滑动窗口可以基于时间定义,也可以基于数据个数定义。

在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚 至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常 报警。这时滑动窗口无疑就是很好的实现方式。

会话窗口(Session Windows)

会话窗口顾名思义,是基于“会话”(session)来对数据进行分组的。这里的会话类似 Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。

与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义。对于会话窗口而言,最重要的参数就是会话超时时间的长度(size),也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上,我们可以设置静态固定的大小(size),也可以通过一个自定义的 提取器(gap extractor)动态提取最小间隔 gap 的值。

在一些类似保持会话的场景下,往往可以使用会话窗口来进行数据的处理统计。

全局窗口(Global Windows)

还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认 是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

image-20231231203922146

Flink 中的计数窗口(Count Window),底层就是用全局窗口实现的。

窗口API

按键分区窗口(Keyed Windows)

经过按键分区 keyBy()操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这 就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时 执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的 处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对DataStream调用keyBy()进行按键分区,然后再调用window() 定义窗口。

stream.keyBy(...)
.window(...)

非按键分区(Non-Keyed Windows)

如果没有进行 keyBy(),那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只 能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用 这种方式。

在代码中,直接基于 DataStream 调用 windowAll()定义窗口。

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作

代码中窗口 API 的调用

窗口 操作主要有两个部分:

  • 窗口分配器(Window Assigners)
  • 窗口函数(Window Functions)
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate() 方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

窗口分配器有各种形式, 而窗口函数的调用方法也不只.aggregate()一种.

窗口分配器(Window Assigers)

定义窗口分配器Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。而窗口分配数据的规则,其实就对应着不同的窗口类型。所以可 以说,窗口分配器其实就是在指定窗口的类型

窗口分配器最通用的定义方式,就是调用 window()方法。这个方法需要传入一个 WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用 windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream

//滚动处理时间窗口 :窗口5秒
TumblingProcessingTimeWindows.of(Time.seconds(5));
//滑动处理时间窗口:长度为 10 秒、滑动步长为 5 秒的滑动窗 口
SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))    ;
//处理时间会话窗口 :  静态会话超时时间为 10 秒的会话窗口
ProcessingTimeSessionWindows.withGap(Time.seconds(10))    ;
ProcessingTimeSessionWindows.withDynamicGap(new 
									SessionWindowTimeGapExtractor[(String, Long)] 
                                            {
                                                override def extract(element: (String, Long)) { 
											// 提取 session gap 值返回, 单位毫秒
                                                element._1.length * 1000
                                             }
										}
                                           )    ;
//滚动事件时间窗口
TumblingEventTimeWindows.of(Time.seconds(5));
//滑动事件时间窗口
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5));
//事件时间会话窗口
EventTimeSessionWindows.withGap(Time.seconds(10));

//滚动计数窗口
stream.keyBy(...)
.countWindow(10);
//滑动计数窗口
stream.keyBy(...)
.countWindow(103);
//全局窗口
stream.keyBy(...)
.window(GlobalWindows.create())

窗口函数

定义窗 口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。

根据处理的方式分为:增量聚合函数(流)、全窗口函数(批)。

增量聚合函数(incremental aggregation functions)

典型的增量聚合函数有两个:ReduceFunctionAggregateFunction

归约函数(ReduceFunction)

最基本的聚合方式就是归约(reduce)。窗口的归约聚合,就是将窗口中收集到的数据两两进行归约。当我们进行流处 理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了 增量式的聚合。

窗口函数中也提供了 ReduceFunction:只要基于 WindowedStream 调用.reduce()方法,然 后传入 ReduceFunction 作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚 合了。这里的 ReduceFunction 其实与简单聚合时用到的 ReduceFunction 是同一个函数类接口, 所以使用方式也是完全一样的。

//      抽取数据中的时间戳.
assignAscendingTimestamps(_.timestamp).map(x => {(x.user, 1)})
//      按照用户名进行分组
.keyBy(_._1)
//      设置5s的滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5))   )
//      聚合 保留一个用户名,值相加
.reduce((l, f) => (l._1, l._2 + f._2)    )

stream.print()
env.execute()
 
聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样

为了更加灵活地处理窗口计算,Flink的Window API提供了更加一般化的aggregate()方法。 直接基于 WindowedStream 调用 aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个 方法需要传入一个 AggregateFunction 的实现类作为参数。

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型; 累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

AggregateFunction 接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
  • getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。

另外,Flink 也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream 调用。主要包括 sum()/max()/ maxBy() /min()/ minBy(),与 KeyedStream 的简单 聚合非常相似。它们的底层,其实都是通过 AggregateFunction 来实现的。

全窗口函数(Full Window Functions)

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗 口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

在 Flink 中,全窗口函数也有两种:WindowFunctionProcessWindowFunction

窗口函数(WindowFunction)

WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction())

处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象” (Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富,可以认为是一个增强版的 WindowFunction。

其他API

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,让我们可以更加灵活地控制窗口行为。

触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于 WindowedStream 调用 trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream
.keyBy(...)
.window(...)
.trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间 窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTriggerCountTrigger。所以一般情况下是不需要自定义触发器的。

rigger 是一个抽象类,自定义时必须实现下面四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法。
  • onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

可以看到,除了 clear()比较像生命周期方法,其他三个方法其实都是对某种事件的响应onElement()是对流中数据元素到来的响应;而另两个则是对时间的响应。这几个方法的参数中都有一个“触发器上下文”(TriggerContext)对象,可以用来注册定时器回调(callback)。这里提到的“定时器”(Timer),其实就是我们设定的一个“闹钟”,代表未来某个时间点会执行的事件;当时间进展到设定的值时,就会执行定义好的操作。很明显,对于时间窗口(TimeWindow)而言,就应该是在窗口的结束时间设定了一个定时器,这样到时间就可以触发 窗口的计算输出了。关于定时器的内容,我们在后面讲解处理函数(process function)时还会提到。

上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

我们可以看到,Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。上面的四种类型,其实也就是这两个操作交叉配对产生的结果。一般我们会认为,到了窗口的结束时间,那么就会触发计算输出结果,然后关闭窗口——似乎这两个操作应该是同时发生的;但 TriggerResult 的定义告诉我们,两者可以分开。

移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

stream
.keyBy(...)
.window(...)
.evictor(new MyEvictor())

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。

允许延迟(Allowed Lateness)

在事件时间语义下,窗口中可能会出现数据迟到的情况。迟到数据默认会被直接丢弃,不会进入窗口进行统计计算。这样可能会导致统计结果不准确。

为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。

基于 WindowedStream 调用 allowedLateness()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据。

stream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(1))

从这里我们就可以看到,窗口的触发计算(Fire)和清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

将迟到的数据放入侧输出流

Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”, 这个流中单独放置那些本该被丢弃的数据。

基于 WindowedStream 调用 sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。

val stream = env.addSource(new ClickSource)
val outputTag = new OutputTag[Event]("late")
stream
.keyBy("user")
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用 getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

val winAggStream = stream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction)

val lateStream = winAggStream.getSideOutput(outputTag)

getSideOutput()是 DataStream 的方法,获取到的侧输出流数据类型应该和OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

窗口的生命周期

1、 窗口的创建

窗口的类型和基本信息由窗口分配器(window assigners)指定,但窗口不会预先创建好,而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。

2、窗口计算的触发

除了窗口分配器,每个窗口还会有自己的窗口函数(window functions)和触发器(trigger)。窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是指定调用窗口函数的条件。

对于不同的窗口类型,触发计算的条件也会不同。Flink 预定义的窗口类型都有对应内置的触发器。

3、 窗口的销毁

一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意,Flink 中只对时间窗口 (TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw) 实现的,而全局窗口不会清除状态,所以就不会被销毁。

在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点,是窗口的结束时间加上用户指定的允许延迟时间。

迟到数据的处理

设置水位线延迟时间

水位线是事件时间的进展,它是我们整个应用的全局逻辑时钟。水位线生成之后,会随着数据在任务间流动,从而给每个任务指明当前的事件时间。所以从这个意义上讲,水位线是一个覆盖万物的存在,它并不只针对事件时间窗口有效。水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟,当然也就是全局时钟的滞后。

既然水位线这么重要,那一般情况就不应该把它的延迟设置得太大,否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒。所以实际应用中,我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”,视需求一般设在毫秒~秒级

当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是所谓的“迟到数据”。

允许窗口处理迟到数据

除设置水位线延迟外,Flink 的窗口也是可以设置延迟时间,允许继续处理迟到数据的。

这种情况下,由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。这其实就是著名的 Lambda 架构。原先需要两套独立的系统来同时保证实时性和结果的最 终正确性,如今 Flink 一套系统就全部搞定了。

将迟到数据发送到窗口侧输出流

还可以用窗口的侧输出流,来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。

所以总结起来,Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,以及将迟到数据放入窗口侧输出流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/283225.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【js】js解析Token:

一、效果&#xff1a; 二、实现&#xff1a; export function getTokenObject(token) {//通过split()方法将token转为字符串数组,数组中的第二个字符进行解析return token ? JSON.parse(decodeURIComponent(escape(window.atob(token.split(".")[1].replace(/-/g &…

2023年亲身经历总结 | 记录从大学到现在历程

现在是2023年12月31日&#xff0c;回想2023年&#xff0c;是充满挑战的一年&#xff0c;也是成就斐然的一年。回首过去&#xff0c;仿佛一幅画卷展开&#xff0c;每一笔每一刻都镌刻着成长的印记。 以下文章从大学经历说起&#xff0c;到现在发展情况&#xff0c;希望我的经历对…

大数据应用领域:数据驱动一切

大数据出现的时间只有十几年&#xff0c;被人们广泛接受并应用只有几年的时间&#xff0c;但就是这短短几年的时间&#xff0c;大数据呈现出爆炸式增长的态势。在各个领域&#xff0c;大数据的身影几乎无处不在。今天我们通过一些大数据典型的应用场景分析&#xff0c;一起来看…

ALSA学习(5)——设备中的alsa

参考博客&#xff1a; https://blog.csdn.net/DroidPhone/article/details/7165482 &#xff08;一下内容基本是原博主的博客转载&#xff09; 文章目录 一、ASOC的由来二、硬件架构三、软件架构四、数据结构五、内核对ASoC的改进 一、ASOC的由来 ASoC–ALSA System on Chip …

Vue-Setup

一、setup概述 小小提示&#xff1a;vue3中可以写多个根标签。 Person.vue中内容 <template><div class"person"><h2>姓名&#xff1a;{{name}}</h2><h2>年龄&#xff1a;{{age}}</h2><!--定义了一个事件&#xff0c;点击这…

【网络面试(5)】收发数据及断开服务器(四次挥手)

前面了解到服务器和客户端在创建套接字&#xff0c;建立连接后&#xff0c;就可以进入到下一步&#xff0c;双发可以互相发送和接收数据&#xff0c;本篇博客就来学习一下这个过程。  我们印象里&#xff0c;发送数据应该是我们在浏览器输入网址&#xff0c;敲击回车的一瞬间&…

L1-077:大笨钟的心情

有网友问&#xff1a;未来还会有更多大笨钟题吗&#xff1f;笨钟回复说&#xff1a;看心情…… 本题就请你替大笨钟写一个程序&#xff0c;根据心情自动输出回答。 输入格式&#xff1a; 输入在一行中给出 24 个 [0, 100] 区间内的整数&#xff0c;依次代表大笨钟在一天 24 小时…

【LLM 】7个基本的NLP模型,为ML应用程序赋能

在上一篇文章中&#xff0c;我们已经解释了什么是NLP及其在现实世界中的应用。在这篇文章中&#xff0c;我们将继续介绍NLP应用程序中使用的一些主要深度学习模型。 BERT 来自变压器的双向编码器表示&#xff08;BERT&#xff09;由Jacob Devlin在2018年的论文《BERT:用于语言…

C/C++ 函数重载

函数多态是C在C语言的基础新增的功能。默认参数能够使用不同数目的参数调用同一个函数&#xff0c;而函数多态(函数重载)让您能够使用多个同名的函数。术语“多态”指的是有多种形式&#xff0c;因此函数多态允许函数可以有多种形式。类似地&#xff0c;术语“函数重载”指的是…

【时钟】分布式时钟HLC|Logical Time|Vector Clock|True Time

目录 简略 详细 附录 1 分布式系统不能使用NTP的原因 简略 分布式系统中不同于单机系统不能使用NTP(网络时间协议&#xff08;Network Time Protocol&#xff09;)来获取时间&#xff0c;所以我们需要一个特别的方式来获取分布式系统中的时间&#xff0c;mvcc也是使用time保证读…

2024最全面且有知识深度的web3开发工具、web3学习项目资源平台

在Web3技术迅速发展的时代&#xff0c;寻找一个综合且深入的Web3开发工具和学习项目资源平台变得至关重要。今天&#xff0c;我将向大家介绍一个非常有价值的网站&#xff0c;它就是https://web3x.world 。 Web3X是一个全面而深入的Web3开发者社区&#xff0c;为开发者们提供了…

最优化方法Python计算:无约束优化应用——神经网络回归模型

人类大脑有数百亿个相互连接的神经元&#xff08;如下图(a)所示&#xff09;&#xff0c;这些神经元通过树突从其他神经元接收信息&#xff0c;在细胞体内综合、并变换信息&#xff0c;通过轴突上的突触向其他神经元传递信息。我们在博文《最优化方法Python计算&#xff1a;无约…

跳跃表原理及实现

一、跳表数据结构 跳表是有序表的一种&#xff0c;其底层是通过链表实现的。链表的特点是插入删除效率高&#xff0c;但是查找节点效率很低&#xff0c;最坏的时间复杂度是O(N)&#xff0c;那么跳表就是解决这一痛点而生的。 为了提高查询效率&#xff0c;我们可以给链表加上索…

打破成本壁垒,免费SSL证书为中小企业保驾护航

HTTPS&#xff0c;这个曾经看似遥远的技术词汇&#xff0c;如今已与我们每个人的网络生活息息相关。而实现HTTPS加密传输的关键一环——SSL证书&#xff0c;正以其独特的安全性能&#xff0c;为网站筑起一道坚实的防护墙。更令人惊喜的是&#xff0c;免费SSL证书服务已经到来&a…

数据结构与算法教程,数据结构C语言版教程!(第二部分、线性表详解:数据结构线性表10分钟入门)三

第二部分、线性表详解&#xff1a;数据结构线性表10分钟入门 线性表&#xff0c;数据结构中最简单的一种存储结构&#xff0c;专门用于存储逻辑关系为"一对一"的数据。 线性表&#xff0c;基于数据在实际物理空间中的存储状态&#xff0c;又可细分为顺序表&#xff…

自动化网络故障修复管理

什么是故障管理 故障管理是网络管理的组成部分&#xff0c;涉及检测、隔离和解决问题。如果实施得当&#xff0c;网络故障管理可以使连接、应用程序和服务保持在最佳水平&#xff0c;提供容错能力并最大限度地减少停机时间。专门为此目的设计的平台或工具称为故障管理系统。 …

JavaScript setTimeout和setInterval的用法与区别详解

目录 I. 总述 II. setTimeout()函数 III. setInterval()函数 IV. 新年倒计时案例 Javascript的setTimeOut和setInterval函数应用非常广泛&#xff0c;它们都用来处理延时和定时任务&#xff0c;下面这篇文章主要给大家介绍了关于JavaScript setTimeout和setInterval的用法与…

解决 Nginx 反向代理中的 DNS 解析问题:从挑战到突破20231228

引言 在使用 Nginx 作为反向代理服务器时&#xff0c;我们可能会遇到各种配置和网络问题。最近&#xff0c;我遇到了一个有趣的挑战&#xff1a;Nginx 在反向代理配置中无法解析特定的域名&#xff0c;导致 502 错误。这个问题的解决过程不仅揭示了 Nginx 的一个不太为人知的功…

分布式【雪花算法】

雪花算法 背景&#xff1a;在分布式系统中&#xff0c;需要使用全局唯一ID&#xff0c;期待ID能够按照时间有序生成。 **原理&#xff1a;**雪花算法是 64 位 的二进制&#xff0c;一共包含了四部分&#xff1a; 1位是符号位&#xff0c;也就是最高位&#xff0c;始终是0&am…

MySQL存储过程、创建、调用、查看、删除、存储过程与函数的额区别、缺陷等、存储过程写分页等

MySQL存储过程 1、存储过程的定义2、存储过程使用的意义3、存储过程的创建4、存储过程的调用5、存储过程的查看6、存储过程的删除7、存储及过程与函数的区别8、存储过程的缺陷9、存储过程写分页 1、存储过程的定义 存储过程&#xff1a;存储过程&#xff08;Stored Procedure&…