RabbitMQ 消息丢失解决 (高级发布确认、消息回退与重发、备份交换机)

目录

一、发布确认SpringBoot版本

确认机制图例:

代码实战:

代码架构图:

1.1交换机的发布确认

添加配置类

消息消费者

消息生产者发布消息后的回调接口

测试:

 1.2回退消息并重发(队列的发布确认)

修改回调接口

生产者:

测试:

二、备份交换机

实战

生产者

报警消费者:

测试:


一、发布确认SpringBoot版本

        首先发布消息后进行备份在缓存里,如果消息成功发布确认到交换机,则从缓存里删除该消息,如果没有成功发布,则设置一个定时任务,重新从缓存里获取消息发布到交换机,直到成功发布到交换机。

确认机制图例:

代码实战:

一个交换机:confirm.exchange,一个队列:confirm.queue,一个消费者:confirm.consumer

其中交换机类型时 direct,与队列关联的 routingKey 是 key1

代码架构图:

1.1交换机的发布确认

配置文件中添加:

server:
  port: 8888
spring:
  rabbitmq:
    host: 192.168.163.132
    port: 5672
    username: 2252631565
    password: 2252631565
#    高级发布确认 发布消息成功后将会触发回调方法
    publisher-confirm-type: correlated
  • NONE 值是禁用发布确认模式,是默认值
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

添加配置类

声明交换机和队列,并且将交换机和队列进行绑定:

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE="confirm.exchange";
    public static final String CONFIRM_QUEUE="confirm.queue";
    public static final String ROUTING_KEY="key1";

    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE,false,false);
    }

    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    @Bean
    public Binding EAndQBind(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

消息生产者

也可以说是 Controller 层,在这里发送两条消息给两个交换机,其中一个交换机是我们设置好的,另一个交换机不存在;这样就可以清晰看出交换机应答效果。

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    //高级发布确认模式
    @GetMapping("/sendConfirmMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        log.info("发送一条时长为的消息给第一个队列内容是:{}",new Date().toString(),message);
        CorrelationData correlationData=new CorrelationData("1");
        correlationData.setReturnedMessage(new org.springframework.amqp.core.Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY,message,correlationData);
        //向一个不存在的交换机发送消息
        log.info("发送一条时长为的消息给第一个队列内容是:{}",new Date().toString(),message);
        CorrelationData correlationData2=new CorrelationData("2");
        correlationData2.setReturnedMessage(new org.springframework.amqp.core.Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE+123,ConfirmConfig.ROUTING_KEY,message,correlationData2);
    }

}

消息消费者

监听 confirm.queue 队列

@Slf4j
@Component
public class ConfirmLetterQueue {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
    public void confirmConsumer(Message message, Channel channel){
        log.info("收到了消息:{}",new String(message.getBody()));
    }
}

消息生产者发布消息后的回调接口

只要生产者发布消息,交换机不管是否收到消息,都会调用该类的 confirm 方法

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this::confirm);
    }
    /*交换机确认回调
    1.交换机收到了消息 触发回调
    1.1 correlationData(我们在发消息的时候自己创建的) 消息的ID以及消息内容
    1.2 ack 交换机收到消息 true
    1.3 cause 交换机收到消息的原因 null
    ---------------------------------
    2.交换机未收到消息 触发回调
    2.1 correlationData 消息的ID以及消息内容
    2.2 ack 交换机未收到消息 false
    2.3 cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("发送消息到交换机成功!消息体为:{}",new String(correlationData.getReturned().getMessage().getBody()));
        }else {
            log.info("发送消息到交换机失败!原因为:{}",cause.toString());
        }
    }
}

测试:

 效果很明显,我们配置的交换机成功收到消息并转发给队列;不存在的交换机没有接受到消息并作出反应。

 1.2回退消息并重发(队列的发布确认)

在配置文件中开启消息回退功能

server:
  port: 8888
spring:
  rabbitmq:
    host: 192.168.163.133
    port: 5672
    username: 2252631565
    password: 2252631565
#    高级发布确认 发布消息成功后将会触发回调方法
    publisher-confirm-type: correlated
    #    消息回退 当消息未路由至队列时触发
    publisher-returns: true

修改回调接口

实现 RabbitTemplate.ReturnsCallback 接口,并实现方法

@Slf4j
@Component
public class MyCallBack  implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this::confirm);
        rabbitTemplate.setReturnsCallback(this::returnedMessage);
    }
    /*交换机确认回调
    1.交换机收到了消息 触发回调
    1.1 correlationData(我们在发消息的时候自己创建的) 消息的ID以及消息内容
    1.2 ack 交换机收到消息 true
    1.3 cause 交换机收到消息的原因 null
    ---------------------------------
    2.交换机未收到消息 触发回调
    2.1 correlationData 消息的ID以及消息内容
    2.2 ack 交换机未收到消息 false
    2.3 cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("时间:{}发送消息到交换机成功!",new Date());
        }else {
            log.info("发送消息到交换机失败!原因为:{}",cause.toString());
        }
    }


    //当消息未路由到队列时触发 只有失败时才触发 若消息发送至延迟队列则一定会触发回退 记得根据交换机名称排除延迟队列
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息:'{}',被交换机:{}回退,回退原因为:{},routingKey为:{}"
                ,new String(returned.getMessage().getBody())
                ,returned.getExchange()
                ,returned.getReplyText()
                ,returned.getRoutingKey());
        //10s后消息重发
        try {
            Thread.sleep(10000);
            log.info("时间:{},生产者重新发消息",new Date());
            rabbitTemplate.convertAndSend(returned.getExchange(),ConfirmConfig.ROUTING_KEY,new String(returned.getMessage().getBody()));
        }catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

生产者:

向交换机中发送消息,指定错误的routingkey,触发队列回退消息并重发消息。

    //高级发布确认模式
    @GetMapping("/sendConfirmMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        //向一个不存在的队列发送消息
        log.info("时间:{}生产者发送一条的消息给第一个队列内容是:{}",new Date().toString(),message);
        CorrelationData correlationData2=new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY+123,message,correlationData2);
    }

测试:

回退未进入队列的消息并重新发送消息。 

二、备份交换机

        什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout (扇出),这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警并可以重发消息。

实战

需要一个备份交换机 backup.exchange,类型为 fanout,该交换机发送消息到队列 backup.queue 和 warning.queue。

 修改高级确认发布 配置类

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE="confirm.exchange";
    public static final String CONFIRM_QUEUE="confirm.queue";
    public static final String ROUTING_KEY="key1";

    //备份交换机
    public static final String BACKUP_EXCHANGE="backup.exchange";
    //备份队列
    public static final String BACKUP_QUEUE="backup.queue";
    //报警队列
    public static final String WARNING_QUEUE="warning.queue";
    @Bean
    public DirectExchange confirmExchange(){
        //绑定确认交换机与备份交换机
        Map<String,Object> argument=new HashMap<>();
        argument.put("alternate-exchange",BACKUP_EXCHANGE);
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).withArguments(argument).build();
    }

    //备份交换机
    @Bean
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE);
    }
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }
    //备份队列
    @Bean
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }
    //警告队列
    @Bean
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }
    @Bean
    public Binding EAndQBind(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
    //绑定备份交换机与两个队列
    @Bean
    public Binding BAndBBing(@Qualifier("backupExchange") FanoutExchange exchange,@Qualifier("backupQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange);
    }
    @Bean
    public Binding BAndWBing(@Qualifier("backupExchange") FanoutExchange exchange,@Qualifier("warningQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange);
    }
}

生产者

        生产者发送两条消息 一个配置正确的路由,另一个是错误的路由。预期目标是正确路由正常接收消息,错误路由传输的信息由警告队列接收。

    //高级发布确认模式
    @GetMapping("/sendConfirmMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        log.info("时间:{}生产者发送两条消息队列内容是:{}",new Date().toString(),message);
        CorrelationData correlationData=new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY,message,correlationData);
        //向一个不存在的队列发送消息
        CorrelationData correlationData2=new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY+123,message,correlationData2);
    }

报警消费者:

接收不可路由的消息

/**
 * 报警消费者
 */
@Slf4j
@Component
public class WarningConsumer {
    //监听报警消息
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE)
    public void receiveWarningMsg(Message message){
        String msg=new String(message.getBody());
        log.info("报警发现不可路由消息:{},重发消息",msg);
    }
}

测试:

         生产者发送两条消息 一个配置正确的路由,另一个是错误的路由。预期目标是正确路由正常接收消息,错误路由传输的信息由警告队列接收。

        在此案例中,也设置了消息回退的配置,但是没有触发消息回退。由此得出:备份交换机的优先级更高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/157546.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CPSC发布含有纽扣电池或硬币电池产品的最终规则!16CFR1263+UL4200A

2023年9月21日&#xff0c;美国消费品安全委员会&#xff08;CPSC&#xff09;在《联邦公报》上发布了纽扣及硬币电池及相关产品的最终规则&#xff08;DFR&#xff09;16 CFR 1263&#xff0c;以保护6岁以下儿童免受电池摄入危害。DFR将于2023年10月23日生效&#xff0c;除非消…

centos7 网卡聚合bond0模式配置

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、什么是网卡bond二、网卡bond的模式三、配置bond0 一、什么是网卡bond 所谓bond&#xff0c;就是把多个物理网卡绑定成一个逻辑上的网卡&#xff0c;使用同一个…

003 OpenCV filter2D

目录 一、环境 二、图像卷积 三、代码演示 3.1、锐化 3.2、sobel边缘&#xff0c;x方向 3.3、sobel边缘&#xff0c;y方向 3.4、高斯模糊 3.5、完整代码 一、环境 本文使用环境为&#xff1a; Windows10Python 3.9.17opencv-python 4.8.0.74 二、图像卷积 在OpenCV…

时间序列预测(6) — ARIMA实现单输入单输出负荷预测

目录 1 数据准备与可视化 2 简单数据探索与清洗 3 差分处理 4 绘制ACF与PACF图像&#xff0c;完成模型选择 5 建立ARIMA和SARIMA模型 5.1 初步建模 5.2 精细化建模 5.3 最终的模型 ARIMA作为成熟的统计学模型已被各种软件以各种方式实现&#xff0c;在Python中我们最常使…

电源地虚接,导致信号线发烫

音频板的信号是经过隔直电容接到音频板的。

【STM32】CRC(循环冗余校验)

一、CRC的背景知识 1、什么是CRC (1)CRC&#xff08;Cyclic Redundancy Check&#xff09;&#xff0c;循环冗余校验 (2)什么是校验&#xff0c;为什么需要校验&#xff1a;数据传输&#xff0c;数据存储过程中需要使用到的 (3)什么是冗余&#xff1a;表示比实际上要传输的数据…

正版软件|Kaspersky 杀毒软件 - 全方位安全软件

卡巴斯基 全方位安全软件 一款产品满足 您的全部安全需求&#xff0c; 通过屡获殊荣的保护产品抵御黑客、病毒和恶意软件&#xff0c;获得无与伦比的安全感。此外还有支付保护和隐私保护工具&#xff0c;为您提供全方位保护。包含高级版功能&#xff1a; 免费 Kaspersky Safe …

使用Jupyter Notebook调试PySpark程序错误总结

项目场景&#xff1a; 在Ubuntu16.04 hadoop2.6.0 spark2.3.1环境下 简单调试一个PySpark程序&#xff0c;中间遇到的错误总结&#xff08;发现版对应和基础配置很重要&#xff09; 注意&#xff1a;在前提安装配置好 hadoop hive anaconda jupyternotebook spark zo…

vulnhub靶机Momentum2

下载地址&#xff1a;https://download.vulnhub.com/momentum/Momentum2.ova 主机发现 端口扫描 端口服务扫描 漏洞扫描 先去看看web 这里面没有什么&#xff0c;就顺手扫一下目录 发现一些可疑的目录 比较正常 再看一下有没有别的web 看到几个新的东西去看看 文件上传 啥都没…

8.jib-maven-plugin构建springboot项目镜像,docker部署配置

目录 1.构建、推送镜像 1.1 执行脚本 2.2 pom.xml配置 ​2.部署镜像服务 2.1 执行脚本 2.2 compose文件 3.docker stack常用命令 介绍&#xff1a;使用goole jib插件构建镜像&#xff0c;docker stack启动部署服务&#xff1b; 通过执行两个脚本既可以实现构建镜像、部…

实时音视频方案汇总

若有好的方案欢迎留言讨论&#xff0c;非常感谢&#xff0c;汇总了一些&#xff0c;从市面上了解的一些低时延的端到端的方案&#xff0c;仅供参照&#xff0c;若有问题&#xff0c;也欢迎留言更正&#xff01; 方案 方案描述 时延 备注 1大华同轴高清电缆200米电缆&#xf…

Mysql之多表查询下篇

Mysql之多表查询下篇 满外连接的实现UNION关键字UNIONUNION ALL操作符 7种SQL JOINS的实现语法格式小结自然连接USING连接表连接的约束条件 满外连接的实现 在上篇博客中&#xff0c;我们可以了解到在Mysql中是不支持FULL JOIN来实现 满外连接的&#xff0c;那么我们在Mysql采用…

“鸿蒙之父”确认鸿蒙 PC 来了;腾讯是囤积 AI 芯片最多的中国科技公司之一丨 RTE 开发者日报 Vol.87

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE &#xff08;Real Time Engagement&#xff09; 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…

Cow Lineup S——离散化、单调队列、双指针

题目描述 思路 x、id不大于1亿&#xff0c;数据量太大&#xff0c;使用离散化将id离散化成一串从1开始连续的编号&#xff0c;使用map集合进行离散化使用双指针维护一段区间&#xff0c;这段区间满足每个编号都包含 如何使用map集合进行离散化&#xff1f; 维护一个变量nums…

如何将中文翻译成荷兰语?

随着中国的崛起&#xff0c;荷兰与中国的交流日益频繁。越来越多的企业和个人需要荷兰语翻译服务。那么&#xff0c;如何将中文翻译成荷兰语&#xff0c;北京哪家翻译公司比较专业&#xff1f; 专业人士指出&#xff0c;要提供优质的荷兰语翻译服务&#xff0c;不仅需要扎实的荷…

如何为初创企业选择合适的 ERP 系统?

**ERP系统**是制造、分销、供应链、金融、会计、风险管理等多个行业必不可少的企业技术解决方案。不论垂直行业、企业规模或目标受众如何&#xff0c;将ERP作为企业管理战略的核心部分都非常重要。 对于渴望发展的小型企业和初创企业来说&#xff0c;更是如此。大型企业需要对…

Maven依赖管理项目构建工具(保姆级教学)

一、Maven介绍 官网地址&#xff1a;Maven – Introduction Maven 是一款为 Java 项目管理构建、依赖管理的工具&#xff08;软件&#xff09;&#xff0c;使用 Maven 可以自动化构建、测试、打包和发布项目&#xff0c;大大提高了开发效率和质量。 Maven就是一个软件&#…

mysql索引学习案例

简单的学习一下mysql普通索引 这是一个小的案例 一、创建表SQL CREATE TABLE group_order (id int(11) NOT NULL AUTO_INCREMENT,group_seq varchar(64) COLLATE utf8mb4_bin NOT NULL COMMENT 拼单号,group_status int(8) NOT NULL COMMENT 100 待提货, 200 已提货, 300 已…

2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-B卷

2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-B卷 2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-B卷A模块基础设施设置/安全加固&#xff08;200分&#xff09;A-1&#xff1a;登录安全加固&#xff08;Windows, Linux&#xff09;A-2&#…

vite+vue3+ts中watch和watchEffct的使用

vitevue3ts中watch和watchEffct的使用 本文目录 vitevue3ts中watch和watchEffct的使用watchrefreactivepropsimmediate组合监听 watchEffect单值多值侦听副作用停止监听 watch vue官方文档&#xff1a;https://cn.vuejs.org/api/reactivity-core.html#watch 可以监听基础类型&…