【Flink系列四】Window及Watermark

3.1、window

在 Flink 中 Window 可以将无限流切分成有限流,是处理有限流的核心组件,现在 Flink 中 Window 可以是时间驱动的(Time Window),也可以是数据驱动的(Count Window)。

Flink中的窗口可以分成:滚动窗口(Tumbling Window,无重叠),滑动窗口(Sliding Window,可能有重叠),会话窗口(Session Window,活动间隙),全局窗口(Gobal Window)

3.1.1、Tumbling Windows 滚动窗口

滚动窗口的assigner分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。

// 滚动event-time窗口
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
 
// 滚动processing-time窗口
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.second(5)))
    .<windowed transformation>(<window function>);
 
// 长度为一天的滚动event-time窗口, 偏移量为-8小时
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时的滚动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999 等。 如果你想改变对齐方式,你可以设置一个 offset。如果设置了 15 分钟的 offset, 你会得到 1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999 等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)。

3.1.2、Sliding Windows滑动窗口

滑动窗口的assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(滑动步长window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

// 滑动 event-time 窗口
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);
 
// 滑动 processing-time 窗口
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);
 
// 滑动 processing-time 窗口,偏移量为 -8 小时
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

3.1.3、Session Windows 会话窗口

会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

// 设置了固定间隔的event-time会话窗口
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
 
// 设置了动态间隔的event-time会话窗口
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element)-> {
        // 决定并返回会话间隔
    }))
    .<windowed transformation>(<window function>);
 
// 设置了固定间隔的 processing-time session 窗口
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// 设置了动态间隔的 processing-time 会话窗口
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // 决定并返回会话间隔
    }))

3.1.4、Global Windows 全局窗口

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);
3.1.5、Triggers窗口触发

Trigger决定了一个窗口(由window assigner定义)何时可以被window function处理。一般来说,watermark的时间戳>=window endTime并且在窗口内有数据,就会触发窗口的计算。每个WindowAssigner都有一个默认的Trigger。如果默认trigger无法满足需求,可以在trigger(...)调用中指定自定义的trigger。

  • onElement() 每次往 window 增加一个元素的时候都会触发
  • onEventTime() 当 event-time timer 被触发的时候会调用
  • onProcessingTime() 当 processing-time timer 被触发的时候会调用
  • onMerge() 对两个 trigger 的 state 进行 merge 操作
  • clear() window 销毁的时候被调用

上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:

  • CONTINUE 不做任何事情
  • FIRE 触发 window
  • PURGE 清空整个 window 的元素并销毁窗口
  • FIRE_AND_PURGE 触发窗口,然后销毁窗口

3.2、time和watermark

3.2.1、time

在 Flink 中 Time 可以分为三种Event-Time,Processing-Time 以及 Ingestion-Time,三者的关系我们可以从下图中得知:

3.2.2、watermark

Flink提出了watermark,专门处理EventTime窗口计算,其本质其实就是一个时间戳。因为对于迟到数据late element,不可能一直无限期等待,必须有一个机制来保证一个特定的时间后,必须取触发window去进行计算,这种机制就是watermark

watermark本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。 Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。 Watermark的产生和Apache Flink内部处理逻辑如下图所示: 

目前Apache Flink 有两种生产Watermark的方式,如下:

  • Punctuated - 数据流中每一个递增的EventTime都会产生一个Watermark。 在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
  • Periodic - 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

参阅:Apache Flink 漫谈系列(03) - Watermark-阿里云开发者社区

我们可以考虑一个这样的例子:某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。A 用户在 11:02 对 App 进行操作,B 用户在 11:03 操作了 App,但是 A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到 B 用户 11:03 的消息,然后再接受到 A 用户 11:02 的消息,消息乱序了。那我们怎么保证基于 event-time 的窗口在销毁的时候,已经处理完了所有的数据呢?这就是 watermark 的功能所在。watermark 会携带一个单调递增的时间戳 t,watermark(t) 表示所有时间戳不大于 t 的数据都已经到来了,未来小于等于t的数据不会再来,因此可以放心地触发和销毁窗口了。下图中给了一个乱序数据流中的 watermark 例子

3.2.3、迟到的数据

上面的 watermark 让我们能够应对乱序的数据,但是真实世界中我们没法得到一个完美的 watermark 数值 — 要么没法获取到,要么耗费太大,因此实际工作中我们会使用近似 watermark — 生成 watermark(t) 之后,还有较小的概率接受到时间戳 t 之前的数据,在 Flink 中将这些数据定义为 “late elements”, 同样我们可以在 window 中指定是允许延迟的最大时间(默认为 0),可以使用下面的代码进行设置

设置allowedLateness之后,迟来的数据同样可以触发窗口,进行输出,利用 Flink 的 side output 机制,我们可以获取到这些迟到的数据,使用方式如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/227369.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

翻译: 生成式人工智能的经济潜力 第2部分行业影响 The economic potential of generative AI

麦肯锡报告 翻译: 生成式人工智能的经济潜力 第一部分商业价值 The economic potential of generative AI 1. 行业影响 在我们分析的63个使用案例中&#xff0c;生成式人工智能有潜力在各行各业创造2.6万亿至4.4万亿美元的价值。其确切影响将取决于各种因素&#xff0c;比如…

Qt之QGraphicsView —— 笔记1:绘制简单图元(附完整源码)

效果 相关类介绍 QGraphicsView类提供了一个小部件,用于显示QGraphicsScene的内容。QGraphicsView在可滚动视口中可视化。QGraphicsView将滚动其视口,以确保该点在视图中居中。 QGraphicsScene类 提供了一个用于管理大量二维图形项的场景。请注意,QGraphicsScene没有自己的视…

Open-Falcon(一)环境配置

目录 ip划分一、主机准备二、环境配置2.1修改主机名、修改hosts文件2.2配置阿里源&#xff0c;安装工具2.3关闭防火墙、selinux2.4配置时间2.5安装go2.6安装redis2.7 安装mysql初始化MySQL表结构 ip划分 主机名IP服务open-faclon-server192.168.150.200open-faclon-serverngin…

nodejs微信小程序+python+PHP的智能停车系统-计算机毕业设计推荐django

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

select*from 查询语句返回null

1. 写了一个很简单查询全部数据的demo接口,结果一直返回null. 数据库明明有数据而且在console控制台也可以查到,但是用接口访问一直返回null 2.后发现原来是数据库中字段名和对象的属性名称对不上&#xff0c;所以可以在yml文件中加上 mybatis:configuration:map-underscore-…

如何搭建一套完整的智能安防视频监控平台?关于设备与软件选型的几点建议

安防视频监控系统主要由前端摄像机设备、视频显示设备、视频存储设备、安防应用软件/平台以及其它传输、辅助类设备组成。一般来说&#xff0c;安防监控系统具有可扩展和开放性&#xff0c;以方便未来的扩展和与其他系统的集成。今天我们就来介绍一下&#xff0c;搭建一套完整的…

Kubersphere应用【四】创建SpringBoot项目

一、生成Springboot项目 【地址】https://start.spring.io/ Spring在线生成项目工具&#xff0c;可以快速生成Spring Boot项目。选择要的依赖项&#xff0c;填写基本信息&#xff0c;点击【GENERATE】就可以生成一个可运行的Spring Boot项目。 二、IDEA导入项目 生成项目后,进…

LAMP和分离式LNMP部署

目录 一.什么是LAMP&#xff1f; 二.安装LAMP 先安装apache&#xff0c;httpd网页服务&#xff1a; 接着安装mysql&#xff1a; 安装php&#xff1a; 创建论坛&#xff1a; 三.安装分布式LNMP&#xff1a; 先安装nginx&#xff1a; 到另一台主机安装php&#xff1a; …

dtm分布式事务框架之SAGA 实战

一.dtm分布式事务框架之SAGA 1.1DTM介绍 DTM是一款开源的分布式事务管理器&#xff0c;解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。 通俗一点说&#xff0c;DTM提供跨服务事务能力&#xff0c;一组服务要么全部成功&#xff0c;要么全部回滚&#xff0c;避免只更…

javaTCP协议实现一对一聊天

我们首先要完成服务端&#xff0c;不然出错&#xff0c;运行也要先运行服务端&#xff0c;如果不先连接服务端&#xff0c;就不监听&#xff0c;那客户端不知道连接谁 服务端 import java.awt.BorderLayout; import java.awt.event.ActionEvent; import java.awt.event.Actio…

ncnn模型部署——使用VS2019把项目打包成DLL文件

一、项目打包成DLL文件 1.创建动态链接库DLL项目 创建完成&#xff0c;项目中包含源文件dllmain.cpp, pch.cpp&#xff0c;头文件framework.h, pch.h 2.编写和配置DLL项目 &#xff08;1&#xff09;配置pch.h文件&#xff0c;在头文件pch.h中定义宏&#xff0c;宏的作用的是…

gma 空间绘图实战(1):绘制多个子图,连接并展示局部放大区域

安装 gma&#xff1a;pip install gma 本文基于&#xff1a;gma 2.0.3&#xff0c;Python 3.10 本文用到的矢量数据为&#xff1a;CTAmap 1.12。来源于 https://www.shengshixian.com/ 。&#xff08;感谢锐多宝&#xff09; 绘图目标 参考代码 import matplotlib.pyplot as p…

电子秤ADC芯片CS1237技术资料问题合集

问题11&#xff1a;实际应用中&#xff0c;多个称重传感器应该怎么与ADC连接&#xff1f; 解答&#xff1a;如果传感器是测量同一物体&#xff08;例如&#xff1a;厨房垃圾处理器&#xff09;&#xff0c;一般建议使用并联的方式。则相同类型的信号线连接在一起。对于传感器的…

MySQL - 并发控制与事务的隔离级别

目录 第1关&#xff1a;并发控制与事务的隔离级别 第2关&#xff1a;读脏 第3关&#xff1a;不可重复读 第4关&#xff1a;幻读 第5关&#xff1a;主动加锁保证可重复读 第6关&#xff1a;可串行化 第1关&#xff1a;并发控制与事务的隔离级别 任务描述 本关任务&#…

Java第二十一章 :网络通信

网络程序设计基础 网络程序设计编写的是与其他计算机进行通信的程序。Java 已经将网络程序所需要的元素封装成不同的类&#xff0c;用户只要创建这些类的对象&#xff0c;使用相应的方法&#xff0c;即使不具备有关的网络支持&#xff0c;也可以编写出高质量的网络通信程…

十六、FreeRTOS之FreeRTOS队列集

本节需要掌握以下内容&#xff1a; 1&#xff0c;队列集简介&#xff08;了解&#xff09; 2&#xff0c;队列集相关API函数介绍&#xff08;熟悉&#xff09; 3&#xff0c;队列集操作实验&#xff08;掌握&#xff09; 一、队列集简介&#xff08;了解&#xff09; 一个…

硬件基础:差模和共模

一直以来&#xff0c;都难以理解差模和共模这两个概念&#xff0c;什么差分信号、差模信号、共模信号&#xff0c;差模干扰、共模干扰……虽然看了一些资料&#xff0c;但貌似说法还挺多的&#xff0c;理解起来仍然是一头雾水。所以&#xff0c;专门用一篇文章来好好研究下这个…

Anisble中剧本的应用

1.什么是playbook及playbook的组成 1. Playbook 的功能 playbook 是由一个或多个 play 组成的列表 Playboot 文件使用 YAML 来写的 2. YAML 简介&#xff1a; 是一种表达资料序列的格式 &#xff0c; 类似 XML Yet Another Markup Language 3. 特点 可读性好 和脚本语言…

Java+Swing: 登录和重置按钮的点击事件 整理6

1. 在Login类中给按钮添加事件 // 按钮添加鼠标点击事件loginButton.addActionListener();resetButton.addActionListener(); 2. 创建一个事件处理的类&#xff0c; 该类实现了ActionListener package com.handler;/*** Author&#xff1a;xiexu* Date&#xff1a;2023/12/7 13…

Python面向对象③:封装【侯小啾Python基础领航计划 系列(二十一)】

Python面向对象③:封装【侯小啾Python基础领航计划 系列(二十一)】 大家好,我是博主侯小啾, 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔…