RocketMQ学习笔记

kafka适合于日志收集的场景(不需要太多topic;topic下面的partition多了会造成写文件的速度变慢,因为要造很多索引)
RocketMQ更适合于电商场景(适用于topic特别多的情况)

快速安装RocketMQ

RocketMQ的官网地址: http://rocketmq.apache.org ,github地址是 https://github.com/apache/rocketmq 。
最新版本的RocketMQ可以到官网上进行下载。历史版本需要到Github仓库中下载。下载地址:https://github.com/apache/rocketmq/releases。

运行RocketMQ需要先安装JDK。
配置环境变量。使用 vi ~/.bash_profile编辑文件,在下面加入以下内容:

export JAVA_HOME=/app/jdk1.8/
PATH=$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export PATH

编辑完成后,执行 source ~/.bash_profile让环境变量生效。输入java -version能查看到以下内容表明JDK安装成功了。

[oper@worker1 ~]$ java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

然后把下载的压缩包在本地完成解压,并上传到/app/rocketmq目录。完成后,把rocketmq的bin目录也配置到环境变量当中。
vi ~/.bash_profile,加入以下内容,并执行source ~/.bash_profile让环境变量生效:

export JAVA_HOME=/app/jdk1.8/
export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.9.1-bin-release
PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export PATH
这个ROCKETMQ_HOME的环境变量是必须要单独配置的,如果不配置的话,启动NameSever和Broker都会报错。
这个环境变量的作用是用来加载$ROCKETMQ_HOME/conf下的除broker.conf以外的几个配置文件。
所以实际情况中,可以不按这个配置,但是一定要能找到配置文件

NameServer服务搭建

启动NameServer非常简单, 在$ROCKETMQ_HOME/bin目录下有个mqadminsrv。直接执行这个脚本就可以启动RocketMQ的NameServer服务。但是要注意,RocketMQ默认预设的JVM内存是4G,这是RocketMQ是最佳配置。但是通常用虚拟机的话都是不够4G内存的,所以需要调整下JVM内存大小。修改的方式是直接修改runserver.sh。 用vi runserver.sh编辑这个脚本,在脚本中找到这一行调整内存大小为512M

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -
XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

然后用静默启动的方式启动NameServer服务:

nohup bin/mqnamesrv &

启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。并且使用jps指令可以看到有一个NamesrvStartup进程。

Broker服务搭建

启动Broker的脚本是runbroker.sh。Broker的默认预设内存是8G,启动前,如果内存不够,同样需要调整下JVM内存。vi runbroker.sh,找到这一行,进行内存调整

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

然后需要找到$ROCKETMQ_HOME/conf/broker.conf, vi指令进行编辑,在最下面加入一个配置:

autoCreateTopicEnable=true

然后也以静默启动的方式启动runbroker.sh

nohup ./mqbroker &

启动完成后,同样是检查nohup.out日志,有这一条关键日志就标识启动成功了。 并且jps指令可以看到一个BrokerStartup进程。

在观察runserver.sh和runbroker.sh时,还可以查看到其他的JVM执行参数,这些参数都可以进行定制。
观察到一个比较有意思的地方,nameServer使用的是CMS垃圾回收器,而Broker使用的是G1垃圾回收器。

命令行启动客户端

在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。在worker2上进入RocketMQ的安装目录:

首先需要配置一个环境变量NAMESRV_ADDR指向启动的NameServer服务。

export NAMESRV_ADDR='localhost:9876'

然后启动消息生产者发送消息:默认会发1000条消息

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

然后启动消息消费者接收消息:

 bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭RocketMQ服务
# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

集群架构解析

在这里插入图片描述

一个完整的RocketMQ集群中,有如下几个角色

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
  • Message Queue:相当于是Topic的分区;用于并行发送和接收消息

RocketMQ集群搭建与优化

准备三台虚拟机,硬盘空间建议大于4G。配置机器名。

#vi /etc/hosts
192.168.232.128 worker1
192.168.232.129 worker2
192.168.232.130 worker3

创建用户–可选

useradd oper
passwd oper (密码输入 123qweasd)

系统配置
免密登录
切换oper用户,在worker1上 生成key

ssh-kengen

然后分发给其他机器

ssh-copy-id worker1
ssh-copy-id worker2
ssh-copy-id worker3

这样就可以在worker1上直接ssh 或者scp到另外的机器,不需要输密码了。

关闭防火墙

systemctl stop firewalld.service
firewall-cmd --state
配置RocketMQ主从集群

搭建一个2主2从异步刷盘的集群,所以会使用conf/2m-2s-async下的配置文件。预备设计的集群情况如下:

机器名nemaeServer节点部署broker节点部署
worker1nameserver
worker2nameserverbroker-a, broker-b-s
worker3nameserverbroker-b,broker-a-s
所以修改的配置文件是进入rocketmq的config目录下修改2m-2s-async的配置文件。主要是配置broker.conf文件。
在rocketmq的config目录下可以看到rocketmq建议的各种配置方式:
2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失),
2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全),
2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。
而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。
通常正式环境都会采用这种方式来搭建集群。
配置第一组broker-a

在worker2上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

该节点对应的从节点在worker3上。修改2m-2s-async/broker-a-s.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

配置第二组Broker-b
这一组broker的主节点在worker3上,所以需要配置worker3上的config/2m-2sasync/broker-b.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

然后他对应的slave在worker2上,修改work2上的 conf/2m-2s-async/broker-b-s.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

这样2主2从的集群配置基本就完成了。搭建过程中需要注意的配置项:
1、同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ already started
2、同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错nameserver不需要进行配置,直接启动就行。这也看出nameserver是无状态的。
3、如果是多网卡的机器,比如云服务器,那么需要在broker.conf中增加brokerIP1属性,指定所在机器的外网网卡地址。

启动RocketMQ

直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默认的配置都比较高。
1、先启动nameServer
修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

直接在三个节点上启动nameServer。

nohup bin/mqnamesrv &

启动完成后,在nohup.out里看到这一条关键日志就是启动成功了
在这里插入图片描述
使用jps指令可以看到一个NamesrvStartup进程
2、再启动broker
启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件

在worker2上启动broker-a的master节点和broker-b的slave节点

nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &

在work3上启动broker-b的master节点和broker-a的slave节点

nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &

3、启动状态检查
使用jps指令,能看到一个NameSrvStartup进程和两个BrokerStartup进程。
nohup.out中也有启动成功的日志。
对应的日志文件:

# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log

4、测试mqadmin管理工具
RocketMQ源码中并没有提供管理控制台,只提供了一个mqadmin指令来管理
RocketMQ。指令的位置在bin目录下。直接使用该指令就会列出所有支持的命令。
在这里插入图片描述
使用方式都是 mqadmin {command} {args}。 如果有某个指令不会使用,可以
使用 mqadmin help {command} 指令查看帮助。

5、命令行快速验证
RocketMQ提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服
务。例如,在worker2机器上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

接收消息:

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,
所以运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,
这样默认会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='worker1:9876;worker2:9876;worker3:9876'

这个tooles.sh实际上是封装了一个简单的运行RocketMQ的环境,上面指令中指定的Java类,都在lib/rocketmq-example-4.7.1.jar包中。未来如果有一些客户端示例,也可以打成jar包放到这个lib目录下,通过tools.sh运行。

搭建管理控制台

RocketMQ源代码中并没有提供控制台,但是有一个Rocket的社区扩展项目中提供了一个控制台,地址: https://github.com/apache/rocketmq-dashboard

下载下来后,解压并进入对应的目录,使用maven进行编译

mvn clean package -Dmaven.test.skip=true

编译完成后,获取target下的jar包,就可以直接执行。但是这个时候要注意,在这个项目的application.yml中需要指定nameserver的地址。默认这个属性是指向本地。如果配置为空,会读取环境变量NAMESRV_ADDR。

那可以在jar包的当前目录下增加一个application.yml文件,覆盖jar包中默认的一个属性:

rocketmq:
  config:
    namesrvAddrs:
      - worker1:9876
      - worker2:9876
      - worker3:9876

然后执行:

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

搭建Dledger高可用集群–了解

通过这种方式,搭建了一个主从结构的RocketMQ集群,但是要注意,这种主从结构是只做数据备份,没有容灾功能的。也就是说当一个master节点挂了后,slave节点是无法切换成master节点继续提供服务的。注意这个集群至少要是3台,允许少于一半的节点发生故障。

如果slave挂了,对集群的影响不会很大,因为slave只是做数据备份的。
但是影响也是会有的,例如,当消费者要拉取的数据量比较大时,
RocketMQ有一定的机制会优先保证Master节点的性能,只让Master节点返回一小部分数据,
而让其他部分的数据从slave节点去拉取。

另外,需要注意,Dleger会有他自己的CommitLog机制,也就是说,
使用主从集群累计下来的消息,是无法转移到Dleger集群中的。

而如果要进行高可用的容灾备份,需要采用Dledger的方式来搭建高可用集群。

搭建方法
要搭建高可用的Broker集群,只需要配置conf/dleger下的配置文件就行。

这种模式是基于Raft协议的,是一个类似于Zookeeper的paxos协议的选举协议,也是会在集群中随机选举出一个leader,其他的就是follower。只是他选举的过程跟paxos有点不同。Raft协议基于随机休眠机制的,选举过程会比paxos相对慢一点。

系统参数调优 – 重要

到这里,整个RocketMQ的服务就搭建完成了。但是在实际使用时,RocketMQ的吞吐量、性能都很高,那要发挥RocketMQ的高性能,还需要对RocketMQ以及服务器的性能进行定制
1、配置RocketMQ的JVM内存大小:
之前提到过,在runserver.sh中需要定制nameserver的内存大小,在runbroker.sh中需要定制broker的内存大小。这些默认的配置可以认为都是经过检验的最优化配置,但是在实际情况中都还需要根据服务器的实际情况进行调整。这里以runbroker.sh中对G1GC的配置举例,在runbroker.sh中的关键配置:

JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"

-XX:+UseG1GC: 使用G1垃圾回收器, -XX:G1HeapRegionSize=16m 将G1的region块大小设为16M,
-XX:G1ReservePercent:在G1的老年代中预留25%空闲内存,这个默认值是10%,RocketMQ把这个参数调大了。-XX:InitiatingHeapOccupancyPercent=30:当堆内存的使用率达到30%之后就会启动G1垃圾回收器尝试回收垃圾,默认值是45%,RocketMQ把这个参数调小了,也就是提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收时间太长的问题

然后,后面定制了GC的日志文件,确定GC日志文件的地址、打印的内容以及控制每个日志文件的大小为30M并且只保留5个文件。这些在进行性能检验时,是相当重要的参考内容。

2、RocketMQ的其他一些核心参数
例如在conf/dleger/broker-n0.conf中有一个参数:sendMessageThreadPoolNums=16。这一个参数是表明RocketMQ内部用来发送消息的线程池的线程数量是16个,其实这个参数可以根据机器的CPU核心数进行适当调整,例如如果你的机器核心数超过16个,就可以把这个参数适当调大。

3、Linux内核参数定制
在部署RocketMQ的时候,还需要对Linux内核参数进行一定的定制。例如

  • ulimit,需要进行大量的网络通信和磁盘IO。

  • vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)

  • vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。

  • vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。

  • vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。

  • File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。建议设置文件描述符的值为655350。

      这些参数在CentOS7中的配置文件都在 /proc/sys/vm目录下。
      另外,RocketMQ的bin目录下有个os.sh里面设置了RocketMQ建议的系统内核参数,可以根据情况进行调整。
    

RocketMQ消息转发模型

1 消息模型(Message Model)
​ RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

2 消息生产者(Producer)
​ 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

​ 生产者中,会把同一类Producer组成一个集合,叫做生产者组。同一组的Producer被认为是发送同一类消息且发送逻辑一致。

3 消息消费者(Consumer)
​ 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

  • 拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

  • 推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

​ 消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

  • 集群消费模式下, 相同Consumer Group的每个Consumer实例平均分摊消息。
  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
    4 主题(Topic)
    ​ 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

​ Topic只是一个逻辑概念,并不实际保存消息。同一个Topic下的消息,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。MessageQueue是一个具有FIFO特性的队列结构,生产者发送消息与消费者消费消息的最小单位。

5 代理服务器(Broker Server)
​ 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Broker Server是RocketMQ真正的业务核心,包含了多个重要的子模块:

  • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
  • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

而Broker Server要保证高可用需要搭建主从集群架构。RocketMQ中有两种Broker架构模式:

  • 普通集群:
    这种集群模式下会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同步和异步同步。

这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用了。

  • Dledger高可用集群:
    Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。这个模式下的集群会随机选出一个节点作为master,而当master节点挂了后,会从slave中自动选出一个节点升级成为master。

Dledger技术做的事情:1、从集群中选举出master节点 2、完成master节点往slave节点的消息同步。

6 名字服务(Name Server)
​ 名称服务充当路由消息的提供者。Broker Server会在启动时向所有的Name Server注册自己的服务信息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

​ 这种特性也就意味着NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会有影响。当然,这里不考虑节点的负载情况。

7 消息(Message)
​ 消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

​ 并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/561363.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

js 函数节流和函数防抖及区别详解

文章目录 1. 前言2. 函数节流3. 函数防抖4. 总结 1. 前言 浏览器中总是有一些操作非常耗费性能。所以就有了函数节流和函数防抖来提高浏览器性能。 函数节流:频繁触发一个事件时候,每隔一段时间,函数只会执行一次。 函数防抖:当触…

某零售企业招聘管理体系搭建咨询项目

科学岗位分析,改善招聘流程,提高招聘及时率随着公司不断发展壮大,企业规模逐渐增大,部门设置也日益增多,因此对人员的需求也日益提高。但是目前该企业在人员招聘方面逐渐暴露出一些诸如岗位分析不到位、缺乏整体面试计…

Java中对象如何拷贝?

hi,我是程序员王也,一个资深Java开发工程师,平时十分热衷于技术副业变现和各种搞钱项目的程序员~,如果你也是,可以一起交流交流。 今天我们来聊一聊Java中的对象拷贝~ 浅拷贝与深拷贝 在Java中,对象拷贝可…

向量数据库的崛起:如何改变数据存储与机器学习的未来

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

便携的图像背景去除工具PhotoScissors9.2版本在Windows系统的下载与安装配置

目录 前言一、PhotoScissors安装二、使用配置总结 前言 ​“ PhotoScissors是一个多功能和强大的照片编辑工具,专门为Windows用户寻求一个直观的解决方案,背景删除和图像编辑。作为专用的Windows软件,PhotoScissors提供了一个用户友好的平台…

vlan 和 trunk实验

VLAN(Virtual Local Area Network),即虚拟局域网,是一种网络技术,它的主要原理是将物理网络划分为多个逻辑子网,每个子网形成一个独立的广播域。这样,VLAN内的主机间通信就像在同一个局域网内一…

[Java基础揉碎]集合

目录 集合的理解和好处 数组 集合的理解和好处 继承图 ​编辑 简单实例 Collection接口和常用方法 1) add:添加单个元素 2) remove:删除指定元素 3) contains:查找元素是否存在 4) size:获取元素个数 5) isEmpty:判断是否为空 ​编辑 6) clear:清空 7) addAll:添…

碎碎笔记01

凹凸性 一元函数 凸函数&#xff1a;二阶导数>0 f ( x ) x 2 f(x) x^2 f(x)x2的二阶导数是 2&#xff0c;>0凹函数&#xff1a;二阶导数<0 驻点&#xff0c;拐点 驻点&#xff1a;增减性的交替点 拐点&#xff1a;凹凸性的交替点 脑补 f ( x ) s i n x f(x) …

揭阳硕榕超市管理系统的设计与实现(论文)_kaic

摘 要 在互联网高速发展环境下&#xff0c;传统的管理手段无法满足对信息的高效、快速的管理要求。为顺应时代发展的需要&#xff0c;提高超市的管理效能&#xff0c;提高超市的管理速度&#xff0c;构建一个信息化的工作流程&#xff0c;揭阳硕榕超市管理系统应运而生。 根…

spring boot: 使用MyBatis从hive中读取数据

一、hive表&#xff1a; 启动hiveserver2 二、添加mybatis starter和hive依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instan…

[阅读笔记23][JAM]JOINTLY TRAINING LARGE AUTOREGRESSIVE MULTIMODAL MODELS

这篇论文是24年1月发表的&#xff0c;然后是基于的RA-CM3和CM3Leon这两篇论文。它所提出的JAM结构系统地融合了现有的文本模型和图像生成模型。 主要有两点贡献&#xff0c;第一点是提出了融合两个模型的方法&#xff0c;第二点是为混合模型精心设计的指令微调策略。 下图是一个…

【Arduino IDE 环境配置】

目录 Arduino IDE 环境配置 1. 安装方式2. 操作方法&#xff08;Arduino中文社区&#xff09; 2.1. 安装Arduino IDE2.2. 下载固件2.3. 修改Arduino IDE语言2.4. 添加开发板管理网址2.5. 运行离线包2.6. 检查安装是否成功 下载Arduino IDE&#xff1a; 如果你还没有安装Arduin…

Spring Boot + Thymeleaf 实现的任务发布网站

角色&#xff1a; 管理员雇主雇员 功能 雇主&#xff1a;登录、注册、发布任务、选择中标雇员、评价雇员雇员&#xff1a;登录、注册、查看任务列表、投标任务、收藏任务、完成任务管理员、登录、任务管理、雇主管理、雇员管理 部分功能截图 部署 导入数据库…

MySQL学习-非事务相关的六大日志、InnoDB的三大特性以及主从复制架构

一. 六大日志 慢查询日志:记录所有执行时间超过long_query_time的查询&#xff0c;方便定位并优化。 # 查询当前慢查询日志状态 SHOW VARIABLES LIKE slow_query_log; #启用慢查询日志 SET GLOBAL slow_query_log ON; #设置慢查询文件位置 SET GLOBAL slow_query_log_file …

Unity实现动态数字变化

最近的项目需要动态显示数字&#xff0c;所以使用Text组件&#xff0c;将数字进行变化操作过程记录下来。 一、UI准备 1、新建一个Text组件 2、新建C#脚本 3、将Text挂载到脚本上 二、函数说明 1、NumberChange 方法 NumberChange 方法接收四个参数&#xff1a;初始数字 in…

maven问题汇总

​ 1、报错 failed to transfer from http://0.0.0.0/ during a previous attempt. com.byd.xxx:xxx-parent:pom:1.1.0-SNAPSHOT failed to transfer from http://0.0.0.0/ during a previous attempt. This failure was cached in the local repository and resolution is no…

一分钟成为点灯大师(超简单5行代码-STM32F407的HAL实现按键轮询点亮LED灯)

一、开发环境 硬件&#xff1a;正点原子探索者 V3 STM32F407 开发板 单片机&#xff1a;STM32F407ZGT6 Keil版本&#xff1a;5.32 STM32CubeMX版本&#xff1a;6.9.2 STM32Cube MCU Packges版本&#xff1a;STM32F4 V1.27.1 使用STM32F407的HAL库实现按键轮询读取按键值&…

ssh-key关于authorized_keys电脑与linux互相认证

思路&#xff1a; 在A上生成公钥私钥。将公钥拷贝给server B&#xff0c;要重命名成authorized_keys(从英文名就知道含义了)Server A向Server B发送一个连接请求。Server B得到Server A的信息后&#xff0c;在authorized_key中查找&#xff0c;如果有相应的用户名和IP&#xf…

C语言Linux vim shell命令

1. actionmotion dG删到文件尾 ggdG先到开头再删除到末尾 d^到达行首 d$到行尾 2. num action 2dd删除两行 t"向后寻找"找到&#xff0c;找到前面一个位置 f"向后寻找"找到&#xff0c;直接找到本来的位置 diw删除单词并保持在视图状态&#xff…

抖音abogus(收部Pixel2手机退坑的dd我走咸鱼淘宝)

声明 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01;wx a15018601872 本文章未…