FlinkSQL中F【FULL OUTER JOIN】使用实例分析(坑)

Flink版本:flink1.14
最近有【FULL OUTER JOIN】场景的实时数据开发需求,想要的结果是,左右表来了数据都下发数据;左表存在的数据,右表进来可以关联下发(同样,右表存在的数据,左表进来也可以关联下发)。但在实际应用中遇到一些问题。

FlinkSQL demo


CREATE TABLE waybill_extend_kafka (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     _proc  as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
  'format' = 'json'
);

CREATE TABLE package_state_kafka (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     _proc  as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 't2',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
  'format' = 'json'
);

CREATE TABLE es_dim(
    id                STRING,
    ts                STRING,
    waybill_code      STRING,
    pin               STRING,
    operater_ts       STRING,
    operater_type     STRING,
    is_enable         STRING,
    batch_no          STRING,
    activity_key      STRING,
    p_type            STRING,
    p_name            STRING,
    version            STRING,
    update_time            STRING
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_dim',
    'document-type' = 'es_dim',
    'hosts' = 'http://xxx:9200',
    'format' = 'json'
);

CREATE TABLE es_sink(
     waybill_code      STRING
    ,first_order       STRING -- 新客1,非新客0
    ,extend_update_time            STRING
    ,state       STRING -- 妥投150
    ,package_update_time            STRING
    ,pin              STRING
    ,coupon_use_time      STRING
    ,operater_type    STRING
    ,is_enable        STRING
    ,batch_no         STRING
    ,update_time         STRING
    ,PRIMARY KEY (waybill_code) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE TABLE kafka_sink (
     waybill_code      STRING
    ,first_order       STRING 
    ,extend_update_time            STRING
    ,state       STRING -- 妥投150
    ,package_update_time            STRING
    ,pin              STRING
    ,coupon_use_time      STRING
    ,operater_type    STRING
    ,is_enable        STRING
    ,batch_no         STRING
    ,update_time         STRING
    ,PRIMARY KEY (waybill_code) NOT ENFORCED --注意 确保在 DDL 中定义主键。
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 't3',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

--新客
CREATE view  waybill_extend_temp as
select
    IF(cur['waybill_code'] IS NOT NULL , cur['waybill_code'], src ['waybill_code'])  AS waybill_code,
    IF(cur['data_key'] IS NOT NULL , cur['data_key'], src ['data_key'])  AS data_key,
    IF(cur['create_time'] IS NOT NULL , cur['create_time'], src ['create_time'])  AS create_time,
    opt,
    _proc
FROM waybill_extend_kafka
where UPPER(opt) = 'DELETE' OR UPPER(opt) = 'INSERT';

CREATE view  waybill_extend_temp_handle as
SELECT
    waybill_code,
    case when UPPER(opt) = 'INSERT'  then '1'
         when UPPER(opt) = 'DELETE'  then '0'
            end as first_order,
    create_time,
    _proc
from waybill_extend_temp
where data_key = 'firstOrder';

--妥投
CREATE view package_state_temp as
select
    IF(cur['WAYBILL_CODE'] IS NOT NULL , cur['WAYBILL_CODE'], src ['WAYBILL_CODE'])  AS waybill_code,
    IF(cur['STATE'] IS NOT NULL , cur['STATE'], src ['STATE'])  AS state,
    IF(cur['CREATE_TIME'] IS NOT NULL , cur['CREATE_TIME'], src ['CREATE_TIME'])  AS create_time,
    opt,
    _proc
FROM package_state_kafka
where UPPER(opt) = 'INSERT';

CREATE view package_state_temp_handle as
SELECT
    waybill_code,
    max(state) as state,
    min(create_time) as package_update_time,
    proctime() as _proc
from package_state_temp
where state = '150'
group by waybill_code;

--full join
-- flink1.14 注意:flinksql里面的FULL OUTER JOIN 只是分别下发左右数据,中间状态不关联下发,在流处理场景下相当于union all
CREATE view waybill_extend_package_state  as
SELECT
    COALESCE(a.waybill_code, b.waybill_code) as waybill_code,
    a.first_order,
    a.create_time as extend_update_time,
    b.state,
    b.package_update_time,
    COALESCE(a._proc, b._proc) as _proc
from waybill_extend_temp_handle as a
FULL OUTER JOIN package_state_temp_handle b
on a.waybill_code=b.waybill_code;

--result
CREATE VIEW res_view AS
SELECT
     a.waybill_code
    ,a.first_order
    ,a.extend_update_time
    ,a.state
    ,a.package_update_time
    ,b.pin
    ,b.operater_ts
    ,b.operater_type
    ,b.is_enable
    ,b.batch_no
    ,CAST(CAST(a._proc AS TIMESTAMP(3)) AS STRING) as update_time
    ,row_number() over(partition by a.waybill_code order by b.operater_ts desc) as rn 
from waybill_extend_package_state as a
JOIN es_dim FOR SYSTEM_TIME AS OF a._proc as b
on a.waybill_code=b.waybill_code;

INSERT INTO es_sink
SELECT
     waybill_code
    ,first_order
    ,extend_update_time
    ,state
    ,package_update_time
    ,pin
    ,operater_ts
    ,operater_type
    ,is_enable
    ,batch_no
    ,update_time
FROM res_view
where rn =1;

INSERT INTO kafka_sink
SELECT
     waybill_code
    ,first_order
    ,extend_update_time
    ,state
    ,package_update_time
    ,pin
    ,operater_ts 
    ,operater_type
    ,is_enable
    ,batch_no
    ,update_time
FROM res_view
where rn =1;

es_sink mapping:

POST es_sink/es_sink/_mapping
{
    "es_sink": {
        "properties": {
            "waybill_code": {
                "type": "keyword"
            },
            "pin": {
                "type": "keyword"
            },
            "operater_ts": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "operater_type": {
                "type": "keyword"
            },
            "is_enable": {
                "type": "keyword"
            },
            "batch_no": {
                "type": "keyword"
            },
            "first_order": {
                "type": "keyword"
            },
            "extend_update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "state": {
                "type": "keyword"
            },
            "package_update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

结果分析

从sink_es和sink_kafka获取数据都是同样的结果,部分结果如下:
在这里插入图片描述
但从结果中可以看出,FlinkSQL里面的【FULL OUTER JOIN】 只是分别下发左右数据,中间状态(从FlinkUI中可以看到【FULL OUTER JOIN】状态也做了保存)不关联下发,在流处理场景下相当于【UNION ALL】,不知是否是FlinkSQL的bug。
【FULL OUTER JOIN】状态数据,如下:
在这里插入图片描述
此次用例分析只是针对于Flink1.14,对于其他版本尚不清楚。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/291443.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

服务器为什么大多用 Linux?

服务器为什么大多用 Linux&#xff1f; 在开始前我有一些资料&#xff0c;是我根据自己从业十年经验&#xff0c;熬夜搞了几个通宵&#xff0c;精心整理了一份「Linux的资料从专业入门到高级教程工具包」&#xff0c;点个关注&#xff0c;全部无偿共享给大家&#xff01;&#…

odoo17 | 计算字段和更改事件

前言 模型之间的关系是任何Odoo模块的关键组成部分。它们是任何业务案例建模所必需的。然而&#xff0c;我们可能希望给定模型中的字段之间存在链接。有时一个字段的值是由其他字段的值决定的&#xff0c;而有时我们希望帮助用户进行数据输入。 这些案例得到了计算字段和onch…

pycharm远程开发调试(remote development)踩坑记录2

在一次我清理了服务器上一些老的pycharm版本之后 打算重新装3.2版本&#xff0c;就全部给清理了。结果坏了事了&#xff0c;新版的装不上了。 试了公司和中科院的服务器都出现这样的问题&#xff0c;100%复现。md。 一直在这一步循环&#xff1a; Downloading the IDE Backen…

航天航空线束工艺3D虚拟展馆支持多人异地参观漫游

为了满足汽车线束企业员工工作需要&#xff0c;让新老员工了解到更先进、规范的线束工艺设计技术&#xff0c;华锐视点基于VR虚拟仿真、web3d开发和图形图像技术制作了一款汽车线束工艺设计VR虚拟仿真模拟展示系统。 汽车线束工艺设计VR虚拟仿真模拟展示系统共分为pc电脑端和VR…

在Google Colab中调用Gemini的API实现智能问答

一、引言 Google终于放出大招&#xff0c;在2023年12月6日正式推出规模最大、功能最强大的人工智能模型Gemini&#xff0c;对标ChatGPT&#xff0c;甚至有要赶超ChatGPT-4.0的节奏。 相比之前的Bard&#xff0c;Gemini的文本理解能力、图片识别能力和语义抽取能力大大增强&am…

图像分割实战-系列教程1:语义分割与实例分割概述

&#x1f341;&#x1f341;&#x1f341;图像分割实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Pycharm中进行 本篇文章配套的代码资源已经上传 下篇内容&#xff1a; Unet系列算法 1、图像分割任务概述 1.1 图像分割 分割任务就是在原始图像…

php学习06-魔术常量

有九个魔术常量它们的值随着它们在代码中的位置改变而改变。例如 LINE 的值就依赖于它在脚本中所处的行来决定。这些特殊的常量不区分大小写&#xff0c;如下&#xff1a; 参考

小红书12月内容趋势分析

为洞察小红书平台的内容创作趋势及品牌营销策略&#xff0c;新红推出12月月度榜单&#xff0c;从创作者、品牌、热搜词多方面入手&#xff0c;解析月榜数据&#xff0c;为从业者提供参考。 以下为12月部分榜单解析&#xff0c;想要查看更多行业榜单&#xff0c;创作优质内容&am…

【智慧零售】东胜物联蓝牙网关硬件解决方案,促进零售门店数字化管理

依托物联网&#xff08;IoT&#xff09;、大数据、人工智能&#xff08;AI&#xff09;等快速发展&#xff0c;数字化和智能化已成为零售企业的核心竞争力。更多的企业通过引入人工智能、大数据等先进技术手段&#xff0c;提高门店运营效率和服务质量。 某连锁咖啡企业牢牢抓住…

UCharts配置个性化图表:折柱混合、条状图、渐变

UCharts配置个性化图表&#xff1a;折柱混合、条状图、渐变 折线圆滑折线柱状图饼图条状图折柱混合渐变 折线 效果图&#xff1a; 配置&#xff1a; const opts {color: ["#B7E55D", "#78DAE6", "#FF432A", "#FF9641", "#FFD…

Spring的IOC解决程序耦合

pom.xml: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 …

SpringBoot-搭建集成Mybatis的项目

本文介绍了如何在IntelliJ IDEA中使用SpringBoot和Mybatis构建Java Web应用程序。通过本文的学习&#xff0c;读者将了解如何使用IntelliJ IDEA快速搭建一个基于SpringBoot和Mybatis的Java Web应用程序&#xff0c;提高开发效率。IntelliJ IDEA是一款功能强大的Java集成开发环境…

acwing 1358. 约数个数和(莫比乌斯函数)

设 d(x)&#xfffd;(&#xfffd;) 为 x&#xfffd; 的约数个数&#xff0c;给定 N,M&#xfffd;,&#xfffd;&#xff0c;求 ∑i1N∑j1Md(ij)∑&#xfffd;1&#xfffd;∑&#xfffd;1&#xfffd;&#xfffd;(&#xfffd;&#xfffd;) 输入格式 输入多组测试数据…

Spark内核解析-内存管理7(六)

1、Spark内存管理 Spark 作为一个基于内存的分布式计算引擎&#xff0c;其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理&#xff0c;有助于更好地开发 Spark 应用程序和进行性能调优。本文中阐述的原理基于 Spark 2.1 版本。 在执行 Spark 的…

VMware17 下载安装教程

VMware17 下载安装ubuntu22.04虚拟机安装 一、VM安装 1.打开官方下载地址&#xff1a;https://www.vmware.com/products/workstation-pro/workstation-pro-evaluation.html 跳转页面后&#xff0c;点击左边的Download oad now&#xff0c;下载的就是最新版的 Workstation 17 …

stm32实战之su-03t语音模块固件的制作与烧录

目录 su-03t简介 管脚定义 ​​智能公元语音固件制作​​ 账号注册 创建产品 产品配置 唤醒词自定义 命令词自定义 发音人配置 其他配置 生成和下载语音固件 固件烧录 下载SDK固件烧录工具 SU-03T驱动分享 su-03t简介 SU-03T 是一款低成本、低功耗、小体积的离线…

平衡二叉树,力扣

目录 前序遍历与后续遍历 题目地址&#xff1a; 题目&#xff1a; 我们直接看题解吧&#xff1a; 审题目事例提示&#xff1a; 解题方法&#xff1a; 难度分析&#xff1a; 解题方法分析&#xff1a; 解题分析&#xff1a; 解题思路&#xff1a; 代码实现&#xff1a; 补充说明…

idea 社区版 Database Navigator插件 列显示顺序错乱解决办法

idea 社区版 Database Navigator插件 列显示顺序错乱 影响&#xff1a;MyBatisCodeHelperPro插件生成代码字段顺序错乱 解决办法&#xff1a;将COLUMN 的排序方式由Name改为Position方式之后&#xff0c;reload即可&#xff01;

Spring Security 6.x 系列(14)—— 会话管理之源码分析

一、前言 在上篇 Spring Security 6.x 系列(13)—— 会话管理之会话概念及常用配置 Spring Security 6.x 系列(14)—— 会话管理之会话固定攻击防护及Session共享 中了清晰了协议和会话的概念、对 Spring Security 中的常用会话配置进行了说明,并了解会话固定攻击防护…

SCT52240Q双路 4A/4A 高速MOSFET/IGBT栅极驱动器, 可并联输出,替代UCC27524

• 4.5-24V宽供电电压 • 4A 峰值驱动拉电流和灌电流 • 双通道并联输出&#xff0c;增强驱动能力 • 低至-5V负压输入 • 支持TTL低压逻辑输入 • 13ns传输延迟 • 快速上升下降时间&#xff08;典型值8ns&#xff09; • 双通道1ns典型值延迟匹配时间 • 55uA静态功耗 • 输入…