采用Flink CDC操作SQL Server数据库获取增量变更数据

采用Flink CDC操作SQL Server数据库获取增量变更数据

Flink CDC 1.12版本引入了对SQL Server的支持,包括SqlServerCatalogSqlServerTable。在SqlServerCatalog中,你可以根据表名获取对应的字段和字段类型。

SQL Server 2008 开始支持变更数据捕获 (CDC) 功能。CDC 允许你捕获对表中数据更改的数据,这样你就可以查询更改的数据而不需要扫描整个表。

1、准备工作

软件版本

Flink 1.17.1

数据库版本 Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)

1.1、数据库准备 启动CDC

--  开启SQL Server数据库CDC。  在需要开启CDC的数据库执行此命令
EXEC sys.sp_cdc_enable_db
-- 查询开启CDC的数据库
select name, is_cdc_enabled from sys.databases 

1.2、开启SQL Server代理

打开 SQL Server配置管理器 => 选择SQL Server服务 => 选择SQL Server代理 右击开启

在这里插入图片描述

1.3、为需要跟踪更改的表启用 CDC。

-- 开启表级别的CDC   --需要开启先SQL Server代理  然后执行 
 EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo', -- source_schema
        @source_name = 'AIR_STATION_HOUR_DATA', -- table_name
        @capture_instance = NULL, -- capture_instance
        @supports_net_changes = 1, -- supports_net_changes
        @role_name = NULL -- role_name
 --  验证表是否开启cdc成功
 EXEC sys.sp_cdc_help_change_data_capture

2、代码编写

2.1、引入依赖

    <properties>
        <flink.version>1.17.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>9.4.1.jre8</version>
        </dependency>        
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
 
 
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2.2、代码编写

2.2.1 、数据库配置文件编写

public class SQLServerConstant {
    public static final String SQLSERVER_HOST = "0.0.0.0";  //数据库地址
    public static final Integer SQLSERVER_PORT = 1433; //端口
 
    public static final String SQLSERVER_DATABASE = "HBDC_AQI";  //库

    public static final String SQLSERVER_TABLE_LIST= "dbo.AIR_STATION_HOUR_DATA"; // 表
    public static final String SQLSERVER_USER_NAME = "sa"; //用户
    public static final String SQLSERVER_PASSWORD = "*******"; //密码
}

2.2.2 CDC数据实体类

@Data
public class DataChangeInfo implements Serializable {
    /**
     * 数据库名
     */
    private String database;
    /**
     * 表名
     */
    private String tableName;
    /**
     * 变更时间
     */
    private LocalDateTime changeTime;
    /**
     * 变更类型 1新增 2修改 3删除
     */
    private Integer eventType;
    /**
     * 变更前数据
     */
    private String beforeData;
    /**
     * 变更后数据
     */
    private String afterData;

}

2.2.2 、SQLServer消息读取自定义序列化

@Slf4j
public class SQLServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        try {
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // 获取操作类型  CREATE UPDATE DELETE  1新增 2修改 3删除
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> 		         Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            //7.输出数据
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage());

        }

    }
    /**
     *
     * 从源数据获取出变更之前或之后的数据
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }



    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {

        return TypeInformation.of(DataChangeInfo.class);
    }
}

2.2.3 、功能工具类

public class FlinkSourceUtil {
   
    /**
     * 构造SQL Server CDC数据源
     */
    public static DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = SQLSERVER_TABLE_LIST.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(SQLSERVER_HOST)
                .port(SQLSERVER_PORT)
                .database(SQLSERVER_DATABASE) // monitor sqlserver database
                .tableList(tables) // monitor products table
                .username(SQLSERVER_USER_NAME)
                .password(SQLSERVER_PASSWORD)
                /*
                 *initial初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest:只进行增量导入(不读取历史变化)
                 */
                .startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.initial())
                .deserializer(new SQLServerJsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
    }

   
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoMySqlSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.print();

        env.execute("SQLServer-stream-cdc");


    }
}

2.3、运行main方法测试

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/530193.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

李廉洋:4.10黄金原油走势最新分析及策略。

美联储博斯蒂克重申了他对今年降息一次的预期&#xff0c;但他补充说&#xff0c;如果经济形势发生变化&#xff0c;他对推迟降息或进一步降息持开放态度。博斯蒂克强调了美国经济和劳动力市场的持续强劲&#xff0c;但表示就业市场的疲软迹象将促使他考虑比目前预期的更早和更…

GFS部署实验

目录 1、部署环境 ​编辑 2、更改节点名称 3、准备环境 4、磁盘分区&#xff0c;并挂载 5. 做主机映射--/etc/hosts/ 6. 复制脚本文件 7. 执行脚本完成分区 8. 安装客户端软件 1. 安装解压源包 2. 创建gfs 3. 安装 gfs 4. 开启服务 9、 添加节点到存储信任池中 1…

SVM向量支持机

1.通俗理解 svm&#xff1a;support vector machine目标&#xff1a;利用超平面将两类数据分割开来&#xff0c;这个超平面就是我们要设计的对象 如何设计&#xff1f;我们设计之后会有间隔&#xff0c;间隔越大分类效果就越好&#xff1b;距离决策边界最近的点我们成为支持向…

40.Python从入门到精通—Python3 JSON 数据解析 Python3 日期和时间 什么是时间元组? 获取当前时间 获取格式化的时间

40.Python从入门到精通—Python3 JSON 数据解析 Python3 日期和时间 什么是时间元组&#xff1f; 获取当前时间 获取格式化的时间 Python3 JSON 数据解析Python3 日期和时间什么是时间元组&#xff1f;获取当前时间获取格式化的时间 Python3 JSON 数据解析 Python3 中可以使用…

使用MongoDB 构建AI:轻松应对从预测式AI到生成式AI

毫无疑问&#xff0c;如今从生成式AI (GenAI )中获益最大的&#xff0c;是那些早已运用预测式AI (Predictive AI )的组织。 2023年6月&#xff0c;麦肯锡在2023年6月发布的《生成式人工智能的经济潜力》研究中也得出了与此相同的结论。 原因主要有以下几点&#xff1a; 内部文…

顺序表(C语言实现)

什么是顺序表 顺序表和数组的区别 顺序表本质就是数组 结构体初阶进阶 系统化的学习-CSDN博客 简单解释一下&#xff0c;就像大家去吃饭&#xff0c;然后左边是苍蝇馆子&#xff0c;右边是修饰过的苍蝇馆子&#xff0c;但是那个好看的苍蝇馆子一看&#xff0c;这不行啊&a…

Harmony鸿蒙南向驱动开发-MIPI CSI

CSI&#xff08;Camera Serial Interface&#xff09;是由MIPI联盟下Camera工作组指定的接口标准。CSI-2是MIPI CSI第二版&#xff0c;主要由应用层、协议层、物理层组成&#xff0c;最大支持4通道数据传输、单线传输速度高达1Gb/s。 物理层支持HS&#xff08;High Speed&…

kettle经验篇:取出一个字符串中的两个数值

项目场景 在一个数据清洗、同步的需求中&#xff1b;有一个要求是判断两个数值是否在正常范围内&#xff0c;并根据判断结果给出异常标记。但这两个数值是以XML的格式存储在Oracle的CLOB字段中&#xff0c;且在同一个XML节点中。该节点的内容如下: "OD: 17.4 mmHg O…

2024Peak码支付系统网站源码

系统简介 Peak码支付-是专为个人站长打造的聚合免签系统&#xff0c;拥有卓越的性能和丰富的功能。用全新轻量化的界面UI&#xff0c;让您可以更加方便快捷地解决知识付费和运营赞助的难题。同时&#xff0c;它基于高性能SpeedPHPLayuiPearAdmin架构&#xff0c;提供实时监控和…

https的配置和使用(以腾讯云为例)

1、注册域名 2、获取证书 3、下载证书 下载下来的证书所有格式 4、在服务器上下载nginx并配置 nginx的配置文件 如下 server {listen 80;listen 443 ssl;server_name delegate.letspiu.net.cn;ssl on; #开启ssl#指定证书位置ssl_certificate /etc/ss…

低代码ARM计算机在IIoT中的采集控制生产面板

工业4.0的大潮下工业物联网&#xff08;IIoT&#xff09;已成为推动制造业转型升级的重要动力。其中&#xff0c;低代码ARM嵌入式计算机凭借其出色的性能、灵活的配置以及高度集成化的特点&#xff0c;在工业设备远程监控、维护与诊断方面发挥着关键作用。 一、远程监控与维护 …

【配电网故障定位】基于二进制蝗虫优化算法的配电网故障定位 12节点配电系统故障定位【Matlab代码#75】

文章目录 【获取资源请见文章第5节&#xff1a;资源获取】1. 配电网故障定位2. 二进制蝗虫优化算法3. 部分代码展示4. 仿真结果展示5. 资源获取 【获取资源请见文章第5节&#xff1a;资源获取】 1. 配电网故障定位 配电系统故障定位&#xff0c;即在配电网络发生故障的时候&am…

ArcGIS Server 10发布要素服务时遇到的数据库注册问题总结(一)

工作环境&#xff1a; Windows 7 64 位旗舰版 ArcGIS Server 10.1 ArcGIS Desktop 10.1 IIS 7.0 开始的时候以为10.1发布要素服务和10.0一样&#xff0c;需要安装ArcSDE&#xff0c;后来查阅资料发现不需要&#xff0c;数据库直连方式就可以了。 首先我来说一下发布要素服…

用Wireshark工具对gRPC接口进行本地抓包

前言&#xff1a; 本人一名敲代码的程序员&#xff0c;突然领导安排研究gRPC接口&#xff0c;并且抓包分析&#xff0c; 抓包工具试了Charles、mitmproxy都不行&#xff0c;浪费很多时间&#xff0c;最后使用Wireshark工具对本地启动的gRPC接口成功抓包&#xff0c;关于安装W…

AI智能客服机器人在医疗健康行业中的应用

随着科技的飞速发展&#xff0c;AI智能客服机器人已经逐渐渗透到我们生活的各个领域&#xff0c;而在医疗健康行业中&#xff0c;它的应用更是为人们带来了很多便利。那么&#xff0c;AI智能客服机器人在医疗健康行业中的应用是怎么样的呢&#xff1f;今天&#xff0c;我们就来…

stack和queue模拟实现

前言 上一期我们介绍了stack和queue的使用&#xff0c;本期我们来模拟实现一下他们&#xff01; 本期内容介绍 容器适配器 deque介绍 为什么stack和queue的底层选择deque为默认容器&#xff1f; stack 模拟现实 queue 模拟实现 什么是容器适配器&#xff1f; 适配器是一种设…

Flutter之TabBar篇

总结了一下项目中用到的几种TabBar&#xff0c;针对不同的样式&#xff0c;有采用系统提供的&#xff0c;也有三方插件提供的&#xff0c;也有自定义的&#xff0c;效果如下&#xff08;后续如果遇到新的样式&#xff0c;会不间断地记录更新&#xff0c;避免重复造轮子…&#…

大创项目推荐 深度学习 机器视觉 车位识别车道线检测 - python opencv

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习 机器视觉 车位识别车道线检测 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f947;学长这里给一个题目综合评分(每项满分5分) …

Vue第三方组件使用

文章目录 一、组件传值二、elementui组件使用三、fontawesome图标 一、组件传值 1、父组件与孩子组件传值 在孩子组件中定义props属性&#xff0c;里面定义好用于接收父亲数据的变量。 孩子组件是Movie Movie.vue。注意看在Movie组件里面有props对象中的title和rating属性用…

2024 抖音欢笑中国年(三):编辑器技巧与实践

前言 本次春节活动中&#xff0c;我们大部分场景使用内部的 SAR Creator互动方案来实现。 SAR Creator 是一款基于 TypeScript 的高性能、轻量化的互动解决方案&#xff0c;目前支持了Web和字节内部跨端框架平台&#xff0c;服务于字节内部的各种互动业务&#xff0c;包括但不限…