RabbitMQ实现消息发送接收——实战篇(路由模式)

本篇博文将带领大家一起学习rabbitMQ如何进行消息发送接收,我也是在写项目的时候边学边写,有不足的地方希望在评论区留下你的建议,我们一起讨论学习呀~

需求背景

先说一下我的项目需求背景,社区之间可以进行物资借用,当有社区提交物资借用申请时,需要通过RabbitMQ将这条消息发送到被借用物资的社区,同时在界面进行提示。

先把依赖引入一下

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

application.yml做好配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

工具类实现

先选择以何种方式进行消息发送,这里根据需求我选择使用RabbitMQ的路由模式进行消息发送,先来配置一下相应工具类:

先配置RabbitMQ的配置类

/**
 * @Title: RabbitMQConfig
 * @Author yinan
 * @Package com.yinan.config.RabbitConfig
 * @Date 2024/12/13 13:58
 * @description: RabbitMQ配置类
 */
@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

//    声明一个交换机
    @Bean
    public DirectExchange borrowMaterialExchange(){
        return new DirectExchange("borrow_material_exchange");
    }

//    动态绑定队列时使用的方法(具体绑定逻辑在下面的监听器中实现)
//    @Bean
//    public Queue communityQueue(){
//        return new Queue("communityQueue");
//    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}

为了确保消息发送和接收时都以 JSON 格式处理,可以在 Spring 配置中添加 Jackson2JsonMessageConverter。这样,发送端会将 MaterialBorrowing 对象序列化为 JSON,接收端会自动将 JSON 反序列化回 MaterialBorrowing 对象。 

绑定交换机和对应队列

/**
 * @Title: RabbitMQBindRoutingConfig
 * @Author yinan
 * @Package com.yinan.config.RabbitConfig
 * @Date 2024/12/13 14:21
 * @description:  动态绑定路由配置
 */
@Component
@Slf4j
public class RabbitMQBindRoutingConfig {

    @Autowired
    private DirectExchange borrowMaterialExchange;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * 以社区ID为路由键,为指定社区动态创建队列并绑定到交换机
     * @param communityId 社区ID
     */

    public void bindRouting(String communityId){
//        创建队列
        Queue queue = new Queue("queue_" + communityId);
//        动态绑定交换机和指定队列
        Binding binding = BindingBuilder.bind(queue)
                .to(borrowMaterialExchange)
                .with(communityId);
        rabbitAdmin.declareExchange(borrowMaterialExchange);
        rabbitAdmin.declareBinding(binding);
        log.info("队列绑定成功,社区ID----》" + communityId + ",队列名称----》" + queue.getName() + ",交换机名称----》" + borrowMaterialExchange.getName());

    }


}

动态声明队列

@Configuration
@Slf4j
public class QueueDeclareConfig {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    public void dynamicDeclareQueue(String communityId){
        String queueName = String.format("queue_%s",communityId);

        Queue queue = new Queue(queueName,true);
        rabbitAdmin.declareQueue(queue);
        log.info("队列声明成功");
    }
}

在创建声明队列的时候,我们希望的是根据我们的规则在调用接口的时候去创建指定名称的队列,所以可以使用动态声明对列而不是直接在平台上进行配置。

消息发送

@Component
@Slf4j
public class MessageSendConfig {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Object message,String communityId){
        System.out.println("发送消息:" + message);
        amqpTemplate.convertAndSend("borrow_material_exchange",communityId, message);
        log.info("发送消息成功------->"+message);
    }

}

消息接收(动态声明与监听结合),这里你可以先思考一下为什么要用这种方式实现消息接收,而不是使用@RabbitListener去动态获取某个队列接收消息。

/**
 * @Title: MessageRecieveConfig
 * @Author yinan
 * @Package com.yinan.config.RabbitConfig
 * @Date 2024/12/13 12:53
 * @description: 动态监听接收消息
 */
@Service
@Slf4j
public class MessageRecieveConfig<T> {


    private final ConnectionFactory connectionFactory;


    public MessageRecieveConfig(ConnectionFactory connection) {
        this.connectionFactory = connection;
    }

    public void recieveMessage(String communityId,Class<T> objectType){
        String queueName = String.format("queue_%s",communityId);
//        创建监听容器
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queueName);
//        处理消息消费逻辑
        container.setMessageListener(message -> {
            try {
                // 将字节数组转换为字符串
                String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println("接收到的消息:" + messageBody);

                // 如果需要将消息解析为对象(例如 MaterialBorrowing)
                ObjectMapper objectMapper = new ObjectMapper();
                T result = objectMapper.readValue(messageBody, objectType);
                System.out.println("反序列化后的消息:" + result);
            } catch (Exception e) {
                log.error("处理消息时发生错误:", e);
            }
        });
        // 确保自动确认
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.start();
        log.info("动态监听已启动,监听队列------->"+queueName);
    }

}

在你的项目中分别调用就行了,需要注意的是你必须确保在消息发送的时候你的队列已经创建完成且和对应交换机进行了绑定,不然可能会导致消息发送失败。

ok,我们启动项目

你会发现你的项目根本启动不起来,原因是因为对于 Spring AMQP 的监听器来说,必须确保监听的队列已经存在于 RabbitMQ 中,否则会抛出类似 DeclarationException 的错误。

所以我们考虑可以通过动态声明队列,在程序运行时确保 RabbitMQ 上创建好所需的队列。

动态声明队列的含义

动态声明队列是指程序在运行时,通过代码检查或创建 RabbitMQ 中尚不存在的队列,而不是手动预先配置好所有队列。这种方式可以自动帮你在 RabbitMQ 中创建所需的队列,而无需手动操作。

这里说一下为什么需要动态绑定队列而不直接使用@RabbitListener?

为什么使用 SimpleMessageListenerContainer 动态绑定队列

  • SimpleMessageListenerContainer 不需要在项目启动时绑定队列。你可以在用户调用接口时动态创建队列,并动态监听它。
  • 特点
    • 队列在用户调用接口时才会被动态创建(通过 RabbitAdmin 或其他机制)。
    • 动态创建队列和监听时,项目启动时不会尝试绑定不存在的队列,因此不会报错。
  • 适用场景:非常适合动态队列需求,比如队列名依赖用户输入或业务逻辑,且不想在项目启动时绑定固定的队列。

使用 @RabbitListener 的情况

@RabbitListener 会在项目启动时绑定到指定的队列。

  • 要求:如果绑定的队列在 RabbitMQ 中不存在,项目启动时就会抛出异常,类似 DeclarationException,这也就是上面为什么会报错的原因。
  • 解决办法
    • 提前创建队列:在 RabbitMQ 中手动创建队列,或通过 RabbitAdmin 在项目启动时自动创建队列。
    • 动态队列名:如果队列名是动态的,可以结合 SpEL 表达式,但队列仍然需要在项目启动时确保存在。
SpEL 表达式

如果你的需求中已经确定队列已经创建好的,但是需要动态去获取队列,可以使用如下形式:

@RabbitListener(queues = "#{T(java.lang.String).format('queue_%s', 'borrowedCommunityId')}")

这个表达式 是 Spring AMQP 中用于动态指定队列名称的 SpEL 表达式(Spring Expression Language),它的作用就是会动态生成一个队列名称,基于你传入的参数构造队列名。

详解
1. 关键部分解析
  • T(java.lang.String)

    • T 是 SpEL 用于引用 Java 类 的方式。
    • java.lang.String 是目标 Java 类,表明你可以调用 String 类的静态方法。
  • .format()

    • String.format() 是 Java 中的静态方法,用于格式化字符串。
    • 格式化字符串的格式是 'queue_%s'%s 是占位符,用于拼接动态内容。
  • 'queue_%s'

    • 这是格式化字符串的模板。%s 表示字符串占位符。
  • 动态参数(例如 borrowedCommunityId

    • 它会替换 %s,生成队列名。例如,当 borrowedCommunityId 的值是 123 时,结果是:queue_123
2. 具体实例

假设 borrowedCommunityId = "123"

String result = String.format("queue_%s", "123");
System.out.println(result); // 输出:queue_123

在 SpEL 中,这等同于:

queues = "#{T(java.lang.String).format('queue_%s', '123')}"

这会动态生成队列名称为 queue_123


为什么用 SpEL?

Spring AMQP 的 @RabbitListener 注解中,queues 参数支持 SpEL 表达式。这使得我们可以动态决定要监听的队列,而不是写死某个固定的队列名称。


实际应用场景

就比如在我的代码中,可能有多个社区队列,例如:

  • queue_123(社区 ID 为 123 的队列)
  • queue_456(社区 ID 为 456 的队列)

使用 queues = "#{T(java.lang.String).format('queue_%s', borrowedCommunityId)}",可以动态生成不同社区的队列名称,从而实现按社区路由的功能。

启动项目之后,调用接口就可以发送消息了

但是你会发现消息消费的逻辑并没有在控制台中打印出来,这个时候你就要考虑是不是以下几个问题了:

交换机和队列是否已经绑定成功(可以在平台上进行查看)

是否绑定到了对应的交换机:amqpTemplate.convertAndSend("borrow_material_exchange",communityId, message);红色部分指定交换机名称,如果不指定,那么就会使用默认的交换机,所以肯定也是接收不到值的。

当然,还有其他可能,如果你的项目中遇到了,可以在评论区留言,我们一起学习~

最后,重新修改代码调用接口,就可以接收到消息了

对于在界面进行消息提示的功能,这里先不写出来了,我会在后面的博客中进行更新~

【都看到这了,点赞加关注,收藏不迷路呀~】😚😚

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/936461.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机进制的介绍

一.进制介绍 对于整数&#xff0c;有四种表示方式: 1&#xff09;二进制:0,1&#xff0c;满2进1。 在golang中&#xff0c;不能直接使用二进制来表示一个整数&#xff0c;它沿用了c的特点。 参考:Go语言标准库文档中文版 | Go语言中文网 | Golang中文社区 | Golang中国 //赋值…

基于卷积神经网络的Caser算法

将一段交互序列嵌入到一个以时间为纵轴的平面空间中形成“一张图”后&#xff0c;基于卷积序列嵌入的推荐&#xff08;Caser&#xff09;算法利用多个不同大小的卷积滤波器&#xff0c;来捕捉序列中物品间的点级&#xff08;point-level&#xff09;、联合的&#xff08;union-…

基于STM32设计的粮食仓库(粮仓)环境监测系统

一、前言 当前项目使用的相关软件工具、传感器源代码工程已经上传到网盘&#xff08;实时更新项目内容&#xff09;&#xff1a;https://ccnr8sukk85n.feishu.cn/wiki/QjY8weDYHibqRYkFP2qcA9aGnvb?fromfrom_copylink 1.1 项目开发背景 随着现代农业的发展和粮食储存规模的…

计算机网络-传输层 TCP协议(上)

目录 报头结构 TCP的可靠传输机制 核心机制一&#xff1a;确认应答 TCP的序号和确认序号 核心机制二&#xff1a;丢包重传 核心机制三&#xff1a;连接管理 建立连接-三次握手 断开连接-四次挥手 核心机制四&#xff1a;滑动窗口 数据包已经抵达, ACK被丢了 数据包就…

【经验分享】容器云运维的知识点

最近忙于备考没关注&#xff0c;有次点进某小黄鱼发现首页出现了我的笔记还被人收费了 虽然我也卖了一些资源&#xff0c;但我以交流、交换为主&#xff0c;笔记都是免费给别人看的 由于当时刚刚接触写的并不成熟&#xff0c;为了避免更多人花没必要的钱&#xff0c;所以决定公…

纯血鸿蒙崛起,原生Android挑战?两大操作系统巅峰对决,智能设备未来谁主沉浮?

鸿蒙HarmonyOS和原生Android系统虽然在一些方面相似&#xff0c;但在架构、设计理念、API、开发工具等方面存在一些差异。鸿蒙系统的目标是跨设备、分布式的操作系统&#xff0c;强调多设备协同和资源共享&#xff0c;而Android则主要集中在智能手机和移动设备领域。 下面将从…

新手快速入门!低功耗4G模组Air780E——使用文件系统存储温湿度数据来啦~

小伙伴们&#xff0c;今天我们来学习低功耗4G模组Air780E快速入门之使用文件系统存储温湿度数据。一起接着看下去吧&#xff01; 一、编写脚本 1.1 准备资料 780E开发板 780E开发板设计资料 LuatOS-Air780E-文件系统的使用-程序源码demo TCP/UDP测试服务器 API使用介绍 …

vscode中插件ofExtensions的debug模式也无法查看U、p等openfoam中foam类型的变量

插件介绍&#xff1a; 主要内容如下&#xff1a; 以自编译的$HOME/OpenFOAM-7例&#xff0c;如果OFdebugopt设置为WM_COMPILE_OPTIONDebug&#xff0c;那最终的激活环境的命令为source $HOME/OpenFOAM/OpenFOAM-8/etc/bashrc WM_COMPILE_OPTIONDebug&#xff0c;这时候$FOAM_…

【收藏】Cesium 限制相机倾斜角(pitch)滑动范围

1.效果 2.思路 在项目开发的时候&#xff0c;有一个需求是限制相机倾斜角&#xff0c;也就是鼠标中键调整视图俯角时&#xff0c;不能过大&#xff0c;一般 pitch 角度范围在 0 至 -90之间&#xff0c;-90刚好为正俯视。 在网上查阅了很多资料&#xff0c;发现并没有一个合适的…

【经验分享】私有云运维的知识点

最近忙于备考没关注&#xff0c;有次点进某小黄鱼发现首页出现了我的笔记还被人收费了 虽然我也卖了一些资源&#xff0c;但我以交流、交换为主&#xff0c;笔记都是免费给别人看的 由于当时刚刚接触写的并不成熟&#xff0c;为了避免更多人花没必要的钱&#xff0c;所以决定公…

Please activate LaTeX Workshop sidebar item to render the thumbnail of a PDF

Latex代码中使用pdf图片&#xff0c;无法预览&#xff0c;提示&#xff1a; Please activate LaTeX Workshop sidebar item to render the thumbnail of a PDF 解决办法&#xff1a; 点击左边这个刷新下即可

QT:Widgets中的事件

事件的处理 (1)重新实现部件的paintEvent()、mousePressEvent()等事件处理函数。这是最常用的一种方法&#xff0c;不过它只能用来处理特定部件的特定事件。 (2)重新实现notify()函数。这个函数功能强大&#xff0c;提供了完全的控制&#xff0c;可以在事件过滤器得到事件之前…

Windows如何安装go环境,离线安装beego

一、安装go 1、下载go All releases - The Go Programming Language 通过网盘分享的文件&#xff1a;分享的文件 链接: https://pan.baidu.com/s/1MCbo3k3otSoVdmIR4mpPiQ 提取码: hxgf 下载amd64.zip文件&#xff0c;然后解压到指定的路径 2、配置环境变量 需要新建两个环境…

AI Agent:重塑业务流程自动化的未来力量(2/30)

《AI Agent&#xff1a;重塑业务流程自动化的未来力量》 摘要&#xff1a;整体思路是先介绍 AI Agent 的基本情况&#xff0c;再深入阐述其实现业务流程自动化的方法和在不同领域的应用&#xff0c;接着分析其价值和面临的挑战&#xff0c;最后得出结论&#xff0c;为读者全面…

R语言的数据结构-矩阵

【图书推荐】《R语言医学数据分析实践》-CSDN博客 《R语言医学数据分析实践 李丹 宋立桓 蔡伟祺 清华大学出版社9787302673484》【摘要 书评 试读】- 京东图书 (jd.com) R语言医学数据分析实践-R语言的数据结构-CSDN博客 矩阵是一个二维数组&#xff0c;矩阵中的元素都具有相…

springboot+javafx使用aop切面导致的fx:id不能被注入问题

记录一个我遇到得问题 问题描述 我本来使用AOP切面来进行全局异常管理&#xff0c;但是使用AOP之后fxml中通过fx:id绑定得参数无法被注入 Slf4j Component Aspect public class GlobalExceptionAspect {AfterThrowing(pointcut "execution(* com.shkj.videoclassifica…

STM32 HAL库 + LM2904运算放大器 + ADC + 4-20ma液位传感器:电路设计及代码实现

4-20ma液位传感器在工业自动化和日常应用中非常常见&#xff0c;例如水位监测、液体储罐管理等。本文将结合STM32 HAL库&#xff0c;带你实现一个简单的液位监测demo&#xff0c;从电路设计到代码实现&#xff0c;实现通过单通道ADC采集4-20ma液位传感器的信号&#xff0c;并通…

MongoDB-固定集合(Capped Collection)

在 MongoDB 中&#xff0c;固定集合&#xff08;Capped Collection&#xff09;是一种具有特殊属性的集合。固定集合具有一个固定的最大大小&#xff0c;并且一旦达到该大小时&#xff0c;最早插入的文档将会被自动删除&#xff0c;以便为新的文档腾出空间。固定集合的这种特性…

Vue2 基础

Vue 2 是 Vue.js 的第二个主要版本&#xff0c;于 2016 年发布。它是一个渐进式的 JavaScript 框架&#xff0c;以其简单、灵活、易用性高而广受欢迎。Vue 2 主要专注于构建用户界面&#xff08;UI&#xff09;&#xff0c;并且非常适合用于构建单页应用&#xff08;SPA&#x…

2450.学习周刊-2024年50周

封面 人生五个球 ✍优秀博文 面对老板安排的工作&#xff0c;事事有回应&#xff0c;有必要吗&#xff1f; 职场精英进阶手册&#xff1a;工作推进五原则&#xff0c;让你合理高效地利用时间 上个班而已&#xff0c;千万别畏手畏脚 理解了雷军说的SU7要守正出奇&#xff0…