Using Flink CDC to Capture Incremental Change Data from a SQL Server Database
Flink CDC added support for SQL Server in version 1.12, including SqlServerCatalog and SqlServerTable. With SqlServerCatalog you can look up a table's columns and column types by table name.
SQL Server has supported Change Data Capture (CDC) since SQL Server 2008. CDC captures the changes made to the data in a table, so you can query just the changed rows instead of scanning the entire table.
1. Preparation
Software versions
Flink 1.17.1
Database: Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)
1.1 Database preparation: enable CDC
-- Enable CDC for the SQL Server database. Run this in the database that should have CDC enabled.
EXEC sys.sp_cdc_enable_db
-- List databases and whether CDC is enabled
SELECT name, is_cdc_enabled FROM sys.databases
1.2 Start the SQL Server Agent
Open SQL Server Configuration Manager => select SQL Server Services => right-click SQL Server Agent and start it.
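You can also check the Agent's state from T-SQL instead of the Configuration Manager; a quick check, assuming your login has the VIEW SERVER STATE permission:
-- Show the state of the SQL Server services, including the Agent
SELECT servicename, status_desc, startup_type_desc
FROM sys.dm_server_services;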
1.3 Enable CDC for the tables whose changes you want to track
-- Enable CDC at the table level. The SQL Server Agent must be running first.
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',                  -- source schema
    @source_name = 'AIR_STATION_HOUR_DATA',  -- table name
    @capture_instance = NULL,                -- capture instance; NULL uses the default name schema_table
    @supports_net_changes = 1,               -- also create the net-changes function (requires a primary key or unique index)
    @role_name = NULL                        -- gating role; NULL disables the role check
-- Verify that CDC was enabled for the table
EXEC sys.sp_cdc_help_change_data_capture
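With the table enabled, you can sanity-check that changes are actually being captured before writing any Flink code. A minimal query sketch, assuming the default capture instance name dbo_AIR_STATION_HOUR_DATA (schema_table, because @capture_instance was left NULL):
-- Read all captured changes for the table between the minimum and maximum LSN
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_AIR_STATION_HOUR_DATA');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_AIR_STATION_HOUR_DATA(@from_lsn, @to_lsn, N'all');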
2. Writing the Code
2.1 Add the Maven dependencies
<properties>
<flink.version>1.17.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>9.4.1.jre8</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-sqlserver-cdc</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
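One note on the list above: the custom deserializer in section 2.2.3 builds its before/after payloads with fastjson's JSONObject, which none of these dependencies provide. If you use that code as written, you will also need a fastjson dependency along these lines (the version is only an example):
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>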
2.2 Code
2.2.1 Database connection constants
public class SQLServerConstant {
    public static final String SQLSERVER_HOST = "0.0.0.0";          // database host
    public static final Integer SQLSERVER_PORT = 1433;              // port
    public static final String SQLSERVER_DATABASE = "HBDC_AQI";     // database name
    public static final String SQLSERVER_TABLE_LIST = "dbo.AIR_STATION_HOUR_DATA"; // table list, comma separated
    public static final String SQLSERVER_USER_NAME = "sa";          // user
    public static final String SQLSERVER_PASSWORD = "*******";      // password
}
2.2.2 CDC data entity class
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

@Data
public class DataChangeInfo implements Serializable {
    /** Database name */
    private String database;
    /** Table name */
    private String tableName;
    /** Change time */
    private LocalDateTime changeTime;
    /** Event type: 1 = insert, 2 = update, 3 = delete */
    private Integer eventType;
    /** Row data before the change */
    private String beforeData;
    /** Row data after the change */
    private String afterData;
}
2.2.3 Custom deserialization schema for SQL Server change messages
import com.alibaba.fastjson.JSONObject; // fastjson, see the dependency note in section 2.1
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

@Slf4j
public class SQLServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";
    public static final String READ = "READ"; // snapshot rows emitted during the initial full load

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        try {
            Struct struct = (Struct) sourceRecord.value();
            // Debezium's "source" block carries the database and table the change came from
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setDatabase(source.getString("db"));
            dataChangeInfo.setTableName(source.getString("table"));
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // Map the operation (CREATE / READ / UPDATE / DELETE) to 1 = insert, 2 = update, 3 = delete;
            // READ rows from the initial snapshot are treated as inserts
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = CREATE.equals(type) || READ.equals(type) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            // Change timestamp: Debezium's ts_ms, falling back to the current time
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS))
                    .map(x -> Long.parseLong(x.toString()))
                    .orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            // Emit the record downstream
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize SQL Server change record: {}", e.getMessage(), e);
        }
    }

    /**
     * Extract the before- or after-image from the record value as a JSON object.
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema schema = element.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                jsonObject.put(field.name(), element.get(field));
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
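If you do not need the DataChangeInfo wrapper, the connector also ships a built-in JsonDebeziumDeserializationSchema that turns each SourceRecord into the raw Debezium JSON string. A minimal sketch using the same SqlServerSource builder shown in the next section; the class name RawJsonSourceExample is only illustrative:
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class RawJsonSourceExample {
    // Emits each change event as the raw Debezium JSON string instead of a DataChangeInfo object
    public static DebeziumSourceFunction<String> buildRawJsonSource() {
        return SqlServerSource.<String>builder()
                .hostname(SQLServerConstant.SQLSERVER_HOST)
                .port(SQLServerConstant.SQLSERVER_PORT)
                .database(SQLServerConstant.SQLSERVER_DATABASE)
                .tableList(SQLServerConstant.SQLSERVER_TABLE_LIST)
                .username(SQLServerConstant.SQLSERVER_USER_NAME)
                .password(SQLServerConstant.SQLSERVER_PASSWORD)
                .deserializer(new JsonDebeziumDeserializationSchema()) // built-in SourceRecord -> JSON string
                .build();
    }
}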
2.2.4 Source utility class and job entry point
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkSourceUtil {

    /**
     * Build the SQL Server CDC source.
     */
    public static DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = SQLServerConstant.SQLSERVER_TABLE_LIST.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(SQLServerConstant.SQLSERVER_HOST)
                .port(SQLServerConstant.SQLSERVER_PORT)
                .database(SQLServerConstant.SQLSERVER_DATABASE)   // database to monitor
                .tableList(tables)                                // tables to monitor, e.g. "dbo.AIR_STATION_HOUR_DATA"
                .username(SQLServerConstant.SQLSERVER_USER_NAME)
                .password(SQLServerConstant.SQLSERVER_PASSWORD)
                /*
                 * initial: take a full snapshot first, then keep reading incremental changes
                 * latest: read only incremental changes (skip the historical data)
                 */
                .startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.initial())
                .deserializer(new SQLServerJsonDebeziumDeserializationSchema()) // SourceRecord -> DataChangeInfo
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> sqlServerSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(sqlServerSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.print();
        env.execute("SQLServer-stream-cdc");
    }
}
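In a real job you would usually replace streamSource.print() with a sink. Since flink-connector-kafka is already on the dependency list, here is a hedged sketch that forwards the change events to a Kafka topic; the bootstrap servers, topic name, and JSON mapping are illustrative assumptions, not part of the original setup:
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SQLServerCdcToKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000); // CDC offsets are committed on checkpoints, so enable them for fault tolerance

        DataStream<DataChangeInfo> changes =
                env.addSource(FlinkSourceUtil.buildDataChangeSource(), "SQLServer-source");

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")            // assumption: local Kafka broker
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sqlserver-cdc-events")         // assumption: target topic name
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        changes.map(change -> JSON.toJSONString(change))          // fastjson again; any JSON mapper works
               .sinkTo(kafkaSink);

        env.execute("SQLServer-cdc-to-kafka");
    }
}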