canal同步数据教程

canal简介

官网:https://github.com/alibaba/canal

主要是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,是一个实时同步的方案。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

工作原理

   canal相当于一个mysql slave节点,工作原理如下:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

canal各个组件

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client(消费端)支持多种语言:

  • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
  • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
  • canal go客户端: https://github.com/CanalClient/canal-go
  • canal php客户端: https://github.com/xingwenge/canal-php
  • canal Python客户端:https://github.com/haozi3156666/canal-python
  • canal Rust客户端:https://github.com/laohanlinux/canal-rs
  • canal Nodejs客户端:https://github.com/marmot-z/canal-nodejs

记住:canal是基于订阅和消费机制的,这边的client就是消费端,为了支持更多的语言或者防止消费能力不足,可以把消息直接投递到mq,借助mq的削峰平谷的能力

  • 最重要的组件是deployer,也就是server
  • admin是一个webUI的动态管理组件,根据需要搭建
  • example是client的一些例子
  • adapter是一种客户端数据落地的适配,不如你想把数据同步到hbase,你可以直接用adapter,adapter会帮忙你做数据格式转换等,当然也可以通过客户端自己写。

实践

环境版本

  • mysql版本:5.7.24 docker版
  • canal版本:1.1.8-alpha-3

docker部署mysql

 启动一个实例,如下

docker run -d --name mysql-test -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 mysql:5.7.24

修改容器内/etc/mysql/mysql.conf.d/mysqld.cnf的内容,在末尾增加如下3行内容,可以通过docker cp来修改

[mysqld]
pid-file	= /var/run/mysqld/mysqld.pid
socket		= /var/run/mysqld/mysqld.sock
datadir		= /var/lib/mysql
#log-error	= /var/log/mysql/error.log
# By default we only accept connections from localhost
#bind-address	= 127.0.0.1
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# 以下3行是新增的
server_id=1
log_bin=mysql-bin
binlog_format=ROW

重新启动容器

docker restart mysql-test

通过客户端(比如DBeaver连接mysql),查看这个配置参数

创建数据库test

创建表sys_user

CREATE TABLE `sys_user` (
  `user_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `user_name` varchar(60) NOT NULL COMMENT '用户昵称',
  `email` varchar(50) DEFAULT '' COMMENT '用户邮箱',
  `sex` char(1) DEFAULT '0' COMMENT '用户性别(0男 1女 2未知)',
  `password` varchar(50) DEFAULT '' COMMENT '密码',
  `status` char(1) DEFAULT '0' COMMENT '帐号状态(0正常 1停用)',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='用户信息表';

到此mysql的准备工作已结束,重点在于开启bin log,并设置format为ROW,因为canal是作为mysql slave存在的,需要通过binlog来同步数据。

部署canal-server

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.deployer-1.1.8-SNAPSHOT.tar.gz,解压缩得到如下的目录

打开conf/example/instance.properties配置源数据库(mysql)的信息,我这边偷懒直接用root账号,密码来自docker部署mysql启动时设置的密码

进入bin,直接点击startup.bat(windows环境)

然后打开logs/canal/canal.log,如果出现以下信息表示启动成功。接下来就可以通过客户端去订阅了。

例子1:canal-client的使用

在idea中新建一个maven项目,引入client的依赖

        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.7</version>
        </dependency>

新增client类-CanalClient,代码如下:

package com.example.demo.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CanalClient {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // 1. 创建链接,即server的地址和端口,其中example就是源数据库的名称(来自canal.deployer下的conf下的example文件及名称)
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
                11111), "example", "", "");

        while (true) {
            // 2. 获取连接
            connector.connect();
            // 3. 指定订阅的数据库test
            connector.subscribe("test.*");
            // 4. 获取Message
            Message message = connector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size() == 0) {
                System.out.println("没有数据,休息下");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                for (CanalEntry.Entry entry : entries) {
                    // 获取表名
                    String tableName = entry.getHeader().getTableName();
                    // Entry类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 获取序列化数据
                        ByteString storeValue = entry.getStoreValue();
                        // 反序列化
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 获取事件类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 获取具体数据
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        // 遍历并打印数据
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            Map<String, Object> bMap = new HashMap<>();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                bMap.put(column.getName(), column.getValue());
                            }
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            Map<String, Object> aMap = new HashMap<>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                aMap.put(column.getName(), column.getValue());
                            }
                            System.out.println("表名:" + tableName + ",操作类型:" + eventType);
                            System.out.println(",变化前:" + bMap);
                            System.out.println(",变化后:" + aMap);
                        }
                    }
                }
            }
        }
    }
}

启动后,控制台打印如下:

当往数据库插入一条记录

insert sys_user(user_name, email, sex, `password`, status) values('张三','zhangsan@qq.com', 0, '123',0);

结果,控制台打印如下:

当修改记录时,控制台打印如下

update sys_user set `password`='123456' where user_name='张三';

当删除记录时,控制台打印如下

delete from sys_user where user_name='张三';

至此获取到的数据的变化,你可以把他手动转换、处理等等操作,也可以同步到其他的数据库,灵活性很大,但是如果当只是想把数据同步到其他数据库时,可以直接选择使用client-adapter,不需要编码,client-adapter可以认为是针对特定场景的加强版的客户端。

例子2:springboot集成canal-client

这边采用非官方封装的jar包,但是使用很方便,首先新建一个springboot项目(略)

引入以下maven依赖

<dependency>
	<groupId>top.javatool</groupId>
	<artifactId>canal-spring-boot-starter</artifactId>
	<version>1.2.6-RELEASE</version>
</dependency>

使用很简单,比如同步数据到redis,详细参考官网:GitHub - NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客户端 canal client

例子3:同步数据到clickhouse(client-adapter的使用)

clickhouse-adapter是1.18新增的,不知是我的配置还有问题,当sys_user没有id字段时,只有新增能正常同步到clickhouse,故把sys_user.user_id改为id。

先部署clickhouse,在default新建表,建表语句如下:

CREATE TABLE default.test_user
(
    `id` Int64,
    `user_name` String,
    `email` String,
    `sex` String,
    `password` String,
    `status` String
)
ENGINE = MergeTree
PRIMARY KEY (id)  -- 明确指定 PRIMARY KEY
ORDER BY (id)     -- ORDER BY 是必须的
SETTINGS index_granularity = 8192;

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.adapter-1.1.8-SNAPSHOT.tar.gz

解压缩得到如下的目录

在conf下新增clickhouse文件夹,并且新建test_user.yml文件,内容如下:

#dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: clickhouse1
concurrent: true
dbMapping:
  database: test
  table: sys_user
  targetTable: default.test_user
  targetPk:
    id: id
#  mapAll: true
  targetColumns:
    id:
    user_name:
    email:
    sex:
    password:
    status:  
  #etlCondition: "where id>={}"
  commitBatch: 3000 # 批量提交的大小

修改conf下的application.yml,把outerAdapters: 下的配置改为如下:主要是配置下目标clickhouse数据库的信息。

启动bin下的startup.bat(windows系统),当看到如下信息表示启动成功

此时去mysql中新增几条记录,adapter的控制台会打印出对应的信息:

查看clickhouse,会发现记录已同步,删除和修改类似的。

部署canal-admin

新建数据库canal_manager,用于admin,这边偷个懒,还是用root账号

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.admin-1.1.8-SNAPSHOT.tar.gz

解压缩得到如下的目录

把conf下的canal_manager.sql sql导入数据库中,进行数据初始化。

修改conf/application.yml的配置,如下:

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

启动bin/startup.bat

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

关闭admin,接下去要把canal-server对接到admin上,

修改canal-server的conf下的canal_local.properties,内容如下:

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name = 

然后在canal-deploy目录下以local的配置启动canal-server

$ bin/startup.bat local

重启canal-admin

进入网页端,能看到如下信息,server管理已经存在一个新的节点了

使用心得

canal定位是一个增量日志同步,但是也有方式可以设置全量同步,但都需要canal-adapter支持

  • canal可以设置全库同步,详见参考:Sync RDB · alibaba/canal Wiki · GitHub
  • 对于表级别的全量同步,可以使用etl接口,详见:ClientAdapter · alibaba/canal Wiki · GitHub

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/925105.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【网络安全 | 漏洞挖掘】绕过SAML认证获得管理员面板访问权限

未经许可,不得转载。 文章目录 什么是SAML认证?SAML是如何工作的?SAML响应结构漏洞结果什么是SAML认证? SAML(安全断言标记语言)用于单点登录(SSO)。它是一种功能,允许用户在多个服务之间切换时无需多次登录。例如,如果你已经登录了facebook.com,就不需要再次输入凭…

【Redis】Redis介绍

目录 1.Redis是什么? 2. Redis特性 2.1 速度快 2.2 基于键值对的数据结构服务器 2.3 丰富的功能 2.4 简单稳定 2.5 客户端语言多 2.6 持久化 2.7 主从复制 2.8 高可用和分布式 3. Redis使用场景 3.1 缓存(Cache) 3.2 排行榜系统 3.3 计数器应用 3.4 社交网络 …

【HarmonyOS学习日志(10)】一次开发,多端部署之功能级一多开发,工程级一多开发

功能级一多开发 SysCap机制介绍 HarmonyOS使用SysCap机制&#xff08;即SystemCapability&#xff09;&#xff0c;可以帮助开发者仅关注设备的系统能力&#xff0c;而不用考虑成百上千种具体的设备类型。 在过去&#xff0c;开发不同设备上的应用就用不同设备的SDK进行开发&…

vue3 与 spring-boot 完成跨域访问

spring-boot&#xff0c;写一个接口用于前端访问&#xff0c;并且给接口设置跨域访问&#xff0c;这里我前端的域名为 localhost:5173 RestController CrossOrigin(origins "http://localhost:5173") public class Vue3Controller {GetMapping("/vue")pu…

机器学习-神经网络(BP神经网络前向和反向传播推导)

1.1 神经元模型 神经网络(neural networks)方面的研究很早就已出现,今天“神经网络”已是一个相当大的、多学科交叉的学科领域.各相关学科对神经网络的定义多种多样,本书采用目前使用得最广泛的一种,即“神经网络是由具有适应性的简单单元组成的广泛并行互连的网络,它的组织能够…

如何通过智能生成PPT,让演示文稿更高效、更精彩?

在快节奏的工作和生活中&#xff0c;我们总是追求更高效、更精准的解决方案。而在准备演示文稿时&#xff0c;PPT的制作往往成为许多人头疼的问题。如何让这项工作变得轻松且富有创意&#xff1f;答案或许就在于“AI生成PPT”这一智能工具的广泛应用。我们就来聊聊如何通过这些…

丹摩|丹摩智算平台使用教学指南

本指南旨在为新用户提供一个详细的操作步骤和实用的入门指导&#xff0c;帮助大家快速上手丹摩智算平台。 一、平台简介 丹摩智算平台是一款强大的数据分析和计算平台&#xff0c;支持多种编程语言&#xff0c;提供丰富的数据处理和机器学习工具。无论您是数据分析师、开发者…

Python学习第十天--处理CSV文件和JSON数据

CSV&#xff1a;简化的电子表格&#xff0c;被保存为纯文本文件 JSON&#xff1a;是一种数据交换格式&#xff0c;易于人阅读和编写&#xff0c;同时也易于机器解析和生成&#xff0c;以JavaScript源代码的形式将信息保存在纯文本文件中 一、csv模块 CSV文件中的每行代表电…

mini-spring源码分析

IOC模块 关键解释 beanFactory&#xff1a;beanFactory是一个hashMap, key为beanName, Value为 beanDefination beanDefination: BeanDefinitionRegistry&#xff0c;BeanDefinition注册表接口&#xff0c;定义注册BeanDefinition的方法 beanReference&#xff1a;增加Bean…

2024年9月中国电子学会青少年软件编程(Python)等级考试试卷(六级)答案 + 解析

一、单选题 1、下面代码运行后出现的图像是&#xff1f;&#xff08; &#xff09; import matplotlib.pyplot as plt import numpy as np x np.array([A, B, C, D]) y np.array([30, 25, 15, 35]) plt.bar(x, y) plt.show() A. B. C. D. 正确答案&#xff1a;A 答案…

UniApp开发实战:常见报错解析与解决方案

UniApp开发实战&#xff1a;常见报错解析与解决方案 病例1、TypeError: undefined is not an object (evaluating ‘this. s c o p e . scope. scope.getAppWebview’) 需求&#xff1a;获取页面示例&#xff0c;动态修改头部搜索框内容&#xff0c;获取页面实例时候报错unde…

BGP对等体建立方法--实验

目录 实验拓扑图 实验要求&#xff1a; 第一步、IP地址规划 第二步、配置接口IP地址 第三步、AS 200使用IGP OSPF实现网络互通 第四步、建立BGP对等体关系 1、R1与R2使用直连链路建立EBGP关系。 2、R2与R4使环回建立非直连IBGP关系。 3、R4与R5使用环回建立EBGP关系。…

(已解决)wps无法加载此加载项程序mathpage.wll

今天&#xff0c;在安装Mathtype的时候遇到了点问题&#xff0c;如图所示 尝试了网上的方法&#xff0c;将C:\Users\Liai_\AppData\Roaming\Microsoft\Word\STARTUP路径中的替换为32位的Mathtype加载项。但此时&#xff0c;word又出现了问题 后来知道了&#xff0c;这是因为64位…

Vue+Element Plus实现自定义表单弹窗

目录 一、基本框架 1.父组件index.vue 2.子组件FormPop.vue 二、细节补充 1&#xff09;input、textarea、select、input number 2&#xff09;daterange、date、monthrange 3&#xff09;数据定义 4&#xff09;没改样式的效果 5&#xff09;最终效果 三、最终代码 …

【插入排序】:直接插入排序、二分插入排序、shell排序

【插入排序】&#xff1a;直接插入排序、二分插入排序、shell排序 1. 直接插入排序1.1 详细过程1.2 代码实现 2. 二分插入排序2.1 详细过程2.2 代码实现 3. shell排序3.1 详细过程3.2 代码实现 1. 直接插入排序 1.1 详细过程 1.2 代码实现 public static void swap(int[]arr,…

PHP 生成分享海报

因为用户端有多个平台&#xff0c;如果做分享海报生成&#xff0c;需要三端都来做&#xff0c;工作量比较大。 所以这个艰巨的任务就光荣的交给后端了。经过一定时间的研究和调试&#xff0c;最终圆满完成了任务&#xff0c;生成分享海报图片实现笔记如下。 目录 准备字体文件…

MySQL底层概述—5.InnoDB参数优化

大纲 1.内存相关参数优化 (1)缓冲池内存大小配置 (2)配置多个Buffer Pool实例 (3)Chunk(块)大小配置 (4)InnoDB缓存性能评估 (5)Page管理相关参数 (6)Change Buffer相关参数优化 2.日志相关参数优化 (1)日志缓冲区相关参数配置 (2)日志文件参数优化 3.IO线程相关参数…

05_JavaScript注释与常见输出方式

JavaScript注释与常见输出方式 JavaScript注释 源码中注释是不被引擎所解释的&#xff0c;它的作用是对代码进行解释。lavascript 提供两种注释的写法:一种是单行注释&#xff0c;用//起头:另一种是多行注释&#xff0c;放在/*和*/之间。 //这是单行注释/* 这是 多行 注释 *…

HTML CSS JS基础考试题与答案

一、选择题&#xff08;2分/题&#xff09; 1&#xff0e;下面标签中&#xff0c;用来显示段落的标签是&#xff08; d &#xff09;。 A、<h1> B、<br /> C、<img /> D、<p> 2. 网页中的图片文件位于html文件的下一级文件夹img中&#xff0c;…

【工具】JS解析XML并且转为json对象

【工具】JS解析XML并且转为json对象 <?xml version1.0 encodingGB2312?> <root><head><transcode>hhhhhhh</transcode></head><body><param>ccccccc</param><param>aaaaaaa</param><param>qqqq<…