基于 Flink CDC 构建 MySQL 的 Streaming ETL to MySQL

简介

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:
• 数据同步:用于备份,容灾;
• 数据分发:一个数据源分发给多个下游系统;
• 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。
CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:
• 基于查询的 CDC:
• 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
• 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
• 不保障实时性,基于离线调度存在天然的延迟。
• 基于日志的 CDC:
• 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
• 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
• 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。
对比常见的开源 CDC 方案,我们可以发现:
• 对比增量同步能力,
• 基于日志的方式,可以很好的做到增量同步;
• 而基于查询的方式是很难做到增量同步的。
• 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。
• 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
• 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。
• 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?
• 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;
• 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。
• 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。
在这里插入图片描述
在这里插入图片描述

1.安装单机版

下载

yum install -y java-1.8.0-openjdk.x86_64
yum install -y  java-1.8.0-openjdk-devel
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
mkdir -p /opt/flink
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz -C /opt/flink 

下载jar复制到/opt/flink/flink-1.17.2/lib

<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.4.2</version>
    <scope>provided</scope>
</dependency>

配置

vim /opt/flink/flink-1.17.2/conf/flink-conf.yaml

rest.port: 8081
rest.bind-address: 0.0.0.0
jobmanager.execution.timezone: Asia/Shanghai

启动

/opt/flink/flink-1.17.2/bin/stop-cluster.sh
/opt/flink/flink-1.17.2/bin/start-cluster.sh 

访问http://10.6.8.227:8081/

2.创建 两个mysql 数据库

docker run -p 13306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysql

docker run -p 23306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysql

初始化mysql 表结构

CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);

在源库中插入数据

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

3.CDC 步骤

启动 /opt/flink/flink-1.17.2/bin/sql-client.sh
只能一条语句一条语句的执行

CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '13306',
    'username' = 'root',
    'password' = 'mysql',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );


CREATE TABLE sink_products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:23306/mydb?serverTimezone=Asia/Shanghai',
    'username' = 'root',
    'password' = 'mysql',
    'table-name' = 'sink_products'
  );



insert into sink_products select * from products;

4.验证

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

参考文档

http://124.220.104.235/web/chatgpt

https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/226825.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

探索SpringBoot发展历程

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; 循序渐进学SpringBoot ✨特色专栏&…

可以彻底告别手写正则表达式了

大家好&#xff0c;我是风筝&#xff0c;公众号「古时的风筝」 这篇文章的目的是让你能得到完美的正则表达式&#xff0c;而且还不用自己拼。 说到正则表达式&#xff0c;一直是令我头疼的问题&#xff0c;这家伙一般时候用不到&#xff0c;等用到的时候发现它的规则是一点儿…

5G入门到精通 - 5G的十大关键技术

文章目录 一、网络切片二、自组织网络三、D2D技术四、低时延技术五、MIMO技术六、毫米波七、内容分发网络八、M2M技术九、频谱共享十、信息中心网络 一、网络切片 5G中的网络切片是一项关键技术&#xff0c;它允许将整个5G网络分割成多个独立的虚拟网络&#xff0c;每个虚拟网络…

超级好用的IDEA插件推荐

IDEA是一款功能强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;它可以帮助开发人员更加高效地编写、调试和部署软件应用程序。我们在编写完接口代码后需要进行接口调试等操作&#xff0c;一般需要打开额外的调试工具。 今天给大家介绍一款IDEA插件&#xff1a;Api…

YOLO的全面综述:从YOLOv1到最新版本

文章目录 摘要1、简介2、YOLO在不同领域的应用3、目标检测的度量标准和非最大值抑制&#xff08;NMS&#xff09;3.1. AP如何工作&#xff1f;3.2. 计算AP3.3、非极大值抑制&#xff08;NMS&#xff09; 4、YOLO: You Only Look Once4.1、YOLOv1的工作原理4.2、YOLOv1架构4.3、…

Xilinx FPGA——ISE时序约束“建立时间不满足”问题解决记录

一、现象 最近使用赛灵思的FPGA设计项目时&#xff0c;出现时序约束失效问题。 点进去发现如下&#xff1a; 一个始终约束没有生效&#xff0c;有多处报错。 二、原因 出现这个问题的原因是&#xff0c;建立时间不满足。 时序违例的主要原因是建立时间和保持时间不满足要求&a…

用23种设计模式打造一个cocos creator的游戏框架----(九)访问者模式

1、模式标准 模式名称&#xff1a;访问者模式 模式分类&#xff1a;行为型 模式意图&#xff1a;将数据操作与数据结构分离&#xff0c;使得在不修改数据结构的前提下&#xff0c;可以添加或改变对数据的操作。 结构图&#xff1a; 适用于&#xff1a; 当你需要对一个复杂对…

Dockerfile详解#如何编写自己的Dockerfile

文章目录 前言编写规则指令详解FROM&#xff1a;基础镜像LABEL&#xff1a;镜像描述信息MAINTAINER&#xff1a;添加作者信息COPY&#xff1a;从宿主机复制文件到镜像中ADD&#xff1a;从宿主机复制文件到镜像中WORKDIR&#xff1a;设置工作目录 前言 Dockerfile是编写docker镜…

Spring AOP从入门到精通

目录 1. AOP的演化过程 1. 代理模式 2. 动态代理 2.1 JDK动态代理 2.2 Cglib动态代理 3. Spring模式 3.1 ProxyFactory 3.2 ProxyFactoryBean 3.3 AbstractAutoProxyCreator 2. Spring AOP抽象 1. 核心术语 1.1 连接点(JoinPoint) 1.2 切点(Pointcut) 1.3 增强(Ad…

JAVA 多线程并发(一)

1.JAVA 并发知识库 2.JAVA 线程实现/创建方式 2.1. 继承 Thread 类 Thread 类本质上是实现了 Runnable 接口的一个实例&#xff0c;代表一个线程的实例。启动线程的唯一方法就是通过 Thread 类的 start()实例方法。start()方法是一个 native 方法&#xff0c;它将启动一个新线…

使用JMeter创建数据库测试

好吧&#xff01;我一直觉得我不聪明&#xff0c;所以&#xff0c;我用最详细&#xff0c;最明了的方式来书写这个文章。我相信&#xff0c;我能明白的&#xff0c;你们一定能明白。 我的环境&#xff1a;MySQL&#xff1a;mysql-essential-5.1.51-win32 jdbc驱动&#xff1a…

支持生成接口文档!Apipost IDEA插件使用体验

前言 Idea 是一款功能强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;它可以帮助开发人员更加高效地编写、调试和部署软件应用程序,Idea 还具有许多插件和扩展&#xff0c;可以根据开发人员的需要进行定制和扩展&#xff0c;从而提高开发效率,今天我们就来介绍一款…

交易历史记录20231207 记录

昨日回顾&#xff1a; select top 10000 * from dbo.全部&#xff21;股20231207_ALL where 连板天 >1 and DDE大单净量>0 and DDE散户数量<0 and RSI> 80 and 五指标共振>0 and 涨停基因>20 and CONVERT(datetime,最后涨停时间,120) <CONVERT(d…

富时中国A50指数暴跌

近年来&#xff0c;中国股市的波动一直备受关注&#xff0c;而富时中国A50指数更是其中一项备受瞩目的指标之一。然而&#xff0c;近期却出现了一场引人瞩目的暴跌&#xff0c;引发了广泛的关注和讨论。 富时中国A50指数简介 富时中国A50指数&#xff0c;作为富时罗素指数系列…

Linux:缓冲区的概念理解

文章目录 缓冲区什么是缓冲区&#xff1f;缓冲区的意义是什么&#xff1f;缓冲区的刷新方式 理解缓冲区用户缓冲区和内核缓冲区缓冲区在哪里&#xff1f; 本篇主要总结的是关于缓冲区的概念理解&#xff0c;以及再次基础上对文件的常用接口进行一定程度的封装 缓冲区 什么是缓…

linux文件查找

grep: 文件内容过滤 [rootzaotounan ~]# grep 文件内容 路径 #从某个路径下的文件中过滤拥有文件内容的字段 ​ [rootzaotounan ~]# grep -r #递归查找 查找命令配置文件位置 查找命令位置 [rootzaotounan ~]# which 命令名 ​ 查找配置文件位置 [rootzaotounan ~]# wherei…

el-select的多选multible带全选组件二次封装(vue2,elementUI)

1.需求 Select 选择器 多选需要增加 全选 和 取消全选 功能&#xff0c;前端框架为vue2&#xff0c;UI组件为elementUI。 2. 代码 html部分 <template><el-tooltip effect"dark" :disabled"defaultValue.length < 0" :content"defaul…

应用程序中实现用户隐私合规和数据保护合规的处理方案及建议

随着移动互联网的发展&#xff0c;用户隐私合规和数据保护合规已经成为应用开发过程中不可忽视的重要环节。为了帮助开发者实现隐私和数据保护合规&#xff0c;本文将介绍一些处理方案和建议。 图片来源&#xff1a;应用程序中实现用户隐私合规和数据保护合规的处理方案及建议 …

IOday5作业

使用两个线程完成两个文件的拷贝&#xff0c;分支线程1完成前一半内容拷贝&#xff0c;分支线程2完成后一半内容的拷贝&#xff0c;主线程完成资源的回收 #include<myhead.h> //定义结构体 struct file {const char* srcfile;//背拷贝文件路径const char* destfile;//拷…

LeetCode Hot100 200.岛屿数量

题目&#xff1a; 给你一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的的二维网格&#xff0c;请你计算网格中岛屿的数量。 岛屿总是被水包围&#xff0c;并且每座岛屿只能由水平方向和/或竖直方向上相邻的陆地连接形成。 此外&#xff0c;你可以…