SpringBoot结合Canal 实现数据同步

1、Canal介绍

     Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x,Canal通过伪装成mysql从服务向主服务拉取数据。所以首先我们要了解mysql 主从数据同步。

2、Mysql 主从数据同步

1、从库(slave)会生成两个线程,I/O线程(IOthread),SQL线程(SQLthread)。

2、当slave的I/O线程连接到master后,会去请求master的二进制日志(binlog), 此时master会通过logdump(将主库的二进制日志文件内容传输给从库的过程) 给从库传输binlog。

3、 然后slave将拿到的binlog日志依次写入Relaylog(中继日志)的最末端,同时将读取到的Master 的bin-log的文件名和位置记录到master- info文件中,作用为了让slave知道它需要从哪个位置和哪 个日志文件开始同步数据,以保证数据的一致性,并且能够及时获取到master的新的更新操作, 开始数据同步过程。slave不仅在启动时读取 master-info 文件,而且会定期更新该文件中的记 录,以确保记录都是最新的。

4、最后SQL线程会读取Relaylog,并解析为具体操作(比如DDL这种),来实现主从库的操作一致。

3、Canal 原理

1、Canal Server与MySQL建立连接后,会通过模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议获取数据库的 binlog(二进制日志)文件。

2、Canal Server解析binlog文件,通过网络将解析后的事件传输给消息中间件(Kafka,RabbitMQ、Rocket等),实现数据的实时同步。

4、实现过程

       下载Canal,地址为https://github.com/alibaba/canal/releases,然后解压

  修改Canal的配置文件(conf/canal.properties)

指定模式
        canal.serverMode = rabbitMQ
        # 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
        canal.destinations = example

        # rabbitmq 服务端 ip
        rabbitmq.host = 你的ip(注意不要加端口号哦)
        # rabbitmq 虚拟主机
        rabbitmq.virtual.host = /
        # rabbitmq 交换机
        rabbitmq.exchange = canal.exchange  (这是本例子用的交换机)
        # rabbitmq 用户名
        rabbitmq.username = 你的用户名
        # rabbitmq 密码
        rabbitmq.password = 你的密码
        rabbitmq.deliveryMode =

修改实例配置文件 conf/example/instance.properties。

#配置 slaveId,自定义,不等于 mysql 的 server Id 即可
        canal.instance.mysql.slaveId=10

        # 数据库地址:配置自己的ip和端口
        canal.instance.master.address=你的IP:端口号

        # 数据库用户名和密码
        canal.instance.dbUsername=用户名
        canal.instance.dbPassword=密码

        # 指定库和表
        canal.instance.filter.regex=.*\..*    # 这里的 .* 表示 canal.instance.master.address 下面的所有数据库

        # mq config
        # rabbitmq 的 routing key
        canal.mq.topic=canal.routing.key(这是本例子用的key)

然后重启 canal 服务。

2 、搭建RabbitMq并且 配置Exchange、Routing key。

3、Spring  Boot 实现代码

   引入依赖

<!--引入rabbitmq集成的依赖-->       
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

编辑配置文件

spring:
  #rabbitmq
  rabbitmq:
    #    host: 192.168.31.189
    host: 192.168.1.12
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:         #手动确认消息 ack
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 3
          max-interval: 1000ms
server:
  port: 8888

   定义队列的消息接受的数据对象

@Data
public class CanalMessage<T> {
    private String type;
    private String table;
    private List<T> data;
    private String database;
    private Long es;
    private Integer id;
    private Boolean isDdl;
    private List<T> old;
    private List<String> pkNames;
    private String sql;
    private Long ts;
}

定义rabbitmq的配置,代码如下

@Configuration
@Slf4j
public class RabbitConfig {

    /**
     * 消息序列化配置
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        // SimpleRabbitListenerContainerFactory 是 RabbitMQ 提供的一个实现了 RabbitListenerContainerFactory 接口的简单消息监听器容器工厂。
        // 它的作用是创建和配置 RabbitMQ 消息监听器容器,用于监听和处理消息。
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //ConnectionFactory 是 RabbitMQ 提供的一个接口,用于创建 RabbitMQ 的连接
        factory.setConnectionFactory(connectionFactory);
        //使用了 Jackson2JsonMessageConverter 将消息转换为 JSON 格式进行序列化和反序列化
        factory.setMessageConverter(  new Jackson2JsonMessageConverter());
        return factory;
    }
}

消费的代码

@Component
@RabbitListener(queues = "data")
public class MqNotifyReceive {
    @RabbitHandler
    public void receive(CanalMessage CanalMessage) {
        //进行同步业务处理。。。 
        
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/618731.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据分析需要注意哪些法律法规

数据分析 前言一、数据处理过程二、数据收集阶段的法律规则数据收集应具备合法、正当、透明原则数据收集应坚持最小必要原则数据收集应遵守知情-同意规则数据收集应遵守目的明确性要求 三、数据储存的法律规则四、数据使用与处理的阶段的法律规则数据安全保护义务按照数据分级分…

全球首例!猪肾移植患者死亡,人类科技与伦理或将面临挑战?

全球首例猪肾移植患者的离世&#xff0c;如同一颗重磅炸弹&#xff0c;在医学界激起千层浪花&#xff0c;让原本充满希望的“死而复生”异种器官移植技术再次被推至风口浪尖。 今年3月&#xff0c;一场与命运的较量在麻省总医院悄然落幕。全球首位接受转基因猪肾移植的患者理查…

Boss让你设计架构图,你懵逼了,解救你的参考图来啦。

架构图是指用于描述系统或软件的结构和组成部分之间关系的图形表示。 它是一种高层次的图示&#xff0c;用于展示系统的组件、模块、接口和数据流等&#xff0c;以及它们之间的相互作用和依赖关系。架构图通常被用于可视化系统的整体设计和组织结构&#xff0c;帮助人们理解系…

数据结构(四)——二叉树和堆(下)

制作不易&#xff0c;三连支持一下呗&#xff01;&#xff01;&#xff01; 文章目录 前言一、二叉树链式结构的实现总结 前言 这篇博客我们将来了解普通二叉树的实现和应用&#xff0c;对大家之前分治和递归的理解有所挑战。 一、二叉树链式结构的实现 1.前置说明 在学习二叉…

对Windows超融合S2D的一些补充

先说一个不知道算不算BUG的例子&#xff0c;下面这个存储池是用两台服务器各2块10G建立的&#xff0c;除去系统保留的部分&#xff0c;显示还有13G可用。 但如果使用其新建虚拟磁盘会显示可用的空间为0 然后我又各增加了一块10G硬盘进池&#xff0c;变成了可用空间为30.5GB …

“二代”接班进行时:达利食品许阳阳揭秘“零食大王”成长密钥

“二代接班”早已不是一个新鲜话题。近年来&#xff0c;随着时间的推移&#xff0c;那些伴随改革开放和中国制造崛起的民营企业&#xff0c;更多的正在经历或已完成“二代接班”。 “毛巾王子”家的洁丽雅&#xff0c;最近因大手笔签约多位代言人而引起讨论的九牧王&#xff0…

基于Unity为Vision Pro 构建游戏的4个关键

为Vision Pro开发游戏时需要考虑的四个关键概念:输入的自然性、物理尺寸的真实匹配、交互空间的充足性以及Unity组件的有效利用。 AVP交互小游戏(Capsule Critters)作者分享了使用Unity构建的几个核心关键: Bounded - 游戏定义:Bounded(有限)是Unity的术语,指的是游戏作…

[AIGC] 几道 redis数据结构相关面试题

文章目录 7. 数据类型的实现8. 什么是空间预分配以及惰性空间释放&#xff0c;SDS 是怎么实现的9. 为什么说 SDS 是二进制安全的呢10. 说说 redis 里的对象11. 使用 RedisObject 的好处12. RedisObject 的具体结构是什么 7. 数据类型的实现 8. 什么是空间预分配以及惰性空间释放…

Tomcat添加服务以及设置开机自启

下载地址连接 Index of /dist/tomcat&#x1f453; 注意点&#xff1a;不要出现中文路径 #环境变量 CATALINA_HOMED:\apache-tomcat-7.0.62 TOMCAT_HOMED:\apache-tomcat-7.0.62 JAVA_HOMED:\tool\jdk1.8.0_111 PATH%CATALINA_HOME%\bin;%CATALINA_HOME%\lib;%CATALINA_HOME%\…

【JVM基础篇】类加载器分类介绍

文章目录 类加载器什么是类加载器类加载器的作用是什么应用场景类加载器的分类启动类加载器用户扩展基础jar包 扩展类加载器和应用程序类加载器扩展类加载器通过扩展类加载器去加载用户jar包&#xff1a; 应用程序加载器 Arthas中类加载器相关功能 文章说明 类加载器 什么是类…

车载测试:为什么你投十份简历,只有一两家公司约你?

最根本的原因&#xff0c;就是一方在汲汲渴求&#xff0c;而恰恰另一方呈现出的关键点让其怦然心动。求者心中有所想&#xff0c;而应者恰恰展现了求者所想的那一面。这就是个中奥妙。 程序员在找工作时&#xff0c;在一开始有三件事情会对能否获得面试机会至关重要&#xff1…

C++笔记(STL标准库)

1.STL六大部件 容器 Containers分配器 Allocators&#xff1a;帮容器分配内存算法 Algorithms迭代器 Iterators&#xff1a;算法通过迭代器操作容器里的数据&#xff0c;是一种泛化的指针适配器 Adapters&#xff1a;修改或扩展已有类或函数的接口以满足特定的需求仿函数 Func…

自动秒收录网址导航分类目录模板

自动秒收录网址导航是一个以html5css3进行开发的免费版网址自动收录模板源码。 模板特点&#xff1a;全站响应式H5网站制作技术&#xff0c;一个网站适应不同终端&#xff0c;模板支持网址导航一键采集入库&#xff0c;免规则文章资讯智能批量采集内置伪原创&#xff0c;本地化…

DSA理解理解蓝桥杯例题signature

一、历史 1991年8月&#xff0c;NIST&#xff08;Nation Institute of Standards and Technology&#xff0c;美国国家标准技术研究所&#xff09;提出了数字签名算法&#xff08;DSA&#xff09;用于他们的数字签名标准&#xff08;DSS&#xff09;中。 DSA是算法&#xff0c…

SCI论文发表:寻找论文选题的7种方法 (建议收藏)!

我是娜姐 迪娜学姐 &#xff0c;一个SCI医学期刊编辑&#xff0c;探索用AI工具提效论文写作和发表。 近期有好几个同学问娜姐关于论文选题的问题&#xff1a;什么样的选题是好的选题-有价值好发表。这篇来分享7种寻找有效选题的方法。 在我们寻找论文选题的过程中&#xff0c;…

Linux配置两个局域网间的网络转发

网络拓扑如上图所示&#xff0c;有192.168.1.0(255.255.255.0)&#xff0c;192.168.2.0(255.255.255.0)两个局域网。若要使host1可直接通过ip地址访问host3&#xff0c;则需在host2中配置路由转发。 host2 配置静态路由&#xff08;系统一般会自动配置&#xff09; # 添加静…

Electron、QT、WPF三强争霸,该支持谁呢?

Electron、QT、WPF都是跨平台的桌面应用开发框架&#xff0c;都是非常流行的&#xff0c;作为开发者该选用哪个呢&#xff1f;本文从多个角度分析一下。 一、定义 Electron、Qt 和 WPF 都是用于创建桌面应用程序的框架或工具&#xff0c;它们各自有着不同的特点和优势。 Elec…

Windows下安装 Emscripten 详细过程

背景 最近研究AV1编码标准的aom编码器&#xff0c;编译的过程中发现需要依赖EMSDK&#xff0c;看解释EMSDK就是Emscripten 的相应SDK&#xff0c;所以此博客记录下EMSDK的安装过程&#xff1b;因为之前完全没接触过Emscripten 。 Emscripten Emscripten 是一个用于将 C 和 …

论文盲审吐槽多,谁给盲审不负责的老师买单?如何看待浙江大学「一刀切」的研究生学位论文双盲评审制度?

::: block-1 “时问桫椤”是一个致力于为本科生到研究生教育阶段提供帮助的不太正式的公众号。我们旨在在大家感到困惑、痛苦或面临困难时伸出援手。通过总结广大研究生的经验&#xff0c;帮助大家尽早适应研究生生活&#xff0c;尽快了解科研的本质。祝一切顺利&#xff01;—…

Windows11“重置此电脑”后,Edge浏览器在微软应用商店显示“已安装”,但是开始菜单搜索不到的解决办法

Windows11“重置此电脑”后&#xff0c;Edge浏览器在微软应用商店显示“已安装”&#xff0c;但是开始菜单搜索不到的解决办法 为什么重新使用Edge&#xff1f;问题描述不该更新可用更新问过AI&#xff08;通义千问&#xff09;&#xff0c;并且AI提供方法全都无效。现象 操作步…