消息队列(MQ)核心知识点(持续更新中)
- RabbitMQ
- RabbitMQ概念及架构
- RabbitMQ消息丢失的场景及解决方案
- RabbitMQ重复消费场景及解决方案
- RabbitMQ消息堆积场景及解决方案
- RabbitMQ集群
- RocketMQ
- RocketMQ概念及架构
RabbitMQ
RabbitMQ概念及架构
参考文章,RabbitMQ详解:https://blog.csdn.net/qq_36763419/article/details/122023216
RabbitMQ消息丢失的场景及解决方案
场景1:生产者(Producer)将消息投递到 Broker 时,因为网络波动导致消息不能正常投递到Broker?
解决方案:针对生产者可以开启(Confirm)消息发布确认机制
。
场景2:交换机(Exchange)将消息转发到队列(Queue)时,Broker服务出现宕机?
解决方案:对RabbitMQ进行消息持久化
(注意事项:消息持久化影响性能)。如下:
- Exchange持久化
- Queue持久化
- 消息持久化
场景3:消费者消费队列中的消息时,消费者拿到消息开始消费后,出现消费异常或者消费者宕机,默认情况下自动确认会导致消息丢失?
解决方案:开启手动ACK
,由消费者手动确认消费。
RabbitMQ重复消费场景及解决方案
场景1:生产者推送多个一样的消息。如,接口调用时重复提交,没有做好接口幂等性?
解决方案:设置接口幂等性,防止接口重复提交
。
场景2:消费者完成消费后,准备发送ACK确认消费时,RabbitMQ出现宕机,导致MQ没有及时将消息出队,服务重启后,会再次推送该消息,导致重复消费?
场景3:消费者完成消费后,来不及发送ACK确认消费,消费者出现宕机,服务重启后,会再次收到队列中的消息进行消费。
场景 2、3 解决方案:设置消息幂等性处理(如:Redis为例,基于Redis实现锁续命)
@Service
@Slf4j
@SuppressWarnings(value = {"unchecked", "rawtypes"})
public class RabbitMQConsumerService {
@Autowired
private RedisTemplate redisTemplate;
/*消费端监听队列(普通队列)*/
@RabbitListener(queues = {"test.queue"})
public void receiveMsg(JSONObject msgJson, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("消费者收到消息: {}", msgJson.toJSONString());
String messageId = msgJson.getString("messageId");
//锁
String lockKey = "lock:msgId:" + messageId;
//开始消费
Duration duration = Duration.ofSeconds(10);
if (redisTemplate.opsForValue().setIfAbsent(lockKey, 0, duration)) {
log.info("开始消费消息");
//设置锁续命
LockLeaseRenewal lockLeaseRenewal = new LockLeaseRenewal(redisTemplate, lockKey, duration);
lockLeaseRenewal.startRenewal();
try {
//TODO: 业务处理
//业务逻辑处理完成
redisTemplate.opsForValue().set(lockKey, 1);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("消息消费失败: {}", e.getMessage());
channel.basicNack(deliveryTag, false, false);
} finally {
lockLeaseRenewal.stopRenewal();
redisTemplate.delete(lockKey);
}
} else if (Objects.equals(redisTemplate.opsForValue().get(lockKey), 1)) {
log.info("消费完成,手动确认:{}", messageId);
channel.basicAck(deliveryTag, false);
redisTemplate.delete(lockKey);
}
}
private String getLockKey(String msgId) {
return "lock:msgId:" + msgId;
}
}
@Slf4j
class LockLeaseRenewal {
private final RedisTemplate redisTemplate;
private final String lockKey;
private final Duration renewalInterval;
private Timer timer;
public LockLeaseRenewal(RedisTemplate redisTemplate, String lockKey, Duration renewalInterval) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.renewalInterval = renewalInterval;
this.timer = new Timer();
}
//线程开始执行续命
public void startRenewal() {
if (redisTemplate.hasKey(lockKey)) {
TimerTask task = new TimerTask() {
@Override
public void run() {
redisTemplate.expire(lockKey, renewalInterval); // 续命
log.info("锁续命: " + lockKey);
}
};
timer.schedule(task, 0, renewalInterval.toMillis() / 2); // 定期续命
}
}
//停止锁续命
public void stopRenewal() {
timer.cancel();
timer.purge();
}
}
RabbitMQ消息堆积场景及解决方案
场景1:生产者生产消息的速度远大于消费者的消费速度;
解决方案:增加消费节点
,平衡生产和消费速度,确保消息及时处理。
场景2:消费者消费异常,消费耗时过长?
解决方案:在消费者出现故障时,设置消息的重试机制
,确保消息不会因单个消费者故障而持续堆积。同时可以设置消息消费次数阈值
,超过阈值将其入库,并将消息出队列,人工分析处理这些失败的消息,以便于消息补偿。
场景3:RabbitMQ中队列的大小设置过小,消费者消费速度一旦跟不上生产者生产速度,就会出现队列溢出?
解决方案:适当增大 queue 的大小。
RabbitMQ集群
参考文章:https://blog.csdn.net/qq_42108331/article/details/131842887
1、普通集群
2、镜像集群
3、仲裁队列
RocketMQ
RocketMQ概念及架构
参考 RocketMQ 官方文档:https://rocketmq.apache.org/zh/docs/
注意
:4.x 与 5.x 版本区别