Java开发 - Canal的基本用法

前言

今天给大家带来的是Canal的基本用法,Canal在Java中常被我们用来做数据的同步,当然不是MySQL与MySQL,Redis与Redis之间了,如果是他们,那就好办了,我们可以直接通过配置来完成他们之间的主从、主主,级联等的同步,为什么要用Canal呢?主要是为了完成MySQL与Redis、MySQL与ES之间的数据同步,其本质是同步的过程中降低代码的耦合度,否则我们完全可以通过代码分别往几种不同的存储方存储数据。

认识Canal

什么是Canal

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

下面这张图可以代表Canal的用途,就染我们来一起瞻仰一下:

在看到这张图后,我们要感谢开发者的付出,提供给我们这么好的工具,目前来说,很多公司做数据同步都是采用的这种方式,可以通过Canal分别向MySQL,ES里同步数据。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

基本原理

Canal的实现主要利用了MySQL主从复制的原理,细分如下:

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

也就是说,Canal将自己伪装成一个MySQL的从库,像其他的Slava一样,向Master发送dump 协议,MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ),canal 解析 binary log 对象(原始为 byte 流)。

Canal准备

第一次接触Canal的小伙伴点击下面链接下载Canal:

Releases · alibaba/canal · GitHub

不要使用太新的版本,我们就用1.1.4的版本: 

下载完成之后放在一个英文路径下,我们改下文件夹的名字canal,下有四个文件夹:

 

MySQL配置

这里,我们不需要去配置MySQL的主从,如果你想了解,不妨去看这篇博客:

Java开发 - MySQL主从复制初体验

这里有你想要的主从配置,和对主从配置的一些心得体会。

在此处,我们只需要开启一个MySQL服务,设置一个连接的用户和密码,整体上和配置MySQL主从的步骤差不多,因为本质上也是要把Canal配置成MySQL的Slava的。

MySQL服务开启了吧?那么登陆MySQL服务,我们先来创建并授权一个用户.

创建用户:

CREATE USER 'canal'@'%' IDENTIFIED WITH 'mysql_native_password' BY '123456';

mysql8.0和5.x其中一个改动就是加密认证方式发生改变,这个在上面提到的MySQL主从复制里有提到,caching_sha2_password是8.0, mysql_native_password是5.x,canal我们这里都采用mysql_native_password的方式创建密码。

远程授权: 

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'  WITH GRANT OPTION;

刷新权限:

FLUSH PRIVILEGES;

修改my.cnf文件,这个根据自己mysql安装位置的路径去找,但似乎这个文件大多情况是不存在的,所以我们直接在etc目录下创建一个用就行,实在害怕,可以运行如下命令查看my.cnf的默认运行位置:

 mysql --help | grep 'my.cnf'

 

所以在默认路径下:/usr/local/Cellar/mysql/版本号/ ,此处没有etc文件,自己手动创建吧,不要怂,接着:

进入etc文件,在这里运行:

vim my.cnf

输入:

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 不要和canal的slaveId重复即可
server_id=1

 退出并保存,然后重启mysql。

检查mysql的binlog是否开启:

show variables like 'log_bin';

 

已开启。

检查binlog_format:

show variables like "%binlog_format%";

 

显示ROW,代表我们设置生效。

检查server_id:

show variables like "%server_id%";

 

我们设置的1,已生效。 

查看当前正在写入的binlog文件:

show master status;

 

我们主要看的就是这两个参数,记住,到此为止,不要再动数据库的任何东西,否则这两个数据会改变,对我们配置canal会有影响。 上面的两个参数,我们在稍后配置canal的时候需要。

额。。。。。不过,这俩参数其实可以不用设置,不设置就代表从最新的地方开始同步,博主已经试过了,没问题。

Canal配置

我们打开刚刚下载的canal文件夹,打开这个路径下的文件:conf/example/instance.properties:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=157
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

我们需要改的核心参数暂时不多,如下:

canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=157

canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

其他的暂时先不用改,后续将到实际应用的时候会讲,这几个参数不用博主说大家也应该知道什么意思了吧?保存一下。

现在我们来启动canal,canal的启动很简单,打开一个命令行工具,直接把bin/startup.sh文件拖进去回车就可以了,方式不固定:

 

命令行输出了一大段内容,但我们不知道canal启动成功了没,我们来看下:

 

通过jps可以看到CanalLauncher的进程号,看来应该是没问题的。 

单纯的Canal监听测试

下面我们创建一个最简单的Spring Boot工程,过程就不赘述了:

首先我们引入依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

版本号要和我们使用的一致。  

添加配置:

canal:
  serverAddress: 127.0.0.1
  serverPort: 11111
  instance:
    - example

在CannalClient类使用Spring Bean的生命周期函数afterPropertiesSet(),切记,这里只是监听,并不是真正项目上使用,不要照搬,此处知识单传让大家看到canal监听的效果:

package com.codingfire.canal.Client;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements InitializingBean {
    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                System.out.println(message.getEntries().size());
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("----------------");
                    //如果有数据,处理数据
                    //遍历entries,单条解析
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        //获取表名
                        String tableName = entry.getHeader().getTableName();
                        //获取类型
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();
                        //判断entry类型是否为ROWDATA类型
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            //获取当前事件操作类型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            //获取数据集
                            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                            //遍历
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                //改变前数据
                                JSONObject jsonObjectBefore = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    jsonObjectBefore.put(column.getName(),column.getValue());
                                }
                                //改变后数据
                                JSONObject jsonObjectAfter = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    jsonObjectAfter.put(column.getName(),column.getValue());
                                }
                                System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                            }
                        }else {
                            System.out.println("当前操作类型为:"+entryType);
                        }
                    }
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}

下面,就到了最激动人心的时刻,请运行我们的Spring Boot工程:

 

看到这里,就代表启动成功了,下面,我们连接数据库:

mysql -uroot -p123456

随便你是哪个用户连接的都行,没有数据库,你就创建新的数据库,如果已经有了,那么你直接操作里面的数据库表即可,博主目前有一个canal数据库,我们就用这个数据库:

use canal;

博主里面有一张用户表,操作里面的表:

insert into user value(null ,'小明','123456',20,'13812345678');

现在查看控制台有没有监听到数据库变化:

  

可以看到控制台已经打印出了我们刚刚操作的SQL,测试成功。

注意:这里只是监控,并不是真实使用场景,只是让大家直观看到SQL语句被监听到的场景,实际应用中,我们会结合MQ来使用,但不在这篇讲解。 

结语

这篇博客只是canal 的基本配置和监听机制的讲解,旨在帮助大家了解canal的工作方式,在下一篇博客中,我们将结合MQ来做数据的同步,所以大家也不要着急,咱们慢慢来,一步一给脚印,一定要把基础知识学扎实,canal的配置相较于MySQL的主从还是很相似的,也比较简单,主要都是配置项,所以更需要我们细心,不要出错,否则一个参数的错误都是导致系统无法正常运行。好了,咱们下篇再见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/34103.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【SpringBoot】一、SpringBoot3新特性与改变详细分析

前言 本文适合具有springboot的基础的同学。 SpringBoot3改变&新特性 一、前置条件二、自动配置包位置变化1、Springboot2.X2、Springboot3.X 三、jakata api迁移1、Springboot2.X2、Springboot3.X3、SpringBoot3使用druid有问题&#xff0c;因为它引用的是旧的包 四 新特…

MySQL数据库--------简单理解文件的相关信息

作者前言 欢迎小可爱们前来借鉴我的gtiee秦老大大 (qin-laoda) - Gitee.com ———————————————————————————————————— 目录 文件的信息 文件的权限 权限的赋予 —————————————————————————————— 插播一些…

MySQL 备份与恢复

MySQL 备份与恢复 一、数据库备份的分类1.1 数据备份的重要性1.2 数据库备份的分类1.2.1 从物理与逻辑的角度&#xff0c;分为物理备份和逻辑备份1.2.2 从数据库的备份策略角度&#xff0c;分为完全备份&#xff0c;差异备份和增量备份1.2.3 常见的备份方法 二、MySQL完全备份与…

爬虫入门指南(1):学习爬虫的基础知识和技巧

文章目录 爬虫基础知识什么是爬虫&#xff1f;爬虫的工作原理爬虫的应用领域 爬虫准备工作安装Python安装必要的库和工具 网页解析与XPath网页结构与标签CSS选择器与XPathXpath 语法XPath的基本表达式&#xff1a;XPath的谓语&#xff08;Predicate&#xff09;&#xff1a;XPa…

【K8S系列】深入解析K8S存储

序言 做一件事并不难&#xff0c;难的是在于坚持。坚持一下也不难&#xff0c;难的是坚持到底。 文章标记颜色说明&#xff1a; 黄色&#xff1a;重要标题红色&#xff1a;用来标记结论绿色&#xff1a;用来标记一级论点蓝色&#xff1a;用来标记二级论点 Kubernetes (k8s) 是一…

elementui el-table-column表头换行,自定义表头以及排序图标的位置放置

目录 1、普通表头换行⭐️想实现以下效果 2、表头换行时调整文字和排序图标的位置⭐️想实现以下效果遇到问题 效果如下遇到问题 效果如下⭐️最终成功实现以下效果 &#x1f44d;写在最后 1、普通表头换行 https://www.jb51.net/article/228935.htm // 在需要换行的地方加入换…

开源虚拟化工具VirtualBox安装部署

什么是Virtualbox VirtualBox是一款由Oracle开发和维护的免费开源虚拟化软件&#xff0c;用于在一台计算机上创建和管理多个虚拟机。它允许用户在单个物理计算机上运行多个操作系统&#xff0c;例如Windows、Linux、macOS等。VirtualBox提供了一个虚拟化环境&#xff0c;使用户…

云原生(第二篇)k8s-二进制搭建

准备五台机器&#xff1a; master01&#xff1a;192.168.169.10 node01&#xff1a;192.168.169.40 node02&#xff1a;192.168.169.50 master02&#xff1a;192.168.169.60 负载均衡nginxkeepalive01&#xff08;master&#xff09;&#xff1a;192.168.169.20 负载均衡…

9.用python写网络爬虫,完结

前言 这是python网络爬虫的最后一篇给大家做个总结&#xff0c;且看且珍惜把&#xff01; 截止到目前&#xff0c; 前几章本书介绍的爬虫技术都应用于一个定制网站&#xff0c;这样可以帮助我们更加专注于学习特定技巧。而在本章中&#xff0c;我们将分析几个真实网站&#xff…

azure databricks因为notebook 日志打多或者打印图片太多,往下拉卡死怎么处理

1、同事碰到个问题&#xff0c;databricks 页面卡死不动了 2、我。。。。。。。。测试了下搞不定&#xff0c;找azure的工程师&#xff0c;特此笔记如下图 !](https://img-blog.csdnimg.cn/5db9756d0e224d15a9a607561b47591f.png)

2014年全国硕士研究生入学统一考试管理类专业学位联考逻辑试题——纯享题目版

&#x1f3e0;个人主页&#xff1a;fo安方的博客✨ &#x1f482;个人简历&#xff1a;大家好&#xff0c;我是fo安方&#xff0c;考取过HCIE Cloud Computing、CCIE Security、CISP、RHCE、CCNP RS、PEST 3等证书。&#x1f433; &#x1f495;兴趣爱好&#xff1a;b站天天刷&…

QT ObjectThread moveToThread多线程操作

QT多线程专栏共有15篇文章&#xff0c;从初识线程到、QMutex锁、QSemaphore信号量、Emit、Sgnals、Slot主线程子线程互相传值同步变量、QWaitCondition、事件循环、QObjects、线程安全、线程同步、线程异步、QThreadPool线程池、ObjectThread多线程操作、 moveToThread等线程操…

LeetCode 打卡day46-- 单词拆分和多重背包问题

一个人的朝圣 — LeetCode打卡第46天 知识总结 Leetcode 139. 单词拆分题目说明代码说明 知识总结 今天写了一道题目, 但是还挺难的, 而且是面试高频题目 还过了一遍多重背包问题. 多重背包与01背包的区别在于多重背包限制了物品的个数, 某些物品的个数可能不为1, 可以使用两…

c++day3

#include <iostream> using namespace std;class Person { private:int age;int *p; public://无参构造Person():p(new int(89)){age 18;cout << "无参构造函数" <<endl;}//有参构造Person(int age,int num){this->age age;this->pnew int…

【Docker】Docker中 AUFS、BTRFS、ZFS、存储池概念的详细讲解

前言 Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux或Windows操作系统的机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。 &#x1f4d5;作者简介&#xff1a;热…

基于SpringCloud微服务毕业论文管理系统设计与实现

一、概述 1.1 课题背景及意义 随着学校不断扩大和学生人数的猛增,关于各类教学信息也越来越多。毕业论文的管理也成为了不可避免的一道关卡,学生需要及时获取论文相关进度,学校的管理者要求能方便对论文进行处理。基于这些需求,开发一个实用的微服务管理系统,以满足双方…

GELU激活函数

GELU是一种常见的激活函数&#xff0c;全称为“Gaussian Error Linear Unit”&#xff0c;其图像与ReLU、ELU对比如下&#xff1a; 文章链接&#xff1a;https://arxiv.org/pdf/1606.08415.pdf https://pytorch.org/docs/master/generated/torch.nn.GELU.html 公式为&#xff1…

Spring Cloud - HTTP 客户端 Feign 、自定义配置、优化、最佳实践

目录 一、Feign 是什么&#xff0c;有什么用呢&#xff1f; 二、Feign 客户端的使用 2.1、远程调用 1.引入依赖 2.在order-service&#xff08;发起远程调用的微服务&#xff09;的启动类添加注解开启Feign的功能 3.编写 Feign 客户端 4.通过 Feign 客户端发起远程调用 …

flutter 简介 flutter 能为我们做什么

flutter 简介 flutter 能为我们做什么 前言一、什么是Flutter&#xff1f;二、Flutter的特点和优势三、Flutter与其他跨平台框架的比较总结 前言 陆陆续续已经写了60多篇的flutter 的文章了&#xff0c;本篇文章就来说说我对flutter 的简单看法 一、什么是Flutter&#xff1f…

excel相关操作

文章目录 1、数据分列与绘图1.1、杂乱的数据拷贝到excel1.2、 智能分列1.2 或者手动设置分列1.3、杂论的符号替换掉1.4、对时间再次只能分裂1.5、绘图 1、数据分列与绘图 1.1、杂乱的数据拷贝到excel 1.2、 智能分列 选择数据&#xff0c;数据–>分列–> 智能分列 结…