Maxwell安装使用

​欢迎光临我的博客查看最新文章: https://river106.cn

1、Maxwell简介

Maxwell 是由美国Zendesk开源,用Java编写的MySQL实时抓取软件。读取 MySQL binlogs 并将修改行字段的更新写入 Kafka, Kinesis, RabbitMQ, Google Cloud Pub/Sub 或 Redis (Pub/Sub or LPUSH) 以作为 JSON 的应用程序。
官网:https://maxwells-daemon.io/
github:https://github.com/zendesk/maxwell

安装版本:maxwell-1.29.2
快速开始:https://maxwells-daemon.io/quickstart/

2、安装mysql开启binlog

创建数据库maxwell 用于存储 Maxwell 的元数据,
新增账号maxwell并授权:

GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '12345678';
GRANT  SELECT ,REPLICATION SLAVE , REPLICATION CLIENT  ON . TO maxwell@'%';
flush privileges;

创建测试数据库testdb,创建测试表testuser

CREATE TABLE `testuser` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `user_name` varchar(100) NOT NULL COMMENT '用户名',
  `pswd` varchar(100) NOT NULL COMMENT '密码',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3、启动maxwell

方法1:

bin/maxwell --user='maxwell' --password='12345678' --host='127.0.0.1' --producer=stdout

方法2:

cp config.properties.example config.properties

修改config.properties:

producer=stdout
host=127.0.0.1
password=12345678

启动:

bin/maxwell --config config.properties

出现如下报错:

java.lang.RuntimeException: error: unhandled character set ‘utf8mb3’
at com.zendesk.maxwell.schema.columndef.StringColumnDef.charsetForCharset(StringColumnDef.java:61)
at com.zendesk.maxwell.schema.columndef.StringColumnDef.asJSON(StringColumnDef.java:75)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.writeData(BinlogConnectorEvent.java:112)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.buildRowMap(BinlogConnectorEvent.java:162)
at com.zendesk.maxwell.replication.BinlogConnectorEvent.jsonMaps(BinlogConnectorEvent.java:176)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getTransactionRows(BinlogConnectorReplicator.java:486)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:626)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:178)
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34)
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:255)
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:183)
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:286)
00:29:21,008 INFO TaskManager - Stopped all tasks

这个问题是因为MySQL从 5.5.3 开始,用 utf8mb4 编码来实现完整的 UTF-8,其中 mb4 表示 most bytes 4,最多占用4个字节。而原来的utf8则被utf8mb3则代替。
一种解决方案是,将MySQL降级,重新安装5.5.3以下的版本。
另一种方法则是修改maxwell源码。
解压打开,找到有问题的类:com.zendesk.maxwell.schema.columndef.StringColumnDef,加上能识别utf8mb3的语句,重新打包。
打包好的maxwell-1.19.0.jar百度云盘链接: https://pan.baidu.com/s/1t0s_e6d2G1-Z4dM3pSwVXQ?pwd=cs8m 提取码: cs8m 。
替换maxwell/lib/maxwell-1.19.0.jar ,重启即可。

[root@river106 maxwell-1.29.2]# bin/maxwell --config config.properties
Using kafka version: 1.0.0
20:48:10,306 INFO Maxwell - Starting Maxwell. maxMemory: 837287936 bufferMemoryUsage: 0.25
20:48:10,472 INFO Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[binlog.000001:1772098], lastHeartbeat=1681476466529]
20:48:10,802 INFO MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[binlog.000001:987693], lastHeartbeat=1681465703072])
20:48:10,987 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[binlog.000001:931603], lastHeartbeat=0])
20:48:11,026 INFO MysqlSavedSchema - beginning to play deltas…
20:48:11,027 INFO MysqlSavedSchema - played 1 deltas in 1ms
20:48:11,067 INFO BinlogConnectorReplicator - Setting initial binlog pos to: binlog.000001:1772098
20:48:11,099 INFO BinaryLogClient - Connected to localhost:3306 at binlog.000001/1772098 (sid:6379, cid:3825)
20:48:11,099 INFO BinlogConnectorReplicator - Binlog connected.

如上说明启动成功!

binlog变更抓取测试
分别新增数据、更新数据、删除数据,观察控制台输出变化

INSERT INTO testdb.testuser (user_name,pswd,create_time,modify_time) VALUES
	 ('testuser','33333',now(),now());
UPDATE testdb.testuser SET pswd='55555' WHERE user_name = 'testuser';
DELETE FROM testdb.testuser WHERE user_name = 'testuser'; 

{“database”:“testdb”,“table”:“testuser”,“type”:“insert”,“ts”:1683336811,“xid”:126263,“commit”:true,“data”:{“id”:12,“user_name”:“testuser”,“pswd”:“33333”,“create_time”:“2023-05-06 09:33:31”,“modify_time”:“2023-05-06 09:33:31”}}
{“database”:“testdb”,“table”:“testuser”,“type”:“update”,“ts”:1683336841,“xid”:126348,“commit”:true,“data”:{“id”:12,“user_name”:“testuser”,“pswd”:“55555”,“create_time”:“2023-05-06 09:33:31”,“modify_time”:“2023-05-06 09:34:01”},“old”:{“pswd”:“33333”,“modify_time”:“2023-05-06 09:33:31”}}
{“database”:“testdb”,“table”:“testuser”,“type”:“delete”,“ts”:1683336847,“xid”:126373,“commit”:true,“data”:{“id”:12,“user_name”:“testuser”,“pswd”:“55555”,“create_time”:“2023-05-06 09:33:31”,“modify_time”:“2023-05-06 09:34:01”}}

4、输出到redis

bin/maxwell --user='maxwell' --password='12345678' --host='127.0.0.1' \
    --producer=redis --redis_host='127.0.0.1' --redis_port=6380 --redis_auth='123456Ab'

或修改config.properties

redis_host=127.0.0.1
redis_port=6380
redis_auth=123456Ab
redis_database=0
redis_key=maxwell
redis_type=pubsub
bin/maxwell --config config.properties

相关配置文档:https://maxwells-daemon.io/producers/#redis

测试发布订阅模式

设置config.properties的redis_type=pubsub
mysql中新增一条数据,redis订阅频道maxwell

127.0.0.1:6380> SUBSCRIBE maxwell
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "maxwell"
3) (integer) 1

1) "message"
2) "maxwell"
3) "{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566124,\"xid\":86540,\"commit\":true,\"data\":{\"id\":8,\"user_name\":\"sssss\",\"pswd\":\"123456\",\"create_time\":\"2023-04-14 23:32:35\",\"modify_time\":\"2023-04-15 21:29:38\"}}"

测试list模式

设置config.properties的redis_type=lpush

127.0.0.1:6380> keys *
1) "foo"
2) "maxwell"
127.0.0.1:6380> LRANGE maxwell 0 10
1) "{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566484,\"xid\":87555,\"commit\":true,\"data\":{\"id\":9,\"user_name\":\"ggggg\",\"pswd\":\"55555\",\"create_time\":\"2023-04-15 21:48:04\",\"modify_time\":\"2023-04-15 21:48:04\"}}"
127.0.0.1:6380> LPOP maxwell
"{\"database\":\"testdb\",\"table\":\"testuser\",\"type\":\"insert\",\"ts\":1681566484,\"xid\":87555,\"commit\":true,\"data\":{\"id\":9,\"user_name\":\"ggggg\",\"pswd\":\"55555\",\"create_time\":\"2023-04-15 21:48:04\",\"modify_time\":\"2023-04-15 21:48:04\"}}"
127.0.0.1:6380> 

通过发布订阅模式,实现数据同步功能,通过list方式可以获取最新的数据的变化和数据变化数量等需求。

5、输出到RabbitMQ

RabbitMQ安装及使用请参考: RabbitMQ安装及简单使用
RabbitMQ中新建exchange:maxwell,类型为:fanout。
修改Maxwell配置config.properties

producer=rabbitmq
rabbitmq_host=127.0.0.1
rabbitmq_port=5672
rabbitmq_user=guest
rabbitmq_pass=guest
rabbitmq_virtual_host=/
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=fanout
rabbitmq_exchange_durable=true

Maxwell相关配置文档:https://maxwells-daemon.io/producers/#rabbitmq
启动服务:

bin/maxwell --config config.properties

6、监控

通过 http 方式获取监控指标,修改config.properties配置如下:

metrics_type=http
metrics_jvm=true
http_port=8080

启动服务:

bin/maxwell --config config.properties

打开浏览器,访问:http://ip:8080/metrics,即可获取到监控指标.

5、Maxwell与Canal 工具对比

1、Maxwell没有Canal那种server+client模式,只有一个server把数据发送到消息队列或redis。
2、Maxwell有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
3、Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续读取数据。
4、Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
5、Maxwell比Canal更加轻量级。

扩展:
https://toutiao.io/posts/0xfdws/preview

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/28486.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

3. SpringCloudAlibaba、nacos 实现配置中心

一、微服务中配置文件的问题 1.1 配置文件的问题: 配置文件的数量会随着服务的增加持续递增单个配置文件无法区分多个运行环境配置文件内容无法动态更新,需要重启服务 1.2 引入配置中心 引入配置中心:刚才架构就会成为这样。是由配置中心统…

2023上半年的九个觉悟

‍觉悟,就是觉了、悟了。有时候,你看到一句话,突然就觉悟了。 一、资本主义的问题 “资本主义把我们都缩减成了一个东西:消费者” 因此,人人都成为资本家利诱、操控、围猎的对象。 同时,金钱成为全民的神&a…

JVM垃圾回收算法及Java引用

目录 Java垃圾回收算法 1.标记清除算法:Mark-Sweep 2.复制算法:copying 3. 标记整理算法:Mark-Compact 4.分代收集算法 5.新生代垃圾回收算法:复制算法 6.老年代:标记整理算法 7.分区收集算法 Java引用 1.Ja…

ROS-melodic:源码安裝teb_local_planner算法、替换DWA算法

一.安裝teb_local_planner算法 源码下载地址:GitHub - rst-tu-dortmund/teb_local_planner: An optimal trajectory planner considering distinctive topologies for mobile robots based on Timed-Elastic-Bands (ROS Package) 注意选择对应ROS版本的代码。 放在…

爬虫 python 正则匹配 保存网页图片

目录 1. 简介1.1 爬虫1.2 爬虫语言1.3 python库1.4 我的步骤 2. 导入包2.1 代码2.2 requests库 3. 写入文件函数4. 获取图片5. 主函数5.1 代码5.2 说明一下webbrowser 6. 所有代码7. 其他(可以忽略)8. 总结 在这里我只提供的是一种方法,有很多…

SpringMVC 万字通关

文章目录 1. 什么是 Spring MVC?1.1 MVC 定义1.2 MVC 和 Spring MVC 的关系 2. Spring MVC 有什么用 ?3. 如何学 Spring MVC ?3.1 Spring MVC 的创建3.2 实现连接功能3.2.1 RquestMapping 详解1. RequestMapping 支持什么请求?2. 请求限定3. GetMapping 和 PostMapping4. c…

【Android -- JNI 和 NDK】Java 和 C/C++ 之间传递参数和返回值

本文主要介绍 JNI 的数据传递上,即 Java 如何传递对象给 C; 而 C 又如何将数据封装成 Java 所需的对象。 1. 基本数据类型 传递 java 的基本类型是非常简单而直接的,一个 jxxx 之类的类型已经定义在本地系统中了,比如:jint, jby…

认识ASP.NET MVC的5种AuthorizationFilter

一、IAuthorizationFilter 所有的AuthorizationFilter实现了接口IAuthorizationFilter。如下面的代码片断所示,IAuthorizationFilter定义了一个OnAuthorization方法用于实现授权的操作。作为该方法的参数filterContext是一个表示授权上下文的AuthorizationContext对…

FasterTransformer 004 open_attention.h forward

initialize forward() https://github1s.com/NVIDIA/FasterTransformer/blob/v1.0/fastertransformer/cuda/open_attention.h#L149-L217 使用cuBLAS库执行矩阵乘法运算,并对cublasGemmEx()进行三个单独的调用。这些操作包括将属性核与输入张…

【社区图书馆】《看漫画学Python:有趣、有料、好玩、好用(全彩修订版)》

背景 Python是一门既简单又强大的编程语言,被广泛应用于数据分析、大数据、网络爬虫、自动化运维、科学计算和人工智能等领域。Python也越来越重要,成为国家计算机等级考试科目,某些中小学也开设了Python编程课程。本书秉承有趣、有料、好玩…

SpringCloud服务注册与发现组件Eureka(五)

Eureka github 地址: https://github.com/Netflix/eureka Eureka简介 Eureka是Netflix开发的服务发现框架,本身是一个基于REST的服务,主要用于定位运行在AWS域中的中间层服务,以达到负载均衡和中间层服务故障转移的目的。Spring…

【Android -- JNI 和 NDK】JNI 基础知识以及如何使用

JNI 基础知识 我们来系统梳理一下JNI中涉及的基本知识。 JNI定义了以下数据类型,这些类型和Java中的数据类型是一致的: Java原始类型:jint, jbyte, jshort, jlong, jfloat, jdouble, jchar, jboolean这些分别对应这 java 的int, byte, shor…

css 包含块

你不知道的 CSS 之包含块 一说到 CSS 盒模型,这是很多小伙伴耳熟能详的知识,甚至有的小伙伴还能说出 border-box 和 content-box 这两种盒模型的区别。 但是一说到 CSS 包含块,有的小伙伴就懵圈了,什么是包含块?好像…

微服务springcloud 02 创建项目中的三个service子系统,springcloud中注册中心Eureka介绍和把三个系统注册到Eureka中

item service项目 01.使用springboot创建项目 02.选择依懒项在这里插入代码片 spring web 03.添加sp01-commons依赖 在pom.xml文件中 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" x…

【色度学】光学基础

1. 光的本质 &#xff08;1&#xff09;波长不同的可见光&#xff0c;引起人眼的颜色感觉不同。 &#xff08;2&#xff09;人们观察到的颜色是物体和特有色光相结合的结果&#xff0c;而不是物体产生颜色的结果。 2. 光度量 【ISP】光的能量与颜色&#xff08;1&#xff0…

NIO 基础

3. 文件编程 non-blocking io 非阻塞 IO 1.1 Channel & Buffer channel 类似于 stream&#xff0c;它就是读写数据的双向通道&#xff0c;可以从 channel 将数据读入 buffer&#xff0c;也可以将 buffer 的数据写入 channel&#xff0c;而之前的 stream 要么是输入&#…

统信UOS V20 安装mysql5.7.42详细教程

1 安装包准备 到mysql官网可以看到最新的是8.0.33&#xff0c;想下载其他版本的点击 Looking for previous GA versions?Select Operating System: 选择如下版本的mysql 安装包 2 安装 2.1 上传文件至服务器 下载后通过远程将安装包上传至服务器&#xff0c;我这里将安装…

Seesion会话超时时间测试-业务安全测试实操(3)

Seesion会话超时时间测试, Cookie仿冒测试, 密文比对认证测试 本地加密传输测试-业务安全测试实操(2)_luozhonghua2000的博客-CSDN博客 测试原理和方法 在用户成功登录系统获得Session认证会话后,该Session认证会话应具有生命周期,即用户在成功登录系统后,如果在固定时间内…

两个链表相加

描述 假设链表中每一个节点的值都在 0 - 9 之间&#xff0c;那么链表整体就可以代表一个整数。 给定两个这种链表&#xff0c;请生成代表两个整数相加值的结果链表。 数据范围&#xff1a;0≤n,m≤1000000&#xff0c;链表任意值 0≤val≤9 要求&#xff1a;空间复杂度 O(n)…

Triton教程 -- 利用Triton部署你自己的模型

Triton教程—利用Triton部署你自己的模型 给定一个经过训练的模型&#xff0c;我如何使用 Triton 推理服务器以最佳配置大规模部署它&#xff1f; 本文档旨在帮助回答这个问题。 对于那些喜欢高级概述的人&#xff0c;下面是大多数用例的通用流程。 对于那些希望直接进入的人…