FlinkSQL中【FULL OUTER JOIN】使用实例分析(坑)

Flink版本:flink1.14
最近有【FULL OUTER JOIN】场景的实时数据开发需求,想要的结果是,左右表来了数据都下发数据;左表存在的数据,右表进来可以关联下发(同样,右表存在的数据,左表进来也可以关联下发)。但在实际应用中遇到一些问题。
其中包括FlinkSQL知识点:

  • FlinkSQL 【FULL OUTER JOIN】
  • FlinkSQL 【Temporal Joins-Lookup Join】
  • FlinkSQL 【去重】
  • FlinkSQL 【upsert-kafka】

FlinkSQL demo


CREATE TABLE waybill_extend_kafka (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     _proc  as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
  'format' = 'json'
);

CREATE TABLE package_state_kafka (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     _proc  as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 't2',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
  'format' = 'json'
);

CREATE TABLE es_dim(
    id                STRING,
    ts                STRING,
    waybill_code      STRING,
    pin               STRING,
    operater_ts       STRING,
    operater_type     STRING,
    is_enable         STRING,
    batch_no          STRING,
    activity_key      STRING,
    p_type            STRING,
    p_name            STRING,
    version            STRING,
    update_time            STRING
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_dim',
    'document-type' = 'es_dim',
    'hosts' = 'http://xxx:9200',
    'format' = 'json'
);

CREATE TABLE es_sink(
     waybill_code      STRING
    ,first_order       STRING -- 新客1,非新客0
    ,extend_update_time            STRING
    ,state       STRING -- 妥投150
    ,package_update_time            STRING
    ,pin              STRING
    ,coupon_use_time      STRING
    ,operater_type    STRING
    ,is_enable        STRING
    ,batch_no         STRING
    ,update_time         STRING
    ,PRIMARY KEY (waybill_code) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE TABLE kafka_sink (
     waybill_code      STRING
    ,first_order       STRING 
    ,extend_update_time            STRING
    ,state       STRING -- 妥投150
    ,package_update_time            STRING
    ,pin              STRING
    ,coupon_use_time      STRING
    ,operater_type    STRING
    ,is_enable        STRING
    ,batch_no         STRING
    ,update_time         STRING
    ,PRIMARY KEY (waybill_code) NOT ENFORCED --注意 确保在 DDL 中定义主键。
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 't3',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

--新客
CREATE view  waybill_extend_temp as
select
    IF(cur['waybill_code'] IS NOT NULL , cur['waybill_code'], src ['waybill_code'])  AS waybill_code,
    IF(cur['data_key'] IS NOT NULL , cur['data_key'], src ['data_key'])  AS data_key,
    IF(cur['create_time'] IS NOT NULL , cur['create_time'], src ['create_time'])  AS create_time,
    opt,
    _proc
FROM waybill_extend_kafka
where UPPER(opt) = 'DELETE' OR UPPER(opt) = 'INSERT';

CREATE view  waybill_extend_temp_handle as
SELECT
    waybill_code,
    case when UPPER(opt) = 'INSERT'  then '1'
         when UPPER(opt) = 'DELETE'  then '0'
            end as first_order,
    create_time,
    _proc
from waybill_extend_temp
where data_key = 'firstOrder';

--妥投
CREATE view package_state_temp as
select
    IF(cur['WAYBILL_CODE'] IS NOT NULL , cur['WAYBILL_CODE'], src ['WAYBILL_CODE'])  AS waybill_code,
    IF(cur['STATE'] IS NOT NULL , cur['STATE'], src ['STATE'])  AS state,
    IF(cur['CREATE_TIME'] IS NOT NULL , cur['CREATE_TIME'], src ['CREATE_TIME'])  AS create_time,
    opt,
    _proc
FROM package_state_kafka
where UPPER(opt) = 'INSERT';

CREATE view package_state_temp_handle as
SELECT
    waybill_code,
    max(state) as state,
    min(create_time) as package_update_time,
    proctime() as _proc
from package_state_temp
where state = '150'
group by waybill_code;

--full join
-- flink1.14 注意:flinksql里面的FULL OUTER JOIN 只是分别下发左右数据,中间状态不关联下发,在流处理场景下相当于union all
CREATE view waybill_extend_package_state  as
SELECT
    COALESCE(a.waybill_code, b.waybill_code) as waybill_code,
    a.first_order,
    a.create_time as extend_update_time,
    b.state,
    b.package_update_time,
    COALESCE(a._proc, b._proc) as _proc
from waybill_extend_temp_handle as a
FULL OUTER JOIN package_state_temp_handle b
on a.waybill_code=b.waybill_code;

--result
CREATE VIEW res_view AS
SELECT
     a.waybill_code
    ,a.first_order
    ,a.extend_update_time
    ,a.state
    ,a.package_update_time
    ,b.pin
    ,b.operater_ts
    ,b.operater_type
    ,b.is_enable
    ,b.batch_no
    ,CAST(CAST(a._proc AS TIMESTAMP(3)) AS STRING) as update_time
    ,row_number() over(partition by a.waybill_code order by b.operater_ts desc) as rn 
from waybill_extend_package_state as a
JOIN es_dim FOR SYSTEM_TIME AS OF a._proc as b
on a.waybill_code=b.waybill_code;

INSERT INTO es_sink
SELECT
     waybill_code
    ,first_order
    ,extend_update_time
    ,state
    ,package_update_time
    ,pin
    ,operater_ts
    ,operater_type
    ,is_enable
    ,batch_no
    ,update_time
FROM res_view
where rn =1;

INSERT INTO kafka_sink
SELECT
     waybill_code
    ,first_order
    ,extend_update_time
    ,state
    ,package_update_time
    ,pin
    ,operater_ts 
    ,operater_type
    ,is_enable
    ,batch_no
    ,update_time
FROM res_view
where rn =1;

es_sink mapping:

POST es_sink/es_sink/_mapping
{
    "es_sink": {
        "properties": {
            "waybill_code": {
                "type": "keyword"
            },
            "pin": {
                "type": "keyword"
            },
            "operater_ts": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "operater_type": {
                "type": "keyword"
            },
            "is_enable": {
                "type": "keyword"
            },
            "batch_no": {
                "type": "keyword"
            },
            "first_order": {
                "type": "keyword"
            },
            "extend_update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "state": {
                "type": "keyword"
            },
            "package_update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

结果分析

从sink_es和sink_kafka获取数据都是同样的结果,部分结果如下:
在这里插入图片描述
但从结果中可以看出,FlinkSQL里面的【FULL OUTER JOIN】 只是分别下发左右数据,中间状态(从FlinkUI中可以看到【FULL OUTER JOIN】状态也做了保存)不关联下发,在流处理场景下相当于【UNION ALL】,不知是否是FlinkSQL的bug。
【FULL OUTER JOIN】状态数据,如下:
在这里插入图片描述
此次用例分析只是针对于Flink1.14,对于其他版本尚不清楚。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/300295.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机毕业设计------SSM二手交易网站

项目介绍 该项目分为前后台&#xff0c;前台普通用户角色&#xff0c;后台管理员角色。 管理员主要功能如下&#xff1a; 登陆,商品分类管理,商品管理,商品订单管理,用户管理等功能。 用户角色主要功能如下&#xff1a; 包含以下功能&#xff1a;查看所有商品,用户登陆注册…

2023.12.30周报

目录 摘要 ABSTRACT 一、文献阅读 1、题目 2、摘要 3、创新点 4、文章解读 1、Introduction 2、时间序列的季节趋势表征 3、 季节趋势对比学习框架 4、实验 5、结论 二、ARIMA 一、ARIMA模型的基本思想 二、ARIMA模型的数学表达式 三、差分过程 1、什么是差分…

循环队列的队空队满情况

有题目&#xff1a; 循环队列放在一维数组A[0....M-1]中&#xff0c;end1指向队头元素&#xff0c;end2指向队尾元素的后一个位置。假设队列两端均可进行入队和出队操作&#xff0c;队列中最多能容纳M-1个元素。初始时为空。下列判断队空和队满的条件中&#xff0c;正确的是 …

计算机毕业设计 基于Javaweb的城乡居民基本医疗信息管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

逆置算法和数组循环移动算法

元素逆置 概述&#xff1a;其实就是将 第一个元素和最后一个元素交换&#xff0c;第二个元素和倒数第二个元素交换&#xff0c;依次到中间位置。用途&#xff1a;可用于数组的移动&#xff0c;字符串反转&#xff0c;链表反转操作&#xff0c;栈和队列反转等操作。 逆置图解 …

Cadence Editor 关于画PCB相关内容

目录 一 新建PCB文件 二 指定封装库 三 导入网表 四 放置器件 五 绘制板框 六 精准定位 七 原理图与PCB的交互 八 飞线设置 九 层管理 布局布线阶段需要显示的层 十 器件位置相关 1 器件选取的基准点 2 旋转 3 对齐 4 把器件移动到底层或顶层 5 锁定与解锁 6…

【MySQL】事务管理

文章目录 什么是事务为什么会出现事务事务的版本支持事务的提交方式事务的相关演示事务的隔离级别查看与设置隔离级别读未提交&#xff08;Read Uncommitted&#xff09;读提交&#xff08;Read Committed&#xff09;可重复读&#xff08;Repeatable Read&#xff09;串行化&a…

IDEAVsCode常用插件

IDEA&VsCode常用插件 IDEA lombok、mybatisx 插件 Vscode Vetur —— 语法高亮、智能感知、Emmet 等&#xff0c;包含格式化功能&#xff0c; AltShiftF &#xff08;格式化全文&#xff09;&#xff0c;CtrlK CtrlF&#xff08;格式化选中代码&#xff0c;两个 Ctrl需…

区间预测 | Matlab实现CNN-LSTM-KDE的卷积长短期神经网络结合核密度估计多变量时序区间预测

区间预测 | Matlab实现CNN-LSTM-KDE的卷积长短期神经网络结合核密度估计多变量时序区间预测 目录 区间预测 | Matlab实现CNN-LSTM-KDE的卷积长短期神经网络结合核密度估计多变量时序区间预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.CNN-LSTM-KDE多变量时间序列区…

Ubuntu无网络解决办法

1.进入root并输入密码 sudo su 2.更新NetworkManager的配置 用vim打开NetworkManager.conf vim /etc/NetworkManager/NetworkManager.conf 将第五行 managedFalse 改为 managedTrue 。 如果本身就是True就不用改了。 3.删除NetworkManager配置 service NetworkManager st…

el-date-picker日期时间选择器限制可选的日期范围

业务场景&#xff1a;需要限制日期时间选择器可选择的日期&#xff0c;有两种模式&#xff0c; 一种是已知范围&#xff0c;只能选已知范围内的日期&#xff0c; 另一种是知道最近天数&#xff0c;只能选今天往前的天数内的日期&#xff0c;超出不能选。 <el-date-picker v-…

Redis反序列化的一次问题

redis反序列化的一次问题 1. 问题描述 springbootredis不少用&#xff0c;但是一直没遇到什么问题&#xff0c;直接代码拷贝上去就用了。这次结合spring-security&#xff0c;将自定义的spring-security的UserDetails接口的实现类SecurityUser&#xff0c;反序列化取出时报错…

【解决】hosts文件无修改权限问题

1. 以管理员身份运行命令提示符&#xff08;cmd&#xff09;&#xff1a; 2. 在cmd中输入notepad进入记事本&#xff1a; 3. 通过记事本打开hosts文件&#xff1a; 4. 修改并保存&#xff1a;

系列六、MindManager取消首字母自动大写

一、MindManager取消首字母自动大写 1.1、步骤 主页>字体>设置字体样式>格式字体>文本和大写>文本大写>无 1.2、参考 https://tieba.baidu.com/p/3752136361

UI动效设计师通往高薪之路,AE设计从基础到进阶教学

一、教程描述 UI动效设计&#xff0c;顾名思义即动态效果的设计&#xff0c;用户界面上所有运动的效果&#xff0c;也可以视其为界面设计与动态设计的交集&#xff0c;或者可以简单理解为UI设计中的动画效果&#xff0c;是UI设计中不可或缺的组成部分。现在UI设计的要求越来越…

如何写html邮件 —— 参考主流outook、gmail、qq邮箱渲染邮件过程

文章目录 ⭐前言⭐outlook渲染邮件⭐gmail邮箱渲染邮件⭐qq邮箱渲染邮件 ⭐编写html邮件&#x1f496;table表格的属性&#x1f496;文本&#x1f496;图片&#x1f496;按钮&#x1f496;背景图片 ⭐总结⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享关于 …

PEFT: 在低资源硬件上对十亿规模模型进行参数高效微调

1 引言 最近&#xff0c;深度学习的研究中出现了许多大型预训练模型&#xff0c;例如 GPT-3、BERT 等&#xff0c;这些模型可以在多种自然语言处理任务中取得优异的性能表现。而其中&#xff0c;ChatGPT 模型因为在对话生成方面的表现而备受瞩目&#xff0c;成为了自然语言处理…

链表

目录 单链表 双链表 单链表 题目如下&#xff1a;模拟一个单链表&#xff0c;实现插入删除操作 解题代码 #include <iostream>using namespace std;const int N 100010;// head 表示头结点的下标 // e[i] 表示节点i的值 // ne[i] 表示节点i的next指针是多少 // idx …

vmlinux, vmlinux.bin, bzImage; cmake的find_package(Clang)新增了哪些变量( 比较两次记录的所有变量差异)

vmlinux, vmlinux.bin, bzImage cd /bal/linux-stable/ file vmlinux #vmlinux: ELF 32-bit LSB executable, Intel 80386, version 1 (SYSV), statically linked, BuildID[sha1]=b99bbd9dda1ec2751da246d4a7ae4e6fcf7d789b, not stripped #文件大小 20MB, 19940148Bfile ar…

小程序组件内的数据监听器

数据监听器可以用于监听和响应任何属性和数据字段的变化。从小程序基础库版本 2.6.1 开始支持。 有时&#xff0c;在一些数据字段被 setData 设置时&#xff0c;需要执行一些操作。例如&#xff0c; 一个值取决于另外两个值的变化&#xff0c;this.data.sum 永远是 this.data.…