Producer
1 )发送消息注意事项
-
1.1 一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤
message.setTags("TagA");
-
1.2. 每个消息在业务局面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突
// 订单 Id String orderId = "20034568923546"; message.setKeys(orderId);
-
1.3 消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段
-
1.4. send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在 sendResult 里定义
- SEND_OK: 消息収送成功
- FLUSH_DISK_TIMEOUT: 消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- FLUSH_SLAVE_TIMEOUT: 消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- SLAVE_NOT_AVAILABLE: 消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
-
对于精于发送顺序消息的应用,由于顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果
sendresult 中的 status 字段不等于 SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样 -
对于消息不可丢失应用,务必要有消息重发机制
- 例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,戒者人工触发重发
2 )消息发送失败如何处理
-
Producer 的 send 方法本身支持内部重试,重试逻辑如下:
- 至多重试 3 次
- 如果发送失败,则轮转到下一个 Broker
- 这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s
- 所以,如果本身向 broker 发送消息产生超时异常,就不会再做重试
-
以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议应用这样做
-
如果调用 send 同步方法发送失败,则尝试将消息存储到 db,由后台线程定时重试,保证消息一定到达 Broker
-
上述 db 重试方式为什么没有集成到 MQ 客户端内部做,而是要求应用自己去完成,我们基于以下几点考虑
- MQ 的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是 cpu、内存、网络
- 如果 MQ 客户端内部集成一个 KV 存储模块,那数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由于应用关闭过程不受 MQ 运维人员控制,可能经常会发生 kill -9 这样暴力方式关闭,造成数据没有及时落盘而丢失
- Producer 所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据
-
综上,建议重试过程交由应用来控制
3 )选择 oneway 形式发送
- 一个 RPC 调用,通常是这样一个过程
-
- 客户端发送请求到服务器
-
- 服务器处理该请求
-
- 服务器向客户端返回应答
-
- 所以一个 RPC 的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用 oneway 形式调用,oneway 形式只发送请求不等待应答,而发送请求在客户端实现局面仅仅是一个 os 系统调用的开销,即将数据写入客户端的 socket 缓冲区,此过程耗时通常在微秒级
4 )发送顺序消息注意事项
-
4.1 选择合适的顺序消息类型,RocketMQ 支持两种类型的顺序消息:
- 全局顺序消息:所有消息严格按顺序发送和消费
- 分区顺序消息:消息按分区(队列)顺序发送和消费
-
4.2. 设置消息键(Message Key)
- 对于全局顺序消息,需要设置消息键(Message Key),确保消息的唯一性和顺序性
- 对于分区顺序消息,可以通过设置消息键来指定消息所属的分区
-
4.3. 使用顺序消息生产者,创建顺序消息生产者时,需要指定消息的顺序类型。例如:
// 创建顺序消息生产者 DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // 设置消息的顺序类型 MessageQueueSelector selector = new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }; // 发送顺序消息 for (int i = 0; i < 10; i++) { Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, selector, i); System.out.printf("%s%n", sendResult); } producer.shutdown();
-
4.4. 确保生产者和消费者的一致性
- 生产者:确保生产者在发送消息时使用相同的 MessageQueueSelector 和 arg 参数,以保证消息按预期的顺序发送
- 消费者:确保消费者在消费消息时使用相同的顺序策略。例如,使用 DefaultMQPushConsumer 并设置
ConsumeOrderly
模式:
// 创建顺序消息消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); // 设置顺序消费 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println("Receive message: " + new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start();
-
4.5. 处理消息积压
- 生产者:如果消息积压严重,可以考虑增加生产者的并发度或优化消息的发送逻辑。
- 消费者:如果消费者处理速度较慢,可以考虑增加消费者的并发度或优化消息的处理逻辑。
-
4.6. 确保网络和服务器的稳定性
- 网络:确保生产者和消费者之间的网络稳定,避免因网络问题导致消息丢失或乱序
- 服务器:确保 RocketMQ 服务器的稳定性和高性能,定期进行维护和监控
-
4.7. 监控和日志
- 监控:使用 RocketMQ 提供的监控工具(如 RocketMQ Console)监控消息的发送和消费情况,及时发现和解决问题
- 日志:开启详细的日志记录,便于排查问题和调试
-
4.8. 容错和重试机制
- 生产者:设置合理的重试机制,确保消息发送失败时能够自动重试
- 消费者:处理消息失败时,可以设置重试策略,确保消息最终被成功处理
-
4.9. 避免长时间阻塞
- 生产者:避免在发送消息时阻塞太久,可以设置超时时间
- 消费者:避免在处理消息时阻塞太久,可以设置合理的超时时间和处理逻辑
-
4.10. 测试和验证
- 测试:在正式上线前,进行充分的测试,确保顺序消息的发送和消费符合预期
- 验证:在实际使用中,定期验证消息的顺序性和完整性,确保系统的稳定性和可靠性
-
通过遵循以上注意事项,可以确保在使用 RocketMQ 发送顺序消息时,消息的顺序性和可靠性得到保障
Consumer
1 )消费过程要做到幂等(即消费端去重)
- RocketMQ 无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务局面去重,有以下几种去重方式
- 1.1. 将消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 Id 等,消费之前判断是否在Db 或 Tair(全局 KV 存储)中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId 一定是全局唯一标识符,但是可能会存在同样的消息有两个不同 msgId 的情况(有多种原因),这种情况可能会使业务上重复消费,建议最好使用消息内容中的唯一标识字段去重 - 1.2 使用业务局面的状态机去重
2 )消费失败处理方式
2.1. 重试机制
- RocketMQ 默认提供了消息重试机制,当消费者消费消息失败时,消息会被重新投递给消费者进行重试
重试队列
- 重试队列:消费失败的消息会被放入一个特殊的重试队列中,这个队列的名称格式为 %RETRY%<消费组名称>
- 重试次数:默认情况下,每条消息最多可以重试 16 次。每次重试的间隔时间会逐渐增加,以减少对系统的冲击
重试时间间隔
- 延时级别:RocketMQ 提供了多个延时级别,每个级别对应不同的重试间隔时间
- 例如:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
- 重试次数超过 16 次:如果消息在重试 16 次后仍然失败,消息将被放入死信队列
死信队列
-
死信队列:当消息重试次数超过最大重试次数后,消息会被放入死信队列中,死信队列的名称格式为 %DLQ%<消费组名称>
-
处理死信消息:死信队列中的消息不会再被正常消费,需要人工介入或通过其他手段进行处理
- 例如,可以使用后台任务定期扫描死信队列,对消息进行重新处理或丢弃
消费失败的返回状态
- 在消费消息时,可以通过返回不同的状态来控制消息的处理方式:
- ConsumeConcurrentlyStatus.CONSUME_SUCCESS:消费成功,消息将被确认并从队列中移除。
- ConsumeConcurrentlyStatus.RECONSUME_LATER:消费失败,消息将被重新投递给消费者进行重试。
- null:消费失败,消息将被重新投递给消费者进行重试。
- 抛出异常:消费失败,消息将被重新投递给消费者进行重试。
手动处理消费失败
- 在某些情况下,你可能需要手动处理消费失败的情况,例如:
- 记录日志:记录消费失败的日志,便于后续排查问题。
- 报警:发送报警通知,提醒相关人员处理消费失败的问题。
- 手动重试:在代码中实现手动重试逻辑,确保消息最终被成功处理。
配置重试次数
- 可以通过配置项来调整消息的重试次数:
consumer.setMaxReconsumeTimes(20);
- 这个配置项表示每条消息最多可以重试 20 次。如果重试次数超过 20 次,消息将被放入死信队列
幂等性处理
- 为了避免消息重复消费带来的问题,需要在业务逻辑中实现幂等性处理
- 例如,可以使用消息的唯一标识(如 MessageId)来确保消息的幂等性
示例代码
以下是一个简单的示例,展示了如何处理消费失败的情况:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class ConsumerExample {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
System.out.println("Receive message: " + new String(msg.getBody()));
// 处理消息的业务逻辑
// 如果处理失败,抛出异常
if (/* 业务逻辑处理失败 */) {
throw new RuntimeException("消费失败");
}
}
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 消费失败,消息将被重新投递给消费者进行重试
System.err.println("消费失败: " + e.getMessage());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}
总结
- 通过合理配置 RocketMQ 的重试机制、死信队列和消费失败的返回状态,可以有效地处理消费失败的情况,确保消息的可靠性和系统的稳定性
- 同时,实现幂等性处理和手动处理消费失败的逻辑,可以进一步提高系统的健壮性
3 ) 消费速度慢处理方式
- 当使用 RocketMQ 时,如果遇到消费速度慢的问题,可以通过多种方式来优化和解决
- 以下是一些常见的处理方式:
3.1. 增加消费者实例
- 水平扩展:增加消费者的数量,可以显著提高消费速度。每个消费者实例可以并行处理消息,从而提高整体的消费能力。
- 消费者组:确保所有消费者实例属于同一个消费者组,这样 RocketMQ 会将消息均匀分配给各个消费者实例。
3.2. 优化消息处理逻辑
- 减少处理时间:检查消息处理逻辑,优化耗时的操作,例如数据库操作、网络请求等。
- 异步处理:将耗时的操作异步化,使用多线程或异步框架(如 CompletableFuture)来处理消息。
3.3. 批量消费
- 批量消费:启用批量消费模式,一次消费多条消息,减少每次消费的开销。可以通过设置 MessageListenerConcurrently 的 consumeMessage 方法中的 msgs 参数来实现批量消费。
3.4. 调整消费者配置
- 拉取消息频率:
- 调整 pullBatchSize 和 consumeMessageBatchMaxSize 参数,控制每次拉取和消费的消息数量。
- 消费线程池
- 增加消费线程池的大小,提高并发处理能力
- 可以通过 consumer.setMessageModel(MessageModel.CLUSTERING) 和
- consumer.setConsumeThreadMin/max 来配置。
3.5. 优化网络和服务器性能
- 网络优化:确保消费者和 RocketMQ 服务器之间的网络连接稳定,减少网络延迟。
- 服务器性能:提升 RocketMQ 服务器的硬件性能,例如增加 CPU、内存和磁盘 I/O。
3.6. 监控和日志
- 监控:使用 RocketMQ 提供的监控工具(如 RocketMQ Console)监控消息的消费情况,及时发现和解决问题。
- 日志:开启详细的日志记录,便于排查消费慢的原因。
3.7. 消息堆积处理
- 消息堆积:如果消息堆积严重,可以考虑临时增加消费者的数量,快速消耗堆积的消息。
- 优先级处理:如果有紧急消息,可以设置消息的优先级,确保紧急消息优先被消费。
3.8. 负载均衡
- 负载均衡:确保消息均匀分配给各个消费者实例,避免某个消费者实例过载。
- 分区:合理划分消息分区,确保每个分区的消息量均衡。
示例代码
以下是一个示例,展示了如何优化消费者的配置和处理逻辑:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OptimizedConsumerExample {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("TopicTest", "*");
// 设置消费线程池大小
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
// 设置每次消费的消息数量
consumer.setConsumeMessageBatchMaxSize(10);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
System.out.println("Receive message: " + new String(msg.getBody()));
// 处理消息的业务逻辑
// 优化处理逻辑,减少处理时间
processMessage(msg);
}
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 消费失败,消息将被重新投递给消费者进行重试
System.err.println("消费失败: " + e.getMessage());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
private void processMessage(MessageExt msg) {
// 优化消息处理逻辑
// 例如,使用异步处理
// CompletableFuture.runAsync(() -> {
// // 异步处理逻辑
// });
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}
总结
- 通过增加消费者实例、优化消息处理逻辑、启用批量消费、调整消费者配置、优化网络和服务器性能、监控和日志、处理消息堆积以及负载均衡等多种方式,
- 可以有效提高 RocketMQ 的消费速度根据具体情况选择合适的方法,确保系统的高效运行
4 ) 提高消费并行度
- 绝大部分消息消费行为属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降,如图所示,呈现抛物线形式。所以应用必须要设置合理的并行度。CPU 密集型应用除外
- 修改消费并行度方法
- a) 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度
- 超过订阅队列数的 Consumer 实例无效
- 可以通过加机器,或者在已有机器启动多个进程的方式
- b) 提高单个 Consumer 的消费幵行线程,通过修改以下参数
consumeThreadMin
consumeThreadMax
- a) 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度
批量方式消费
- 某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 秒钟,一次处理 10 个订单可能也只耗时 2 秒钟,这样即可大幅度提高消费的吞吏量,通过设置 consumer 的 consumeMessageBatchMaxSize 这个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那每次消费的消息数小于等于 N
跳过非重要消息
-
发生消息堆积时,如果消费速度一直追不上发送速度
-
可以选择丢弃不重要的消息,如何判断消费发生了堆积?
public ConsumeConcurrentlyStatus consumeMessage(// List<MessageExt> msgs, // ConsumeConcurrentlyContext context) { long offset = msgs.get(0).getQueueOffset(); String maxOffset = // msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); long diff = Long.parseLong(maxOffset) - offset; if (diff > 100000) { // TODO 消息堆积情况的特殊处理 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // TODO 正常消费过程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
-
如以上代码所示,当某个队列的消息数堆积到 100000 条以上,则尝试丢弃部分或全部消息
-
这样就可以快速追上发送消息的速度
优化每条消息消费过程
举例如下,某条消息的消费过程如下
- 根据消息从 DB 查询数据 1
- 根据消息从 DB 查询数据 2
- 复杂的业务计算
- 向 DB 插入数据 3
- 向 DB 插入数据 4
- 这条消息的消费过程与 DB 交互了 4 次,如果按照每次 5ms 计算,那举总共耗时 20ms,假设业务计算耗时 5ms,那总过耗时 25ms,如果能把 4 次 DB 交互优化为 2 次,那举总耗时就可以优化到 15ms,也就是说总体性能提高了 40%
- 对于 Mysql 等 DB,如果部署在磁盘,那与 DB 进行交互,如果数据没有命中 cache,每次交互的 RT 会直线上升,如果采用 SSD,则 RT 上升趋势要明显好亍磁盘。个别应用可能会遇到这种情况:
- 在线下压测消费过程中,db 表现非常好,每次 RT 都很短,但是上线运行一段时间,RT 就会发长,消费吞吐量直线下降
- 主要原因是线下压测时间过短,线上运行一段时间后,cache 命中率下降,那 RT 就会增加
- 建议在线下压测时,要测试足够长时间,尽可能模拟线上环境,压测过程中,数据的分布也很重要,数据不同,可能 cache 的命中率也会完全不同
消费打印日志
如果消息量较少,建议在消费入口方法打印消息,方便后续排查问题
public ConsumeConcurrentlyStatus consumeMessage(//
List<MessageExt> msgs, //
ConsumeConcurrentlyContext context) {
log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
如果能打印每条消息消费耗时,那在排查消费慢等线上问题时,会更方便