【大数据】Flink CDC 的概览和使用

Flink CDC 的概览和使用

  • 1.什么是 CDC
  • 2.什么是 Flink CDC
  • 3.Flink CDC 前生今世
    • 3.1 Flink CDC 1.x
    • 3.2 Flink CDC 2.x
    • 3.3 Flink CDC 3.x
  • 4.Flink CDC 使用
  • 5.Debezium 标准 CDC Event 格式详解

1.什么是 CDC

CDCChange Data Capture数据变更抓取)是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

CDC 可以捕获数据库中的以下类型的数据变化:

  • ✅ 插入(Insert):当新数据被插入到数据库表中时。
  • ✅ 更新(Update):当数据库表中的现有数据被修改时。
  • ✅ 删除(Delete):当数据从数据库表中被删除时。

2.什么是 Flink CDC

Flink CDC 是一个开源的数据库变更日志捕获和处理框架,它可以实时地从各种数据库(如 MySQL、PostgreSQL、Oracle、MongoDB 等)中捕获数据变更并将其转换为流式数据。Flink CDC 可以帮助实时应用程序实时地处理和分析这些流数据,从而实现 数据同步数据管道实时分析实时应用 等功能。

本质上是一系列的 Flink Source Connector 集合,用于来获取数据库的实时变更,底层基于 Debezium 实现。

🚀 https://github.com/ververica/flink-cdc-connectors

3.Flink CDC 前生今世

3.1 Flink CDC 1.x

Flink CDC 1.x 开启了 Flink 在 CDC 上的实践之路,Flink CDC 1.x 第一次引入了 Debezium 框架,利用 Debezium 已有的能力将数据库实时变更接入到 Flink 流计算框架中,利用 Flink 丰富的生态对数据进行加工处理,满足不同的业务需求,在功能层面上而言,Flink CDC 1.x 只能说是可以用,但不能生产上用,为什么:

  • 1.x 版本全增量切换时会对表加锁,在同步过程中有段时间业务会处于暂停状态。
  • 各方面功能还不够完善,比如自动加表、DDL 事件传递等。

在这里插入图片描述

总体而言 Flink CDC 1.x 只能说是一个比较有趣的小玩具,还不具备大规模商业盈利的价值。

在这里插入图片描述

3.2 Flink CDC 2.x

2.x 版本中,Flink CDC 引入了 Netfix DBLog 中的无锁算法,彻底解决了全增量切换上业务停滞的问题,同时得益于 FLIP-27 对 Flink Source API 的重构,Flink CDC 也基于 FLIP-27 升级到了新的框架设计,至此,Flink CDC 被大规模公司使用并投入到生产中。

在这里插入图片描述

3.3 Flink CDC 3.x

近期,Flink CDC 发布了全新的 3.0 版本,并宣布捐赠回 Flink 主项目,在新的 3.0 版本中,Flink CDC 对于接口和架构上做了很大的升级和调整,对于整体项目的定位也从之前的 Flink Source Connector 转变为了 Data Integration Engine,未来将与 SeaTunnelDataXChunjun 等一系列老牌数据集成项目同台竞技,让我们拭目以待。

在这里插入图片描述

4.Flink CDC 使用

在本地启动一个 MySQL 的 Docker 环境。

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4

创建表:

create database cdc_test;
use cdc_test;

create table cdc_table (
    id int primary key auto_increment,
    name varchar(1000),
    age int
);

在 IDEA 中新建一个Java 项目。

导入依赖:

<flink-cdc.version>2.4.2</flink-cdc.version>
<flink.version>1.16.3</flink.version>
<logback.version>1.2.7</logback.version>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink-cdc.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback.version}</version>
</dependency>

编写代码:

public class FlinkCDCApplication {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.enableCheckpointing(60000L);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("cdc_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
            .tableList("cdc_test.cdc_table") // set captured table
            .username("root")
            .password("debezium")
            .includeSchemaChanges(true)
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
            .print();
        env.execute();
    }
}

添加日志配置:

<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<configuration>
       <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
             <pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern>
          </encoder>
       </appender>

       <root level="INFO">
          <appender-ref ref="STDOUT" />
       </root>
</configuration>

5.Debezium 标准 CDC Event 格式详解

{
    "before": null,
    "after": {
        "id": 1,
        "name": "xing.yu",
        "age": 26,
        "new_column": "dewu"
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1702723640000,
        "snapshot": "false",
        "db": "cdc_test",
        "sequence": null,
        "table": "cdc_table",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 2394,
        "row": 0,
        "thread": 39,
        "query": null
    },
    "op": "c",
    "ts_ms": 1702723640483,
    "transaction": null
}
{
    // 表数据更新前的值,update/delete
    "before": {},
    // 表数据更新后的值,create/update
    "after": {},
    // 元数据信息
    "source": {},
    // 操作类型 c/d/u
    "op": "",
    // 记录解析时间
    "ts_ms": "",
    "transaction": ""
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/299815.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud-高级篇(十三)

前面的主从集群可以应对Redis高并发读的问题&#xff0c;Redis主从之间可以做同步&#xff0c;为了提高主从同步时的性能&#xff0c;单节点Redis的内存不要设置太高&#xff0c;如果内存占用过多&#xff0c;做RDB的持久化&#xff0c;或者做全量同步的时候&#xff0c;导致大…

Kubernetes复习总结(二):Kubernetes容器网络

2、Kubernetes容器网络 1&#xff09;、Docker网络原理 Docker默认使用的网络模型是bridge&#xff0c;这里只讲bridge网络模型 1&#xff09;容器之间通信原理 当安装完docker之后&#xff0c;docker会在宿主机上创建一个名叫docker0的网桥&#xff0c;默认IP是172.17.0.1…

家具电子图册制作方法

​随着互联网的普及&#xff0c;越来越多的人选择在线购物&#xff0c;家具行业也不例外。为了满足消费者对高品质家具的需求&#xff0c;家具电子图册应运而生。与传统纸质图册相比&#xff0c;家具电子图册具有更高的转化率、更低的成本和更快的更新速度。 一、与纸质版相比有…

Linux 目录结构及其说明

Linux 操作系统遵循一种标准的目录结构&#xff0c;称为 Filesystem Hierarchy Standard&#xff08;文件系统层次结构标准&#xff09;&#xff0c;其定义了不同目录的用途和内容。 浅蓝色文字 /&#xff08;根目录&#xff09;&#xff1a; /根目录是整个文件系统的起点&…

迁移学习|代码实现

还记得我们之前实现的猫狗分类器吗&#xff1f;在哪里&#xff0c;我们设计了一个网络&#xff0c;这个网络接受一张图片&#xff0c;最后输出这张图片属于猫还是狗。实现分类器的过程比较复杂&#xff0c;准备的数据也比较少。所以我们是否可以使用一种方法&#xff0c;在数据…

基于多反应堆的高并发服务器【C/C++/Reactor】(中)添加 删除 修改 释放

在上篇文章&#xff08;处理任务队列中的任务&#xff09;中我们讲解了处理任务队列中的任务的具体流程&#xff0c;eventLoopProcessTask函数的作用&#xff1a; 处理队列中的任务&#xff0c;需要遍历链表并根据type进行对应处理,也就是处理dispatcher中的任务。 // 处理任…

Linux之Ubuntu环境Jenkins部署前端项目

今天分享Ubuntu环境Jenkins部署前端vue项目 一、插件安装 1、前端项目依赖nodejs&#xff0c;需要安装相关插件 点击插件管理&#xff0c;输入node模糊查询 选择NodeJS安装 安装成功 2、配置nodejs 点击后进入 点击新增 NodeJS 配置脚手架类型&#xff1a;如果不填 默认npm …

华为HarmonyOS 创建第一个鸿蒙应用 运行Hello World

使用DevEco Studio创建第一个项目 Hello World 1.创建项目 创建第一个项目&#xff0c;命名为HelloWorld&#xff0c;点击Finish 选择Empty Ability模板&#xff0c;点击Next Hello World 项目已经成功创建&#xff0c;接来下看看效果 2.预览 Hello World 点击右侧的预…

[VUE]2-vue的基本使用

目录 vue基本使用方式 1、vue 组件 2、文本插值 3、属性绑定 4、事件绑定 5、双向绑定 6、条件渲染 7、axios 8、⭐跨域问题 &#x1f343;作者介绍&#xff1a;双非本科大三网络工程专业在读&#xff0c;阿里云专家博主&#xff0c;专注于Java领域学习&#xff0c;擅…

RPC基础知识总结

RPC 是什么? RPC&#xff08;Remote Procedure Call&#xff09; 即远程过程调用&#xff0c;通过名字我们就能看出 RPC 关注的是远程调用而非本地调用。 为什么要 RPC &#xff1f; 因为&#xff0c;两个不同的服务器上的服务提供的方法不在一个内存空间&#xff0c;所以&am…

【UML】第17篇 包图

目录 一、什么是包图 二、包图的作用&#xff1a; 三、应用场景&#xff1a; 四、绘图符号的说明&#xff1a; 五、语法&#xff1a; 六、其他要说的 一、什么是包图 包图&#xff08;Package Diagram&#xff09;是一种用于描述系统中包和包之间关系的UML图。包是一种将…

Thonny开发ESP32点灯

简介 ESP32是一款功能强大的低功耗微控制器&#xff0c;由乐鑫&#xff08;Espressif&#xff09;公司开发。它集成了Wi-Fi和蓝牙功能&#xff0c;适用于各种物联网应用。Thonny是一款基于Python的开源集成开发环境&#xff08;IDE&#xff09;&#xff0c;专为MicroPython设计…

【数据分享】2024年我国主要城市地铁站点和线路数据

地铁站点与线路数据是我们经常会用到的一种基础数据。去哪里获取该数据呢&#xff1f; 今天我们就给大家分享一份2024年1月采集的全国有地铁城市的地铁站点与线路数据&#xff0c;数据格式为shp&#xff0c;数据坐标为wgs1984地理坐标。数据中不仅包括地铁&#xff0c;也包括轻…

Java Swing手搓坦克大战遇到的问题和思考

1.游戏中的坐标系颇为复杂 像素坐标系还有行列坐标&#xff0c;都要使用&#xff0c;这之间的互相转化使用也要注意 2.游戏中坦克拐弯的处理&#xff0c;非常重要 由于坦克中心点是要严格对齐到一条网格线&#xff0c;并沿着这条线前进的&#xff0c;如果拐弯不做处理&#…

法线变换矩阵的推导

背景 在冯氏光照模型中&#xff0c;其中的漫反射项需要我们对法向量和光线做点乘计算。 从顶点着色器中读入的法向量数据处于模型空间&#xff0c;我们需要将法向量转换到世界空间&#xff0c;然后在世界空间中让法向量和光线做运算。这里便有一个问题&#xff0c;如何将法线…

AI爆文变现:怼量也有技巧!如何提升你的创作收益

做AI爆文项目&#xff0c;赚小钱是没有问题的。 想要赚大钱&#xff0c;就是要做矩阵&#xff0c;怼量。 之前参加训练营的时候&#xff0c;也是要求怼量。 怼量&#xff0c;加高质量文章&#xff0c;让你的收益更高。 如何提升文章质量&#xff0c;减少AI味&#xff0c;AI…

性能优化-OpenMP基础教程(二)

本文主要介绍OpenMP并行编程技术&#xff0c;编程模型、指令和函数的介绍、以及OpenMP实战的几个例子。希望给OpenMP并行编程者提供指导。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&am…

动手学深度学习之卷积神经网络之池化层

池化层 卷积层对位置太敏感了&#xff0c;可能一点点变化就会导致输出的变化&#xff0c;这时候就需要池化层了&#xff0c;池化层的主要作用就是缓解卷积层对位置的敏感性 二维最大池化 这里有一个窗口&#xff0c;来滑动&#xff0c;每次我们将窗口中最大的值给拿出来 还是上…

更改ERPNEXT源

更改ERPNEXT源 一&#xff0c; 更改源 针对已经安装了erpnext的&#xff0c;需要更改源的情况&#xff1a; 1, 更改为官方默认源, 进入frapp-bench的目录&#xff0c; 然后执行: bench remote-reset-url frappe //重设frappe的源为官方github地址。 bench remote-reset-url…

ARM工控机Node-red使用教程

嵌入式ARM工控机Node-red安装教程 从前车马很慢书信很远&#xff0c;而现在人们不停探索“科技改变生活”。 智能终端的出现改变了我们的生活方式&#xff0c;钡铼技术嵌入式工控机协助您灵活布建能源管理、大楼自动化、工业自动化、电动车充电站等各种多元性IoT应用&#xff…