阿里Canal使用

Canal 是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析,提供实时的数据订阅和消费服务的工具。它可以用来读取 MySQL 的 binlog 日志并转换成 JSON 格式的事件消息,然后将这些消息发布到下游的消息中间件,比如 RabbitMQ,以实现数据库变更的实时同步。

以下是 Canal 连接 MySQL 和 RabbitMQ 的基本步骤概览:

配置 MySQL:

开启 MySQL 的 binlog(二进制日志)功能,确保要同步的数据库开启了 binlog 记录所有数据更改操作。
创建一个专门给 Canal 使用的 MySQL 用户账号,并赋予足够的权限来读取 binlog,通常包括 REPLICATION SLAVE 权限。
安装与配置 Canal Server:

# 配置Mysql(如果使用本地数据库可跳过此步骤)
[mysqld]
log_timestamps=SYSTEM
default-time-zone='+8:00'
server-id=1
log-bin=mysql-bin
binlog-do-db = mall # 要监听的库
binlog_format=ROW
挂载数据库(如果使用本地数据库可跳过此步骤)
docker run --name mysql01 \
-p 3306:3306 \
-v /opt/mysql/conf.d:/etc/mysql/conf.d \
-v /opt/mysql/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
-d mysql:8.0

show variables like 'log_%';
show variables like 'binlog_format';
show variables like 'server_id';
--查看所有日志
show binlog events;
--查看最新的日志
show master status
-- 查询指定的binlog日志
show binlog events in 'XTZJ-20221008CY-bin.000020'
--清空所有的 binlog 日志文件
reset master

下载并安装 Canal,根据官方文档配置 Canal 的 server 端。
在这里插入图片描述
设置数据库的IP、Port 、UserName、Password
在这里插入图片描述
在这里插入图片描述
然后设置、MQ类型及其他配置
在这里插入图片描述
在这里插入图片描述

canal.properties

#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########               destinations            #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########             MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf

# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT

##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host = 192.168.**.**:5672
rabbitmq.virtual.host = /
rabbitmq.exchange = canal_exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.deliveryMode = 2


##################################################
#########                     Pulsar         #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =

instance.porperties

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.**.**:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

docker-compose.yml

version: '3'
services:
  canal:
    container_name: canal_latest
    image: canal/canal-server:v1.1.5
    restart: always
    ports:
      - 11111:11111
    volumes:                                
      - ./canal.properties:/home/admin/canal-server/conf/canal.properties
      - ./instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - ./logs:/home/admin/canal-server/logs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/549714.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

重磅消息:CnosDB 文档网站升级全新框架啦!

我们很高兴地宣布,CnosDB 文档网站迎来了一次重大升级!现在,我们采用了全新的强大的开源文档框架,为用户提供更流畅、更直观的浏览体验。 全新框架带来的优势: 更快速的加载速度:现在您可以更快地访问并查…

【Linux】磁盘扩容到根目录逻辑卷(LVM)

目录 一、物理卷和逻辑卷 1.物理卷和逻辑卷的区别 2.在Linux系统中查看所有物理卷的信息 3.在Linux系统中查看所有逻辑卷的信息 二、文件系统 三、实操-对root(/)目录进行扩容 1.使用lsblk命令查看新加入的磁盘信息 2.fdisk -l命令查看系统中磁盘…

【切换网络连接后】VMware虚拟机网络配置【局域网通信】

初次安装Linux虚拟机以及切换网络都需要配置虚拟机网络, 从而使得win主机内通过远程连接工具能够连接该虚拟机, 而不是在虚拟机内操作。 本片文章你将了解到网络切换后如何配置虚拟机网络的一些基础操作,以及局域网通信的一些基础知识。 …

HTTP/1.1特性总结

优点 【简单,灵活和易于扩展,应用广泛和跨平台】 1.简单: http基本的报文格式就是headerbody,头部信息也是key-value简单的文本形式,易于理解,降低了学习和使用的门槛 2.灵活和易于扩展: &…

电动汽车退役锂电池SOC主动均衡控制MATLAB仿真

微❤关注“电气仔推送”获得资料(专享优惠) 仿真简介 模型选用双向反激变换器作为主动均衡拓扑电路,均衡策略采用基于SOC的主动均衡策略,旨在解决电动汽车退役锂电池的不一致性问题。模型选用双向反激变换器作为主动均衡拓扑电路…

基于小程序实现的餐饮外卖系统

作者主页:Java码库 主营内容:SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app等设计与开发。 收藏点赞不迷路 关注作者有好处 文末获取源码 技术选型 【后端】:Java 【框架】:spring…

HTML图片标签和超链接标签

目录 图片标签 图片路径属性 图片替换文本属性 图片宽度和高度属性 图片边框属性 图片提示属性 超链接标签 链接属性 跳转方式属性 图片标签 在HTML中&#xff0c;可以使用<img>标签为网页添加图片 在HTML中&#xff0c;使用图片标签一般包含下面的四个属性 …

雨云免费云服务器领取步骤详解

随着云计算技术的日益普及&#xff0c;越来越多的用户开始选择使用云服务器来满足他们的数据存储和计算需求。雨云作为一家具有自主知识产权的国产云计算服务提供商&#xff0c;其免费云服务器服务备受关注。接下来&#xff0c;本文将为大家详细介绍雨云免费云服务器的领取步骤…

供应链金融机器学习建模实战

随着全球贸易的不断发展和供应链的日益复杂化&#xff0c;供应链金融作为一种新型金融工具&#xff0c;正逐渐受到企业和金融机构的关注和重视。供应链金融是指通过金融手段来优化和改进供应链中的资金流动和货物流动&#xff0c;以实现企业间的合作共赢。 供应链金融的核心是将…

STM32之HAL开发——CubeMX配置串行Flash文件系统

配置流程 在开始配置FATFS前&#xff0c;需要提前配置好RCC的时钟&#xff0c;以及时钟的频率&#xff0c;另外还要配置好Debug选项&#xff08;选择串行&#xff09; 选项介绍 文件系统适用于SD卡&#xff0c;Disk磁盘等&#xff0c;需要我们将对应的驱动打开才可以使用。 …

Sonatype Nexus 的使用参数

在最近安装的 Sonatype Nexus 版本中提供了一个使用参数情况界面。 这个使用情况的界面主要是针对当前 Sonatype Nexus 的安装实例出现的系统接入和调用情况。 上面提供了一个限制&#xff0c;这个限制不是说达到了限制后拒绝提供服务了&#xff0c;而是因为在默认的 Sonatype…

java二维数组

一、二维数组的概述&#xff1a; 目录 二维数组的概述&#xff1a; 二维数组图解&#xff1a; 二维数组的四种创建方式&#xff1a; Java 用sort对二维数组进行排序 二维数组简单概述&#xff1a;Java中的二维数组一般应用在矩阵的一些运算、棋盘游戏中棋盘的实现、二维数据…

vue3+vite+typescript+pinia+element_plus构建web项目

1.vite搭建 yarn create vite 可能会提示node版本不支持&#xff0c;需要根据提示升级或降级node版本 使用nvm下载对应版本 nvm download 18.x.xnvm use 18.x.x// 需要安装yarn npm install -g yarn// 重新执行 yarn create vite 过程中会提供选择&#xff0c;分别选择vue、…

三个晚上!给干废了!MINI2440 挂载 NFS

虚拟机执行&#xff1a;sudo ifconfig tap0 10.10.10.1 up qemu 开发板&#xff1a; set bootargs noinitrd root/dev/nfs rw nfsroot10.10.10.1:/nfsroot ip10.10.10.10:10.10.10.1 ::255.255.255.0 consolettySAC0,115200 Hit any key to stop autoboot: 0 MINI2440 # set…

VMware 虚拟机中的 Ubuntu 16.04 设置 USB 连接

VMware 虚拟机中的 Ubuntu 16.04 设置 USB 连接 1. VMware USB Arbitration Service2. 可移动设备 USB 口连接主机3. 虚拟机 -> 可移动设备 -> 连接 (断开与主机的连接)4. 状态栏 -> 断开连接 (连接主机)References 1. VMware USB Arbitration Service 计算机 -> …

设计编程网站集:动物,昆虫,蚂蚁养殖笔记

入门指南 区分白蚁与蚂蚁 日常生活中&#xff0c;人们常常会把白蚁与蚂蚁搞混淆&#xff0c;其实这两者是有很大区别的&#xff0c;养殖方式差别也很大。白蚁主要食用木质纤维&#xff0c;会给家庭房屋带来较大危害&#xff0c;而蚂蚁主要采食甜食和蛋白质类食物&#xff0c;不…

word文件的创建时间和修改时间可以更改吗?答案是肯定的 文件属性修改的方法

一&#xff0c;引言 在日常生活和工作中&#xff0c;我们经常需要处理各种Word文件。有时&#xff0c;由于某些原因&#xff0c;我们可能需要更改Word文件的创建时间和修改时间。虽然这听起来可能有些复杂&#xff0c;但实际上&#xff0c;通过一些简单的方法和工具&#xff0…

使用VLC无法播放安防监控EasyCVR平台分发出的FLV视频流,是什么原因?

安防视频汇聚平台EasyCVR不仅可支持的接入协议非常多&#xff08;包括&#xff1a;国标GB28181、RTSP/Onvif、RTMP&#xff0c;以及厂家的私有协议与SDK&#xff0c;如&#xff1a;海康ehome、海康sdk、大华sdk、宇视sdk、华为sdk、萤石云sdk、乐橙sdk等&#xff09;&#xff0…

数据结构线性表篇-顺序表的实现

1.数据结构的介绍 ⚀基本概念 数据结构 数据 结构 数据&#xff1a; 数据就是所有描述客观事物的符号。比如&#xff1a;我们常见的文字&#xff0c;“你今天学习了吗&#xff1f;”&#xff1b;数字&#xff0c;“1&#xff0c;2&#xff0c;3&#xff0c;4&#xff0c;5&a…

基于RflySim平台的uORB消息读取与写入实验(一)

uORB消息读取与写入实验框架总览 一. 总体说明 —文件目录 uORB消息读取与写入例程目录&#xff1a; [安装目录]\RflySimAPIs\5.RflySimFlyCtrl\0.ApiExps\6.uORB-Read-Write 自定义uORB消息例程目录&#xff1a; [安装目录]\5.RflySimFlyCtrl\0.ApiExps\7.uORB-Create 二…