0.前文
RocketMQ源码分析(三) 消费者
RocketMQ源码分析(二) 生产者
RocketMQ源码分析(一)broker启动&remoting抽象
1. 概述
RocketMQ的延迟消息是指消息发送到Broker后,不会立即被消费者消费,而是要等待指定的时间后才能被消费。本文将从源码层面分析延迟消息的实现原理。
2. 核心类图
主要类及其关系:
-
ScheduleMessageService
- Broker端延迟消息处理服务
- 管理延迟级别的定时任务
- 转换延迟消息到真实主题
-
DelayCombineMessageStore
- 延迟消息存储实现
- 管理延迟消息的commitlog
- 提供延迟消息的读写能力
-
ScheduleMessageTask
- 延迟消息调度任务
- 处理到期的延迟消息
- 将消息投递到真实主题
3. 延迟消息实现原理
3.1 延迟级别设计
RocketMQ默认支持18个延迟级别:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
实现原理:
- 每个延迟级别对应一个定时任务
- 每个延迟级别对应一个内部主题
- 延迟消息先存储到对应级别的主题
- 到期后投递到真实主题
3.2 消息发送流程
发送流程:
- Producer设置消息延迟级别
- Broker接收到消息后:
- 计算延迟时间戳
- 将消息存储到延迟主题
- 返回发送结果
示例代码:
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 设置延迟级别为3级(10秒)
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
3.3 消息调度流程
调度流程:
- ScheduleMessageService启动定时任务
- 扫描延迟队列中到期的消息
- 将消息投递到目标主题
- 更新消费进度
核心代码:
public class ScheduleMessageService extends ConfigManager {
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>();
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>();
public void executeOnTimeup() {
// 处理到期的延迟消息
ConsumeQueue cq = DefaultMessageStore.this.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
// 读取消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
// 投递到真实主题
MessageExtBrokerInner msgInner = MessageDecoder.decode(bufferMsg);
msgInner.setTopic(messageExt.getTopic());
// 更新消费进度
PutMessageResult putMessageResult = DefaultMessageStore.this.putMessage(msgInner);
}
}
3.4 消息消费流程
消费流程:
- 消息到期后投递到真实主题
- Consumer正常消费目标主题
- 对Consumer透明,无需特殊处理
4. 实现细节
4.1 延迟主题设计
- 使用SCHEDULE_TOPIC_XXXX作为延迟主题
- 每个延迟级别对应一个队列ID
- 队列ID = 延迟级别 - 1
4.2 定时调度机制
- 基于DelayQueue实现定时功能
- 每个延迟级别一个调度任务
- 任务执行时间由延迟级别决定
- 支持动态调整调度参数
4.3 消息转换机制
延迟消息转换过程:
- 保存原始消息主题
- 临时存储到延迟主题
- 到期后恢复原始主题
- 投递到目标队列
5. 最佳实践
5.1 使用建议
- 合理选择延迟级别
- 避免设置过多的延迟消息
- 注意延迟消息的顺序性
- 考虑消息可靠性要求
5.2 常见问题
-
延迟精度问题
- 依赖定时任务调度
- 存在一定误差范围
- 不适合精确定时要求
-
顺序性问题
- 同一延迟级别基本有序
- 不同级别无法保证顺序
- 需要业务层面考虑
-
性能问题
- 会占用额外存储空间
- 增加系统调度开销
- 需要合理规划容量
6. 总结
RocketMQ延迟消息的特点:
- 支持多个延迟级别
- 实现机制简单高效
- 对用户基本透明
- 可靠性有保证
通过定时调度和主题转换的方式,实现了灵活可靠的延迟消息功能。