RabbitMQ消费者的可靠性

目录

一、消费者确认

二、失败重试机制

2.1、失败处理策略

三、业务幂等性

3.1、唯一消息ID

 3.2、业务判断

3.3、兜底方案


一、消费者确认

RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

 通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:
  rabbitmq:
    host: 192.168.200.129 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: none

当使用none模式时

生产者发送一条消息

 消费者接受消息时抛异常

 

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

把确认机制修改为auto

spring:
  rabbitmq:
    host: 192.168.200.129 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: auto

 再次发送消息

 

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容

spring:
  rabbitmq:
    host: 192.168.200.129 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: auto #消息确认
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃
2.1、失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

在消费者里创建一个异常消息配置类

@Configuration
@Slf4j
//当配置文件中spring.rabbitmq.listener.simple.retry.enabled 属性为ture时配置类才生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    //消息处理失败交换机
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    //消息处理失败队列
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    //绑定关系
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    //定义一个RepublishMessageRecoverer,关联队列和交换机
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        log.error("加载RepublishMessageRecoverer");
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

 当接收消息出现异常时,会创建error.queue队列

 可以查看到异常信息

三、业务幂等性

何为幂等性? 幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。 举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断
3.1、唯一消息ID
  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。 以Jackson的消息转换器为例 

在生产者和消费者的启动类里加一个配置

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

 测试生产者发送消息会产生一个id

 3.2、业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。 例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:

 @Override
    public void markOrderPaySuccess(Long orderId) {
        // 1.查询订单
        Order old = getById(orderId);
        // 2.判断订单状态
        if (old == null || old.getStatus() != 1) {
            // 订单不存在或者订单状态不是1,放弃处理
            return;
        }
        // 3.尝试更新订单
        Order order = new Order();
        order.setId(orderId);
        order.setStatus(2);
        order.setPayTime(LocalDateTime.now());
        updateById(order);
    }

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

@Override
public void markOrderPaySuccess(Long orderId) {
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    lambdaUpdate()
            .set(Order::getStatus, 2)
            .set(Order::getPayTime, LocalDateTime.now())
            .eq(Order::getId, orderId)
            .eq(Order::getStatus, 1)
            .update();
}

注意看,上述代码等同于这样的SQL语句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

3.3、兜底方案

我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/111572.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

7+共病思路。WGCNA+多机器学习+实验简单验证,易操作

今天给同学们分享一篇共病WGCNA多机器学习实验的生信文章“Shared diagnostic genes and potential mechanism between PCOS and recurrent implantation failure revealed by integrated transcriptomic analysis and machine learning”,这篇文章于2023年5月16日发…

小程序获取头像和昵称的思路

小程序获取头像和昵称的基本方法是调用小程序自带的API wx.getUserProfile(),这也是小程序官方目前最推荐的做法。成功获取用户名头像之后,小程序允许保存调用的结果,以便下一次打开页面的时候自动显示头像和名字。保存用户名和头像并不是保存…

电子电器架构 —— 车载网关初入门(三)

电子电器架构 —— 车载网关初入门(三) 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 PS:小细节,本文字数5000+,详细描述了网关在车载框架中的具体性能设置。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 没有人关注你。也无需有人关…

Flask Shell 操作 SQLite

一、前言 这段时间在玩Flask Web,发现用Flask Shell去操作SQLite还是比较方便的。今天简单地介绍一下。 二、SQLite SQLite是一种嵌入式数据库,它的数据库就是一个文件,处理速度快,经常被集成在各种应用程序中,在IO…

【精】UML及软件管理工具汇总

目录 1 老七工具(规划质量) 1.1 因果图(鱼骨图、石川图) 1.2 控制图 1.3 流程图:也称过程图 1.4 核查表:又称计数表 1.5 直方图 1.6 帕累托图 1.7 散点图&#xf…

Java设计模式之状态模式

定义 对有状态的对象,把复杂的“判断逻辑”提取到不同的状态对象中,允许状态对象在其内部状态发生改变时改变其行为。 结构 状态模式包含以下主要角色。 环境角色:也称为上下文,它定义了客户程序需要的接口,维护一个…

java如何获取调用接口的ip?

获取调用者的ip 场景:想知道哪个ip访问的某个接口时,就需要打印出来看看,这时就可以使用这个方法了。 案例: //HttpServletRequest 入参加上,请求对象public ForkResponse queryXXX(RequestBody XXXX xxxx, HttpServletRequest …

Zynq-Linux移植学习笔记之64- 国产ZYNQ在linux下配置国产5396芯片

1、背景介绍 复旦微ZYNQ通过SPI配置国产JEM5396,框图如下: 现在需要在linux下的应用程序内配置JEM5396的寄存器。其中FMQL和进口的XILINX ZYNQ类似,JEM5396和进口的BCM5396兼容。因此可以参考进口ZYNQ在linux下配置BCM5396过程。Zynq-Linux移…

驾驶技巧_新手

人人都是老司机 1> 快速起步(手动挡)2> 窄路会车3> 转弯4> 变道 1> 快速起步(手动挡) 【B站】视频讲解 Step 1> 【快 停 慢放】左脚离合,快速抬到半联动点; Step 2> 离合器慢放同时加油! 2> 窄路会车 3> 转弯 4&…

C++学习初探---‘C++面向对象‘-继承函数重载与运算符重载

文章目录 前言继承继承是什么?三种访问权限的继承: 函数重载与运算符重载函数重载运算符重载可重载运算符&不可重载运算符 前言 第三次学习记录,依旧是C面向对象的内容。 继承 继承是什么? C中的继承是一种面向对象编程&am…

Kubuntu安装教程

目录 1.介绍 2.安装 3.配置 更新软件 中文输入法 美化 1.介绍 Kubuntu,是众多Ubuntu分支的一种,它采用KDE Plasma桌面为其默认桌面环境。它和Ubuntu采用同样的底层系统和软件库。基本上,Kubuntu和Ubuntu没有太大的差异,只是…

Spring Boot 3系列之一(初始化项目)

近期,JDK 21正式发布,而Spring Boot 3也推出已有一段时间。作为这两大技术领域的新一代标杆,它们带来了许多令人振奋的新功能和改进。尽管已有不少博客和文章对此进行了介绍,但对于我们这些身处一线的开发人员来说,有些…

【Truffle】二、自定义合约测试

一、准备测试 上期我们自己安装部署了truffle,并且体验了测试用例的整个测试流程,实际开发中,我们可以对自己的合约进行测试。 我们首先先明白自定义合约测试需要几个文件 合约文件:既然要测试合约,肯定要有合约的源码…

[EFI]asus strix b760-i 13900F电脑 Hackintosh 黑苹果efi引导文件

硬件型号驱动情况主板 asus strix b760-i 处理器 I9 13900F 已驱动内存crucial ddr5-5200 64gb(32gb*2)(overclock 5600)已驱动硬盘 WD black sn850 500g*2 已驱动显卡rx570已驱动声卡Realtek ALCS1220A已驱动网卡Intel I225-V 2.5 Gigabit Ethernet已驱动无线网卡蓝牙Fevi T91…

通过内网穿透快速搭建公网可访问的Spring Boot接口调试环境

文章目录 前言1. 本地环境搭建1.1 环境参数1.2 搭建springboot服务项目 2. 内网穿透2.1 安装配置cpolar内网穿透2.1.1 windows系统2.1.2 linux系统 2.2 创建隧道映射本地端口2.3 测试公网地址 3. 固定公网地址3.1 保留一个二级子域名3.2 配置二级子域名3.2 测试使用固定公网地址…

Spring Cloud Alibaba 之 Sentinel

系列文章目录 第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 第四章 Spring Cloud Netflix 之 Eureka 第五章 Spring Cloud Netflix 之 Ribbon 第六章 Spring Cloud 之 OpenFeign 第七章 Spring Cloud 之 GateWay 第八章 Sprin…

C++学习笔记之四(标准库、标准模板库、vector类)

C 1、C标准库2、C标准模板库2.1、vector2.1.1、vector与array2.1.2、vector与函数对象2.1.3、vector与迭代器2.1.4、vector与算法 1、C标准库 C C C标准库指的是标准程序库( S t a n d a r d Standard Standard L i b a r a y Libaray Libaray),它定义了十个大类…

x3daudio1 7.dll丢失怎么修复?多种x3daudio1 7.dll修复方法对比

x3daudio1_7.dll是Windows操作系统中的一个动态链接库文件,它主要负责处理音频相关的功能。当这个文件缺失或损坏时,可能会导致一些音频播放问题,如无声、杂音等。那么,x3daudio1_7.dll缺失的原因是什么呢?又该如何修复…

Unity 粒子特效-第五集-烟雾缭绕合并特效

一、特效预览 二、制作原理 1.素材介绍 我们之前几章做了有光球,星星,烟雾 我们把他们结合起来,做一些调整 2.合并方法 我们还是建一个粒子游戏物体 我们把所有的效果取消 再重置一下transform 现在这个物体就是一个可以控制粒子特效的空…

钢琴培训答题服务预约小程序的效果怎样

很多家长都会从小培养孩子的兴趣,钢琴便是其中热度较高的一种,而各城市也不乏线下教育培训机构,除了青少年也有成年人参加培训,市场教育高需求下,需要商家不断拓展客户和转化。 那么通过【雨科】平台制作钢琴培训服务…