Mysql实时数据同步工具Alibaba Canal 使用

目录

  • Mysql实时数据同步工具Alibaba Canal 使用
    • Canal是什么?
      • 工作原理
      • 重要版本更新说明
    • 环境准备
    • 安装Canal
      • window
    • Java : Canal Client 集成
      • 依赖
      • 编码
    • 工作流程
    • 其他学习canal资料

个人主页: 【⭐️个人主页】
需要您的【💖 点赞+关注】支持 💯


在这里插入图片描述

Mysql实时数据同步工具Alibaba Canal 使用

📖 本文核心知识点:

  • Canal 是什么
  • 安装Canal 服务
  • 使用Canal 客户端
  • 原生集成数据MQ
  • 同步数据客户端服务【业务】

Canal是什么?

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
    canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

重要版本更新说明

canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:

  • 整体性能测试&优化,提升了150%. #726 参考: Performance
  • 原生支持prometheus监控 #765 Prometheus QuickStart
  • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
  • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
  • 原生支持docker镜像 #801 参考: Docker QuickStart
    canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力, Canal admin guide

环境准备

安装Canal

DownLoad

版本: 1.1.7

window

  1. 下载 tar.gz包,解压
    GitHub Canal
  2. 配置文件设置:
    解压完后修改配置文件
    查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.usercanal.admin.passwd是客户端连接的账号
    在这里插入图片描述
    再打开conf/example/ instance.properties, master.address填数据库地址,dbUsernamedbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔
    在这里插入图片描述
  3. 启动服务 bin/startup.bat
    log/canal.log
    在这里插入图片描述

Java : Canal Client 集成

依赖

    implementation 'com.alibaba.otter:canal.client:1.1.7'
    implementation 'com.alibaba.otter:canal.protocol:1.1.7'

具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。

需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意

编码

package com.kongxiang.infrastructure.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ThreadUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;


/**
 * @author 孔翔
 * @since 2023-12-27
 * copyright for author : 孔翔 at 2023-12-27
 * study-spring3
 */
@Component
@Slf4j
public class CanalService {

    private String canalMonitorHost = "localhost";
    private int canalMonitorPort = 11111;

    private String filterRegexTable = "xkongdb\\..*";


    private final static int BATCH_SIZE = 10000;


    @Async("canalTask")
    public void startCanal() {
        Consumer<CanalConnector> connectorConsumer = new ConsumerTask();
        while (true) {
            executeCanal(connectorConsumer);
            try {
                //防止频繁访问数据库链接: 线程睡眠 10秒
                ThreadUtils.sleep(Duration.ofSeconds(10));
                log.debug("防止频繁访问数据库链接: 线程睡眠 10秒");
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

    public void executeCanal(Consumer<CanalConnector> runnable) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");
        try {
            //打开连接
            connector.connect();
            log.debug("数据库检测连接成功!" + filterRegexTable);
            //订阅数据库表,全部表q
            connector.subscribe(filterRegexTable);
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            if (runnable != null) {
                runnable.accept(connector);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("成功断开监测连接!尝试重连");
        } finally {
            connector.disconnect();
        }
    }

    public static class ConsumerTask implements Consumer<CanalConnector> {
        public void handleMessage(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
            for (CanalEntry.Entry entry : entries) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                //根据数据库名获取租户名
                String databaseName = entry.getHeader().getSchemaName();
                String tableName = entry.getHeader().getTableName();
                log.info("数据库: {}, 表名: {}", databaseName, tableName);
                // 获取类型
                CanalEntry.EntryType entryType = entry.getEntryType();

                // 获取序列化后的数据
                ByteString storeValue = entry.getStoreValue();
                if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    // 反序列化数据
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                    // 获取当前事件的操作类型
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
                            || eventType == CanalEntry.EventType.DELETE) {
                        // 获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        // 遍历rowDataList,并打印数据集
                        for (CanalEntry.RowData rowData : rowDataList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            // 变更前数据
                            for (CanalEntry.Column column : beforeColumnsList) {
                                log.info("变更前数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
                            }
                            // 变更后数据
                            for (CanalEntry.Column column : afterColumnsList) {
                                log.info("变更后数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
                            }
                        }
                    }
                }
            }
        }


        @Override
        public void accept(CanalConnector connector) {
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                } else {
                    try {
                        log.debug("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
                        handleMessage(message.getEntries());
                    } catch (Exception e) {
                        connector.rollback(batchId); // 处理失败, 回滚数据
                    }
                }
                // 提交确认
                connector.ack(batchId);
            }
        }
    }
}

测试代码

@Test
public class CanalTest {

    @Test
    public void testListener() {
        CanalService canalService = new CanalService();
        canalService.startCanal();
    }
}

测试结果

  1. xkongdb的数据表的数据进行 insert,update,delete的时候,就会触发canal任务执行。
  2. 日志 在这里插入图片描述

工作流程

在这里插入图片描述

其他学习canal资料

【开源实战】阿里开源MySQL中间件Canal快速入门
mysql的binlog开启方式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/274482.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024美赛数学建模思路A题B题C题D题E题F题思路汇总 选题分析

文章目录 1 赛题思路2 美赛比赛日期和时间3 赛题类型4 美赛常见数模问题5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 美赛比赛日期和时间 比赛开始时间&#xff1a;北京时间2024年2月2日&#xff08;周五&#xff…

excel 函数技巧

1&#xff1a;模糊查询 LOOKUP(1,0/FIND(F1062,Sheet1!C$2:Sheet1!C$9135),Sheet1!B$2:Sheet1!B$9135) 函数含义&#xff1a;寻找F列1062行和sheet1中的C2行到C9135行进行模糊查询&#xff0c;返回该行对应的B2行到B9135行的结果。未查到返回结果0 函数公式&#xff1a; LO…

leetcode贪心算法题总结(一)

此系列分三章来记录leetcode的有关贪心算法题解&#xff0c;题目我都会给出具体实现代码&#xff0c;如果看不懂的可以后台私信我。 本章目录 1.柠檬水找零2.将数组和减半的最少操作次数3.最大数4.摆动序列5.最长递增子序列6.递增的三元子序列7.最长连续递增序列8.买卖股票的最…

设计模式-过滤器模式

设计模式专栏 模式介绍模式特点应用场景Java中的过滤器介绍代码示例Java实现过滤器模式Python实现过滤器模式 过滤器模式在spring中的应用 模式介绍 过滤器模式是一种设计模式&#xff0c;它允许开发人员使用不同的标准来过滤一组对象。这种模式是通过运算逻辑以解耦的方式将它…

XIAO ESP32S3之物体检测加入视频流

一、前言 由于XIAO ESP32S3开发套件没有显示屏配件&#xff0c;因此加入http视频流功能&#xff0c;可通过浏览器请求ESP32S3上的视频流。 二、思路 1、XIAO ESP32S3启动后通过wifi连接到AP&#xff1b; 2、启动http服务器&#xff0c;注册get_mjpeg处理函数&#xff1b; 3…

2023年中职“网络安全”——B-5:网络安全事件响应(Server2216)

B-5&#xff1a;网络安全事件响应 任务环境说明&#xff1a; 服务器场景&#xff1a;Server2216&#xff08;开放链接&#xff09; 用户名:root密码&#xff1a;123456 1、黑客通过网络攻入本地服务器&#xff0c;通过特殊手段在系统中建立了多个异常进程&#xff0c;找出启…

【Pytorch】学习记录分享8——PyTorch自然语言处理基础-词向量模型Word2Vec

【Pytorch】学习记录分享7——PyTorch自然语言处理基础-词向量模型Word2Vec 1. 词向量模型Word2Vec)1. 如何度量这个单词的&#xff1f;2.词向量是什么样子&#xff1f;3.词向量对应的热力图&#xff1a;4.词向量模型的输入与输出![在这里插入图片描述](https://img-blog.csdni…

Java面试题及答案汇总来啦!快来领取

Java面试题及答案汇总来啦&#xff01;快来领取 还有不到两个月就要过年了&#xff0c;过完年紧接着“金三银四”招聘热季就要到了&#xff0c;在过年期间只想着吃吃喝喝玩玩&#xff0c;这习是学不了一点。那就趁着过年前这段时间开始恶补Java面试题&#xff0c;实现弯道超车吧…

ArkTS基本概念装饰器

目录 ArkTS基本概念 装饰器汇总 ArkTS基本概念 ArkTS是HarmonyOS的主力应用开发语言。 它在TypeScript&#xff08;简称TS&#xff09;的基础上&#xff0c;匹配ArkUI框架&#xff0c;扩展了声明式UI、状态管理等相应的能力&#xff0c;让开发者以更简洁、更自然的方式开发跨…

FTP简介FTP服务器的搭建【虚拟机版】以及计算机端口的介绍

目录 一. FTP简介 二. FTP服务器的搭建【虚拟机Windows2012版】 1. 启用防火墙 2. 打开服务器管理器➡工具➡计算机管理 3. 选择本地用户与组➡新建组 4. 给组命名&#xff0c;输入描述&#xff0c;点击创建 5. 新建用户&#xff0c;设置用户名称&#xff0c;添加描述&a…

立体匹配算法(Stereo correspondence)SGM

SGM(Semi-Global Matching)原理&#xff1a; SGM的原理在wiki百科和matlab官网上有比较详细的解释&#xff1a; wiki matlab 如果想完全了解原理还是建议看原论文 paper&#xff08;我就不看了&#xff0c;懒癌犯了。&#xff09; 优质论文解读和代码实现 一位大神自己用c实现…

IntelliJ IDEA [插件 MybatisX] mapper和xml间跳转

文章目录 1. 安装插件2. 如何使用3. 主要功能总结 MybatisX 是一款为 IntelliJ IDEA 提供支持的 MyBatis 开发插件 它通过提供丰富的功能集&#xff0c;大大简化了 MyBatis XML 文件的编写、映射关系的可视化查看以及 SQL 语句的调试等操作。本文将介绍如何安装、配置和使用 In…

redis 三主六从高可用docker(不固定ip)

redis集群(cluster)笔记 redis 三主三从高可用集群docker swarm redis 三主六从高可用docker(不固定ip) 此博客解决&#xff0c;redis加入集群后&#xff0c;是用于停掉后重启&#xff0c;将nodes.conf中的旧的Ip替换为新的IP&#xff0c;从而达到不会因为IP变化导致集群无法…

StackOverflowError的JVM处理方式

背景&#xff1a; 事情来源于生产的一个异常日志 Caused by: java.lang.StackOverflowError: null at java.util.stream.Collectors.lambda$groupingBy$45(Collectors.java:908) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.ArrayL…

阿里云 ACK 云上大规模 Kubernetes 集群高可靠性保障实战

作者&#xff1a;贤维 马建波 古九 五花 刘佳旭 引言 2023 年 7 月&#xff0c;阿里云容器服务 ACK 成为首批通过中国信通院“云服务稳定运行能力-容器集群稳定性”评估的产品&#xff0c; 并荣获“先进级”认证。随着 ACK 在生产环境中的采用率越来越高&#xff0c;稳定性保…

【ES6】Class继承-super关键字

目录 一、前言二、ES6与ES5继承机制区别三、super作为函数1、构造函数this1&#xff09;、首先要明确this指向①、普通函数②、箭头函数③、注意事项 2&#xff09;、其次要明确new操作符做了哪些事情 2、super()的用法及注意点1&#xff09;、用法2&#xff09;、注意点 四、s…

Unity引擎有哪些优点

Unity引擎是一款跨平台的游戏引擎&#xff0c;拥有很多的优点&#xff0c;如跨平台支持、强大的工具和编辑器、灵活的脚本支持、丰富的资源库和强大的社区生态系统等&#xff0c;让他成为众多开发者选择的游戏开发引擎。下面我简单的介绍一下Unity引擎的优点。 跨平台支持 跨…

用Xshell连接虚拟机的Ubuntu20.04系统记录。虚拟机Ubuntu无法上网。本机能ping通虚拟机,反之不能。互ping不通

先别急着操作&#xff0c;看完再试。 如果是&#xff1a;本机能ping通虚拟机&#xff0c;反之不能。慢慢看到第8条。 如果是&#xff1a;虚拟机不能上网&#xff08;互ping不通&#xff09;&#xff0c;往下一直看。 系统是刚装的&#xff0c;安装步骤&#xff1a;VMware虚拟机…

TCP 滑动窗口

滑动窗口&#xff08;Sliding window&#xff09;是一种流量控制技术。早期的网络通信中&#xff0c;通信双方不会考虑网络的拥挤情况直接发送数据。由于大家不知道网络拥塞状况&#xff0c;同时发送数据&#xff0c;导致中间节点阻塞掉包&#xff0c;谁也发不了数据&#xff0…

数据分析工具 Top 8

你能想象一个没有工具箱的水管工吗? 没有,对吧? 数据从业者也是如此。如果没有他们的数据分析工具&#xff0c;数据从业者就无法分析数据、可视化数据、从数据中提取价值&#xff0c;也无法做数据从业者在日常工作中做的许多很酷的事情。 根据你最感兴趣的数据科学职业——数…