RabbitMQ如何保证可靠

0. RabbitMQ不可靠原因

在这里插入图片描述

消息从生产者到消费者的每一步都可能导致消息丢失:

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

1. 发送者的可靠性

1.1 生产者重试机制

解决生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

1.2 生产者确认机制

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。

开启Confirm和Return

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般使用correlated,回调机制。

定义ReturnCallback

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

定义ConfirmCallback

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("harry.direct", "q", "hello", cd);
}

开启生产者确认比较消耗MQ性能,一般不建议开启。而且触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

2. MQ的可靠性

2.1 数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
    可以在控制台界面设置。
    设置为Durable就是持久化模式,Transient就是临时模式。
    设置为Durable就是持久化模式,Transient就是临时模式。

2.2 LazyQueue

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut.
PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式:
代码方式

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}
@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

3. 消费者的可靠性

3.1 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack;
    • 如果是消息处理或校验异常,自动返回reject;
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

3.2 失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

3.3 失败处理策略

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

3.4 业务幂等性

3.4.1 唯一消息ID
  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}
3.4.2 业务判断

例如处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

3.5 兜底方案

既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/390474.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

idea里微服务依赖在maven能install但不能启动

场景&#xff1a;多个微服务相互依赖&#xff0c;install都没问题&#xff0c;jar包都是正常的&#xff0c;就连jar都能启动&#xff0c;为什么在idea里面项目就是不能启动呢&#xff0c;我是懵逼的 所以解决办法就是&#xff1a; 在设置的编译器里面虚拟机选项添加 -Djps.tr…

第五节 zookeeper集群与分布式锁_2

1.分布式锁概述 1.1 什么是分布式锁 1&#xff09;要介绍分布式锁&#xff0c;首先要提到与分布式锁相对应的是线程锁。 线程锁&#xff1a;主要用来给方法、代码块加锁。当某个方法或代码使用锁&#xff0c;在同一时刻仅有一个线程执行该方法或该代码段。 线程锁只在同一J…

LEETCODE 164. 破解闯关密码

class Solution { public:string crackPassword(vector<int>& password) {vector<string> password_str;for(int i0;i<password.size();i){password_str.push_back(to_string(password[i]));}//希尔排序int gappassword.size()/2;while(gap>0){for(int i…

命令执行讲解和函数

命令执行漏洞简介 命令执行漏洞产生原因 应用未对用户输入做严格得检查过滤&#xff0c;导致用户输入得参数被当成命令来执行 命令执行漏洞的危害 1.继承Web服务程序的权限去执行系统命会或读写文件 2.反弹shell&#xff0c;获得目标服务器的权限 3.进一步内网渗透 远程代…

python----输入输出算数运算

1.格式化输出 如果我们直接打印输出&#xff0c;就是输出变量的值&#xff0c;例如&#xff1a; 如果我们想打印a10就需要格式化字符串&#xff0c;就是使用f进行格式化&#xff0c;如图所示&#xff1b; 2.控制台输入 input执行的时候&#xff0c;就会等待用户进行输入&…

Qlik Sense : 条形图

条形图 “条形图适合比较多个值。维度轴显示所比较的类别条目&#xff0c;度量轴显示每个类别条目的值。” Qlik Sense中的条形图是一种数据可视化工具&#xff0c;用于展示不同类别或维度之间的比较。它通过水平或垂直的条形表示数据&#xff0c;并根据数值的大小进行排序。…

RK3568平台开发系列讲解(存储篇)文件描述符相关系统调用实现

🚀返回专栏总目录 文章目录 一、open 系统调用二、close 系统调用沉淀、分享、成长,让自己和他人都能有所收获!😄 一、open 系统调用 open()系统调用会分配新的文件句柄(file description),用来维护与打开文件相关的元信息(如偏移量、路径、操作方法等),并会给进程…

微信小程序框架阐述

目录 一、框架 响应的数据绑定 页面管理 基础组件 丰富的 API 二、逻辑层 App Service 小程序的生命周期 注册页面 使用 Page 构造器注册页面 在页面中使用 behaviors 使用 Component 构造器构造页面 页面的生命周期 页面路由 页面栈 路由方式 注意事项 模块化…

Git 初学

目录 一、需求的产生 二、版本控制系统理解 1. 认识版本控制系统 2. 版本控制系统分类 &#xff08;1&#xff09;集中式版本控制系统 缺点&#xff1a; &#xff08;2&#xff09;分布式版本控制系统 三、初识 git 四、git 的使用 例&#xff1a;将 “ OLED文件夹 ”…

单部10层电梯控制系列之UDT数据类型的建立(SCL代码)

这篇博客开始介绍单部10层电梯的完整控制程序编写过程&#xff0c;编程语言&#xff1a;SCL&#xff0c;控制器型号&#xff1a;S7-1200PLC。开篇博客我们介绍电梯控制用到的所有UDT数据类型。在学习本篇博客之前大家可以参考下面文章&#xff0c;了解博途PLC里的UDT数据类型是…

紫微斗数双星组合:天机太阴在寅申

文章目录 前言内容总结 前言 紫微斗数双星组合&#xff1a;天机太阴在寅申 内容 紫微斗数双星组合&#xff1a;天机太阴在寅申 性格分析 天机星与太阴星同坐寅申二宫守命的男性&#xff0c;多浪漫&#xff0c;易与女性接近&#xff0c;温柔体贴&#xff0c;懂得女人的心理。…

.NET Core WebAPI中使用Log4net记录日志

一、安装NuGet包 二、添加配置 // log4net日志builder.Logging.AddLog4Net("CfgFile/log4net.config");三、配置log4net.config文件 <?xml version"1.0" encoding"utf-8"?> <log4net><!-- Define some output appenders -->…

上位机图像处理和嵌入式模块部署(图像项目处理过程)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 对于一般的图像项目来说&#xff0c;图像处理只是工作当中的一部分。在整个项目处理的过程中有很多的内容需要处理&#xff0c;比如说了解需求、评…

【DDD】学习笔记-聚合之间的关系

聚合之间的关系 无论聚合是否表达了领域概念的完整性&#xff0c;我们都要清醒地认识到这种所谓的“完整”必然是相对的。如果说在领域分析模型中&#xff0c;每个体现了领域概念的类是模型的最小单元&#xff0c;那么在领域设计模型中&#xff0c;聚合才是模型的最小单元。我…

枚举(C/C++)

没有什么成套的算法&#xff0c;直接上例题&#xff01;&#xff01; 例题1&#xff1a;赢球票 代码&#xff1a; #include <bits/stdc.h> using namespace std;const int maxn 105; int n,num1[maxn],num2[maxn],cnt,cnt1,sum,ans;int check1()//检查剩余个数 {cnt1…

ZYNQ:PL-CAN总线功能应用

流程背景 前期基本实现PS端的CAN总线功能&#xff0c;现阶段的主要目的是实现PL端的CAN总线功能&#xff0c;需要采用CAN IP。 PL系统搭建 PL外设时钟源 搭建完vivado系统后&#xff0c;需要在sdk编程。但是在配置PL&#xff0d;CAN时&#xff0c;意识到CAN时钟值不清楚&…

TIM输出比较 P2

D触发器&#xff1f; 一、输出比较 二、PWM 1、简介 2、结构 三、外部设备 1.舵机 2.直流电机 我的理解是xO1 xIN1 & PWMx; xO2 xIN2 & PWMx;引入PWMx可以更方便的控制特定的电路。 四、函数学习 /*****单独设置输出比较极性*****/ void TIM_OC1PolarityConfig(…

php基础学习之可变函数(web渗透测试关键字绕过rce和回调函数)

可变函数 看可变函数的知识点之前&#xff0c;蒟蒻博主建议你先去看看php的可变变量&#xff0c;会更加方便理解&#xff0c;在本篇博客中的第五块知识点->php基础学习之变量-CSDN博客 描述 当一个变量所保存的值刚好是一个函数的名字&#xff08;由函数命名规则可知该值必…

JavaScript中什么是事件委托

JavaScript 中的事件委托&#xff08;Event delegation&#xff09;是一种重要的编程技术&#xff0c;它能够优化网页中的事件处理&#xff0c;提高程序的性能和可维护性。本文将详细介绍事件委托的概念、工作原理&#xff0c;并提供示例代码来说明其实际应用。 事件委托是基于…

我的NPI项目之Android USB 系列(一) - USB的发展历史

设计目的 USB was designed to standardize the connection of peripherals to personal computers, both to exchange data and to supply electric power. 一个是为了标准化电脑连接外设的方法。 能够支持电脑和外设的数据交互和&#xff08;对外&#xff09;供电。 目前已…