文章目录
- RocketMQ 相关概念
- 消息模型
- MQ 的简单消息模型
- RocketMQ 的复杂消息模型
- RocketMQ 中消息相关概念
- 消息(Message)
- 主题(Topic)
- Tags
- 队列
- 消息标识
- RocketMQ 中的物理对象
- NameServer
- Broker
- Producer
- Consumer
- NameServer 与 Broker 在 Windows 本地启动示例:
- 本地启动 NameServer
- 本地启动 Broker
- tools 工具测试消息消费
- 可视化工具 rocketmq-dashboard
- 生产者和消费者Java代码示例
RocketMQ 相关概念
消息模型
MQ 的简单消息模型
MQ 是一种提供消息队列服务
的中间件(消息中间件),提供了消息生产、存储、消费全过程API的软件系统。消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer)。MQ 的基础消息模型就是一个发布/订阅(Pub/Sub)模型。
一个简单的 Pub/Sub 概念模型:包括生产者 (Producer),消费者 (Consumer),中间进行基于**消息主题(Topic)**的消息传送。
RocketMQ 的复杂消息模型
在实际应用中的消息模型的结构会更复杂。例如为了支持高并发和水平扩展,中间的消息主题需要进行分区,同一个Topic会有多个生产者,同一个信息会有多个消费者,消费者之间要进行负载均衡等。
下图是 RocketMQ 支持的一种扩展后的消息模型,包括两个生产者,两个消息Topic,以及两组消费者 Comsumer。
- 为了消息写入能力的水平扩展,RocketMQ 对 Topic进行了分区,这种操作后的分区被称为队列(MessageQueue)。
- 为了消费能力的水平扩展,引入了 ConsumerGroup 的概念。
RocketMQ 中消息相关概念
消息(Message)
-
消息是最小数据传输单元,每条消息必须属于一个主题。
-
RocketMQ 中的消息源码:org.apache.rocketmq.common.message.Message
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; private String topic; private int flag; private Map<String, String> properties; private byte[] body; // ....... public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) { this.topic = topic; this.flag = flag; this.body = body; if (tags != null && tags.length() > 0) this.setTags(tags); if (keys != null && keys.length() > 0) this.setKeys(keys); this.setWaitStoreMsgOK(waitStoreMsgOK); } // ...... }
在 RocketMq 中,Message(消息)包括 Topic(主题)、body(消息体)、properties、flag。可以看出在构造消息时,就需要指定消息所属的 Topic。
topic
:表示消息的主题(Topic),用于标识消息所属的主题分类。消费者可以根据主题来订阅消息。body
:表示消息的内容(Body),它是一个字节数组(byte[]
)类型,用于存储消息的实际内容数据。properties
:表示消息的属性(Properties),它是一个键值对(Map<String, String>
)类型的对象,用于存储消息的自定义属性信息。常用的 tags、keys 属性键值对就保存在 Properties 中tags
:表示消息的标签(Tags),用于对消息进行更细粒度的分类。消费者可以根据标签来过滤和选择特定类型的消息。keys
:表示消息的关键字(Keys),用于标识消息的唯一性。可以根据关键字来进行消息的检索和查询。
flag
:表示消息的标志(Flag),用于标识一些特殊的消息属性。
主题(Topic)
-
主题是消息传输和存储的顶层容器,用于
标识
同一类业务逻辑的消息。但主题是一个逻辑概念
,并不是实际的消息容器。是 Rocketmq 进行消息订阅的基本逻辑单位。-
定义数据的分类隔离:通过主题实现逻辑上的存储隔离性和订阅隔离性。
-
定义数据的身份和权限:消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。
-
-
主题内部由多个队列组成,消息的存储和水平扩展能力最终是由队列实现的;并且针对主题的所有约束和属性设置,最终也是通过主题内部的队列来实现。
Tags
-
Topic 与 Tag 都是业务上用来归类的
逻辑标识
,区别在于 Topic 是一级分类,而 Tag 可以理解为二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。-
Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
-
Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。
-
队列
- 队列是中消息存储和传输的
实际物理容器
,是消息的最小存储单元。 - 一个 Topic 可能有多个队列,并且可以分布在不同的 Broker 上。
- 一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中。
消息标识
Rocketmq 中的每个消息拥有唯一的 MessageId,还可以携带具有业务标识的 Keys,以便对消息的查询。对于 MessageId 有两个,在生产者发送消息时会自动生成一个 MessageId(msgId),当消息到达 Broke 后,Broke 也会自动生成一个 MessageId(offsetMsgId)。Key、msgId 和 offsetMsgId 都称为消息标识。
-
msgId:由 producer 端生成,生成规则为:
produceIp + 进行pid + MessageClientIDSetter 类的 classLoader 的 hashcode + 当前时间 + AutomicInteger 自增计数器。
-
offsetMsgId:由 broke 端生成,生成规则为:brokerIp + 队列 Queue 中的偏移量(offset)。
-
keys:由用户指定与业务相关的唯一标识。
RocketMQ 中的物理对象
NameServer
NameServer 是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
主要包括两个功能:
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,客户端仍然可以向其它NameServer获取路由信息。
Broker
Broker主要负责消息的存储、投递和查询以及服务高可用保证。
NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
部署模型
- 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
- Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
- Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
Producer
RocketMq 中提供一个用于发送消息的核心组件。
生产者组:在 RocketMQ 中,生产者组(Producer Group)是一组具有相同业务逻辑的生产者实例的集合。它们共享相同的身份和配置,并且协同工作以实现高可用和负载均衡。
- 负载均衡:在同一生产者组下,消息会均匀地分布到不同的生产者实例上,从而实现负载均衡,生产者组负载均衡是通过 Name Server 来协调的。
- 故障转移:如果生产者组内的某个实例发生故障或停止工作,RocketMQ 会自动将该实例的消息发送任务转移到其他正常工作的实例上,以确保消息发送的可靠性和持续性。这种故障转移机制可以提高系统的可用性和容错性。
启动生产者最基本流程:
-
配置并初始化:在使用生产者(如:
DefaultMQProducer
) 之前,需要设置生产者的组名(Group Name)和名称服务器的地址(Name Server Address)。 -
启动:调用
start()
方法,可以启动生产者并将其连接到名称服务器。 -
之后便可以使用生产者发送消息了。
public class ProducerExample { public static void main(String[] args) throws Exception { // 基本配置 DefaultMQProducer producer = new DefaultMQProducer("your_producer_group"); producer.setNamesrvAddr("your_nameserver_address"); // 启动 producer.start(); // ...... } }
RocketMq 消息发送的三种方式:
- 可靠同步发送,一个消息发送后收到 Broker 响应再发送下一个消息,使用
send(Message)
方法。 - 可靠异步发送,一个消息发送后不需要收到 Broker 响应便可再发送下一个消息,通过回调函数处理消息发送结果,使用
send(Message, SendCallback)
方法。 - 单向发送,一个消息单向发送到 Broker,不需要等待 Broker 服务器的响应,使用
sendOneway(Message)
方法。
Consumer
消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。
消费者分组(ConsumerGroup)是一组具有相同业务逻辑的消费者者实例的集合。通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾,同一分组下的多个消费者将按照分组内统一的消费行为和负载均衡策略消费消息。
RocketMq 中有两种消费模式:
- 集群(Clustering)模式:在集群模式下,多个消费者可以组成一个消费者组(Consumer Group)。每个消费者组内的消费者协同消费消息,每个消息只被消费者组内的一个消费者消费。可以通过扩缩消费者数量,来提升或降低消费能力。
- 广播(Broadcasting)模式:在广播模式下,消息推送给消费组所有的消费者,每个消费者都会独立消费消息。扩缩消费者数量无法提升或降低消费能力。
RocketMq 中有两种消费方式(消费者模型):
- 推模式:主动推送消息给消费者。默认的消息者实现类为 DefaultMQPushConsumer。
- 拉模式:消费者主动拉取消息。默认的消息者实现类为 DefaultMQPullConsumer。
启动消费者最基本流程:
-
配置并初始化:在使用消费者(如:
DefaultMQPushConsumer
) 之前,需要设置消费者的组名(Group Name)和名称服务器的地址(Name Server Address)。这里是启动消费者最基本配置,但是启动消费者是为了订阅消息消费,所以还需要配置一些消费消息的信息。 -
- 配置要订阅消息的主题(Topic);
- 配置消费者消费模型(不指定默认为集群模式 CLUSTERING);
- 配置并注册消息监听器(
MessageListenerConcurrently
或MessageListenerOrderly
),并在监听器中实现消息消费的处理逻辑。
-
启动:调用
start()
方法,可以启动消费者并将其连接到名称服务器。 -
之后便可以使用消费者者监听消息消费了。
public class ConsumerExample { public static void main(String[] args) throws Exception { // 基本配置 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group"); consumer.setNamesrvAddr("your_nameserver_address"); // 设置消费起始位置为最新位置 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 设置订阅主题和标签 consumer.subscribe("your_topic", "your_tag"); // 注册消息监听器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { // 处理接收到的消息 System.out.println("Received message: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动 consumer.start(); // ...... } }
NameServer 与 Broker 在 Windows 本地启动示例:
官网下载 RocketMq 二进制包压缩包并解压:https://rocketmq.apache.org/download/,这里下载 4.9.6 版本。
这里提供已经下载好的源码和二进制压缩包,请关注微信公众号【Qin的学习营地】,回复【RocketMq安装包】。
配置环境变量:
- 编辑环境变量:ROCKETMQ_HOME = F:\rocketmq\rocketmq-4.9.6
- 添加到环境路径下:Path = %ROCKETMQ_HOME%\bin
本地启动 NameServer
进入 cmd 命令窗口,启动命令:
start mqnamesrv.cmd
如下代表启动成功:
本地启动 Broker
进入 cmd 命令窗口,启动命令:
mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
-n
参数指定 NameServer 的地址和端口号,NameServer默认启动端口是 9876。- autoCreateTopicEnable=true 表示自动创建主题。
如下代表启动成功:
tools 工具测试消息消费
在使用测试工具测试之前需要配置环境变量:
- 编辑环境变量:NAMESRV_ADDR = localhost:9876 或 NAMESRV_ADDR = 127.0.0.1:9876
- 添加到环境路径下:Path = %NAMESRV_ADDR%\bin
启动消费者:
进入命令行执行以下命令:
start tools.cmd org.apache.rocketmq.example.quickstart.Consumer
如下代表启动成功:等待消费消息。
启动发送者:
进入命令行执行以下命令:
start tools.cmd org.apache.rocketmq.example.quickstart.Producer
可以看到发送的消息:
同时也可以看到消费者消费消息:
可视化工具 rocketmq-dashboard
可视化源码下载:https://github.com/apache/rocketmq-dashboard,这里下载的是最新版的控制台,支持 RocketMq 5.x 版本。
对于 RocketMq 4.x 版本存在不兼容场景,需要去提交历史中找旧版本的控制台,如:https://github.com/apache/rocketmq-externals/tree/d8f1cb444c5ed4455d5d7f62e636f3fe9a8c8824,找 rocketmq-externals 工程下的 rocketmq-console 模块。
这里提供已经下载好的旧版本控制台(支持 RocketMq 4.x 版本)和新版本控制台(支持 RocketMq 5.x 版本)压缩包,请关注微信公众号【Qin的学习营地】,回复【RocketMq控制台】。
Maven spring-boot run 启动:
mvn spring-boot:run
或 Maven build and run 编译运行:
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
或 将源码在 IDEA 打开,本地运行。
启动后输入便看进入 dashboard 界面: http://127.0.0.1:8080,dashboard 默认应用程序端口为 8080。
生产者和消费者Java代码示例
引入依赖包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.6</version>
</dependency>
消费者代码示例:
public class ConsumerService {
private static final String TOPIC_TEST = "topic_test";
private static final String TAGS_TEST = "tags_test";
/**
* 1、配置消费者分组
* 2、配置 NamesrvAddr 地址
* 3、配置Topic、Tags路由信息
* 4、注册监听器
* 5、启动消费者
*/
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer();
pushConsumer.setConsumerGroup("TEST_CONSUMER_GROUP");
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.subscribe(TOPIC_TEST, TAGS_TEST);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println(messageExt);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.start();
}
}
生产者代码示例:
public class ProducerService {
private static final String TOPIC_TEST = "topic_test";
private static final String TAGS_TEST = "tags_test";
private static final String KEYS_TEST = "keys_test";
public static void main(String[] args) throws Exception {
/**
* 1、配置生产者者分组
* 2、配置 NamesrvAddr 地址
* 3、启动生产者
* 4、发送消息
* 5、关闭生产者
*/
DefaultMQProducer mqProducer = new DefaultMQProducer();
mqProducer.setProducerGroup("TEST_PRODUCER_GROUP");
mqProducer.setNamesrvAddr("127.0.0.1:9876");
mqProducer.start();
SendResult sendResult = mqProducer.send(new Message(TOPIC_TEST, TAGS_TEST, KEYS_TEST, "body test".getBytes()));
System.out.println(sendResult);
mqProducer.shutdown();
}
}
运行上述代码,再启动可视化控制台观察:
可以看到消费者相关信息,但在点击消费详情时,右上角报异常:
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 3 DESC: request type 351 not supported For more information, please visit the url, https://rocketmq.apache.org/docs
点击消息编辑项,可以查看消息消费情况。在这里我的 idea 消费者控制台打印出消息信息,也就是消息已经消费了,但是下面的 trackType 还是为 UNKNOWN,不是 CONSUMED 已消费状态。
以上出现的问题主要是 RocketMq 和控制台版本不兼容导致的,这里使用的 RocketMq 是 4.9.6 版本,而这里使用的控制台是新版本,支持 RocketMq 5.x 以上的版本,可以在控制台源码的 pom.xml 中看引入 RocketMq 的版本为 5.1.0。
所以这里需要使用旧版本的控制台,前面提供了下载地址和已经下载好的压缩包。下载旧版本控制台,可以看到这里下载的旧版本控制台的 pom.xml 中引入 RocketMq 的版本为 4.9.0。启动控制台(配置文件中记得配置 namesrvAddr),上面的问题就能解决了,消息消费后,trackType 为 CONSUMED 已消费状态。
RocketMQ 相关概念
与使用入门