RabbitMQ高级

文章目录

  • 一.消息可靠性
    • 1.生产者消息确认
    • 2.消息持久化
    • 3.消费者确认
    • 4.消费者失败重试


MQ的一些常见问题

1.消息可靠性问题:如何确保发送的消息至少被消费一次

2.延迟消息问题:如何实现消息的延迟投递

3.高可用问题:如何避免单点的MQ故障而导致的不可用问题

4.消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题

一.消息可靠性

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • -发送时丢失:

    • 生产者发送的消息未送达exchange

    • 消息到达exchange后未到达queue

  • MQ宕机,queue将消息丢失

  • consumer接收到消息后未消费就宕机

1.生产者消息确认

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认

消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack

  • publisher-return,发送者回执
    消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

在这里插入图片描述

简单来说:在publisher-confirm下的nack是消息投递到交换机失败返回的信息;在publisher-confirm下的ack是消息成功到达了消费者;在publisher-return下的ack是消息到达了交换机但是路由失败的返回信息


SpringAMQP实现生产者确认

一.想要实现生产者消息确认机制,需要在配置文件编写开启代码,即在微服务的application.yml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated 
    publisher-returns: true 
    template:
      mandatory: true

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型

    • simple:同步等待confirm结果,直到超时

    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback(推荐使用)

  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

二.配置ReturnCallback

ReturnCallback是消息到达交换机但是没有成功进行路由的回调函数(作用于全局)

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", 
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
    }
}

对于代码中的ApplicationContext是负责管理和组织Spring应用中的各个组件,如bean、配置文件等.

通过实现ApplicationContextAware这个接口,bean可以获取对ApplicationContext的引用,并因此获得访问应用上下文中的其他bean、资源和容器特性的能力,所以说实现了接口等同于获取到了bean容器,就可以获取到 rabbitTemplate并进行设置唯一的ReturnCallback

在回调函数中,消息路由失败会返回很多信息,其中使用路由键,消息的交换机的名称可以实现重发消息

三.在生产者类中发送消息并同时实现ConfirmCallback

ConfirmCallback同样是回调函数,与ReturnCallback不同的是ConfirmCallback可以创建多次

ConfirmCallback是对消息还没有进入到交换机就丢失的一种消息返回策略,当丢失后,执行回调函数并可以记录消息的失败原因和UUID,成功也是同理

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 消息体
    String message = "hello, spring amqp!";
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 添加callback
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){ 
                    // ack,消息成功
                    log.debug("消息发送成功, ID:{}", correlationData.getId());
                }else{
                    // nack,消息失败
                    log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 发送消息
    rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}

需要注意的是:在手动添加交换机的过程中,想要使用通配符"#"的话,应该设置交换机为topic类型!

总结:

SpringAMQP中处理消息确认的几种情况:

  • publisher-comfirm:

    • 消息成功发送到exchange,返回ack

    • 消息发送失败,没有到达交换机,返回nack

    • 消息发送过程中出现异常,没有收到回执

  • 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback


2.消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

1.交换机持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 
    return new DirectExchange("simple.direct", true, false);
}

2.队列持久化:

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

3.消息持久化,SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定

Message msg = MessageBuilder
        .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 
        .build();

但在springamqp中,其实已经在声明交换机和队列的时候将其持久化了,发送消息的方法convertAndSend()内部也将消息做了持久化,了解消息持久化的设置方法可以将以后不是很重要的交换机,队列,消息设置为非持久化


3.消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。

  • auto(推荐):自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,抛出异常后,会不断重新发送消息即失败重试机制

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

配置方式是修改application.yml文件,添加下面配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

4.消费者失败重试

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

在这里插入图片描述

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)

第三种处理策略是将重试失败的消息重新投递到指定的交换机,然后在投递到指定的队列中,形成了一个交换机-队列的错误消息存放容器,在这个容器中存放不仅有错误消息,还有错误消息头的异常栈信息

在这里插入图片描述

实现方式:

1.定义接收失败消息的交换机、队列及其绑定关系:

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true); 
}
@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}

2.定义RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); 
}

总结:如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失

  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/299963.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

狂肝100小时,各大厂20W字面试真题分享

有很多童靴问我,有没有大厂的面试集合,可以针对性备考一下,我说面试题网络上有很多,随便搜索一下,就一大把吧。他们回复说,都是针对各个知识点的题目,想要吃透,至少要1-3个月的时间&…

NCC基础开发技能培训

YonBuilder for NCC 是一个带插件的eclipse工具,跟eclipse没什么区别 NC Cloud2021.11版本开发环境搭建改动 https://nccdev.yonyou.com/article/detail/495 不管是NC Cloud 新手还是老NC开发,在开发NC Cloud时开发环境搭建必看!&#xff…

命令行模式的rancher如何安装?

在学习kubectl操作的时候,发现rancher也有命令行模式,学习整理记录此文。 说明 rancher 命令是 Rancher 平台提供的命令行工具,用于管理 Rancher 平台及其服务。 前提 已经参照前文安装过了rancher环境了,拥有了自己的k8s集群…

腾讯云代金券介绍及领取教程分享

腾讯云为了吸引用户经常推出各种优惠活动,其中就包括腾讯云代金券,领取之后可用于抵扣腾讯云平台上购买的部分产品或服务的费用。以下是腾讯云代金券的详细介绍及领取教程。 一、腾讯云代金券介绍 腾讯云代金券是腾讯云优惠券的一种,代金券是…

IMU用于无人机故障诊断

最近,来自韩国的研究团队通过开发以IMU为中心的数据驱动诊断方法,旨在多旋翼飞行器可以自我评估其性能,即时识别和解决推进故障。该方法从单纯的常规目视检查跃升为复杂的诊断细微差别,标志着无人机维护的范式转变。 与依赖额外传…

深入理解并解析Flutter Widget

文章目录 完整代码程序入口构建 Widget 结构定义 widget 状态定义 widget UI获取上下文关于build()build() 常用使用 完整代码 import package:english_words/english_words.dart; import package:flutter/material.dart; import package:provider/provider.dart;void main() …

软件工程专业毕业设计题目怎么选?

文章目录 0 简介1 如何选题2 最新软件工程毕设选题3 最后 0 简介 学长搜集分享最新的软件工程业专业毕设选题,难度适中,适合作为毕业设计,大家参考。 学长整理的题目标准: 相对容易工作量达标题目新颖 1 如何选题 最近非常多的…

单电阻落地扇电机驱动 DEMO 方案

SYNWIT DEMO方案 低压 PMSM 电机,软件上采用SVPWM空间电压矢量调制技术,直接闭环启动,相比传统方波效率提高15%,具有更小的谐波分量及转矩脉动,同时采用32位MCU芯片SWM201G6S7 SSOP28 封装为主控,为驱动算…

Long类型转换精度丢失问题解决

问题: 启动前端项目 页面传递的ID 和数据库保存的ID不一致 原因:给前端返回的id为long类型,在转换json传递到前端以后精度丢失,所以前端给我们的id也是丢失精度的id,不能查询数据。 因为js数字类型最大长度为16位,而java的long类…

好物周刊#35:图标资源获取

https://github.com/cunyu1943/JavaPark https://yuque.com/cunyu1943 村雨遥的好物周刊,记录每周看到的有价值的信息,主要针对计算机领域,每周五发布。 一、项目 1. 正则大全 常用正则大全,支持 web/vscode/idea/Alfred Work…

红日靶场-3

目录 前言 外网渗透 外网渗透打点 1、arp 2、nmap 3、nikto 4、whatweb 5、gobuster 6、dirsearch CMS 1、主页内容 2、/configuration.php~ 目录 3、/administrator 目录 4、Joomla!_version探测 5、joomlascan python脚本 6、joomscan perl脚本 MySQL 1、远…

Java学习——设计模式——行为型模式2

文章目录 行为型模式状态模式观察者模式中介者模式迭代器模式访问者模式备忘录模式解释器模式 行为型模式 行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象无法单独完成的任务,涉及算法与对象间职责的…

(leetcode)Z字形变换 -- 模拟算法

个人主页:Lei宝啊 愿所有美好如期而遇 题目链接 . - 力扣(LeetCode) 输入描述 string convert(string s, int numRows),输入一个字符串s,以及一个行数numRows,将字符串按照这个行数进行Z字形排列&…

三菱plc学习入门(二,三菱plc指令,触点比较,计数器,交替,四则运算,转换数据类型)

今天,进行总结对plc的学习,下面是对plc基础的学习,希望对读者有帮助,欢迎点赞,评论,收藏!!! 目录 触点比较 当数据太大了的时候(LDD32位) CMP比…

文献阅读:Sparse Low-rank Adaptation of Pre-trained Language Models

文献阅读:Sparse Low-rank Adaptation of Pre-trained Language Models 1. 文章简介2. 具体方法介绍 1. SoRA具体结构2. 阈值选取考察 3. 实验 & 结论 1. 基础实验 1. 实验设置2. 结果分析 2. 细节讨论 1. 稀疏度分析2. rank分析3. 参数位置分析4. 效率考察 4.…

Win11开始菜单怎么改成经典模式-Win11切换Win10风格开始菜单方法

Win11切换Win10风格开始菜单方法 方法一: 1. 在Win11电脑上下载一个“Startallback”软件,下载安装完成后,在“控制面板”里打开该软件。 2. 打开后,在“欢迎界面”,选择使用“Windows10主题样式”并重启电脑即可。…

Python爬虫获取百度的图片

一. 爬虫的方式: 主要有2种方式: ①ScrapyXpath (API 静态 爬取-直接post get) ②seleniumXpath (点击 动态 爬取-模拟) ScrapyXpath XPath 是 Scrapy 中常用的一种解析器,可以帮助爬虫定位和提取 HTML 或 XML 文档中的数据。 Scrapy 中使用 …

大学物理实验重点——偏振光与双折射

线偏振光、椭圆偏振光、圆偏振光均可由两个频率相同、振动方向垂直、位相差恒定的线偏振光合成。 单轴晶体中,若其 o 光折射率大于 e 光折射率,也即 o 光传播速度小于 e 光传播速度,称之为负晶体,反之为正晶体。 o光和e光…

#error 在C语言中的作用

1、#error命令是C/C语言的预处理命令之一 #error 是C语言中的预处理指令之一,用于在编译时生成一个错误消息。当编译器遇到 #error 指令时,会立即停止编译,并将指定的错误消息输出到编译器的错误信息中。 在给定的代码中,#error…

2.6 KERNEL LAUNCH

图2.15在vecAdd函数中显示最终主机代码。此源代码完成了图2.6.中的骨架。2.12和2.15共同说明了一个简单的CUDA程序,该程序由主机代码和设备内核组成。该代码是硬接的,每个线程块使用256个线程。然而,使用的线程块的数量取决于向量&#xff08…