目录
- Mysql实时数据同步工具Alibaba Canal 使用
- Canal是什么?
- 工作原理
- 重要版本更新说明
- 环境准备
- 安装Canal
- window
- Java : Canal Client 集成
- 依赖
- 编码
- 工作流程
- 其他学习canal资料
个人主页: 【⭐️个人主页】
需要您的【💖 点赞+关注】支持 💯
Mysql实时数据同步工具Alibaba Canal 使用
📖 本文核心知识点:
- Canal 是什么
- 安装Canal 服务
- 使用Canal 客户端
- 原生集成数据MQ
- 同步数据客户端服务【业务】
Canal是什么?
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
工作原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理 - canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
重要版本更新说明
canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:
- 整体性能测试&优化,提升了
150%
. #726 参考: Performance - 原生支持
prometheus
监控 #765 Prometheus QuickStart - 原生支持
kafka消息投递
#695 Canal Kafka/RocketMQ QuickStart - 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
- 原生支持
docker镜像
#801 参考: Docker QuickStart
canal1.1.4
版本,迎来最重要的WebUI能力,引入canal-admin
工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力, Canal admin guide
环境准备
安装Canal
DownLoad
版本: 1.1.7
window
- 下载 tar.gz包,解压
GitHub Canal - 配置文件设置:
解压完后修改配置文件
查看conf/canal.properties
,其中canal.port
是客户端连接的端口,需要放开,canal.admin.user
和canal.admin.passwd
是客户端连接的账号
再打开conf/example/ instance.properties
,master.address
填数据库地址,dbUsername
和dbPassword
是数据库账号,flter.regex
可以用来过滤数据库
,默认是监听所有数据库,如果想监听db_
开头的数据可以这么写db_.*\\..*
,多个用逗号分隔
- 启动服务
bin/startup.bat
log/canal.log
Java : Canal Client 集成
依赖
implementation 'com.alibaba.otter:canal.client:1.1.7'
implementation 'com.alibaba.otter:canal.protocol:1.1.7'
具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。
需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意
编码
package com.kongxiang.infrastructure.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ThreadUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
/**
* @author 孔翔
* @since 2023-12-27
* copyright for author : 孔翔 at 2023-12-27
* study-spring3
*/
@Component
@Slf4j
public class CanalService {
private String canalMonitorHost = "localhost";
private int canalMonitorPort = 11111;
private String filterRegexTable = "xkongdb\\..*";
private final static int BATCH_SIZE = 10000;
@Async("canalTask")
public void startCanal() {
Consumer<CanalConnector> connectorConsumer = new ConsumerTask();
while (true) {
executeCanal(connectorConsumer);
try {
//防止频繁访问数据库链接: 线程睡眠 10秒
ThreadUtils.sleep(Duration.ofSeconds(10));
log.debug("防止频繁访问数据库链接: 线程睡眠 10秒");
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
public void executeCanal(Consumer<CanalConnector> runnable) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");
try {
//打开连接
connector.connect();
log.debug("数据库检测连接成功!" + filterRegexTable);
//订阅数据库表,全部表q
connector.subscribe(filterRegexTable);
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
if (runnable != null) {
runnable.accept(connector);
}
} catch (Exception e) {
e.printStackTrace();
log.error("成功断开监测连接!尝试重连");
} finally {
connector.disconnect();
}
}
public static class ConsumerTask implements Consumer<CanalConnector> {
public void handleMessage(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
//根据数据库名获取租户名
String databaseName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
log.info("数据库: {}, 表名: {}", databaseName, tableName);
// 获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
// 获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
// 反序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
// 获取当前事件的操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
|| eventType == CanalEntry.EventType.DELETE) {
// 获取数据集
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
// 遍历rowDataList,并打印数据集
for (CanalEntry.RowData rowData : rowDataList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
// 变更前数据
for (CanalEntry.Column column : beforeColumnsList) {
log.info("变更前数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
}
// 变更后数据
for (CanalEntry.Column column : afterColumnsList) {
log.info("变更后数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
}
}
}
}
}
}
@Override
public void accept(CanalConnector connector) {
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
} else {
try {
log.debug("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
handleMessage(message.getEntries());
} catch (Exception e) {
connector.rollback(batchId); // 处理失败, 回滚数据
}
}
// 提交确认
connector.ack(batchId);
}
}
}
}
测试代码
@Test
public class CanalTest {
@Test
public void testListener() {
CanalService canalService = new CanalService();
canalService.startCanal();
}
}
测试结果
- 当
xkongdb
的数据表的数据进行insert
,update
,delete
的时候,就会触发canal任务执行。 - 日志
工作流程
其他学习canal资料
【开源实战】阿里开源MySQL中间件Canal快速入门
mysql的binlog开启方式