Canal同步MySQL增量数据

引言

在现在的系统开发中,为了提高查询效率 , 以及搜索的精准度, 会大量的使用 redis 、memcache 等 nosql 系统的数据库 , 以及 solr 、 elasticsearch 类似的全文检索服务。

那么这个时候, 就又有一个问题需要我们来考虑, 就是数据同步的问题, 如何将实时变化的数据库中的数据同步到 MySQL数据库、solr 的索引库中或者 redis 中呢 ?

同步数据一般会考虑以下几个方法:

  • 业务代码手动同步
  • 定时任务同步
  • MQ实现同步
  • Canal进行同步

在以上方法中,最容易实现的就是业务代码手动同步,但它的缺点也很明显,代码耦合度很高并且执行效率低下;定时任务方法的优点是能够与业务代码解耦,但缺点是数据的实时性不高;MQ消息同步的话可以做到代码解耦,伪准时,但是已然需要添加MQ相关的代码。

Canal就解决了以上问题,它可以做到业务代码完全解耦,API完全解耦,可以做到准实时。

Canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志(binlog)解析,提供增量数据订阅和消费。

下面就来具体学习一下Canal同步原理以及同步MySQL增量数据到ES的实现。

Canal原理

Canal架构

canal_jiagou

  • server 代表一个 canal 运行实例,对应于一个 jvm。
  • instance 对应于一个数据队列 (1个 canal server 对应 1…n 个 instance )
  • instance 下的子模块
  • eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
  • eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
  • eventStore: 数据存储
  • metaManager: 增量订阅 & 消费信息管理器

工作原理

MySQL主从同步

Canal是基于MySQL的binlog实现同步的,因此先了解一下MySQL主从同步的原:
canal_mysqlzhucong
从上层来看,主从复制分成三步:

  1. master 将改变记录到二进制日志 (binary log) 中(这些记录叫做二进制日志事件, binary log events ,可以通过 show binlog events 进行查看);
  2. slave 将 master 的 binary log events 拷贝到它的中继日志 (relay log);
  3. slave 重做中继日志中的事件将改变反映它自己的数据。

内部原理

canal_yuanli
上图就是Canal的内部原理图,大致原理如下:

  1. canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave ,向 mysql master 发送 dump 协议。
  2. mysql master 收到 dump 请求,开始推送 binary log 给 slave( 也就是 canal) 。
  3. canal 解析 binary log 对象 ( 原始为 byte 流 ) 。

EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功),传送成功之后更新Log Position。工作流程如下图所示:
canal_gzlc
EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。

EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。

MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:

  1. Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]。
  2. void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作。
  3. void ack(long batchId),顾名思义,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作。

实践

真实场景中,canal 高可用依赖 zookeeper ,我将客户端模式可以简单划分为:TCP 模式 和 MQ 模式 。

实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。

顺序消费
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
canal_xiaofei

MySQL配置

1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、创建数据库商品表 t_product

CREATE TABLE `t_product` (
 `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
 `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
 `price` DECIMAL ( 10, 2 ) NOT NULL,
 `status` TINYINT ( 4 ) NOT NULL,
 `create_time` datetime NOT NULL,
 `update_time` datetime NOT NULL,
   PRIMARY KEY ( `id` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

Elasticsearch配置

使用 Kibana 创建商品索引 。

PUT /t_product
{
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    },
    "mappings": {
            "properties": {
               "id": {
                    "type":"keyword"
                },
                "name": {
                    "type":"text"
                },
                "price": {
                    "type":"double"
                },
                "status": {
                    "type":"integer"
                },
                "createTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "updateTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                }
        }
    }
}

执行完成,如图所示 :
canal_es

MQ配置

这里我是用的是RocketMQ,创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。
canal_mq

canal_mq2

Canal配置

我选取 canal 版本 1.1.6 ,进入 conf 目录。
1、配置 canal.properties

#集群模式 zk地址
canal.zkServers = localhost:2181
#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的组件文件 生产环境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

######  以下部分是默认值 展示出来 
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为 flat json格式对象
canal.mq.flatMessage = true

2、instance 配置文件
conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...

# table regex 
canal.instance.filter.regex=mytest.t_product

# mq config
canal.mq.topic=product-syn-topic
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

3、服务启动
启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。
canal_zk

修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。
canal_mqjilu

消费者

1、产品索引操作服务
canal_xf1

2、消费监听器
canal_xf2

消费者逻辑重点有两点:

  • 顺序消费监听器
  • 将消息数据转换成 JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATEINSERTDELETE 执行产品索引操作服务的方法。

最后

Canal 是一个非常有趣的开源项目,实际生产中很多公司使用 Canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。

推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/972086.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MacOS 15.3 卸载系统内置软件

1、关闭系统完整性(SIP) 进入恢复模式(recovery) 如果您使用的是黑苹果或者白苹果,可以选择 重启按住CommandR 进入,如果是M系列芯片,长按开机键,进入硬盘选择界面进入。 我是MacMini M4芯片,关…

【核心算法篇七】《DeepSeek异常检测:孤立森林与AutoEncoder对比》

大家好,今天我们来深入探讨一下《DeepSeek异常检测:孤立森林与AutoEncoder对比》这篇技术博客。我们将从核心内容、原理、应用场景等多个方面进行详细解析,力求让大家对这两种异常检测方法有一个全面而深入的理解。 一、引言 在数据科学和机器学习领域,异常检测(Anomaly…

Ubuntu24.04无脑安装docker(含图例)

centos系统请看这篇 Linux安装Docker教程(详解) 一. ubuntu更换软件源 请看这篇:Ubuntu24.04更新国内源 二. docker安装 卸载老版docker(可忽略) sudo apt-get remove docker docker-engine docker.io containerd runc更新软件库 sudo a…

thingboard告警信息格式美化

原始报警json内容: { "severity": "CRITICAL","acknowledged": false,"cleared": false,"assigneeId": null,"startTs": 1739801102349,"endTs": 1739801102349,"ackTs": 0,&quo…

✨2.快速了解HTML5的标签类型

✨✨HTML5 的标签类型丰富多样&#xff0c;每种类型都有其独特的功能和用途&#xff0c;以下是一些常见的 HTML5 标签类型介绍&#xff1a; &#x1f98b;结构标签 &#x1faad;<html>&#xff1a;它是 HTML 文档的根标签&#xff0c;所有其他标签都包含在这个标签内&am…

day12_调度和可视化

文章目录 day12_调度和可视化一、任务调度1、开启进程2、登入UI界面3、配置租户4、创建项目5、创建工作流5.1 HiveSQL部署&#xff08;掌握&#xff09;5.2 SparkDSL部署&#xff08;掌握&#xff09;5.3 SparkSQL部署&#xff08;熟悉&#xff09;5.4 SeaTunnel部署&#xff0…

使用nvm管理node.js版本,方便vue2,vue3开发

在Vue项目开发过程中&#xff0c;我们常常会遇到同时维护Vue2和Vue3项目的情况。由于不同版本的Vue对Node.js 版本的要求有所差异&#xff0c;这就使得Node.js 版本管理成为了一个关键问题。NVM&#xff08;Node Version Manager&#xff09;作为一款强大的Node.js 版本管理工具…

Java8适配的markdown转换html工具(FlexMark)

坐标地址&#xff1a; <dependency><groupId>com.vladsch.flexmark</groupId><artifactId>flexmark-all</artifactId><version>0.60.0</version> </dependency> 工具类代码&#xff1a; import com.vladsch.flexmark.ext.tab…

Qt开发①Qt的概念+发展+优点+应用+使用

目录 1. Qt的概念和发展 1.1 Qt的概念 1.2 Qt 的发展史&#xff1a; 1.3 Qt 的版本 2. Qt 的优点和应用 2.1 Qt 的优点&#xff1a; 2.2 Qt 的应用场景 2.3 Qt 的应用案例 3. 搭建 Qt 开发环境 3.1 Qt 的开发工具 3.2 Qt SDK 的下载和安装 3.3 Qt 环境变量配置和使…

vscode插件开发

准备 安装开发依赖 npm install -g yo generator-code 安装后&#xff0c;运行命令 yo code 运行 打开项目&#xff0c; 点击 vscode 调式 按 F5 或点击调试运行按钮 会打开一个新窗口&#xff0c;在新窗口按快捷键 CtrlShiftP &#xff0c;搜索 Hello World 选择执行 右下角出…

自制简单的图片查看器(python)

图片格式&#xff1a;支持常见的图片格式&#xff08;JPG、PNG、BMP、GIF&#xff09;。 import os import tkinter as tk from tkinter import filedialog, messagebox from PIL import Image, ImageTkclass ImageViewer:def __init__(self, root):self.root rootself.root.…

嵌入式 lwip http server makefsdata

背景&#xff1a; 基于君正X2000 MCU Freertoslwip架构 实现HTTP server服务&#xff0c;MCU作为HTTP服务器通过网口进行数据包的传输&#xff0c;提供网页服务。其中设计到LWIP提供的工具makefsdata&#xff0c;常用于将文件或目录结构转换为适合嵌入到固件中的二进制格式。 …

架构设计系列(三):架构模式

一、概述 关于移动应用开发中常见的架构模式&#xff0c;这些模式是为了克服早期模式的局限性而引入。常见的 架构模式有&#xff1a; MVC, MVP, MVVM, MVVM-C, and VIPER 二、MVC, MVP, MVVM, MVVM-C, and VIPER架构模式 MVC、MVP、MVVM、MVVM-C 和 VIPER 是移动应用开发中…

CSS盒模

CSS盒模型就像一个快递包裹&#xff0c;网页上的每个元素都可以看成是这样一个包裹&#xff0c;它主要由以下几个部分组成&#xff1a; 内容&#xff08;content&#xff09;&#xff1a;就像包裹里真正装的东西&#xff0c;比如文字、图片等。在CSS里&#xff0c;可用width&a…

解决DeepSeek服务器繁忙的有效方法

全球42%的企业遭遇过AI工具服务器过载导致内容生产中断&#xff08;数据来源&#xff1a;Gartner 2025&#xff09;。当竞品在凌晨3点自动发布「智能家居安装指南」时&#xff0c;你的团队可能正因DeepSeek服务器繁忙错失「净水器保养教程」的流量黄金期⏳。147SEO智能调度系统…

Zookeeper 和 Redis 哪种更好?

目录 前言 &#xff1a; 什么是Zookeeper 和 Redis &#xff1f; 1. 核心定位与功能 2. 关键差异点 (1) 一致性模型 (2) 性能 (3) 数据容量 (4) 高可用性 3. 适用场景 使用 Zookeeper 的场景 使用 Redis 的场景 4. 替代方案 5. 如何选择&#xff1f; 6. 常见误区 7. 总结 前言…

动态规划从入坟走向入坑

目录 1.动态规划的含义 2.动态规划的解题步骤(动规五部曲) 3.动态规划的题型 4.相关思路 1.动规基础(由于我看的博主讲的动规很简单,所以这里就不讲了) 2.背包问题 1.二维dp数组01背包 1.dp[i][j] 表示从下标为[0-i]的物品里任意取&#xff0c;放进容量为j的背包&#xf…

Golang学习笔记_34——组合模式

Golang学习笔记_31——原型模式 Golang学习笔记_32——适配器模式 Golang学习笔记_33——桥接模式 文章目录 一、核心概念1. 定义2. 解决的问题3. 核心角色4. 类图 二、特点分析三、适用场景1. 文件系统2. 图形界面3. 组织架构 四、代码示例&#xff08;Go语言&#xff09;五、…

vue+elementplus创建初始化安装

项目创建初始化 D:\Tool\mysql\education_vue 这个路径下cmd 或打开vscode&#xff0c;把项目丢进code中打开 安装element plus Container 布局容器 | Element Plus npm install element-plus --save 把项目初始文件Homeview AboutView删了&#xff0c;Router index.js中删一…

SQL 优化工具使用之 explain 详解

一、导读 对于大部分开发人员来说&#xff0c;平常接触的无非就是增删改查这些基本操作&#xff0c;创建存储过程&#xff0c;视图等等都是 DBA 该干的活&#xff0c;但是想要把这些基本操作写的近乎完美也是一件难事。 而 explain 显示了 MySQL 如何使用索引来处理 select 语…