FlinkCDC详解

1、FlinkCDC是什么

1.1 CDC是什么

CDC是Chanage Data Capture(数据变更捕获)的简称。其核心原理就是监测并捕获数据库的变动(例如增删改),将这些变更按照发生顺序捕获,将捕获到的数据,写入数据库种如神策数据的核心kudu、doris、mysql、kakfa等。

1.2 CDC的实现方式

1.2.1 基于查询的CDC

  • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据
  • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
  • 不保障实时性,基于离线调度存在天然的延迟。

1.2.2 基于日志的CDC

  • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

1.2.3 常见的开源的CDC方案比较

在这里插入图片描述

1.2.4 个人对于CDC领域的一些浅见

其实对于CDC领域在数仓行业中很常见,无论是离线数仓也好还是实时数仓也好,或者说是业务系统也好,例如京东就是使用CDC方案来同步优惠卷的。其实在很多的CDC的同步方案中,大部分公司其实选用的是第一种,查询同步方案,为什么这么做呢,很多人可能会问,实时同步不好吗,我想说的是实时的CDC太复杂,虽然一致性不高,但是其实运营或者其他人员并不需要这么高的实时性,可能某些领域需要,当然也有很多的表结构设计没有update_time字段,这样的话如果同步一张表,可能会有点麻烦,但是并非是不能同步,如果数据量不大的话,或者有其他自增键的话会很方便,但是如果没有的话就会很麻烦,也可以做,可以做整行的md5这里我就不一一赘述了,在进行查询cdc同步的一些情况。日志cdc呢,其实根本原理就是监控类似于mysql的binlog。可以让整个数据的增删改,进行捕获,从而可以达到两个数据的一致性,当然这个一致性并不是实时的,哪怕是mysql的主从都有可能延迟,更别提咱们监控binlog了,当然这种延迟几乎很少见,业务也不会发现,这种CDC虽然听上去很好,但是实现较为困难,限制比较大,例如下游的数据源要支持改,不像离线可以用拉链表来解决。但是这种方式真的很好,如果开发人员和架构设计人员以及数据设计人员的设计比较好,这种方式效果是最棒的,我司的mysql同步器就支持这两种方式,根据使用人员的喜好来进行选择。

2、Flink CDC的原理

2.1 1.x Flink CDC

Flink1.x的cdc依赖于Debezium组件,debezium为了保证数据的一致性,在全量读取时,会加锁。
此时呢会分为全局锁权限和无全局锁权限。

在这里插入图片描述那么为什么debezium为什么要这么做呢,要加上全局锁呢,因为数据一致性问题,这就涉及到数据库的全局锁和表锁了,数据库的全局锁,以mysql为例,全局锁就是对整个数据库实例加锁。MySQL 提供了一个加全局读锁的方法,命令是Flush tables with read lock (FTWRL)。
当你需要让整个库处于只读状态的时候,可以使用这个命令,之后其他线程的以下语句会被阻塞:数据更新语句(数据的增删改)、数据定义语句(包括建表、修改表结构等)和更新类事务的提交语句。一般全局锁的使用场景在数据库备份上,当然如果主库加锁的话,会导致一些问题。例如加锁后,这个数据库实例无法更新,业务基本就停止了。从库呢,也不能从binlog拉取数据,这就导致了主从延迟,假如有的业务使用的是从库的话就会出现问题。当然全局锁有问题,那么不加锁会导致什么问题呢,数据不一致问题:
比如手机卡,购买套餐信息

这里分为两张表 u_acount (用于余额表),u_pricing (资费套餐表)
步骤:
1. u_account 表中数据 用户A 余额:300
    u_pricing 表中数据 用户A 套餐:空
2. 发起备份,备份过程中先备份u_account表,备份完了这个表,这个时候u_account 用户余额是300
3. 这个时候套用户购买了一个资费套餐100,餐购买完成,写入到u_print套餐表购买成功,备份期间的数据。
4. 备份完成

可以看到备份的结果是,u_account 表中的数据没有变, u_pricing 表中的数据 已近购买了资费套餐100.

哪这时候用这个备份文件来恢复数据的话,用户A 赚了100 ,用户是不是很舒服啊。但是你得想想公司利益啊。  
也就是说,不加锁的话,备份系统备份的得到的库不是一个逻辑时间点,这个数据是逻辑不一致的。

当然mysql的备份工具,mysqldump可以在备份的时候支持更新,基于MVCC的机制。MVCC (Multiversion Concurrency Control),多版本并发控制。顾名思义,MVCC 是通过数据行的 多个版本 管理来实现数据库的 并发控制。这项技术使得在InnoDB的事务隔离级别下执行 一致性读操 作有了保证。换言之,就是为了查询一些正在被另一个事务更新的行,并且可以看到它们被更新之前的值,这样在做查询的时候就不用等待另一个事务释放锁。
不再深入解释mysql的核心机制了。
表锁是什么呢,顾名思义就是锁住了整张表。在加表锁的表上,无法进行DDL、DML操作。当然在mysql5.5以后,有一个表锁是MDL,MDL不需要显示的使用,在访问一个表的时候会被自动加上。MDL 的作用是,保证读写的正确性。你可以想象一下,如果一个查询正在遍历一个表中的数据,而执行期间另一个线程对这个表结构做变更,删了一列,那么查询线程拿到的结果跟表结构对不上,肯定是不行的。因此,在 MySQL 5.5 版本中引入了 MDL,当对一个表做增删改查操作的时候,加 MDL读锁;当要对表做结构变更操作的时候,加 MDL 写锁。

  • 读锁之间不互斥,因此你可以有多个线程同时对一张表增删改查。
  • 读写锁之间、写锁之间是互斥的,用来保证变更表结构操作的安全性。因此,如果有两个线程要同时给一个表加字段,其中一个要等另一个执行完才能开始执行。

MDL锁有一些问题,假如在多个读session中进行更改表结构操作的话,可能会卡死。

这个就是debezium在flink1.x中的应用。

2.2 2.x Flink CDC

Flink 2.x不仅引入了增量快照读取机制,还带来了一些其他功能的改进。以下是对Flink 2.x的主要功能的介绍:

增量快照读取:Flink 2.x引入了增量快照读取机制,这是一种全新的数据读取方式。该机制支持并发读取和以chunk为粒度进行checkpoint。在增量快照读取过程中,Flink首先根据表的主键将其划分为多个块(chunk),然后将这些块分配给多个读取器并行读取数据。这一机制极大地提高了数据读取的效率。
精确一次性处理:Flink 2.x引入了Exactly-Once语义,确保数据处理结果的精确一次性。MySQL CDC 连接器是Flink的Source连接器,可以利用Flink的checkpoint机制来确保精确一次性处理。
动态加表:Flink 2.x支持动态加表,通过使用savepoint来复用之前作业的状态,解决了动态加表的问题。
无主键表的处理:Flink 2.x对无主键表的读取和处理进行了优化。在无主键表中,Flink可以通过一些额外的字段来识别数据记录的唯一性,从而实现准确的数据读取和处理。

对于Flink 2.x的CDC方案呢,可以理解为全量读取时,在划分chunk块的时候,采用了查询读,他是将主键进行切分的。默认一个chunk8096条数据,知道这些就可以了。
2.x的 Flink cdc实现较为复杂,这里就不一一赘述了。

3、FlinkCDC的使用

3.1 导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
              <!-- 可以将依赖打到jar包中 -->
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

3.2 代码实操

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class FlinkCDC {
    public static void main(String[] args) throws Exception {

        //1.获取Flink执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("cdc_test")	//监控的数据库
                .tableList("cdc_test.user_info")	//监控的数据库下的表
                .deserializer(new StringDebeziumDeserializationSchema())//反序列化
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3.数据打印
        dataStreamSource.print();

        //4.启动任务
        env.execute("FlinkCDC");
    }
}

4、Flink CDC输出数据解析

4.1 数据的数据结构

在这里插入图片描述flink cdc的输出结果大概可以分为 before、after、
before代表变更前数据,after代表变更后数据。

还有个op,这个op代表的是事务的操作:
r:读取历史
d:删除
c:创建
u:更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/399573.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Cesium for Unreal 从源码编译到应用——插件编译

一、安装环境 Unreal Engine 5.3 CMake 3.17.5 Microsoft Visual Studio 2019 二、源码准备 下载cesium-unreal-samples工程。 git clone https://github.com/CesiumGS/cesium-unreal-samples.git 然后在工程目录创建Plugins文件夹&#xff0c;并下载cesium-unreal工程。 …

演练纪实│同创永益助力中交财务公司成功开展灾备应急演练

12月26日同创永益协助中交财务公司顺利完成核心业务系统、高性能云平台等关键业务系统的子系统验证演练&#xff0c;以及模拟切换演练、同城灾备应急演练。 同创永益北方交付二组的同事在演练前与中交财务公司演练负责人紧密沟通&#xff0c;展现了出色的专业素养和团队协作能力…

Vue3学习——标签的ref属性

在HTML标签上&#xff0c;可以使用相同的ref名称&#xff0c;得到DOM元素ref放在组件上时&#xff0c;拿到的是组件实例&#xff08;组件defineExpose暴露谁&#xff0c;ref才可以看到谁&#xff09; <script setup lang"ts"> import RefPractice from /compo…

测试C#调用Emgucv读取并显示视频文件

微信公众号“CSharp编程大全”的文章《C# 视频播放》介绍了基于emgucv读取视频文件并播放的用法&#xff08;百度文章名没有找到对象的文章地址&#xff0c;有兴趣的可以在微信中搜索该公众号并查看文章具体内容&#xff09;&#xff0c;本文学习并测试Emgucv播放视频文件的基本…

拼多多关键字搜索API-通过关键字获取拼多多商品列表商品价格商品id商品链接

拼多多根据关键词取商品列表 API 返回值说明 item_search-根据关键词取商品列表 公共参数 请求地址: pinduoduo/item_search 名称类型必须描述keyString是调用key&#xff08;必须以GET方式拼接在URL中&#xff09;secretString是调用密钥api_nameString是API接口名称&…

Mysql系列之命令行登录、连接工具登录、数据库表常用命令

登录与常用命令 连接工具登录命令行登录数据库1、查看数据库2、指定数据库3、查看当前数据库4、建库语句 数据表1、查看数据表2、查看表结构信息3、查看建表语句4、建表语句 连接工具登录 首先下载mysql连接工具&#xff0c;解压后直接打开软件&#xff0c;按以下步骤操作&…

☀️将大华摄像头画面接入Unity 【2】配置Unity接监控画面

一、前言 上一篇咱们将大华摄像头接入到电脑上了&#xff0c;接下来准备接入到unity画面。 接入到监控就涉及到各种视频流的格式rtsp、rtmp、m3u8。 Unity里有一些播放视频流的插件&#xff0c;主要的就是AVPro Video 和 UMP等&#xff0c;这次我用的是UMP 最好使用2.0.3版本…

顺序表详解(SeqList)

本文使用C语言进行顺序表的代码实现。 博主将使用代码和相关知识相结合的方式进行讲解&#xff0c;简单易懂&#xff0c;懵懂的大学生一听就会~ 顺序表是一种线性表的存储结构&#xff0c;它将数据元素存储在一段连续的存储空间中&#xff0c;每个元素占据一个存储单元&#x…

VSCODE中使用Django处理后端data和data models

链接&#xff1a; Python and Django tutorial in Visual Studio Code MVC的理解 在实际的程序中采用MVC的方式进行任务拆分。 Model&#xff08;模型&#xff09;负责封装应用程序的数据和业务逻辑部分。Model包含数据结构&#xff0c;数据处理逻辑以及相关的操作方法&#…

Camunda和SpringBoot的兼容版本

官网 https://docs.camunda.org/manual/7.15/user-guide/spring-boot-integration/version-compatibility/ Camunda和SpringBoot的兼容版本

安达发|APS生产排程的多机台产线详解

在生产管理中&#xff0c;APS&#xff08;高级计划与排程&#xff09;系统它可以帮助企业实现生产过程的优化和效率提升。特别是在多机台产线的生产环境中&#xff0c;APS系统的作用更加明显。本文将详细解析APS生产排程的多机台产线&#xff0c;包括允许使用的最大设备数&…

ai数字仿真辩论主持人提升用户体验

Ai虚拟主持人是元宇宙和AI人工智能技术在播音主持行业的重要应用&#xff0c;AI虚拟主持人能极大提升新闻资讯内容的精准度&#xff0c;改变单一的播报形式。 首先&#xff0c;AI虚拟主持人极大地提升了节目的制作效率和灵活性。传统主持人需要花费大量时间进行彩排和录制&…

【JGit】分支管理实践

本文紧接【JGit】简述及学习资料整理。 以下梳理了使用 JGit 进行 Git 操作的实践 JGit实践 主函数 public static void main(String[] args) throws Exception {String localDir "D:\\tmp\\git-test\\";String gitUrl "http://192.168.181.1:3000/root/g…

PFA容量瓶在半导体晶圆化学机械抛光中的用处是什么?

PFA容量瓶又称可溶性聚四氟乙烯容量瓶、特氟龙容量瓶容量瓶&#xff0c;有螺纹和插口两种可供选择&#xff0c;常用有10ml、25ml、50ml、100ml、250ml、500ml、1000ml规格 Teflon系列PFA容量瓶是一个透明的长颈瓶&#xff0c;瓶体为梨形&#xff0c;便于摇荡液体和刷洗。每一个…

Linux进程(一)信号-----信号产生

前言 在 Linux 中&#xff0c;进程具有独立性&#xff0c;进程在运行后可能 “放飞自我”&#xff0c;这是不利于管理的&#xff0c;于是需要一种约定俗成的方式来控制进程的运行&#xff0c;这就是 进程信号&#xff0c;本文将会从什么是进程信号开篇&#xff0c;讲述各种进程…

harmony 鸿蒙系统学习 安装ohpm报错 ohpm install failed

一. 安装配置 DevEco Studio 安装包时报错 execute ohpm install failed. Install task failed: ArkTS 3.2.12.5. Install ArkTS dependencies failed. 解决办法 找原因&#xff0c;首先&#xff0c;我的电脑中之前安装过node&#xff0c;也许是因为这个。&#xff08;其实…

Redis之缓存雪崩问题解决方案

文章目录 一、书接上文二、介绍三、解决方案1. 锁2. 不同的过期时间3. 缓存预热和定时任务 一、书接上文 Redis之缓存穿透问题解决方案实践SpringBoot3Docker 二、介绍 缓存雪崩&#xff0c;指大量的缓存失效&#xff0c;大量的请求又同时落在数据库。主要的一种诱因是key设…

先进电机技术——步进电机与伺服电机

一、步进电机 步进电机是一种特殊类型的电动机&#xff0c;它的工作方式是将输入的电脉冲信号转换成精确的机械运动——通常是转子的角位移或直线移动。每接收到一个电脉冲信号&#xff0c;步进电机内部的定子绕组按顺序通电&#xff0c;产生磁场变化&#xff0c;使得与之相互…

企业级人脸美颜和美妆解决方案

视觉营销日益重要&#xff0c;而人脸美颜和美妆作为视觉营销的关键环节&#xff0c;更是受到了众多企业的关注。美摄科技&#xff0c;作为国内领先的人脸美颜和美妆解决方案提供商&#xff0c;以其先进的技术和卓越的产品&#xff0c;助力企业打造完美视觉体验&#xff0c;提升…

STM32引脚重定义问题

最近在搞资源管理&#xff0c;发现有些引脚不能用 比如这个PE引脚。我想用他输出PWM&#xff0c;但是不能用&#xff0c;我也重定义了&#xff0c;还是不能用。回去翻看了技术手册。 RCC_APB2PeriphClockCmd(RCC_APB2Periph_AFIO, ENABLE); //重映射引脚功能&#xff0c;需…