rabbitMQ对消息不可达处理-备份交换机/备份队列

在这里插入图片描述

生产者发送消息,在消息不可达指定队列时,可以借助扇出类型交换机(之前写过消息回退的处理方案,扇出交换机处理的方案优先级高于消息回退)处理不可达消息,然后放置一个备份队列,供消费者处理不可达消息,同时也加一个报警队列,对于不能走正常流程的消息进行消费者告警。

先用方法配置类把各个组件声明:

在这里插入图片描述

package com.esint.configs;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class BackupConfig {

    /**
     * 定义组件常量名字
     */
    //交换机- 确认交换机额
    public static final String EXCHANGE_SURE = "sure.ex";
    //交换机- 备份交换机额
    public static final String EXCHANGE_BACK = "backup.ex";

    //队列- 正常确认队列
    public static final String QUEUE_SURE = "sure.queue";
    //队列-备份队列
    public static final String QUEUE_BACKUP = "backup.queue";
    //队列-警告队列
    public static final String QUEUE_WARN = "warn.queue";

    //routing-key
    public static final String ROUTING_KEY_SURE = "key1";

    /**
     * 声明组件
     */
    //确认交换机
    @Bean("sureExchange")
    public DirectExchange sureExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange",EXCHANGE_BACK);
        return ExchangeBuilder.directExchange(EXCHANGE_SURE).durable(true).withArguments(arguments).build();

    }

    //备份交换机
    @Bean("backExchange")
    public FanoutExchange backExchange(){
        return new FanoutExchange(EXCHANGE_BACK);
    }

    //确认队列
    @Bean("sureQueue")
    public Queue sureQueue(){
        return QueueBuilder.durable(QUEUE_SURE).build();
    }

    //备份队列
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(QUEUE_BACKUP).build();
    }

    //警告队列
    @Bean("warnQueue")
    public Queue warnQueue(){
        return QueueBuilder.durable(QUEUE_WARN).build();
    }

    /**
     * 绑定组件  确认队列 绑定 确认交换机 with key1
     */
    @Bean
    public Binding sureQueueBindingSureExchange(@Qualifier("sureQueue") Queue sureQueue,
                                             @Qualifier("sureExchange")DirectExchange sureExchange){
        return BindingBuilder.bind(sureQueue).to(sureExchange).with(ROUTING_KEY_SURE);

    }

    /**
     * 绑定组件 备份队列 绑定 备份交换机
     */
    @Bean
    public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
                                                    @Qualifier("backExchange")FanoutExchange backExchange){

        return  BindingBuilder.bind(backupQueue).to(backExchange);
    }

    /**
     * 绑定组件 警告队列 绑定 备份交换机
     */
    @Bean
    public Binding warnQueueBindingBackupExchange(@Qualifier("warnQueue") Queue warnQueue,
                                                  @Qualifier("backExchange")FanoutExchange backExchange){

        return  BindingBuilder.bind(warnQueue).to(backExchange);

    }
}

生产者: 我们做出两个方法,一个可正常进行流程,一个routingKey异常无法路由到指定队列
package com.esint.controller;

import com.esint.configs.BackupConfig;
import com.esint.constants.ResponseCode;
import com.esint.entity.ResponseEntity;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.kafka.clients.producer.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.ExecutionException;


@Api(value = "rabbitMQ-备份队列测试")
@RestController
@RequestMapping("/rabbit")
public class BackUpExchangeController {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @ApiOperation(value = "routingKey正常测试",httpMethod = "GET",tags = {"去正常流程"})
    @ApiImplicitParams({
            @ApiImplicitParam(name="str",value="消息体",required = false,dataType = "String")
    })
    @ResponseBody
    @RequestMapping(value = "/test1", method = RequestMethod.GET)
    public ResponseEntity test1(String str ) {

        rabbitTemplate.convertAndSend(BackupConfig.EXCHANGE_SURE, BackupConfig.ROUTING_KEY_SURE,str);
        return new ResponseEntity(ResponseCode.SUCCESS).addData("routingKeyOk:"+str);

    }

    @ApiOperation(value = "routingKey非正常测试",httpMethod = "GET",tags = {"去备份-警告"})
    @ApiImplicitParams({
            @ApiImplicitParam(name="str",value="消息体",required = false,dataType = "String")
    })
    @ResponseBody
    @RequestMapping(value = "/test2", method = RequestMethod.GET)
    public ResponseEntity test2(String str ) {

        rabbitTemplate.convertAndSend(BackupConfig.EXCHANGE_SURE, BackupConfig.ROUTING_KEY_SURE+"wrong",str);

        return new ResponseEntity(ResponseCode.SUCCESS).addData("routingKeyWrong:"+str);

    }

}

三个消费者分别监听正常队列 备份队列 警告队列

确认队列消费者:

package com.esint.consumer;

import com.esint.configs.BackupConfig;
import com.esint.configs.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class BackUpConsumer01 {

    @RabbitListener(queues = BackupConfig.QUEUE_SURE)
    public void reveiver(Message message){
        log.info("正常消费者C1:" +  new String(message.getBody()),"UTF-8");
    }

}

备份队列消费者:

package com.esint.consumer;

import com.esint.configs.BackupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class BackUpConsumer02 {

    @RabbitListener(queues = BackupConfig.QUEUE_BACKUP)
    public void reveiver(Message message){
        log.info("备份消费者C2:" +  new String(message.getBody()),"UTF-8");
    }
}

警告队列消费者:

package com.esint.consumer;

import com.esint.configs.BackupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class BackUpConsumer03 {

    @RabbitListener(queues = BackupConfig.QUEUE_WARN)
    public void reveiver(Message message){
        log.info("警告消费者C3:" +  new String(message.getBody()),"UTF-8");
    }

}

测试:

1.正常流程测试
在这里插入图片描述
在这里插入图片描述

 com.esint.consumer.BackUpConsumer01      : 正常消费者C1:你好啊 正常队列

2.路由不达消息测试

在这里插入图片描述
在这里插入图片描述

com.esint.consumer.BackUpConsumer03      : 警告消费者C3:这个消息不可达 routing-key不对 它去哪里了?
com.esint.consumer.BackUpConsumer02      : 备份消费者C2:这个消息不可达 routing-key不对 它去哪里了?

测试达到预期结果!

在队列消息不可达时,备份交换机处理优先级高于消息回退处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/204865.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

解压压缩包脚本

解压压缩包脚本 我们一般解压一个压缩包&#xff0c;需要三步&#xff1a; 1、打开压缩包。 2、点击解压文件。 3、选择解压目录解压到指定文件夹。 那么&#xff0c;怎么一步完成这些繁琐的操作呢&#xff1f;编写一个bat脚本即可&#xff0c;如下所示&#xff1a; 1、安装解压…

【人工智能Ⅰ】实验5:AI实验箱应用之贝叶斯

实验5 AI实验箱应用之贝叶斯 一、实验目的 1. 用实验箱的摄像头拍摄方块上数字的图片&#xff0c;在图像处理的基础上&#xff0c;应用贝叶斯方法识别图像中的数字并进行分类。 二、实验内容和步骤 1. 应用实验箱机械手臂上的摄像头拍摄图像&#xff1b; 2. Opencv处理图像…

Wish防关联是什么?Wish要怎样避免违规封店?

四大跨境电商平台之一wish&#xff0c;做跨境电商的很多人可能都听过wish。随着wish不断完善平台制度&#xff0c;对于多账号运营的卖家要求越来越严厉&#xff0c;wish和亚马逊、eBay等其它跨境电商平台一样&#xff0c;不支持一个卖家开设多个账号多家店铺。 但是对于各位卖家…

2023/11/24JAVAweb学习(Vue常用指令,Vue.js文件,Ajax,Axios两种请求,Vue-cli脚手架,Vue项目,Element)

age只会执行成立的,show其实都展示了,通过display不展示 使用Vue,必须引入Vue.js文件 假如运行报错,以管理员身份打开vscode,再运行 ------------------------------------------------------------------- 更改端口号

2023/11/30JAVAweb学习

数组json形式 想切换实现类,只需要只在你需要的类上添加 Component 如果在同一层,可以更改扫描范围,但是不推荐这种方法 注入时存在多个同类型bean解决方式

C语言——多种方式打印出1000之内的所有的“水仙花数”

所谓水仙花数,是指一个3位数,其各位数字立方和等于该数本身。水仙花数是指一个三位数&#xff0c;它的每个位上的数字的立方和等于它本身。例如&#xff0c;153是一个水仙花数&#xff0c;因为1^3 5^3 3^3 153。 方法一 #define _CRT_SECURE_NO_WARNINGS 1#include <std…

制造业如何做生产设备管理、分析生产数据?

本文将为大家讲解&#xff1a;1、设备管理的现状与问题&#xff1b;2、设备管理系统功能&#xff1b;3、制造业企业如何做生产设备管理、分析生产数据&#xff1f;4、制造业设备管理的价值。 想要管理好设备&#xff0c;设备档案管理、巡检、报修、保养、分析预警等问题都是必须…

Python list列表添加元素的3种方法及删除元素的3种方法

Python list列表添加元素的3种方法 Python list 列表增加元素可调用列表的 append() 方法&#xff0c;该方法会把传入的参数追加到列表的最后面。 append() 方法既可接收单个值&#xff0c;也可接收元组、列表等&#xff0c;但该方法只是把元组、列表当成单个元素&#xff0c;这…

解决电脑蓝屏问题:SYSTEM_THREAD_EXCEPTION_NOT_HANDLED,回到系统还原点

解决电脑蓝屏问题&#xff1a;SYSTEM_THREAD_EXCEPTION_NOT_HANDLED&#xff0c;回到系统还原点 1&#xff0c;蓝屏显示问题1.1&#xff0c;蓝屏1&#xff0c;清楚显示1.2&#xff0c;蓝屏2&#xff0c;模糊显示 2&#xff0c;排除故障问题3&#xff0c;解决蓝屏的有效方法 1&a…

本地事务和分布式事务

请直接看原文 原文链接:彻底搞清楚什么是分布式事务 - 知乎 (zhihu.com) -------------------------------------------------------------------------------------------------------------------------------- 1、什么是本地事务 多个sql操作,被同一个线程执行, 使用…

MedicalTransformer论文解读

论文是一个分割任务&#xff0c;但这里的方法不局限于分割&#xff0c;运用到检测、分类都可以。 论文下载 https://www.yuque.com/yuqueyonghupjh9oc/ovceh4/onilw42ux6e9n1ne?singleDoc# 《轴注意力机制》 一个问题 为什么transformer一开始都有CNN&#xff1a;降低H、W…

AWS EC2 如何 使用 SSM会话管理器登陆

首先只有特定版本的OS会默认附带SSM Agent。 预安装了 SSM Agent 的 Amazon Machine Images&#xff08;AMIs&#xff09; - AWS Systems Manager 其次EC的instance role必须有一个叫“AmazonSSMManagedInstanceCore”的策略 如何给IAM User赋权&#xff0c;让他们可以使用SSM…

教育企业CRM选择技巧

教育行业的发展一波三折&#xff0c;要想在激烈的赛道脱颖而出&#xff0c;就需要有一套有效的CRM系统&#xff0c;来帮助教育机构提升招生效率、增加学员留存、提高教学质量。下面说说&#xff0c;教育企业选择CRM系统要具备的四大功能。 1、招生管理功能 教育机构的首要目标…

Java的threadd常用方法

常用API 给当前线程命名 主线程 package com.itheima.d2;public class ThreadTest1 {public static void main(String[] args) {Thread t1 new MyThread("子线程1");//t1.setName("子线程1");t1.start();System.out.println(t1.getName());//获得子线程…

C 语言-数组

1. 数组 1.1 引入 需求&#xff1a;记录班级10个学员的成绩 需要定义10个变量存在的问题:变量名起名困难变量管理困难需求&#xff1a;记录班级1000个学员的成绩 1.2 概念 作用&#xff1a;容纳 数据类型相同 的多个数据的容器 。 特点&#xff1a; 长度不可变容纳 数据类型…

微服务--07--Seata 分布式事务

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 分布式事务1.认识Seata2.部署TC服务2.1.准备数据库表2.2.准备配置文件2.3.Docker部署 3.微服务集成Seata3.1.引入依赖3.2.改造配置3.3.添加数据库表3.4.测试 分布式…

【力扣:526】优美的排列

状态压缩动态规划 原理如下&#xff1a; 遍历位图可以得到所有组合序列&#xff0c;将这些序列的每一位看作一个数&#xff0c;取序列中1总量的值作为每轮遍历的位&#xff0c;此时对每个这样的位都能和所有数进行匹配&#xff0c;因为一开始就取的是全排列&#xff0c;并且我们…

聚类算法Sklearn实践

聚类算法是一种常用的无监督学习方法&#xff0c;用于将数据集划分为具有相似特征的组或簇。在实践中&#xff0c;为了方便快捷地应用聚类算法&#xff0c;可以使用Scikit-learn&#xff08;简称Sklearn&#xff09;这个强大的Python机器学习库。Sklearn提供了丰富的聚类算法实…

Python自动化测试面试经典题

相信大家经历过许多面试都会有这样的感受&#xff1a;好不容易通过了 2 -3轮技术面试&#xff0c;但是薪资不够理想&#xff1b;要么被面试的测试专家虐的不要不要的。但每一次的面试也能让自己认识到不足之处&#xff0c;这样才有利于后续拿到理想的offer。 牛鹭学院的学子对…

Python 进阶(十二):随机数(random 模块)

《Python入门核心技术》专栏总目录・点这里 文章目录 1. 导入random库2. 常用随机数函数2.1 生成随机浮点数2.2 生成随机整数2.3 从序列中随机选择2.4 随机打乱序列3. 设置随机数种子4. 应用实例4.1 游戏开发4.2 数据分析4.3 加密与安全4.4 模拟实验5. 总结大家好,我是水滴~~ …