在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。
环境准备
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
- 获取Flink执行环境
首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 启用检查点和设置并行度
为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。
env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
- 使用Debezium Source读取MySQL的binlog
接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海
.hostname("localhost") // MySQL的IP地址
.port(3306) // MySQL的端口
.username("root") // MySQL的用户名
.password("123456") // MySQL的密码
.databaseList("my_db") // 监控的数据库
.tableList("my_db.user") // 监控的数据库下的表
.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化
.startupOptions(StartupOptions.initial()) // 启动选项
.build();
这里 JsonDebeziumDeserializationSchema类的代码如下:
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
/**
* 自定义DeserializationSchema进行反序列化。
*/
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
//创建JSON对象用于存储最终数据
JSONObject result = new JSONObject();
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct value = (Struct)sourceRecord.value();
//获取before数据
Struct before = value.getStruct("before");
JSONObject beforeJson = getJson(before);
//获取after数据
Struct after = value.getStruct("after");
JSONObject afterJson = getJson(after);
//获取操作类型
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
//将字段写入JSON对象
result.put("database",database);
result.put("tableName",tableName);
result.put("type",operation);
result.put("before",beforeJson);
result.put("after",afterJson);
//输出数据
collector.collect(result.toJSONString());
}
/**
* 获取字段值并写入result对象
* @param before
* @return
*/
private JSONObject getJson(Struct before) {
JSONObject jsonObject = new JSONObject();
if(before != null){
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
jsonObject.put(field.name(), beforeValue);
}
}
return jsonObject;
}
@Override
public TypeInformation getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
- 添加数据源并打印数据
将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。
DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
- 启动任务
最后,启动Flink作业,开始处理数据流。
env.execute("Flink-CDC");
6.测试
总结
通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。