接着 上期FlinkCDC基础篇章1-安装使用
下载 Flink 和所需要的依赖包 #
-
下载 Flink 1.17.0 并将其解压至目录
flink-1.17.0
-
下载下面列出的依赖包,并将它们放到目录
flink-1.17.0/lib/
下:下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译
- flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
- flink-sql-connector-mysql-cdc-2.4.0.jar
- flink-sql-connector-postgres-cdc-2.4.0.jar
首先,开启 checkpoint,每隔3秒做一次 checkpoint
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s
-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
CREATE TABLE t_source_sqlserver (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
) WITH (
'connector' = 'sqlserver-cdc', -- 使用SQL Server CDC连接器
'hostname' = '10.194.183.120', -- SQL Server主机名
'port' = '30027', -- SQL Server端口
'username' = 'sa', -- SQL Server用户名
'password' = 'abc@123456', -- SQL Server密码
'database-name' = 'cdc_test', -- 数据库名称
'schema-name' = 'dbo', -- 模式名称
'table-name' = 'orders' -- 要捕获更改的表名
);
-- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
CREATE TABLE table_sink_mysql (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
)
WITH (
'connector' = 'jdbc', -- 使用JDBC连接器
'url' = 'jdbc:mysql://10.194.183.120:30025/test', -- MySQL的JDBC URL
'username' = 'root', -- MySQL用户名
'password' = 'root', -- MySQL密码
'table-name' = 'orders' -- 要写入的MySQL表名
);
-- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
CREATE TABLE income_distribution (
serviceCode STRING,
accountPeriod STRING,
subjectCode STRING,
subjectName STRING,
amt DECIMAL(13,2),
PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://xxxx:9200',
'index' = 'income_distribution',
'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'
);
可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
参考文献:
使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC
flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客