实时数据同步之Maxwell和Canal

文章目录

  • 一、概述
    • 1、实时同步工具概述
      • 1.1 Maxwell 概述
      • 1.2 Canal概述
    • 2、数据同步工作原理
      • 2.1 MySQL 主从复制过程
      • 2.2 两种工具工作原理
    • 3、MySQL 的 binlog详解
      • 3.1 什么是 binlog
      • 3.2 binlog 的开启
      • 3.3 binlog 的分类设置
    • 4、Maxwell和Canal对比
    • 5、环境安装
  • 二、Maxwell 使用
    • 1、Maxwell 安装部署
      • 1.1 下载安装
      • 1.2 初始化 Maxwell 元数据库
      • 1.3 Maxwell 进程启动
    • 2、Maxwell入门案例
      • 2.1 监控 Mysql 数据并在控制台打印
      • 2.2 监控 Mysql 数据输出到 kafka
      • 2.3 监控 Mysql 指定表数据输出控制台
      • 2.4 监控 Mysql 指定表全量数据输出控制台,数据初始化
      • 2.5 群起脚本
  • 三、Canal使用
    • 1、Canal 的下载和安装
      • 1.1 下载安装
      • 1.2 mysql创建canal用户
      • 1.3 修改 canal.properties 的配置
      • 1.4 修改 instance.properties
    • 2、实时监控测试
      • 2.1 TCP 模式测试
      • 2.2 Kafka 模式测试

一、概述

1、实时同步工具概述

1.1 Maxwell 概述

官网地址:https://maxwells-daemon.io/

Maxwell 是由美国 Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件。 实时读取MySQL 二进制日志 Binlog,并生成 JSON格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序

注意:1.30.0版本后不在支持JDK8

1.2 Canal概述

Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)

2、数据同步工作原理

2.1 MySQL 主从复制过程

具体可以参考:ShardingSphere数据库中间件基础学习

  • Master 主库将改变记录,写到二进制日志(binary log)中
  • Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events 拷贝到它的中继日志(relay log);
  • Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库

2.2 两种工具工作原理

就是把自己伪装成 Slave,假装从 Master 复制数据

3、MySQL 的 binlog详解

3.1 什么是 binlog

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:

  • MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给 slaves 来达到 master-slave 数据一致的目的
  • 自然就是数据恢复了,通过使用 mysqlbinlog 工具来使恢复数据。二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件

3.2 binlog 的开启

  • 找到 MySQL 配置文件的位置
  • Linux: /etc/my.cnf 如果/etc 目录下没有,可以通过 locate my.cnf 查找位置
  • Windows: \my.ini
  • 在 mysql 的配置文件下,修改配置在[mysqld] 区块,设置/添加 log-bin=mysql-bin这个表示 binlog 日志的前缀是 mysql-bin,以后生成的日志文件就是 mysql-bin.000001的文件后面的数字按顺序生成,每次 mysql 重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号

3.3 binlog 的分类设置

mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配binlog_format= statement|mixed|rowMaxwell 想做监控分析,选择 row 格式比较合适

  • statement

    语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如update test set create_date=now();如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。优点: 节省空间 缺点: 有可能造成数据不一致。

  • row

    行级, binlog 会记录每次操作后每行记录的变化。优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。缺点:占用较大空间

  • mixed

    混合级别,statement 的升级版,一定程度上解决了 statement 模式因为一些情况而造成的数据不一致问题。默认还是 statement,在某些情况下,譬如:当函数中包含 UUID() 时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理 优点:节省空间,同时兼顾了一定的一致性。缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 监控的情况都不方便。

4、Maxwell和Canal对比

对比CanalMaxwell
语言javajava
数据格式格式自由json
采集数据模式增量全量/增量
数据落地定制支持 kafka 等多种平台
HA支持支持

5、环境安装

需要安装mysql,kafka,zookeeper,具体的可以参考之前的博客文章

这里讲解开启mysql的binLog日志

sudo vim /etc/my.cnf
# 在[mysqld]模块下添加一下内容
[mysqld]
# 主机序列号,每台都要唯一
server_id=1
# 前缀
log-bin=mysql-bin
binlog_format=row
# 针对某个库,不加就是全部,多个库就弄多行
#binlog-do-db=test_maxwell
# binlog-do-db=test_maxwell2
# 这是canal测试
#binlog-do-db=gmall-2021

# 重启 Mysql 服务
sudo systemctl restart mysqld
 
# 登录 mysql 并查看是否修改完成
mysql -uroot -p123456
show variables like '%binlog%';
# 查看下列属性
binlog_format | ROW

# 进入/var/lib/mysql 目录,查看 MySQL 生成的 binlog 文件
cd /var/lib/mysql
sudo ls -l

注:MySQL 生成的 binlog 文件初始大小一定是 154 字节,然后前缀是 log-bin 参数配置的,后缀是默从.000001,然后依次递增。除了 binlog 文件文件以外,MySQL 还会额外生产一个.index 索引文件用来记录当前使用的 binlog 文件

二、Maxwell 使用

1、Maxwell 安装部署

1.1 下载安装

# 因为1.30开始不支持jdk8,所以用这个
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/

1.2 初始化 Maxwell 元数据库

# 在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据
mysql -uroot -p123456
CREATE DATABASE maxwell;
# 设置 mysql 用户密码安全级别
set global validate_password_length=4;
set global validate_password_policy=0;
# 分配一个账号可以操作该数据库
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
# 分配这个账号可以监控其他数据库的权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO maxwell@'%';
# 刷新 mysql 表权限
flush privileges;

1.3 Maxwell 进程启动

# ============Maxwell 进程启动方式有如下两种========
# 1、使用命令行参数启动 Maxwell 进程
cd /opt/module/maxwell-1.29.2/
# --user  连接 mysql 的用户
# --password  连接 mysql 的用户的密码
# --host mysql 安装的主机名
# --producer  生产者模式(stdout:控制台 kafka:kafka 集群)
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
# 2、修改配置文件,定制化启动 Maxwell 进程 
cp config.properties.example config.properties
vim config.properties
# 修改完成后
bin/maxwell --config ./config.properties

2、Maxwell入门案例

2.1 监控 Mysql 数据并在控制台打印

# 运行 maxwell 来监控 mysql 数据更新
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout

创建对应的mysql语句,查看控制台

# 向 mysql 的 test_maxwell 库的 test 表插入一条数据,查看 maxwell 的控制台输出
insert into test  values(1,'aaa');
{
    "database":"test_maxwell", --库名
    "table":"test", --表名
    "type":"insert", --数据更新类型
    "ts":1683960319, --操作时间
    "xid":4335, --操作 id
    "commit":true, --提交成功
    "data":{ --数据
        "id":1,
        "name":"aaa"
    }
}
# 向 mysql 的 test_maxwell 库的 test 表同时插入 3 条数据,控制台出现了 3 条 json日志,说明 maxwell 是以数据行为单位进行日志的采集的
INSERT INTO  test VALUES(2,'bbb'),(3,'ccc'),(4,'ddd');
{"database":"test_maxwell","table":"test","type":"insert","ts":1683960373,"xid":4666,"xoffset":0,"data":{"id":2,"name":"bbb"}}
{"database":"test_maxwell","table":"test","type":"insert","ts":1683960373,"xid":4666,"xoffset":1,"data":{"id":3,"name":"ccc"}}
{"database":"test_maxwell","table":"test","type":"insert","ts":1683960373,"xid":4666,"commit":true,"data":{"id":4,"name":"ddd"}}
update test set name='shawn' where id=1;
{
    "database":"test_maxwell",
    "table":"test",
    "type":"update",
    "ts":1683960396,
    "xid":4737,
    "commit":true,
    "data":{
        "id":1,
        "name":"shawn" --修改后的数据
    },
    "old":{ --修改前的数据
        "name":"aaa"
    }
}
# 删除 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出
DELETE FROM test WHERE id=1;
{
    "database":"test_maxwell",
    "table":"test",
    "type":"delete",
    "ts":1683960501,
    "xid":5085,
    "commit":true,
    "data":{
        "id":1,
        "name":"shawn"
    }
}

2.2 监控 Mysql 数据输出到 kafka

简单接入

# 启动 zookeeper 和 kafka
jpsall
# windows有个可视化工具,叫做kafka Tool
# https://www.kafkatool.com/download.html

# 启动 Maxwell 监控 binlog
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092   --kafka_topic=maxwell
# 打开 kafka 的控制台的消费者消费 maxwell 主题
# 如果要读取历史数据,需要加上--from-begining
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell

然后就是kafka 主题数据的分区控制,在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的,为了提高并发度。那么如何控制这些数据的分区问题,就变得至关重要,实现步骤如下:在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的

# 修改 maxwell 的配置文件,定制化启动 maxwell 进程
vim config.properties

#   tl;dr   config
log_level=info
# #Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
kafka.bootstrap.servers=hadoop102:9092
 
#   mysql   login   info
host=hadoop102
user=maxwell
password=123456
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
#  ***   kafka   ***
#   list   of   kafka   brokers
#kafka.bootstrap.servers=hosta:9092,hostb:9092
#   kafka   topic   to   write   to
#   this   can   be   static,   e.g.   'maxwell',   or   dynamic,   e.g.namespace_%{database}_%{table}
#   in   the   latter   case   'database'   and   'table'   will   be   replacedwith   the   values   for   the   row   being   processed
# #目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=maxwell3

#  ***   partitioning   ***
# 一般都是库名或表分区
#   What   part   of   the   data   do   we   partition   by?
#producer_partition_by=database   #   [database,   table,primary_key,   transaction_id,   column]
producer_partition_by=database

#   控制数据分区模式,可选模式有  库名,表名,主键,列名
#   specify   what   fields   to   partition   by   when   using producer_partition_by=column
#   column   separated   list.
#producer_partition_columns=name
 
#   when   using   producer_partition_by=column,   partition   by   this when
#   the   specified   column(s)   don't   exist.
#producer_partition_by_fallback=database

# 手动创建一个 3 个分区的 topic,名字就叫做 maxwell3
kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 2 --partitions 3 --topic maxwell3
# 利用配置文件启动 Maxwell 进程
bin/maxwell --config ./config.properties
/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon

ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9

# 消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell3

# 向 test_maxwell 库的 test 表再次插入一条数据
insert into test_maxwell.test values (6,'fff');
# 通过 kafka tool 工具查看,此条数据进入了 maxwell3 主题的 1 号分区
# 向 test_maxwell2 库的 aaa 表插入一条数据
# 注意binlog配置文件要监听这个库才行
insert into test_maxwell2.test values (23,'dd');
# 通过 kafka  tool 工具查看,此条数据进入了 maxwell3 主题的 0 号分区,说明库名会对数据进入的分区造成影响

2.3 监控 Mysql 指定表数据输出控制台

运行 maxwell 来监控 mysql 指定表数据更新

# 运行 maxwell 来监控 mysql 指定表数据更新
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --filter 'exclude: *.*, include:test_maxwell.test' --producer=stdout
# 注:还可以设置 include:test_maxwell.*,通过此种方式来监控 mysql 某个库的所有表,也就是说过滤整个库

2.4 监控 Mysql 指定表全量数据输出控制台,数据初始化

https://maxwells-daemon.io/bootstrapping/

Maxwell 进程默认只能监控 mysql 的 binlog日志的新增及变化的数据,但是Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据,来对 MySQL 的某张表进行数据初始化,也就是我们常说的全量同步。具体操作步骤如下:需求:将 test_maxwell 库下的 test2 表的四条数据,全量导入到 maxwell 控制台进行打印

# 修改Maxwell的元数据,触发数据初始化机制,在 mysql 的 maxwell 库中 bootstrap表中插入一条数据,写明需要全量数据的库名和表名
insert into maxwell.bootstrap(database_name,table_name) values('test_maxwell','test2');

# 启动 maxwell 进程,此时初始化程序会直接打印 test2 表的所有数据
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout

# 当数据全部初始化完成以后,Maxwell 的元数据会变化
# is_complete  字段从 0 变为 1
# start_at  字段从 null 变为具体时间(数据同步开始时间)
# complete_at  字段从 null 变为具体时间(数据同步结束时间)

还有一个方法是使用maxwell-bootstrap脚本,前提是已经启动了maxwell,否则会被阻塞

/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties

# 第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。
# 一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间

# 采用bootstrap方式同步的输出数据格式如下
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-start",
    "ts": 1450557744,
    "data": {}
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-insert",
    "ts": 1450557744,
    "data": {
        "txt": "hello"
    }
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-insert",
    "ts": 1450557744,
    "data": {
        "txt": "bootstrap!"
    }
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-complete",
    "ts": 1450557744,
    "data": {}
}


2.5 群起脚本

一个启动脚本,可以参考

#!/bin/bash

MAXWELL_HOME=/opt/module/maxwell

status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}


start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}


stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}


case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
       stop_maxwell
       start_maxwell
    ;;
esac

三、Canal使用

1、Canal 的下载和安装

1.1 下载安装

# https://github.com/alibaba/canal/releases
wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz
mkdir /opt/module/canal
tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal

1.2 mysql创建canal用户

mysql -uroot -p123456
set global validate_password_length=4;
set global validate_password_policy=0;
GRANT SELECT, REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';

# 在gmall-2021库下创建数据表
CREATE TABLE user_info(
`id` VARCHAR(255),
`name` VARCHAR(255),
`sex` VARCHAR(255)
);

1.3 修改 canal.properties 的配置

cd /opt/module/canal/conf
vim canal.properties

#################################################
######### common argument ############# 
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file

说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的输出 model,默认 tcp,改为输出到 kafka

多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例1,实例2,实例3

#################################################
######### destinations ############# 
#################################################
canal.destinations = example

1.4 修改 instance.properties

我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下

cd /opt/module/canal/conf/example
vim instance.properties
# 配置 MySQL 服务器地址
## mysql serverId , v1.0.26+ will autoGen 
canal.instance.mysql.slaveId=20
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=hadoop102:3306

# 配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =test
# enable druid Decrypt database password
canal.instance.enableDruid=false

2、实时监控测试

2.1 TCP 模式测试

首先创建项目,引入依赖

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

通用监视类–CanalClient

public class CanalClient {

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {

        //TODO 获取连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop102", 11111), "example", "", "");

        while (true) {

            //TODO 连接
            canalConnector.connect();

            //TODO 订阅数据库
            canalConnector.subscribe("gmall-2021.*");

            //TODO 获取数据
            Message message = canalConnector.get(100);

            //TODO 获取Entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            //TODO 判断集合是否为空,如果为空,则等待一会继续拉取数据
            if (entries.size() <= 0) {
                System.out.println("当次抓取没有数据,休息一会。。。。。。");
                Thread.sleep(1000);
            } else {

                //TODO 遍历entries,单条解析
                for (CanalEntry.Entry entry : entries) {

                    //1.获取表名
                    String tableName = entry.getHeader().getTableName();

                    //2.获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    //3.获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();

                    //4.判断当前entryType类型是否为ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {

                        //5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                        //6.获取当前事件的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();

                        //7.获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();

                        //8.遍历rowDataList,并打印数据集
                        for (CanalEntry.RowData rowData : rowDataList) {

                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }

                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }

                            //数据打印
                            System.out.println("Table:" + tableName +
                                    ",EventType:" + eventType +
                                    ",Before:" + beforeData +
                                    ",After:" + afterData);
                        }
                    } else {
                        System.out.println("当前操作类型为:" + entryType);
                    }
                }
            }
        }
    }
}

然后在服务端启动canal:bin/startup.sh,修改数据库进行测试

2.2 Kafka 模式测试

修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到 kafka

#################################################
######### common argument ############# 
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to fil

修改 Kafka 集群的地址

##################################################
######### MQ #############
##################################################
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

修改 instance.properties 输出到 Kafka 的主题以及分区数。注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱 binlog 的顺序 , 如 果 要 提 高 并 行 度 , 首 先 设 置 kafka 的 分 区 数 >1, 然 后 设 置canal.mq.partitionHash 属性

# mq config
canal.mq.topic=canal_test
canal.mq.partitionsNum=1
# hash partition config
#canal.mq.partition=0
#canal.mq.partitionHash=mytest.person:id,mytest.role:id

然后启动测试

cd /opt/module/canal/
bin/startup.sh
# 看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题
jps
# 启动 Kafka 消费客户端测试,查看消费情况
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test
# 向 MySQL 中插入数据后查看消费者控制台
INSERT INTO user_info VALUES('1001','zhangsan','male'),('1002','lisi','female');

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/557115.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

upload-labs第十一十二关

第十一关 $is_upload false; $msg null; if(isset($_POST[submit])){$ext_arr array(jpg,png,gif);$file_ext substr($_FILES[upload_file][name],strrpos($_FILES[upload_file][name],".")1);if(in_array($file_ext,$ext_arr)){$temp_file $_FILES[upload_fil…

前端学习之DOM编程案例:点名案例和秒表案例

点名 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>点名案例</title><style>*{margin: 0;padding: 0;}</style> </head> <body><div id"container">…

软考135-上午题-【软件工程】-软件配置管理

备注&#xff1a; 该部分考题内容在教材中找不到。直接背题目 一、配置数据库 配置数据库可以分为以下三类&#xff1a; (1) 开发库 专供开发人员使用&#xff0c;其中的信息可能做频繁修改&#xff0c;对其控制相当宽松 (2) 受控库 在生存期某一阶段工作结束时发布的阶段产…

vue 实现实时搜索文档关键字并高亮显示

最近接到的一个新需求&#xff1a;实时搜索文档关键字并高亮显示&#xff0c;听起来好难的样子&#xff0c;仔细分析起来其实也蛮简单的。 实现思路 通过 input 实现关键字的输入&#xff0c;监听关键字的变化&#xff0c;用正则表达式来匹配关键字&#xff0c;然后给关键字添…

优思学院|什么叫三现主义?

三现主义是一种深入现场、直接观察和解决问题的管理方法&#xff0c;强调管理者必须亲身体验工作现场&#xff0c;从而更精准地理解和解决问题&#xff0c;提升管理和流程改进的效果。日本的丰田公司有一個日文術語為&#xff1a;Genchi Genbutsu&#xff08;英文&#xff1a;G…

【Web】DASCTF X CBCTF 2022九月挑战赛 题解

目录 dino3d Text Reverser cbshop zzz_again dino3d 进来是一个js小游戏 先随便玩一下&#xff0c;显示要玩够1000000分 直接console改分数会被检测 先是JSFinder扫一下&#xff0c;扫出了check.php 到js里关键词索引搜索check.php 搜索sn&#xff0c;发现传入的参数是…

正确解决:关于Lattic Diamond和Radiant License冲突问题(无法破解问题)

一、问题 今天工作&#xff0c;搞16nm Avant E系列FPGA&#xff0c;需要用到莱迪思的Radiant 2023.2软件&#xff08;按这个博主的安装流程Lattice Radiant 2023.1 软件安装教程&#xff09;。 安装好之后&#xff0c;设置环境变量&#xff0c;导入License.dat就是破解不了&…

pnpm 报错: ERR_PNPM_META_FETCH_FAIL

今天突然遇到一个报错&#xff0c;pnpm 报错&#xff1a; ERR_PNPM_META_FETCH_FAIL  GET https://registry.npm.taobao.org/vue%2Fcli-service: request to https://registry.npm.taobao.org/vue%2Fcli-service failed, reason: certificate has expired问题原因&#xff1a;…

js 遍历数据结构,使不符合条件的全部删除

js 遍历数据结构&#xff0c;使不符合条件的全部删除 let newSourceJSON.parse(JSON.stringify(state.treeData))state.expandedKeys[]checkedKeys.map((item:any)>{loop(newSource,{jsonPath:item.split(&)[1]},state.expandedKeys)})function removeUnwantedNodes(tre…

开关电源拓扑结构(第一部分)

为什么使用开关电源? 开关电源的主要思想可以通过直流到直流变压器的概念解释轻松理解,如图1所示。负载 R L R_L RL​需要从主电压源 V I N V_{IN} VIN​中获得一个恒定电压 V O U T V_{OUT} VOUT​。如图1所示,通过变化串联电阻( R S R_S RS​)或分流电流( I S I_S IS​)可…

[Python开发问题] Selenium ERROR: Unable to find a matching set of capabilities

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

2024年核科学与地球化学国际会议 (ICNSG 2024)

2024年核科学与地球化学国际会议 (ICNSG 2024) 2024 International Conference on Nuclear Science and Geochemistry 【会议简介】 2024年核科学与地球化学国际会议即将在北京召开。本次会议旨在汇聚全球核科学与地球化学领域的专家学者&#xff0c;共同探讨核科学的最新进展…

Golang基础-13

Go语言基础 介绍 并发 channel goroutine 互斥锁 读写锁 原子操作 select 超时处理 sync包 runtime包 介绍 本文介绍Go语言中 channel、goroutine、互斥锁、读写锁、原子操作、select、超时处理、sync包、runtime包等相关知识。 并发 进程是是最小的资源管理单元…

webpack-babel

babel Babel 是一个 JavaScript 编译器&#xff0c;主要用于将高版本的 JavaScript 代码转换为低版本的 JavaScript 代码&#xff0c;从而确保代码在不同浏览器和环境中的兼容性。它可以将 ES6/ES7/ES8 等新特性转换为 ES5 等旧版本的 JavaScript 代码&#xff0c;使得开发人员…

CSS 格式化上下文 + CSS兼容处理

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 ✍CSS 格式化上下文&#x1f525;1 格式化上下文&#x1f337;1.1 块级格式化…

微软(TTS)文本转语音服务API实现

此博客实现与java实现微软文本转语音&#xff08;TTS&#xff09;经验总结_java tts_${简简单单}的博客-CSDN博客之上&#xff0c;首先感谢博客源码的提供&#xff0c;本人在上面添加了一些详细的注释&#xff0c;方便大家跟好的理解和使用&#xff0c;毕竟我已经用原文调试了一…

python入门之简洁安装VS保姆版安装(含虚拟环境)

11、保姆版安装 Anoconda安装&#xff08;python的一个发行版本&#xff09; 优点&#xff1a;集成了许多关于python科学计算的第三方库&#xff0c;保姆级别 下载&#xff1a;www.anaconda.com/download/ 版本默认64位&#xff0c;py37 √&#xff1a;add anaconda to my…

教程 | 亚组分析森林图模块使用介绍

本周风暴统计平台最新更新了亚组森林图板块&#xff01;界面与功能进行了全新升级&#xff0c;今天就通过这篇教程为大家详细介绍&#xff0c;亚组森林图模块各种细节的设置与使用方式&#xff01; 教程将从以下方面开展&#xff1a; 1. 亚组分析使用介绍2. 不同回归分析中亚组…

Java 数据类型

一 Java 的数据类型 二 整数类型 类型占用存储空间范围byte[字节]1字节-127~127short[短整型]2字节-215~215-1 即 -32768~ 32767int[整型]4字节-231~231-1 即 -2147483648~2147483647long[长整型]8字节-263~263-1 字节 byte是计算机存储单位的基本单元&#xff0c;通常由8个比…

Redis: 集群

文章目录 一、单点Redis的问题二、主从架构1、概述2、集群结构3、主从数据同步原理&#xff08;1&#xff09;全量同步&#xff08;2&#xff09;增量同步 4、总结&#xff08;1&#xff09;全量同步和增量同步的区别&#xff08;2&#xff09;什么时候执行全量同步&#xff08…