资源:flink 1.17.0、dinky 1.0.2
问题:对于kafka相关的包内类找不到的情况
解决:使用 flink-sql-connector- 胖包即可,去掉 flink-connector- 相关瘦包,解决胖瘦包冲突
source使用 flink-sql-connector- 胖包,sink使用 flink-connector-jdbc-3.1.0-1.17.jar、mysql-connector-java 包
lib中则添加公共包 flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar 、mysql-connector-java-8.0.28.jar
结果:运行成功
可实现insert、update、delete 的DML语句-增加、修改、删除语句的CDC变更数据捕获,而注意truncate语句变更的数据不可捕获
mysql sink表会在首次执行自动建表
FlinkSQL:
EXECUTE CDCSOURCE demo_mysql WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.xxxx',
'port' = '3306',
'username' = 'xxx',
'password' = 'xxx',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://172.xxx:3306/test?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'xxx',
'sink.password' = 'xxx',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '#{tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5',
'sink.auto.create' = 'true'
)