canal1.1.7实战

1.环境搭建

canal可以用来监听mysql数据库的变化,用来同步数据

先下载最新的部署版本,release地址:Releases · alibaba/canal · GitHub

包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

 下载完后,在linux上新建一个canal文件夹,放入tar包解压: tar -zxvf canal.xxx.tar.gz

解压完后修改配置文件

查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.user和canal.admin.passwd是客户端连接的账号

再打开conf/example/ instance.properties, master.address填数据库地址,dbUsername和dbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔

 修改完成后,进入bin目录,执行./startup.sh是启动,./stop.sh是关闭

进入logs/example,执行tail -f -n 300 example.log,看到以下输出说明搭建成功了

 2.客户端代码

引入依赖

  <dependencies>
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.7</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.protocol</artifactId>
      <version>1.1.7</version>
    </dependency>
  </dependencies>

代码实现:

package cn.hollycloud.iplatform;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Unit test for simple App.
 */
@Slf4j
public class CanalTest {
    private Map<String, String> errorMap = new HashMap<>();

    @Test
    public void testCanal() {
        initThread();
    }

    private void initThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        initConnect();
                    } catch (Exception e) {
                        String key = "canal_connection_error";
                        if (!hasSameError(key, e.getMessage())) {
                            log.error("canal连接出错: {}", e);
                        }
                    }
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }).start();
    }

    private void initConnect() {
        String canalIp = "localhost";
        int canalPort = 11111;
        String canalDestination = "example";
        String canalUsername = "admin";
        String canalPassword = "123456";
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,
                canalPort), canalDestination, canalUsername, canalPassword);
        int batchSize = 200;
        try {
            connector.connect(); // 连接到canal server
            connector.subscribe("db_.*\\..*"); // 订阅指定的消息
            connector.rollback(); // 回滚到未进行ack 的地方
            log.info("canal连接成功");
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        //未获取到消息则睡眠
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    try {
                        //处理消息
                        log.info("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
                        handleMessage(message.getEntries());
                    } catch (Exception e) {
                        connector.rollback(batchId); // 处理失败, 回滚数据
                        String key = "canal_sync_data_error";
                        String errMsg = e.getMessage();
                        if (StringUtils.isEmpty(errMsg)) errMsg = e.toString();
                        if (!hasSameError(key, errMsg)) {
                            log.error("同步数据出错: {}", e);
                        }
                        //休眠一段时间继续获取数据
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException ex) {
                            ex.printStackTrace();
                        }
                        continue;
                    }
                }
                connector.ack(batchId); // 提交确认
            }
        } finally {
            connector.disconnect();
        }
    }

    private boolean hasSameError(String key, String error) {
        String lastError = errorMap.get(key);
        if (Objects.equals(lastError, error)) {
            return true;
        }
        errorMap.put(key, error);
        return false;
    }

    private void handleMessage(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            //根据数据库名获取租户名
            String databaseName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            log.info("数据库: {}, 表名: {}", databaseName, tableName);
            // 获取类型
            CanalEntry.EntryType entryType = entry.getEntryType();

            // 获取序列化后的数据
            ByteString storeValue = entry.getStoreValue();
            if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                // 反序列化数据
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                // 获取当前事件的操作类型
                CanalEntry.EventType eventType = rowChange.getEventType();
                if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
                        || eventType == CanalEntry.EventType.DELETE) {
                    // 获取数据集
                    List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                    // 遍历rowDataList,并打印数据集
                    for (CanalEntry.RowData rowData : rowDataList) {
                        List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                        List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                        // 变更前数据
                        for (CanalEntry.Column column : beforeColumnsList) {
                            log.info("变更前数据: name: {}, value: {}", column.getName(), column.getValue());
                        }
                        // 变更后数据
                        for (CanalEntry.Column column : afterColumnsList) {
                            log.info("变更后数据: name: {}, value: {}", column.getName(), column.getValue());
                        }
                    }
                }
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/157881.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《Effective C++》条款20

宁以pass-by-reference-to-const替换pass-by-value class A { public:A() {cout << "A()" << endl;}A(const A& a){cout << "A(const A& a)" << endl;}~A(){cout << "~A()" << endl;} private:stri…

如何选择数据恢复软件?前 5 名免费数据恢复软件榜单供参考

我们都知道开源数据恢复软件有很多优点。搜索免费解决方案的用户会被其可用性所吸引&#xff0c;而那些拥有足够技术技能的用户可能会被其定制软件以满足其需求的灵活性所吸引。在本文中&#xff0c;我们为您挑选了最好的开源数据恢复软件&#xff0c;并将尝试回答开源软件是否…

基于SpringBoot的SSMP整合案例(在Linux中发布项目的注意事项与具体步骤步骤)

前言与注意 这几天在Linux中上线之前的小项目时&#xff0c;遇到了很多的问题&#xff0c;Linux镜像的选择&#xff0c;jdk&#xff0c; mysql在linux中的下载&#xff0c;使用finallshell连接linux&#xff0c;使用tomcat连接linux中的数据库........ 在下面的注意事项中我会将…

C#学习相关系列之Linq常用方法---排序(一)

一、构建数据 public class Student_1{public int ID { get; set; }public string Name { get; set; }public int Chinese { get; set; }public int Math { get; set; }public int English { get; set; }public override string ToString(){return string.Format("ID:{0},…

PostgreSQL按月计算每天值的累加

要按月计算每天值的累加&#xff0c;您可以使用PostgreSQL中的日期函数和窗口函数。下面是一个示例查询&#xff0c;假设您有一个名为"table_name"的表&#xff0c;其中包含一个日期列"date_column"和一个数值列"value_column"&#xff1a; SELE…

FindMy技术用于保温杯

在即将到来的冬季&#xff0c;每个人都开始给自己准备一个保温杯&#xff0c;保温杯是一种盛水的容器&#xff0c;主要由陶瓷或不锈钢制成&#xff0c;并加入真空层&#xff0c;以实现保温效果。这种杯子顶部有盖&#xff0c;密封严实&#xff0c;能够延缓内部液体散热&#xf…

Taro编译警告解决方案:Error: chunk common [mini-css-extract-plugin]

文章目录 1. 背景2. 问题分析3. 解决方案3.1 更新 Taro 版本3.2 更新相关依赖3.3 调整 webpack 配置3.4 检查依赖版本 4. 拓展与分析4.1 拓展4.2 避免不必要的依赖4.3 查阅 Taro GitHub 仓库 5. 总结 &#x1f389;欢迎来到Java学习路线专栏~Taro编译警告解决方案&#xff1a;E…

59 权限提升-Win溢出漏洞及ATSCPS提权

目录 知识点必备&#xff1a;windows权限认识(用户及用户组)0x01 普通权限0x02特殊权限 演示案例:基于WEB环境下的权限提升-阿里云靶机基于本地环境下的权限提升-系统溢出漏洞基于本地环境下的权限提升-AT&SC&PS命令 案例给到的思路点总结如下:涉及资源: 这个章节会讲到…

【论文解读】CP-SLAM: Collaborative Neural Point-based SLAM System_神经点云协同SLAM系统(上)

目录 1 Abstract 2 Related Work 2.1 单一智能体视觉SLAM&#xff08;Single-agent Visual SLAM&#xff09; 2.2 协同视觉SLAM&#xff08;Collaborative Visual SLAM&#xff09; 2.3 神经隐式表示&#xff08;Neural Implicit Representation&#xff09; 3 Method 3.…

这款IDEA插件真的爱了

IDEA是一款功能强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;它可以帮助开发人员更加高效地编写、调试和部署软件应用程序。我们在编写完接口代码后需要进行接口调试等操作&#xff0c;一般需要打开额外的调试工具。 今天给大家介绍一款IDEA插件&#xff1a;Api…

单机版-redis(手动部署)

单机版-redis部署 部署模式:单机版-redis部署 Redis版本&#xff1a;redis-4.0.1 部署redis方式&#xff1a;手动部署 在完成第三步时已完成配置&#xff0c;后续为操作命令以及注意事项&#xff1b; 在进行操作数据库时&#xff0c;需要关注第五步注意事项&#xff0c;会涉…

Unity 场景烘培 ——unity灯光和设置天空盒(二)

提示&#xff1a;文章有错误的地方&#xff0c;还望诸位大神指出。 文章目录 前言一、光源种类1.Directional Light(方向光&#xff0c;平行光)2.Point Light&#xff08;点光源&#xff09;3.Spotlight&#xff08;聚光灯&#xff09;4.Area Light&#xff08;区域光&#xff…

【网络奇遇记】那年我与计算机网络的初相识 —— 网络的体系结构

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;网络奇遇记、数据结构 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 一. 常见的三种计算机网络体系结构1.1 开放系统互连参考模型1.2 TCP/IP参考模型1.3 原理参考模型 二…

SSM项目初始化流程与操作概念解释-SpringBoot简化版

文章目录 1.引入概念2.导入依赖3.项目配置4.依照SpringMVC框架构建项目 1.引入概念 例如某一个XX系统&#xff0c;该系统存在前台页面&#xff08;给用户直观看或使用&#xff09;&#xff0c;和后台页面&#xff08;给管理人员调整数据和权限&#xff09;。 这二个页面都通过…

世界坐标系,相机坐标系,像素坐标系转换 详细说明(附代码)

几个坐标系介绍,相机内外参的回顾参考此文。 本文主要说明如何在几个坐标系之间转换。 本文涉及: 使用相机内参 在 像素坐标系 和 相机坐标系 之间转换。使用相机外参(位姿)在相机坐标系 和 世界坐标系 之间转换。(qw,qx,qy,qz,tx,ty,tz)形式的外参如何使用。以具体情景为…

CSGO饰品持续跌价,市场真的要崩盘了吗?

CSGO饰品市场会崩盘吗&#xff1f;CSGO还能做多久&#xff1f; CSGO饰品持续跌价&#xff0c;市场真的要崩盘了吗&#xff1f; 除非v社那边有什么大动作&#xff0c;不然就市场而言&#xff0c;饰品恐怕永远不会崩盘。 原因其实很简单&#xff0c;只要庄家和大户不崩&#xf…

leetcode刷题日记:141. Linked List Cycle(环形链表)

这一题是给我们一个链表让我们判断这是否是一个环形链表&#xff0c;我们知道如果一个链表中有环的话这一个链表是没有办法访问到尾的&#xff0c; 假若有如图所示的带环链表&#xff1a; 我们从图示中很容易看出来这一个链表在访问的时候会在里面转圈&#xff0c;我们再来看看…

进程控制3——进程程序替换

进程的创建有fork&#xff0c;进程的退出有main函数的return&#xff0c;exit&#xff0c;_exit函数 而进程的退出中&#xff0c;一个进程的退出只能有三种情况&#xff0c;退出成功结果对/不对&#xff0c;或者是运行异常收到信号终止 但是我们发现我们用代码创建的子进程它是…

面试鸭 - 专注于面试刷题的网站

网上面试题有很多&#xff0c;但此套面试题真实、原创、高频&#xff0c;全网最强。 题目涵盖大中小公司&#xff0c;真实靠谱&#xff0c;有频率和难度的标记&#xff0c;助你成为Offer收割机。 面试鸭地址&#xff1a;https://mianshiya.skyofit.com/ 本套题是我原创&…