一、运行环境
(1)Flink:1.17.2
(2)Scala:2.12.20
(3)Mysql:5.7.43 ##开启binlog
二、代码示例
思路:通过滚动窗口收集一批数据推给sink消费。binlog日志对于dataStream是没有key的,那么需要给每条数据造一个key。两种方式:(1)通过UUID创建一个全局key。(2)解析数据中的时间字段作为key。
package org.example;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.val;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.example.sink.MyJdbcSinkFunction;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
public class FLCDCToMySql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("snapshot.locking.mode", "none");
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("root")
.password("root321")
.databaseList("test")
.tableList("test.t_class")
.debeziumProperties(properties)
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.earliest())
.build();
DataStreamSource<String> dataStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql Source");
// 10s一批数据推给sink
SingleOutputStreamOperator<List<String>> outputStream = dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
return Tuple2.of(UUID.randomUUID().toString(), s);
}
}).keyBy(x -> x.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow>.Context context, Iterable<Tuple2<String, String>> iterable, Collector<List<String>> collector) throws Exception {
List datalist = new ArrayList();
for(Tuple2<String, String> element : iterable){
datalist.add(element.f1);
}
collector.collect(datalist);
}
});
// outputStream.print();
outputStream.addSink(new MyJdbcSinkFunction());
env.execute("flink cdc demo ");
}
}
批次写入思路:使用executeBatch方式提交,url中需添加rewriteBatchedStatements=true 。
package org.example.sink;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.example.RecordData;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;
public class MyJdbcSinkFunction extends RichSinkFunction<List<String>> {
private int batchSize = 10;
private Connection connection = null;
private PreparedStatement statement = null;
private String jbcUrl = String.format("jdbc:mysql://localhost:3306/test?&useSSL=false&rewriteBatchedStatements=true");
private String insertSql = String.format("insert into %s values (?,?,?,?,?,?)","t_demo");
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = DriverManager.getConnection(jbcUrl,"root","xxxxxx");
connection.setAutoCommit(false);
statement = connection.prepareStatement(insertSql);
}
@Override
public void invoke(List<String> value, Context context) throws Exception {
super.invoke(value, context);
System.out.println("dataInfo: "+ value);
int flag = 0; //处理数据条数的标记
int batchSize = 1000; //批次commit
int dataSize = value.size();
String before = "";
String after = "";
String op = "";
String dbname = "";
String tablename = "";
String ts_ms = "";
for (int i = 0; i < dataSize; i++){
String data = value.get(i);
JSONObject object = JSONObject.parseObject(data);
op = String.valueOf(object.get("op"));
if("c".equals(op)){
after = String.valueOf(object.get("after"));
}else if("d".equals(op)){
before = String.valueOf(object.get("before"));
}else if("u".equals(op)){
before = String.valueOf(object.get("before"));
after = String.valueOf(object.get("after"));
}
JSONObject sourceObj = JSONObject.parseObject(object.get("source").toString());
dbname = String.valueOf(sourceObj.get("db"));
tablename = String.valueOf(sourceObj.get("table"));
ts_ms = String.valueOf(sourceObj.get("ts_ms"));
statement.setString(1, before);
statement.setString(2, after);
statement.setString(3, op);
statement.setString(4, dbname);
statement.setString(5, tablename);
statement.setString(6, ts_ms);
statement.addBatch();
flag = flag + 1;
if(i % batchSize == 0 ){
statement.executeBatch();
connection.commit();
statement.clearBatch();
flag = 0; //批次提交后重置
}
}
if(flag > 0){ //不满批次的提交
statement.executeBatch();
connection.commit();
statement.clearBatch();
}
}
@Override
public void close() throws Exception {
super.close();
connection.close();
}
}
三、插入目标表结果