SpringBoot集成Canal实现MySQL实时同步数据到Redis

MySQL增量数据同步利器Canal环境搭建流程

软件环境

  • JDK17.0.12

  • canal-server1.1.7

  • canal-client1.1.7

  • MySQL5.7

  • IDEA2024.2.0.2

我们先看Canal1.1.7源码对应的项目结构

在这里插入图片描述

1、基于源码编译打包

# 源码下载地址
https://github.com/alibaba/canal
# 执行以下命令,打包编译
mvn clean install -Dmaven.test.skip=true

在这里插入图片描述

2、搭建canal-admin
2.1 安装Ebean enhancer插件
安装和编译时启用,如下图
在这里插入图片描述

在这里插入图片描述

2.2 创建数据库
创建canal_manager数据库和执行对应脚本,脚本在\canal-canal-1.1.7\admin\admin-web\src\main\resources目录下

在这里插入图片描述

在这里插入图片描述

2.3 修改配置文件
按照下图修改数据库配置信息

在这里插入图片描述

2.4 启动canal管理后台
基于源码启动管理后台

在这里插入图片描述

访问以下地址 http://127.0.0.1:8089/

默认用户名及密码 admin/123456

在这里插入图片描述

3、搭建canal-server
3.1 canal-server端配置
使用canal_local.properties的配置覆盖canal.properties

# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name = 

3.2 启动canal-server
基于源码启动canal-server,启动成功后,在管理后台查看对应server

在这里插入图片描述

在这里插入图片描述

3.3 修改MySQL配置信息
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3.4 修改instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 192.168.0.104:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

在这里插入图片描述

3.5 启动instance
在这里插入图片描述

基于MySQL日志增量订阅和消费的业务包括

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

软件环境

  • JDK17.0.12

  • SpringBoot3.4.0

  • redisson-spring-boot-starter3.38.1

  • Redis6.x

  • Canal-Server1.1.7

  • Canal-Admin1.1.7

  • Canal-Client1.1.7

  • IDEA2024.2.0.2

项目结构

在这里插入图片描述

1、项目搭建
1.1 Canal项目依赖项

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>cn.itbeien</groupId>
        <artifactId>springboot3-labs-master</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>springboot-canal</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <canal.client-version>1.1.7</canal.client-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>${canal.client-version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>${canal.client-version}</version>
        </dependency>
       <!-- <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>-->
    </dependencies>
</project>

1.2 配置信息

#application.properties
server.port=2001
server.servlet.context-path=/canal
#canal
canal-monitor-mysql.host=192.168.0.105
#canal.properties  canal.port
canal-monitor-mysql.port=11111

spring.data.redis.host=192.168.0.104
spring.data.redis.port=6379
spring.data.redis.password=Rootpwd20240809
# redis数据库编号
spring.data.redis.database=8

1.3 代码实现
canal实时从mysql获取数据,同步到分布式缓存redis,完成业务缓存刷新

package cn.itbeien.canal.util;


import cn.itbeien.canal.entity.SysUser;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Slf4j
@Component
public class CanalUtil {


    @Value("${canal-monitor-mysql.host}")
    String canalMonitorHost;

    @Value("${canal-monitor-mysql.port}")
    Integer canalMonitorPort;

    @Autowired
    private RedisClient redisClient;

    private final static int BATCH_SIZE = 10000;

    /**
     * 启动服务
     */
    // @Async("TaskPool")
    public void startMonitorSQL() {
        while (true) {
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "0.104", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        log.info("empty count :{} " , emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        printEntry(message.getEntries());
                    }

                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }

                log.info("empty too many times, exit");

            } catch (Exception e) {
                log.error("成功断开监测连接!尝试重连:{}",e);
            } finally {
                connector.disconnect();
                //防止频繁访问数据库链接: 线程睡眠 10秒
                try {
                    Thread.sleep(10 * 1000);
                } catch (InterruptedException e) {
                    log.error("成功断开监测连接!尝试重连:{}",e);
                }
            }
        }
    }


    private  void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                //canal获取mysql数据库删除事件
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {//canal获取mysql数据库新增事件
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    log.info("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    log.info("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private  void printColumn(List<CanalEntry.Column> columns) {
        SysUser sysUser = new SysUser();
        for (CanalEntry.Column column : columns) { //一行数据库数据=一个对象
            log.info(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            //获取字段名称和字段值,设置到实体类中
            if(column.getName().equalsIgnoreCase("id")){
                sysUser.setId(column.getValue());
            }else if(column.getName().equalsIgnoreCase("name")){
                sysUser.setName(column.getValue());
            }else if(column.getName().equalsIgnoreCase("age")){
                sysUser.setAge(Integer.valueOf(column.getValue()));
            }else if(column.getName().equalsIgnoreCase("email")){
                sysUser.setEmail(column.getValue());
            }
        }
        if(sysUser.getId()!=null && !"".equals(sysUser.getId())){
            String userJson = JSON.toJSONString(sysUser);
            redisClient.set(sysUser.getId(),userJson);//保存用户数据
        }
        log.info(sysUser.toString());
    }

}

2、MySQL数据同步到Redis
2.1 测试代码

package cn.itbeien.canal.test;

import cn.itbeien.canal.util.CanalUtil;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;


@SpringBootTest
public class CanalApplication {
    @Autowired
    private CanalUtil canalUtil;
    @Test
    public void test(){
        this.canalUtil.startMonitorSQL();
    }
}

2.2 环境准备
2.2.1 启动canal-admin
在这里插入图片描述

2.2.2 启动canal-server
在这里插入图片描述

2.2.3 启动canal-instance
在这里插入图片描述

2.2.4 启动canal-client
启动canal-client监听mysql增量数据,运行cn.itbeien.canal.test.CanalApplication

在这里插入图片描述

3、整体流程测试
在MySQL中新增一条数据

在这里插入图片描述

在canal-client端进行数据变更的监听

在这里插入图片描述

最后我们查询redis分布式缓存是否有id为88的这条数据

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/939083.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式驱动开发详解16(音频驱动开发)

文章目录 前言WM8960简介I2S协议接口说明 SAI音频接口简介驱动框架简介设备树配置内核使能声卡设置与测试 后续参考文献 前言 该专栏主要是讲解嵌入式相关的驱动开发&#xff0c;但是由于ALSA驱动框架过于复杂&#xff0c;实现音频编解码芯片的驱动不是一个人能完成的&#xf…

OpenGL ES 03 加载3张图片并做混合处理

OpenGL ES 02 加载3张图片并做混合处理 什么是纹理单元纹理单元的作用使用纹理单元的步骤详细解释加载图片并绑定到到GPU纹理单元采样器的设置1.设置采样器变量的纹理单元编号&#xff0c;目的是为了告诉纹理采样器&#xff0c;从哪个纹理单元采集数据2.如果你没有显式地设置采…

JAVA没有搞头了吗?

前言 今年的Java程序员群体似乎承受着前所未有的焦虑。投递简历无人问津&#xff0c;难得的面试机会也难以把握&#xff0c;即便成功入职&#xff0c;也往往难以长久。于是&#xff0c;不少程序员感叹&#xff1a;互联网的寒冬似乎又一次卷土重来&#xff0c;环境如此恶劣&…

短视频矩阵贴牌:打造品牌新势力的策略与实践

在数字化浪潮席卷全球的今天&#xff0c;短视频以其独特的魅力迅速崛起&#xff0c;成为连接用户与品牌的重要桥梁。企业为了快速抢占市场&#xff0c;提升品牌影响力&#xff0c;纷纷探索短视频矩阵贴牌这一新兴模式。本文将深入探讨短视频矩阵贴牌的概念、优势、实施流程及注…

视频生成Sora的全面解析:从AI绘画、ViT到ViViT、TECO、DiT、VDT、NaViT等

前言 真没想到&#xff0c;距离视频生成上一轮的集中爆发(详见《Sora之前的视频生成发展史&#xff1a;从Gen2、Emu Video到PixelDance、SVD、Pika 1.0》)才过去三个月&#xff0c;没想OpenAI一出手&#xff0c;该领域又直接变天了 自打2.16日OpenAI发布sora以来(其开发团队包…

简易记事本项目(基于Vue 3 + Element Plus + SSM 个人事件管理系统)

项目简介 点滴365是一个基于 Vue 3 Element Plus SSM 开发的个人事件管理系统,旨在帮助用户高效管理 个人日程 和 待办事项。系统支持日记撰写、待办事项管理、数据统计分析、图片上传、定时提醒、实时天气等功能,让用户可以更好地记录生活点滴、规划工作任务。 核心技术栈…

【C语言】头文件

所有学习过C语言的朋友都熟悉这样一段代码&#xff1a; #include <stdio.h>int main(int argc, char *argv[]) {return 0; }那么&#xff0c;你真的了解 <stdio.h> 吗&#xff1f; <stdio…

ChatGPT Search开放:实时多模态搜索新体验

点击访问 chatTools 免费体验GPT最新模型&#xff0c;包括o1推理模型、GPT4o、Claude、Gemini等模型&#xff01; ChatGPT Search&#xff1a;功能亮点解析 本次更新的ChatGPT Search带来了多项令人瞩目的功能&#xff0c;使其在搜索引擎市场中更具竞争力。 1. 高级语音模式&…

概率论得学习和整理31: 连续型随机变量的概率本质是求面积,均匀分布等

目录 1 连续性随机变量 2 连续性随机变量和 离散型随机变量&#xff0c;分布的区别 3 不要混淆概念 4 均匀分布的相关 4.1 定义 4.2 例子 1 连续性随机变量 连续性随机变量最大的特点&#xff0c;单个点上的概率0多了一个分布函数&#xff0c;因为从1维变2维了&#xff…

跟沐神学读论文-论文阅读管理

摘要 近期有读论文的需求&#xff0c;就需要去了解一下论文到底要怎么读&#xff0c;同一个系列之间的论文如何作整理和归纳&#xff0c;之前也有了解过市面上有成熟的论文阅读工具&#xff0c;但是对于学生党来讲没什么性价比&#xff0c;在B站上看到沐神有讲解他的思路Typor…

java_断点调试(debug)

按照如下配置好后&#xff0c;即可点击“F7”,进入相应的方法&#xff0c;查看源码 package com.hspedu.debug_;//debug对象创建的过程&#xff0c;加深对调试的理解 public class Debug01 {public static void main(String[] args) {//创建对象的流程//&#xff08;1&#xff…

c++数据结构算法复习基础--13--基数算法

基数排序 - 桶排序 时间复杂度 O(n*d) – d为数据的长度 每次比较一位&#xff08;个位、十位。。。&#xff09;&#xff0c;所以取值范围就为0-9。 根据该特点&#xff0c;设计桶的概念 – 0号桶、1号桶… 1、思想 1&#xff09;找出最长的数字&#xff0c;确定要处理的…

微信小程序TTS解决方案

微信小程序原生语音合成 API&#xff08;基础且简单&#xff09; 介绍&#xff1a;微信小程序提供了基础的语音合成能力。通过wx.createInnerAudioContext()等相关API&#xff0c;可以实现简单的语音播放功能。不过它主要是用于音频播放&#xff0c;对于完整的文本到语音&#…

matlab绘图时设置左、右坐标轴为不同颜色

目录 一、需求描述 二、实现方法 一、需求描述 当图中存在两条曲线&#xff0c;需要对两条曲线进行分别描述时&#xff0c;应设置左、右坐标轴为不同颜色&#xff0c;并设置刻度线&#xff0c;且坐标轴颜色需要和曲线颜色相同。 二、实现方法 1.1、可以实现&#xff1a; 1…

Vue3动态表单实现

实现方法&#xff1a;通过<component />标签动实现动态表单渲染 component标签&#xff1a; 在vue中 component 标签用于动态组件标签的渲染。它允许在同一个挂载点上条件渲染不同的组件&#xff0c;通过is属性可以渲染指定的属性 在上面的例子中&#xff0c;通过调用…

[C++]C++工具之对异常情况的处理(throw、catch、try)以及用命名空间避免同名冲突

一、C 异常处理&#x1f60a; 1.1 定义 C 中的异常处理用于应对程序运行中的异常情况&#xff08;如除零、数组越界等&#xff09;&#xff0c;通过 try-catch 机制捕获和处理错误&#xff0c;防止程序崩溃。 异常是程序运行时意外发生的事件&#xff0c;可以通过抛出&#xf…

游戏引擎学习第53天

仓库: https://gitee.com/mrxiao_com/2d_game 回顾 现在我们正进行游戏结构的重构&#xff0c;目的是为了更合理地安排游戏的组织方式&#xff0c;模拟玩家周围的实体。我们将这些实体分为两类&#xff1a;一类是当前正在屏幕上显示的实体&#xff0c;另一类是被映射到低频更…

【六足机器人】04上位机开发

图&#xff1a;QT界面效果图 一、主要功能介绍 1.1 登录界面 登录界面&#xff0c;用来判断是否账号密码输入正确&#xff0c;错误将会弹出消息框。 void first::on_enroll_clicked(){if(ui->account->text()"共创芯未来"&&ui->password->text…

RockyLinux9编译安装MySQL5.7

原文链接&#xff1a;RockyLinux9编译安装MySQL5.7 - Liu Zijians Blog | 刘子健的博客 本文最后更新于 2024年12月15日 使用源码编译安装MySQL5.7 1.下载 打开MySQL-Community-Server官方下载页面:https://downloads.mysql.com/archives/community/ 筛选出要下载的版本&…

什么是3DEXPERIENCE SOLIDWORKS,它有哪些角色和功能?

将业界领先的 SOLIDWORKS 3D CAD 解决方案连接到基于单一云端产品开发环境 3DEXPERIENCE 平台。您的团队、数据和流程全部连接到一个平台进行高效的协作工作&#xff0c;从而能快速的做出更好的决策。 目 录&#xff1a; ★ 1 什么是3DEXPERIENCE SOLIDWORKS ★ 2 3DEXPERIE…