flink的分组聚合、over聚合、窗口聚合对比

【背景】

flink有几种聚合,使用上是有一些不同,需要加以区分:

分组聚合:group agg

over聚合:over agg

窗口聚合:window agg

省流版:

触发计算时机

结果流类型

状态大小

分组聚合group agg

每当有新行就输出更新的结果

update流

保持中间结果,所以状态可能无限膨胀

over agg

每当有新行就输出更新的结果,类似一个滑动窗口

append流

保持中间结果,所以状态可能无限膨胀

window agg

窗口结束产生一个总的聚合结果

append流

不生成中间结果,自动清除状态

下面是详细对比和具体的例子(主要讨论的是流处理下的情况)。

over聚合:over agg

OVER 聚合通过排序后的范围数据为每行输入计算出聚合值。和 GROUP BY 聚合不同, OVER 聚合不会把结果通过分组减少到一行,它会为每行输入增加一个聚合值,结果是一个append流


 OVER 窗口的语法。

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

over聚合很少用到,所以本地自己做了一个测试:

测试sql如下:
create table test_window_tab

(

    region String

    ,qa_id String

    ,count_qa_id Bigint

) COMMENT ''

with

(

'properties.bootstrap.servers' ='',

'json.fail-on-missing-field' = 'false',

'connector' = 'kafka',

'format' = 'json',

'topic' = 'test_window_tab'

)

;

create table dwm_qa_score

(

    ,qa_id String   

    ,agent_id String

    ,region String

    ,saas_id String

    ,version_timestamp bigint

    , ts as to_timestamp(from_unixtime(`version_timestamp`, 'yyyy-MM-dd HH:mm:ss'))

    ,`event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL

    ,WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND

) COMMENT ''

with

(

'properties.bootstrap.servers' ='',

'json.fail-on-missing-field' = 'false',

'connector' = 'kafka',

'format' = 'json',

'scan.startup.mode' = 'earliest-offset',

'topic' = 'dwm_qa_score'

)

;

insert into test_window_tab(region,qa_id,count_qa_id)

select region,qa_id,count(1)  over w as count_qa_id

from dwm_qa_score



window w as(

partition by region,qa_id

order by ts

rows between 2 preceding and current row

)

dwm_qa_score这个topic现有数据

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

数据选择offset=ealiest-offset运行程序得到结果如下

{"region":"TH","qa_id":"123","count_qa_id":1}

{"region":"TH","qa_id":"123","count_qa_id":2}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"1234","count_qa_id":1}

这里注意:

  1. 数据都会返回一个聚合
  2. 由于我们rows between 2 preceding and current row所以count_qa_id最多3

如果此时dwm_qa_score这个topic插入新数据

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH"

}

或者

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH","version_timestamp": null

}

或者

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH","version_timestamp": 0

}

发现flink作业输出record多了一条

但是在目标kafka:test_window_tab中没有新增结果

原因是我们插入的新数据中没有version_timestamp这一列为空或为0

如果往dwm_qa_score这个topic插入新数据:

{

"qa_id": "1234",

"region": "TH",

"version_timestamp": 1710145110

}

则可以看到对应目标kafka:test_window_tab中会新增结果数据

{"region":"TH","qa_id":"1234","count_qa_id":2}

如果等一分钟后,再次往dwm_qa_score这个topic插入新数据:

{

"qa_id": "1234",

"region": "TH",

"version_timestamp": 1710145110

}

则在目标kafka:test_window_tab中没有新增结果原因应该数据过期丢弃watermark)

你可以在一个 SELECT 子句中定义多个 OVER 窗口聚合。然而,对于流式查询,由于目前的限制,所有聚合的 OVER 窗口必须是相同的

ORDER BY

OVER 窗口需要数据是有序的。因为表没有固定的排序,所以 ORDER BY 子句是强制的。对于流式查询,Flink 目前只支持 OVER 窗口定义在升序(asc) 时间属性 上。其他的排序不支持。

PARTITION BY

OVER 窗口可以定义在一个分区表上。PARTITION BY 子句代表着每行数据只在其所属的数据分区进行聚合。

范围(RANGE)定义

范围(RANGE)定义指定了聚合中包含了多少行数据。范围通过 BETWEEN 子句定义上下边界,其内的所有行都会聚合。Flink 只支持 CURRENT ROW 作为上边界。

有两种方法可以定义范围:ROWS 间隔 和 RANGE 间隔

RANGE 间隔

RANGE 间隔是定义在排序列值上的,在 Flink 里,排序列总是一个时间属性。下面的 RANG 间隔定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

ROW 间隔

ROWS 间隔基于计数。它定义了聚合操作包含的精确行数。下面的 ROWS 间隔定义了当前行 + 之前的 10 行(也就是11行)都会被聚合。

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW

常见错误

OVER windows' ordering in stream mode must be defined on a time attribute.

 这个报错,是建表的时候需要指定时间语义的字段,WATERMARK 是必须的,而且WATERMARK所用字段必须是order by的时间字段例如下面 order by load_date那么WATERMARK就要load_date生成,即WATERMARK FOR load_date AS load_date - INTERVAL '1' MINUTE

object SqlOverRows02 {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    tEnv.executeSql(
      """
        |create table projects(
        |id int,
        |name string,
        |score double,
        |load_date timestamp(3),
        |WATERMARK FOR load_date AS load_date - INTERVAL '1' MINUTE
        |)with(
        |'connector' = 'kafka',
        |'topic' = 'test-topic',
        |'properties.bootstrap.servers' = 'server120:9092',
        |'properties.group.id' = 'testGroup',
        |'scan.startup.mode' = 'latest-offset',
        |'format' = 'csv'
        |)
        |""".stripMargin)
    tEnv.executeSql(
      """
        |select
        | name,
        | max(score)
        |   over(partition by name
        |     order by load_date
        |     RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )max_score,
        | min(score)
        |   over(partition by name
        |     order by load_date
        |     RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )min_score,
        | current_time
        | from
        | projects
        |""".stripMargin).print()
  }
}

分组聚合:group agg

Apache Flink 支持标准的 GROUP BY 子句来聚合数据。

SELECT COUNT(*) FROM Orders GROUP BY order_id

特点:

1、聚合函数把多行输入数据计算为一行结果。例如,有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”、“MAX”和 “MIN”。

2、对于流式查询,重要的是要理解 Flink 运行的是连续查询,永远不会终止,会根据其输入表的更新来更新其结果表。对于上述查询,每当有新行插入 Orders 表时,Flink 都会实时计算并输出更新后的结果。

 3、对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如:MIN/MAX 的状态是重量级的,COUNT 是轻量级的,因为COUNT只需要保存计数值。

因此,可以设置table-exec-state-ttl,但是可能会影响查询结果的正确性,因为状态超时会被丢弃。

注意:

Flink 对于分组聚合提供了一系列性能优化的方法。更多参见:性能优化,包括MiniBatch 聚合、Local-Global 聚合、拆分 distinct 聚合、在 distinct 聚合上使用 FILTER 修饰符 、MiniBatch Regular Joins

窗口聚合:window agg

窗口聚合是通过 GROUP BY 子句定义的,其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列(必须包含,否则就变成分组聚合等了)。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。

SELECT ...
FROM <windowed_table> -- relation applied windowing TVF
GROUP BY window_start, window_end, ...

窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态(watermark超过窗口end+allowlateness,就会销毁窗口)。

具体例子:

SELECT window_start, window_end, SUM(price) AS

total_price

FROM TABLE(

    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))

GROUP BY window_start, window_end;

+------------------+------------------+-------------+

|     window_start |       window_end | total_price |

+------------------+------------------+-------------+

| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |

| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |

+------------------+------------------+-------------+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/449097.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MongoDB的count() 统计文档数量非常慢

在MongoDB中&#xff0c;count()函数用于统计文档的数量。但是count()函数通常不会使用索引来计算文档数量&#xff0c;而是扫描集合中的文档来计数。当数据量较大的时候&#xff0c;就不适合使用了。 解决方案&#xff1a; 1、使用聚合框架&#xff08;aggregation framewor…

EasyNVR级联EasyCVR,在EasyCVR播放视频会导致EasyNVR崩溃的原因排查与解决

视频综合管理平台EasyCVR视频监控系统支持多协议接入、兼容多类型设备&#xff0c;平台可以将监控区域内所有部署的监控设备进行统一接入与集中汇聚管理&#xff0c;实现对监控区域的实时视频监控、录像与存储、设备管理、云台控制、语音对讲、级联共享等&#xff0c;在监控中心…

从零搭建NodeJS项目(小白教程)

这边文章将介绍如何从零开始创建一个基于Express框架的Node.js项目。Express是一个快速、无拘束且极简的Node.js web应用框架&#xff0c;它提供了一系列强大的功能&#xff0c;使得web开发变得更加高效。 目录 1. 环境准备 2. 安装Express脚手架 3. 创建项目 4. 初始化项…

Clearview X for mac v3.5.0 电子书阅读器 兼容 M1/M2/M3

应用介绍 Clearview X 是 macOS 上的一款简洁易用且美观大方的电子书阅读器。直观好用的图书管理功能&#xff0c;支持 PDF, Epub, MOBI, CHM, FB2, CBR, CBZ 等流行的电子书格式&#xff0c;可以方便地添加注解&#xff0c;插入书签&#xff0c;及迅速的搜索查找。支持在不同…

git init 执行后发生了什么?

首先在磁盘中创建一个新目录 Git&#xff0c;进入该目录后执行 git init 初始化。这个时候目录下会创建一个隐藏目录 ./git&#xff0c;这个./git 目录叫做 Git 版本库或者仓库 $ git init Initialized empty Git repository in D:/Git/.git/ 在讲解.git 目录内容前&#xff0…

【C++】关联式容器

目录 前言&#xff1a; 一&#xff0c;set容器 二&#xff0c;multiset容器 三&#xff0c;map容器 四&#xff0c;multimap容器 前言&#xff1a; 在C中&#xff0c;STL中的部分容器&#xff0c;比如&#xff1a;vector、list、deque、 forward_list(C11)等&#xff0c;这…

第五届国际信息技术与教育技术大会(ITET 2024)即将召开!

2024年第五届国际信息技术与教育技术大会&#xff08;ITET 2024&#xff09;将于5月10-12日在日本鸟取举行。本届会议由日本鸟取大学主办&#xff0c;冈山大学、湘南工业大学、名古屋工业大学、山口大学等提供技术支持。ITET 2024旨在探讨计算机领域的创新发展在教育环境中所带…

javase day03笔记

第三天课堂笔记 idea的使用★★★ 创建空工程创建模块创建包&#xff1a;package创建类idea的设置 file -> settings 快捷键 shift &#xff0b; 回车 &#xff1a; 光标切换到下一行psvm回车&#xff1a; main方法main回车&#xff1a;main方法sout回车&#xff1a;输…

快速入门:JS对象/BOM/DOM/事件监听

本贴介绍JS相对进阶的知识&#xff0c;对于JavaScript的基础语法&#xff0c;本文不再赘述~ 一.JavaScript对象 1.Array数组对象 定义 var arr new Array(1,2,3); var arr[1,2,3]; 访问 arr[0]1; Js数组类似Java中的集合&#xff0c;长度&#xff0c;类型都可以改变。 如…

Web端功能测试方法最有作用的5个点

对于web测试&#xff0c;较之其他软件测试又有所不同&#xff0c;这是细节的不同&#xff0c;这个不同需要我们在不停的测试中去总结的。 web测试正式测试之前&#xff0c;应先确定如何开展测试&#xff0c;不可盲目的测试&#xff0c;讲究方法才能行之有效的提高我们的效…

Linux——文件缓冲区与模拟实现stdio.h

前言 我们学习了系统层面上的文件操作&#xff0c;也明白了重定向的基本原理&#xff0c;在重定向中&#xff0c;我们使用fflush(stdout)刷新了缓冲区&#xff0c;当时我们仅仅知道重定向需要刷新缓冲区&#xff0c;但是不知道其所以然&#xff0c;今天我们来见识一下。 一、…

框架学了不会用?四小时做完一个完整的前后端分离demo(SpringBoot+Vue)

四小时做完一个完整的前后端分离demo&#xff08;SpringBootVue&#xff09; 分享一个看到的还不错的小项目&#xff0c;非常适合刚学完框架但是没有太多动手机会的的学生党用来练手。 优势 手把手写代码&#xff0c;有教学视频免费&#xff0c;有源代码项目周期短 视频教程…

Nvidia显卡@参数规格@驱动下载@cuda版本查看

文章目录 Nvidia显卡产品类型GeForce系列 命名规则前缀和后缀技术特点性能指标/&#x1f47a;显存(VRAM)显存和位宽位宽和现存容量的设计 其他 显卡信息查看Nvidia官网查看其他数据库核心规格GeForce系列产品参数在线查看&#x1f47a;大汇显卡规格总比较其他显卡规格比较 性能…

Facebook、亚马逊账号如何养号?

之前我们讨论过很多关于代理器的问题。它们的工作原理是什么?在不同的软件中要使用那些代理服务器?这些代理服务器之间的区别是什么?什么是反检测浏览器等等。 除了这些问题&#xff0c;相信很多人也会关心在使用不同平台的时代理器的选择问题。比如&#xff0c;为什么最好…

目标检测——布匹缺陷检测数据集

一、简要 布匹瑕疵是指在布料生产过程中或后续处理中出现的各种不符合质量标准或期望的缺陷。这些瑕疵可能源自原料、织造工艺、染色、印花、加工等多个环节。布匹瑕疵的类型繁多&#xff0c;涵盖了结构瑕疵和质量瑕疵两大类。结构瑕疵指的是布料本身的缺陷&#xff0c;包括嵌…

Skia最新版CMake编译

运行示例&#xff1a;example/HelloWorld.cpp Skia: 2024年03月08日 master分支: 993a88a663c817fce23d47394b574e19d9991f2f 使用CMake编译 python tools/git-sync-depsbin/gn gen out/config --idejson --json-ide-script../../gn/gn_to_cmake.py此时output目录会生成CM…

指数幂+力扣

题目 题目链接 . - 力扣&#xff08;LeetCode&#xff09; 题目描述 代码实现 class Solution { public:double myPow(double x, int n) {long t n;return t > 0 ? _myPow(x, t) : 1 / _myPow(x, -t);}double _myPow(double x, int n){if(n 0) return 1;double y _…

【解决】Sublime Text找不到Package Control选项,且输入install也不显示Install Package(其中一种情况)

【问题描述】 Sublime Text 找不到 Package Control 选项&#xff0c;且输入 install 也不显示 Install Package 【解决方法】&#xff08;其中一种情况&#xff09; 1、工具栏 Preferences -> Settings&#xff0c;点开查看设置文档 2、检查 "ignored_packages&q…

Vue+OpenLayers7入门到实战:OpenLayers地图鼠标点击事件使用,点击地图后弹框并显示当前位置经纬度

返回《Vue+OpenLayers7》专栏目录:Vue+OpenLayers7入门到实战 前言 本章介绍如何使用OpenLayers7在地图上监听点击事件,以及监听地图点击事件后进行简单弹框并获取当前点击位置的经纬度并显示wgs84坐标位置和度分秒格式经纬度信息。 二、依赖和使用 "ol": "…

【开发】JavaWeb开发中如何解析JSON格式数据

目录 前言 JSON 的数据类型 Java 解析 JSON 常用于解析 JSON 的第三方库 Jackson Gson Fastjson 使用 Fastjson Fastjson 的优点 Fastjson 的主要对象 JSON 接口 JSONObject 类 JSONArray 类 前言 1W&#xff1a;什么是JSON&#xff1f; JSON 指 JavaScrip t对象表…