springboot 整合 rabbitMQ(1)-CSDN博客
上期说了rabbitMQ的基础用法(普通队列模式)
这期学习一下如何防止消息重复消费和进阶用法(订阅者模式)
目录
重复消费问题
导致 RabbitMQ 重复消费问题的原因:
解决思路
代码实现(这里在上一期的代码上进行修改)
生产者:
消费者:
订阅者模式:
是什么:
代码实现:
生产者:
消费者:
效果:
重复消费问题
导致 RabbitMQ 重复消费问题的原因:
-
网络问题: 在分布式系统中,网络通信是不稳定的因素之一。如果生产者发送一条消息到 RabbitMQ 但尚未收到确认(acknowledgment),可能会导致 RabbitMQ 认为消息未被正确处理并重新发送。
-
消费者故障: 消费者在处理消息时可能会发生故障,例如应用程序崩溃或因某种原因终止。如果 RabbitMQ 未收到消费者的确认消息,它可能会认为消息未被消费并重新发送。
-
网络分区: 当分布式系统中的网络发生分区(网络隔离)时,可能会导致消息在不同部分之间重复传递。这是因为每个分区可能都会独立处理消息。
-
消息重复传递策略: RabbitMQ 提供了不同的消息传递策略,例如“至少一次传递”和“最多一次传递”。这些策略可能会导致消息的重复传递,尤其在异常情况下。
-
消费者超时设置不当: 如果消费者设置了较长的超时时间,在消费者未确认消息的情况下,RabbitMQ 可能会认为消息未被处理并重新发送。
解决思路
- 生产者发送消息时携带一个唯一的id
- 消费者每次消费前先判断一下在redis中是否在id,不存在就消费,消费完之后就把id存储到redis中
代码实现(这里在上一期的代码上进行修改)
生产者:
消费者:
@Autowired
private RedisTemplate redisTemplate;
private static final String QUEUE_NAME="login_queue";
@RabbitListener(queuesToDeclare = @Queue(QUEUE_NAME))
public void test01(User getUser, Message message, Channel channel) throws IOException {
//获取消息唯一id
String messageId = message.getMessageProperties().getMessageId();
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//判断消息是否存在
if(redisTemplate.opsForHash().hasKey(QUEUE_NAME,messageId)){
//存在表示该消息已被消费,return
return;
}
//将该消息id存入redis 第三个参数随便字符串,不要填对象,没法序列化
redisTemplate.opsForHash().put(QUEUE_NAME,messageId,"a");
//发送消息到邮箱
sendMail(getUser);
System.out.println("消费成功");
//手动确认消息 multiple 是一个布尔值,它决定了是否确认多个消息
// false:只确认指定的单个消息 true:确认所有未确认的消息,这些消息的deliveryTag 小于或等于指定的 deliveryTag
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//消息回滚 deliveryTag:指定要拒绝的消息 false:表示只拒绝指定的这条消息 true:表示将消息重新入队,让其他消费者有机会再次处理这条消息。
channel.basicNack(deliveryTag,false,true);
System.out.println("消息消费异常");
}
}
订阅者模式:
是什么:
简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。下图取自于官方网站(RabbitMQ)的发布/订阅模式的图例
那么要使交换机接受到消息后转发给队列,就需要将队列绑定给交换机,可以写一个配置类,项目启动,自动将队列绑定给交换机
代码实现:
/**
* @ClassName QueueConfig
* @Description 声明队列和交换机 并将队列和交换机绑定
* @Author
* @Date 2024/10/9 16:28
*/
@Configuration
public class QueueConfig {
//订阅模式交换机名称
public static String EXCHANGENAME="fanout_exchange";
public static String TESTQUEUE01="test_01";
public static String TESTQUEUE02="test_02";
@Bean
public Queue queue01(){
return new Queue(TESTQUEUE01);
}
@Bean
public Queue queue02(){
return new Queue(TESTQUEUE02);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(EXCHANGENAME);
}
@Bean
public Binding bingingQueue01(Queue queue01,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue01).to(fanoutExchange);
}
@Bean
public Binding bingingQueue02(Queue queue02,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue02).to(fanoutExchange);
}
}
生产者:
只需要交换机的名字和传入的数据,队列名不需要填,因为交换机已经绑定队列
消费者:
效果:
发送一条数据,两个消费者消费