RabbitMQ 如何保证消息可靠性

RabbitMQ 如何保证消息可靠性

    • 1. 保证生产者可靠
      • 1.1 生产者确认机制
      • 1.2 实现生产者确认
        • 1.2.1 开启生产者确认机制
        • 1.2.2 定义ReturnCallback
        • 1.3.3.定义ConfirmCallback
      • 1.3 注意
    • 2. 保证MQ可靠
      • 2.1 数据持久化
        • 2.1.1 交换机持久化
        • 2.1.2.队列持久化
        • 2.1.3 消息持久化
        • 2.1.4 注意
    • 3. 保证消费者可靠
      • 3.1 消费者确认机制
      • 3.2 失败重试机制
      • 3.3 失败处理策略
    • 4. 总结

该文章用到的项目代码与MQ请看下面两篇文章:

消息队列 - RabbitMQ

SpringAMQP 快速入门


要保证消息的可靠性先看一下哪些情况会破坏消息的可靠性:

  • 发送消息时丢失:
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

1. 保证生产者可靠

1.1 生产者确认机制

生产者确认机制是 RabbitMQ 中的一种机制,用于确保消息成功发送到 RabbitMQ 服务器。这个机制可以防止消息在生产者发送后因为某种原因未能成功到达消息队列,从而提高消息的可靠性。

生产者确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ACK的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。

1.2 实现生产者确认

1.2.1 开启生产者确认机制

在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执
1.2.2 定义ReturnCallback

ReturnCallback 是 RabbitMQ 提供的回调接口,主要用于处理消息无法路由到队列时的返回情况。当消息发送到交换机,但交换机无法将消息路由到任何队列时,就会触发 ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@Configuration
public class MqConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}
1.3.3.定义ConfirmCallback

ConfirmCallback 是 RabbitMQ 提供的回调接口,用于处理消息是否成功发送到 RabbitMQ 服务器的确认。当消息成功到达服务器时,触发 ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

在这里插入图片描述

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

在这里插入图片描述

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

    @Test
    public void testConfirm1() {
        // 1.创建CorrelationData 这里没传 id ,因为 CorrelationData 无参构造函数里面会创建id
        CorrelationData cd = new CorrelationData();
        // 2.给Future添加ConfirmCallback
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                // 2.1.Future发生异常时的处理逻辑,基本不会触发
                log.error("send message fail", ex);
            }
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
                if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                    log.debug("发送消息成功,收到 ack!");
                }else{ // result.getReason(),String类型,返回nack时的异常描述
                    log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
                }
            }
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("test.topic", "aaa", "hello", cd);
    }

执行结果如下

在这里插入图片描述

消息发送成功收到了ACK,但是没有对应的Routing Key触发了return callback

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

1.3 注意

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理 nack 就可以了。

2. 保证MQ可靠

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

2.1 数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
2.1.1 交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

在这里插入图片描述

设置为Durable就是持久化模式,Transient就是临时模式。

2.1.2.队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

在这里插入图片描述

2.1.3 消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

delivery-mode:

  • 持久性标志,用于指定消息的持久性。值为 1 表示持久性消息,值为 2 表示非持久性消息。

在这里插入图片描述

2.1.4 注意

在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。


3. 保证消费者可靠

RabbitMQ是阅后即焚机制,一般情况下RabbitMQ确认消息投递到消费者后会立刻删除,如果消费者未正常处理这条消息,该消息就会丢失。

设想这样的场景:

  1. RabbitMQ投递消息给消费者
  2. 消费者获取消息后,返回ACK给RabbitMQ
  3. RabbitMQ删除消息
  4. 消费者宕机,消息尚未处理 or 处理过程中发生异常

这样,消息就丢失了。

3.1 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenTestQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new MessageConversionException("故意的");
    }
    log.info("消息处理完成");
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

把确认机制修改为auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

在这里插入图片描述

放行以后,由于抛出的是消息转换异常,因此Spring会自动返回reject,所以消息依然会被删除

我们将异常改为RuntimeException类型:

    @RabbitListener(queues = "test.queue1")
    public void listenTestQueueMessage(String msg) throws InterruptedException {
        log.info("spring 消费者接收到消息:【" + msg + "】");
        if (true) {
            throw new RuntimeException("故意的");
        }
        log.info("消息处理完成");
    }

在异常位置打断点,然后再次发送消息测试,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态)

放行以后,由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态然后再次投递变为unacked状态,并且没有被RabbitMQ删除:

在这里插入图片描述

3.2 失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:

在这里插入图片描述

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3.3 失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1.在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2.定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

@Configuration
// 条件注解 只有在yml文件中 spring.rabbitmq.listener.simple.retry.enabled 属性的值为 true 才生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfiguration {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

测试消息发送,消费者重试3次后就将消息转发到了指定的交换机

在这里插入图片描述

在这里插入图片描述

4. 总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置 MessageRecoverer 为 RepublishMessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/256047.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

引领半导体划片机行业,实现钛酸锶基片切割的卓越效能

在当今快速发展的半导体行业中&#xff0c;博捷芯以其卓越的技术实力和精准的行业应用&#xff0c;脱颖而出&#xff0c;再次引领行业潮流。这次&#xff0c;他们将先进的BJX3356划片机技术应用于钛酸锶基片的切割&#xff0c;为半导体制造行业的进一步发展提供了强大的技术支持…

mysql中的server_id到底有什么用?详解mysql配置中的server_id配置项

当我们搭建MySQL集群时&#xff0c;自然需要完成数据库的主从同步来保证数据一致性。而主从同步的方式也分很多种&#xff0c;一主多从、链式主从、多主多从&#xff0c;根据你的需要来进行设置。但只要你需要主从同步&#xff0c;就一定要注意server-id的配置&#xff0c;否则…

YOLOv5改进 | TripletAttention三重注意力机制(附代码+机制原理+添加教程)

一、本文介绍 本文给大家带来的改进是Triplet Attention三重注意力机制。这个机制&#xff0c;它通过三个不同的视角来分析输入的数据&#xff0c;就好比三个人从不同的角度来观察同一幅画&#xff0c;然后共同决定哪些部分最值得注意。三重注意力机制的主要思想是在网络中引入…

Gitlab仓库推送到Gitee仓库的一种思路

文章目录 Gitlab仓库推送到Gitee仓库的一种思路1、创建Gitee的ssh公钥&#xff08;默认已有Gitlab的ssh公钥&#xff09;2、添加Gitlab远程仓库地址3、添加Gitee远程仓库地址4、拉取Gitlab远程仓库指定分支到本地仓库指定分支&#xff08;以test分支为例&#xff09;5、推送本地…

Elasticsearch的批量bulk 提交 写入的方式会有顺序问题吗?

Elasticsearch的分布式特性可能会导致写入操作的执行顺序与提交顺序稍有不同。在分布式环境中,Elasticsearch将数据分散到不同的节点上进行存储和处理,因此写入操作的执行顺序可能会受到网络延迟、负载均衡等因素的影响。 根源在于ES的分布式架构。如上图所示,客户端的命令首…

蓝桥杯嵌入式——KEY

CUBE里将这几个引脚配置成GPIO输入模式&#xff0c;再同时选中&#xff0c;配置成上拉&#xff0c;如下图&#xff1a; 同时配置定时器&#xff0c;定时10ms&#xff0c;每10ms扫描一次按键&#xff0c;计算公式&#xff1a;80 000 000 / 80 / 10000 100HZ 10ms&#xff0c;配…

C语言-第十六周课堂总结-数组

引用 先定义&#xff0c;后使用只能引用单个的数组元素&#xff0c;不能一次引用整个数组 int a[10]; 10个数组元素&#xff1a;a[0]、a[1]、…、a[9] 数组元素&#xff1a;数组名[下标] 下标&#xff1a;整形表达式 下标取值范围&#xff1a;[0&#xff0c;数组长度-1]数组元…

解决win11杀毒(不能安装破解软件的问题)

1、下载火绒APP&#xff0c;打开火绒APP软件 2、点击菜单&#xff0c;选择安全设置 3、选择病毒防护&#xff0c;修改病毒处理方式为询问我 4、这样在解压激活的软件就不会被windows的杀毒软件自动删除了 5、问题解决了就点击三连吧

JVM虚拟机系统性学习-JVM调优实战之内存溢出、高并发场景调优

调优实战-内存溢出的定位与分析 首先&#xff0c;对于以下代码如果造成内存溢出该如何进行定位呢&#xff1f;通过 jmap 与 MAT 工具进行定位分析 代码如下&#xff1a; public class TestJvmOutOfMemory {public static void main(String[] args) {List<Object> list…

Linux CentOS7安装harbor

1、下载harbor离线包 wget https://github.com/goharbor/harbor/releases/download/v2.4.2/harbor-offline-installer-v2.4.2.tgz 2、解压安装 tar -zxvf harbor-offline-installer-v2.4.2.tgz #解压离线安装包 3、配置harbor cd harbor #切换到harbor目录下…

100套风景+人物动物AI绘画关键词

1、10美元计划 Midjourney的10美元计划是最基础的计划&#xff0c;每月可以生成200张图&#xff0c;然而没有fast模式&#xff0c;也无法免排队生成图。相对于30美元和60美元计划&#xff0c;这个计划更适合个人用户或小型团队使用&#xff0c;仅用于少量图像的生成。如果你只…

Ubuntu 20.04 prometheus prometheus-process-exporter

prometheus-process-exporter 监控系统架构方案 https://blog.csdn.net/weixin_45801289/article/details/126922395 sudo apt install prometheus-process-exporter prometheus-process-exporter_0.4.0ds-1_amd64.deb service prometheus-process-exporter status netstat …

在linux上基于shell自动部署Java项目

一&#xff0c;安装git yum list git 列出git安装包 yum install git 在线安装git 使用 git -varsion 查看是否安装成功 安装成功 二&#xff0c; Git克隆代码 git clone 远程仓库地址 三&#xff0c;创建shell脚本 touch shell.sh shell脚本 #!/bin/sh echo echo 自动…

PowerShell实战(一)PowerShell使用ImportExcel模块轻松操作Excel

目录 一、介绍 二、安装模块 三、操作示例 1、导出excel 2、读取Excel数据 3、导出包含图表的Excel 4、导出包含汇总列和图表的Excel 一、介绍 ImportExcel模块可以理解为基于PowerShell环境操作Excel的强大类库&#xff0c;使用它可以在 Windows、Linux 和 Mac 上都可以使用。…

【Apache-StreamPark】Flink 开发利器 StreamPark 的介绍、安装、使用

【Apache-StreamPark】Flink 开发利器 StreamPark 的介绍、安装、使用 1&#xff09;框架介绍与引入1.1.&#x1f680; 什么是 StreamPark1.2.&#x1f389; Features1.3.&#x1f3f3;‍&#x1f308; 组成部分1.4.引入 StreamPark 2&#xff09;安装部署2.1.环境要求2.2.Hado…

linux系统和网络(一):文件IO

本文主要探讨linux系统编程的文件IO相关知识。 文件IO 文件存在块设备中为静态文件,open打开文件,内核在进程中建立打开文件的数据结构在内存中用于记录文件的文件参数,开辟一段内存用于存放内容,将静态文件转为动态文件 打开文件后对文件的读写操作都为对动态…

机器学习——支持向量机

目录 一、基于最大间隔分隔数据 二、寻找最大间隔 1. 最大间隔 2. 拉格朗日乘子法 3. 对偶问题 三、SMO高效优化算法 四、软间隔 五、SMO算法实现 1. 简化版SMO算法 2. 完整版SMO算法 3. 可视化决策结果 六、核函数 1. 线性不可分——高维可分 2. 核函数 …

Apache Flink(十五):Flink任务提交模式

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

品牌价格管控的有效方法

当品牌渠道价格混乱&#xff0c;出现低价、乱价、窜货时&#xff0c;则需要进行价格的管控&#xff0c;包括使链接改价、下架、被删除等&#xff0c;如果放任低价链接不管&#xff0c;会使渠道越来越乱&#xff0c;当更多的低价出现时&#xff0c;品牌价值也会受影响。 价格管控…

Axure之交互与情节与一些实例

目录 一.交互与情节简介 二.ERP登录页到主页的跳转 三.ERP的菜单跳转到各个页面的跳转 四.省市联动 五.手机下拉加载 今天就到这里了&#xff0c;希望帮到你哦&#xff01;&#xff01;&#xff01; 一.交互与情节简介 "交互"通常指的是人与人、人与计算机或物体…