【RabbitMQ】一文详解消息可靠性

目录:

1.前言

2.生产者

3.数据持久化

4.消费者

5.死信队列

1.前言

RabbitMQ 是一款高性能、高可靠性的消息中间件,广泛应用于分布式系统中。它允许系统中的各个模块进行异步通信,提供了高度的灵活性和可伸缩性。然而,这种通信模式也带来了一些挑战,其中最重要的之一是确保消息的可靠性

影响消息可靠性的因素主要有以下几点:

  • 发送消息时连接RabbitMQ失败
  • 发送时丢失:
    • 生产者发送的消息未送达交换机;
    • 消息到达交换机后未到达队列;
  • MQ 宕机,队列中的消息会丢失;
  • 消费者接收到消息后未消费就宕机了。

2.生产者

2.1.生产者重连机制

生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,RabbitMQ提供的消息发送时的重连机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

在生产者yml文件添加配置开启重连机制

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。但是RabbitMQ提供的重试机制是阻塞式的重试。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,就需要合理配置等待时长和重试次数,或者使用异步线程来执行发送消息的代码

2.2.生产者确认机制

RabbitMQ的生产者确认机制(Publisher Confirm)是一种确保消息从生产者发送到MQ过程中不丢失的机制。当消息发送到 RabbitMQ 后,系统会返回一个结果给消息的发送者,表明消息的处理状态。这个结果有两种可能的值:

返回结果有两种方式:

  • publisher-confirm(发送者确认)
    • 消息成功投递到交换机,返回ACK。
    • 消息未投递到交换机,返回NACK。(可能是由于网络波动未能连接到RabbitMQ,可利用生产者重连机制解决)
  • publisher-return(发送者回执)
    • 消息投递到交换机了,但是没有路由到队列。返回ACK和路由失败原因。(这种问题一般是因为路由键设置错误,可以人为规避)

通过这种机制,生产者在发送消息后获取返回的回执结果,从而采取对应的策略,如消息重发或记录失败信息。

3.数据持久化

3.1.配置持久化

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题

  1. RabbitMQ宕机,存在内存中的消息会丢失。
  2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞。

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。RabbitMQ可以通过配置数据持久化,从而将消息保存在磁盘,包括:

  • 交换机持久化(确保RabbitMQ重启后交换机仍然存在)
  • 队列持久化(确保RabbitMQ重启后队列仍然存在)
  • 消息持久化(确保RabbitMQ重启后队列中的消息仍然存在)

由于Spring会在创建队列时默认将交换机和队列设置为持久化,发送消息时也默认指定消息为持久化消息,因此不需要额外配置。

// 将消息指定为持久化消息
Message message = MessageBuilder
    .withBody("hello".getBytes(standardcharsets.UTF_8))
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 给队列发送消息
rabbitTemplate.convertAndSend("simple.queue", message);

3.2.惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

惰性队列的特点如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)

  • 消费者要消费消息时才会从磁盘中读取并加载到内存

  • 支持数百万条的消息存储

对于低于3.12版本的情况,可以使用注解的arguments来指定

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "grade.queue", durable = "true"),
            exchange = @Exchange(name = "intel.topic", type = ExchangeTypes.TOPIC),
            key = "intel.grade",
            arguments = @Argument(name = "x-queue-mode", value = "lazy")
    ))

3.3.为什么需要数据持久化?

数据持久化在 RabbitMQ 中有以下重要作用:

队列和交换机的持久化:

  • 防止重启后丢失:将队列和交换机设置为持久化,可以防止 RabbitMQ 服务器重启后丢失这些队列和交换机,确保它们的存在和绑定关系保持不变。

消息的持久化:

  1. 安全性
    • 防止数据丢失:消息持久化后,可以防止 RabbitMQ 服务器重启或宕机时数据丢失,方便数据恢复,保证消息的可靠性和耐久性。
  2. 性能
    • 内存管理:未持久化的临时消息默认存储在内存中。内存空间有限,大量消息涌入时会导致内存占满,系统需要进行 page out 操作将消息写入磁盘。频繁的 page out 操作会严重影响性能。
    • 预防内存溢出:通过持久化消息,可以缓解内存压力,防止因内存溢出导致的系统性能问题和崩溃。

4.消费者

4.1.消费者确认机制

为了确认消费者是否正确处理了消息,RabbitMQ提供了消费者确认机制。当消费者处理消息后,会返回回执信息给RabbitMQ。回执有三种值:

  • ack:消息处理成功,RabbitMQ从队列中删除消息。
  • nack:消息处理失败,RabbitMQ需要再次投递消息。
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除消息。

在SpringBoot项目中,我们可以通过配置文件选择回执信息的处理方式,一共有三种处理方式:

  • none:不处理。RabbitMQ 假定消费者获取消息后会一定会成功处理,因此消息投递后立即返回ack,将消息从队列中删除。

  • manual:手动模式。需要在业务代码结束后,调用SpringAMQP提供的API发送ackreject,存在代码侵入问题,但比较灵活。

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑进行了环绕增强,返回结果如下:

    • 如果消费者正常处理消息,自动返回ack并删除队列的消息。

    • 如果消费者消息处理失败,自动返回nack并重新向消费者投递消息。

    • 如果消息校验异常,自动返回reject并删除队列中的消息。

注意: 手动模式返回回执消息时通常需要显式指定requeue参数,当requeue=true时,表明消息需要重新入队;当requeue=false时,RabbitMQ将从队列删除消息。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto,自动 ack

4.2.消息失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力。

可以通过设置yml文件开启失败重试机制,在消息异常时利用本地重试,而不是无限制的进行requeue操作。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false 有状态。如果业务中包含事务,这里改为 false

4.3.消息失败处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接reject,丢弃消息,这是默认采取的方式;
  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回nack,消息重新入队;
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

5.死信队列

尽管通过以上设置可以确保消息在生产者、消息队列和消费者之间的传递过程中不会丢失,但在某些情况下,消费者仍可能无法成功处理消息(如消息重试次数耗尽后仍无法被消费)。这时候,我们需要一个机制来妥善处理这些无法被正常消费的消息。死信队列便是用于解决这一问题的兜底机制。

5.1.死信

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消息被拒绝: 当消费者明确拒绝一个消息并且设置不再重新入队(requeue=false)时,这个消息会被标记为死信。
  • 消息过期: 每个消息或队列可以设置一个TTL(Time-To-Live),即消息的存活时间。如果消息在队列中停留的时间超过了这个TTL,消息会被认为过期,并被转移到死信队列。
  • 队列达到最大长度: 如果队列设置了最大长度并且达到了这个限制,那么新进入的消息会被转移到死信队列中。

5.2.创建死信队列

5.2.1.创建死信交换机和死信队列

正常使用注解,创建交换机和队列即可

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dead.queue", durable = "true",
                   arguments = @Argument(name = "x-queue-mode", value = "lazy")),
    exchange = @Exchange(name = "dead.exchange", type = ExchangeTypes.TOPIC),
    key = "dead.key"
))
public void deadLetterQueue(String msg) {
    System.out.println("您的消息已经死亡:" + msg);
}
5.2.2.绑定死信交换机

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

可以通过@Argument注解指定死信交互机和路由键,如下。

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "simple.queue", durable = "true",
                    arguments = {
                            @Argument(name = "x-queue-mode", value = "lazy"),
                            @Argument(name = "x-dead-letter-exchange", value = "dead.exchange"),
                            @Argument(name = "x-dead-letter-routing-key", value = "dead.key")
                    }),
            exchange = @Exchange(name = "simple.topic",
                    type = ExchangeTypes.TOPIC),
            key = "simple.key"
    ))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/802131.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络准入控制设备是什么?有哪些?网络准入设备臻品优选

小李:“小张,最近公司网络频繁遭遇外部攻击,我们得加强一下网络安全了。” 小张:“是啊,我听说实施网络准入控制是个不错的选择。但具体什么是网络准入控制设备?我们有哪些选择呢?” 小李微笑…

2024Datawhale AI夏令营---Inclusion・The Global Multimedia Deepfake Detection--学习笔记

赛题背景: 其实总结起来就是一句话,这个项目是基于目前的深度伪装技术,就是通过大量人脸的原数据集进行模型训练之后,能够生成伪造的人脸视频。这项目就是教我们如何去实现这个DeepFake技术。 Task1:了解Deepfake和跑通baseline …

Python项目部署到Linux生产环境(uwsgi+python+flask+nginx服务器)

1.安装python 我这里是3.9.5版本 安装依赖: yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y 根据自己的需要下载对应的python版本: cd local wget https://www.python.org/ftp…

开发实战经验分享:互联网医院系统源码与在线问诊APP搭建

作为一名软件开发者,笔者有幸参与了多个互联网医院系统的开发项目,并在此过程中积累了丰富的实战经验。本文将结合我的开发经验,分享互联网医院系统源码的设计与在线问诊APP的搭建过程。 一、需求分析 在开发任何系统之前,首先要…

成像光谱遥感技术中的AI革命:ChatGPT

遥感技术主要通过卫星和飞机从远处观察和测量我们的环境,是理解和监测地球物理、化学和生物系统的基石。ChatGPT是由OpenAI开发的最先进的语言模型,在理解和生成人类语言方面表现出了非凡的能力,ChatGPT在遥感中的应用,人工智能在…

【STM32】RTT-Studio中HAL库开发教程三:IIC通信--AHT20

文章目录 一、I2C总线通信协议二、AHT20传感器介绍三、STM32CubeMX配置硬件IIC四、RTT中初始化配置五、具体实现代码六、实验现象 一、I2C总线通信协议 使用奥松的AHT20温湿度传感器,对环境温湿度进行采集。AHT20采用的是IIC进行通信,可以使用硬件IIC或…

2. KNN分类算法与鸢尾花分类任务

鸢尾花分类任务 1. 鸢尾花分类步骤1.1 分析问题,搞定输入和输出1.2 每个类别各采集50朵花1.3 选择一种算法,完成输入到输出的映射1.4 第四步:部署,集成 2. KNN算法原理2.1 基本概念2.2 核心理念2.3 训练2.4 推理流程 3. 使用 skle…

Word参考文献交叉引用

前言 Word自带交叉引用功能,可在正文位置引用文档内自动编号的段落,同时创建超链接,适用于参考文献的引用。使用此方法对参考文献进行引用后,当参考文献的编号发生变化时,只需要更新域即可与正文中的引用相对应。下文…

vue3+TS从0到1手撸后台管理系统

1.路由配置 1.1路由组件的雏形 src\views\home\index.vue(以home组件为例) 1.2路由配置 1.2.1路由index文件 src\router\index.ts //通过vue-router插件实现模板路由配置 import { createRouter, createWebHashHistory } from vue-router import …

【15】Android基础知识之Window(一)

概述 这篇文章纠结了很久,在想需要怎么写?因为window有关的篇幅,如果需要讲起来那可太多了。从层级,或是从关联,总之不是很好开口。这次也下定决心,决定从浅入深的讲讲window这个东西。 Window Window是…

鸿蒙特色物联网实训室

一、 引言 在当今这个万物皆可连网的时代,物联网(IoT)正以前所未有的速度改变着我们的生活和工作方式。它如同一座桥梁,将实体世界与虚拟空间紧密相连,让数据成为驱动决策和创新的关键力量。随着物联网技术的不断成熟…

Qt Creator的好用的功能

(1)ctrlf: 在当前文档进行查询操作 (2)f3: 找到后,按f3,查找下一个 (3)shiftf3: 查找上一个 右键菜单: (4)f4:在…

solidity实战练习3——荷兰拍卖

//SPDX-License-Identifier:MIT pragma solidity ^0.8.24; interface IERC721{function transFrom(address _from,address _to,uint nftid) external ; }contract DutchAuction { address payable immutable seller;//卖方uint immutable startTime;//拍卖开始时间uint immut…

钡铼Modbus TCP耦合器BL200实现现场设备与SCADA无缝对接

前言 深圳钡铼技术推出的Modbus TCP耦合器为SCADA系统与现场设备之间的连接提供了强大而灵活的解决方案,它不仅简化了设备接入的过程,还提升了数据传输的效率和可靠性,是工业自动化项目中不可或缺的关键设备。本文将从Modbus TC、SCADA的简要…

基于Ubuntu2204搭建openstack-Y版-手动搭建

openstack手搭Y版 基础环境配置离线环境时间同步(双节点)安装openstack客户端数据库服务消息队列服务缓存服务 keystone服务部署glance服务部署placement服务部署nova服务部署controllercompute neutron服务部署controller节点配置neutron.conf文件配置m…

leetcode-349.两个数组的交集

题源 349.两个数组的交集 题目描述 给定两个数组 nums1 和 nums2 ,返回 它们的 交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 示例 1: 输入:nums1 [1,2,2,1], nums2 [2,2] 输出:[2] 示例…

权威认可 | 海云安开发者安全助手系统通过信通院支撑产品功能认证并荣获信通院2024年数据安全体系建设优秀案例

近日,2024全球数字经济大会——数字安全生态建设专题论坛(以下简称“论坛”)在京成功举办。由全球数字经济大会组委会主办,中国信息通信研究院及公安部第三研究所共同承办,论坛邀请多位专家和企业共同参与。 会上颁发…

android预置apk

在framework开发中,有一些需求是需要预装应用的,有些是预置应用源码,有些是预置apk。今天我们就分享下怎样预置apk 一般系统有自定义的目录,比如我的项目中根目录下有一个文件夹vendor,这里没都是自定义的一些功能。预…

Redis系列命令更新--Redis列表命令

Redis列表 1、Redis Blpop命令: (1)说明:Redis Blpop命令移出并获取列表的第一个元素;如果列表没有元素会阻塞列表直到等到超时或发现可弹出元素为止 (2)语法:redis 127.0.0.1:63…

leetcode-三数之和

视频:https://www.bilibili.com/video/BV1bP411c7oJ/?spm_id_from333.788&vd_sourcedd84879fcf1be72f360461b01ecab0d6 从两数之和开始,排序后的两数之和,利用好升序的性质,可以将时间复杂度从on2降到on; class Solution …