博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。 |
这是一个很容易混淆和误解的问题,值得拿出来讨论对比一下。我们知道 Debezium 是专门用于捕获 CDC 数据的开源框架,它对接了多种数据库,同时也定义了自己的 CDC 数据交换格式,也就是常说的 debezium
格式。而Flink CDC 复用了 Debezium 的部分功能,也就是说:Debezium 是 Flink CDC 的底层采集工具,Flink CDC 的工程依赖会用使用到 Debezium 的 Jar 包,然后 Flink CDC 在 Debezium 基础之上,封装了额外的功能,例如:无锁读取,并发读取(全量数据的读取性能可以水平扩展),断点续传,这些功能是 Debezium 所不具备的,也是 Flink CDC 存在的意义。
同时,Flink 还有一种专门的数据格式 debezium-json
,从名称上看,它似乎就是 debezium 格式的 json 表达形式,那 debezium-json 格式和 debezium 原生格式是一回事吗?
首先,我们要主要到这样一个细节:mysql-cdc 作为一个 source connector,并不要求指定 format,实际上,它的 format 是不可配置的,因为 Flink CDC 在内部实现依赖 debezium,获得的原始的数据格式就是 debezium 格式,对外,这不可配置,也不可见,只有向下游传导数据时,才会涉及到解析和转换的问题。
其次,我们还要先澄清一种误解:debezium-json 并不是跟 Flink CDC(例如mysql-cdc)绑定在一起的,作为一种独立的、可描述 changelog 的格式,实际上,它可以应用到任何动态表上,例如:如果上游表是:connector=upsert-kafka,format=json,下游依旧可以使用: connector=kafka,format=debezium-json,关于这一点,可以参考本文的实测 《Flink SQL:debezium-json 格式的表一定是数据库的 CDC 数据吗?》,这个测试给出了这样一个非常明确的结论:
使用 debezium-json 格式的表不一定是数据库的 CDC 数据,但一定是上游动态表的 changelog,然后使用 debezium-json 格式描述。
Flink CDC 从数据库 binlog 中提取数据时使用了 debezium,获得的原始的数据格式也是 debezium 格式,但是,这都是发生在 Flink CDC 内部的,对外是不可见的!当需要把 CDC 数据传给下游时,才会针对下游指定的格式进行转换,这种转换也是根据目标表 DDL 中定义的 Schema 自动地隐式地完成的。
我们还是靠举例和试验来说明这个问题吧。再次看一下 《Flink CDC 与 Kafka 集成:Snapshot 还是 Changelog?Upsert Kafka 还是 Kafka?》 一文的 ”测试组合(1):connector=kafka,format=debezium-json“ 一节给出的案例。
原生 Debezium 格式(样例)
使用如下 SQL 创建一个 mysql-cdc 的源表:
SET 'sql-client.execution.result-mode' = 'TABLEAU';
DROP TABLE IF EXISTS orders_mysql_cdc;
CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
`order_number` INT NOT NULL,
`order_date` DATE NOT NULL,
`purchaser` INT NOT NULL,
`quantity` INT NOT NULL,
`product_id` INT NOT NULL,
CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.0.13.30',
'port' = '3307',
'username' = 'root',
'password' = 'Admin1234!',
'database-name' = 'inventory',
'table-name' = 'orders'
);
那从 Flink CDC 源表提取出来的数据应该是什么样子呢?前面我们已经说过,这个动作发生在 Flink CDC 内部,提取的数据也是外部不可见的,那我们能不能从其他渠道确定实际的数据格式吗?能,如果说 Flink CDC 就是通过 Debezium 来采集数据,那么采集到的最原始的数据格式就是标准的 Debezium 格式,通常,这是这个样子的:
{
"before": null,
"after": {
"osci.mysql-server-3.inventory.orders.Value": {
"order_number": 10006,
"order_date": 16852,
"purchaser": 1003,
"quantity": 1,
"product_id": 107
}
},
"source": {
"version": "2.2.0.Final",
"connector": "mysql",
"name": "osci.mysql-server-3",
"ts_ms": 1705645511000,
"snapshot": {
"string": "false"
},
"db": "inventory",
"sequence": null,
"table": {
"string": "orders"
},
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000004",
"pos": 640,
"row": 0,
"thread": {
"long": 10
},
"query": null
},
"op": "c",
"ts_ms": {
"long": 1705645511455
},
"transaction": null
}
再次强调,上述格式的数据在 Flink CDC 中是不可见的,发生于 Flink CDC 内部,以上格式是标准的 debezium 数据格式,Flink CDC一定是率先拿到了这种格式的数据然后再经处理转发给下游的,比如:如果 DDL 中提取了某些元数据,也是从上面这种原始的 Debezium 数据中获取的。
debezium-json 格式(样例)
如下的 SQL 在 Kafka 上创建了一个 debezium-json 格式的目标表,然后使用 INSERT INTO ... SELECT ...
把源表和目标表的数据流驱动起来:
DROP TABLE IF EXISTS orders_kafka_debezium_json;
CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (
order_number int,
order_date date,
purchaser int,
quantity int,
product_id int
) WITH (
'connector' = 'kafka',
'topic' = 'orders_kafka_debezium_json',
'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
'properties.group.id' = 'orders_kafka_debezium_json',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);
-- 提交持续查询,驱动整个 Pipeline
insert into orders_kafka_debezium_json select * from orders_mysql_cdc;
这时,写入 Kafka 中的 debezium-json 格式的数据是这样的:
{
"before": {
"order_number": 10003,
"order_date": "2016-02-19",
"purchaser": 1002,
"quantity": 2,
"product_id": 106
},
"after": null,
"op": "d"
}
结论
比较上述两种消息格式就能看出:
debezium-json 格式并不等于原生的 debezium 格式,两者有很多相似之处,都有 before,after,op,原生 debezium 格式仅发生并存在于 Flink CDC 内部,对外不可见,debezium-json 格式可用于表达任何动态表的 changelog,与数据库 CDC 数据已无必然的绑定关系。