canal实操应用

一、MySQL的binlog日志

1.1、binlog的分类

binlog一般分为三类:statement语句级,记录一条一条的SQL,一条SQL可能更改多行,且SQL语句中如果用到now()函数或者random()函数,会存在数据不一致的问题。row行级,记录一行行的数据,记录特别细致,但是日志文件会比较大。mixed:混合模式,默认还是statement,某些情况下,如UUID()函数就会用row的方式进行处理。

1.2、准备好数据库及表

我这里准备了一个canal库,一张student表

CREATE TABLE `student` (
  `id` int NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `sex` bit(1) DEFAULT NULL COMMENT '0:女,1:男',
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

小知识:mysql utf8mb4_0900_ai_ci和utf8mb4_general_ci区别?

utf8mb4_general_ci:
utf8mb4_general_ci 是较为简单的排序规则,对字符的比较是基于字符的二进制值的。
不考虑语言特定的规则,适用于一般的字符比较,但在某些语境下可能导致不符合期望的排序结果。
不区分大小写,但对于一些特殊字符,可能不按照某些语言的预期顺序排序。
utf8mb4_0900_ai_ci:
utf8mb4_0900_ai_ci 是 MySQL 8.0.0 版本后引入的排序规则,采用 Unicode 9.0.0 版本的规则。
它考虑了更多的语言和语境,提供更精确的字符排序,适用于多语言环境。
支持大小写不敏感的比较,并且对于一些特殊字符的排序更符合语言特定的规范。
选择哪种排序规则取决于你的具体需求。如果你的应用面向的是特定语言环境,需要更精确和符合语言规范的字符比较,那么 utf8mb4_0900_ai_ci 可能更适合。如果你对语言特定的排序规则没有特别的要求,或者你希望比较快速并且不涉及复杂的字符排序问题,那么 utf8mb4_general_ci 也是一个可行的选择。

CREATE TABLE my_table (
    column1 VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
    column2 VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
);

注:但是utf8只有utf8_general_ci这种字符集排序规则
在这里插入图片描述

二、实操

2.1、修改配置文件开启binlog

# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/8.0/en/server-configuration-defaults.html

[mysqld]
# [必须]主服务器唯一ID,千万注意做mysql集群的时候这个id不能重复,
# 而当前我们用canal同步也是,canal是伪装成mysql的slave,所以也不能和canal配置文件中的id重复
server-id=1
# [必须]启动二进制日志,指明路径。比如:自己本地的路径/log/mysqlbin
log-bin=mysql-bin
# [可选]设置需要复制的数据库,默认全部记录。比如
# 开启需要监控的数据库
binlog-do-db=replica_master_slave
binlog-do-db=canal
# binlog日志级别statement,row,mixed
#binlog_format=STATEMENT
binlog_format=ROW

修改完MySQL配置文件之后记得重启MySQL服务, systemctl restart mysqld
重启之后开是否真正开启binlog日志,可以插入一条数据前看一下日志大小,插入数据之后再查看一下binlog文件的大小,linu直接安装的一般在/var/lib/mysql文件夹下就可以看到binlog日志

2.2、canal用户赋值MySQL权限

最小权限原则,赋予canl用户读的权限即可,mysql -u root -p123456 密码写你自己的,登录客户端执行如下语句

# MySQL5.7需要执行如下2条语句
set global validate_password_length=4;
set global validate_password_policy=0;
#8.0使用下面一条语句即可
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
#也许你也和我一样,上面那条语句根本执行不成功,执行如下语句即可——先创建用户,再赋予权限
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

如下图只开放了读的权限
在这里插入图片描述

2.3、下载canal并使用

网址,我用的当前最新版,冲冲冲
下载deploy版本即可。
解压tar -zxvf canal.deployer-1.1.7.tar.gz -C /usr/local/cancal-1.1.7/之后如下图更改关键配置
tcp是自己写代码想怎么操作随自己来,然后就是写入各种MQ,请君自选。
在这里插入图片描述
在这里插入图片描述

三、搭建Java客户端

    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

canal组件细节图
在这里插入图片描述

package org.tg.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @Author Sumuxi
 * @Date 2023/11/10 21:58
 * @Desc
 */

public class Main {


    public static void main(String[] args) throws InvalidProtocolBufferException {

        // 1。获取链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop113", 11111), "example", "", "");

        // 2.链接canal
        connector.connect();
        // 3.订阅数据库
        connector.subscribe("canal.*");
        while (true) {

            // 4.获取数据
            Message message = connector.get(100);// 获取指定数量的数据,有多少取多少,不会阻塞等待
            // 5.获取entry集合
            List<CanalEntry.Entry> entryList = message.getEntries();

            // 集合是空,就等3s后再去拉取
            if (entryList.size() == 0) {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } else {
                // 遍历entryList,单条单条解析
                for (CanalEntry.Entry entry : entryList) {
                    // 1.获取表名
                    String tableName = entry.getHeader().getTableName();

                    // 2.获取类型
                    //    TRANSACTIONBEGIN      事务开启,
                    //    ROWDATA               行数据,
                    //    TRANSACTIONEND        事务关闭
                    //    HEARTBEAT             心跳,
                    //    GTIDLOG               GTID日志;
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    // 3.获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();
                    // 4.判断当前entry的类型是什么?
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6.获取事件的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7.获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();

                        for (CanalEntry.RowData rowData : rowDataList) {
                            // 更新前
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            // 更新后
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }

                            System.out.println(
                                    "Table:" + tableName +
                                            ",EventType:" + eventType +
                                            ",Before:" + beforeData +
                                            ",After:" + afterData);
                        }
                    } else {
                        System.out.println("当前操作类型为:" + entryType);
                    }
                }
            }
        }
    }
}

最后,其实参考github上的这个实例也能书写客户端,https://github.com/alibaba/canal/wiki/ClientExample,不妥之处,盼多多指正,后续持续完善。
纨绔不饿死,儒冠多误身。读书破万卷,下笔如有神。致君尧舜上,再使风俗淳。骑驴三十载,旅食京华春。朝扣富儿门,暮随肥马尘埃。残杯与冷炙,到处潜悲辛。白鸥没浩荡,万里谁能驯。——节选杜甫《奉赠韦左丞丈二十二韵》中的几句

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/129560.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

打开ps提示,计算机中丢失d3dcompiler_47.dll怎么解决?

“d3dcompiler_47.dll丢失5个解决办法”。相信很多同事在工作或者娱乐的过程中&#xff0c;都遇到过这个错误提示。那么&#xff0c;究竟什么是d3dcompiler_47.dll文件&#xff1f;为什么会丢失呢&#xff1f;又该如何解决这个问题呢&#xff1f;接下来&#xff0c;我将为大家详…

基于级联广义积分器(CGI)的谐波信号提取MATLAB仿真

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 此方法可用于信号检测、虚拟阻抗合成、锁相环等方面。 在现有的信号提取方法中&#xff0c;众多学者采用了SOGI法、LPF法以及正交信号发生器等方法。当输入信号中不存在直流分量&#xff0c;只有谐波分量时&…

【Python 千题 —— 基础篇】成绩评级

题目描述 题目描述 期末考试结束&#xff0c;请根据同学的分数为该同学评级。 A&#xff1a;90 ~ 100B&#xff1a;80 ~ 89C&#xff1a;70 ~ 79D&#xff1a;60 ~ 69E&#xff1a;0 ~ 60 输入描述 输入同学的分数。 输出描述 输出该同学的等级。 示例 示例 ① 输入&…

yolov5 利用Labelimg对图片进行标注

首先打开yolov5-master&#xff0c;在data文件中新建一个文件夹来存放你需要跑的数据&#xff0c;例如我这次跑的是羽毛球&#xff0c;文件把文件取名为badminton。使用其他文件夹例如images也可以&#xff0c;就是跑多了以后不好整理&#xff0c;然后点击 选中刚刚你存放数据的…

黑马程序员微服务SpringCloud实用篇01

SpringCloud01 1.认识微服务 随着互联网行业的发展&#xff0c;对服务的要求也越来越高&#xff0c;服务架构也从单体架构逐渐演变为现在流行的微服务架构。这些架构之间有怎样的差别呢&#xff1f; 1.0.学习目标 了解微服务架构的优缺点 1.1.单体架构 单体架构&#xff…

spring boot+netty 搭建MQTT broken

一、项目结构 二、安装依赖 <!-- netty包 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!-- 常用JSON工具包 --><…

C# Socket通信从入门到精通(7)——单个异步TCP服务器监听单个客户端C#代码实现

前言: 我们在开发TCP服务器程序的时候,有的时候需要一些异步的应用,比如我读取客户端发送的数据,但是服务器程序不能一直等待客户端数据发送过来,服务器要先做一些别的事情,这个时候C# Socket通信从入门到精通(5)——单个同步TCP服务器监听一个客户端C#代码实现这篇文…

在Docker中设置Redis的密码

目录 1&#xff0c;介绍2&#xff0c;实现“Docker Redis设置密码”的整体流程3&#xff0c;具体实现步骤4&#xff0c;结论 1&#xff0c;介绍 Docker是一个开源的应用容器引擎&#xff0c;可以自动化部署、扩展应用程序。它可以帮助开发人员将应用程序及其依赖项打包到一个可…

【入门Flink】- 08Flink时间语义和窗口概念

Flink-Windows 是将无限数据切割成有限的“数据块”进行处理&#xff0c;这就是所谓的“窗口”&#xff08;Window&#xff09;。 注意&#xff1a;Flink 中窗口并不是静态准备好的&#xff0c;而是动态创建——当有落在这个窗口区间范围的数据达到时&#xff0c;才创建对应的窗…

UE5、CesiumForUnreal实现加载GeoJson绘制单面(Polygon)功能(StaticMesh方式)

文章目录 1.实现目标2.实现过程2.1 实现原理2.1.1 数据读取2.1.2 三角剖分2.1.3 创建StaticMesh2.2 应用测试2.2.1 具体代码2.2.2 蓝图应用测试3.参考资料1.实现目标 通过读取本地GeoJson数据,在UE中以StaticMeshComponent的形式绘制出面数据,支持Editor和Runtime环境,GIF动…

JMeter---JSON提取器

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;加入1000人软件测试技术学习交流群&#x1f4e2;资源分享&#xff1a;进了字节跳动之后&#xff0c;才…

传统企业数字化转型都要面临哪些挑战?_数据治理平台_光点科技

数字化转型已经成为传统企业发展的必经之路&#xff0c;但在这个过程中&#xff0c;企业往往会遭遇多方面的挑战。 1.文化和组织惯性 最大的挑战之一是企业文化和组织惯性的阻力。传统企业往往有着深厚的历史和根深蒂固的工作方式&#xff0c;员工和管理层可能对新的数字化工作…

中国电信终端产业联盟5G Inside行业子联盟正式成立!宏电股份作为副理事单位受邀加入

11月9日&#xff0c;中国电信于广州召开“2023中国电信终端生态合作暨中国电信终端产业联盟&#xff08;以下简称CTTA&#xff09;第十四次会员大会”&#xff0c;联盟成员齐聚现场。作为CTTA大会的一个重要环节&#xff0c;中国电信终端产业联盟5G Inside行业子联盟正式成立&a…

爱剪辑如何将视频旋转90度,详细操作流程

爱剪辑是一款电脑端常用的视频剪辑类软件&#xff0c;基本上囊括了视频剪辑所需的所有功能&#xff0c;此处主要介绍&#xff0c;爱剪辑是如何对视频进行旋转操作的&#xff0c;水平旋转或者垂直旋转爱剪辑都是可以操作的&#xff0c;整体操作的详细过程将在下方为大家讲解。 …

希亦ACE和石头m1这两款内衣洗衣机哪一款更好?高性价比内衣洗衣机测评

内衣洗衣机可以说是近两年很火爆的小家电了&#xff0c;给大家带了一种全新的时尚体验&#xff0c;越来越内衣裤也可以用手洗&#xff01;而且还比手洗得干净&#xff01;不过现在市面上关于内衣洗衣机的品牌越来越多&#xff0c;小伙伴们想要挑选一款性价比高的内衣洗衣机看得…

LabVIEW在OPC中使用基金会现场总线

LabVIEW在OPC中使用基金会现场总线 本文讨论了如何使用开放的OPC&#xff08;用于过程控制的OLE&#xff09;接口访问基金会现场总线网络和设备。 NI-FBUS通信管理器随附了一个OPC数据访问服务器。 &#xff08;NI-FBUS Configurator自动包含NI-FBUS通信管理器。&#xff09…

【面试经典150 | 位运算】二进制求和

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;模拟 其他语言c 写在最后 Tag 【二进制】【位运算】 题目来源 67. 二进制求和 题目解读 以二进制字符串的形式返回两个二进制字符串的和。 解题思路 看到这个题目首先想到的方法可能是先把二进制字符转化成 int 型数…

[LeetCode]-225. 用队列实现栈

目录 225. 用队列实现栈 题目 ​思路 代码 225. 用队列实现栈 225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/implement-stack-using-queues/description/ 题目 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff0…

基于SSM的网络音乐系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

1. 深度学习——激活函数

机器学习面试题汇总与解析——激活函数 本章讲解知识点 什么是激活函数&#xff1f; 为什么要使用激活函数&#xff1f; 详细讲解激活函数 本专栏适合于Python已经入门的学生或人士&#xff0c;有一定的编程基础。本专栏适合于算法工程师、机器学习、图像处理求职的学生或人…