Flink SQL TopN语句详解

TopN 定义(⽀持 Batch\Streaming): TopN 对应离线数仓的 row_number(),使⽤ row_number() 对某⼀个分组的数据进⾏排序。

应⽤场景: 根据 某个排序 条件,计算 某个分组 下的排⾏榜数据。

SQL 语法标准:

SELECT [column_list]
FROM (
 SELECT [column_list],
 ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
 ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
 FROM table_name)
WHERE rownum <= N [AND conditions];
  • ROW_NUMBER() :标识 TopN 排序⼦句;
  • PARTITION BY col1[, col2…] :标识分区字段,代表按照这个 col 字段作为分区粒度对数据排序取 topN,下述案例中的 partition by key ,根据需求中的搜索关键词(key)做为分区;
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]…] :标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进⾏排序;
  • WHERE rownum <= N :这个⼦句是必须的,加上这个⼦句,Flink 才能将其识别为 TopN 查询,其中 N 代表 TopN 的条⽬数;
  • [AND conditions] :其他的限制条件也可以加上。

实际案例: 取某个搜索关键词下的搜索热度前 10 名的词条数据。

输⼊数据为搜索词条数据的搜索热度数据,当搜索热度发⽣变化时,会将变化后的数据写⼊到数据源的 Kafka 中:

数据源 schema:

-- 字段名 备注
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(⽐如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE source_table (
 name STRING NOT NULL,
 search_cnt BIGINT NOT NULL,
 key STRING NOT NULL,
 row_time timestamp(3),
 WATERMARK FOR row_time AS row_time
) WITH (
 'connector' = 'filesystem', 
 'path' = 'file:///Users/hhx/Desktop/source_table.csv',
 'format' = 'csv'
);

A,100,a,2021-11-01 00:01:03
A,200,a,2021-11-02 00:01:03
A,300,a,2021-11-03 00:01:03
B,200,b,2021-11-01 00:01:03
B,300,b,2021-11-02 00:01:03
B,400,b,2021-11-03 00:01:03
C,300,c,2021-11-01 00:01:03
C,400,c,2021-11-02 00:01:03
C,500,c,2021-11-03 00:01:03
D,400,d,2021-11-01 00:01:03
D,500,d,2021-11-02 00:01:03
D,600,d,2021-11-03 00:01:03

-- 数据汇 schema:
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(⽐如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE sink_table (
 key BIGINT,
 name BIGINT,
 search_cnt BIGINT,
 `timestamp` TIMESTAMP(3)
) WITH (
 ...
);

-- DML 逻辑
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (
 SELECT key, name, search_cnt, row_time, 
 -- 根据热搜关键词 key 作为 partition key,然后按照 search_cnt 倒排取前 2 名
 ROW_NUMBER() OVER (PARTITION BY key ORDER BY search_cnt desc) AS rownum
 FROM source_table)
WHERE rownum <= 2

输出结果:

在这里插入图片描述

注意: 包含回撤流。

上⾯ SQL 会翻译成以下三个算⼦

数据源 :数据源即最新的词条下⾯的搜索词的搜索热度数据,消费到 Kafka 中数据后,按照 partition key 将数据进⾏ hash 分发到下游排序算⼦,相同的 key 数据将会发送到⼀个并发中;

排序算⼦ :为每个 Key 维护了⼀个 TopN 的榜单数据,接受到上游的⼀条数据后,如果 TopN 榜单还没有到达 N 条,则将这条数据加⼊ TopN 榜单后,直接下发数据,如果到达 N 条之后,经过 TopN 计算,发现这条数据⽐原有的数据排序靠前,那么新的 TopN 排名就会有变化,就变化了的这部分数据,之前下发的排名数据被撤回(即回撤数据),然后下发新的排名数据;

数据汇 :接收到上游的数据之后,然后输出到外部存储引擎中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/123041.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

启动Hbase出现报错

报错信息&#xff1a;slave1:head: cannot open/usr/local/hbase-2.3.1/bin/../logs/hbasewanggiqi-regionserver-slavel.out’ for reading: No such file or direslave2: head: cannot open/usr/local/hbase-2.3.1/bin/../logs/hbasewangqiqi-regionserver-slave2.out’ for …

无人机航迹规划:六种最新智能优化算法(DBO、LO、SWO、COA、LSO、KOA)求解无人机路径规划MATLAB

一、六种算法&#xff08;DBO、LO、SWO、COA、LSO、KOA&#xff09;简介 1、蜣螂优化算法DBO 蜣螂优化算法&#xff08;Dung beetle optimizer&#xff0c;DBO&#xff09;由Jiankai Xue和Bo Shen于2022年提出&#xff0c;该算法主要受蜣螂的滚球、跳舞、觅食、偷窃和繁殖行为…

《网络协议》03. 传输层(TCP UDP)

title: 《网络协议》03. 传输层&#xff08;TCP & UDP&#xff09; date: 2022-09-04 22:37:11 updated: 2023-11-08 15:58:52 categories: 学习记录&#xff1a;网络协议 excerpt: 传输层、UDP、TCP&#xff08;可靠传输&#xff0c;流量控制&#xff0c;拥塞控制&#xf…

如何使用 NFTScan NFT API 在 zkSync 网络上开发 Web3 应用

zkSync 是由 Matter Labs 创建的&#xff0c;是一个以用户为中心的 zk rollup 平台&#xff0c;它是以太坊的第 2 层扩展解决方案&#xff0c;使用 zk-rollups 作为扩展技术&#xff0c;与 optimistic rollups 一样&#xff0c;zk-rollups 将会汇总以太坊主网上的交易并将交易证…

基于CSP的运动想象EEG分类任务实战

基于运动想象的公开数据集&#xff1a;Data set IVa (BCI Competition III)1 数据描述参考前文&#xff1a;https://blog.csdn.net/qq_43811536/article/details/134224005?spm1001.2014.3001.5501 EEG 信号时频空域分析参考前文&#xff1a;https://blog.csdn.net/qq_4381153…

基于PHP语言的会员系统搭建(Docker版)

1、操作系统 准备&#xff1a; ubuntu22机器 基础&#xff1a;docker:【精选】Docker微服务-基础_v2/_catalog-CSDN博客 2、安装Docker # Add Dockers official GPG key: sudo apt-get update sudo apt-get install ca-certificates curl gnupg sudo install -m 0755 -d /etc/…

java项目之公廉租房维保系统(ssm框架)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的公廉租房维保系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 一、业主管理功能 该部分内容提…

【应用前沿】360QPaaS 精彩亮相首届中国航空制造设备博览会 | 数智航空

近日&#xff0c;首届“中国航空制造设备博览会”&#xff08;CAEE2023&#xff09;在宁波国际会展中心顺利召开&#xff0c;本届大会以“数智产融 开放发展”为主题&#xff0c;以“新技术、新产品、新服务、新企业”为定位&#xff0c;以特色化、专业化、品牌化、高端化为方向…

[MySQL] MySQL库的基础操作

文章目录 一、数据库的创建 1、1 库的创建 1、2 字符集与校验规则 1、2、1 查看字符集与校验规则 1、2、2 字符集与校验规则的设置 1、2、3 校验规则对数据库的影响 二、数据库的操作 2、1 查看数据库 2、2 删除数据库 2、3 修该数据库 2、4 数据库删除和备份 2、5 显示创建语…

覆盖13个行业,数据分类分级标准汇编更新啦!(附下载)

2016年11月&#xff0c;《网络安全法》明确将“数据分类”作为网络安全保护法定义务之一。 2021年9月&#xff0c;《数据安全法》再次具体确立了“数据分类分级保护制度”及其基本原则。 2021年11月&#xff0c;《个人信息保护法》、《网络数据安全管理条例(征求意见稿)》相继出…

Linux开发工具之编译器gcc/g++

文章目录 1.查看版本2.程序的翻译3.gcc指令3.1gcc hello.c -o hello3.2gcc -E hello.c -o hello.i3.3gcc -S hello.c -o hello.s3.4gcc -c hello.c -o hello.o3.5gcc hello.o -o hello 4.动静态库[详讲链接阶段]4.1初步认识4.2动态链接4.3静态链接 1.查看版本 gcc -v. 2.程序…

Bean作用域

从笔者之前的博客&#xff0c;我们可以看出 Spring 是⽤来读取和存储 Bean&#xff0c;因此在 Spring 中 Bean 是最核⼼的操作 资源&#xff0c;所以接下来我们深⼊学习⼀下 Bean 对象&#xff1a;Bean作用域&#xff01; 限定程序中变量的可用范围叫做作用域&#xff01;或者…

Maven依赖包冲突的两种排查和解决方案

1、识别冲突 观察错误消息&#xff1a;Maven在构建过程或者是项目启动过程中&#xff0c;大概率会输出关于版本冲突的警告或错误消息。当然也有小概率是在运行到指定代码时才会产生的包冲突导致异常报错。 使用mvn dependency:tree命令&#xff1a;这个命令可以展示项目中的所…

Adobe Illustrator 2021 下载及安装教程

目录 下载地址&#xff1a; 安装教程&#xff1a; 下载地址&#xff1a; Adobe Illustrator 2021安装包 链接&#xff1a;https://pan.baidu.com/s/1UIzjbS5pRuL7Zpt9RrU5lQ 提取码&#xff1a;lxwj 安装教程&#xff1a; 1、下载压缩包,解压文件 2、双击Set_up.exe&#…

基于Fuzzing和ChatGPT结合的AI自动化测试实践分享

一、前言 有赞目前&#xff0c;结合insight接口自动化平台、horizons用例管理平台、引流回放平台、页面比对工具、数据工厂等&#xff0c;在研发全流程中&#xff0c;已经沉淀了对应的质量保障的实践经验&#xff0c;并在逐渐的进化中。 在AI能力大幅进步的背景下&#xff0c…

视频电影和字幕如何合并?

我们在看一些国外的电影或者电视剧有时是没有字幕文件的&#xff0c;而对于普通人来说&#xff0c;没有字幕意味着我们无法看懂电影的剧情&#xff0c;好不容易获得的视频资源没有意义了&#xff0c;这种情况该怎么办呢&#xff1f; 其实这种情况完全不用怕&#xff0c;要知道…

机器学习 - DBSCAN聚类算法:技术与实战全解析

目录 一、简介DBSCAN算法的定义和背景聚类的重要性和应用领域DBSCAN与其他聚类算法的比较 二、理论基础密度的概念核心点、边界点和噪声点DBSCAN算法流程邻域的查询聚类的形成过程 参数选择的影响 三、算法参数eps&#xff08;邻域半径&#xff09;举例说明&#xff1a;如何选择…

Kibana使用Watcher监控服务日志并发送飞书报警(Markdown)

Watcher是什么 Kibana Watcher 是 Elasticsearch 的监控和告警工具&#xff0c;它允许你设置和管理告警规则以监控 Elasticsearch 数据和集群的状态。Kibana Watcher 可以监测各种指标和数据&#xff0c;然后在满足特定条件时触发警报。它提供了一种强大的方式来实时监控 Elas…

Could not load library libcudnn_cnn_train.so.8, 解决类似问题的思路与方法

完整报错 Could not load library libcudnn_cnn_train.so.8. Error: /home/ai/anaconda3/envs/ai/bin/../lib/libcudnn_ops_train.so.8: undefined symbol: _ZN5cudnn3ops26JoinInternalPriorityStreamEP12cudnnContexti, version libcudnn_ops_infer.so.8 错误原因 该错误其…

Git入门---简介,常用命令

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a;《Spring与Mybatis集成整合》《Vue.js使用》 ⛺️ 越努力 &#xff0c;越幸运。 1.Git 的简介 1.1. 介绍 Git是一个开源的分布式版本控制系统&#xff0c;最初由Linus Torvalds于2005年创…