【Table/SQL Api】Flink Table/SQL Api表转流读取MySQL

引入依赖

jdbc依赖

flink-connector-jdbc + mysql-jdbc-driver 操作mysql数据库

        <!-- Flink-Connector-Jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        </dependency>

        <!-- mysql jdbc driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

Table/SQL Api依赖

  1. Table/SQL Api 扩展依赖
  2. Table/SQL Api 基础依赖
  3. Table/SQL Api 和 DataStream Api 交互的依赖 bridge
  4. Flink Planner 依赖
        <!-- Table/SQL Api 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
        </dependency>
        <!-- Table/SQL Api 扩展依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
        </dependency>
        <!-- bridge桥接器,主要负责Table API和 DataStream API的连接支持 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        </dependency>
        <!-- Flink Planner 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        </dependency>

对应版本在这 (项目Flink版本为1.14.5

image-20231210161727111

Flink读写MySQL工具类

Table Api 环境加载

Table API和SQL Api都是基于Table接口

Table Api上下文环境有3种类型

  1. TableEnvironment:只支持Batch作业
  2. BatchTableEnvironment:只支持Batch作业
  3. StreamTableEnvironment: 支持流计算【用这个】

Planner(查询处理器)

Planner(查询处理器):解析sql、优化sql和执行sql

Flink Planner的类型:

  1. Flink Planner (Old Planner)
  2. Blink Planner (Flink 1.14之前需要手动导入依赖)

Blink Planner从Flink 1.11版本开始为Flink-table的默认查询处理器

Blink Planner使得Table Api & SQL 层实现了流批统一

Catalog对象

Catalog对象是提供了元数据信息,数据源与数据表的信息则存储在Catalog中

// 创建Catalog对象
new JdbcCatalog(catalog_name, database, username, passwd, url);

Catalog对象是接口

Catalog接口的实现:(Flink 1.14版本之前)

  1. PG (PostgresSQL) Catalog
  2. HiveCatalog
  3. Mysql Catalog (Flink 1.15 才有)

DDL与数据库表结构必须一模一样,建立映射,这种方式数据库表结构如果变化,代码也必须随之变化重新打包,因此这种方式用的不多,一般catalog会用的比较多。

但由于项目Flink依赖用的是1.14.5,因此还是使用DDL语句实现。

代码实现

public class MysqlUtil {

    /**
     * 数据库连接对象
     */
    private static Connection connection = null;
    /**
     * SQL语句对象
     */
    private static PreparedStatement preparedStatement = null;
    /**
     * 结果集对象
     */
    private static ResultSet rs = null;


    /**
     * 使用 Flink Table/SQL Api 读取Mysql
     *
     * @param env:           流计算上下文环境
     * @param parameterTool: 参数工具
     * @param clazz:         流水线输出对象的类
     * @param tableName:     表名
     * @param ddlString:     DDL字符串
     * @param sql:           SQL查询语句
     * @return DataStream<T>:DataStream对象
     */
    public static <T> DataStream<T> readWithTableOrSQLApi(
            StreamExecutionEnvironment env,
            ParameterTool parameterTool,
            Class<T> clazz,
            String tableName,
            String ddlString,
            String sql

    ) throws Exception {

        // 创建TableApi运行环境
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance()
                        // Flink 1.14不需要再设置 Planner
                        //.useBlinkPlanner()
                        // 设置流计算模式
                        .inStreamingMode()
                        .build();

        // 创建StreamTableEnvironment实例
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // 指定方言 (选择使用SQL语法还是HQL语法)
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // 编写DDL ( 数据定义语言 )
        String ddl = buildMysqlDDL(parameterTool, tableName, ddlString);

        // StreamTableEnvironment注册虚拟表
        tableEnv.executeSql(ddl);
        // 查询结果是Table对象
        Table table = tableEnv.sqlQuery(sql);
        // 将Table对象转换为DataStream对象
        return tableEnv.toDataStream(table, clazz);
    }

    /**
     * 根据参数生成MySQL的DDL语句
     *
     * @param parameterTool  参数工具,用于获取MySQL连接信息
     * @param tableName      要创建的表名
     * @param ddlFieldString 表字段的DDL语句
     * @return 生成的完整的MySQL DDL语句
     */
    public static String buildMysqlDDL(
            ParameterTool parameterTool,
            String tableName,
            String ddlFieldString
    ) {

        // 从参数工具中获取mysql连接的url
        String url = parameterTool.get(ParameterConstants.Mysql_URL);
        // 从参数工具中获取mysql连接的用户名
        String username = parameterTool.get(ParameterConstants.Mysql_USERNAME);
        // 从参数工具中获取mysql连接的密码
        String passwd = parameterTool.get(ParameterConstants.Mysql_PASSWD);
        // 从参数工具中获取MySQL的驱动程序
        String driver = parameterTool.get(ParameterConstants.Mysql_DRIVER);

        // 返回完整的DDL语句
        return "CREATE TABLE IF NOT EXISTS " +
                tableName +
                " (\n" +
                ddlFieldString +
                ")" +
                " WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'driver' = '" + driver + "',\n" +
                "'url' = '" + url + "',\n" +
                "'username' = '" + username + "',\n" +
                "'password' = '" + passwd + "',\n" +
                "'table-name' = '" + tableName + "'\n" +
                ")";
    }

    /**
     * 初始化 jdbc Connection
     */
    public static Connection init(ParameterTool parameterTool) {

        String _url = parameterTool.get(ParameterConstants.Mysql_URL);
        String _username = parameterTool.get(ParameterConstants.Mysql_USERNAME);
        String _passwd = parameterTool.get(ParameterConstants.Mysql_PASSWD);

        try {
            connection = DriverManager.getConnection(_url, _username, _passwd);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return connection;
    }

    /**
     * 生成 PreparedStatement
     */
    public static PreparedStatement initPreparedStatement(String sql) {
        try {
            preparedStatement = connection.prepareStatement(sql);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        return preparedStatement;
    }

    /**
     * 关闭 jdbc Connection
     */
    public static void close() {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 关闭 PreparedStatement
     */
    public static void closePreparedStatement() {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 关闭 ResultSet
     */
    public static void closeResultSet() {
        try {
            if (rs != null) {
                rs.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 执行 sql 语句
     */
    public static ResultSet executeQuery(PreparedStatement ps) {
        preparedStatement = ps;
        try {
            rs = preparedStatement.executeQuery();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return rs;
    }

}

测试一下

测试库中有个tb_user表

image-20231210174346826

创建与表映射的实体类

@Data
public class UserPO {
    private Long id;
    private String name;
}
class MysqlUtilTest {

    @DisplayName("测试使用 Flink Table/SQL Api 读取Mysql")
    @Test
    public void testReadWithTableOrSQLApi() throws Exception {
        // 初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // 设置并行度1
        env.setParallelism(1);
        // 获取参数工具实例
        ParameterTool parameterTool = ParameterUtil.getParameters();

        /* **********************
         *
         * CREATE 语句用于向当前或指定的 Catalog 中注册表。
         * 注册后的表、视图和函数可以在 SQL 查询中使用
         *
         * *********************/
        // 表名
        String tableName = "tb_user";

        // 表字段ddl
        String ddlFieldString =
                "id BIGINT,\n" +
                        "name STRING \n";

        // 查询表的全部字段
        String sql = "SELECT * FROM " + tableName;

        DataStream<UserPO> rowDataStream =
                MysqlUtil.readWithTableOrSQLApi(
                        env,
                        parameterTool,
                        UserPO.class,
                        tableName,
                        ddlFieldString,
                        sql
                );

        rowDataStream.print("mysql");
        env.execute();
    }
}

image-20231210174720832

查询成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/235035.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

React antd如何实现<Upload>组件上传附件再次上传已清除附件缓存问题

最近遇到一个React上传组件的问题&#xff0c;即上传附件成功后&#xff0c;文件展示处仍然还有之前上传附件的缓存信息&#xff0c;需要解决的问题是&#xff0c;要把上一次上传的附件缓存在上传成功或者取消后&#xff0c;可以进行清除 经过一顿试错&#xff0c;终于解决了这…

高云GW1NSR-4C开发板M3硬核应用

1.M3硬核IP下载&#xff1a;Embedded M3 Hard Core in GW1NS-4C - 科技 - 广东高云半导体科技股份有限公司 (gowinsemi.com.cn) 特别说明&#xff1a;IDE必须是1.9.9及以后版本&#xff0c;1.9.8会导致编译失败&#xff08;1.9.8下1.1.3版本IP核可用&#xff09; 以下根据官方…

【电路笔记】-压敏电阻

压敏电阻 文章目录 压敏电阻1、概述2、交流波形瞬变3、抗静电能力4、特性曲线5、压敏电阻电容值6、金属氧化物压敏电阻7、压敏电阻应用8、总结 压敏电阻是一种无源两端固态半导体器件&#xff0c;用于为电气和电子电路提供保护。 1、概述 与提供过电流保护的保险丝或断路器不同…

nginx多ip部署

1.修改网卡信息自定义多个IP 进入/etc/sysconfig/network-scripts&#xff0c;编辑ifcfg-ens33网卡文件。将dhcp动态分配修改成static&#xff0c;同时添加ip地址子网掩码、网关和DNS。 修改完成后重启网卡&#xff0c;systemctl restart network 2.修改nginx配置文件 有几个…

IT新闻资讯系统,使用mysql作为后台数据库,此系统具有显示数据库中的所有信息和删除两大功能。

表的准备&#xff1a; -- MySQL Administrator dump 1.4 -- -- ------------------------------------------------------ -- Server version 5.1.40-community /*!40101 SET OLD_CHARACTER_SET_CLIENTCHARACTER_SET_CLIENT */; /*!40101 SET OLD_CHARACTER_SET_RESULTSCHAR…

Kubernetes实战(九)-kubeadm安装k8s集群

1 环境准备 1.1 主机信息 iphostname10.220.43.203master10.220.43.204node1 1.2 系统信息 $ cat /etc/redhat-release Alibaba Cloud Linux (Aliyun Linux) release 2.1903 LTS (Hunting Beagle) 2 部署准备 master/与slave主机均需要设置。 2.1 设置主机名 # master h…

iOS按钮控件UIButton使用

1.在故事板中添加按钮控件,步聚如下: 同时按钮Shift+Commad+L在出现在控件库中选择Button并拖入View Controller Scene中 将控件与变量btnSelect关联 关联后空心变实心 如何关联?直接到属性窗口拖按钮变量到控件上,出现一条线,然后松开,这样就关联成功了 关联成功后属性窗口…

学习 NVIDIA Omniverse 的最基础概念

无用的前言 近两年关于 Omniverse 的宣传一直很多&#xff0c;可我一直没去了解&#xff0c;连它是个啥都不知道。最近正好有契机需要了解它&#xff0c;于是我今天抽时间看了些它的官方介绍&#xff0c;并按照自己的理解梳理在这里。 官方资料索引 Omniverse 官网主页&…

halcon视觉缺陷检测常用的6种方法

一、缺陷检测综述 缺陷检测是视觉需求中难度最大一类需求,主要是其稳定性和精度的保证。首先常见缺陷:凹凸、污点瑕疵、划痕、裂缝、探伤等。常用的手法有六大金刚(在halcon中的ocv和印刷检测是针对印刷行业的检测,有对应算子封装): 1.blob+特征 2.blob+差分+特征 3.光度…

UE小:物品拼装功能

蓝图B1的实现步骤&#xff1a; 获取玩家控制器和视角&#xff1a;首先获取玩家控制器&#xff0c;然后使用Deproject Screen to World节点将屏幕上的鼠标位置转换为世界空间中的一条射线。 射线检测&#xff1a;使用Line Trace by Channel或Line Trace for Objects节点发射射线…

WLAN配置实验

本文记录了WLAN配置实践的过程&#xff0c;该操作在华为HCIA中属于相对较复杂的实验&#xff0c;记录过程备忘。这里不就WLAN原理解释&#xff0c;仅进行配置实践&#xff0c;可以作为学习原理时候的参考。本文使用华为ENSP进行仿真。实验拓扑图如下&#xff1a; 1.WLAN工作流程…

作业调度算法(含详细计算过程)和进程调度算法浅析

一.作业调度 作业调度算法需要知道以下公式 周转时间完成时间 - 到达时间 带权周转时间周转时间/运行时间 注&#xff1a;带权周转时间越大&#xff0c;作业&#xff08;或进程&#xff09;越短&#xff1b;带权周转时间越小&#xff0c;作业&#xff08;或进程&#xff09;越…

docker-centos中基于keepalived+niginx模拟主从热备完整过程

文章目录 一、环境准备二、主机1、环境搭建1.1 镜像拉取1.2 创建网桥1.3 启动容器1.4 配置镜像源1.5 下载工具包1.6 下载keepalived1.7 下载nginx 2、配置2.1 配置keepalived2.2 配置nginx2.2.1 查看nginx.conf2.2.2 修改index.html 3、启动3.1 启动nginx3.2 启动keepalived 4、…

7-8 报销

年底&#xff0c;报销都挤在一堆&#xff0c;财务忙得不可开交。每个报销表包括姓名&#xff0c;各项费用的金额。对于每个报销单&#xff0c;这里规定按如下要求处理&#xff1a; 金额高的优先处理&#xff1b;若金额相等时&#xff0c;则姓名字典序小的优先处理&#xff1b;…

call,apply,bind

1.这三个方法都能改变this的指向 2.代码实战 let obj1 {name: "小红",age: 20,fn: function () {console.log(当前this的指向,this);console.log(我叫${this.name},今年${this.age}岁);},};obj1.fn(); 这里的代码,obj1是一个对象,里面有属性name和age 正常情况下我…

深入理解网络中断:原理与应用

&#x1f52d; 嗨&#xff0c;您好 &#x1f44b; 我是 vnjohn&#xff0c;在互联网企业担任 Java 开发&#xff0c;CSDN 优质创作者 &#x1f4d6; 推荐专栏&#xff1a;Spring、MySQL、Nacos、Java&#xff0c;后续其他专栏会持续优化更新迭代 &#x1f332;文章所在专栏&…

大数据技术8:StarRocks极速全场景MPP数据库

前言&#xff1a;StarRocks原名DorisDB&#xff0c;是新一代极速全场景MPP数据库。StarRocks 是 Apache Doris 的 Fork 版本。StarRocks 连接的多种源。一是通过这个 CDC 或者说通过这个 ETL 的方式去灌到这个 StarRocks 里面&#xff1b;二是还可以去直接的和这些老的 kafka 或…

C++ 模拟实现vector

目录 一、定义 二、模拟实现 1、无参初始化 2、size&capacity 3、reserve 4、push_back 5、迭代器 6、empty 7、pop_back 8、operator[ ] 9、resize 10、insert 迭代器失效问题 11、erase 12、带参初始化 13、迭代器初始化 14、析构函数 完整版代码 一、…

MyBatis 四大核心组件之 Executor 源码解析

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

【IDEA】IntelliJ IDEA中进行Git版本控制

本篇文章主要记录一下自己在IntelliJ IDEA上使用git的操作&#xff0c;一个新项目如何使用git进行版本控制。文章使用的IDEA版本 IntelliJ IDEA Community Edition 2023.3&#xff0c;远程仓库为https://gitee.com/ 1.配置Git&#xff08;File>Settings&#xff09; 2.去Git…