RabbitMQ——高级篇

目录

一、MQ的常见问题

二、消息可靠性问题 

生产者消息确认

消息持久化

消费者消息确认 

失败重试机制 

三、死信交换机 

简介死信交换机

TTL超时机制 

延迟队列 

 四、惰性队列

消息堆积问题

惰性队列 


一、MQ的常见问题

  • 消息可靠性问题:如何确保发送的消息至少被消费一次
  • 延迟消息问题:如何实现的延迟投递
  • 消息堆积问题:解决数百万的消息堆积无法及时消费的问题
  • 高可用问题:如何避免单点的MQ故障而导致的不可用问题

二、消息可靠性问题 

消息从生产者发送到exchange,再到queue,再到消费者,这个过程中有可能会导致消息丢失:

  • 发送时丢失:生产者发送的消息未送达exchange,消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

宕机指的是计算机系统或设备因为各种原因(如硬件故障、软件错误、网络问题等)而无法正常运行或停止工作的状态。

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息在发送到MQ过程中丢失,消息发送到MQ后会返回一个结果给发送者来表示消息是否处理成功,结果有如下的两种请求:

  • publisher-confirm:发送者确认,消息成功投递到交换机返回ack,消息未投递到交换机,返回nack
  • publisher-return:返回者回执,消息投递到交换机,但是没有路由到队列就会返回ACK以及路由失败的原因

注意:在确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同的消息,避免ack冲突

消息持久化

MQ默认是内存存储消息,开启持久化就可以确保MQ的消息不会丢失。

SpringAMQP中的消息默认是持久的,也可以通过以下方式实现消息持久化:

交换机持久化:

    @Bean
    public DirectExchange simpleDirect(){
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 
        return new DirectExchange("simple.direct",true,false);
    }

队列持久化:

    @Bean
    public Queue simpleQueue(){
        // 使用QueueBuilder构建队列,durable就是持久化的
        return QueueBuilder.durable("simple.queue").build();
    }

消息持久化:SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定

    @Test
    public void testDurableMessage() {
        // 1.准备消息
        Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("simple.queue", message);
    }

消费者消息确认 

RabbitMQ支持消费者确认机制的,消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除消息,在SpringAMQP中允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ackMQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

需要在配置文件中进行修改:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none,关ack; manual,手动ack; auto: 自动ack

失败重试机制 

当消费者出现异常后,消息会不断重新入队列,再重新发送给消费者,然后再次异常,再次重新入队,会出现无限循环导致mq的消息处理飙升,带来不必要的压力。

那么我们就可以使用Spring的retry机制,在消费者出现异常时先利用本地重试,而不是无限制的重新入队列。

在配置文件中进行如下修改:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试机制
          initial-interval: 1000 # 初始的失败等待时长是1s
          multiplier: 3  ## 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless:true  # true无状态;false有状态。如果业务中包合事务,这里改为false
           

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)

RepublishMessageRecoverer的具体代码实现

定义交换机和队列,并进行绑定

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

定义RepublishMessageRecoverer:

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }

综上所述, 可以从如下方面实现RabbitMQ消息的可靠性:

  • 开启生产者确认机制,确保生产者的消息能够到达队列
  • 开启持久化功能,确保队列中的消息在未消费前不会丢失
  • 开启消费者确认机制为auto,又spring确认消息处理成功之后返回ack
  • 开启消费者失败重试机制,设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,由人工进行处理

三、死信交换机 

简介死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

TTL超时机制 

TTL,也就是Time-To-Live(存活时间)如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间

当队列和消息本身都设置了存活时间时,以时间短的ttl为准

延迟队列 

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

RabbitMQ的官方也推出了rabbitmq_delayed_message_exchange插件,原生支持延迟队列效果 ,RabbitMQ的官方插件社区进行下载安装到云服务器上。

在Java代码中就可以使用SpringAMQP的延迟队列插件DelayExchange,其本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。然后我们向这个delaytrue的交换机中发送消息,一定要给消息添加一个headerx-delay,值为延迟的时间,单位为毫秒

 四、惰性队列

消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限

惰性队列 

惰性队列具备如下的特征:

  • 接收消息后直接存入的是磁盘而不是内存
  • 消费者消费消息是从磁盘中进行读取再加载到内存中的
  • 支持数百万条的消息存储

设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。

用SpringAMQP声明惰性队列分两种方式:

基于Bean的方式:

    @Bean
    public Queue lazyQueue() {
        return QueueBuilder.durable("lazy.queue")
                .lazy() # 开启x—queue-mode为lazy
                .build();
    }

基于注解的方式:

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息");
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/345457.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

解决 Git:ssh: connect to host github.com port 22: Connection timed out 问题的三种方案

1、问题描述: 其一、整体提示为: ssh: connect to host github.com port 22: Connection timed out fatal: Could not read from remote repository. 中文为: ssh:连接到主机 github.com 端口 22:连接超时 fatal&a…

【设计模式】腾讯面经:原型模式怎么理解?

什么是原型模式? 设计模式是编程世界的基石,其中原型模式无疑是一种常用而又高效的创建对象的手段。那么,什么是原型模式呢?又该如何去实现它? 在软件工程中,原型模式是一种创建型设计模式。我们可以这样…

「JavaSE」抽象类接口2

🎇个人主页:Ice_Sugar_7 🎇所属专栏:快来卷Java啦 🎇欢迎点赞收藏加关注哦! 抽象类&接口2 🍉接口间的继承🍉接口的应用🍉总结 🍉接口间的继承 和类的继承…

Pyside6中QTableWidget使用

目录 一:介绍: 二:演示 一:介绍: 在 PySide6 中,QTableWidget 是一个用于展示和编辑表格数据的控件。它提供了在窗口中创建和显示表格的功能,并允许用户通过单元格来编辑数据。 要使用 QTabl…

黑马Java——面向对象进阶(static继承)

1.static静态变量 静态变量是随着类的加载而加载的,优先与对象出现的

Kotlin 开发环境配置指南

一、下载与安装 Kotlin 编译器 步骤 1:获取最新版 Kotlin 编译器 要配置 Kotlin 开发环境,首先需要从 JetBrains 官方 GitHub 仓库下载最新的 Kotlin 编译器。访问以下链接以获取最新版本的编译器: [https://github.com/JetBrains/kotlin/…

Severstal公司的汉语名字是什么,是一家什么样的公司?

问题描述:Severstal公司的汉语名字是什么,是一家什么样的公司? 问题解答: Severstal 公司的中文名字通常被翻译为 "谢韦尔钢铁公司"。这家公司是俄罗斯的一家大型钢铁和矿业企业,总部位于莫斯科。Seversta…

libtorch学习第六

构建卷积网络 #include<torch/torch.h> #include<torch/script.h> #include<iostream>using std::cout; using std::endl;class LinearBnReluImpl : public torch::nn::Module { private:torch::nn::Linear ln{ nullptr };torch::nn::BatchNorm1d bn{ nullp…

[Python] glob内置模块介绍和使用场景(案例)

Unix glob是一种用于匹配文件路径的模式&#xff0c;它可以帮助我们快速地找到符合特定规则的文件。在本文中&#xff0c;我们将介绍glob的基本概念、使用方法以及一些实际应用案例。 glob介绍 Glob(Global Match)是Unix和类Unix系统中的一种文件名扩展功能&#xff0c;它可以…

分布式锁的实现方式

分布式锁是指分布式环境下&#xff0c;系统部署在多个机器中&#xff0c;实现多进程分布式互斥的一种锁。实现分布式锁有三种主流方式&#xff0c;接下来一一盘点。 盘点之前要说说选择时的优缺点 数据库实现的锁表完全不推荐。 Redis分布式锁性能优于ZooKeeper&#xff0c;因…

01、领域驱动设计:微服务设计为什么要选择DDD总结

目录 1、前言 2、软件架构模式的演进 3、微服务设计和拆分的困境 4、为什么 DDD适合微服务 5、DDD与微服务的关系 6、总结 1、前言 我们知道&#xff0c;微服务设计过程中往往会面临边界如何划定的问题&#xff0c;不同的人会根据自己对微服务的理 解而拆分出不同的微服…

通过 GScan 工具自动排查后门

一、简介 GScan 是一款为安全应急响应提供便利的工具&#xff0c;自动化监测系统中常见位置。 工具运行环境&#xff1a;CentOS (6、7) python (2.x、3.x) 工具检查项目&#xff1a; 1、主机信息获取 2、系统初始化 alias 检查 3、文件类安全扫描 3.1、系统重要文件完整行…

JS进阶-深入对象(二)

拓展&#xff1a;深入对象主要介绍的是Js的构造函数&#xff0c;实例成员&#xff0c;静态成员&#xff0c;其中构造函数和Java种的构造函数用法相似&#xff0c;思想是一样的&#xff0c;但静态成员和实例成员和java种的有比较大的差别&#xff0c;需要认真理解 • 创建对象三…

Switch用法以及新特性-最全总结版

本篇文章参考了大佬文章&#xff0c;感谢大佬无私分享&#xff1a; http://t.csdnimg.cn/MjZnX http://t.csdnimg.cn/QFg0x 目录 一、Switch用法&#xff1a;JDK7及以前 1.1、举例一&#xff1a; 1.2、举例二&#xff1a; 二、Switch穿透&#xff1a; 2.1、举例&#xf…

三极管的奥秘:如何用小电流控制大电流

双极性晶体管&#xff08;英语&#xff1a;bipolar transistor&#xff09;&#xff0c;全称双极性结型晶体管&#xff08;bipolar junction transistor, BJT&#xff09;&#xff0c;俗称三极管&#xff0c;是一种具有三个引脚的电子元器件。 本文是讲述的是三极管的基础知识…

基于openssl v3搭建ssl安全加固的c++ tcpserver

1 概述 tcp server和tcp client同时使用openssl库&#xff0c;可对通信双方流通的字节序列进行加解密&#xff0c;保障通信的安全。本文以c编写的tcp server和tcp client为例子&#xff0c;openssl的版本为v3。 2 安装openssl v3 2.1 安装 perl-IPC-Cmd openssl项目中的co…

企业内部知识库搭建教程,赶紧收藏起来

在企业运营中&#xff0c;内部知识库搭建是一项重要的挑战&#xff0c;并需要合理的规划与管理。尤其对于中大型企业&#xff0c;内部知识库能够提高工作效率&#xff0c;减轻员工工作压力与突发事件的处理的困扰。下面给大家提供一份完整的内部知识库搭建教程&#xff0c;快看…

UE4运用C++和框架开发坦克大战教程笔记(十五)(第46~48集)

UE4运用C和框架开发坦克大战教程笔记&#xff08;十五&#xff09;&#xff08;第46~48集&#xff09; 46. 批量加载 UClass 功能测试批量加载多个同类 UClass 资源 47. 创建单个资源对象测试加载并创建单个 UClass 资源对象 48. 创建同类资源对象 46. 批量加载 UClass 功能 逻…

Leetcode1143. 最长公共子序列

解题思路 求两个数组或者字符串的最长公共子序列问题&#xff0c;肯定是要用动态规划的。下面的题解并不难&#xff0c;你肯定能看懂。 首先&#xff0c;区分两个概念&#xff1a;子序列可以是不连续的&#xff1b;子数组&#xff08;子字符串&#xff09;需要是连续的&#xf…

rabbitmq基础-java-3、Fanout交换机

1、简介 Fanout&#xff0c;英文翻译是扇出。 2、 特点 1&#xff09; 可以有多个队列 2&#xff09; 每个队列都要绑定到Exchange&#xff08;交换机&#xff09; 3&#xff09; 生产者发送的消息&#xff0c;只能发送到交换机 4&#xff09; 交换机把消息发送给绑定过的…