RabbitMQ的高级特性

目录

数据导入

MQ的常见问题

消息可靠性问题

生产者确认机制

SpringAMQP实现生产者确认

消息持久化

消费者消息确认

失败重试机制

消费者失败消息处理策略

死信交换机

TTL

延时队列

待更


数据导入

资料下载地址:day05MQ高级

MQ的常见问题

  • 消息可靠性:如何确保消息至少被消费一次
  • 延迟消息问题:如何实现消息的延迟投递
  • 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
  • 高可用问题:如何避免单点的MQ故障而导致的不可用问题

消息可靠性问题

消息丢失的三大类:

  • 发送时丢失:
    • 生产者发送的消息未送达到exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack。
    • 消息未投递到交换机,返回nack。
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

需要注意的是,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。

SpringAMQP实现生产者确认

在publisher模块中配置如下内容

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。
    • true:则调用ReturnCallback;
    • false:则直接丢弃消点

在生产者模块中配置全局ReturnCallback(一个RabbitTemplate只能配置一个ReturnCallback)

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate template = applicationContext.getBean(RabbitTemplate.class);
        template.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode,replyText,exchange,routingKey,message.toString());
        }));
    }
}

进行测试

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        String routingKey = "simple";
        String message = "hello, spring amqp!";
        //准备消息id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(
                result->{
                    if (result.isAck()){
                        log.debug("消息发送成功,ID:{}",correlationData.getId());
                    }else {
                        log.error("消息发送失败,ID:{},原因{}",correlationData.getId(),result.getReason());
                    }
                },
                ex->{
                    log.error("消息发送异常,ID:{},原因:{}",correlationData.getId(),ex.getMessage());
                }
        );
        rabbitTemplate.convertAndSend("amq.topic", routingKey, message,correlationData);
    }
}

运行观察控制台

测试一种路由失败的情况,这种情况可以正常发送到交换机,但是不能发送到Queue

消息持久化

MQ默认是内存存储,当服务重启后,数据就会丢失。因此我们需要对交换机与队列进行持久化操作。在消费者模块添加如下代码

@Configuration
public class CommonConfig {
    @Bean
    public DirectExchange directExchange(){
        /**
         * name:交换机名称
         * durable:是否持久化
         * autoDelete:当没有队列绑定时是否删除
         */
        return new DirectExchange("direct.exchange",true,false);
    }

    @Bean
    public Queue simpleQueue(){
        /**
         * 使用Builder创建持久化队列
         * 使用 new Queue("名称")创建也可以,默认就是持久化的
         */
        return QueueBuilder.durable("simple.queue").build();
    }
}

启动消费者,就可以看到交换机与队列被持久到磁盘中,但需要注意的时,消息并没有持久化,当重启服务器消息还是会丢失。之前我们发送的消息是String类型,现在,我们使用AMQP的Message对消息进行持久化。

    @Test
    public void testDurableMessage() throws Exception {
        Message msg = MessageBuilder.withBody("hello spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        rabbitTemplate.convertAndSend("simple.queue",msg);
    }

消费者消息确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。(业务处理成功后,调用channel.basicAck()手动签收,如果出现异常,则调用channle.basicNack()方法。)
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。

manual模式对代码有一定入侵,需要添加发送ack的代码。因此不推荐使用

auto模式是通过Spring的AOP机制,来对消息进行自动确认。推荐使用

none模式不对消息进行确认,不使用

在消费者模块的配置文件中配置如下内容

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

进行测试,在监听器处添加错误代码

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1/0);
    }
}

进行debug观察Rabbit控制台

对断点放行,会发现控制台抛出错误后立即再进入断点,那么就可以确定,MQ会再次投递失败的消息。取消断点放行,会发现控制台无休止进行打印错误,这种处理方式并不友好,因此我们可以自定义失败重试机制。

失败重试机制

当消费者消费消息抛出异常后,会将消息投递给MQ。而MQ又会立即投递给消费者。这样循环往复会导致MQ的消息处理飙升,带来不必要的压力。因此我们可以采用Spring的重试机制(在本地重试,不返回ack也不返回nack),来避免这种情况。

消费者模块的配置文件添加如下内容

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true #开启消费者失败重试
          initial-interval: 1000 #初始的失败等待时长为1s
          multiplier: 2 #下次失败的等待时长倍数,下次灯带时长 = multiplier * last-interval
          max-attempts: 3 #最大重试次数
          stateless: true # true无状态;false有状态,如果业务中包含事务,这里改为false

接下来进行测试

首先是重试时间分别为1,2对应着配置中的1s与1s*2,如果还有下次重试次数那么重试时间就是1s*2*2。其次是在RabbitMQ中找不到这条错误的消息了。具体原因如下

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。流程图如下

添加一个新的Config

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.exchange");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue",true);
    }

    @Bean
    public Binding errorBinging(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");
    }
}

重启发送一条消息测试

观察Rabbit的控制台

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信 (dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false。
  • 消息是一个过期消息,超时无人消费。
  • 要投递的队列消息堆积满了,最早的消息可能成为死信。

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)

与RepublishRecoverer的区别在于,该种方式是通过MQ进行转发,而RepublishRecoverer是通过消费者进行转发。如果只是保存失败的消息,那么推荐使用RepublishRecoverer。

TTL

TTL(time to live)超时时间分为两种情况:

  • 消息本身设置了超时时间
  • 消息所在的队列设置了超时时间

当消息到达存活时间后还没有被消费会被自动清除。如果同时设置了消息过期时间和队列过期时间,以时间短的为准,队列过期会将所有消息移除,如果一个已经过期的消息不在队列顶端时并不会立即移除,一旦它到了队列顶端则会进行判断是否移除。

延时队列

我们可以通过TTL来实现一个延时队列,对消息设置过期时间存放在ttl.queue,但是没有消费者监听该队列,等到过期之后,放入死信队列,而消费者监听死信队列,对过期消息进行消费,从而实现延时队列。具体流程如下

接下来实现延时队列

编写ttl部分

@Slf4j
@Configuration
public class TTLMessageConfig {
    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")
                .ttl(10000)//超时时间
                .deadLetterExchange("dl.exchange")//指定死信队列
                .deadLetterRoutingKey("dl")//死信队列的路由key
                .build();
    }

    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.exchange");
    }

    @Bean
    public Binding simpleBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
}

编写消费者方的监听

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue",durable = "true"),
            exchange = @Exchange(name = "dl.exchange"),
            key = "dl"
    ))
    public void listenDlQueue(String msg){
        log.info("消费者接收到了延时消息:{}",msg);
    }

编写测试方法

    @Test
    public void testTTLMessage() throws Exception {
        Message msg = MessageBuilder.withBody("hello TTL".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        rabbitTemplate.convertAndSend("ttl.exchange","ttl",msg);
        log.info("消息成功发送!");
    }

至此实现了延时处理消息。


待更

明天在更新后续部分 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/124726.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

关于卷积神经网络的池化层(pooling)

了解池化层 池化层又称“下采样层”或“子采样层”,池化层可以大大降低特征的维度,减少计算量,同时可以避免过拟合问题。 顾名思义,最大池化层就是从输入的矩阵中某一范围内,选择最大的元素进行保留;平均池…

ThreadLocal原理以及内存泄露问题

1、ThreadLocal实现原理 1、每个线程中有一个ThreadLocalsMap,这是一个哈希表的结构里面有很多entry(也就是k-v),当我们使用ThreadLocal进行set值的时候,会将这个threadLocal设置为key,然后值设置为value放入ThreadLocalsMap,key为弱引用&am…

Python爬虫实战-批量爬取美女图片网下载图片

大家好,我是python222小锋老师。 近日锋哥又卷了一波Python实战课程-批量爬取美女图片网下载图片,主要是巩固下Python爬虫基础 视频版教程: Python爬虫实战-批量爬取美女图片网下载图片 视频教程_哔哩哔哩_bilibiliPython爬虫实战-批量爬取…

兴达易控232转profinet在搅拌站使用案例配置案例

该搅拌站所采用的是双行星动力搅拌桨混合机,借助兴达易控232转profinet网关(XD-PNR200)与PLC和变频器进行通信,从而实现对变频器的精确控制,大大提升了搅拌过程的稳定性和效率。 这一方案还具备高度的灵活性和可扩展性,使得搅拌站…

HashMap源码分析(一)

存储结构 说明:本次讲解的HashMap是jdk1.8中的实现,其他版本可能有差异 内部是由Node节点数组组成,Node节点之间又由链表或红黑树组成。 图是网上找的,实在不想画 属性介绍 //存储数据的数组,初次使用时初始化&…

CocosCreator让一个物体跟随鼠标移动(两种方式 本地坐标系和世界坐标系)

在 Cocos Creator 3.x 游戏运行时显示的画布大小就是屏幕区域,屏幕坐标是从画布的左下角为原点开始计算 在 Creator 3.x 里,屏幕和 UI 是完全区分开的,用户可以在没有 UI 的情况下点击屏幕获取触点信息。因此,获取屏幕触点&#…

mac M2 anaconda 解决装不了python3.7

今天发现一个很奇怪的问题 但是我一换成 conda create -n DCA python3.8.12就是成功的 这个就很奇怪, 解决如下 https://towardsdatascience.com/how-to-manage-conda-environments-on-an-apple-silicon-m1-mac-1e29cb3bad12 998 conda search pythonconda search python …

半导体(芯片)制造工艺流程简单说

半导体行业是国民经济支柱性行业之一,是信息技术产业的重要组成部分,是支撑经济社会发展和保障国家安全的战略性、基础性和先导性产业,其发展程度是衡量一个国家科技发展水平的核心指标之一,属于国家高度重视和鼓励发展的行业。 …

ant design pro of vue怎么使用阿里iconfont

一 使用离线iconfont 首先需要生成图所有图标对应的js文件。如下图所示,将生成的js代码复制,在项目中创建一个js文件,将代码粘贴进去。这里我将js文件放在了src/assets/iconfont下面 然后,在main.js中引入文件,并进…

Java代码是怎么运行的?

Java代码是怎么运行的? 运行流程 将Java程序转换成Java虚拟机所能识别的指令序列,也称Java字节码。之所以这么取名,是因为Java字节码指令的操作码(opcode)被固定为一个字节。 Java虚拟机可以由硬件实现,…

IP协议相关技术

文章目录 IP协议相关技术仅凭IP无法完成通信DNSARP IP协议相关技术 仅凭IP无法完成通信 人们在上网的时候其实很少直接输入某个具体的IP地址。 在访问Web站点和发送、接收电子邮件时,我们通常会直接输入Web网站的地址或电子邮件地址等那些由应用层提供的地址&…

麒麟v10 安装jenkins

1.想安装哪个版本? https://pkg.jenkins.io/redhat-stable/ 我们查看我们想要哪个版本: 4年前安装的是 Jenkins2.279 版本 现在在docker 上安装的是Version 2.425 版本 2.碰到到的问题 1.安装老版本的Jenkins,会出现安装的插件不兼容&…

viple入门(四)

(1)行打印 主要用于在运行窗口中显示数据,打印完成后,自动换行。 注意事项:不可同时打印两个数据,例如 解决方案1:使用或并,使得每次进入行打印的数据只有一个,缺点&am…

文件管理技巧:如何利用文件名关键字进行整理

在日常生活和工作中,我们经常需要处理大量的文件,这些文件可能包含各种类型的信息,如文本、图像、视频、音频等。如何有效地管理和整理这些文件,以便我们能够快速找到所需的文件,是一个非常重要的问题。本文将介绍一种…

Linux shell编程学习笔记22: () $() (()) 的用法小结

最近学习Linux Shell编程,对 () (()) [] [[]]等符号的用法还是有点分不太清楚,于是决定再梳理一下。今天先整理 () $() (()) 的用法。 1 单小括号() 1.1 子shell(命令组) 括号中的命令将会新开一个子shell顺序执行,所…

用vite搭建一个vue3 + ts项目

搭建项目 参考连接https://www.jianshu.com/p/1ce93c7e15a0 练手项目参考 https://www.xuexiluxian.cn/ 1.安装pina vue3 pinia的安装和使用 2.安装router npm install vue-router -S Vue3: Vue-router路由的使用

什么是跨境电商独立站?有什么优势和劣势?

你是否有过这样的经历:当你在网上浏览一些商品时,发现有些网站的域名很奇怪,比如.com、.net、.co等,而且网站的界面和设计也和国内的电商平台不太一样。 你可能会好奇,这些网站是什么?它们是怎么做的&…

学习笔记|Pearson皮尔逊相关系数|Spearman斯皮尔曼相关系数|和Kendall肯德尔tau-b相关系数|分析流程|-SPSS中双变量相关性分析系数

目录 学习目的软件版本原始文档基础概念皮尔逊相关系数基本假设(适用条件):系数的范围及意义实例1. 读数据:2.正态性检验:3.异常值检验(体重):4.分析: 斯皮尔曼相关系数基…

Git的入门详细教程

🏅我是默,一个在CSDN分享笔记的博主。📚📚 ​​ 🌟在这里,我要推荐给大家我的专栏《git》。🎯🎯 🚀无论你是编程小白,还是有一定基础的程序员,这…

企业级,搭建接口自动化测试框架思路分析,8年测试老鸟整理...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 在选择接口测试自…