canal详解及demo

提示:如何保证Redis中的数据与数据库中的数据一致性?数据同步canal的介绍和demo、大型企业如何实现mysql到redis的同步?使用binlog实时更新redis缓存、canal的接入教程、win下canal的服务器端、canal客户端的创建、连接、测试教程、数据同步方式canal

文章目录

  • 前言
  • 一、canal是什么?
    • 1.1、工作原理
  • 二、canal服务端demo
    • 1.先看一下mysql的配置
    • 2.修改配置
    • 3.编辑my.ini 文件
    • 4.创建canal用户
    • 5.启动canal服务
    • 6.修改canal.properties
    • 7.修改instance.properties
    • 8.启动canal
    • 9.创建canal客户端
  • 三、canal客户端demo
    • 1.maven依赖
    • 2.demo代码
  • 三、测试
    • 1.启动服务器端
    • 2.启动测试端
    • 3.修改数据库
  • 总结


前言

很多时候我们需要数据库和redis(或者其他中间件,mq、es等)有交互。数据量少、并发量少的时候还好,那一旦并发上来了,怎么保证redis和数据库的数据一致性呢?这时就用到数据库和redis的同步工具-canal了。


一、canal是什么?

canal是阿里中基于java写的一个组件,他的官网是:canal官网
。他的作用是读取mysql数据的binlog日志,然后将其转换为对应的数据(数据的变化或者变化后的数据,跟配置有关),并且同步到相关中间件(本次demo中是用的reids中间件)

1.1、工作原理

原理是(从官网截的图)
在这里插入图片描述

二、canal服务端demo

1.先看一下mysql的配置

  • show variables like ‘%log_bin%’; – 判断binlong是on还是off,默认是off,需要打开
  • show variables like ‘binlog_format’; – 判断binlog的记录方式,有row和statement、fix,这里是用row,row是记录数据数据的变化,变成了啥,statement是记录执行的sql,看不到数据的变化以及最终的结果,fix是俩的混合
  • show variables like ‘%server_id%’; 是配置主从的,一般主节点设为1
  • show master status; 表示当前mysql中的binlog日志记录在哪里了,现在记录的多大内存了

2.修改配置

如果配置不满足,则需要修改mysql的配置文件: my.ini
win下的查找配置文件的方式: 服务 -》 属性 -》 可执行路径。 本次我这里的是:
“C:\Program Files\MySQL\MySQL Server 5.7\bin\mysqld.exe” --defaults-file=“C:\ProgramData\MySQL\MySQL Server 5.7\my.ini”
在这里插入图片描述

3.编辑my.ini 文件

搜索log-bin,在这个log-bin下面新建两条:
log-bin=mysql-bin
binlog-format=ROW
在这里插入图片描述
然后重新启动
在这里插入图片描述
再次执行下看结果:
在这里插入图片描述

4.创建canal用户

创建用户,专门给canal使用

-- 创建一个canal用户(专门去给canal这个同步数据软件使用)
create user canal IDENTIFIED by 'canal';
-- 给canal赋予select、repl权限
grant select ,REPLICATION SLAVE,REPLICATION CLIENT on *.* to 'canal'@'%';
-- 刷新权限
flush privileges;

5.启动canal服务

此时拿到一个canal的服务包。 我的百度云盘:云盘

6.修改canal.properties

解压后,进入到conf目录,找到 canal.properties 文件。这里需要注意的其实就一行:
在这里插入图片描述
新增这个:

canal.destinations = example
//如果有多个的话,就用逗号分隔,例如:
//canal.destinations = promption,seckill

这里是一个,就需要在conf中有一个文件夹,文件夹的名字与这个配置的名字一致,如果是两个,就需要两个文件夹了
在这里插入图片描述

7.修改instance.properties

进入这个example文件夹里面,还有一个配置文件(instance.properties),这个就是我们要改的配置文件了
在这里插入图片描述
这个配置文件中,比较重要的几个是:

# 要监听的数据库的地址
canal.instance.master.address=127.0.0.1:3306
# 当前binlog记录的位置
canal.instance.master.journal.name=mysql-bin.000002
# 当前binlog记录到哪了
canal.instance.master.position=154
# canal连接的账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 要监听的数据库表,例如: micromall.sms_home,miscromall.sms_brand
canal.instance.filter.regex=.*\\..*
# 不需要监听的数据库表,例如:mysql\\.test_.*
canal.instance.filter.black.regex=

其中canal.instance.master.journal.name对应的是file。
canal.instance.master.position 对应的是position, 这俩只需要配置一次,后续就会有一个缓存文件(meta.dat)去自己记录最大缓存到哪了
在这里插入图片描述

8.启动canal

到这里以后,就可以启动了,直接双击bin文件夹下的 start.bat
在这里插入图片描述

9.创建canal客户端

然后创建一个canal客户端去测试一下。

三、canal客户端demo

可以参考官网: https://github.com/alibaba/canal/wiki/ClientExample

在这里插入图片描述

1.maven依赖

<!-- canan 的依赖 这个最好是和服务端的版本一致 -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2.demo代码

默认是127.0.0

package com.zheng.canal;
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
 * @author: ztl
 * @date: 2024/11/24 22:31
 * @desc:
 */
public class MyCanal {

    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }




}

三、测试

1.启动服务器端

启动服务器端,也就是canal中的服务器,我本次是 D:\java\ruanjian\canal\canal.deployer-1.1.4\bin 下的 startup.bat 脚本![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/34deb7542e564e3a9cab0b03e95438cc.png

2.启动测试端

运行步骤2中的main方法: 此时看到的效果是不断的再刷新
在这里插入图片描述

3.修改数据库

修改数据库中的数据,然后看控制台中的变化:
(本次我修改了mytest库中的course_2023表中的第一条数据,控制台的变化如下:)
在这里插入图片描述
到此,我们就能拿到变化后的数据了,后面的操作我不用说大家也知道:将数据转化为实体,然后set进redis、es等各个地方。


总结

以上是一个简单的demo,大家可以基于demo或者官网去在自己项目中做一些个性化的处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/938465.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

平方根无迹卡尔曼滤波(SR-UKF)的MATLAB例程,使用三维非线性的系统

本MATLAB 代码实现了平方根无迹卡尔曼滤波&#xff08;SR-UKF&#xff09;算法&#xff0c;用于处理三维非线性状态估计问题 文章目录 运行结果代码概述代码 运行结果 三轴状态曲线对比&#xff1a; 三轴误差曲线对比&#xff1a; 误差统计特性输出&#xff08;命令行截图&…

汇编DOSBox 如何使文件可以运行

1.在vscode编写&#xff08;其他也可以&#xff09;如何在vscode中编写汇编语言并在终端进行调试(保姆级别&#xff09;_如何在vscode编译asm-CSDN博客 2.点击ML615中的DOS 2.1在命令行中输入命令 ml 文件名.asm ml 文件名.obj 2.2 将生成的exe文件移动到Assembly里面 这个文件…

QT多线程(三):基于条件等待的线程同步

在多线程的程序中&#xff0c;多个线程之间的同步问题实际上就是多个线程之间的协调问题。例如在以下例子中只有等 ThreadDAQ 写满一个缓冲区之后&#xff0c;ThreadShow 和ThreadSaveFile 才能读取缓冲区的数据。 int buffer[100]; QReadWriteLock Lock; //定义读写锁变量 v…

js 数组方法总结

在 JavaScript 中&#xff0c;数组有许多内置的方法&#xff0c;可以用于操作和处理数组。以下是一些常用的数组方法及其特点&#xff1a; 1. push() - 用途&#xff1a;向数组末尾添加一个或多个元素 - 改变原数组&#xff1a;是 - 返回值&#xff1a;返回数组的新长度 let ar…

MongoDB-副本集

一、什么是 MongoDB 副本集&#xff1f; 1.副本集的定义 MongoDB 的副本集&#xff08;Replica Set&#xff09;是一组 MongoDB 服务器实例&#xff0c;它们存储同一数据集的副本&#xff0c;确保数据的高可用性和可靠性。副本集中的每个节点都有相同的数据副本&#xff0c;但…

驱动开发-入门【1】

1.内核下载地址 Linux内核源码的官方网站为https://www.kernel.org/&#xff0c;可以在该网站下载最新的Linux内核源码。进入该网站之后如下图所示&#xff1a; 从上图可以看到多个版本的内核分支&#xff0c;分别为主线版本&#xff08;mainline&#xff09;、稳定版本&#…

数字电视标准与分类

数字电视相关内容是一个极其成熟且久远的领域&#xff0c;并不像其它的技术方面那么前沿。但是学习技术的另外一个方面也不就是可以维持咱们的好奇心以及认识生活中多个事务后面的技术本质。 近年来&#xff0c;电视领域发生了一系列的变化&#xff0c;电视数字化的进程明显加快…

【WRF安装】WRF编译错误总结1:HDF5库包安装

目录 1 HDF5库包安装有误&#xff1a;HDF5 not set in environment. Will configure WRF for use without.HDF5的重新编译 错误原因1&#xff1a;提示 overflow 错误1. 检查系统是否缺少依赖库或工具2. 检查和更新编译器版本3. 检查 ./configure 报错信息4. 检查系统环境变量5.…

51c嵌入式~单片机~合集3

我自己的原文哦~ https://blog.51cto.com/whaosoft/12362395 一、STM32代码远程升级之IAP编程 IAP是什么 有时项目上需要远程升级单片机程序&#xff0c;此时需要接触到IAP编程。 IAP即为In Application Programming&#xff0c;解释为在应用中编程&#xff0c;用户自己的…

LeetCode 11. 盛最多水的容器(超简单讲解)

11. 盛最多水的容器 题目示例示例1示例2 解题思路双指针实现设计 详细代码 题目 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多…

Spring Boot 集成 Elasticsearch怎样在不启动es的情况下正常启动服务

解释 在spingboot 集成es客户端后&#xff0c;每当服务启动时&#xff0c;服务默认都会查看es中是否已经创建了对应的索引&#xff0c;如果没有索引则创建。基于上面的规则我们可以通过配置不自动创建索引来达到在没有es服务的情况下正常启动服务。 解决办法 在entity类的Docu…

IOTIQS100芯片, TCP 发送数据+NSOSD,data要是hex16进制转换方法

命令&#xff1a;data以十六进制字符串格式发送的数据。 方法 代码 sprintf(temp, "%02X", data[i]);&#xff1a;将当前字节转换为两位宽的大写十六进制字符&#xff0c;并存储在 temp 中。如果需要小写字母&#xff0c;可以将格式说明符改为 "%02x"。 …

Python的3D可视化库【vedo】2-3 (plotter模块) 增删物体、控制相机

文章目录 4 Plotter类的方法4.3 渲染器内的物体操作4.3.1 添加物体4.3.2 移除物体4.3.3 渲染器的内容列表 4.4 相机控制4.4.1 访问相机对象4.4.2 重置相机状态4.4.3 移动相机位置4.4.4 改变相机焦点4.4.5 改变相机朝向的平面4.4.5 旋转相机4.4.6 对齐相机的上朝向4.4.7 缩放 ve…

Mumu模拟器12开启ADB调试方法

在使用安卓模拟器进行开发或调试时&#xff0c;ADB&#xff08;Android Debug Bridge&#xff09;是一项不可或缺的工具。大多数模拟器默认开启了ADB调试功能&#xff0c;但在安装最新版的 Mumu模拟器12 时&#xff0c;可能会遇到 adb devices 无法识别设备的问题。 问题描述 …

【OpenCV计算机视觉】图像处理——平滑

本篇文章记录我学习【OpenCV】图像处理中关于“平滑”的知识点&#xff0c;希望我的分享对你有所帮助。 目录 一、什么是平滑处理 1、平滑的目的是什么&#xff1f; 2、常见的图像噪声 &#xff08;1&#xff09;椒盐噪声 ​编辑&#xff08;2&#xff09; 高斯噪声 &a…

vue CSS 自定义宽高 翻页 剥离 效果

新增需求&#xff0c;客户需要类似PPT的剥离效果用于WEB页面翻页&#xff0c;查找资料后&#xff0c;参考下方的掘金博主的文章&#xff0c;并将HTML修改成vue的页面进行使用。其中宽度、高度改成了变量&#xff0c;样式style中的属性与宽高的关系整理成了公式进行动态计算。 …

单北斗+鸿蒙系统+国产芯片,遨游防爆手机自主可控“三保险”

在当今全球科技竞争日益激烈的背景下&#xff0c;技术自主可控的重要性愈发凸显。它不仅关乎国家安全&#xff0c;更是推动产业升级和经济发展的关键。特别是在一些特殊领域&#xff0c;如防爆通信&#xff0c;自主可控的技术更是不可或缺。遨游通讯推出了一款融合了单北斗、鸿…

Word2Vec:将词汇转化为向量的技术

文章目录 Word2Vec来龙去脉分层Softmax负采样 Word2Vec 下面的文章纯属笔记&#xff0c;看完后不会有任何收获&#xff0c;如果想理解这两种优化技术&#xff0c;给大家推荐一篇博客&#xff0c;讲的很好&#xff1a; 详解-----分层Softmax与负采样 来龙去脉 word2vec,即将词…

虚幻5描边轮廓材质

很多游戏内都有这种描边效果&#xff0c;挺实用也挺好看的&#xff0c;简单复刻一下 效果演示&#xff1a; Linethickness可以控制轮廓线条的粗细 这样连完&#xff0c;然后放到网格体细节的覆层材质上即可 可以自己更改粗细大小和颜色

websocket_asyncio

WebSocket 和 asyncio 指南 简介 本指南涵盖了使用 Python 中的 websockets 库进行 WebSocket 编程的基础知识&#xff0c;以及 asyncio 在异步非阻塞 I/O 中的作用。它提供了构建高效 WebSocket 服务端和客户端的知识&#xff0c;以及 asyncio 的特性和优势。 1. 什么是 WebS…