Canal1--搭建Canal监听数据库变化

1.安装mysql

默认安装了mysql(版本8.0.x);

新创建用户

-- 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';

授权

grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' with grant option;

flush privileges;

在这里插入图片描述

查看MySQL是否开启binlog模式

show variables like 'log_bin';

在这里插入图片描述
查看当前正在写入的binlog日志:

show master status;

在这里插入图片描述
记住文件名和偏移量

2.安装Canal

去官网下载页面进行下载;

我这里下载的是1.1.7的版本:
在这里插入图片描述
解压canal.deployer-1.1.7.tar.gz,我们可以看到里面有五个文件夹:
在这里插入图片描述
打开配置文件conf/example/instance.properties,配置信息如下:

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0

# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=自己的日志名称
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=日志的偏移量
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

开启Canal服务端:
进入bin目录

.\startup.bat

3.Java客户端操作

首先引入maven依赖:

<!--canal客户端-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.7</version>
        </dependency>

然后创建一个CanalClient类

import com.alibaba.otter.canal.client.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements InitializingBean {

    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据,处理数据
                    printEntry(message.getEntries());
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            RowChange rowChage;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //获取操作类型:insert/update/delete类型
            EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
            }
            //获取RowChange对象里的每一行数据,打印出来
            for (RowData rowData : rowChage.getRowDatasList()) {
                //如果是删除语句
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    //如果是新增语句
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //如果是更新的语句
                } else {
                    //变更前的数据
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    //变更后的数据
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

完成之后再对应数据库进行操作,控制台会打印对应的操作,说明对数据的写入操作进行了有效的监控;
注意只读操作并不会写入binlog也不会被Canal监控到(也没必要监控读取操作)。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/564254.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

算法库应用-有序单链表插入节点

学习源头: 模仿贺利坚老师单链表排序文章浏览阅读5.9k次。  本文针对数据结构基础系列网络课程(2)&#xff1a;线性表中第11课时单链表应用举例。例&#xff1a;拆分单链表 &#xff08;linklist.h是单链表“算法库”中的头文件&#xff0c;详情单击链接…&#xff09;//本程…

前端通过http请求访问本地图片

1、前端直接引用本地图片&#xff0c;图片加载失败 具体报错信息如下&#xff1a; Not allowed to load local resource不允许加载本地资源 2、针对以上问题&#xff0c;只需要利用拦截器将本机地址映射成url路径就行 具体代码如下 Configuration public class FileConfig i…

Android Studio实现内容丰富的安卓校园超市

获取源码请点击文章末尾QQ名片联系&#xff0c;源码不免费&#xff0c;尊重创作&#xff0c;尊重劳动 项目代号168 1.开发环境 后端用springboot框架&#xff0c;安卓的用android studio开发 android stuido3.6 jdk1.8 idea mysql tomcat 2.功能介绍 安卓端&#xff1a; 1.注册…

区块链 | OpenSea:Toward Achieving Anonymous NFT Trading 一文的改进方案

&#x1f951;原文&#xff1a; Toward Achieving Anonymous NFT Trading &#x1f951;吐槽&#xff1a; 这论文怎么老有描述不清、前后不一致的地方&#x1f607; 正文 在本节中&#xff0c;我们将具体展示我们方案的构建。我们将基于一个示例来描述我们方案的工作流程&…

如何快速完成手头工作 待办工作的处理技巧

作为忙碌的上班族&#xff0c;我们每天都要处理大量的工作任务。面对堆积如山的工作&#xff0c;稍有不慎&#xff0c;就可能导致任务的延误甚至遗忘。想象一下&#xff0c;在紧张而忙碌的一天结束时&#xff0c;突然发现一个重要的报告还未完成&#xff0c;或者忘记了一个关键…

Python从0到100(十五):函数的高级应用

前言&#xff1a; 零基础学Python&#xff1a;Python从0到100最新最全教程。 想做这件事情很久了&#xff0c;这次我更新了自己所写过的所有博客&#xff0c;汇集成了Python从0到100&#xff0c;共一百节课&#xff0c;帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…

【ARFoundation自学01】搭建AR框架+检测平面+点击克隆立方体到地面=自信入门!

介绍 AR 的功能其实是个大手机系统厂商和眼镜设备厂商开发的功能&#xff0c;并不是Unity的功能&#xff0c;毕竟Unity没有自己的手机设备&#xff01;比如谷歌公司的安卓开发了ARcore&#xff0c;让所有安卓8.0版本以上的用户能够在手机上体验AR功能&#xff01;苹果推出了AR…

token逆向和简单的字体解密

一、token加密部分 网址&#xff1a;aHR0cHM6Ly9qenNjLmpzdC56ai5nb3YuY24vUHVibGljV2ViL2luZGV4Lmh0bWwjLw 点开查询的xhr的包&#xff0c;发现headers中含有token这一加密参数。 直接跟栈&#xff0c;很容发现在第三个栈加密。 s Object(L.b)(l).toString()&#xff0c;由…

VASA-1:一键生成高质量视频,颠覆你的想象!

VASA-1&#xff1a;语音生成AI视频 前言 最近&#xff0c;微软公司公布了一项图生视频的 VASA-1 框架&#xff0c;该 AI 框架只需使用一张真人肖像照片和一段个人语音音频&#xff0c;就能够生成精确逼真的相对应文本的视频&#xff0c;而且可以使表情和面部动作表现的十分自然…

Dubbo应用可观测性升级指南与踩坑记录

应用从dubbo-3.1.*升级到dubbo-*:3.2.*最新稳定版本&#xff0c;提升应用的可观测性和度量数据准确性。 1. dubbo版本发布说明(可不关注) dubbo版本发布 https://github.com/apache/dubbo/releases 【升级兼容性】3.1 升级到 3.2 2. 应用修改点 应用一般只需要升级dubbo-s…

Windows 的常用命令(不分大小写)

Net user &#xff08;查看当前系统所有的账户&#xff09; net user yourname password /add 添加新用户 net localgroup administrators yourname /add 添加管理员权限 net user yourname /delete 删除用户 net user 命令 [colorred]说明&#xff1a;以下命令仅限持管理员…

CentOS 7安装、卸载MySQL数据库(一)

说明&#xff1a;本文介绍如何在CentOS 7操作系统下使用yum方式安装MySQL数据库&#xff0c;及卸载&#xff1b; 安装 Step1&#xff1a;卸载mariadb 敲下面的命令&#xff0c;查看系统mariadb软件包 rpm -qa|grep mariadb跳出mariadb软件包信息后&#xff0c;敲下面的命令…

振弦采集仪在岩土工程施工控制中的作用与效果

振弦采集仪在岩土工程施工控制中的作用与效果 河北稳控科技振弦采集仪在岩土工程施工中是一种常见的测量仪器&#xff0c;它通过对地表地震波的传播和振动特性进行监测&#xff0c;可以提供关键的信息&#xff0c;用于控制施工质量和安全。振弦采集仪具有精度高、反应快的特点…

ADOP带您了解光模块发射光功率的四种测试方法

&#x1f932;&#x1f932;品质优良的光模块在发货前一般经过了严格的硬件测试和光学测试&#xff0c;光学测试主要检测光模块的兼容性&#xff0c;硬件测试主要是参数测试&#xff0c;其中包含发射光功率、接收灵敏度、工作温度、偏置电流等等。 &#x1f310;&#x1f310;…

07 文件-IO流字节流

File File类的使用 File对象既可以代表文件、也可以代表文件夹。它封装的对象仅仅是一个路径名&#xff0c;这个路径可以存在&#xff0c;也可以不存在 创建File类的对象 构造器说明public File(String pathname)根据文件路径创建文件对象public File(String parent, Strin…

(精)3款好用的企业防泄密软件推荐 | 好用电脑文件防泄密软件推荐

企业防泄密是指通过一系列的技术手段和管理措施&#xff0c;防止企业的敏感信息、核心技术、商业机密等被未经授权的第三方获取或泄露。 这不仅是保护企业资产和知识产权的重要手段&#xff0c;也是维护企业声誉、保持市场竞争力的关键因素。 而企业防泄密软件则是专门用于实…

JavaScript之模块化规范详解

文章的更新路线&#xff1a;JavaScript基础知识-Vue2基础知识-Vue3基础知识-TypeScript基础知识-网络基础知识-浏览器基础知识-项目优化知识-项目实战经验-前端温习题&#xff08;HTML基础知识和CSS基础知识已经更新完毕&#xff09; 正文 CommonJS、UMD、CMD和ES6是不同的模块…

2024/4/21周报

文章目录 摘要Abstract文献阅读题目问题贡献方法卷积及池化层LSTM层CNN-LSTM模型 数据集参数设置评估指标实验结果 深度学习使用GRU和LSTM进行时间预测1.库的导入&数据集2.数据预处理3.模型定义4.训练过程5.模型训练 总结 摘要 本周阅读了一篇基于CNN-LSTM黄金价格时间序列…

【热门话题】常用经典目标检测算法概述

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 常用经典目标检测算法概述1. 滑动窗口与特征提取2. Region-based方法R-CNN系列M…

心理学|变态心理学健康心理学——躯体疾病患者的一般心理特点

一、对客观世界和自身价值的态度发生改变 患者除了内部器官有器质或功能障碍外&#xff0c;他们的自我感觉和整个精神状态也会发生变化。使人改变对周围事物的感受和态度&#xff0c;也可以改变患者对自身存在价值的态度。这种主观态度的改变&#xff0c;可以使患者把自己置于人…