37、Flink 的CDC 格式:debezium部署以及mysql示例(完整版)

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文详细的介绍了Debezium 的mysql connector的部署及验证、示例,同时也以具体的示例展示了Flink sql client通过debezium解析cdc数据同步至kafka的使用过程。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,还依赖kafka、flink、debezium。

本专题文章分为如下几篇:

37、Flink 的CDC 格式:debezium部署以及mysql示例(1)-debezium的部署与示例
37、Flink 的CDC 格式:debezium部署以及mysql示例(2)-Flink 与Debezium 实践
37、Flink 的CDC 格式:debezium部署以及mysql示例(完整版)

一、Debezium Format

1、Debezium介绍

Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。

Flink 支持将 Debezium JSON 和 Avro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等。

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 或 Avro 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。

2、binlog设置及验证

设置binlog需要监控的数据库,本示例使用的数据库是mysql5.7

1)、配置

本示例设置的参数参考下面的配置

[root@server4 ~]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
......

log-bin=mysql-bin  # log-bin的名称,可以是任意名称
binlog-format=row  # 推荐该参数,其他的参数视情况而定,比如mixed、statement
server_id=1 # mysql集群环境中不要重复
binlog_do_db=test # test是mysql的数据库名称,如果监控多个数据库,可以添加多个binlog_do_db即可,例如下面示例
# binlog_do_db=test2
# binlog_do_db=test3
.....

  • STATEMENT模式(SBR)
    每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)

  • ROW模式(RBR)
    不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。

  • MIXED模式(MBR)
    以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。

2)、重启mysql

保存配置后重启mysql

service mysqld restart

3)、验证

重启后,可以通过2个简单的方法验证是否设置成功。

mysql默认的安装目录:cd /var/lib/mysql

[root@server4 ~]# cd /var/lib/mysql
[root@server4 mysql]# ll
......
-rw-r----- 1 mysql mysql    154 110 2022 mysql-bin.000001
-rw-r----- 1 mysql mysql       1197 116 12:21 mysql-bin.index
.....

  • 查看mysql-bin.000001文件是否生成,且其大小为154字节。mysql-bin.000001是mysql重启的次数,重启2次则为mysql-bin.000002
  • 在test数据库中创建或添加数据,mysql-bin.000001的大小是否增加

以上情况满足,则说明binlog配置正常

3、debezium部署及验证

1)、下载-mysql connector连接器

去其官网:https://debezium.io/releases/下载需要的版本。
本示例使用的是:debezium-connector-mysql-1.7.2.Final-plugin.tar.gz

2)、解压

创建解压目录:/usr/local/bigdata/debezium/connector
解压


tar zxvf /usr/local/bigdata/debezium-connector-mysql-1.7.2.Final-plugin.tar.gz -C /usr/local/bigdata/debezium/connector
## 解压后
[alanchan@server3 connector]$ ll
总用量 4
drwxr-xr-x 2 alanchan root 4096 116 07:20 debezium-connector-mysql
[alanchan@server3 connector]$ cd debezium-connector-mysql/
[alanchan@server3 debezium-connector-mysql]$ ll
总用量 10312
-rw-rw-r-- 1 alanchan root  337864 1214 2021 antlr4-runtime-4.8.jar
-rw-rw-r-- 1 alanchan root  308966 1214 2021 CHANGELOG.md
-rw-rw-r-- 1 alanchan root   19228 1214 2021 CONTRIBUTE.md
-rw-rw-r-- 1 alanchan root    4981 1214 2021 COPYRIGHT.txt
-rw-rw-r-- 1 alanchan root   20682 1214 2021 debezium-api-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root  400546 1214 2021 debezium-connector-mysql-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root  886363 1214 2021 debezium-core-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root 2825430 1214 2021 debezium-ddl-parser-1.7.2.Final.jar
-rw-rw-r-- 1 alanchan root    4617 1214 2021 failureaccess-1.0.1.jar
-rw-rw-r-- 1 alanchan root 2858426 1214 2021 guava-30.0-jre.jar
-rw-rw-r-- 1 alanchan root  129157 1214 2021 LICENSE-3rd-PARTIES.txt
-rw-rw-r-- 1 alanchan root   11357 1214 2021 LICENSE.txt
-rw-rw-r-- 1 alanchan root  193386 1214 2021 mysql-binlog-connector-java-0.25.3.jar
-rw-rw-r-- 1 alanchan root 2475087 1214 2021 mysql-connector-java-8.0.27.jar
-rw-rw-r-- 1 alanchan root   19520 1214 2021 README_JA.md
-rw-rw-r-- 1 alanchan root   15286 1214 2021 README.md
-rw-rw-r-- 1 alanchan root   13114 1214 2021 README_ZH.md

3)、kafka配置

因为配置的是kafka的插件,所以需要修改kafka的插件配置,同时需要注意的是,debezium的安装目录需要kafka能找到。
本示例中kafka的安装目录:/usr/local/bigdata/kafka_2.12-3.0.0

  • 修改kafka插件配置文件connect-distributed.properties
    修改内容如下,其他的根据情况进行配置,否则就是默认的
bootstrap.servers=server1:9092,server2:9092,server3:9092
group.id=connect-cluster
status.storage.replication.factor=2

plugin.path=/usr/local/bigdata/debezium/connector

根据实际的应用环境决定是否分发该配置文件

  • 重启kafka集群

4)、启动kafak的插件

需要 在部署debezium的机器上进行此操作

#在kafka的/usr/local/bigdata/kafka_2.12-3.0.0/bin目录下启动
#执行命令:
connect-distributed.sh -daemon /usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties

[alanchan@server3 config]$ cd /usr/local/bigdata/kafka_2.12-3.0.0/bin
[alanchan@server3 bin]$ connect-distributed.sh -daemon /usr/local/bigdata/kafka_2.12-3.0.0/config/connect-distributed.properties
[alanchan@server3 bin]$ jps
8980 ConnectDistributed
9271 Jps
826 Kafka
# ConnectDistributed 进程名称即为kafka插件
# 也可以通过下面的方式验证
[alanchan@server3 bin]$ curl server3:8083
{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"dVRZjBtQQnum1bb7pu_ljg"}
# 也可以查看有哪些连接器在工作,由于当前还未注册任何的连接器,故为空
[alanchan@server3 bin]$ curl 192.168.10.43:8083/connectors/
[]

5)、注册mysql的连接器

  • 参数说明

{
    "name": "alan-debezium-mysql-connector", // 向 Kafka Connect 服务注册时的连接器名称。
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", // 连接器的类名,不能修改。
        "database.hostname": "192.168.10.44", //MySQL 服务器地址
        "database.port": "3306", // MySQL 服务器端口号
        "database.user": "root", // 具有适当权限的 MySQL 用户
        "database.password": "123456", // MySQL 用户的密码
        "database.server.id": "184054", // 连接器的唯一 ID,随便写,但不应该重复
        "database.server.name": "ALAN", // MySQL 服务器或集群的逻辑名称,将来作为kafka的topic前缀
        "database.include.list": "cdctest", // 指定服务器托管的数据库列表,多个数据库可以用逗号分隔
        "database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092", // 连接器用于将 DDL 语句写入和恢复到数据库历史主题的 Kafka 代理列表
        "database.history.kafka.topic": "alan.historydb", // 数据库历史主题的名称。本主题仅供内部使用,消费者不得使用
        "include.schema.changes": "true" // 指定连接器是否应为 DDL 更改生成事件并将它们发送到fulfillment架构更改主题以供使用者使用的标志
    }
}

  • 命令行执行
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.10.43:8083/connectors/ -d {"name": "alan-debezium-mysql-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "192.168.10.44","database.port": "3306","database.user": "root","database.password": "123456","database.server.id": "184054", "database.server.name": "ALAN", "database.include.list": "cdctest", "database.history.kafka.bootstrap.servers": "server1:9092,server2:9092,server3:9092", "database.history.kafka.topic": "alan.historydb","include.schema.changes": "true"}}
  • postman等工具执行
    在这里插入图片描述
    执行后,返回
    在这里插入图片描述
    也可以在kafka的可视化工具中查看,比如offset explorer
    在这里插入图片描述
    也可以通过命令行查看是否注册成功
[alanchan@server3 bin]$ curl 192.168.10.43:8083/connectors/
["alan-debezium-mysql-connector"]

6)、数据验证

启动成功后,debezium会将监控 的数据库表中的数据同步到kafka的消息队列中。
本示例中,mysql中的原始数据如下
在这里插入图片描述

启动插件成功后,kafka对应的topic中的数据如下
在这里插入图片描述

以上,则表示完成debezium的初步验证成功。

4、示例:通过Debezium CDC 将mysql数据变化输出至kafka

以下是针对表userscoressink新增、修改和删除的数据后kafka主题ALAN.cdctest.userscoressink的变化情况

[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic ALAN.cdctest.userscoressink --from-beginning
......
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":null,"after":{"name":"alan_test","scores":666.0},"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717276000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":4645,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1705717750512,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":{"name":"alan_test","scores":666.0},"after":{"name":"alan_test","scores":888.0},"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717298000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":4931,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1705717772785,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"double","optional":true,"field":"scores"}],"optional":true,"name":"ALAN.cdctest.userscoressink.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"ALAN.cdctest.userscoressink.Envelope"},"payload":{"before":{"name":"alan_test","scores":888.0},"after":null,"source":{"version":"1.7.2.Final","connector":"mysql","name":"ALAN","ts_ms":1705717322000,"snapshot":"false","db":"cdctest","sequence":null,"table":"userscoressink","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":5234,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1705717796886,"transaction":null}}


二、Flink 与 Debezium 实践

1、maven 依赖

为了使用Debezium格式,使用构建自动化工具(如Maven或SBT)的项目和带有SQLJAR包的SQLClient都需要以下依赖项。

1)、avro

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-confluent-registry</artifactId>
  <version>1.17.1</version>
</dependency>

2)、json

json格式是Flink 自带的依赖包。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.17.1</version>
</dependency>

参考 Debezium 文档,了解如何设置 Debezium Kafka Connect 用来将变更日志同步到 Kafka 主题。

2、Flink sql client 建表示例

Debezium 为变更日志提供了统一的格式,这是一个 JSON 格式的从 MySQL userscoressink表捕获的更新操作的简单示例:

{
	"before": {
		"name": "alan_test",
		"scores": 666.0
	},
	"after": {
		"name": "alan_test",
		"scores": 888.0
	},
	"source": {
		"version": "1.7.2.Final",
		"connector": "mysql",
		"name": "ALAN",
		"ts_ms": 1705717298000,
		"snapshot": "false",
		"db": "cdctest",
		"sequence": null,
		"table": "userscoressink",
		"server_id": 1,
		"gtid": null,
		"file": "alan_master_logbin.000004",
		"pos": 4931,
		"row": 0,
		"thread": null,
		"query": null
	},
	"op": "u",
	"ts_ms": 1705717772785,
	"transaction": null
}

MySQL userscoressink表有2列(name,scores)。上面的 JSON 消息是 userscoressink表上的一条更新事件,其中 name = alan_test的行的 scores 值从 666 更改为 888 。

此消息已同步到 Kafka 主题 ALAN.cdctest.userscoressink,则可以使用以下 DDL 来使用此主题并解析更改事件。

具体启动debezium参考本文的第一部分的kafka示例,其他不再赘述。下面的部分仅仅是演示debezium环境都正常后,在Flink SQL client中的操作。

-- 元数据与 MySQL "userscoressink" 表完全相同

CREATE TABLE userscoressink_debezium (
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'ALAN.cdctest.userscoressink',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
  -- 使用 'debezium-json' format 来解析 Debezium 的 JSON 消息
  -- 如果 Debezium 用 Avro 编码消息,请使用 'debezium-avro-confluent'
 'format' = 'debezium-json'  
);

重要说明

上面的debezium的json更新事件是基于在配置kafka插件的时候设置了以下参数的情况下,默认该参数是true,json的事件就是下面一个示例的情况。

key.converter.schemas.enable=false 
value.converter.schemas.enable=false

在某些情况下(默认),用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 ‘value.converter.schemas.enable’,用来在消息体中包含 schema 信息。
然后,Debezium JSON 消息可能如下所示:

{
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": true,
				"field": "name"
			}, {
				"type": "double",
				"optional": true,
				"field": "scores"
			}],
			"optional": true,
			"name": "ALAN.cdctest.userscoressink.Value",
			"field": "before"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": true,
				"field": "name"
			}, {
				"type": "double",
				"optional": true,
				"field": "scores"
			}],
			"optional": true,
			"name": "ALAN.cdctest.userscoressink.Value",
			"field": "after"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "version"
			}, {
				"type": "string",
				"optional": false,
				"field": "connector"
			}, {
				"type": "string",
				"optional": false,
				"field": "name"
			}, {
				"type": "int64",
				"optional": false,
				"field": "ts_ms"
			}, {
				"type": "string",
				"optional": true,
				"name": "io.debezium.data.Enum",
				"version": 1,
				"parameters": {
					"allowed": "true,last,false"
				},
				"default": "false",
				"field": "snapshot"
			}, {
				"type": "string",
				"optional": false,
				"field": "db"
			}, {
				"type": "string",
				"optional": true,
				"field": "sequence"
			}, {
				"type": "string",
				"optional": true,
				"field": "table"
			}, {
				"type": "int64",
				"optional": false,
				"field": "server_id"
			}, {
				"type": "string",
				"optional": true,
				"field": "gtid"
			}, {
				"type": "string",
				"optional": false,
				"field": "file"
			}, {
				"type": "int64",
				"optional": false,
				"field": "pos"
			}, {
				"type": "int32",
				"optional": false,
				"field": "row"
			}, {
				"type": "int64",
				"optional": true,
				"field": "thread"
			}, {
				"type": "string",
				"optional": true,
				"field": "query"
			}],
			"optional": false,
			"name": "io.debezium.connector.mysql.Source",
			"field": "source"
		}, {
			"type": "string",
			"optional": false,
			"field": "op"
		}, {
			"type": "int64",
			"optional": true,
			"field": "ts_ms"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "id"
			}, {
				"type": "int64",
				"optional": false,
				"field": "total_order"
			}, {
				"type": "int64",
				"optional": false,
				"field": "data_collection_order"
			}],
			"optional": true,
			"field": "transaction"
		}],
		"optional": false,
		"name": "ALAN.cdctest.userscoressink.Envelope"
	},
	"payload": {
		"before": {
			"name": "alan_test",
			"scores": 666.0
		},
		"after": {
			"name": "alan_test",
			"scores": 888.0
		},
		"source": {
			"version": "1.7.2.Final",
			"connector": "mysql",
			"name": "ALAN",
			"ts_ms": 1705717298000,
			"snapshot": "false",
			"db": "cdctest",
			"sequence": null,
			"table": "userscoressink",
			"server_id": 1,
			"gtid": null,
			"file": "alan_master_logbin.000004",
			"pos": 4931,
			"row": 0,
			"thread": null,
			"query": null
		},
		"op": "u",
		"ts_ms": 1705717772785,
		"transaction": null
	}
}

为了解析这一类信息,你需要在上述 DDL WITH 子句中添加选项 ‘debezium-json.schema-include’ = ‘true’(默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。

本示例采用默认的方式进行展示。

CREATE TABLE userscoressink_debezium (
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'ALAN.cdctest.userscoressink',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'debezium-json.schema-include' = 'true',
 'format' = 'debezium-json'  
);

Flink SQL> CREATE TABLE userscoressink_debezium (
>   name STRING,
>   scores FLOAT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'ALAN.cdctest.userscoressink',
>  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'debezium-json.schema-include' = 'true',
>  'format' = 'debezium-json'  
> );
[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink_debezium;
+----+--------------------------------+--------------------------------+
| op |                           name |                         scores |
+----+--------------------------------+--------------------------------+
| +I |                           alan |                           80.0 |
| +I |                       alanchan |                          100.0 |
| +I |                    alanchanchn |                          109.0 |
| +I |                      alan_test |                          666.0 |
| -U |                      alan_test |                          666.0 |
| +U |                      alan_test |                          888.0 |
| -D |                      alan_test |                          888.0 |
| +I |               test_alanchanchn |                          199.0 |
| -U |               test_alanchanchn |                          199.0 |
| +U |               test_alanchanchn |                          299.0 |
| -D |               test_alanchanchn |                          299.0 |

在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源。

-- MySQL "userscoressink_debezium" 的实时物化视图
-- 按name分组统计scores
Flink SQL> select name ,sum(scores) from userscoressink_debezium group by name;
+----+--------------------------------+--------------------------------+
| op |                           name |                         EXPR$1 |
+----+--------------------------------+--------------------------------+
| +I |                           alan |                           80.0 |
| +I |                       alanchan |                          100.0 |
| +I |                    alanchanchn |                          109.0 |
| +I |                      alan_test |                          666.0 |
| -D |                      alan_test |                          666.0 |
| +I |                      alan_test |                          888.0 |
| -D |                      alan_test |                          888.0 |
| +I |               test_alanchanchn |                          199.0 |
| -D |               test_alanchanchn |                          199.0 |
| +I |               test_alanchanchn |                          299.0 |
| -D |               test_alanchanchn |                          299.0 |

3、Available Metadata

以下格式元数据可以在表定义中公开为只读(VIRTUAL)列。

只有当相应的连接器转发格式元数据时,注意格式元数据字段才可用。截至Flink 1.17版本,只有Kafka连接器能够公开其值格式的元数据字段。

在这里插入图片描述
以下示例显示了如何访问Kafka中的Debezium元数据字段:

CREATE TABLE userscoressink_debezium_meta (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'ALAN.cdctest.userscoressink',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'debezium-json.schema-include' = 'true',
 'format' = 'debezium-json'  
);

Flink SQL> CREATE TABLE userscoressink_debezium_meta (
>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>   event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
>   origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
>   origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
>   origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
>   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
>   name STRING,
>   scores FLOAT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'ALAN.cdctest.userscoressink',
>  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'debezium-json.schema-include' = 'true',
>  'format' = 'debezium-json'  
> );
[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink_debezium_meta;
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |               origin_ts |              event_time |                origin_database |                  origin_schema |                   origin_table |              origin_properties |                           name |                         scores |
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 2024-01-20 02:17:48.975 | 2024-01-20 02:17:48.972 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                           alan |                           80.0 |
| +I | 2024-01-20 02:17:48.976 | 2024-01-20 02:17:48.976 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                       alanchan |                          100.0 |
| +I | 2024-01-20 02:17:48.977 | 2024-01-20 02:17:48.976 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                    alanchanchn |                          109.0 |
| +I | 2024-01-20 02:29:10.512 | 2024-01-20 02:21:16.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                      alan_test |                          666.0 |
| -U | 2024-01-20 02:29:32.785 | 2024-01-20 02:21:38.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                      alan_test |                          666.0 |
| +U | 2024-01-20 02:29:32.785 | 2024-01-20 02:21:38.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                      alan_test |                          888.0 |
| -D | 2024-01-20 02:29:56.886 | 2024-01-20 02:22:02.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |                      alan_test |                          888.0 |
| +I | 2024-01-20 02:53:49.248 | 2024-01-20 02:45:55.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |               test_alanchanchn |                          199.0 |
| -U | 2024-01-20 02:53:55.424 | 2024-01-20 02:46:01.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |               test_alanchanchn |                          199.0 |
| +U | 2024-01-20 02:53:55.424 | 2024-01-20 02:46:01.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |               test_alanchanchn |                          299.0 |
| -D | 2024-01-20 02:53:59.522 | 2024-01-20 02:46:05.000 |                        cdctest |                         (NULL) |                 userscoressink | {query=null, thread=null, s... |               test_alanchanchn |                          299.0 |
Query terminated, received a total of 11 rows

4、Format 参数

Flink 提供了 debezium-avro-confluent 和 debezium-json 两种 format 来解析 Debezium 生成的 JSON 格式和 Avro 格式的消息。
请使用 debezium-avro-confluent 来解析 Debezium 的 Avro 消息,
使用 debezium-json 来解析 Debezium 的 JSON 消息。

1)、Debezium Avro

在这里插入图片描述

2)、Debezium Json

在这里插入图片描述

5、数据类型映射

截至Flink 1.17 版本,Debezium Format 使用 JSON Format 进行序列化和反序列化。有关数据类型映射的更多详细信息,请参考 JSON Format 文档 和 Confluent Avro Format 文档。

6、注意事项

1)、重复的变更事件

在正常的操作环境下,Debezium 应用能以 exactly-once 的语义投递每条变更事件。在这种情况下,Flink 消费 Debezium 产生的变更事件能够工作得很好。 然而,当有故障发生时,Debezium 应用只能保证 at-least-once 的投递语义。可以查看 Debezium 官方文档 了解更多关于 Debezium 的消息投递语义。 这也意味着,在非正常情况下,Debezium 可能会投递重复的变更事件到 Kafka 中,当 Flink 从 Kafka 中消费的时候就会得到重复的事件。 这可能会导致 Flink query 的运行得到错误的结果或者非预期的异常。因此,建议在这种情况下,将作业参数 table.exec.source.cdc-events-duplicate 设置成 true,并在该 source 上定义 PRIMARY KEY。 框架会生成一个额外的有状态算子,使用该 primary key 来对变更事件去重并生成一个规范化的 changelog 流。

2)、消费 Debezium Postgres Connector 产生的数据

如果你正在使用 Debezium PostgreSQL Connector 捕获变更到 Kafka,请确保被监控表的 REPLICA IDENTITY 已经被配置成 FULL 了,默认值是 DEFAULT。 否则,Flink SQL 将无法正确解析 Debezium 数据。

当配置为 FULL 时,更新和删除事件将完整包含所有列的之前的值。当为其他配置时,更新和删除事件的 “before” 字段将只包含 primary key 字段的值,或者为 null(没有 primary key)。 你可以通过运行 ALTER TABLE REPLICA IDENTITY FULL 来更改 REPLICA IDENTITY 的配置。 请阅读 Debezium 关于 PostgreSQL REPLICA IDENTITY 的文档 了解更多。

以上,本文详细的介绍了Debezium 的mysql connector的部署及验证、示例,同时也以具体的示例展示了Flink sql client通过debezium解析cdc数据同步至kafka的使用过程。

本专题文章分为如下几篇:

37、Flink 的CDC 格式:debezium部署以及mysql示例(1)-debezium的部署与示例
37、Flink 的CDC 格式:debezium部署以及mysql示例(2)-Flink 与Debezium 实践
37、Flink 的CDC 格式:debezium部署以及mysql示例(完整版)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/349995.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

记录yolov8_obb训练自己的数据集

一.数据集制作 1.标注软件&#xff1a;roLabelImg roLabelImg是基于labelImg改进的&#xff0c;是用来标注为VOC格式的数据&#xff0c;但是在labelImg的基础上增加了能够使标注的框进行旋转的功能。 2.数据格式转换 2.1 xml转txt # 文件名称 &#xff1a;roxml_to_dota.p…

Linux | makefile简单教程 | Makefile的工作原理

前言 在学习完了Linux的基本操作之后&#xff0c;我们知道在linux中编写代码&#xff0c;编译代码都是要手动gcc命令&#xff0c;来执行这串代码的。 但是我们难道在以后运行代码的时候&#xff0c;难道都要自己敲gcc命令嘛&#xff1f;这是不是有点太烦了&#xff1f; 在vs中…

接口自动化测试:mock server之Moco工具

什么是mock server mock&#xff1a;英文可以翻译为模仿的&#xff0c;mock server是我们用来解除依赖&#xff08;耦合&#xff09;&#xff0c;假装实现的技术&#xff0c;比如说&#xff0c;前端需要使用某些api进行调试&#xff0c;但是服务端并没有开发完成这些api&#…

SAP 票据批导实现方法

增强结构定义 变量定义 调用bapi前处理相关变量

【JavaScript权威指南第七版】读书笔记速度

JavaScript权威指南第七版 序正文前言&#xff1a;图中笔记重点知识第1章 JavaScript简介第一章总结 第2章 词法结构注释字面量标识符和保留字Unicode可选的分号第二章总结 第3章 类型、值和变量【重要】原始类型特殊类型第三章总结 第4章 表达式与操作符表达式操作符条件式调用…

HTML-框架标签、实体、全局属性和元信息

HTML 1.框架标签 <iframe name"b站" src"https://www.bilibili.com" width"500" height"300" frameborder"0"></iframe>iframe 标签的实际应用&#xff1a; 在网页中嵌入广告。与超链接或表单的 target 配合&a…

林浩然的Java冒险:从单分支到多分支的奇趣编程之旅

林浩然的Java冒险&#xff1a;从单分支到多分支的奇趣编程之旅 Lin Haoran’s Java Adventure: A Whimsical Journey from Single Branch to Multiple Branches 在那个充满神秘色彩的Java编程世界里&#xff0c;林浩然是一位无畏的程序武士。他的江湖历险从单一的分支结构开始&…

链表的应用1--多项式求和

今天学数据结构学到的链表应用于多项式相加&#xff0c;但是书上的代码没看懂&#xff0c;在看了点资料和问ChatGPT以后想到的一个算法&#xff0c;后面有更好的想法在回来更新算法 1.链表相关结构&#xff1a; //链表结点结构 typedef struct linknode {int coef;//系数 int…

磁悬浮轴承行业调研:未来高端市场占比将有所提升

磁悬浮轴承是利用磁力作用将转子悬浮于空中&#xff0c;使转子与定子之间没有机械接触。其原理是磁感应线与磁浮线成垂直&#xff0c;轴芯与磁浮线是平行的&#xff0c;所以转子的重量就固定在运转的轨道上&#xff0c;利用几乎是无负载的轴芯往反磁浮线方向顶撑&#xff0c;形…

【C++修行之道】STL(初识pair、vector)

目录 一、pair 1.1pair的定义和结构 1.2pair的嵌套 1.3pair自带排序规则 1.4代码示例 二、vector 2.1vector的定义和特性 2.2vector的初始化 一维初始化&#xff1a; 2.3vector的常用函数 2.4vector排序去重 排序: 去重&#xff1a; 示例&#xff1a; 一、pair …

Argo CD 可观测性最佳实践

什么是 Argo CD&#xff1f; Argo CD 是一个开源的 CD&#xff08;Continuous Delivery&#xff09;工具&#xff0c;能够帮助您在 Kubernetes 环境中进行持续交付。Argo CD 的主要功能是将配置文件同步到 Kubernetes 集群中并确保应用程序正确运行。Argo CD 可以自动检测应用…

14.5 Flash查询和添加数据库数据

14.5 Flash查询和添加数据库数据 在Flash与数据库通讯的实际应用中&#xff0c;如何实现用户的登录与注册是经常遇到的一个问题。登录实际上就是ASP根据Flash提供的数据查询数据库的过程&#xff0c;而注册则是ASP将Flash提供的数据写入数据库的过程。 1.启动Access2003&…

代码随想录算法训练营29期|day31 任务以及具体安排

理论基础 关于贪心算法&#xff0c;你该了解这些&#xff01; 题目分类大纲如下&#xff1a; #算法公开课 《代码随想录》算法视频公开课 (opens new window)&#xff1a;贪心算法理论基础&#xff01; (opens new window),相信结合视频再看本篇题解&#xff0c;更有助于大家…

【构造方法】这或许是讲的最好的关于Java构造方法的文章!!

致读者 对于看到这篇博客的你们&#xff0c;其实不需要刻意的去记这些知识&#xff0c;可以收藏起来&#xff0c;有事没事翻开看看&#xff0c;这些其实看得多了&#xff0c;自然就烂熟于心了&#xff0c;如果你只是刻意的记忆了一遍&#xff0c;那其实你很快就会忘记&#xf…

仰暮计划|“以前老一辈农村人真的是穷极了……苦极了……”

仰暮计划|“以前老一辈农村人真的是穷极了……苦极了……” 回首往事&#xff0c;几十年的坎坷经历难以件件叙述&#xff0c;却又历历在目。以前老一辈农村人真的是穷极了……苦极了……我奶奶是1941年生人&#xff0c;用她的话说就是“命很硬”。我爷爷1940年生人&#xff0c;…

[TII 2023] 基于压缩感知的多级隐私保护方案

Multilevel Privacy Preservation Scheme Based on Compressed Sensing | IEEE Journals & Magazine | IEEE Xplore 摘要 物联网的广泛应用在给人们带来便利的同时&#xff0c;也引发了人们对数据采集、分析和共享过程中隐私泄露的担忧。本文提出了一种基于压缩感知的多级…

电商系统设计到开发03 引入Kafka异步削峰

一、前言 系统设计&#xff1a;电商系统设计到开发01 第一版设计到编码-CSDN博客 接着上篇文章&#xff1a;电商系统设计到开发02 单机性能压测-CSDN博客 本篇为大制作&#xff0c;内容有点多&#xff0c;也比较干货&#xff0c;希望可以耐心看看 已经开发的代码&#xff0…

【行业应用-智慧零售】东胜物联餐饮门店智能叫号解决方案,为企业智能化升级管理服务

随着科技的不断进步&#xff0c;物联网设备已经广泛应用于各行各业&#xff0c;包括餐饮业。在餐饮门店的线下运营过程中&#xff0c;叫号系统是一项重要的设备需求。传统的叫号方式往往会消耗大量的人力和时间&#xff0c;而物联网技术为餐饮行业提供了一种更高效、智能化的解…

幻兽帕鲁服务器数据备份

搭建幻兽帕鲁个人服务器&#xff0c;最近不少用户碰到内存不足、游戏坏档之类的问题。做好定时备份&#xff0c;才能轻松快速恢复游戏进度 这里讲一下如何定时将服务器数据备份到腾讯云轻量对象存储服务&#xff0c;以及如何在有需要的时候进行数据恢复。服务器中间的数据迁移…

【论文阅读】GraspNeRF: Multiview-based 6-DoF Grasp Detection

文章目录 GraspNeRF: Multiview-based 6-DoF Grasp Detection for Transparent and Specular Objects Using Generalizable NeRF针对痛点和贡献摘要和结论引言模型框架实验不足之处 GraspNeRF: Multiview-based 6-DoF Grasp Detection for Transparent and Specular Objects Us…