【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例 - 完整版

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、Flink的window介绍
    • 1、window介绍
    • 2、window API
      • 1)、WindowAssigner
      • 2)、Trigger
      • 3)、Evictor
    • 3、window的生命周期
  • 二、window的分类
    • 1、Tumbling Windows
    • 2、Sliding Windows
    • 3、Session Windows
    • 4、Global Windows
    • 5、按照时间time和数量count分类
    • 6、按照滑动间隔slide和窗口大小size分类
  • 三、窗口函数
    • 1、ReduceFunction
    • 2、AggregateFunction
    • 3、ProcessWindowFunction
    • 4、ProcessWindowFunction with Incremental Aggregation
      • 1)、Incremental Window Aggregation with ReduceFunction
      • 2)、Incremental Window Aggregation with AggregateFunction
  • 四、maven依赖
  • 五、示例:基于时间的滚动和滑动窗口
    • 1、滚动窗口实现统计地铁进站口人数
      • 1)、一般实现(Tuple2数据结构)及验证
      • 2)、面向对象实现(pojo数据结构)及验证
      • 3)、面向对象lambda实现(pojo的数据结构lambda)及验证
      • 4)、一般lambda实现(Tuple2数据结构)及验证
    • 2、滑动窗口实现统计地铁进站口人数
      • 1)、一般实现(Tuple2数据结构)及验证
      • 2)、面向对象实现(pojo数据结构)及验证
  • 六、示例:基于数量的滚动窗口与滑动窗口
    • 1、滚动窗口实现地铁进站口人数
      • 1)、实现
      • 2)、验证步骤
    • 2、滑动窗口实现地铁进站口人数
      • 1)、实现
      • 2)、验证步骤
  • 七、示例:会话窗口
    • 1、实现
    • 2、验证步骤


本文介绍了Flink window的分类以及三个示例分别实现滚动、滑动以及会话窗口,即基于时间和基于数量的滚动窗口与滑动窗口、会话窗口,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇文章:

【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(1)- 窗口介绍、分类、函数
【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(2) - 基于时间的滚动和滑动窗口
【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(3)- 基于数量的滚动和滑动、会话窗口
【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例 - 完整版

关于窗口的更多介绍参考文章:
6、Flink四大基石之Window详解与详细示例(一)
6、Flink四大基石之Window详解与详细示例(二)

一、Flink的window介绍

1、window介绍

Windows是处理无限流的核心。Windows将流划分为有限大小的“buckets”,我们可以在其上进行计算。

窗口Flink程序的一般结构如下所示。第一个片段指的是键控流,而第二个片段指非键控流。可以看出,唯一的区别是对键控流的keyBy(…)调用和对非键控流变为windowAll(…)的window(…)。这也将作为页面其余部分的路线图。

流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

在流处理应用中,数据是连续不断的,有时需要做一些聚合类的处理,例如在过去的1分钟内有多少用户点击了我们的网页。
在这种情况下,必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。

2、window API

  • Keyed Windows
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

  • Non-Keyed Windows
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

在上文中,方括号([…])中的命令是可选的。

使用keyby的流,应该使用window方法

未使用keyby的流,应该调用windowAll方法

1)、WindowAssigner

window/windowAll 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中,Flink提供了很多各种场景用的WindowAssigner:

如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。

2)、Trigger

trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可。

  • onElement()
  • onEventTime()
  • onProcessingTime()
    此抽象类的这三个方法会返回一个 TriggerResult, TriggerResult 有如下几种可能的选择:
  • CONTINUE 不做任何事情
  • FIRE 触发 window
  • PURGE 清空整个 window 的元素并销毁窗口
  • FIRE_AND_PURGE 触发窗口,然后销毁窗口

3)、Evictor

evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后。

  • evictBefore() 包含要在窗口函数之前应用的eviction逻辑,而evictAfter()包含将在窗口函数之后应用的逻辑。在应用窗口函数之前eviction的元素将不会被它处理。

  • CountEvictor:保留窗口中最多用户指定数量的元素,并丢弃窗口缓冲区开头的剩余元素。

  • DeltaEvictor:取一个DeltaFunction和一个阈值,计算窗口缓冲区中最后一个元素和其余元素之间的增量,并删除增量大于或等于阈值的元素。

  • TimeEvictor:以毫秒为单位的间隔作为参数,对于给定的窗口,它在其元素中找到最大时间戳max_ts,并删除所有时间戳小于max_ts-interval的元素。

Flink 提供了如下三种通用的 evictor:

  • CountEvictor 保留指定数量的元素
    TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值
  • DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删除一个元素。

3、window的生命周期

应该属于该窗口的第一个元素到达后,就会立即创建一个窗口,并且当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,该窗口将被完全删除(请参阅允许延迟)。

Flink只保证删除基于时间的窗口,而不保证删除其他类型的窗口,例如全局窗口(请参见窗口分配器)。

例如,使用基于事件时间的窗口策略,该策略每5分钟创建一个不重叠(或滚动)的窗口,并且允许的延迟为1分钟,当时间戳位于该时间间隔内的第一个元素到达时,Flink将为12:00到12:05之间的时间间隔创建一个新窗口,并且当水印超过12:06时间戳时,它将删除该窗口。

此外,每个窗口都将有一个触发器(请参阅触发器)和一个附加的函数(ProcessWindowFunction、ReduceFunction或AggregateFunction)(请参阅窗口函数)。

该函数将包含要应用于窗口内容的计算,而触发器指定了窗口被视为准备应用该函数的条件。

触发策略可能类似于“当窗口中的元素数超过4时”,或者“当水印经过窗口末尾时”。

触发器还可以决定在创建和删除窗口之间的任何时间清除窗口的内容。在这种情况下,清除仅指窗口中的元素,而不是窗口元数据。

这意味着仍然可以向该窗口添加新数据。

除此之外,您还可以指定一个Evictor,该Evictor能够在触发器触发后以及应用该函数之前和/或之后从窗口中删除元素。

简单的说,当有第一个属于该window的元素到达时就创建了一个window,当时间或事件触发该windowremoved的时候则结束。每个window都有一个Trigger和一个Function,function用于计算,trigger用于触发window条件。同时也可以使用Evictor在Trigger触发前后对window的元素进行处理。

二、window的分类

结合实际的业务应用选择适用的接口很重要,一般而言,TumblingTimeWindows、SlidingTimeWindows需要重点关注,而EventTimeSessionWindows和ProcessingTimeSessionWindows是Flink的session会话窗口,需要设置会话超时时间,如果超时则触发window计算。Flink 1.17版本已经实现的窗口见图。
在这里插入图片描述

具体分类以及使用场景如下。

1、Tumbling Windows

滚动窗口分配器(Tumbling windows assigner)将每个元素分配给指定窗口大小的窗口。滚动窗口具有固定大小,不会重叠。例如,如果指定大小为 5 分钟的滚动窗口,则将评估当前窗口,并且每 5 分钟启动一个新窗口,如下图所示。
在这里插入图片描述
示例代码

DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

2、Sliding Windows

滑动窗口分配器(sliding windows assigner)将元素分配给固定长度的窗口。与滚动窗口分配器类似,窗口的大小由窗口大小参数配置。窗口滑动参数控制滑动窗口的启动频率。因此,如果 sliding小于size,则滑动窗口可能会重叠。在这种情况下,元素被分配给多个窗口。例如,可以有大小为 10 分钟的窗口,该窗口滑动 5 分钟。这样,您每 5 分钟就会得到一个窗口,其中包含过去 10 分钟内到达的事件,如下图所示。
在这里插入图片描述
示例代码

ataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

3、Session Windows

会话窗口分配器(session windows assigner)按活动会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内未收到元素时(即,当出现不活动间隙时),会话窗口将关闭。会话窗口分配器可以配置静态会话间隙或会话间隙提取器功能,该函数定义不活动时间的时间。当此时间段到期时,当前会话将关闭,后续元素将分配给新的会话窗口。

会话窗口分配器按活动会话对元素进行分组。
与滚动窗口和滑动窗口不同,会话窗口不重叠,也没有固定的开始和结束时间。
相反,当会话窗口在一定时间段内没有接收到元素时,即,当出现不活动间隙时,会话窗口将关闭。
会话窗口分配器可以配置有静态会话间隙,也可以配置有会话间隙提取器功能,该功能定义不活动时段的长度。
当这段时间到期时,当前会话将关闭,随后的元素将分配给新的会话窗口。
在这里插入图片描述

示例代码

DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

由于会话窗口没有固定的开始和结束,因此它们的计算方式与翻转和滑动窗口不同。在内部,会话窗口运算符为每个到达记录创建一个新窗口,如果窗口彼此靠近而不是定义的间隙,则将它们合并在一起。为了可合并,会话窗口运算符需要一个合并触发器和一个合并窗口函数,例如 ReduceFunction、AggregateFunction 或 ProcessWindowFunction。

4、Global Windows

全局窗口分配器(global windows assigner)将具有相同键的所有元素分配给同一个全局窗口。只有自己自定义触发器的时候该窗口才能使用。否则,将不会执行任何计算,因为全局窗口没有一个自然的终点,我们可以在该端点处理聚合元素。
在这里插入图片描述

示例代码

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

5、按照时间time和数量count分类

  • time-window,时间窗口,根据时间划分窗口,如:每xx小时统计最近xx小时的数据
  • count-window,数量窗口,根据数量划分窗口,如:每xx条/行数据统计最近xx条/行数据

在这里插入图片描述

6、按照滑动间隔slide和窗口大小size分类

  • tumbling-window,滚动窗口,size=slide,如,每隔10s统计最近10s的数据
    在这里插入图片描述

  • sliding-window,滑动窗口,size>slide,如,每隔5s统计最近10s的数据
    在这里插入图片描述

当size<slide的时候,如每隔15s统计最近10s的数据,会有数据丢失,视具体情况而定是否使用

三、窗口函数

定义窗口分配器(window assigner)后,需要指定要在每个窗口上执行的计算。这是 window 函数的职责,一旦系统确定窗口已准备好处理,它就用于处理每个(可能是keyed)窗口的元素。

window 函数有 ReduceFunction、AggregateFunction 或 ProcessWindowFunction 。前两个可以更有效地执行,因为 Flink 可以在每个窗口到达时增量聚合元素。ProcessWindowFunction 获取窗口中包含的所有元素的可迭代对象,以及有关元素所属窗口的其他元信息。

使用 ProcessWindowFunction 的窗口化转换不能像其他情况那样有效地执行,因为 Flink 在调用函数之前必须在内部缓冲窗口的所有元素。通过将 ProcessWindowFunction 与 ReduceFunction 或 AggregateFunction 结合使用来获取窗口元素的增量聚合和 ProcessWindowFunction 接收的其他窗口元数据,可以缓解此问题。

1、ReduceFunction

ReduceFunction 指定如何将输入中的两个元素组合在一起以生成相同类型的输出元素。Flink 使用 ReduceFunction 以增量方式聚合窗口的元素。

  • 代码示例-计算2个字段的和
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>> {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });

2、AggregateFunction

聚合函数是 ReduceFunction 的通用版本,具有三种类型:输入类型 (IN)、累加器类型 (ACC) 和输出类型 (OUT)。输入类型是输入流中的元素类型,AggregateFunction 具有将一个输入元素添加到累加器的方法。该接口还具有用于创建初始累加器、将两个累加器合并为一个累加器以及从累加器中提取输出(OUT 类型)的方法。与 ReduceFunction 相同,Flink 将在窗口的输入元素到达时增量聚合它们。

  • 代码示例-计算两个字段的平均值
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());

3、ProcessWindowFunction

ProcessWindowFunction 获取一个包含窗口所有元素的 Iterable,以及一个可以访问时间和状态信息的 Context 对象,这使其能够提供比其他窗口函数更大的灵活性。这是以性能和资源消耗为代价的,因为元素不能增量聚合,而是需要在内部缓冲,直到窗口被认为准备好进行处理。

  • 代码示例-统计个数
DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}

将 ProcessWindowFunction 用于简单的聚合(如计数)效率非常低。一般是将 ReduceFunction 或 AggregateFunction 与 ProcessWindowFunction 结合使用,以获取增量聚合和 ProcessWindowFunction 的添加信息。

4、ProcessWindowFunction with Incremental Aggregation

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 结合使用,以便在元素到达窗口时以增量方式聚合元素。当窗口关闭时,将向进程窗口函数提供聚合结果。这允许它以增量方式计算窗口,同时可以访问 ProcessWindowFunction 的其他窗口元信息。

1)、Incremental Window Aggregation with ReduceFunction

下面的示例演示如何将增量 ReduceFunction 与 ProcessWindowFunction 结合使用,以返回窗口中的最小事件以及窗口的开始时间。

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}

2)、Incremental Window Aggregation with AggregateFunction

下面的示例演示如何将增量聚合函数与 ProcessWindowFunction 结合使用以计算平均值,并发出键和窗口以及平均值。

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}

四、maven依赖

<properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>


五、示例:基于时间的滚动和滑动窗口

1、滚动窗口实现统计地铁进站口人数

实现:每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数

1)、一般实现(Tuple2数据结构)及验证

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * @author alanchan 
 * 基于滚动窗口的入门示例 
 * 每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数 
 * size=slide
 *
 */
public class TumblingTimeWindowsDemo1 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);
		
		// transformation
		DataStream<Tuple2<String, Integer>> subwayExit = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {

			@Override
			public Tuple2<String, Integer> map(String line) throws Exception {
				String[] arr = line.split(",");

				return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
			}
		});
		
		//按照地铁口分组
//		KeyedStream<Tuple2<String, Integer>, String> keyedDS = subwayExit.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//			@Override
//			public String getKey(Tuple2<String, Integer> value) throws Exception {
//				return value.f0;
//			}
//		});
		//另外一种分组方式
		KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = subwayExit.keyBy(0);
				
		DataStream<Tuple2<String, Integer>> result1 = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
				//另外一种聚合方式实现
//				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
//
//					@Override
//					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
//
//						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
//					}
//
//				});
				.sum(1);

		// sink
		result1.print();

		// execute
		env.execute();
	}

}

验证步骤

  • 1、启动nc
nc -lk 9999
  • 2、启动应用程序

  • 3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
no1,1
no2,1
no1,2
no1,3
no2,6
  • 4、查看应用程序控制台输出
    在这里插入图片描述

2)、面向对象实现(pojo数据结构)及验证

  • bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SubWay {
	// 地铁站进站口
	private String No;
	// 某一时段人数
	private Integer userCount;
}
  • 实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 基于滚动窗口的入门示例 
 * 每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数 
 * size=slide
 *
 */
public class TumblingTimeWindowsDemo2 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String line) throws Exception {
				String[] arr = line.split(",");

				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});
		
		//userCount是Subway的属性名称
		DataStream<Subway> result = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("userCount");
		// sink
		result.print();

		// execute
		env.execute();
	}

}
  • 验证步骤
    1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
no,1
no2,1
no2,4
no1,2

4、查看应用程序控制台输出
在这里插入图片描述

3)、面向对象lambda实现(pojo的数据结构lambda)及验证

Subway的bean参考上文示例中的内容。

import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.Types;

/**
 * @author alanchan
 *
 */
public class TumblingTimeWindowsDemo3 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
//		DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {
//
//			@Override
//			public Subway map(String line) throws Exception {
//				String[] arr = line.split(",");
//				return new Subway(arr[0], Integer.parseInt(arr[1]));
//			}
//		});
		DataStream<Subway> subwayExit = lines.map(new Splitter());

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(Subway::getNo);
		// subwayExit.keyBy(subway->subway.getNo())

		DataStream<Subway> result = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("userCount");
		
		// sink
		result.print();

		// execute
		env.execute();
	}

	public static class Splitter implements MapFunction<String, Subway> {
		@Override
		public Subway map(String value) {
			String[] arr = value.split(",");
			return new Subway(arr[0], Integer.parseInt(arr[1]));
		}
	}
}

验证步骤

  • 1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
n1,2
n2,3
n1,4
n1,5

4、查看应用程序控制台输出
在这里插入图片描述

4)、一般lambda实现(Tuple2数据结构)及验证

  • 实现
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.Types;

/**
 * @author alanchan
 *
 */
public class TumblingTimeWindowsDemo4 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("192.168.10.42", 9999)
                .map(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1);
		// sink
		dataStream.print();

		// execute
		env.execute();
	}

	public static class Splitter implements MapFunction<String, Tuple2<String, Integer>> {
		@Override
		public Tuple2<String, Integer> map(String value) {
			String[] arr = value.split(",");
			return new Tuple2(arr[0], Integer.parseInt(arr[1]));
		}
	}

}

验证步骤

  • 1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
n3,1
n3,5
n4,6
n4,8
n3,3

4、查看应用程序控制台输出
在这里插入图片描述

2、滑动窗口实现统计地铁进站口人数

每分钟统计一次地铁进站每个入口人数,最近2分钟每个进站口的人数

lambda实现方式不再赘述

1)、一般实现(Tuple2数据结构)及验证

  • 实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 基于滑动窗口的入门示例 
 * 每分钟统计一次地铁进站每个入口人数,最近2分钟每个进站口的人数 
 * size>slide
 * 
 */
public class SlidingProcessingTimeWindowsDemo1 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Tuple2<String, Integer>> subwayExit = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {

			@Override
			public Tuple2<String, Integer> map(String line) throws Exception {
				String[] arr = line.split(",");
				return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Tuple2<String, Integer>, String> keyedDS = subwayExit.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
			@Override
			public String getKey(Tuple2<String, Integer> value) throws Exception {
				return value.f0;
			}
		});

		SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);

		// sink
		result.print();

		// execute
		env.execute();
	}

}
  • 验证步骤
    1、启动nc

nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
1,4
2,3
2,4,
1,2
1,3

4、查看应用程序控制台输出

通过验证发现输出数据与预期一致

7> (1,5)
4> (2,3)
7> (1,9)
4> (2,7)
7> (1,6)
4> (2,4)
7> (1,5)
7> (1,3)

2)、面向对象实现(pojo数据结构)及验证

  • 实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class SlidingProcessingTimeWindowsDemo2 {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String line) throws Exception {
				String[] arr = line.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		SingleOutputStreamOperator<Subway> result = keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();

	}

}
  • 验证步骤
    1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
2,2
3,3
2,4
2,5
3,5
4,5
3,5

4、查看应用程序控制台输出

通过查看输出结果与预期一致。

5> Subway(No=3, userCount=3)
4> Subway(No=2, userCount=2)
4> Subway(No=2, userCount=6)
5> Subway(No=3, userCount=3)
1> Subway(No=4, userCount=5)
5> Subway(No=3, userCount=5)
4> Subway(No=2, userCount=9)
5> Subway(No=3, userCount=10)
4> Subway(No=2, userCount=5)
1> Subway(No=4, userCount=5)
5> Subway(No=3, userCount=5)

六、示例:基于数量的滚动窗口与滑动窗口

1、滚动窗口实现地铁进站口人数

实现统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计

本示例仅以面向对象方式实现,一般实现在具体的开发中视情况而定。

1)、实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan 
 * 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计
 */
public class TumblingCountWindowDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

//		KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);

		KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		// 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计
		SingleOutputStreamOperator<Subway> result1 = keyedDS.countWindow(5).sum("userCount");
		
		// sink
		result1.print();
		
		// execute
		env.execute();

	}

}

2)、验证步骤

1、启动nc

nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
1,5
1,6
1,2
1,4
2,4
3,6
23,11
1,8

4、查看应用程序控制台输出

通过查看输出结果与预期一致。
在这里插入图片描述

2、滑动窗口实现地铁进站口人数

统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现3次进行统计

本示例仅以面向对象方式实现,一般实现在具体的开发中视情况而定。

1)、实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class SlidingCountWindowDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {

		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

//				KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);

		KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		// 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现3次进行统计
//		public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
//			return window(GlobalWindows.create())
//					.evictor(CountEvictor.of(size))
//					.trigger(CountTrigger.of(slide));
//		}
		SingleOutputStreamOperator<Subway> result1 = keyedDS.countWindow(5, 3).sum("userCount");

		// sink
		result1.print();

		// execute
		env.execute();
	}

}

2)、验证步骤

1、启动nc

nc -lk 9999

2、启动应用程序

3、nc控制台输入


[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
2,3
3,4
1,2

4、查看应用程序控制台输出

通过查看输出结果与预期一致。
在这里插入图片描述

七、示例:会话窗口

实现设置会话超时时间为10s,如果上一个窗口有数据,10s内没有数据则触发上个窗口的计算。

1、实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class TimeSessionWindowsDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		SingleOutputStreamOperator<Subway> carDS = lines.map(new MapFunction<String, Subway>() {
			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});
		
		//KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);
		KeyedStream<Subway, String> keyedDS = carDS.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		// 设置会话超时时间为10s,10s内没有数据则触发上个窗口的计算,如果上一个窗口有数据
		SingleOutputStreamOperator<Subway> result = keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum("userCount");
		
		// sink
		result.print();

		// execute
		env.execute();

	}

}

2、验证步骤

1、启动nc

nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
2,3
3,4
1,2

4、查看应用程序控制台输出

通过查看输出结果与预期一致。
在这里插入图片描述

以上,本文介绍了Flink window的分类以及三个示例分别实现滚动、滑动以及会话窗口,即基于时间和基于数量的滚动窗口与滑动窗口、会话窗口,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:

【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(1)- 窗口介绍、分类、函数
【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(2) - 基于时间的滚动和滑动窗口
【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(3)- 基于数量的滚动和滑动、会话窗口
【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例 - 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/260507.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

文件的基本管理

目录 一、Linux系统目录结构和相对/绝对路径 &#xff08;一&#xff09;系统目录结构 &#xff08;二&#xff09;相对路径和绝对路径 1.绝对路径 2.相对路径 &#xff08;三&#xff09;通配符的作用 二、创建、复制、删除文件&#xff0c;rm -rf /意外事故 &#xf…

2022年智能算法之凌日搜索算法(TS),原理公式详解,附matlab代码

凌日搜索算法&#xff08;Transit Search&#xff0c;TS&#xff09;是一种新型元启发式优化算法&#xff0c;该算法基于著名的系外行星探测方法&#xff0c;具有寻优能力强、进化能力强、搜索速度快的特点。该成果于2022年发表在知名SCI期刊Results in Control and Optimizati…

最强的AI视频去码图片修复模型:CodeFormer

1 CodeFormer介绍 1.1 CodeFormer解决的问题 CodeFormer是由南洋理工大学-商汤科技联合研究中心S-Lab在NeurIPS 2022上提出的一种基于VQGANTransformer的人脸复原模型。该方法基于预训练VQGAN离散码本空间&#xff0c;改变复原任务的固有范式&#xff0c;将人脸复原任务转成C…

Zookeeper-集群架构

Zookeeper集群架构 集群角色 Leader&#xff1a; 领导者 事务请求&#xff08;写操作&#xff09;的唯一调度者和处理者&#xff0c;保证集群事务处理的顺序性&#xff1b;集群内部各个服务器的调度者。对于create、setData、delete等有写操作的请求&#xff0c;则要统一转发…

Leetcode—46.全排列【中等】

2023每日刷题&#xff08;六十六&#xff09; Leetcode—46.全排列 算法思想 对于排列来说&#xff0c;我们需要考虑数字之间的相对顺序&#xff0c;不同的相对顺序会产生不同的排列方式。此外&#xff0c;序列中的每个数字一定存在于每个排列当中。因此&#xff0c;不能依次…

【学习笔记】Java函数式编程02——Stream流

文章目录 三、Stream流3.1 概述3.2 快速入门3.2.1 数据准备3.2.2 场景练习3.2.2.1 场景一、遍历所有作家并打印:star:使用stream()流的forEach方法 3.2.2.2 场景二、打印所以年龄小于18的作家名字&#xff0c;并且注意去重:star:distinct()方法:star:filter()方法 3.2.2.3 场景…

【数据结构和算法】定长子串中元音的最大数目

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、题目描述 二、题解 2.1 方法一&#xff1a;滑动窗口 2.2 方法二&#xff1a;滑动窗口优化版 三、代码 3.1 方法一&#xf…

Python-基于fastapi实现SSE流式返回(类似GPT)

最近在做大模型对话相关功能&#xff0c;需要将对话内容流式返回给前端页面&#xff08;类似GPT的效果&#xff09;。下面直接说下如何实现&#xff1a; 1.首先导入fastapi和sse流式返回所需要的包 from fastapi import APIRouter, Response, status from sse_starlette.sse …

【数据结构和算法】子数组最大平均数 I

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、题目描述 二、题解 2.1 滑动窗口含义 2.2 滑动窗口一般解法 2.3 方法一&#xff1a;滑动窗口 三、代码 3.1 方法一&#…

数据挖掘体系介绍

数据挖掘是什么&#xff1f; 简而言之&#xff0c;对数据进行挖掘&#xff0c;从中提取出有效的信息。一般我们会把这种信息通过概念、规则、规律、模式等有组织的方式展示出来&#xff0c;形成所谓的知识。特别是在这个大数据时代&#xff0c;当数据多到一定程度&#xff0c;…

Jenkins 执行远程脚本的插件—SSH2 Easy

SSH2 Easy 是什么&#xff1f; SSH2 Easy 是一个 Jenkins 插件&#xff0c;它用于在 Jenkins 构建过程中通过 SSH2 协议与远程服务器进行交互。通过该插件&#xff0c;用户可以在 Jenkins 的构建过程中执行远程命令、上传或下载文件、管理远程服务器等操作。 以下是 SSH2 Eas…

用户管理第2节课--idea 2023.2 后端--实现基本数据库操作(操作user表)

一、模型user对象>和数据库的字段关联 & 自动生成 【其中涉及删除表数据&#xff0c;一切又从零开始】 二、模型user对象>和数据库的字段关联 2.1在model文件夹下&#xff0c;新建 user对象 2.1.1 概念 大家可以想象我们现在的数据是存储在数据库里的&…

HOT 100 最难的题居然是游戏厂的最爱

写在前面 翻看 网易 历年笔面题单的时候&#xff0c;发现一道有意思的题目。 该题评论区&#xff0c;网易 的踪影很少&#xff0c;反而被那些在 4399 笔试中遇到的同学所攻陷&#xff1a; 好嘛&#xff0c;所以这道题还是「游戏厂」的最爱&#xff1f;&#xff01;&#x1f923…

Ubuntu 常用命令之 fdisk 命令用法介绍

fdisk 是一个用于处理磁盘分区的命令行工具,它在 Linux 系统中广泛使用。fdisk 命令可以创建、删除、更改、复制和显示硬盘分区,以及更改硬盘的分区 ID。 fdisk 命令的常用参数如下 -l:列出所有分区表-b:设置扇区大小,如果不设置,默认为 512 字节-u:改变显示/输入单位-…

亚马逊鲲鹏系统引爆广告点击率提升秘籍

在竞争激烈的电商市场&#xff0c;提高广告点击率成为各大卖家争相追求的目标。而如今&#xff0c;亚马逊鲲鹏系统的强大功能再次为卖家们打开了广告优化的新大门。其中&#xff0c;搜索广告功能更是成为提高关键词排名的利器。本文将详细介绍如何通过亚马逊鲲鹏系统实现点击广…

全球知名的五款JavaScript混淆加密工具详解

​ 现在市场上有很多好用的混淆加密工具&#xff0c;其中一些比较流行且受欢迎的工具包括&#xff1a; 1、UglifyJS&#xff08;罗马尼亚&#xff09;&#xff1a;UglifyJS是一个非常流行的 JavaScript工具库&#xff0c;它可以压缩、混淆、美化和格式化 JavaScript 代码。使用…

A01、关于jvm执行子系统

1、Class 类文件结构 1.1、Java跨平台的基础 各种不同平台的虚拟机与所有平台都统一使用的程序存储格式——字节码&#xff08;ByteCode&#xff09;是构成平台无关性的基石&#xff0c;也是语言无关性的基础。Java虚拟机不和包括Java在内的任何语言绑定&#xff0c;它只与 “…

新三板炒股开户需要满足哪些条件?交易规则有哪些?

新三板是全国中小企业股份转让系统&#xff0c;属于场外市场&#xff0c;不能满足在主板上市的中小企业就可以申请在新三板挂牌交易。 一、新三板开通条件 新三板分为2个层级&#xff1a; 创新层&#xff1a;开通前10个交易日日均资产100万及以上&#xff0c;两年的股票交易经…

Jenkins 构建触发器指南

目录 触发远程构建 (例如&#xff0c;使用脚本) 描述 配置步骤 安全令牌 在其他项目构建完成后触发构建 描述 配置步骤 定时触发构建 描述 配置步骤 GitHub钩子触发GITScm轮询 描述 配置步骤 Poll SCM - 轮询版本控制系统 描述 触发远程构建 (例如&#xff0c;使…

基于SSM的双减后初小教育课外学习生活活动平台的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…