Canal笔记:安装与整合Springboot模式Mysql同步Redis

官方文档

https://github.com/alibaba/canal

使用场景

学习一件东西前,要知道为什么使用它。

1、同步mysql数据到redis

常规情况下,产生数据的方法可能有很多地方,那么就需要在多个地方中,都去做mysql数据同步到redis的处理,相对麻烦很多。
可以使用canal,对mysql进行集中,统一的处理。

概述

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

原理

MySQL主备复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relaylog 中事件,将数据变更反映它自己的数据

image.png

canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

架构

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)
  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

image.png

安装和准备

Centos7安装Canal

1、Mysql配置

开启binlog日志

如果是使用Linux安装的话,则直接找my.cnf,直接修改内容即可

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

docker安装

1、安装my.cnf文件

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2、修改docker-compose.yaml内容
配置挂载卷 前面路径为my.cnf的路径,/etc/mysql/conf.d的路径
image.png

3、查询是否成功

show variables like "%server_id%";

image.png

show variables like 'log_bin';

image.png

获取bin_log当前位置

show master status;

image.png
获取后,记录下来,然后不要动数据库了

创建canal数据库用户
这里可以使用
mysql -uroot -p登录进入设置,
或者直接可视化页面

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2、Canal下载

下载
方式一:关注公众号 I am Walker 回复canal

方式二:https://github.com/alibaba/canal/releases?page=2
image.png

# 创建文件夹
mkdir /opt/env/canal
# 解压
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/env/canal

3、配置文件修改

进入canal/conf/example/instance.properties
主要修改下列相关参数

# 数据库
canal.instance.master.address=127.0.0.1:3306
# bin log日志
canal.instance.master.journal.name=mysql-bin.000001
# bin log写入位置
canal.instance.master.position=157
 #数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

之后进入 canal/bin 执行 ./startup.sh
查看是否启动
image.png
有CanalLauncher则代表ok,或者看日志也ok

场景

springboot整合

简单整合

1、依赖
<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
2、配置文件
canal:
	# 服务地址
  serverAddress: localhost
  # 端口
  serverPort: 11111
  # 订阅 库 表
  subscrie: ".*\\..*"
  # 
  batchSize: 100
  # 实例
  instance:
    - example

subscrie配置


全库全表	
connector.subscribe(".*\\..*")
指定库全表	
connector.subscribe("test\\..*")
单表
connector.subscribe("test.user")
多规则组合使用
connector.subscribe("test\\..*,test2.user1,test3.user2")
3、properties类
package com.walker.mybatisplus.canal;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;

@Data
@Component
@ConfigurationProperties(value = "canal")
public class CanalProperties {

    private String serverAddress;
    private Integer serverPort;
    private String subcribe;
    private Integer batchSize;
    private List<String> instance;


}

4、监听类编写
package com.walker.mybatisplus.canal;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class CanalListener {

    @Autowired
    private CanalProperties canalProperties;

    public static Map<String,Integer> NUM_MAP=new HashMap<>();


    /**
    * 可能会有多业务,不同的业务,应该有多个处理类,不要使用if等
    */

    @PostConstruct
    public void run() throws InterruptedException, InvalidProtocolBufferException {

        //创建Canal连接对象
        CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getServerAddress(),
                        canalProperties.getServerPort()),
                canalProperties.getInstance().get(0),
                null, null);

        while(true){
            //连接
            conn.connect();
//            监听的数据库和表
            conn.subscribe(canalProperties.getSubcribe());
            //回滚操作
            conn.rollback();
            //获取信息
            Message message = conn.getWithoutAck(canalProperties.getBatchSize());
            long id = message.getId();
            List<CanalEntry.Entry> entries = message.getEntries();
            if(id!=-1&&entries.size()>0){
                //处理数据
                messageProcess(entries);
            }else{
                //防止重复链接数据库
                Thread.sleep(1000);
            }
            //确认消费信息
            conn.ack(id);
            //释放连接
            conn.disconnect();
        }


    }

    private void messageProcess(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entries) {
            log.info("接收Entry:{}", entry);
            
            CanalEntry.Header header = entry.getHeader();
            //数据库
            String schemaName = header.getSchemaName();
            //表名
            String tableName = header.getTableName();
            //事件类型
            CanalEntry.EventType eventType = header.getEventType();

            //这里可以对数据库和表进行一个重新判断 虽然在subscribe已经定义,但是一般可以配置一个库,然后表的可能可以是全部表
//            对库进行判断
            if(!"walker_share".equals(schemaName)){
                continue;
            }

            //对表进行判断
            //这里只是一个案例,如果是实际场景,可以使用工厂模式去编写,不然会有很多的if
            if("dish".equals(tableName)){
                //获取修改数据
                List<CanalEntry.RowData> rowDataList = getRowDataList(entry);
     
                //新增
                if(eventType.getNumber()==CanalEntry.EventType.INSERT_VALUE){
                    log.info("新增事件");
                    if(CollUtil.isEmpty(rowDataList)) continue;
                    //模拟场景:获取新增的数据,并存储到redis中,这里是直接存储到Map中
                    for (CanalEntry.RowData rowData : rowDataList) {
                        List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                        for (CanalEntry.Column column : afterColumnsList) {
                            //获取name的类型
                            if("type".equals(column.getName())){
                                //模拟redis  根据类型进行分类
                                String key = column.getValue();
                                NUM_MAP.put(key,NUM_MAP.getOrDefault(key,0)+1);
                                log.info("NUM_MAP {}",NUM_MAP);
                                continue;
                            }
                        }
                    }
                }
                if(eventType.getNumber()==CanalEntry.EventType.UPDATE_VALUE){
                    log.info("修改事件");
                }
                if(eventType.getNumber()==CanalEntry.EventType.DELETE_VALUE){
                    log.info("删除事件");
                }

            }

        }
    }

    //获取row数据
    private List<CanalEntry.RowData> getRowDataList(CanalEntry.Entry entry) {
        CanalEntry.RowChange rowChange=null;
        try {
            //解析数据
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
        }
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        return rowDatasList;
    }
}

相关类和配置

CanalConnector

image.png

CanalEntry

image.png

EntryType

image.png

Header

image.png

EventType

事件类型,可以根据事件类型去做不一样的操作
image.png

RowChange

image.png

获取数据

CanalEntry.RowChange rowChange=null;
try {
    //解析数据
    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
    throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
}
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
log.info("rowDatasList:{}",rowDatasList);
RowData

image.png

CanalEntry.Column

属性
image.png

问题

IOException: caching_sha2_password Auth failed

因为mysql8.0.3后身份检验方式为caching_sha2_password,但canal使用的是mysql_native_password,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed

参考文档

Java开发 - Canal的基本用法_canal java-CSDN博客
15分钟学会Canal安装与部署-CSDN博客
SpringBoot整合Canal1.1.6并同步数据到Redis(超详细和很多踩坑点)_canal同步数据到redis-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/219481.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv3 快速上手:Windows 10上的训练环境搭建

文章目录 前言一、前期准备二、基础环境准备1. 创建虚拟环境2. 打开Terminal3. 下载YOLOv3运行环境 三、PyCharm关联3.1 运行PyCharm3.2 关联Anaconda虚拟环境 四、运行环境检查1. 检查requirements.txt文件2. 安装依赖 五、运行代码5.1 运行检测代码5.2 运行训练代码 六、常见…

零信任组件和实施

零信任是一种安全标准&#xff0c;其功能遵循“从不信任&#xff0c;始终验证”的原则&#xff0c;并确保没有用户或设备受信任&#xff0c;无论他们是在组织网络内部还是外部。简而言之&#xff0c;零信任模型消除了信任组织安全边界内任何内容的概念&#xff0c;而是倡导严格…

如何计算 ChatGPT 的 Tokens 数量?

一、基本介绍 随着人工智能大模型技术的迅速发展&#xff0c;一种创新的计费模式正在逐渐普及&#xff0c;即以“令牌”&#xff08;Token&#xff09;作为衡量使用成本的单位。那么&#xff0c;究竟什么是Token呢&#xff1f; Token 是一种将自然语言文本转化为计算机可以理…

vue2项目中添加字体文件

vue2项目中添加字体文件 1、下载相关文件&#xff0c;放置文件夹中&#xff0c;这里我是在assets文件中新建了fontFamily 2、在assets文件中新建css文件 3、在页面中使用 <style lang"less" scoped> import ../../assets/css/fonts.less;.total-wrap {displa…

esp32使用命令查看芯片flash大小以及PSRAM的大小

在idf.py命令窗口中输入 esptool.py -p COM* flash_id 其中COM*是连接你的esp32芯片的端口号。

蓝桥杯算法心得——想吃冰淇淋和蛋糕(dp)

大家好&#xff0c;我是晴天学长&#xff0c;dp题&#xff0c;怎么设计状态很重要&#xff0c;需要的小伙伴可以关注支持一下哦&#xff01;后续会继续更新的。&#x1f4aa;&#x1f4aa;&#x1f4aa; 1) .想吃冰淇淋和蛋糕 想吃冰淇淋与蛋糕 输入格式 第一行输入一个整数n。…

认识异常 ---java

目录 一. 异常的概念 二. 异常的体系结构 三. 异常的分类 三. 异常的处理 3.1 异常的抛出throw 3.2. 异常声明throws 3.3 捕获并处理try-catch finally 3.4异常的处理流程 四. 自定义异常类 一. 异常的概念 在 Java 中&#xff0c;将程序执行过程中发生的不正常行为称为…

设计模式之结构型模式(适配器、桥接、组合、享元、装饰者、外观、代理)

文章目录 一、结构型设计模式二、适配器模式三、桥接模式四、组合模式五、享元模式六、装饰者模式七、外观模式八、代理设计模式 一、结构型设计模式 这篇文章我们来讲解下结构型设计模式&#xff0c;结构型设计模式&#xff0c;主要处理类或对象的组合关系&#xff0c;为如何…

怎样实现燃气产业的数字化转型之路?

关键词&#xff1a;智慧燃气、燃气数字化、智慧燃气建设、智慧燃气解决方案、智慧燃气平台 燃气产业不仅是我国能源的支柱产业&#xff0c;更是推进经济建设与生态保护协同发展的主战场。数字技术与企业生产、经营及管理深度融合是驱动企业转型升级的重要路径。基于产业融合视…

【bash指令全集合】最全教程-持续更新!

作者&#xff1a;20岁爱吃必胜客&#xff08;坤制作人&#xff09;&#xff0c;近十年开发经验, 跨域学习者&#xff0c;目前于新西兰奥克兰大学攻读IT硕士学位。荣誉&#xff1a;阿里云博客专家认证、腾讯开发者社区优质创作者&#xff0c;在CTF省赛校赛多次取得好成绩。跨领域…

智慧工地源码 SaaS模式云平台

伴随着技术的不断发展&#xff0c;信息化手段、移动技术、智能穿戴及工具在工程施工阶段的应用不断提升&#xff0c;智慧工地概念应运而生&#xff0c;庞大的建设规模催生着智慧工地的探索和研发。 什么是智慧工地&#xff1f; 伴随着技术的不断发展&#xff0c;信息化手段、移…

基于Jenkins实现接口自动化持续集成

一、JOB项目配置 1、添加描述 可选选项可填可不填 2、限制项目的运行节点 节点中要有运行环境所需的配置 节点配置教程&#xff1a;https://blog.csdn.net/YZL40514131/article/details/131504280 3、源码管理 需要将脚本推送到远程仓库中 4、构建触发器 可以选择定时构建…

内衣迷你洗衣机什么牌子好?好用不贵的内衣洗衣机推荐

由于内衣洗衣机在目前的市场上越来越受欢迎&#xff0c;使得不少的小伙伴都在犹豫要不要为自己入手一台专用的内衣洗衣机&#xff0c;专门来清洗一些内衣裤等等贴身衣物&#xff0c;这个问题的答案是很有必要的&#xff0c;因为目前市场上的家用大型洗衣机对衣物只能够起到清洁…

AI 大模型爆发后,智能计算的需求有多强烈?

自从 ChatGPT 横空出世以来&#xff0c;AI 技术就成为科技领域备受关注的热门话题之一。据 OpenAI 的报告显示&#xff0c;自 2012 年以来&#xff0c;AI 大模型的规模呈指数级增长&#xff0c;其参数数量每 16 个月翻一番。 这些大型预训练模型&#xff0c;如 GPT-4、文心一言…

uniapp-hubildx配置

1.配置浏览器 &#xff08;1&#xff09;运行》运行到浏览器配置》配置web服务器 &#xff08;2&#xff09;选择浏览器安装路径 &#xff08;3&#xff09;浏览器安装路径&#xff1a; &#xff08;3.1&#xff09; 右键点击图标》属性 &#xff08;3.2&#xff09;选择目标&…

ubuntu安装kafka

一、前提&#xff0c;先去安装java环境 二、安装kafka wget http://www.apache.org/dyn/closer.cgi?path/kafka/2.8.0/kafka_2.13-3.6.0.tgz tar xzf kafka_2.13-3.6.0.tgz mv kafka_2.13-3.6.0 /usr/local/kafka // 这一步也可以不用 启动zookeeper sudo /usr/local/kafka_2…

ubuntu启动kafka报错Could not create the Java Virtual Machine.

网上有两种方式&#xff0c;但是需要具体看自己的错误信息&#xff0c;我的错误信息如下: 这里大概是说要写入日志无权限&#xff0c;所以执行的时候&#xff0c;前面加一下sudo 执行成功。

10.机器人系统仿真(urdf集成gazebo、rviz)

目录 1 机器人系统仿真的必要性与本篇学习目的 1.1 机器人系统仿真的必要性 1.2 一些概念 URDF是 Unified Robot Description Format 的首字母缩写&#xff0c;直译为统一(标准化)机器人描述格式&#xff0c;可以以一种 XML 的方式描述机器人的部分结构&#xff0c;比如底盘…

利用yolov5输出提示框,segment-anything生成掩膜实现图像的自动标注

文章目录 一. 创建环境二. 下载模型文件三. 编辑代码 一. 创建环境 anaconda下新建一个环境 conda create -n yolo-sam python3.8激活新建的环境 conda activate yolo-sam更换conda镜像源 conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/fre…

Hive SQL的各种join总结

说明 Hive join语法有6中连接 inner join&#xff08;内连接&#xff09;、left join&#xff08;左连接&#xff09;、right join&#xff08;右连接&#xff09;、full outer join&#xff08;全外连接&#xff09;、left semi join&#xff08;左半开连接&#xff09;、cr…