Flink Sql:四种Join方式详解(基于flink1.15官方文档)

JOINs

flink sql主要有四种连接方式,分别是Regular Joins、Interval Joins、Temporal Joins、lookup join

1、Regular Joins(常规连接 )

这种连接方式和hive sql中的join是一样的,包括inner join,left join,right join,full join

1、指定数据源建立students表
CREATE TABLE students (
  id STRING,
  name STRING,
  age INT,
  sex STRING,
  clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置为最新生成的数据
  'format' = 'csv' -- 指定数据的格式
);

2、kafka生产students表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班


3、创建关联表scores
CREATE TABLE scores (
  sid STRING,   
  cid STRING,     --学科id
  score INT     
) WITH (
  'connector' = 'kafka',
  'topic' = 'scores', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);

4、kafka生产scores数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,56
1500100002,1000001,139
1500100002,1000002,102
1500100004,1000001,42
1500100004,1000002,142

-- inner jion   两边数据都不为null的才会关联
select 
a.id,a.name,b.sid,b.score
from 
students as a
inner join
scores as b
on a.id=b.sid;

-- left join/right join    保证左边/右边数据的完整性
select 
a.id,a.name,b.sid,b.score
from 
students as a
right join
scores as b
on a.id=b.sid;

-- full join       保证两边数据的完整性
select 
a.id,a.name,b.sid,b.score
from 
students as a
full join
scores as b
on a.id=b.sid;



-- 注意:
-- 常规连接,会将两个表的数据一直保存在状态中,时间长了,状态会越来越大,导致任务执行失败,通常在批处理中使用,因为批处理没有状态这个概念。

为了避免状态过大可能会导致的任务失败问题,我们可以设置状态有效期
-- 状态有效期,状态在flink中保存的时间,但是如果sql中除了关联操作还有聚合这样也需要将数据保存在状态中的操作,状态有效期设置的太短可能会让聚合这样的操作失败,设置的太长延迟也会增加。所以,状态保留多久需要根据实际业务分析
SET 'table.exec.state.ttl' = '20000';
设置该参数后,那么只有在20秒内到达的数据才会被保存到状态中进行关联。

inner join结果:

left join 结果:

right join结果:

full join结果:

2、Interval Joins(间隔连接

Interval Joins:在一段时间内关联

对于流式查询,与常规连接相比,间隔连接仅支持具有时间属性的追加表。由于时间属性是拟单调递增的,因此 Flink 可以从其状态中删除旧值,而不会影响结果的正确性。

这种方式可以变相弥补Regular Joins中时间长了状态过大的问题。

CREATE TABLE students_proctime (
    id STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班

CREATE TABLE scores_proctime (
    sid STRING,
    cid STRING,
    score INT,
    proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'scores', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,56
1500100002,1000001,139
1500100002,1000002,102
1500100004,1000001,42
1500100004,1000002,142


select a.id,a.name,b.sid,b.score from 
students_proctime a, scores_proctime b
where a.id=b.sid
-- a表的时间需要在b表时间10秒内或b表的时间需要在a表时间10秒内
and (
	a.proctime BETWEEN b.proctime - INTERVAL '10' SECOND AND b.proctime
    or b.proctime BETWEEN a.proctime - INTERVAL '10' SECOND AND a.proctime
);

3、Temporal Joins(时态连接)

这种关联方式是专门用来关联时态表的。

  • Temporal Joins(时态连接)是在流式计算或数据处理中,对两个或多个随时间变化的表(也称为动态表或时态表)进行连接的操作。这些表包含随时间变化的数据,并且行与一个或多个时态周期相关联。

在我们生活中最常见的时态表就是汇率表,汇率随着时间变化而变化。

 

案例:

例如,假设我们有一张订单表,每张订单的价格都采用不同的货币。为了正确地将此表标准化为单一货币(如美元),每张订单都需要与下订单时相应的货币兑换率相结合。

1、创建订单表
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,    --币种
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);

2、订单表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
o_001,1,EUR,2024-06-06 12:00:00
o_002,100,EUR,2024-06-06 12:00:07
o_003,200,EUR,2024-06-06 12:00:16
o_004,10,EUR,2024-06-06 12:00:21
o_005,20,EUR,2024-06-06 12:00:25

3、创建汇率表
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED -- 主键,区分不同的汇率
) WITH (
  'connector' = 'kafka',
  'topic' = 'currency_rates1', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置
  'format' = 'canal-json' -- 指定数据的格式
);

4、向汇率表中添加数据
insert into currency_rates
values
('EUR',0.12,TIMESTAMP'2024-06-06 12:00:00'),
('EUR',0.11,TIMESTAMP'2024-06-06 12:00:09'),
('EUR',0.15,TIMESTAMP'2024-06-06 12:00:17'),
('EUR',0.14,TIMESTAMP'2024-06-06 12:00:23');

kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates

-- 使用常规关联方式关联时态表只能关联到最新的数据
select 
a.price,a.order_time,b.conversion_rate,b.update_time
from 
orders as a
join
currency_rates as b
on a.currency=b.currency;

-- 时态表join
-- FOR SYSTEM_TIME AS OF a.order_time: 使用a表的时间到b表中查询对应时间段的数据
select 
a.price,a.order_time,b.conversion_rate,b.update_time
from 
orders as a
join
currency_rates FOR SYSTEM_TIME AS OF a.order_time as b 
on a.currency=b.currency;

常规join结果:

时态join结果:

4、lookup join(查找连接

Lookup Join,也称为维表 Join,通常用于从外部系统查询的数据表。连接要求一个表具有处理时间属性,另一个表由查找源连接器支持。

具体来说:

lookup join用于流表(动态表)关联维度表

流表:动态表

维度表:不怎么变化的变,维度表的数据一般可以放在hdfs或者mysql等外部数据源


扩展:流表、事实表、维度表

-- 流表(动态表)
1、流表的数据来源通常是实时数据流,这些数据流可以来自各种数据源,如 Kafka、RabbitMQ、Kinesis 等。Flink可以通过数据源连接器(Source Connectors)将这些实时数据流接入到 Flink 系统中
2、与传统数据库中的表不同,流表的行是动态生成的,随着数据流的持续产生而不断增加


-- 维度表
1、主要提供数据的分析角度,包含了描述业务环境的属性信息,如时间、地理、产品等。
2、维度表:通常比较宽(包含多个属性列),但行数相对较少,因为维度表中的每一行通常代表一个具体的业务实体或类别,如一个商品、一个客户、一个日期等。
3、维度表与事实表之间通过外键相关联,共同构成了星型模型或雪花模型。事实表中的外键用于与维度表中的主键相匹配,从而提供数据的上下文和分类信息。
4、维度表存储的是对数据的描述性信息,这些信息通常不随时间变化,或者变化不频繁。例如,商品的品牌、型号、颜色等属性一旦确定后很少会发生变化。但在某些情况下,如新产品上市或促销活动,可能需要更新维度表以添加新的维度成员。

-- 事实表
1、存储了实际的数据度量值,如销售额、订单数量等。事实表是数据分析的核心,包含了所有用于分析的数据指标。
2、通常比较窄(包含较少的列),但行数非常多,因为事实表中的每一行通常代表一个具体的事件或交易,如一个订单、一次点击等。
3、事实表存储的是度量数据(即指标),这些数据会随时间变化,并且经常需要被汇总和分析。例如,销售额、订单数量、点击量等指标会随着业务活动的进行而不断更新。
4、事实表的数据更新频率通常较高,因为事实数据会随着业务活动的进行而不断产生。例如,每当有新的订单产生时,都需要在事实表中插入一条新的记录。

 

1、创建分数表
CREATE TABLE scores (
    sid INT,
    cid STRING,
    score INT,
    proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'scores', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);

2、生产分数表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000003,137

3、建立学生表,我们将学生表当作维度表放在mysql中
CREATE TABLE students_test (
    id INT,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata29',
    'table-name' = 'students_test',
    'username' ='root',
    'password' = '123456',
    'lookup.cache.max-rows' = '1000', -- 最大缓存行数
    'lookup.cache.ttl' ='10000' -- 缓存过期时间
);


学生表数据
1500100001,施笑槐,22,女,文科六班



-- 使用常规关联方式
-- 维表的数据只在任务启动的时候读取一次,后面不再实时读取,
-- 只能关联到任务启动时读取的数据
-- 一旦mysql中的学生表更新数据,但是关联的学生表数据却是任务启动时从mysql读取的,这就有错误了,lookup join可以解决该问题。
select a.sid,a.score,b.id,b.name from
scores as a
left join
students_test  as b
on a.sid=b.id;

-- lookup join
-- 当流表每来一条数据时,使用关联字段到维表的数据源中查询
-- 优点:实时更新数据源,准确性高
-- 缺点:每一次都需要查询数据库,性能会降低
select a.sid,a.score,b.id,b.name from
scores as a
left join
students_test FOR SYSTEM_TIME AS OF a.proctime as b
on a.sid=b.id;

此时我们修改更新mysql中的学生表数据

修改之前

修改后:

常规关联结果:

look up关联结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/706893.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【论文阅读】-- 时态合并树状图:时态标量数据的基于拓扑的静态可视化

时态合并树状图:时态标量数据的基于拓扑的静态可视化 摘要1 引言2 相关工作及背景介绍2.1 增广合并树2.2 (增强)合并树的可视化与跟踪2.3 特征跟踪2.4 数据线性化 3 时间合并树状图3.1 映射单个时间步长: R d → R R^d \rightarrow R Rd→R3.2 映射所有时…

Unity 设置窗口置顶超级详解版

目录 前言 一、user32.dll 1.什么是user32.dll 2.如何使用user32.dll 二、句柄Handle 1.句柄 2.句柄的功能 3.拿句柄的方法 三、窗口置顶 1.窗口置顶的方法 2.参数说明 3.使用方法 四、作者的碎碎念 前言 up依旧挑战全网讲解最详细版本~~ 本篇文章讲解的是unity…

基于Springboot的小型超市商品展销系统-计算机毕业设计源码01635

摘 要 科技进步的飞速发展引起人们日常生活的巨大变化,电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流,人类发展的历史正进入一个新时代。在现实运用中,应用软件的工作…

anaconda安装pytorch-快速上手99%可以(可以虚拟环境OR不进行虚拟环境)

一、预备工作 先检查自己是否有anaconda 在cmd里面输入conda --version查看 二、在anaconda中创建虚拟环境 1.1 打开Anaconda Prompt 1.2 进行自定义安装python 将其中的自定义地址和版本换成自己想安装的地址和版本 我这里安装的地址是E:\Anaconda\DL,python版本是3.8.3…

循环 -控制语句

循环 循环是什么 重复执行一段代码的结构。只要满足循环的条件,会一直执行这个代码。 循环条件:在一定范围之内,按照指定的次数来执行循环。 循环体:在指定的次数内执行的命令序列。只要条件满足循环体会被一直执行。 循环和…

Windows搭建nacos集群

Nacos是阿里巴巴的产品,现在是SpringCloud中的一个组件。相比Eureka功能更加丰富,在国内受欢迎程度较高。 下载地址:Tags alibaba/nacos GitHub 链接:百度网盘 请输入提取码 提取码:8888 解压文件夹 目录说明&am…

FPGA - 滤波器 - FIR滤波器设计

一,数字滤波器 滤波器是一种用来减少或消除干扰的器件,其功能是对输入信号进行过滤处理得到所需的信号。滤波器最常见的用法是对特定频率的频点或该频点以外的频率信号进行有效滤除,从而实现消除干扰、获取某特定频率信号的功能。一种更广泛的…

MyBatis 高级映射与延迟加载(分步查询)的详细内容

1. MyBatis 高级映射与延迟加载(分步查询)的详细内容 文章目录 1. MyBatis 高级映射与延迟加载(分步查询)的详细内容2. 准备工作3. 多对一 高级映射3.1 第一种方式:级联属性映射3.2 第二种方式:association…

英格索兰IngsollRang控制器过热维修讲解

【英格索兰IngsollRang控制器维修请关注】 【英格索兰IngsollRang控制器维修】 【英格索兰控制器维修】 一、IngsollRang扭矩枪控制器故障诊断 1. 检查环境温度:首先,确认工作场所的温度是否过高。如果环境温度超过设备规定的工作温度,可能…

刷代码随想录有感(102):动态规划——整数拆分

题干&#xff1a; 代码&#xff1a; class Solution { public:int integerBreak(int n) {vector<int>dp(n 1);dp[0] 0;dp[1] 0;dp[2] 1;for(int i 3; i < n; i){for(int j 1; j < i / 2; j){dp[i] max(dp[i], max((j) * (i - j), j * dp[i - j]));}}return…

JavaScript的数组(一维数组、二维数组、数组常用的方法调用)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

eclipse 老的s2sh(Struts2+Spring+Hibernate) 项目 用import导入直接导致死机(CPU100%)的解决

1、下载Apache Tomcat - Apache Tomcat 8 Software Downloads 图中是8.5.100的版本&#xff0c;下面的设置用的是另一个版本的&#xff0c;其实是一样。 2、先将Server配好&#xff0c;然后再进行导入操作。 2、选择jdk 当然&#xff0c;这里也可以直接“Download and instal…

无公网IP与服务器完成企业微信网页应用开发远程调试详细流程

文章目录 前言1. Windows安装Cpolar2. 创建Cpolar域名3. 创建企业微信应用4. 定义回调本地接口5. 回调和可信域名接口校验6. 设置固定Cpolar域名7. 使用固定域名校验 前言 本文主要介绍如何在企业微信开发者中心通过使用内网穿透工具提供的公网域名成功验证回调本地接口服务! …

Solr7.4.0报错org.apache.solr.common.SolrException

文章目录 org.apache.solr.common.SolrException: Exception writing document id MATERIAL-99598435990497269125316 to the index; possible analysis error: cannot change DocValues type from NUMERIC to SORTED_NUMERIC for field "opt_time"Exception writing…

为什么说掌握心理学知识成为产品经理一门必修课?

大家好&#xff0c;我是herosunly。985院校硕士毕业&#xff0c;现担任算法研究员一职&#xff0c;热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名&#xff0c;CCF比赛第二名&#xff0c;科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的…

uniapp地图导航

我们只需要给图标加一个点击事件 我这里的数据都是动态的&#xff0c;想测试的朋友可以写固定值 然后跳转之后首先会调到选择软件导航 点击导航之后会显示使用哪个app 最后我们选择之后将会直接跳转到app进行导航

【启明智显彩屏应用】Model3A 7寸触摸彩屏的充电桩应用方案

一、充电桩概述 &#xff08;一&#xff09;充电桩诞生背景 随着社会的进步和人们生活质量的提升&#xff0c;汽车已逐渐融入每个家庭的日常生活中。然而&#xff0c;汽车数量的激增也带来了严重的环境污染问题&#xff0c;特别是尾气排放。为了应对这一挑战&#xff0c;新能源…

Modbus协议转Profinet协议网关与气体监测系统配置案例

一、背景&#xff1b;Modbus协议和Profinet协议作为工业领域常见的两种通讯协议&#xff0c;各自具有一定的特点和应用范围。Modbus转Profinet网关&#xff08;XD-MDPN100/300&#xff09;在工业自动化控制系统中&#xff0c;可以将Modbus协议转换为Profinet协议&#xff0c;以…

认识线性调频信号(LFM)和脉冲压缩

目录 1. 线性调频&#xff08;LFM&#xff09;信号&#xff1a;2.Matlab仿真3.脉冲压缩 微信公众号获取更多FPGA相关源码&#xff1a; 1. 线性调频&#xff08;LFM&#xff09;信号&#xff1a; 在时域中&#xff0c;一个理想的线性调频信号或脉冲持续时间为T秒&#xff0c;…

加码多肤色影像技术 这是传音找到的“出海利器“?

全球化时代&#xff0c;市场竞争愈演愈烈&#xff0c;产品差异化已然成为了企业脱颖而出的关键。在黄、白肤色长期占据人像摄影主赛道的背景下&#xff0c;传音就凭借独一无二的多肤色影像技术走出非洲&#xff0c;走向了更广阔的新兴市场。 聚焦深肤色人群拍照痛点&#xff0c…