【Flink】窗口实战:TUMBLE、HOP、SESSION

窗口实战:TUMBLE、HOP、SESSION

  • 1.TUMBLE WINDOW
    • 1.1 语法
    • 1.2 标识函数
    • 1.3 模拟用例
  • 2.HOP WINDOW
    • 2.1 语法
    • 2.2 标识函数
    • 2.3 模拟用例
  • 3.SESSION WINDOW
  • 3.1 语法
    • 3.2 标识函数
    • 3.3 模拟用例
  • 4.更多说明

在流式计算中,流通常是无穷无尽的,我们无法知道什么时候数据源会继续 / 停止发送数据,所以在流上处理聚合事件(countsum 等)的处理方式与批处理中的处理方式会有所差异。在流上一般用窗口(Window)来限定聚合的范围,例如 “过去 2 分钟网站点击量的计数”、“在最近 100 个人中点赞这个视频的总人数”。窗口的概念相当于帮我们收集了一张有限数据的动态表,我们可以对表中的数据进行聚合计算。

窗口函数是一种特殊的函数,它并不在 SELECT 的投影列表中使用,而是在 GROUP BY 子句中使用。

1.TUMBLE WINDOW

TUMBLE WINDOW(滚动窗口)将每个进入的数据分配到一个指定窗口大小的窗口中。滚动窗口可以自定义固定的大小,并且不会出现重叠。我们可以对窗口内的数据进行计算。

1.1 语法

TUMBLE(time_attr, interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR
  • 如果在 Event Time 时间模式下(使用 WATERMARK FOR 语句定义了时间戳字段),那么 TUMBLE、HOP、SESSION 窗口函数的第一个参数必须为该字段。
  • 如果在 Processing Time 时间模式下,则 TUMBLE、HOP、SESSION 窗口函数的第一个参数必须为 proctime() 函数生成的计算列,下文用 PROCTIME 举例,请在实际作业中替换为实际的列名。

1.2 标识函数

函数名
功能描述
TUMBLE_START(time-attr, size-interval)返回窗口的起始时间(包含边界)。例如 [00:10, 00:15) 窗口,返回 00:10
TUMBLE_END(time-attr, size-interval)返回窗口的结束时间(包含边界)。例如 [00:00, 00:15] 窗口,返回 00:15
TUMBLE_ROWTIME(time-attr, size-interval)返回窗口的结束时间(不包含边界)。例如[00:00, 00:15] 窗口,返回 00:14:59.999。返回值是一个 rowtime attribute,即可以基于该字段做时间属性的操作。
TUMBLE_PROCTIME(time-attr, size-interval)返回窗口的结束时间(不包含边界)。例如 [00:00, 00:15] 窗口,返回00:14:59.999。返回值是一个 proctime attribute,即可以基于该字段做时间属性的操作。

1.3 模拟用例

下文以 TUMBLE WINDOW 为例,帮助您更容易地理解 TUMBLE WINDOW。使用 Event Time 模拟统计 每小时各用户收入金额

示例数据:

username(VARCHAR)income(BIGINT)times(TIMESTAMP)
Tom202021-11-11 10:30:00.0
Jack102021-11-11 10:35:00.0
Tom102021-11-11 10:35:00.0
Tom102021-11-11 10:40:00.0
Tom152021-11-11 11:30:00.0
Jack102021-11-11 11:30:00.0
Jack152021-11-11 11:40:00.0
CREATE TABLE user_income (
    username VARCHAR,
    income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times - INTERVAL '3' SECOND
) WITH (
    'connector' = 'filesystem',
    'path' = 'input/sales01.csv',
    'format' =  'csv'
);

CREATE TABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO output
SELECT
    TUMBLE_START(times,INTERVAL '1' HOUR),
    TUMBLE_END(times,INTERVAL '1' HOUR),
    username,
    SUM(income)
FROM user_income
GROUP BY TUMBLE(times,INTERVAL '1' HOUR),username;

在这里插入图片描述

2.HOP WINDOW

HOP WINDOW(滑动窗口)将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。

HOP WINDOW 保持窗口大小(Size)不变,每次滑动指定的时间周期(Slide),因而允许窗口之间的相互重叠。

Slide 的大小决定了 Flink 创建新窗口的频率。

  • 当 Slide 小于 Size 时,相邻窗口会重叠,一个时间会被分配到多个窗口。
  • 当 Slide 大于 Size 时,可能会导致有些事件被丢弃。
  • 当 Slide 等于 Size 时,等于是 TUMBLE WINDOW。

2.1 语法

HOP(time_attr, sliding_interval, window_size_interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • sliding_interval:用来设置 滑动时间周期大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR
  • window_size_interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR

2.2 标识函数

函数名
功能描述
HOP_START(time-attr, slide-interval,size-interval)返回该窗口的起始时间
HOP_END(time-attr, slide-interval,size-interval)返回该窗口的结束时间

2.3 模拟用例

下文以 HOP WINDOW 为例,帮助您更容易地理解 HOP WINDOW。使用 Event Time 模拟统计每小时各用户收入金额,1 小时的窗口,30 分钟滑动一次

示例数据:

username(VARCHAR)income(BIGINT)times(TIMESTAMP)
Tom202021-11-11 10:30:00.0
Jack102021-11-11 10:35:00.0
Tom102021-11-11 10:35:00.0
Tom102021-11-11 10:40:00.0
Tom152021-11-11 11:35:00.0
Jack102021-11-11 11:30:00.0
Jack152021-11-11 11:40:00.0
CREATE TABLE user_income (
    username VARCHAR,
    Income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times - INTERVAL '3' MINUTE
) WITH (
    'connector' = 'filesystem',
    'path' = 'input/sales02.csv',
    'format' =  'csv'
);

CREATE TABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO output
SELECT
    HOP_START(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),
    HOP_END(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),
    username,
    SUM(income)
FROM user_income
GROUP BY HOP(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),username;

在这里插入图片描述

3.SESSION WINDOW

SESSION WINDOW(会话窗口)通过 Session 活动对元素进行分组,Session 窗口与滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 Session 窗口通过一个 sSession 间隔来配置。这个 Session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 Session 将关闭并且后续的元素将被分配到新的 Session 窗口中。

Session Window 并非以长度来划分窗口,而是以 非活跃时间 来划分。例如超过 30 分钟不活跃(没有新数据),则之前的窗口结束,下一个来到的数据将会形成一个新窗口。

3.1 语法

SESSION(time_attr, interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR

3.2 标识函数

函数名
功能描述
SESSION_START(time-attr, size-interval)返回该窗口的起始时间
SESSION_END(time-attr, size-interval)返回该窗口的结束时间

3.3 模拟用例

下文以 SESSION WINDOW 为例,帮助您更容易地理解 SESSION WINDOW。使用 Event Time 模拟统计每小时各用户收入金额,会话超时时长为 30 分钟

样例数据:

username(VARCHAR)income(BIGINT)times(TIMESTAMP)
Tom202021-11-11 10:30:00.0
Jack102021-11-11 10:35:00.0
Tom102021-11-11 10:35:00.0
Tom102021-11-11 10:40:00.0
Tom152021-11-11 11:50:00.0
Jack102021-11-11 11:40:00.0
Jack152021-11-11 11:45:00.0
CREATE TABLE user_income (
    username VARCHAR,
    income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times - INTERVAL '3' MINUTE
) WITH (
    'connector' = 'filesystem',
    'path' = 'input/sales03.csv',
    'format' =  'csv'
);

CREATE TABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO output
SELECT
    SESSION_START(times,INTERVAL '30' MINUTE),
    SESSION_END(times,INTERVAL '30' MINUTE),
    username,
    SUM(income)
FROM user_income
GROUP BY SESSION(times,INTERVAL '30' MINUTE),username;

在这里插入图片描述

4.更多说明

以上三种窗口都有对应的辅助函数。以 TUMBLE 窗口为例(HOP、SESSION 也一样,只是前缀不同),辅助函数如下:

  • TUMBLE_ROWTIME:表示 TUMBLE 窗口的末端界限(包含,可用作 JOIN 或 GROUP 以及 OVER 条件,Event Time 时间模式下使用)。示例如下:
SELECT user,
	   TUMBLE_START(rowtime, INTERVAL '12' HOUR) AS sStart,
       TUMBLE_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd,
       SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '12' HOUR), user
  • TUMBLE_PROCTIME:表示 TUMBLE 窗口的末端界限(包含,可用作 JOIN 或 GROUP 以及 OVER 条件,Processing Time 时间模式下使用)。示例如下:
SELECT user,
	   TUMBLE_START(PROCTIME, INTERVAL '12' HOUR) AS sStart,
	   TUMBLE_PROCTIME(PROCTIME, INTERVAL '12' HOUR) AS snd,
	   SUM(amount)
FROM Orders
GROUP BY TUMBLE(PROCTIME, INTERVAL '12' HOUR), user

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/482616.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【PyQt】17.1-日历控件 不同风格的日期和时间、以及高级操作

日历控件puls版本 前言一、日历控件中不同风格的日期和时间1.1 代码1.2 注意事项格式设置m的大小写问题QTime和QDateTime的区别 1.3 运行结果 二、高级操作2.1 成倍调整2.2 下拉出日历2.3 事件函数2.4 获取设置的日期和时间 完整代码 前言 1、不同风格的日期和时间展示 2、高级…

Codeforces Round 930 (Div. 2)(A,B,C,D)

比赛链接 C是个交互,D是个前缀和加二分。D还是很难写的。 A. Shuffle Party 题意: 您将得到一个数组 a 1 , a 2 , … , a n a_1, a_2, \ldots, a_n a1​,a2​,…,an​ 。最初,每个 1 ≤ i ≤ n 1 \le i \le n 1≤i≤n 对应 a i i a_ii…

深度学习十大算法之长短时记忆网络(LSTM)

一、长短时记忆网络(LSTM)的基本概念 长短时记忆网络(LSTM)是一种特殊类型的循环神经网络(RNN),主要用于处理和预测序列数据的任务。LSTM由Hochreiter和Schmidhuber于1997年提出,其…

41-Vue-webpack基础

webpack基础 前言什么是webpackwebpack的基本使用指定webpack的entry和output 前言 本篇开始来学习下webpack的使用 什么是webpack webpack: 是前端项目工程化的具体解决方案。 主要功能:它提供了友好的前端模块化开发支持,以及代码压缩混淆、处理浏览…

海康威视-AIOT的业务转型

海康威视的转型和定位为智能物联网(AIoT)解决方案和大数据服务的提供商。 公司不仅仅聚焦于其核心的视频监控业务,而且正在积极拓展到新的技术领域和市场。通过专注于物联感知、人工智能、大数据等技术的创新,对未来技术发展方向的…

golang import引用项目下其他文件内函数

初始化项目 go mod init [module名字] go mod init project 项目结构 go mod 文件 代码 需要暴露给外界使用的变量/函数名必须大写 在main.go中引入,当前项目模块名/要引用的包名 package mainimport (// 这里的路径开头为项目go.mod中的module"project/…

微信小程序----猜数字游戏.

目标:简单猜字游戏,系统随机生成一个数,玩家可以猜8次,8次未猜对,游戏结束;未到8次猜对,游戏结束。 思路和要求: 创建四个页面,“首页”,“开始游戏”&#…

hadoop基本概念

一、概念 Hadoop 是一个开源的分布式计算和存储框架。 Hadoop 使用 Java 开发,所以可以在多种不同硬件平台的计算机上部署和使用。其核心部件包括分布式文件系统 (Hadoop DFS,HDFS) 和 MapReduce。 二、HDFS 命名节点 (NameNode) 命名节点 (NameNod…

STM32 | Systick定时器(第四天)

STM32 第四天 一、Systick定时器 1、定时器概念 定时器:是芯片内部用于计数从而得到时长的一种外设。 定时器定时长短与什么有关???(定时器定时长短与频率及计数大小有关) 定时器频率换算单位:1GHZ=1000MHZ=1000 000KHZ = 1000 000 000HZ 定时器定时时间:计数个数…

Django缓存(二)

一、视图缓存 Django的缓存可以设置缓存指定的视图,具体方式使用django.views.decorators.cache.cache_page, 方法有2种方式: 装饰器:以方法以装饰器的方式使用 from django.views.decorators.cache import cache_page@cache_page(60 * 15,cache="default") def…

CRC计算流程详解和FPGA实现

一、概念 CRC校验,中文翻译过来是:循环冗余校验,英文全称是:Cyclic Redundancy Check。是一种通过对数据产生固定位数的校验码,以检验数据是否存在错误的技术。 其主要特点是检错能力强、开销小,易于电路实…

【prompt六】MaPLe: Multi-modal Prompt Learning

1.motivation 最近的CLIP适应方法学习提示作为文本输入,以微调下游任务的CLIP。使用提示来适应CLIP(语言或视觉)的单个分支中的表示是次优的,因为它不允许在下游任务上动态调整两个表示空间的灵活性。在这项工作中,我们提出了针对视觉和语言分支的多模态提示学习(MaPLe),以…

“架构(Architecture)” 一词的定义演变历史(依据国际标准)

深入理解“架构”的客观含义,不仅能使IT行业的系统架构设计师提升思想境界,对每一个积极的社会行动者而言,也具有长远的现实意义,因为,“架构”一词,不只限于IT系统,而是指各类系统(包括社会系统…

python(django)之流程接口管理后台开发

1、在models.py中加入流程接口表和单一接口表 代码如下: from django.db import models from product.models import Product# Create your models here.class Apitest(models.Model):apitestname models.CharField(流程接口名称, max_length64)apitester model…

C#,图论与图算法,计算图(Graph)的岛(Island)数量的算法与源程序

1 孤岛数 给定一个布尔矩阵,求孤岛数。一组相连的1形成一个岛。例如,下面的矩阵包含5个岛: 在讨论问题之前,让我们先了解什么是连接组件。无向图的连通分量是一个子图,其中每两个顶点通过一条路径相互连接,并且不与子图外的其他顶点连接。 所有顶点相互连接的图只有一个…

java数据结构与算法基础-----字符串------正则表达式---持续补充中

java数据结构与算法刷题目录(剑指Offer、LeetCode、ACM)-----主目录-----持续更新(进不去说明我没写完):https://blog.csdn.net/grd_java/article/details/123063846 目前校招的面试,经常会遇到各种各样的有关字符串处理的算法。掌…

【docker系列】深入理解 Docker 容器管理与清理

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

JVM——运行时数据区

前言 由于JAVA程序是交由JVM执行的,所以我们所说的JAVA内存区域划分也是指的JVM内存区域划分,JAVA程序具体执行的过程如下图所示。首先Java源代码文件会被Java编译器编译为字节码文件,然后由JVM中的类加载器加载各个类的字节码文件&#xff0…

解决win10安装软件提示Microsoft Store界面

解决方法 1. 打开设置,找到应用 2. 应用与功能,选择任何来源

Day18:LeedCode 513.找树左下角的值 112. 路径总和 106.从中序与后序遍历序列构造二叉树

513. 找树左下角的值 给定一个二叉树的 根节点 root,请找出该二叉树的 最底层 最左边 节点的值。 假设二叉树中至少有一个节点。 示例 1: 输入: root [2,1,3] 输出: 1 思路:出该二叉树的 最底层 最左边 节点的值找出深度最大的第一个结点(左结点先遍历) 方法一…