RabbitMQ之发送者(生产者)可靠性

文章目录

  • 前言
  • 一、生产者重试机制
  • 二、生产者确认机制
    • 实现生产者确认
      • (1)定义ReturnCallback
      • (2)定义ConfirmCallback
  • 总结


前言

生产者重试机制、生产者确认机制。


一、生产者重试机制

  • 问题:生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
  • 解决:SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

实现:
需要配置application.yaml文件

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

我们利用命令停掉RabbitMQ服务:

docker stop mq

然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!

注意:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

二、生产者确认机制

  • 一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
  • 不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
    • MQ内部处理消息的进程发生了异常
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由
  • 针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败
  • 其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
    默认两种机制都是关闭状态,需要通过配置文件来开启。

实现生产者确认

发送者的application.yaml配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

(1)定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在发送者定义一个配置类:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback(回调)
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.debug("收到消息的return callback, exchange:{},key:{}, msg:{}, code:{},text:{}", returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getMessage(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());

            }
        });
    }
}

(2)定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
在这里插入图片描述

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
在这里插入图片描述
发送消息的测试类(添加ConfirmCallback):

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息   RoutingKey是错误的
    rabbitTemplate.convertAndSend("dragon.direct", "q", "hello", cd);
}

由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。而如果连交换机都是错误的,则只会收到nack。

注意:

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

总结

以上就是全部讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/188662.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(Matalb回归预测)GA-BP遗传算法优化BP神经网络的多维回归预测

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、部分代码&#xff1a; 四、分享本文全部代码数据说明手册&#xff1a; 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 本代码基于M…

IT行业多项目管理的方法与策略:优化资源分配与提升项目成功率

多项目管理已成为项目经理们面临的常态&#xff0c;IT行业如何高效进行项目管理呢&#xff1f; 多项目管理过程中存在的问题 1、多类型项目并行&#xff0c;项目流程掺杂混乱&#xff0c;项目进度难以监控&#xff0c;反应缓慢&#xff0c;容易产生延误风险。 2、团队资源有…

arp报文及使用go实现

一、ARP协议报文格式及ARP表 ARP&#xff08;Address Resolution Protocal&#xff0c;地址解析协议&#xff09;是将IP地址解析为以太网的MAC地址&#xff08;或者称为物理地址&#xff09;的协议。在局域网中&#xff0c;当主机或其他网络设备有数据要发送给另一个主机或设备…

03:2440--UART

目录 一:UART 1:概念 2:工作模式 3:逻辑电平 4:串口结构图 5:时间的计算 二:寄存器 1:简单的UART传输数据 A:GPHCON--配置引脚 B:GPHUP----使能内部上拉​编辑 C: UCON0---设置频率115200 D: ULCON0----数据格式8n1 E:发送数据 A:UTRSTAT0 B:UTXHO--发送数据输…

二维数值型数组例题2

1、内部和 题目描述 给定一个m行n列的二维矩阵&#xff0c;求其内部元素和 输入要求 第一行为两个整数&#xff1a;m和n&#xff08;0<m,n<10&#xff09;&#xff0c;接下来输入m*n的二维矩阵 输出要求 二维矩阵内部元素和 输入样例 3 3 1 2 3 4 5 6 7 8 9 …

基于python+Django+SVM算法模型的文本情感识别系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介1. 简介2. 技术栈3. 系统架构4. 关键模块介绍5. 如何运行 二、功能三、系统四. 总结 一项目简介 # 基于 Python Django SVM 算法模型的文本情感识别系统介…

芯片安全和无线电安全底层渗透技术

和传统网络安全不同&#xff0c;硬件安全、芯片安全、无线电安全属于网络底层安全的重要细分领域&#xff0c;是网络安全的真正基石&#xff0c;更是国家安全的重要组成部分&#xff0c;“夯实网络底层安全基础&#xff0c;筑牢网络强国安全底座”&#xff0c;是底网安全重要性…

基于51单片机的百叶窗控制系统设计

**单片机设计介绍&#xff0c; 基于51单片机的百叶窗控制系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于51单片机的百叶窗控制系统设计可以分为硬件设计和软件设计两个方面。下面是一个简要的设计介绍&#xff1a; …

阿里云服务器安装mysql数据库之后无法远程连接

目录 一、mysql安装完成后直接远程远程连接阿里云服务器上的MySQL会报下述错误&#xff1a; 1、修改root用户的host 为% 登录MySQL 后 执行 2、修改完成后执行 3、退出mysql 重启mysql服务 exit; 4、修改完成后需要设置阿里云的安全规则。 二、dbaver测试链…

docker devicemapper: Error running DeleteDevice dm_task_run failed

docker 删除容器&#xff0c;遇到&#xff1a; devicemapper: Error running DeleteDevice dm_task_run failed 异常 [hadoophadoop02 ~]$ sudo docker rm 5ede1280f0bf Error response from daemon: container 5ede1280f0bf791e91d40038b15decd42e8923546ae578abd96e08114c76…

JVM——垃圾回收(方法区中的垃圾回收和(堆回收)自动垃圾回收)

目录 1.自动垃圾回收介绍1.C/C的内存管理2.Java的内存管理3.垃圾回收的对比 2.方法区的回收方法区的回收 – 手动触发回收 3.堆回收1.引用计数法2.可达性分析算法 1.自动垃圾回收介绍 1.C/C的内存管理 ⚫ 在C/C这类没有自动垃圾回收机制的语言中&#xff0c;一个对象如果不再…

【差旅游记】走进新疆哈密博物馆

哈喽&#xff0c;大家好&#xff0c;我是雷工&#xff01; 前些天在新疆哈密时&#xff0c;有天下午有点时间&#xff0c;看离住的宾馆不远就是哈密博物馆&#xff0c;便去逛了逛博物馆&#xff0c;由于接下来的一段时间没顾上记录&#xff0c;趁今天有些时间简单记录下那短暂的…

蓝桥杯每日一题2023.11.25

题目描述 “蓝桥杯”练习系统 (lanqiao.cn) 题目分析 对于此题目最开始是递归想法&#xff0c;但发现题意中的匹配次数实际上是指在这8个字母中这8个字母每个字母对应的个数是否相同&#xff0c;如果相同则匹配。 此处我们可以使用subsrtr函数&#xff0c;每次循环截取8个字…

值得看的书--《全宋词》节选

(https://img-blog.csdnimg.cn/5d5fe2844f6646b5b7b415f0a9e80f6c.jpg)

基于uniapp的 电子书小程序——需求整理

前言 想开发一个很简单的 电子书阅读小程序&#xff0c;要怎么做的。下面从功能、数据库设计这一块来说一下。说不一定能从某个角度提供一些思路 开发语言 springcloud uniapp 小程序&#xff08;vue2&#xff09;mysql 说明 电子书的主题是电子书&#xff0c;我们在日常…

Elasticsearch集群部署 head监控插件 Kibana部署 Nginx反向代理 Logstash部署

一、组件介绍1、Elasticsearch&#xff1a;2 、Logstash3、Kibana4、Kafka&#xff1a;5、Filebeat: 二、 Elasticsearch集群部署服务器创建用户安装ES修改配置文件创建数据和日志目录设置JVM堆大小 #7.0默认为4G修改安装目录及存储目录权限系统优化&#xff08;1&#xff09;增…

2024年天津天狮学院专升本护理学专业《内外科护理学》考试大纲

天津天狮学院2024年护理学专业高职升本入学考试《内外科护理学》考试大纲 一、考试性质 《内外科护理学》专业课程考试是天津天狮学院护理专业高职升本入学考试的必考科目之一&#xff0c;其性质是考核学生是否达到了升入本科继续学习的要求而进行的选拔性考试。《内外科护理学…

完蛋!我被ConcurrentHashMap源码包围了!(一)

文章目录 1. 引言2. 使用3. 初始化4. 存储流程5. 取值流程6. 扩容流程 1. 引言 ConcurrentHashMap是一个线程安全的HashMap&#xff0c;在JDK1.7与JDK1.8&#xff0c;无论是实现还是数据结构都会有所不一样。这促使了ConcurrentHashMap有着HashMap一样的面试高频考点。 接下来…

【Linux】进程间通信——system V共享内存、共享内存的概念、共享内存函数、system V消息队列、信号量

文章目录 进程间通信1.system V共享内存1.1共享内存原理1.2共享内存数据结构1.3共享内存函数 2.system V消息队列2.1消息队列原理 3.system V信号量3.1信号量原理3.2进程互斥 4.共享内存的使用示例 进程间通信 1.system V共享内存 1.1共享内存原理 共享内存区是最快的IPC形式…

递归剪枝题

期中考终于考完了&#xff0c;整道题奖励下自己 我一北大同学问我的&#xff0c;说他递归超时了&#xff0c;叫我想一个办法 后面他说他加了个剪枝就过了&#xff0c;然后我自己尝试了一个方法&#xff1a; 就是先把城市按1到n排列&#xff0c;然后考虑互换&#xff0c;如果互…