最新版Flink CDC MySQL同步MySQL(一)

1.概述

Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。
在这里插入图片描述

2.支持的连接器

连接器数据库驱动
mongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4
mysql-cdcMySQL: 5.6, 5.7, 8.0.x、RDS MySQL: 5.6, 5.7, 8.0.x、PolarDB MySQL: 5.6, 5.7, 8.0.x、Aurora MySQL: 5.6, 5.7, 8.0.x、MariaDB: 10.x、PolarDB X: 2.0.1JDBC Driver: 8.0.28
oceanbase-cdcOceanBase CE: 3.1.x, 4.x、OceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.x
oracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0
postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1
sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8
tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27
db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0
vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

3.支持的 Flink 版本

下表显示了 Flink CDC Connectors 与 Flink ®的版本对应关系:

Flink CDC版本_Flink 版本_
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*
2.1.*1.13.*
2.2.*1.13.*、1.14.*
2.3.*1.13.*、1.14.*、1.15.*、1.16.0
2.4.*1.13.*、1.14*、1.15.*、1.16.*、1.17.0

4.特征

支持读取数据库快照,即使出现故障也能继续读取binlog,并进行Exactly-once处理。

DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。

Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改。

5.表/SQL API 的用法

我们需要几个步骤来使用提供的连接器设置 Flink 集群。

首先我们安装了 1.17+ 版本的 Flink 集群(java 8+)。

注意: 如果需要安装Flink请查看笔者对应的博客 flink高可用集群搭建(Standalone模式)
本文用到的jar包flink-connector-jdbc-3.1.1-1.17.jar和flink-sql-connector-mysql-cdc-2.2.1.jar

下载 连接器 SQL jar (或自行构建)。

将下载的jar包放在FLINK_HOME/lib/.

重启Flink集群。

注意:目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的

6.使用 Flink CDC 对 MySQL 进行流式 ETL

本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。

假设我们将产品数据存储在MySQL中,同步到另外一个MySQL中

在下面的章节中,我们将介绍如何使用 Flink Mysql CDC 来实现它。本教程中的所有练习均在 Flink SQL CLI 中进行,整个过程使用标准 SQL 语法,无需任何 Java/Scala 代码,也无需安装 IDE。

架构概述如下:
在这里插入图片描述

7.环境准备

需要准备安装好的MySQL数据库,具体MySQL数据怎么安装请查看笔者的博客Ubuntu数据库安装(mysql)

注意: 如果是其他操作系统请查看其他博客对应的数据库安装教程

8.在 Flink SQL CLI 中使用 Flink DDL 创建表

使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。
在这里插入图片描述首先,每 3 秒启用一次检查点

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

编辑源数据库Flink Sql代码,如下所示:

CREATE TABLE products (
 id INT NOT NULL,
 name STRING,
 description STRING,
 PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入
 'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的
 'port' = '3306', #源数据库端口
 'username' = 'root',#源数据库账号
 'password' = '*****',#源数据库密码
 'database-name' = 'mydb',#源数据库
 'table-name' = 'products'#源数据库表
);

在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表

-- Flink SQL
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.50.163',
    'port' = '3306',
    'username' = 'root',
    'password' = '****',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

编辑目标数据库Flink Sql代码,如下所示:

CREATE TABLE product (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    #引入的jdbc jar包驱动,没有引入会报错提示需要引入 flink-connector-jdbc
    'connector' = 'jdbc',
    #目标数据库连接url地址,可以根据自己的具体设置,此处为笔者本机的。部分高版本的MySQL需要添加useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
    'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    #需要访问的数据库驱动
    'driver' = 'com.mysql.cj.jdbc.Driver',
    #目标数据库账号
    'username' = 'root',
    #目标据库密码
    'password' = '***',
    #目标数据库表
    'table-name' = 'product'
  );

在Flink SQL 执行以下语句创建捕获更改数据的表与目标数据库表的映射关系

-- Flink SQL
Flink SQL> CREATE TABLE product (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'product'
  );

9.将源数据表加载到目标MySQL

使用Flink SQL将表product与 表查询products表写入目标MySQL。

-- Flink SQL
Flink SQL> insert into product select * from products;

具体操作步骤如下所示:
在这里插入图片描述

这是源数据库,操作添加数据,如下图所示:
在这里插入图片描述
目标数据库同步操作如下图
在这里插入图片描述

10.flink可视化界面查看Running JOBS

红框勾选为运行的同步任务
在这里插入图片描述
至此Flink CDC MySQL同步MySQL第一节讲解完毕,后面会更新其复杂操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/35781.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

支持跨语言、人声狗吠互换,仅利用最近邻的简单语音转换模型有多神奇

AI 语音转换真的越复杂越好吗?本文就提出了一个方法简单但同样强大的语言转换模型,与基线方法相比自然度和清晰度毫不逊色,相似度更是大大提升。 AI 参与的语音世界真神奇,既可以将一个人的语音换成任何其他人的语音,…

【VsCode远程开发】Windows SSH远程连接Linux服务器 - 无公网IP内网穿透

文章目录 前言视频教程1、安装OpenSSH2、vscode配置ssh3. 局域网测试连接远程服务器4. 公网远程连接4.1 ubuntu安装cpolar内网穿透4.2 创建隧道映射4.3 测试公网远程连接 5. 配置固定TCP端口地址5.1 保留一个固定TCP端口地址5.2 配置固定TCP端口地址5.3 测试固定公网地址远程 转…

使用Python爬虫和数据可视化,揭示人口大国历年人数的变迁

前言 人口大国通常在全球人口排名中位居前列,其人口数量远远超过其他国家。而印度和中国这两个国家的人口数量均已经超过14亿,而当前全球的人口总数也不过刚刚突破80亿而已,妥妥的天花板级别存在。或许是中国和印度在人口方面的表现太过“耀…

【Python】Python基础知识总结

🎉欢迎来到Python专栏~Python基础知识总结 ☆* o(≧▽≦)o *☆嗨~我是小夏与酒🍹 ✨博客主页:小夏与酒的博客 🎈该系列文章专栏:Python学习专栏 文章作者技术和水平有限,如果文中出现错误,希望…

MySQL基本查询与内置函数

目录 聚合函数 分组查询 内置函数 日期函数 字符串函数 数学函数 聚合函数 COUNT:返回查询到的数据的数量 SUM:返回查询到的数据的总和(数字) AVG:返回数据的平均值 MAX:返回查询到的数据的最大值 MIN&a…

微软MFC技术中消息的分类

我是荔园微风,作为一名在IT界整整25年的老兵,今天来聊聊MFC技术中消息的分类。 微软Windows中的消息虽然很多,但是种类并不繁杂,大体上有3种:窗口消息、命令消息和控件通知消息。 窗口消息 窗口消息是系统中最为常见…

离线环境下安装微软Visual Studio 2022 生成工具

1. 前言 最近,在学习cython的时候,需要安装windows下的C/C编译、链接工具。开始觉得传统的msvc太大了,想要尝试Mingw,但是都是编译错误。无奈之下,还是要安装msvc。 微软提供了Visual Studio 2022 Build Tools &…

12.JavaWeb-Node.js+创建Vue项目

1.Node.js的概念 传统的Web服务器中,每个请求都会创建一个线程,这会导致线程数的增加,从而影响服务器的性能和扩展性,Ryan Dahl借助Chrome的V8引擎提供的能力实现了Node.js——可以在服务端运行的JavaScript(可以把Nod…

高数(下) 第九章:多元函数微分学 及其应用

文章目录 Ch9. 多元函数微分学 及其应用(一) 二重极限(二元函数的极限)(二) 多元函数的连续性(三) 偏导数1.偏导数的定义2.二阶混合偏导数相等3.变限积分求偏导 (四) 二元可微:全增量、全微分(五) 多元复合函数 求导法则(六) 多元隐函数 的求…

Mac如何在终端使用diskutil命令装载和卸载推出外接硬盘

最近用 macOS 装载外接硬盘的时候,使用mount死活装不上,很多文章也没详细的讲各种情况,所以就写一篇博客来记录一下。 如何装载和卸载硬盘(或者说分区) mount和umount是在 macOS 上是不能用的,如果使用会…

R语言——字符串处理

paste(abc, def, gh, sep ) #粘贴字符串 substr(abcdefg, 2, 3) # 取特定字符串 gsub(abc, , c(abc, abcc, abcbc)) # 将字符串中abc替换为空 strsplit(a;b;c, ;, fixed T) # 按照;切分字符串 strsplit(a222b2.2c, 2.2, fixed F) # 按照正则表达式分隔,这里的.是…

解放运营人员:钡铼技术S475物联网网关实现养殖环境的远程监控与告警

在养殖行业中,对环境参数的精确监测与控制至关重要。然而,传统的监测方法往往存在诸多痛点,如数据采集不准确、传输速度慢、可视化效果差等。为了解决这些问题,钡铼技术公司推出了其旗舰产品——S475多功能RTU,该产品在…

外包干了2个月,技术退步明显...

先说一下自己的情况,大专生,18年通过校招进入湖南某软件公司,干了接近4年的功能测试,今年年初,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…

云原生之深入解析K8S的请求和限制

一、Kubernetes 限制和请求 在 Kubernetes 中使用容器时,了解涉及的资源是什么以及为何需要它们很重要。有些进程比其它进程需要更多的 CPU 或内存,这很关键,永远不应该让进程饥饿,知道了这一点,那么应该正确配置容器…

Visual Studio 2017下的C++开发环境搭建

Visual Studio 是Microsoft旗下的开发工具包系列产品,是一个基本完整的开发工具集,它包括整个软件生命周期中所需要的大部分工具,如UML工具、代码管控工具、集成开发环境(IDE)等等,是最流行的Windows平台应用程序的集成开发环境。…

【Springboot集成Neo4j完整版教程】

🚀 Neo4j 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,C…

低代码平台的价格范围及购买成本分析

Zoho Creator是一款强大而灵活的低代码应用程序开发平台,可帮助企业快速、高效地创建各种应用程序。但是,很多人可能会担心它的价格问题。在这篇文章中,我们将深入探讨Zoho Creator的定价策略和计划,以帮助您更好地理解其价格结构…

vue+elementui实现联想购物商城,样式美观大方

目录 一、首页效果图对比 1.联想商城首页截图: 2.作者项目效果图: 二、商品详情效果图对比 1.联想官方截图: 2.作者项目截图: 三、项目实现 1.数据分离维护 2.首页推荐列表数据处理 3.商品详情数据动态获取完成交互 4.商品详…

Spring MVC是什么?详解它的组件、请求流程及注解

作者:Insist-- 个人主页:insist--个人主页 作者会持续更新网络知识和python基础知识,期待你的关注 前言 本文将讲解Spring MVC是什么,它的优缺点与九大组件,以及它的请求流程与常用的注解。 目录 一、Spring MVC是什…

「深度学习之优化算法」(十)烟花算法

1. 烟花算法简介 (以下描述,均不是学术用语,仅供大家快乐的阅读)   烟花算法(Firework Algorithm,FWA)是一种受烟花爆炸产生火星,并继续分裂爆炸这一过程启发而得出的算法。算法的思想简单,但具体实现复杂。算法提出时间并不长,但是已经有了不少的改进研究和较为全…