RocketMQ源码阅读-八-定时消息和消息重试
- 定时消息
- 概念
- 逻辑流程图
- 延迟级别
- Producer发送定时消息
- Broker存储定时消息
- Broker发送定时消息
- Broker 持久化定时发送进度
- 消息重试
- 总结
定时消息
概念
官网给出的概念:https://rocketmq.apache.org/zh/docs/featureBehavior/02delaymessage
定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至Broker服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
逻辑流程图
来源https://www.iocoder.cn/RocketMQ/message-schedule-and-retry/?github&1601
延迟级别
RocketMQ 目前只支持固定精度的定时消息。
官方给出不能任意时间延迟的原因:如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。
延迟级别相关源码如下MessageStoreConfig:
/**
* 消息延迟级别字符串配置
*/
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
可以看到一共有18个延时级别。
解析延迟级别的代码在ScheduleMessageService:
private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<>(32);
/**
* 解析延迟级别
*
* @return 是否解析成功
*/
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
log.error("parseDelayLevel exception", e);
log.info("levelString String = {}", levelString);
return false;
}
return true;
}
此方法,将延迟级别转换为毫秒数,存储在delayLevelTable中。
Producer发送定时消息
下面是官方给出发送定时消息的
//定时/延时消息发送
MessageBuilder messageBuilder = new MessageBuilderImpl();;
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//消息体
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
主要通过setDeliveryTimestamp方法,设置定时时间。
Broker存储定时消息
Broker 存储消息时,延迟消息进入特定 Topic 为 SCHEDULE_TOPIC_XXXX。同时会将 延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1。
核心代码在CommitLog#putMessage中:
/**
* 添加消息,返回消息结果
*
* @param msg 消息
* @return 结果
*/
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// ...省略代码
// 定时消息处理
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX` 。
topic = ScheduleMessageService.SCHEDULE_TOPIC;
// 延迟级别 与 消息队列编号 做固定映射
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// ...省略代码
}
延迟级别 与 消息队列编号 做固定映射的代码为ScheduleMessageService#delayLevel2QueueId:
/**
* 根据 延迟级别 计算 消息队列编号
* QueueId = DelayLevel - 1
*
* @param delayLevel 延迟级别
* @return 消息队列编号
*/
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
在生成ConsumeQueue时,每条消息的 tagsCode 使用【消息计划消费时间】。这样,ScheduleMessageService 在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。
相应的代码如下:
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
try {
// ... 省略代码
// 17 properties
short propertiesLength = byteBuffer.getShort();
if (propertiesLength > 0) {
byteBuffer.get(bytesContent, 0, propertiesLength);
String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
Map<String, String> propertiesMap = MessageDecoder.string2messageProperties(properties);
keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
if (tags != null && tags.length() > 0) {
tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
}
// Timing message processing
{
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);
if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
}
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
}
}
}
int readLength = calMsgLength(bodyLen, topicLen, propertiesLength);
if (totalSize != readLength) {
doNothingForDeadCode(reconsumeTimes);
doNothingForDeadCode(flag);
doNothingForDeadCode(bornTimeStamp);
doNothingForDeadCode(byteBuffer1);
doNothingForDeadCode(byteBuffer2);
log.error(
"[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
totalSize, readLength, bodyLen, topicLen, propertiesLength);
return new DispatchRequest(totalSize, false/* success */);
}
return new DispatchRequest(//
topic, // 1
queueId, // 2
physicOffset, // 3
totalSize, // 4
tagsCode, // 5
storeTimestamp, // 6
queueOffset, // 7
keys, // 8
uniqKey, //9
sysFlag, // 9
preparedTransactionOffset// 10
);
} catch (Exception e) {
}
return new DispatchRequest(-1, false /* success */);
}
32行调用computeDeliverTimestamp方法计算计划消费时间:
/**
* 计算 投递时间【计划消费时间】
*
* @param delayLevel 延迟级别
* @param storeTimestamp 存储时间
* @return 投递时间【计划消费时间】
*/
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
Long time = this.delayLevelTable.get(delayLevel);
if (time != null) {
return time + storeTimestamp;
}
return storeTimestamp + 1000;
}
计算出来的计划消费时间,作为tagsCode。后面Broker发送定时消息时会用到这个tagsCode进行过滤。
Broker发送定时消息
针对延时消息队列,即每一个SCHEDULE_TOPIC_XXXX主题,每个消费队列都会有一个单独的定时任务进行轮询,用来发送到达定时的计划消费时间的消息。
流程图如下:出处;https://www.iocoder.cn/RocketMQ/message-schedule-and-retry/?github&1601
相应的实现源码在DeliverDelayedMessageTimerTask 中:
该类继承TimerTask,是一个定时任务,源码如下:
/**
* 发送(投递)延迟消息定时任务
*/
class DeliverDelayedMessageTimerTask extends TimerTask {
/**
* 延迟级别
*/
private final int delayLevel;
/**
* 位置
*/
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
this.executeOnTimeup();
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
/**
* 纠正可投递时间。
* 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。
*
* @param now 当前时间
* @param deliverTimestamp 投递时间
* @return 纠正结果
*/
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
long result = deliverTimestamp;
long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
if (deliverTimestamp > maxTimestamp) {
result = now;
}
return result;
}
public void executeOnTimeup() {
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) { // 消息到达可发送时间
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt != null) {
try {
// 发送消息
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功
continue;
} else { // 发送失败
// XXX: warn and notify me
log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
// 安排下一次任务
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
// 更新进度
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
// 安排下一次任务
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
// 更新进度
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 安排下一次任务
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
// 更新进度
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else { // 消费队列已经被删除部分,跳转到最小的消费进度
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
}
/**
* 设置消息内容
*
* @param msgExt 消息
* @return 消息
*/
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
int queueId = Integer.parseInt(queueIdStr);
msgInner.setQueueId(queueId);
return msgInner;
}
}
上面代码,实现了逻辑如下:
- 轮询延迟消息的topic,看是否有到期的定时任务
- 到期的定时任务,提交到CommitLog,供消费者消费
Broker 持久化定时发送进度
- 定时消息发送进度存储在文件(…/config/delayOffset.json)里
- 每 10s 定时持久化发送进度
核心代码在类ScheduleMessageService中:
public void start() {
// 定时发送消息
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 定时持久化发送进度
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Exception e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
此方法同样是启动一个定时任务,每10s执行一次持久化操作。
消息重试
消息重试发生在Consumer消费消费时,消费失败的消息会发回到Broker,进入延时消息队列,过一段时间重新消费。
所以消息重试,和定时/延时消息是密切相关的。
消费者将消费失败的消息发回Broker的源码在SendMessageProcessor#consumerSendMsgBack:
/**
* 消费者发回消息
*
* @param ctx ctx
* @param request 请求
* @return 响应
* @throws RemotingCommandException 当远程调用异常
*/
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
// ... 省略部分代码
// 处理 delayLevel
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
|| delayLevel < 0) { // 如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入 死信队列(Dead Letter Queue)
// 此时不会进入
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
// 设置延时
msgExt.setDelayTimeLevel(delayLevel);
}
// ... 省略部分代码
return response;
}
重点在于第26行,设置了延时时间。
总结
本篇分析了RocketMQ的定时消息的处理逻辑。
- RocketMQ不支持任意时间的延迟,只支持固定时间,因为性能考虑
- Producer发送定时消息只是调用setDeliveryTimestamp指定延迟时间或等级
- Broker会先将定时消息,存储在特定的Topic,名字格式为 SCHEDULE_TOPIC_XXXX
- Broker会启动一个定时任务,每1000ms执行一次,轮询 SCHEDULE_TOPIC_XXXX 中的消息,通过tagsCode过滤,将到期的消息发送到CommitLog
- Broker同时会启动持久化定时发送进度的任务,每10s执行一次
- 消息发送存储到Commitlog后,Consumer就可以消费到
- 消息消费失败时,Consumer会将消息发回到Broker的延时消息Topic,固定时间后再次重试消费