【大数据】Flink SQL 语法篇(二):WITH、SELECT WHERE、SELECT DISTINCT

Flink SQL 语法篇(二)

  • 1.WITH 子句
  • 2.SELECT & WHERE 子句
  • 3.SELECT DISTINCT 子句

1.WITH 子句

应用场景(支持 Batch / Streaming):With 语句和离线 Hive SQL With 语句一样的,语法糖 +1,使用它可以让你的代码逻辑更加清晰。

-- 语法糖 +1
WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

2.SELECT & WHERE 子句

应用场景(支持 Batch / Streaming):SELECT & WHERE 语句和离线 Hive SQL 语句一样的,常用作 ETL,过滤,字段清洗标准化。

INSERT INTO target_table
SELECT * FROM Orders

INSERT INTO target_table
SELECT order_id, price + tax FROM Orders

INSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10

-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3

其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。

以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了。

INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3

这个 SQL 对应的实时任务,假设 Orders 为 Kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

可以看到这个实时任务的所有算子是以一种 Pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。

在这里插入图片描述

⭐ 关于看如何看一段 Flink SQL 最终的执行计划:最好的方法就如上图,看 Flink Web UI 的算子图,算子图上详细的标记清楚了每一个算子做的事情。

以上图来说,我们可以看到主要有三个算子:

  • Source 算子。Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)])
    • 其中 Source 表名称为 table=[[default_catalog, default_database, Orders]
    • 字段为 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]
    • Watermark 策略为 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]
  • 过滤算子。Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id])
    • 其中过滤条件为 where=[(order_id > 3)]
    • 结果字段为 select=[order_id, name, row_time]
  • Sink 算子。Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time])
    • 其中最终产出的表名称为 table=[default_catalog.default_database.target_table]
    • 表字段为 fields=[order_id, name, row_time]

可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 Debug、调优前最基础的一个技巧。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游 过滤和字段标准化算子,然后 数据源算子 就运行结束了,释放资源了。
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游 数据汇算子,然后 过滤和字段标准化算子 就运行结束了,释放资源了
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了

可以看到离线任务的算子是分阶段(Stage)进行运行的,每一个 Stage 运行结束之后,然后下一个 Stage 开始运行,全部的 Stage 运行完成之后,这个离线任务就跑结束了。

注意:很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释:

  • 分区概念:离线由于能力限制问题,通常都是进行一批一批的数据计算,每一批数据的数据量都是有限的集合,这一批一批的数据自然的划分方式就是时间,比如按小时、天进行划分分区。但是 在实时任务中,是没有分区的概念的,实时任务的上游、下游都是无限的数据流。
  • 计算任务定时调度概念:同上,离线就是由于计算能力限制,数据要一批一批算,一批一批输入、产出,所以要按照小时、天定时的调度和计算。但是 在实时任务中,是没有定时调度的概念的,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。

3.SELECT DISTINCT 子句

应用场景(支持 Batch / Streaming):语句和离线 Hive SQL SELECT DISTINCT 语句一样的,用作根据 key 进行数据去重。

INSERT into target_table
SELECT DISTINCT id 
FROM Orders

也是拿离线和实时做对比。

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
  • 去重算子DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

在这里插入图片描述
对于实时任务,计算时的状态可能会无限增长。状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游 去重算子,然后 数据源算子 就运行结束了,释放资源了。
  • 去重算子DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游 数据汇算子,然后 去重算子 就运行结束了,释放资源了。
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/366239.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

谷歌seo搜索引擎优化需要做什么?

当你要做谷歌seo,经手一个你之前没有接触过的网站,你首先要做的就是分析网站当前的流量数据,如果是新站自然不需要这一步,不过数据分析依旧是件很重要的事情,做seo不懂得分析数据相当于白做 再来就是你要了解网站所在的…

卸载Ubuntu双系统

卸载Ubuntu双系统 我们卸载Ubuntu双系统,可能出于以下原因: 1、Ubuntu系统内核损坏无法正常进入 2、Ubuntu系统分配空间不足,直接扩区较为复杂 3、以后不再使用Ubuntu,清理留出空间 123无论出于哪种原因,我们都是要…

柔性电流探头方向判断有哪些方法?干货分享!

柔性电流探头方向判断的方法干货分享!从理论到实践,助您成为专业人士!干货收藏,快看起来吧!      柔性电流探头方向判断一直是电力行业测试中的关键问题之一,确切地判断电流方向对于测试电力系统的稳定…

【GitHub项目推荐--一个由OpenAI提供支持的聊天机器人和虚拟助手的构建平台】【转载】

Botpress Botpress是一个开源项目,它提供了一个平台,用于构建、部署和管理基于人工智能的聊天机器人和虚拟助手 github地址: https://github.com/botpress/botpress Botpress的介绍 Botpress是一个开源项目,它提供了一个平台&…

ROS2 Humble学习笔记 (2)

本文发表于个人的github pages。因csdn本身显示插件和转载过程中导致显示不太友好。建议大家阅读原文。想查看完整内容,请移步到ROS2 Humble学习笔记2。 本文篇幅较长,可抽空按照章节阅读。本文只作为对入门教程的一种浮现和提升。 一、前言 在上一篇…

Spring框架——主流框架

文章目录 Spring(轻量级容器框架)Spring 学习的核心内容-一图胜千言IOC 控制反转 的开发模式Spring快速入门Spring容器剖析手动开发- 简单的 Spring 基于 XML 配置的程序课堂练习 Spring 管理 Bean-IOCSpring 配置/管理 bean 介绍Bean 管理包括两方面: Bean 配置方式基于 xml 文…

2023年上-未来几年我要做什么

1月份,离职。 2月份,春节休假回来,中旬去参加了一个月的瑜伽培训,学会了倒立、鹤蝉。。。。 3月份,瑜伽培训结束,开始收拾房子,并调研各类项目。 4月份,参与了朋友的区块链项目 …

echarts step line

https://ppchart.com/#/ <template><div class"c-box" ref"jsEchart"></div> </template><script> import * as $echarts from echarts // 事件处理函数 export default {props: {// 需要传递的数据data: {type: Array,defa…

Day06-Linux下目录命令讲解及重要文件讲解

Day06-Linux下目录命令讲解及重要文件讲解 1. Linux目录文件1.1 Linux系统目录结构介绍1.1.1 Linux与Windows目录结构对比 1.2 重要的Linux配置文件介绍1.2.1 /etc系统初始化及设置相关重要文件1.2.2 /usr目录的重要知识介绍------应用程序目录1.2.3 /var目录下的路径知识-----…

thinkphp项目之composer快速安装使用

引言 由于项目的需求&#xff0c;thinkphp项目使用到composer。网上搜索有一堆的教程使用&#xff0c;根据自己的需要摸索了下。 步骤 1. 安装phpstudy v8&#xff0c;这个经常用的运行环境&#xff0c;方便好多开发者。安装教程一步一步到最后就行。 2. 安装composer组件&a…

Github 2F2【解决】经验帖-PPHub登入

最近在做项目时,Github总是出问题,这是一经验贴 Github 2F2登入问题【无法登入】PPhub 2F2是为了安全,更好的生态 启用 2FA 二十八 (28) 天后,要在使用 GitHub.com 时 2FA 检查 物理安全密钥、Windows Hello 或面容 ID/触控 ID、SMS、GitHub Mobile 都可以作为 2F2 的工…

CTF特训(二):青少年CTF-MISC部分WP

FLAG&#xff1a;当觉得自己很菜的时候&#xff0c;就静下心来学习 专研方向:MISC&#xff0c;CTF 每日emo&#xff1a;听一千遍反方向的钟&#xff0c;我们能回到过去吗&#xff1f; CTF特训(二)&#xff1a;青少年CTF-MISC部分WP&#xff1a; 文章目录 CTF特训(二)&#xff1…

LeetCode:283. 移动零

283. 移动零 1&#xff09;题目2&#xff09;代码方法一&#xff1a;两层for循环方法二&#xff1a;使用双指针 3&#xff09;结果方法一结果方法二结果 1&#xff09;题目 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的…

计算机毕业设计 | SpringBoot 房屋租赁网 房屋租赁平台(附源码)

1&#xff0c;绪论 1.1 背景调研 在房地产行业持续火热的当今环境下&#xff0c;房地产行业和互联网行业协同发展&#xff0c;互相促进融合已经成为一种趋势和潮流。本项目实现了在线房产平台的功能&#xff0c;多种技术的灵活运用使得项目具备很好的用户体验感。 这个项目的…

day24打卡

day24打卡 思路&#xff1a;画出决策树&#xff0c;暴力枚举。子集问题 决策树&#xff1a; 函数头&#xff1a;void dfs(int n, int k, int pos) 函数体&#xff1a; ​ 出口&#xff1a;全局变量count k 保存结果到全局变量ret中 ​ 子问题&#xff1a;从pos位置向后变…

Linux服务详解

如有错误或有补充&#xff0c;以及任何改进的意见&#xff0c;请在评论区留下您的高见&#xff0c;同时文中给出大部分命令的示例&#xff0c;即是您暂时无法在Linux中查看&#xff0c;您也可以知道各种操作的功能以及输出 如果觉得本文写的不错&#xff0c;不妨点个赞&#x…

C++ //练习 3.36 编写一段程序,比较两个数组是否相等。再写一段程序,比较两个vector对象是否相等。

C Primer&#xff08;第5版&#xff09; 练习 3.36 练习 3.36 编写一段程序&#xff0c;比较两个数组是否相等。再写一段程序&#xff0c;比较两个vector对象是否相等。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 代码块 /******…

WordPress可以做企业官网吗?如何用wordpress建公司网站?

我们在国内看到很多个人博客网站都是使用WordPress搭建&#xff0c;但是企业官网的相对少一些&#xff0c;那么WordPress可以做企业官网吗&#xff1f;如何用wordpress建公司网站呢&#xff1f;下面boke112百科就跟大家简单说一下。 WordPress是一款免费开源的内容管理系统&am…

主机安全加固之-openssh版本升级

升级openssh之前&#xff0c;为了保证能正常通过工具连接主机&#xff0c;咱们开启telnet服务&#xff0c;通过telnet的方式登录主机 一&#xff1a;开启telnet服务 1.安装telnet服务 [rootlocalhost ~]# yum install –y telnet telnet-server xinetd2.修改telnet服务配置文…

Multisim14.0仿真(四十一)交通信号灯仿真设计

一、功能简介&#xff1a; 1&#xff09;、采用两片74LS192做减法计数器&#xff0c;实现倒计时功能。 2&#xff09;、采用DCD数码管显示时间。 3&#xff09;、采用4个TRAFFIC_LIGHT_SINGLE红绿灯 4&#xff09;、采用74LS160和74LS138实现对红绿灯的逻辑控制。 5&#xff09…