全方位解读SeaTunnel MySQL CDC连接器:实现数据高效同步的强大工具

在当今数据快速增长的时代,实时、高效地同步和处理来自各种数据源的信息成为了企业和开发者面临的重要挑战。

file

MySQL作为广泛使用的数据库之一,其变更数据捕获(CDC)功能对于实现这一目标至关重要。在这篇文章中,我们将深入探讨MySQL CDC源连接器在SeaTunnel框架下的应用,涵盖从基础设置到高级配置的各个方面。

MySQL CDC源连接器

支持的引擎

SeaTunnel Zeta
Flink

主要特性

  • 批量
  • 流式
  • 精确一次
  • 列投影
  • 并行处理
  • 支持用户定义的拆分

描述

MySQL CDC连接器允许从MySQL数据库读取快照数据和增量数据。本文档描述了如何设置MySQL CDC连接器以对MySQL数据库运行SQL查询。

支持的数据源信息

数据源支持的版本驱动UrlMaven
MySQL
  • MySQL: 5.6, 5.7, 8.0.x
  • RDS MySQL: 5.6, 5.7, 8.0.x
com.mysql.cj.jdbc.Driverjdbc:mysql://localhost:3306/testhttps://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28

数据库依赖

安装Jdbc驱动程序

请将mysql驱动程序下载并放入${SEATUNNEL_HOME}/lib/目录中。例如:cp mysql-connector-java-xxx.jar $SEATNUNNEL_HOME/lib/

创建MySQL用户

您必须为Debezium MySQL连接器监视的所有数据库定义一个具有适当权限的MySQL用户。

  1. 创建MySQL用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
  1. 为用户授予所需权限:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
  1. 完成用户的权限设置:
mysql> FLUSH PRIVILEGES;

启用MySQL binlog

为了实现MySQL的复制,必须启用二进制日志。二进制日志记录了用于复制工具传播更改的事务更新。

  1. 检查log-bin选项是否已经开启:
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)
  1. 如果与上述结果不一致,请使用以下属性配置您的MySQL服务器配置文件($MYSQL_HOME/mysql.cnf),如下表所示:
# 启用二进制复制日志并设置前缀、过期时间和日志格式。
# 前缀是任意的,过期时间对于集成测试可以短一些,但在生产系统中会更长。
# 行级信息对于摄取工作是必需的。
# 服务器ID在生产系统上是必需的,但会有所不同。
server-id         = 223344
log_bin           = mysql-bin
expire_logs_days  = 10
binlog_format     = row
binlog_row_image  = FULL

# 启用gtid模式
gtid_mode = on
enforce_gtid_consistency = on
  1. 重启MySQL服务器
/etc/inint.d/mysqld restart
  1. 再次确认您的更改,通过再次检查binlog状态:
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)

注意

设置MySQL会话超时

当为大型数据库创建初始一致快照时,在读取表期间,已建立的连接可能会超时。您可以通过在MySQL配置文件中配置interactive_timeoutwait_timeout来防止这种行为。

  • interactive_timeout:服务器等待交互连接活动关闭之前的秒数。有关更多详细信息,请参阅MySQL文档。
  • wait_timeout:服务器等待非交互连接活动关闭之前的秒数。有关更多详细信息,请参阅MySQL文档。

有关更多数据库设置,请参见Debezium MySQL连接器

数据类型映射

Mysql Data typeSeaTunnel Data type
BIT(1)
TINYINT(1)
BOOLEAN
TINYINTTINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR
INT
INT UNSIGNED
INTEGER UNSIGNED
BIGINT
BIGINT
BIGINT UNSIGNEDDECIMAL(20,0)
DECIMAL(p, s)
DECIMAL(p, s) UNSIGNED
NUMERIC(p, s)
NUMERIC(p, s) UNSIGNED
DECIMAL(p,s)
FLOAT
FLOAT UNSIGNED
FLOAT
DOUBLE
DOUBLE UNSIGNED
REAL
REAL UNSIGNED
DOUBLE
CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
ENUM
JSON
STRING
DATEDATE
TIMETIME
DATETIME
TIMESTAMP
TIMESTAMP
BINARY
VARBINAR
BIT(p)
TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
BYTES

源选项

名称类型必需默认值描述
base-urlString-JDBC连接的URL。参考示例:jdbc:mysql://localhost:3306:3306/test
usernameString-连接到数据库服务器时使用的数据库名称。
passwordString-连接到数据库服务器时使用的密码。
database-namesList-要监视的数据库的名称。
table-namesList-要监视的数据库的表名。表名需要包含数据库名称,例如:database_name.table_name
startup.modeEnumNoINITIALOptional startup mode for MySQL CDC consumer, valid enumerations are initial, earliest, latest and specific.
initial: Synchronize historical data at startup, and then synchronize incremental data.
earliest: Startup from the earliest offset possible.
latest: Startup from the latest offset.
specific: Startup from user-supplied specific offsets.
startup.specific-offset.fileStringNo-Start from the specified binlog file name. Note, This option is required when the startup.mode option used specific.
startup.specific-offset.posLongNo-Start from the specified binlog file position. Note, This option is required when the startup.mode option used specific.
stop.modeEnumNoNEVEROptional stop mode for MySQL CDC consumer, valid enumerations are never, latest or specific.
never: Real-time job don't stop the source.
latest: Stop from the latest offset.
specific: Stop from user-supplied specific offset.
stop.specific-offset.fileStringNo-Stop from the specified binlog file name. Note, This option is required when the stop.mode option used specific.
stop.specific-offset.posLongNo-Stop from the specified binlog file position. Note, This option is required when the stop.mode option used specific.
snapshot.split.sizeIntegerNo8096The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table.
snapshot.fetch.sizeIntegerNo1024The maximum fetch size for per poll when read table snapshot.
server-idStringNo-A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like 5400, the numeric ID range syntax is like '5400-5408'.
Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the
MySQL cluster as another server (with this unique ID) so it can read the binlog.
By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.
server-time-zoneStringNoUTCThe session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone.
connect.timeout.msDurationNo30000The maximum time that the connector should wait after trying to connect to the database server before timing out.
connect.max-retriesIntegerNo3The max retry times that the connector should retry to build database server connection.
connection.pool.sizeIntegerNo20The jdbc connection pool size.
chunk-key.even-distribution.factor.upper-boundDoubleNo100The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 100.0.
chunk-key.even-distribution.factor.lower-boundDoubleNo0.05The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 0.05.
sample-sharding.thresholdIntegerNo1000This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards.
inverse-sampling.rateIntegerNo1000The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
exactly_onceBooleanNotrueEnable exactly once semantic.
formatEnumNoDEFAULTOptional output format for MySQL CDC, valid enumerations are DEFAULTCOMPATIBLE_DEBEZIUM_JSON.
debeziumConfigNo-Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server.
common-optionsno-Source plugin common parameters, please refer to Source Common Options for details

Task Example

Simple

Support multi-table reading

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    catalog = {
      factory = MySQL
    }
    base-url = "jdbc:mysql://localhost:3306/testdb"
    username = "root"
    password = "root@123"
    table-names = ["testdb.table1", "testdb.table2"]

    startup.mode = "initial"
  }
}

sink {
  Console {
  }
}

Support debezium-compatible format send to kafka

Must be used with kafka connector sink, see compatible debezium format for details

Changelog

  • Add MySQL CDC Source Connector

next version

通过对MySQL CDC源连接器的深入了解,我们不仅能够更好地掌握数据同步的核心机制,还能有效提升数据处理的效率和精度。

无论是在数据集成、实时分析还是其他复杂的数据处理场景中,MySQL CDC源连接器都将成为SeaTunnel用户强大的助手。随着数据技术的不断进步,期待看到更多创新和优化在未来版本中的实现,为开发者带来更多便利和可能。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/225520.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java接口:用于实现各种动态功能

👑专栏内容:Java⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停 目录 1、接口概念2、实现一个接口3、实现多个接口4、接口间的继承 1、接口概念 在现实生活中,接口的例子比比皆是&#…

java接口自动化测试框架及断言详解

在上篇文章,我们介绍了Get方法的设计过程和测试结果,现在我们需要对前面代码进行重构和修改,本篇需要完成以下目标。 1)重构Get方法2)如何进行JSON解析3)使用TestNG方法进行测试断言 1.重构Get方法 在前…

家政小程序源码,师傅竞价接单

家政预约上门服务小程序开发方案,php开发语言,前端是uniapp,有成品源码,可以二开,可以定制。 一家政小程序用户端功能:服务分类、在线预约、在线下单。 师傅端:在线接单,竞价&…

算法初阶双指针+C语言期末考试之编程题加强训练

双指针 常⻅的双指针有两种形式,⼀种是对撞指针,⼀种是左右指针。 对撞指针:⼀般⽤于顺序结构中,也称左右指针。 • 对撞指针从两端向中间移动。⼀个指针从最左端开始,另⼀个从最右端开始,然后逐渐往中间逼…

哥尼斯堡的“七桥问题”——欧拉回路

哥尼斯堡是位于普累格河上的一座城市,它包含两个岛屿及连接它们的七座桥,如下图所示。 可否走过这样的七座桥,而且每桥只走过一次?瑞士数学家欧拉(Leonhard Euler,1707—1783)最终解决了这个问题,并由此创立…

Pacifist:一款专为技术开发者打造的软件提取工具

对于技术开发者而言,有效且便捷的工具可以显著提高工作效率。Pacifist,作为一款专业的软件提取工具,专为技术开发者而设计,旨在提供简单、安全的软件提取和管理工作。 一、Pacifist的技术特点 Pacifist主要采用AppleScript作为其…

ROS小练习——话题订阅

目录 一、话题与消息获取 二、代码编写 1、C 2、python 三、编译运行 一、话题与消息获取 rostopic list rostopic type /turtle1/pose rosmsg info turtlesim/Pose 二、代码编写 1、C //包含头文件 #include "ros/ros.h" #include "turtlesim/Pose…

js vue 输入正确手机号/邮箱后,激活“发送验证码”按钮

按钮禁止点击状态: 按钮能够点击状态: 我采用的方式是监听手机号/邮箱输入框的输入事件,即实判断用户输入的数据是否满足规则,如果满足手机号/邮箱规则,则激活“获取验证码”按钮。 话不多说,上代码 样式…

Java期末复习题之封装

点击返回标题->23年Java期末复习-CSDN博客 第1题. 定义一个类Person,定义name和age私有属性,定义有参的构造方法对name和age进行初始化。在测试类中创建该类的2个对象,姓名、年龄分别为lili、19和lucy、20,在屏幕打印出2个对象的姓名和年龄…

【Lidar】基于Python的三维点云数据转二维平面+散点图绘制

最近一直在搞点云相关的操作,有时候在处理点云数据时需要查看处理后的数据是否满足需求,所以就想着写一套展示点云的代码。之前已经分享过如何可视化点云了,感兴趣的可以自己去看下:【Lidar】基于Python的Open3D库可视化点云数据。…

微信商城小程序怎么弄

随着移动互联网的快速发展,微信小程序已经成为了许多商家和企业拓展业务的新渠道。其中,微信商城小程序更是受到了广大用户的喜爱。那么微信商城小程序怎么弄呢?下面给大家做个详细讲解。 首先,你需要在微信公众平台注册一个小程…

孩子都能学会的FPGA:第二十三课——用FPGA实现格雷码的编码和解码

(原创声明:该文是作者的原创,面向对象是FPGA入门者,后续会有进阶的高级教程。宗旨是让每个想做FPGA的人轻松入门,作者不光让大家知其然,还要让大家知其所以然!每个工程作者都搭建了全自动化的仿…

文心一言大模型应用开发入门

本文重点介绍百度智能云平台、文心一言、千帆大模型平台的基本使用与接入流程及其详细步骤。 注册文心一言 请登录文心一言官方网站 https://yiyan.baidu.com/welcome 点击登录;图示如下: 请注册文心一言账号并点击登录,图示如下&#xff1…

深入理解数据在内存中是如何存储的,位移操作符如何使用(能看懂文字就能明白系列)文章超长,慢慢品尝

系列文章目录 C语言笔记专栏 能看懂文字就能明白系列 🌟 个人主页:古德猫宁- 🌈 信念如阳光,照亮前行的每一步 文章目录 系列文章目录🌈 *信念如阳光,照亮前行的每一步* 前言引子一、2进制和进制转化为什么…

ORACLE数据库实验总集 实验四 Oracle数据库物理存储结构管理

一、实验目的 (1)掌握 Oracle数据库数据文件的管理 (2)掌握 Oracle数据库控制文件的管理 (3)掌握 Oracle数据库重做日志文件的管理 (4)掌握 Oracle数据库归档管理, 二、…

Golang 原生Rpc Server实现

Golang 原生Rpc Server实现 引言源码解析服务端数据结构服务注册请求处理 客户端数据结构建立连接请求调用 延伸异步调用定制服务名采用TPC协议建立连接自定义编码格式自定义服务器 参考 引言 本文我们来看看golang原生rpc库的实现 , 首先来看一下golang rpc库的demo案例: 服…

忘记PDF密码了,怎么办?

PDF文件有两种密码,一个打开密码、一个限制编辑密码,因为PDF文件设置了密码,那么打开、编辑PDF文件就会受到限制。忘记了PDF密码该如何解密? PDF和office一样,可以对文件进行加密,但是没有提供恢复密码的功…

动态代理IP和静态代理IP有什么区别,适用场景是什么?

互联网行业的从业者经常会用到一种工具,那就是代理IP工具。动态代理IP和静态代理IP是两种常见的代理IP技术,它们在网络通信中起到了重要的作用,比如大数据行业的从业者会经常需要用到动态代理IP,跨境行业的从业者会经常用到静态代…

MySQL 忘记root密码后重置密码操作

在忘记 MySQL 密码的情况下,可以通过 --skip-grant-tables 关闭服务器的认证,然后重置 root 的密码,具体操作步骤如下。 步骤 1):关闭正在运行的 MySQL 服务。打开 cmd 进入 MySQL 的 bin 目录。 步骤 2):输入mysqld -…

Constraining Async Clock Domain Crossing

Constraining Async Clock Domain Crossing 我们在normal STA中只会去check 同步clock之间的timing,但是design中往往会存在很多CDC paths,这些paths需要被正确约束才能保证design function正确,那么怎么去约束这些CDC paths呢? 以下面的design为例,如下图所示 这里clk…