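RocketMQ's ordered messages let consumers process messages in the order in which they were sent. This matters for business flows such as transaction processing and order creation, where the steps must not be reordered. To achieve it, ordering has to be preserved while messages are sent and stored, and consumption has to follow that same order strictly. The guarantee is per queue: messages that share a sharding key are ordered relative to each other, not relative to the rest of the topic.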
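I. Ordering in message sending and storage
1. Message partitioning
- When a message is sent, it is assigned to a specific message queue according to a sharding strategy, usually keyed on a business identifier such as the order ID.
- RocketMQ picks the queue by hashing the sharding key and taking the result modulo the number of queues (this is plain hash-modulo rather than consistent hashing), or by any custom algorithm. All messages with the same sharding key go to the same queue, as long as the number of queues does not change.
- In short: the producer must send the related messages to the same topic. Because a topic normally has more than one queue, the producer also hashes a unique business ID, so that this group of messages lands in a single queue under that topic.
- For example, if the topic has 4 MessageQueues, all messages of one order go to one fixed queue.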
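2. Queue selection
- When the producer sends a message, it can specify a queue selector, which decides which queue to use based on a business key such as the order ID.
- This way, all operations on the same entity are processed in order, because they are sent to the same queue and messages inside a queue are ordered.
3. How sending works
Each queue is simply a first-in, first-out queue. Messages that share a business ID, for example order created, paid and completed, all carry the same order ID. After hashing, they are all delivered to one queue under the topic and are consumed in the order in which they were delivered.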
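The routing described above can be modelled without a broker. The following is a minimal in-memory sketch, not RocketMQ code: `OrderedTopicSketch`, `send` and `drain` are hypothetical names, and the queues are plain `ArrayDeque` instances standing in for the MessageQueues of one topic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory model of one topic with several FIFO queues.
class OrderedTopicSketch {
    private final List<Queue<String>> queues = new ArrayList<>();

    OrderedTopicSketch(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    // Hash-modulo routing: the same sharding key always yields the same index.
    int queueIndexFor(String shardingKey) {
        return Math.floorMod(shardingKey.hashCode(), queues.size());
    }

    // Append the message to the tail of the queue chosen by its sharding key.
    int send(String shardingKey, String body) {
        int index = queueIndexFor(shardingKey);
        queues.get(index).add(shardingKey + ":" + body);
        return index;
    }

    // Read one queue from the head, i.e. in the order messages were appended.
    List<String> drain(int index) {
        List<String> out = new ArrayList<>();
        Queue<String> queue = queues.get(index);
        while (!queue.isEmpty()) {
            out.add(queue.poll());
        }
        return out;
    }
}
```

Sending the three steps of order `1001` interleaved with another order still leaves them in send order within their queue; messages of other orders may share that queue, but they never reorder the steps of order `1001`.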
4. Demo code for sending messages
The pseudocode below simulates an order flow (create -> pay -> ship) and sends one message per step.
@Autowired
private MessageQueueSelector testMessageQueueSelector;

@GetMapping("/messageTags")
public void testMessageTags(HttpServletRequest request) {
    // Global order ID: shared by all three messages and used as the hash key
    long orderId = SnowFlakeIdWorker.createIdWorker();
    // Order-created message
    OrderProcessCreate orderProcessCreate = new OrderProcessCreate();
    orderProcessCreate.setOrderId(orderId);
    orderProcessCreate.setProcessType("ORDER_CREATE");
    RocketMqSender.sendMessage("order-process-topic:" + orderProcessCreate.getProcessType(), String.valueOf(SnowFlakeIdWorker.createIdWorker()),
            orderProcessCreate, "user-login-token", testMessageQueueSelector, String.valueOf(orderId));
    // Order-paid message
    OrderProcessPay orderProcessPay = new OrderProcessPay();
    orderProcessPay.setOrderId(orderId);
    orderProcessPay.setProcessType("ORDER_PAY");
    RocketMqSender.sendMessage("order-process-topic:" + orderProcessPay.getProcessType(), String.valueOf(SnowFlakeIdWorker.createIdWorker()),
            orderProcessPay, "user-login-token", testMessageQueueSelector, String.valueOf(orderId));
    // Order-shipped message
    OrderProcessLogistics orderProcessLogistics = new OrderProcessLogistics();
    orderProcessLogistics.setOrderId(orderId);
    orderProcessLogistics.setProcessType("ORDER_LOGISTICS");
    RocketMqSender.sendMessage("order-process-topic:" + orderProcessLogistics.getProcessType(), String.valueOf(SnowFlakeIdWorker.createIdWorker()),
            orderProcessLogistics, "user-login-token", testMessageQueueSelector, String.valueOf(orderId));
}
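RocketMqSender is my own wrapper around Spring Boot's RocketMQTemplate. The relevant code is shown below; the two key lines are:
1. Before sending, set a custom instance of the MessageQueueSelector interface
2. Then call syncSendOrderly, passing the topic, the message and the hashKey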
rocketMQTemplate.setMessageQueueSelector(messageQueueSelector);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
Note: because I use the RocketMQ starter for Spring Boot, the message argument of this outermost method is Spring's top-level Message interface:
org.springframework.messaging.Message
Further down, the framework converts the Spring Message into RocketMQ's own Message class.
@Slf4j
@Component
public class RocketMqSender {
    private static RocketMQTemplate rocketMQTemplate;

    @Autowired
    private void setRocketMQTemplate(RocketMQTemplate rocketMQTemplate) {
        RocketMqSender.rocketMQTemplate = rocketMQTemplate;
    }

    public static void sendMessage(String topic, String messageKey, Object payload, String token, MessageQueueSelector messageQueueSelector, String hashKey) {
        if (Objects.isNull(topic) || Objects.isNull(payload) || StringUtils.isBlank(messageKey)) {
            log.error("topic, payload and messageKey must not be null or blank");
            return;
        }
        Message<Object> message = MessageBuilder
                .withPayload(payload)
                .setHeader(RocketMQHeaders.KEYS, messageKey)
                .setHeader(SystemDefines.JWT_TOKEN_HEADER, token)
                .build();
        // The selector is set on the shared template, so it applies to every caller of that template
        rocketMQTemplate.setMessageQueueSelector(messageQueueSelector);
        // syncSendOrderly throws on failure; log the returned status instead of assuming success
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
        log.info("mq sendMessage done, status={}, topic={}, message={}", sendResult.getSendStatus(), topic, JSON.toJSONString(payload));
    }
}
5. Framework source code for ordered sending
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
// For an ordered send, the arguments include the messageQueueSelector and the hashKey
SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
} catch (Exception e) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}
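Following the source further, the producer selects a MessageQueue just before the message is actually sent.
RocketMQ ships with default queue selector implementations, but we can also write our own.
Note that in the open-source version, the machine-room-aware SelectMessageQueueByMachineRoom is not actually implemented.
A custom queue selector must implement RocketMQ's MessageQueueSelector interface. The implementation below also hashes the key and takes the result modulo the number of queues.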
@Component
public class TestMessageQueueSelector implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String id = String.valueOf(arg);
        // Take the modulo first, then the absolute value: Math.abs(Integer.MIN_VALUE) is still
        // negative, so abs-then-modulo could produce a negative index
        int index = Math.abs(id.hashCode() % mqs.size());
        return mqs.get(index);
    }
}
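In a hash-modulo selector like this one, the order of `Math.abs` and `%` matters. The sketch below isolates the arithmetic in two hypothetical helpers (`absThenModulo`, `moduloThenAbs`) that take the hash as a plain `int`, so the edge case can be checked without RocketMQ on the classpath.

```java
// Hypothetical helpers that isolate the index arithmetic of a hash-modulo selector.
class QueueIndexSketch {
    // Math.abs(Integer.MIN_VALUE) overflows and stays negative, so the result
    // can be negative when queueCount does not divide 2^31.
    static int absThenModulo(int hash, int queueCount) {
        return Math.abs(hash) % queueCount;
    }

    // The remainder is always strictly between -queueCount and queueCount,
    // so taking its absolute value cannot overflow.
    static int moduloThenAbs(int hash, int queueCount) {
        return Math.abs(hash % queueCount);
    }
}
```

With 3 queues, `absThenModulo(Integer.MIN_VALUE, 3)` yields -2, which would make `mqs.get(...)` throw, while `moduloThenAbs` yields 2. For every other hash value the two forms return the same index, so switching to modulo-then-abs does not move existing keys to different queues.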
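6. Message arrives at the broker
- The message first reaches the broker's receiving component. The broker determines the target queue from the topic and queue specified by the producer (or from a queue it computes itself).
- Each message queue corresponds to a logical storage unit on the broker. Messages of all queues are stored in the same physical file (the CommitLog), and are separated logically by queue ID and offset.
- When the broker receives a message, it appends it to the end of the CommitLog. The broker also maintains a per-queue index (the ConsumeQueue, kept in memory-mapped files rather than purely in memory) that records where each queue's messages sit in the CommitLog and tracks the current read and write position of every queue.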
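The relationship between the shared log and the per-queue index can be sketched in a few lines. This is a simplified in-memory model under stated assumptions: `BrokerStoreSketch` and its methods are hypothetical names, the log is a `List<String>` rather than a file, and the index is updated synchronously on append.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: one shared append-only log plus one index per queue.
class BrokerStoreSketch {
    // Stands in for the CommitLog: messages of all queues, in arrival order.
    private final List<String> commitLog = new ArrayList<>();
    // Stands in for the ConsumeQueue: queueId -> positions in the shared log.
    private final Map<Integer, List<Integer>> consumeQueues = new HashMap<>();

    // Append to the shared log, index the position, and return the queue offset.
    int append(int queueId, String body) {
        commitLog.add(body);
        List<Integer> positions = consumeQueues.computeIfAbsent(queueId, k -> new ArrayList<>());
        positions.add(commitLog.size() - 1);
        return positions.size() - 1;
    }

    // Look up a message by (queueId, queueOffset); null if the offset is out of range.
    String read(int queueId, int queueOffset) {
        List<Integer> positions = consumeQueues.get(queueId);
        if (positions == null || queueOffset < 0 || queueOffset >= positions.size()) {
            return null;
        }
        return commitLog.get(positions.get(queueOffset));
    }

    // Number of messages indexed for the queue, i.e. the next offset to be written.
    int maxOffset(int queueId) {
        List<Integer> positions = consumeQueues.get(queueId);
        return positions == null ? 0 : positions.size();
    }
}
```

Messages from different queues interleave in the shared log, yet reading queue 1 by increasing offset returns only that queue's messages, in arrival order. Appending three messages to a queue raises its max offset by three.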
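II. Ordering in message consumption
1. Consumer behavior
- A consumer of ordered messages tries to lock one or more queues when it starts, so that within a given period only one consumer instance consumes the messages in a queue.
This raises a question:
Q: The MessageQueue lives on the broker, so how is it guaranteed that a queue is pulled by only one consumer instance?
How it works: the consumer client first sends the broker a lock request for the MessageQueue, and creates a PullRequest to pull messages only if the lock succeeds. The source code of the lock request follows: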
public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue,
final long delayMills) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
// The ordered consumer instance requests the MessageQueue lock
boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq);
if (lockOK) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 10);
} else {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 3000);
}
}
}, delayMills, TimeUnit.MILLISECONDS);
}
public boolean lock(final MessageQueue mq) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
// Send a remote request asking the broker for the MessageQueue lock; queues locked successfully are returned
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
public Set<MessageQueue> lockBatchMQ(
final String addr,
final LockBatchRequestBody requestBody,
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
// Send the request synchronously to the broker address passed in as a parameter
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
return messageQueues;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
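On the receiving side, the broker has to remember which client holds which queue. The sketch below is a hypothetical model of such a lock table, not the broker's actual implementation: the class name, the `group@queue` key format and the expiry passed to the constructor are all assumptions, and the current time is a parameter so the behavior is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical broker-side lock table; names and key format are illustrative only.
class QueueLockTableSketch {
    private static final class LockEntry {
        String clientId;
        long lastUpdateMillis;
    }

    private final Map<String, LockEntry> locks = new HashMap<>();
    private final long maxLiveMillis;

    QueueLockTableSketch(long maxLiveMillis) {
        this.maxLiveMillis = maxLiveMillis;
    }

    // Grants the lock if it is free, expired, or already held by the same client.
    synchronized boolean tryLock(String group, String queue, String clientId, long nowMillis) {
        String key = group + "@" + queue;
        LockEntry entry = locks.get(key);
        boolean freeOrExpired = entry == null || nowMillis - entry.lastUpdateMillis > maxLiveMillis;
        if (freeOrExpired) {
            entry = new LockEntry();
            entry.clientId = clientId;
            entry.lastUpdateMillis = nowMillis;
            locks.put(key, entry);
            return true;
        }
        if (entry.clientId.equals(clientId)) {
            // Same owner: treat the request as a renewal.
            entry.lastUpdateMillis = nowMillis;
            return true;
        }
        return false;
    }
}
```

The expiry is what keeps a crashed consumer from blocking a queue forever: a holder that stops renewing loses the lock once `maxLiveMillis` has elapsed, and the next requester takes over.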
Further down is the code where RocketMQ sends the network request through Netty:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
//get the request id
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// Send the network request through Netty
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
responseFuture.setSendRequestOK(false);
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("Failed to write a request command to {}, caused by underlying I/O operation failure", addr);
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
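The core idea of this synchronous call is request/response correlation: a pending future is stored under the request ID (`opaque`), the caller blocks on it, and the I/O thread completes it when the matching response arrives. The sketch below reproduces that pattern with `CompletableFuture` from the JDK; `SyncInvokeSketch` and its methods are hypothetical, and no real network I/O takes place.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of matching responses to blocked callers by request ID.
class SyncInvokeSketch {
    private final Map<Integer, CompletableFuture<String>> responseTable = new ConcurrentHashMap<>();
    private final AtomicInteger nextOpaque = new AtomicInteger();

    // Register a pending request and return its ID.
    int register() {
        int opaque = nextOpaque.incrementAndGet();
        responseTable.put(opaque, new CompletableFuture<>());
        return opaque;
    }

    // Called when a response arrives; false if nobody is waiting for this ID.
    boolean onResponse(int opaque, String body) {
        CompletableFuture<String> future = responseTable.get(opaque);
        return future != null && future.complete(body);
    }

    // Block until the response arrives or the timeout elapses; null on timeout.
    String await(int opaque, long timeoutMillis) {
        CompletableFuture<String> future = responseTable.get(opaque);
        if (future == null) {
            return null;
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // Always clean up, mirroring the finally block in the source above.
            responseTable.remove(opaque);
        }
    }

    int pending() {
        return responseTable.size();
    }
}
```

Removing the entry in `finally` means a response that arrives after the timeout finds no waiter and is dropped, which is why `onResponse` reports whether anyone was still waiting.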
Once the lock succeeds and the MessageQueue has been obtained, the MessageQueue is submitted to the thread pool for consumption:
public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue,
final long delayMills) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
// Request the queue lock from the broker, as shown above
boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq);
if (lockOK) {
// Lock acquired: hand the MessageQueue to the thread pool for consumption
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 10);
} else {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 3000);
}
}
}, delayMills, TimeUnit.MILLISECONDS);
}
private void submitConsumeRequestLater(
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final long suspendTimeMillis
) {
long timeMillis = suspendTimeMillis;
if (timeMillis == -1) {
timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
}
if (timeMillis < 10) {
timeMillis = 10;
} else if (timeMillis > 30000) {
timeMillis = 30000;
}
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
// Schedule a thread to consume this MessageQueue
ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
}
}, timeMillis, TimeUnit.MILLISECONDS);
}
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
// Build a ConsumeRequest and submit it to the thread pool
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
Now look at ConsumeRequest. The class implements the Runnable interface, so it must have a run method:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest
// Implements the Runnable interface
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
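Now look at the run method. It uses the synchronized keyword on a lock object fetched per MessageQueue, which is an exclusive lock inside the JVM process, so only one thread at a time can consume a given MessageQueue: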
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// Acquire the in-JVM exclusive lock for this MessageQueue; only one thread at a time can consume the queue
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
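So the consumer's behavior in ordered consumption is now clear. It comes down to two main points:
1. When there are multiple consumer instances, each instance requests the MessageQueue lock from the broker.
This is done with a remote request to the broker over Netty.
2. Inside the consumer instance that holds the MessageQueue lock, only one consumer thread at a time may consume that queue.
This is done with the synchronized keyword, which takes a JVM exclusive lock on a per-queue lock object.
Together these guarantee that, at any given moment, the messages in one MessageQueue are consumed by only one consumer thread of one consumer instance.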
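The second level, the in-JVM lock, can be tried out in isolation. The sketch below is a hypothetical stand-in for the per-queue lock lookup seen in `run`: it keeps one lock object per queue ID in a `ConcurrentHashMap` and counts how many threads are inside the critical section at once. All names are illustrative, and the "consumption" is just a counter increment.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "one lock object per queue" inside a single consumer JVM.
class PerQueueLockSketch {
    private final ConcurrentMap<Integer, Object> lockTable = new ConcurrentHashMap<>();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();
    private final AtomicInteger processed = new AtomicInteger();

    // Returns the same lock object for the same queue ID, creating it on first use.
    Object fetchLockObject(int queueId) {
        return lockTable.computeIfAbsent(queueId, k -> new Object());
    }

    // Consume one message of the queue while holding that queue's lock.
    void consume(int queueId) {
        synchronized (fetchLockObject(queueId)) {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            processed.incrementAndGet();
            inFlight.decrementAndGet();
        }
    }

    // Submit `tasks` consume calls for a single queue to a pool of `threads` workers.
    // Returns {max concurrency observed inside the lock, number of messages processed}.
    static int[] run(int threads, int tasks) {
        PerQueueLockSketch sketch = new PerQueueLockSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> sketch.consume(1));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {sketch.maxInFlight.get(), sketch.processed.get()};
    }
}
```

Even with 8 worker threads competing, the observed concurrency inside the lock for one queue never exceeds 1. Using a separate lock object per queue means different queues can still be consumed in parallel.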
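2. Running the code
Start the microservice; the consumer client has registered with the broker.
Check the number of queues under the topic: there are 4.
Call the endpoint to send the messages: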
http://localhost:7004/test/messageTags
Looking at the log output, this batch of messages is consumed, in order, only by the consumer thread ConsumeMessageThread_orderProcess-check-group_3:
2024-06-03 17:19:00 [ConsumeMessageThread_orderProcess-check-group_3] INFO OrderProcessConsumer - currentThread=ConsumeMessageThread_orderProcess-check-group_3, topic=order-process-topic, tag=ORDER_CREATE, megId=FDB22C26F4E40001000000000000000168A118B4AAC20E03F0210006, queue=1, orderProcess=ORDER_CREATE, key=1797558588185509889
2024-06-03 17:19:01 [ConsumeMessageThread_orderProcess-check-group_3] INFO OrderProcessConsumer - currentThread=ConsumeMessageThread_orderProcess-check-group_3, topic=order-process-topic, tag=ORDER_PAY, megId=FDB22C26F4E40001000000000000000168A118B4AAC20E03F02D0007, queue=1, orderProcess=ORDER_PAY, key=1797558588240035840
2024-06-03 17:19:02 [ConsumeMessageThread_orderProcess-check-group_3] INFO OrderProcessConsumer - currentThread=ConsumeMessageThread_orderProcess-check-group_3, topic=order-process-topic, tag=ORDER_LOGISTICS, megId=FDB22C26F4E40001000000000000000168A118B4AAC20E03F0390008, queue=1, orderProcess=ORDER_LOGISTICS, key=1797558588290367488
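The log shows queue=1 for all three messages. Opening the RocketMQ console again and looking up the MessageQueue with queue ID 1 under this topic, the max offset is now 27, which is 3 more than the 24 recorded before sending. This also confirms that the messages were all delivered to the MessageQueue with queue ID 1.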