1: Flink CDC official documentation link.
https://ververica.github.io/flink-cdc-connectors/release-2.1/content/about.html
2: Add the JAR packages to Flink.
Copy the JARs you need into Flink's lib directory.
https://kdocs.cn/join/gv467qi?f=101
Invitation to join the shared group 「工作使用重要工具」 ("important tools used at work") for document collaboration.
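The copy step can be sketched as a small shell helper. This is a minimal sketch, not the author's procedure: the function name install_jars and the download directory in the example call are hypothetical, and the lib path is inferred from the Flink install path used in these notes.

```shell
#!/bin/sh
# Hypothetical helper: copy every *.jar from a download directory
# into Flink's lib directory. Non-JAR files are left alone.
install_jars() {
  src_dir="$1"
  lib_dir="$2"
  for jar in "$src_dir"/*.jar; do
    # Skip the unexpanded pattern when the directory holds no JARs.
    [ -e "$jar" ] || continue
    cp "$jar" "$lib_dir"/
  done
}

# Example call (both paths are assumptions):
# install_jars "$HOME/cdc-jars" /opt/app/flink/flink-1.13.6/lib
```

JARs in lib are picked up when the cluster starts, so a session that was already running before the copy needs to be restarted.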
3: Request resources.
cd /opt/app/flink/flink-1.13.6/bin
./yarn-session.sh -s 1 -jm 1024m -tm 1024m -nm test_cdc -d
yarn application -list
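To pick the session's application ID out of the list output, a filter like the following may help. It is a sketch that assumes the usual column layout of `yarn application -list` (Application-Id first, Application-Name second) and a session name without spaces, such as test_cdc; the function name find_app_id is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: read `yarn application -list` output on stdin and
# print the ID of the application whose name equals the first argument.
# Assumes column 1 is the application ID and column 2 is the name.
find_app_id() {
  awk -v name="$1" '$1 ~ /^application_/ && $2 == name { print $1 }'
}

# Example call:
# yarn application -list 2>/dev/null | find_app_id test_cdc
```

The printed ID is the value Dlink or the Flink CLI needs when attaching to this session.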
4: Configure Dlink.
5: Write the code.
set execution.checkpointing.interval=30s;
set state.checkpoints.dir=hdfs://cluster/flink/mysql_es1;
set state.savepoints.dir=hdfs://cluster/flink/mysql_es1;
CREATE TABLE `mobile_device`(
`id` int,
`user_id` string,
`app_id` string,
`group_id` string,
`app_version` string,
`sdk_version` string,
`mobile` string,
`brand` string,
`model_identifier` string,
`model` string,
`system` string,
`device_id` string,
`secret_key` string,
`pkd` string,
`access_token` string,
`refresh_token` string,
`iot_device_secret` string,
`status` int,
`expiration_timeout` timestamp,
`login_type` int,
`create_time` timestamp,
`update_time` timestamp,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xx.xx.xx.xx',
'port' = '3306',
'username' = 'xxxx',
'password' = 'xxxxx',
'database-name' = 'xx',
'table-name' = 'xxxx'
);
select * from mobile_device;
Note: letter case matters here; a case mismatch affects whether the data can be read.
6: Inspect the running job.
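Besides the web UI, one way to inspect the job is the Flink CLI. The sketch below assumes the job runs in the YARN session started in step 3 and that the CLI accepts `list -t yarn-session -Dyarn.application.id=...`, as in the Flink 1.13 YARN documentation; the function name list_jobs and the FLINK_BIN variable are hypothetical, and the application ID in the example call is a placeholder.

```shell
#!/bin/sh
# Hypothetical helper: list jobs running in a given YARN session.
# FLINK_BIN may be overridden; the default follows the install path in these notes.
list_jobs() {
  app_id="$1"
  "${FLINK_BIN:-/opt/app/flink/flink-1.13.6/bin/flink}" list \
    -t yarn-session "-Dyarn.application.id=$app_id"
}

# Example call (use the ID printed by `yarn application -list`):
# list_jobs application_XXXX_YY
```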