flink left join消费kafka数据

left join会产生回车流数据

在控制台数据


import com.sjfood.sjfood.gmallrealtime.app.BaseSQLAPP;
import com.sjfood.sjfood.gmallrealtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: YSKSolution
 * @Date: 2022/11/8/19:16
 * @Package_name: PACKAGE_NAME
 */
public class LeftJoin extends BaseSQLAPP {

    public static void main(String[] args) {
        new LeftJoin().init(
                2003,
                    2,
                "BaseSQLAPP"
        );
    }
    @Override
    protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {

        //join的时候,这种数据在状态中保存的时间
//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(20));
        tEnv.executeSql("create table t1 (" +
                " id int, "+
                " name string "+
                ")"+ SQLUtil.getKafkaSourceDDL("t1","t1","csv")
        );

        tEnv.executeSql("create table t2 (" +
                " id int, "+
                " age int "+
                ")"+ SQLUtil.getKafkaSourceDDL("t2","t2","csv")
        );

        Table table = tEnv.sqlQuery(" select " +
                "t1.id," +
                "t1.name," +
                "t2.age" +
                " from t1 " +
                " left join t2 " +
                " on t1.id = t2.id "
        );

//        tEnv.createTemporaryView("result",table);


        table.execute().print();
    }
}

先输入t1数据
在这里插入图片描述
控制台数据 ,左表数据输出,右表数据为null
在这里插入图片描述
再输入右表数据
在这里插入图片描述
控制台产生两条数据,一条是回撤流,一条是join得到的数据
在这里插入图片描述
2.写入upsertkakfa消费


import com.sjfood.sjfood.gmallrealtime.app.BaseSQLAPP;
import com.sjfood.sjfood.gmallrealtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: YSKSolution
 * @Date: 2022/11/8/19:16
 * @Package_name: PACKAGE_NAME
 */
public class LeftJoin extends BaseSQLAPP {

    public static void main(String[] args) {
        new LeftJoin().init(
                2003,
                    2,
                "BaseSQLAPP"
        );
    }
    @Override
    protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {

        //join的时候,这种数据在状态中保存的时间
//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(20));
        tEnv.executeSql("create table t1 (" +
                " id int, "+
                " name string "+
                ")"+ SQLUtil.getKafkaSourceDDL("t1","t1","csv")
        );

        tEnv.executeSql("create table t2 (" +
                " id int, "+
                " age int "+
                ")"+ SQLUtil.getKafkaSourceDDL("t2","t2","csv")
        );

        Table table = tEnv.sqlQuery(" select " +
                "t1.id," +
                "t1.name," +
                "t2.age" +
                " from t1 " +
                " left join t2 " +
                " on t1.id = t2.id "
        );

//        tEnv.createTemporaryView("result",table);

        tEnv.executeSql("create table t3(" +
                "id int," +
                "name string," +
                "age int," +
                "primary key (id) not enforced"+
                ")"
                +SQLUtil.getUpsertKafkaDDL("t3","json"));

        table.executeInsert("t3");

    }
}

先写左表,消费到的数据如下,右表数据为null
在这里插入图片描述
再写右表,产生两条数据,第一条是null,表示删除上面那条数据,第二条是left join得到的结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/667022.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu20.04安装VINS_Mono 和 VINS_Fusion

文章目录 一、问题描述二、依赖环境1. Eigen 安装2. glog 安装3. gflags 安装4. ceres 安装 三、VINS-Mono 安装1. git 下载并安装2. OpenCV 版本冲突3. 运行 四、VINS—Fusion 安装1. git 下载并安装2. OpenCV 版本冲突3. 运行 五、日常bug1. 动静态库链接冲突 一、问题描述 …

无缝接入GPT-4o:智创聚合API平台的创新与实践

在2024年5月13日,美国开放人工智能研究中心(OpenAI)发布了最新版本的ChatGPT——GPT-4o。这一更新标志着人工智能领域的又一重大进步,引起了全球科技界的广泛关注。GPT-4o的“o”代表“omni”(全能)&#x…

简要分析学习spring内存马,劫持马

简要分析学习spring内存马,劫持马 本文主要是通过SpringMemShell这个工程,来对spring内存马进行演示,利用。 写在前面: 参考的是大佬给的流程以及思路,其中的解释与分析非常详细 ----->>大佬的链接 这里的内存马文件取自gi…

大坝安全监测自动化技术的规范化设计准则

大坝安全监测自动化技术的规范化设计准则 一、施工阶段自动化系统设计要点 在施工阶段,大坝安全监测自动化系统的设计应当涵盖以下几个核心内容: 监测仪器的布局规划及详细的施工图纸设计。 配套土建项目以及防雷设施的施工设计规划。 明确施工过程中的技…

基于Chisel语言的FPGA流水灯程序

目录 一、 内容概要二、 Chisel介绍三、 Chisel的使用四、 流水灯实现五、 心得体会六、 参考链接 一、 内容概要 Chisel介绍Chisel使用流程Chisel流水灯实操 二、 Chisel介绍 Chisel 是一种构建硬件描述语言(HDL)的高级编程语言,它允许硬…

Python学习需要哪些知识基础?

基础知识是非常重要的,这些内容确实是Python学习的基础。我这里有一套编程入门教程,不仅包含了详细的视频讲解,项目实战。如果你渴望学习编程,不妨点个关注,给个评论222,私信22,我在后台发给你。…

Pytorch反向传播算法(Back Propagation)

一:revise 我们在最开始提出一个线性模型。 x为我们的输入,w为权重。相乘的结果是我们对y的预测值。 那我们在训练时就是对这个权重w进行更新,就需要用到上一章提到的梯度下降算法,不断更新w。但是此时注意不是用y的预测值对w进…

前端Vue自定义支付密码输入框键盘与设置弹框组件的设计与实现

摘要 随着信息技术的不断发展,前端开发的复杂性日益加剧。传统的开发方式,即将整个系统构建为一个庞大的整体应用,往往会导致开发效率低下和维护成本高昂。任何微小的改动或新功能的增加都可能引发对整个应用逻辑的广泛影响,这种…

Mybatis-plus 更新或新增时设置某些字段值为空

方式一 在实体中设置某个字段为的注解中 TableField(updateStrategy FieldStrategy.IGNORED)private Date xxxxxxTime;通过这种方式会指定更新时该字段的策略,通常情况下updateById这种会根据字段更新,通常都会判断null 以及空值 指定 updateStrategy …

学习Java的日子 Day51 数据库,DDL

Day51 MySQL 1.数据库 数据库(database)就是一个存储数据的仓库。为了方便数据的存储和管理,它将数据按照特定的规律存储在磁盘上。通过数据库管理系统,可以有效地组织和管理存储在数据库中的数据 MySQL就是数据库管理系统&#…

[ubuntu18.04]搭建mptcp测试环境说明

MPTCP介绍 Multipath TCP — Multipath TCP -- documentation 2022 documentation 安装ubuntu18.04,可以使用虚拟机安装 点击安装VMware Tool 桌面会出现如下图标 双击打开VMware Tools,复制如下图所示的文件到Home目录 打开终端,切换到管…

安卓启动 性能提升 20-30% ,基准配置 入门教程

1.先从官方下载demohttps://github.com/android/codelab-android-performance/archive/refs/heads/main.zip 2.先用Android studio打开里面的baseline-profiles项目 3.运行一遍app,这里建议用模拟器,(Pixel 6 API 34)设备运行&a…

[Algorithm][动态规划][子序列问题][最长递增子序列的个数][最长数对链]详细讲解

目录 1.最长递增子序列的个数1.题目链接2.算法原理详解3.代码实现 2.最长数对链1.题目链接2.算法原理详解3.代码实现 1.最长递增子序列的个数 1.题目链接 最长递增子序列的个数 2.算法原理详解 注意:本题思路和思维方式及用到的方法很值得考究,个人感…

GPT4o还没用上?落后一个月!

文章目录 一.Share官方网站:以一半的价格享受官网服务1.1 网址1.2 一些介绍和教学实战:1.3 主界面(支持4o):1.4 GPTS(上千个工具箱任你选择):1.5 快速的文件数据分析(以数学建模为例…

CPU/GPU/FPSGO,负载调试/设置命令开关

CPU/GPU/FPSGO,负载调试/设置命令开关 首先,进入: adb shell cat sys/kernel/ged/hal/gpu_utilization 查看GPU的负载情况。输出三个数字,第1个表示使用率,第3个表示空闲率。 echo 0 /sys/kernel/fpsgo/common/force…

Tableau创建数据提取

Tableau创建数据提取通过与原始数据集分离可有效减少总体数据量。以下通过示例-超市数据进行演示: 需求:提取华北及东北地区家具销售利润低于5000的数据 1) 连接到数据并在“数据源”页面上设置数据源后,请在右上角选择“数据提…

Python 机器学习 基础 之 处理文本数据 【处理文本数据/用字符串表示数据类型/将文本数据表示为词袋】的简单说明

Python 机器学习 基础 之 处理文本数据 【处理文本数据/用字符串表示数据类型/将文本数据表示为词袋】的简单说明 目录 Python 机器学习 基础 之 处理文本数据 【处理文本数据/用字符串表示数据类型/将文本数据表示为词袋】的简单说明 一、简单介绍 二、处理文本数据 三、用…

Java中的软引用,你了解吗?

哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一…

关系数据库:关系运算

文章目录 关系运算并(Union)差(Difference)交(Intersection)笛卡尔积(Extended Cartesian Product)投影(projection)选择(Selection)除…

翻译《The Old New Thing》- What a drag: Dragging a virtual file (IStream edition)

What a drag: Dragging a virtual file (IStream edition) - The Old New Thing (microsoft.com)https://devblogs.microsoft.com/oldnewthing/20080319-00/?p23073 Raymond Chen 2008年03月19日 拖拽虚拟文件(IStream 版本) 上一次,我们看…