【大数据】Flink SQL 语法篇(四):Group 聚合、Over 聚合

Flink SQL 语法篇(四):Group 聚合、Over 聚合

  • 1.Group 聚合
    • 1.1 基础概念
    • 1.2 窗口聚合和 Group 聚合
    • 1.3 SQL 语义
    • 1.4 Group 聚合支持 Grouping sets、Rollup、Cube
  • 2.Over 聚合
    • 2.1 时间区间聚合
    • 2.2 行数聚合

1.Group 聚合

1.1 基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

1.2 窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (
    -- 维度数据
    dim STRING,
    -- 用户 id
    user_id BIGINT,
    -- 用户
    price BIGINT,
    -- 事件时间戳
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark 设置
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
)

-- 数据汇表
CREATE TABLE sink_table (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
  'connector' = 'print'
)

-- 数据处理逻辑
insert into sink_table
select dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- 计算 uv 数
    count(distinct user_id) as uv,
    UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group by
    dim,
    -- 按照 Flink SQL tumble 窗口写法划分窗口
    tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (
    -- 维度数据
    dim STRING,
    -- 用户 id
    user_id BIGINT,
    -- 用户
    price BIGINT,
    -- 事件时间戳
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark 设置
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
);

-- 数据汇表
CREATE TABLE sink_table (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
  'connector' = 'print'
);

-- 数据处理逻辑
insert into sink_table
select dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- 计算 uv 数
    count(distinct user_id) as uv,
    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
    dim,
    -- 将秒级别时间戳 / 60 转化为 1min
    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

1.3 SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

1.4 Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT 
    supplier_id
    , rating
    , product_id
    , COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
    ( supplier_id, product_id, rating ),
    ( supplier_id, product_id         ),
    ( supplier_id,             rating ),
    ( supplier_id                     ),
    (              product_id, rating ),
    (              product_id         ),
    (                          rating ),
    (                                 )
)

2.Over 聚合

Over 聚合定义(支持 Batch / Streaming):可以理解为是一种特殊的滑动窗口聚合函数。

那这里我们拿 Over 聚合窗口聚合 做一个对比,其之间的最大不同之处在于:

  • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到。
  • Over 聚合:能够保留原始字段。

注意:其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?

  • 应用场景:计算最近一段滑动窗口的聚合结果数据。
  • 实际案例:查询每个产品最近一小时订单的金额总和。
SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM Orders
  • Over 聚合的语法总结如下:
SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...
  • ORDER BY:必须是时间戳列(事件时间、处理时间)。
  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合。
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。如下案例所示。

2.1 时间区间聚合

按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 1 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '10',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 1 小时的数据
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM source_table

2.2 行数聚合

按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 5 5 行数据的 amount 之和。

CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '2',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 5 行数据
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM source_table

预跑结果如下:

+I[2, 2021-12-24T22:18:19.147, 1, 9]
+I[1, 2021-12-24T22:18:20.147, 2, 11]
+I[1, 2021-12-24T22:18:21.147, 2, 12]
+I[1, 2021-12-24T22:18:22.147, 2, 12]
+I[1, 2021-12-24T22:18:23.148, 2, 12]
+I[1, 2021-12-24T22:18:24.147, 1, 11]
+I[1, 2021-12-24T22:18:25.146, 1, 10]
+I[1, 2021-12-24T22:18:26.147, 1, 9]
+I[2, 2021-12-24T22:18:27.145, 2, 11]
+I[2, 2021-12-24T22:18:28.148, 1, 10]
+I[2, 2021-12-24T22:18:29.145, 2, 10]

当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:

SELECT order_id, order_time, amount,
  SUM(amount) OVER w AS sum_amount,
  AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (
  PARTITION BY product
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/410232.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

医院LIS(全称Laboratory Information Management System)系统源码

目录 一、医院LIS系统概况 二、医院LIS系统建设必要性 三、为什么要使用LIS系统 四、技术框架 (1)总体框架 (2)技术细节 (3)LIS主要功能模块 五、LIS系统优势 (1)客户/用户…

docker创建mongodb数据库容器-主从模式

介绍 本文将通过docker创建三个mongodb数据库容器,分别设置一个主数据库mongodb-master:27017, 两个从数据库mongodb-slave-1:27018, mongodb-slave-2:27019 1. 拉取mongo镜像 docker pull mongo:3.63.6版本是一个稳定的版本,可以选择安装…

Linux学习之vi/vim详细介绍

目录 ​编辑 1. 什么是 vim? 2. vi/vim 的使用 2.1 命令模式 2.2 输入模式 2.3 底线命令模式 3. vi/vim 使用实例 3.1 使用 vi/vim 进入一般模式 3.2 按下 i 进入输入模式(也称为编辑模式),开始编辑文字 3.3 按下 ESC 按钮回到一般模式…

JavaWeb——007MYSQL(DQL多表设计)

# 数据库开发-MySQL 一级目录二级目录三级目录 1. 数据库操作-DQL1.1 介绍1.2 语法1.3 基本查询1.4 条件查询1.5 聚合函数1.6 分组查询1.7 排序查询1.8 分页查询1.9 案例1.9.1 案例一1.9.2 案例二 2. 多表设计2.1 一对多2.1.1 表设计2.1.2 外键约束 2.2 一对一2.3 多对多2.4 案…

2024-02-25 Unity 编辑器开发之编辑器拓展7 —— Inspector 窗口拓展

文章目录 1 SerializedObject 和 SerializedProperty2 自定义显示步骤3 数组、List 自定义显示3.1 基础方式3.2 自定义方式 4 自定义属性自定义显示4.1 基础方式4.2 自定义方式 5 字典自定义显示5.1 SerizlizeField5.2 ISerializationCallbackReceiver5.3 代码示例 1 Serialize…

Git Windows安装教程

Git的下载 去 Git 官网下载对应系统的软件了,下载地址为 git-scm.com 或者 gitforwindows.org git-scm 是 Git 的官方,里面有不同系统不同平台的安装包和源代码gitforwindows.org 里只有 windows 系统的安装包 安装 使用许可声明 选择安装目录 选择安…

springBoot整合Redis(一、Jedis操作Redis)

在springboot环境下连接redis的方法有很多,首先最简单的就是直接通过jedis类来连接,jedis类就相当于是redis的客户端表示。 但是因为现在比较常用的是:StringRedisTemplate和RedisTemplate,所以jedis只做简单的介绍。 一、Jedis…

勒索攻击新趋势,DarkSide解密工具

勒索攻击新趋势 2020年通过勒索病毒攻击已经成为网络犯罪分子热崇追捧的一种方式,全球几乎每天都有企业被勒索病毒攻击勒索,而且勒索的金额也越来越高,从几万美元到几千万美元不等,越来越多的黑客组织使用勒索病毒对企业发起攻击…

【Java系列】JDK 1.8 新特性之 Lambda表达式

目录 1、Lambda表达式介绍2、从匿名类到Lambda转换3、Lambda表达式 六种语法格式语法格式一:无参数、无返回值,只需要一个Lambda体语法格式二:lambda有一个参数、无返回值​语法格式三:Lambda只有一个参数时,可以省略&…

Linux之安装jdk,tomcat,mysql,部署项目

目录 一、操作流程 1.1安装jdk 1.2安装tomcat(加创建自启动脚本) 1.3 安装mysql 1.4部署项目 一、操作流程 首先把需要用的包放进opt文件下 1.1安装jdk 把jdk解压到/usr/local/java里 在刚刚放解压包的文件夹打开vim /etc/profile编辑器&#xff0c…

【前端素材】推荐优质后台管理系统Dashy平台模板(附源码)

一、需求分析 后台管理系统(或称作管理后台、管理系统、后台管理平台)是一种专门用于管理网站、应用程序或系统后台运营的软件系统。它通常由一系列功能模块组成,为管理员提供了管理、监控和控制网站或应用程序的各个方面的工具和界面。以下…

prometheus监控带安全认证的elasticsearch

1.下载elasticsearch_exporter wget 下载二进制包并解压、运行: wget https://github.com/prometheus-community/elasticsearch_exporter/releases/download/v1.3.0/elasticsearch_exporter-1.3.0.linux-amd64.tar.gz tar -xvf elasticsearch_exporter-1.3.0.lin…

【Prometheus】概念和工作原理介绍

目录 一、概述 1.1 prometheus简介 1.2 prometheus特点 1.3 prometheus架构图 1.4 prometheus组件介绍 1、Prometheus Server 2、Client Library 3、pushgateway 4、Exporters 5、Service Discovery 6、Alertmanager 7、grafana 1.5 Prometheus 数据流向 1.6 Pro…

MATLAB环境下基于洗牌复杂演化的图像分割算法

智能优化算法因其较强的搜索解能力而得到了大量的应用,在这些计算智能算法中,群体智能优化算法因其高效性、有效性以及健壮性等优点而得到了科研人员的青睐。这类算法借鉴生物群体的合作特性,主要解决大规模复杂的分布式问题,研究…

WPF Style样式设置

1.本window设置样式 <Window x:Class"WPF_Study.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d"http://schemas.microsoft.com/expressi…

论文阅读NAM:Normalization-based Attention Module

Abstarct 识别不太显著的特征是模型压缩的关键。然而&#xff0c;在革命性的注意力机制中却没有对其进行研究。在这项工作中&#xff0c;我们提出了一种新的基于归一化的注意力模块&#xff08;NAM&#xff09;&#xff0c;它抑制了不太显著的权重。它对注意力模块应用了权重稀…

YOLOV9论文概述

YOLOV9论文阅读 摘要信息瓶颈可逆函数可编程梯度信息 (PGI)辅助可逆分支多级辅助信息 GELAN实验设计参数设置实验结论消融实验模块深度辅助可逆分支和多层辅助信息PGI综合 可视化结果结论 摘要 今天的深度学习方法侧重于如何设计最合适的目标函数&#xff0c;以便模型的预测结果…

Seata分布式事务实战AT模式

目录 分布式事务简介 典型的分布式事务应用场景 两阶段提交协议(2PC) 2PC存在的问题 什么是Seata&#xff1f; Seata的三大角色 Seata AT模式的设计思路 一阶段 二阶段 Seata快速开始 Seata Server&#xff08;TC&#xff09;环境搭建 db存储模式Nacos(注册&配…

H5 个人引导页官网型源码

H5 个人引导页官网型源码 源码介绍&#xff1a;源码无后台、无数据库&#xff0c;H5自检测适应、无加密&#xff0c;直接修改可用。 源码含有多选项&#xff0c;多功能。可展示自己站点、团队站点。手机电脑双端。 下载地址&#xff1a; https://www.changyouzuhao.cn/1434.…

HTML+CSS+JS:轮播组件

效果演示 一个具有动画效果的卡片元素和一个注册表单&#xff0c;背景为渐变色&#xff0c;整体布局简洁美观。 Code <div class"card" style"--d:-1;"><div class"content"><div class"img"><img src"./i…