代码如下:
package com.weilanaoli.ruge.vlink.flink;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.logging.Level;
import java.util.logging.Logger;
class MysqlExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("xx.xx.xx.xx") //输入地址
.port(3306) //输入端口
.databaseList("xx") //输入库名
.tableList("xx.test") //输入表名
.username("xx") //输入用户名
.password("xxxx") //输入密码
.startupOptions(StartupOptions.initial()) //读取binlog策略,这个启动选项有五种
.deserializer(new JsonDebeziumDeserializationSchema()) //配置不要锁表,但是数据一致性不是精准一次,会变成最少一次
.build();
//配置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {
@Override
public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {
try {
System.out.println("processElement=====" + value);
}catch (Exception e) {
e.printStackTrace();
}
}
});
dataStreamSource.print("原始数据=====");
env.execute("Print MySQL Snapshot + Binlog");
}
}
运行结果如下: