【RabbitMQ】可靠性策略(幂等,消息持久化)

MQ可靠性策略

  • 发送者的可靠性问题
    • 生产者的重连
    • 生产者确认
  • MQ的可靠性
    • 数据持久化
    • Lazy Queue
  • 消费者的可靠性问题
    • 消费者确认机制
    • 消息失败处理
  • 业务幂等性
  • 简答问题

发送者的可靠性问题

生产者的重连

可能存在由于网络波动,出现的客户端连接MQ失败,我们可以通过配置文件配置解决

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
         enabled:  true #开启超时重试机制
         initial-interval: 1000ms #失败后的初始等待时间
         multiplier: 1 #失败后下次的等待时长倍数,下次等待时间= initial-interval * multiplier
         max-attempts: 3 #最大重试次数

生产者确认

RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开机确认机制后,在MQ成功收到消息后会返回确认消息给生产者,返回的结果有以下几种情况:

  1. 消息投递到了MO,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  2. 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  3. 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  4. 其他情况都会返回NACK,告知投递失败

通过配置文件配置生产者的消息类型:

spring:
   rabbitmq:
      publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型
      publisher-returns: true # 开启publisher return机制

这里的publisher-confirm-type 有三种模式可选:
none:关闭confirm机制
simple: 同步阻塞等待MQ的回执消息
correlated:MQ异步调用方式返回回执消息

异步回调方式:

我们完成一个任务将消息交由消息队列中,就进行别的任务了,当消息队列返回异常问题,在过来进行对应的处理

我们需要调用ReturnCallback函数完成消息失败后的操作:
在使用之前需要配置ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback

@Configuration
public class CommonConfig implements ApplicationContextAware{
   @Override
   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException{
   //获取RabbitTemplate
   RabbitTemplate rabbittemplate =applicationContext.getBean(RabbitTemplate.class);
   //设置ReturnCallback
   rabbitTemplate.setReturnCallback(message,replyCode,replyText,exchange,routingKey)->{
   //处理操作
   }
   }
}

通过ConfirmCallback来处理消息失败:
每一个消息指定一个ConfirmCallback

void test() throws InterruptedException{
    CorrelationData cd= new CorrelationData();
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.confirm>(){
    @Override
    public void onFailure(Throwable ex){
    //Future 发生异常是的逻辑处理,基本不会触发
    }
    @Override
    public void onSuccess(CorrelationData.confirm result){
      //Future接收到回执的处理逻辑,参数中的result就是回执内容
      if(result.isAck()){ //result.isack,boolean类型,true代表ack回执,false表示nack回执
         //处理逻辑
         
      }else{
      //异常处理
      }
    }
    });
}
rabbittemplate.convertAndSend("","",cd);

MQ的可靠性

在默认情况下,Rabbitmq会将接收到的数据保存在内存中以降低消息收发的延迟,这样会有问题:

  1. 一旦MQ宕机,内存的消息会丢失
  2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞(消息对队列将消息保存到磁盘,此时MQ阻塞)

数据持久化

RabbitMQ的数据持久化包括:

  1. 交换机持久化(Durable 永久的,Transient临时的)
  2. 队列持久化(Durable 永久的,Transient临时的)
  3. 消息持久化

消息的持久化:

void test(){
   Message message =MessageBuilder
                   .withBody("hello".getBytes(StandardCharsets.UTF-8))
                   .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
}

会将消息在过程中持久化到磁盘不会导致MQ阻塞

消息的持久化性能不是很高,可以通过Lazy Queue进行消息的持久化

Lazy Queue

惰性队列
特征:

  1. 接收到消息后直接存入磁盘而非内存(内存只保留最近消息,默认2048条)
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存
  3. 支持百万条数据的消息存储

在3.12 版本后,所有的队列都是Lazy Queue模式,无法更改
在这里插入图片描述
在java中要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy:

@Bean
public Queue lazyQueue(){
  return QueueBuilder
         .durable("lazy.queue")
         .lazy() //开启lazy模式
         .build();
}

通过注解也可以实现

@RabbitListener(queuesToDeclare= @Queue(
                name="lazy.queue",
                durable="true",//持久化
                arguments =@Argument(name="x-queue-mode",value="lazy")))
public void listenLazyQueue(String msg){
             //消费处理
}

消费者的可靠性问题

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该信息
nack:消息处理失败,RabbitMQ需要再次投递信息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该信息
那么如何实现呢:
SprinaAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式
有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SprinGAMOP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时:
如果是业务异常,则会自动返回nack
如果是消息处理或校验异常,自动返回reject

spring:
   rabbitmq:
      listener:
         simple:
            prefetch: 1
            acknowledge-mode: none #none,关闭

消息失败处理

如果消费者返回nack,那就会重复进行,这样大大影响效率
我们可以利用Spring的retry机制,在消费者出现异常的时利用本地重试:

spring:
  rabbitmq:
    listener:
      simple:
          prefetch: 1
          retry:
             enabled: true  #开启消费者失败重试
             initial-interval: 1000ms #初始的失败等待时间
             multiplier: 1 #下次失败的等待时长的倍数
             max-attempts: 3 # 最大重试次数
             stateless: true # true无状态,false有状态,如果业务中包含事务,这里改为false

在开启重试模式之后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
在这里插入图片描述

将失败消息重新投递到error交换机中,可以绑定error消息队列,将来发送信息给开发人员等操作消息

将失败策略改为RepublishMessageRecoverer:

  1. 首先,定义接收失败消息的交换机,队列绑定
  2. 定义RepublishMessageRecoverer
@Bean
public MessageRecoverer test(RabbitTemplate rabbittemplate){
     return new RepublishMessageRecoverer(rabbitTemplate,"交换机名称","key值")
}

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),在程序开发中,则指同一个业务,执行一次或多次对业务状态的影响是一致的
幂等的使用场景:防止某一数据被重复进行修改
幂等业务:根据id的查询业务,根据id的删除业务等
非幂等:用户下单,扣减库存等

如何实现幂等:
方案一唯一消息id
给每一个消息都设置一个唯一id,利用id区分是否重复消息:

  1. 每一条消息都生成一个唯一id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息id保存到数据库
  3. 如果下次又收到相同的消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
@Bean
public MessageConverter messageConverter(){
    //定义消息转换器
    Jackson2JsonMessageConverter jjmc= new jackson2JsonMessageConverter();
    //配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

但是这样会造成额外的操作冗余,比如还需要写数据库等等
方案二:结合业务逻辑,基于业务本身做判断

简答问题

如何保证支付服务与交易服务之间的订单状态一致性:

  1. 首先,支付服务会在正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步
  2. 其次,为了保证消息的可靠性,我们采取了生产者确认机制,消费者确认,消费者失败重试等策略,确保消息投递和处理的可靠性,同时可开启了MQ的持久化,避免因服务宕机导致消息丢失
  3. 最后,我们还会交易服务更新订单状态时作业业务幂等判断,避免因消息重复导致订单异常

如果交易服务处理失败,还有什么方案:
在交易服务设置定时任务,定期查询订单生产状态,这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/591114.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

无人机+无人车:自组网协同技术及应用前景详解

无人车&#xff0c;也被称为自动驾驶汽车、电脑驾驶汽车或轮式移动机器人&#xff0c;是一种通过电脑系统实现无人驾驶的智能汽车。这种汽车依靠人工智能、视觉计算、雷达、监控装置和全球定位系统协同合作&#xff0c;使得电脑可以在没有任何人类主动操作的情况下&#xff0c;…

SpringBoot 基础简介

目录 1. SpringBoot 概述 1.1. 为什么会有springboot 1.1.1. 传统Spring 的两个缺点 1.1.2. Springboot 功能 2. SpringBoot 快速搭建 2.1. 创建Maven项目​编辑​编辑​编辑 2.2. 导入SpringBoot起步依赖 2.3. 定义controller 2.4. 添加引导类 2.5. 启动访问 3. Sprin…

香港理工大学内地事务总监陆海天教授确认出席“边缘智能2024 - AI开发者峰会”并发表主题演讲

隨著AI技術的日新月異&#xff0c;我們正步入一個邊緣計算智能化與分布式AI相互融合的新紀元。這一變革不僅推動了分布式智能創新應用的飛速發展&#xff0c;還使得邊緣智能——這一結合邊緣計算和智能技術的新興領域&#xff0c;逐漸成為引領AI發展的重要力量。通過其分布式和…

在家连学校的服务器

在家连接学校的服务器。 Step1: 首先下载一个vscode的插件 Visual Studio Code - Code Editing. Redefined 我的服务区是ubuntu20.04&#xff0c;x64的&#xff0c;所以下载这个。 Step2: 下载到本地之后&#xff0c;想办法将这个文件拷贝到你的服务器上。 Step3: 解压该包…

零基础该如何自学linux运维?

零基础该如何自学linux运维&#xff1f;以下是建议帮助你入门Linux运维的一些建议。 一、自学建议&#xff1a; 理解基础概念&#xff1a;首先&#xff0c;你需要对Linux操作系统的基本概念有所了解&#xff0c;包括文件系统、用户权限、进程管理等。安装Linux系统&#xff1…

AI-数学-高中-47导数与几何意义

原作者视频&#xff1a;【导数】【考点精华】7导数与几何意义考点解析&#xff08;基础&#xff09;_哔哩哔哩_bilibili 该点处切点的斜率 该点处导函数的值 示例1&#xff1a; 导数问题解决最常用方法&#xff1a;参数分离&#xff0c;在左边函数有解的值域范围内。 示例2&…

Jackson-jr 对比 Jackson

关于Jackson-jr 对比 Jackson 的内容&#xff0c;有人在做了一张下面的图。 简单点来说就 Jackson-jr 是Jackson 的轻量级应用&#xff0c;因为我们在很多时候都用不到 Jackson 的很多复杂功能。 对很多应用来说&#xff0c;我们可能只需要使用简单的 JSON 读写即可。 如我们…

微服务总览

微服务保护 微服务总览 微服务总览 接入层&#xff1a;反向代理功能&#xff0c;可以将用户域名访问的地址以负载均衡的方式代理到网关地址&#xff0c;并且并发能力非常高&#xff0c;并且会采用主备nginx的方式防止nginx寄了&#xff0c;备份nginx监控主nginx状态&#xff0c…

CMakeLists.txt 文件内容分析

一. 简介 前一篇文章学习了针对只有一个 .c源文件&#xff0c;cmake工具是如何使用编译的&#xff0c;文章如下&#xff1a; cmake的使用方法:单个源文件的编译-CSDN博客 本文对 所编写的 CMakeLists.txt文件的内容进行分析。从而了解如何编写一个 CMakeLists.txt文件。 二…

ElasticSearch01(ES简介,安装ES,操作索引,操作文档,RestAPI)【全详解】

目录 一、ES简介 1. 数据库查询的问题 2. ES简介 1 ElasticSearch简介 2 ElasticSearch发展 3. 倒排索引【面试】 1 正向索引 2 倒排索引 4. ES和MySql 5. 小结 二、安装ES 1. 方式1:使用docker安装 1 准备工作 2 创建ElasticSearch容器 3 给ElasticSearch配置i…

百度网盘里的文件怎么打印?

在日常生活和工作中&#xff0c;我们经常需要打印各种文件&#xff0c;包括学习资料、工作报告、合同文件等。有时候&#xff0c;这些文件保存在百度网盘等云存储服务中&#xff0c;我们该如何方便地打印出来呢&#xff1f;今天&#xff0c;就为大家介绍一种便捷的方法——通过…

一对一WebRTC视频通话系列(二)——websocket和join信令实现

本系列博客主要记录WebRtc实现过程中的一些重点&#xff0c;代码全部进行了注释&#xff0c;便于理解WebRTC整体实现。 一对一WebRTC视频通话系列往期博客&#xff1a; 一对一WebRTC视频通话系列&#xff08;一&#xff09;—— 创建页面并显示摄像头画面 websocket和join信令…

深入浅出学习Pytorch—Pytorch简介与2024年最新安装(GPU)

深入浅出学习Pytorch—Pytorch简介 学习原因&#xff1a;Pytorch日益增长的发展速度与深度学习时代的迫切需要 Pytorch模型训练 pytorch实现模型训练包括以下的几个方面&#xff08;学习路线&#xff09; 数据&#xff1a;数据预处理与数据增强模型&#xff1a;如何构建模型模…

全栈开发之路——前端篇(4)watch监视、数据绑定和计算属性

全栈开发一条龙——前端篇 第一篇&#xff1a;框架确定、ide设置与项目创建 第二篇&#xff1a;介绍项目文件意义、组件结构与导入以及setup的引入。 第三篇&#xff1a;setup语法&#xff0c;设置响应式数据。 辅助文档&#xff1a;HTML标签大全&#xff08;实时更新&#xff…

【JVM】从硬件层面和应用层面的有序性和可见性,到Java的volatile和synchronized

Java的关键字volatile保证了有序性和可见性&#xff0c;这里我试着从底层开始讲一下有序性和可见性。 一&#xff0c;一致性 数据如果同时被两个cpu读取了&#xff0c;如何保证数据的一致性&#xff1f;或者换句话说&#xff0c;cpu1改了数据&#xff0c;cpu2的数据就成了无效…

esp32-cam 1. 出厂固件编译与测试

0. 环境 - ubuntu18 - esp32-cam - usb转ttl ch340 硬件连接 esp32-camch340板子U0RTXDU0TRXDGNDGND5V5V 1. 安装依赖 sudo apt-get install vim sudo apt install git sudo apt-get install git wget flex bison gperf python python-pip python-setuptools python-serial p…

【Linux】awk命令学习

最近用的比较多&#xff0c;学习总结一下。 文档地址&#xff1a;https://www.gnu.org/software/gawk/manual/gawk.html 一、awk介绍二、语句结构1.条件控制语句1&#xff09;if2&#xff09;for3&#xff09;while4&#xff09;break&continue&next&exit 2.比较运…

20240503解决Ubuntu20.04和WIN10双系统下WIN10的时间异常的问题

20240503解决Ubuntu20.04和WIN10双系统下WIN10的时间异常的问题 2024/5/3 9:33 缘起&#xff1a;因为工作需要&#xff0c;编译服务器上都会安装Ubuntu20.04。 但是因为WINDOWS强悍的生态系统&#xff0c;偶尔还是有必须要用WINDOWS的时候&#xff0c;于是也安装了WIN10。 双系…

软件应用开发安全设计指南

1.1 应用系统架构安全设计要求 设计时要充分考虑到系统架构的稳固性、可维护性和可扩展性&#xff0c;以确保系统在面对各种安全威胁时能够稳定运行。 在设计系统架构时&#xff0c;要充分考虑各种安全威胁&#xff0c;如DDoS攻击、SQL注入、跨站脚本攻击&#xff08;XSS&…

2022 亚马逊云科技中国峰会,对话开发者论坛

目录 前言 最近整理资料发现还有一些前 2 年的内容没发出来&#xff0c;故补发记录&#xff0c;每年都有新的感悟。 开发者论坛 1. 你认为什么是开发者社区&#xff0c;如何定义一个成功的开发者社区&#xff1f; 我认为可以把开发者社区看成一个 “产品” 来对待&#xff…