博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。 |
继上一篇 《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》 讨论了一种典型的 CDC 集成方案后,本文,我们改用 Flink CDC 完成同样的 CDC 数据入湖任务。与上一个方案有所不同的是:借助现有的 Flink 环境,我们可以直接使用 Flink CDC 从源头数据库接入数据,所以这是一个完整的端到端的解决方案,而上一篇文章我们省略了搭建 Kafka Connect + Debezium MySQL Connector 采集 CDC 数据的环节,因为这部分操作确实很复杂,很难在一篇文章中详细展开,这也说明了使用 Flink CDC 的一个优势,那就是:Flink CDC 在应用和架构上确实要比 Kafka Connect + Debezium MySQL Connector 的组合简单很多,如果你需要,甚至可以跳过 Kafka 直接将数据落到数据湖上。
1. 环境准备
-
本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的
inventory
数据库;
(https://docs.confluent.io/platform/7.4/installation/docker/config-reference.html#sr-long-configuration); -
我们需要安装多个 Flink Connector 和 Format 组件,包括:Flink CDC MySQL Connector、Flink Hudi Connector、Flink ‘debezium-avro-confluent’ Format Support,关于这些组件的安装,已经全部记录在了《Flink SQL Client 安装各类 Connector、Format 组件的方法汇总(持续更新中…)》一文中,请移步此文选择需要的组件酌情安装;
-
安装好各种依赖组件后,执行如下脚本,清空 Hudi 表目标位置上的文件,停止正在运行中的 Yarn App,并启动一个新的 Flink Yarn Session:
echo "clean kafak topic..." # run on a host installed kafka console client kafka-topics.sh --bootstrap-server 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092' --delete --topic 'orders_kafka_debezium_json' echo "clean hudi table target location..." aws s3 rm --recursive s3://glc-flink-hudi-test/sink_hudi_orders echo "Kill all running apps..." for appId in $(yarn application -list -appStates RUNNING 2>1 | awk 'NR > 2 { print $1 }'); do yarn application -kill $appId &> /dev/null done echo "start a flink yarn session..." flink-yarn-session -d
-
清理 Kafka 中的 Topic (二次测试时会上次写入的消息会影响测试结果)
export KAFKA_BOOTSTRAP_SERVERS='b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092' kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --delete --topic 'orders.*' kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
-
启动 Flink SQL Client
/usr/lib/flink/bin/sql-client.sh embedded shell
2. 创建 Flink CDC 源表
本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的 inventory
数据库。在一个既有的 Flink 环境上提前安装 Kafka 和 Flink CDC Connector 以及 Debezium Json Format,具体安装方法参考:Flink SQL Client 安装各类 Connector、组件的方法汇总(持续更新中…)。环境准备好后,打开 Flink SQL Client,执行如下建表语句:
SET 'sql-client.execution.result-mode' = 'TABLEAU';
DROP TABLE IF EXISTS orders_mysql_cdc;
CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
`order_number` INT NOT NULL,
`order_date` DATE NOT NULL,
`purchaser` INT NOT NULL,
`quantity` INT NOT NULL,
`product_id` INT NOT NULL,
CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.0.13.30',
'port' = '3307',
'username' = 'root',
'password' = 'Admin1234!',
'database-name' = 'inventory',
'table-name' = 'orders'
);
-- SELECT * FROM orders_mysql_cdc;
3. 创建 Kafka 中间表 ( debezium-json 格式)
Kafka 中间表使用 kafka connector + debezium-json 格式,也就是 《Flink CDC 与 Kafka 集成:State Snapshot 还是 Changelog?Kafka 还是 Upsert Kafka?》一文 第 《4. 测试组合:'connector'='kafka'
+ 'format'='debezium-json'
》一节介绍的方式:
DROP TABLE IF EXISTS orders_kafka_debezium_json;
CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (
order_number int,
order_date date,
purchaser int,
quantity int,
product_id int
) WITH (
'connector' = 'kafka',
'topic' = 'orders_kafka_debezium_json',
'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
'properties.group.id' = 'orders_kafka_debezium_json',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);
insert into orders_kafka_debezium_json select * from orders_mysql_cdc;
-- select * from orders_kafka_debezium_json;
4. 创建 Hudi 目标表
Kafka 中流式注入 debezium-json 格式的 CDC 消息后,就可以写入 Hudi 表了,Hudi 表使用 Hudi Connector 创建,必须使用流式读取(‘read.streaming.enabled’=‘true’),且必须开启 changelog 模式(‘changelog.enabled’ = ‘true’):
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2s';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';
SET 'state.checkpoints.num-retained' = '10';
DROP TABLE IF EXISTS sink_hudi_orders;
CREATE TABLE IF NOT EXISTS sink_hudi_orders (
order_number int PRIMARY KEY NOT ENFORCED,
order_date date,
purchaser int,
quantity int,
product_id int
) WITH (
'connector' = 'hudi',
'path' = 's3://glc-flink-hudi-test/sink_hudi_orders',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '2',
'read.streaming.skip_compaction' = 'true',
'read.streaming.start-commit' = 'earliest'
);
insert into sink_hudi_orders select * from orders_kafka_debezium_json;
select * from sink_hudi_orders;
5. 完整演示
关联阅读:
《CDC 整合方案:MySQL > Flink CDC + Schema Registry + Avro > Kafka > Hudi》
《CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi》
《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》