RabbitMQ发布订阅模式Publish/Subscribe详解

订阅模式Publish/Subscribe

  • 基于API的方式
      • 1.使用AmqpAdmin定制消息发送组件
      • 2.消息发送者发送消息
      • 3.消息消费者接收消息
  • 基于配置类的方式
  • 基于注解的方式
    • 总结

SpringBoot整合RabbitMQ中间件实现消息服务,主要围绕3个部分的工作进行展开:定制中间件、消息发送者发送消息、消息消费者接收消息。其中,定制中间件是比较麻烦的工作,且必须预先定制。

下面以用户注册成功后,同时发送邮件通知和短信通知这一场景为例, 分别使用基于API、基于配置类和基于注解这3种方式,来实现Publish/Subscribe工作模式的整合。

基于API的方式

基于API的方式,是指使用Spring框架提供的API管理类AmqpAdmin定制消息发送组件,并进行消息的发送。这种定制消息发送组件的方式,与在RabbitMQ可视化界面上通过对应面板进行组件操作的实现基本一样,都是通过管理员的身份,预先手动声明交换器、队列、路由键等,然后组装消息队列供应用程序调用,从而实现消息服务。下面我们就对这种基于API的方式进行讲解和演示。

1.使用AmqpAdmin定制消息发送组件

我们先打开chapter08项目的测试类Chapter08ApplicationTests,在该测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件。

package com.ytx;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Chapter08ApplicationTests {
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    void contextLoads() {
    }

    /** 使用AmqpAdmin管理员API定制消息组件 */
    @Test
    public void amqpAdmin() {
        // 1.定义fanout类型的交换器
        amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
        // 2.定义两个默认持久化队列,分别处理email和sms
        amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
        amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
        // 3.将队列分别与交换器进行绑定
        amqpAdmin.declareBinding(new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
        amqpAdmin.declareBinding(new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
    }
}

执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果。单元测试方法执行成功后,通过RabbitMQ可视化管理页面的Exchanges面板查看效果。

在这里插入图片描述

从上图可以看出,在RabbitMQ可视化管理页面的Exchanges面板中,新出现了一个名称为fanout_exchange的交换器(其他7个交换器是RabbitMQ自带的),且其类型是我们设置的fanout类型。我们可以单击fanout_exchange交换器进入查看。

在这里插入图片描述
从上图可以看出,在fanout_exchange交换器详情页面中展示有该交换器的具体信息,还有与之绑定的两个消息队列fanout_queue_email和fanout_queue_sms,并且与程序中设置的绑定规则一致。切换到Queues面板页面,查看定制生成的消息队列信息。
在这里插入图片描述

从上图可以看出,在Queues队列面板页面中,展示有定制的消息队列信息,这与程序中定制的消息队列一致,我们可以单击消息队列名称查看每个队列的详情。

通过上述操作可以发现,在管理页面中提供了消息组件交换器、队列的定制功能。在程序中使用Spring框架提供的管理员API组件AmqpAdmin,定制消息组件和在管理页面上手动定制消息组件的本质是一样的。

2.消息发送者发送消息

完成消息组件的定制工作后,创建消息发送者发送消息到消息队列中。发送消息时,我们可以借助一个实体类传递消息,需要预先创建一个实体类对象。

首先,在chapter08项目中创建名为com.cy.domain的包,并在该包下创建一个实体类User。

package com.ytx.domain;

/** 发布消息的实体类可以通过实现Serializable序列化接口进行发布 */
public class User {
    private Integer id;
    private String username;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", username='" + username + '\'' +
                '}';
    }
}

其次,我们在项目测试类Chapter08ApplicationTests中,使用Spring框架提供的RabbitTemplate模板类实现消息发送。

@Autowired
private RabbitTemplate rabbitTemplate;

/** 1.Publish/Subscribe工作模式消息发送端 */
@Test
public void subPublisher() {
    User user = new User();
    user.setId(1);
    user.setUsername("小明");
    rabbitTemplate.convertAndSend("fanout_exchange", "", user);
}

上述代码中,我们先使用@Autowired注解,引入消息中间件管理的RabbitTemplate组件对象,然后使用该模板工具类的convertAndSend(String exchange, String routingKey, Object object)方法进行消息发布。此方法中的第1个参数表示发送消息的交换器,这个参数值要与之前定制的交换器名称一致;第2个参数表示路由键,因为实现的是Publish/Subscribe工作模式,所以不需要指定;第3个参数是发送的消息内容,接收Object类型。

然后,执行上述消息发送的测试方法subPublisher(),控制台执行效果见下图所示。
在这里插入图片描述

从上图可以看出,发送实体类对象消息时程序发生异常,从异常信息“SimpleMessageConverter only supports String, byte[] and Serializable payloads”可以看出,消息发送过程中默认使用了SimpleMessageConverter转换器进行消息转换存储,该转换器只支持字符串或实体类对象序列化后的消息。而测试类中发送的是User实体类对象消息,所以发生异常。

如果要解决上述消息中间件发送实体类消息出现的异常,我们通常可以采用两种解决方案:第一种是执行JDK自带的Serializable序列化接口;第二种是定制其他类型的消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨识,所以一般使用第二种方式。

接着我们在chapter08项目中创建名为com.ytx.config的包,并在该包下创建一个RabbitMQ消息配置类RabbitMQConfig。

package com.ytx.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** RabbitMQ消息配置类 */
@Configuration
public class RabbitMQConfig {
    /** 定制JSON格式的消息转换器 */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

代码中创建一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型。

再次执行subPublisher()方法,该方法执行成功后,查看RabbitMQ可视化管理页面Queues面板信息。
在这里插入图片描述
从上图可以看出,消息发送完成后,Publish/Subscribe工作模式下绑定的两个消息队列中各自拥有一条待接收的消息, 由于目前尚未提供消息。
在这里插入图片描述

3.消息消费者接收消息

在chapter08项目中创建名为com.ytx.service的包,并在该包下创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService。

package com.ytx.chapter08.service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/** RabbitMQ消息接收处理的业务类 */
@Service
public class RabbitMQService {
    /** Publish/Subscribe工作模式接收,处理邮件业务 */
    @RabbitListener(queues = "fanout_queue_email")
    public void subConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println("邮件业务接收到消息:" + msg);
    }

    /** Publish/Subscribe工作模式接收,处理短信业务 */
    @RabbitListener(queues = "fanout_queue_sms")
    public void subConsumerSms(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println("短信业务接收到消息:" + msg);
    }
}

上述代码中,创建了一个接收处理RabbitMQ消息的业务处理类RabbitMQService,在该类中使用Spring框架提供的@RabbitListener注解,我们可以监听队列名称为fanout_queue_email和fanout_queue_sms的消息,监听的这两个队列是前面指定发送并存储消息的消息队列。

需要说明的是,使用@RabbitListener注解监听队列消息后,一旦服务启动且监听到指定的队列中有消息存在(目前两个队列中各有一条相同的消息),对应注解的方法就会立即接收并消费队列中的消息。另外,在接收消息的方法中,参数类型可以与发送的消息类型保持一致,或者使用Object类型和Message类型。如果使用与消息类型对应的参数接收消息的话,只能够得到具体的消息体信息;如果使用Object或者Message类型参数接收消息的话,还可以获得除了消息体外的消息参数信息MessageProperties。

启动chapter08项目,控制台显示的消息消费效果如下图所示。
在这里插入图片描述
从上图可以看出,项目启动成功后,消息消费者监听到消息队列中存在的两条消息,并进行了各自的消费。与此同时,通过RabbitMQ可视化管理页面的Queues面板查看队列消息情况,会发现两个队列中存储的消息已经被消费。至此,一条完整的消息发送、 消息中间件存储、消息消费的Publish/Subscribe(发布订阅模式)工作模式的业务案例已经实现。

注意,如果没有引入Spring Web模块的依赖,启动chapter08项目时,消息消费者接收消息会报以下错误。

Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.amqp.support.converter.MessageConverter]: Factory method 'messageConverter' threw exception; nested exception is java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
  at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.3.25.jar:5.3.25]
  at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:653) ~[spring-beans-5.3.25.jar:5.3.25]
  ... 18 common frames omitted

小提示:

上述代码中,使用的是开发中常用的@RabbitListener注解,来监听指定名称队列的消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,我们还可以使用RabbitTemplate模板类的receiveAndConvert(String queueName)方法手动消费指定队列中的消息。

基于配置类的方式

基于配置类的方式,主要讲的是使用SpringBoot框架提供的@Configuration注解,配置定制消息发送组件,并进行消息发送。下面我们来对这种基于配置类的方式进行讲解和演示。

打开RabbitMQ消息配置类RabbitMQConfig,在该配置类中使用基于配置类的方式定制消息发送相关组件。

package com.ytx.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** RabbitMQ消息配置类 */
@Configuration
public class RabbitMQConfig {
    /** 定制JSON格式的消息转换器 */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /** 使用基于配置类的方式定制消息中间件 */
    // 1.定义fanout类型的交换器
    @Bean
    public Exchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
    }
    // 2.定义两个不同名称的消息队列
    @Bean
    public Queue fanoutQueueEmail() {
        return new Queue("fanout_queue_email");
    }
    @Bean
    public Queue fanoutQueueSms() {
        return new Queue("fanout_queue_sms");
    }
    // 3.将两个不同名称的消息队列与交换器进行绑定
    @Bean
    public Binding bindingEmail() {
        return BindingBuilder.bind(fanoutQueueEmail()).to(fanoutExchange()).with("").noargs();
    }
    @Bean
    public Binding bindingSms() {
        return BindingBuilder.bind(fanoutQueueSms()).to(fanoutExchange()).with("").noargs();
    }
}

上述代码中,使用@Bean注解定制了3种类型的Bean组件,这3种组件分别表示交换器、消息队列和消息队列与交换器的绑定。这种基于配置类方式定制的消息组件,其实现和基于API方式定制的消息组件完全一样,只不过是实现方式不同而已。

按照消息服务整合实现步骤,完成消息组件的定制后,还需要编写消息发送者和消息消费者,而在基于API的方式中已经实现了消息发送者和消息消费者,并且基于配置类方式定制的消息组件名称,和之前测试用的消息发送和消息消费组件名称都是一致的,所以这里我们可以直接重复使用。

重新运行消息发送者测试方法subPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。

基于注解的方式

基于注解的方式指的是使用Spring框架的@RabbitListener注解定制消息发送组件并发送消息。

在消息接收和处理的业务类RabbitMQService中,将针对邮件业务和短信业务处理的消息消费者方法进行注释,使用@RabbitListener注解及其相关属性定制消息发送组件。

package com.ytx.chapter08.service;
import com.ytx.chapter08.domain.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/** RabbitMQ消息接收处理的业务类 */
@Service
public class RabbitMQService {
    /** Publish/Subscribe工作模式接收,处理邮件业务 */
    /*
    @RabbitListener(queues = "fanout_queue_email")
    public void subConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println("邮件业务接收到消息:" + msg);
    }
    */

    /** Publish/Subscribe工作模式接收,处理短信业务 */
    /*
    @RabbitListener(queues = "fanout_queue_sms")
    public void subConsumerSms(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println("短信业务接收到消息:" + msg);
    }
    */

    /** 使用基于注解的方式实现消息服务 */
    // 1.1 Publish/Subscribe工作模式接收,处理邮件业务
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout_queue_email"),
            exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
    public void subConsumerEmailAno(User user) {
        System.out.println("邮件业务接收到消息:" + user);
    }
    // 1.2 Publish/Subscribe工作模式接收,处理短信业务
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout_queue_sms"),
            exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
    public void subConsumerSmsAno(User user) {
        System.out.println("短信业务接收到消息:" + user);
    }
}

上述代码中,使用@RabbitListener注解及其相关属性定制了两个消息组件的消费者,这两个消费者都接收实体类User并消费。在@RabbitListener注解中,bindings属性用于创建并绑定交换器和消息队列组件,需要注意的是,为了能使两个消息组件的消费者接收到实体类User,需要我们在定制交换器时将交换器类型type设置为fanout。另外,bindings属性的@QueueBinding注解除了有value、exchange属性外,还有key属性用于定制路由键routingKey(当前发布订阅模式不需要)。

重启测试方法subPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。

至此,我们就在SpringBoot中完成了基于API、基于配置类和基于注解这3种方式,来实现Publish/Subscribe工作模式的整合讲解。在这3种实现消息服务的方式中,基于API的方式相对简单、直观,但容易与业务代码产生耦合;基于配置类的方式相对隔离、容易统一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易与业务代码产生耦合。

在实际开发中,使用基于配置类的方式和基于注解的方式较为常见,基于API的方式则偶尔使用,当然大家要根据实际情况进行具体选择。

总结

今天介绍了基于API方式、配置类方式和注解的3种消息队列,并展示了实现发布订阅Publish/Subscribe模式的整合及代码实现,基于注解方式的实现需要重点掌握。有关RabbitMQ的其他内容,袁老后续更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/871611.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用select

客户端 服务端 1 #include<myhead.h>2 3 #define SER_PORT 6666 //服务器端口4 #define SER_IP "127.0.0.1" //服务器ip5 6 7 int main(int argc, const char *argv[])8 {9 //创建套接字10 int sfdsocket(AF_INET,SOCK_STREAM,0);11 if(sfd-1)12 …

开源大模型LLaMA架构介绍

大模型相关目录 大模型&#xff0c;包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步&#xff0c;扬帆起航。 swift与Internvl下的多模态大模型分布式微调指南&#xff08;附代码和数据&#xff…

思科设备静态路由实验

拓扑及需求 网络拓扑及 IP 编址如图所示&#xff1b;PC1 及 PC2 使用路由器模拟&#xff1b;在 R1、R2、R3 上配置静态路由&#xff0c;保证全网可达&#xff1b;在 R1、R3 上删掉上一步配置的静态路由&#xff0c;改用默认路由&#xff0c;仍然要求全网可达。 各设备具体配置…

前端技巧——复杂表格在html当中的实现

应用场景 有时候我们的表格比较复杂&#xff0c;表头可能到处割裂&#xff0c;我们还需要写代码去完成这个样式&#xff0c;所以学会在原生html处理复杂的表格还是比较重要的。 下面我们来看这一张图&#xff1a; 我们可以看到有些表头项的规格不太一样&#xff0c;有1*1 2*…

Unity Protobuf3.21.12 GC 问题(反序列化)

背景&#xff1a;Unity接入的是 Google Protobuf 3.21.12 版本&#xff0c;排查下来反序列化过程中的一些GC点&#xff0c;处理了几个严重的&#xff0c;网上也有一些分析&#xff0c;这里就不一一展开&#xff0c;默认读者已经略知一二了。 如果下面有任何问题请评论区留言提…

实现 FastCGI

CGI的由来&#xff1a; 最早的 Web 服务器只能简单地响应浏览器发来的 HTTP 请求&#xff0c;并将存储在服务器上的 HTML 文件返回给浏 览器&#xff0c;也就是静态 html 文件&#xff0c;但是后期随着网站功能增多网站开发也越来越复杂&#xff0c;以至于出现动态技 术&…

2020 位示图

2020年网络规划设计师上午真题解析36-40_哔哩哔哩_bilibili 假设某计算机的字长为32位&#xff0c;该计算机文件管理系统磁盘空间管理采用位示图&#xff08;bitmap&#xff09;&#xff0c;记录磁盘的使用情况。若磁盘的容量为300GB&#xff0c;物理块的大小为4MB&#xff0c;…

【网络安全】漏洞挖掘:IDOR实例

未经许可&#xff0c;不得转载。 文章目录 正文 正文 某提交系统&#xff0c;可以选择打印或下载passport。 点击Documents > Download后&#xff0c;应用程序将执行 HTTP GET 请求&#xff1a; /production/api/v1/attachment?id4550381&enamemId123888id为文件id&am…

C语言 | Leetcode C语言题解之第354题俄罗斯套娃信封问题

题目&#xff1a; 题解&#xff1a; int cmp(int** a, int** b) {return (*a)[0] (*b)[0] ? (*b)[1] - (*a)[1] : (*a)[0] - (*b)[0]; }int maxEnvelopes(int** envelopes, int envelopesSize, int* envelopesColSize) {if (envelopesSize 0) {return 0;}qsort(envelopes, …

JVM 有哪些垃圾回收器?

JVM 有哪些垃圾回收器&#xff1f; 图中展示了7种作用于不同分代的收集器&#xff0c;如果两个收集器之间存在连线&#xff0c;则说明它们可以搭配使用。虚拟机所处的区域则表示它是属于新生代还是老年代收集器。 新生代收集器&#xff08;全部的都是复制算法&#xff09;&…

wps题注为表格或图片编号

word中为表格添加题注&#xff1a; 问题&#xff1a;多次或多人编辑导致--序号不能联动更新&#xff08;域代码不一致,如图&#xff09; 所以是否可以批量替换word里的域代码&#xff1f;如果可以这问题就解决了————失败 解决办法&#xff1a; 如图&#xff0c;复制表头&…

协处理器+流水线 (9)

3级流水线 流程&#xff1a; 取指令 译码 执行。 每一个时钟周期都可以执行一个指令。 提高CPU的能力有两种方法&#xff0c; 1 提高时钟频率&#xff0c;造成单位时间内执行的指令更多。 2 减少每条指令的平均指令周期数CPI &#xff0c;CPI我不太懂&#xff0c;但大概的…

2024.8.21 作业

一个服务器和两个客户端聊天 代码&#xff1a; /*******************************************/ 文件名&#xff1a;server.c /*******************************************/ #include <myhead.h> #define SER_IP "192.168.2.7" // 服务器IP #define SER…

C#开发基础之100个常用的C#正则表达式

前言 正则表达式是处理字符串的强大工具&#xff0c;特别是在文本搜索、替换和验证中。本文将100个常用的C#正则表达式进行分类&#xff0c;以帮助我们更快速地找到适合的正则表达式解决方案。 1. 基础匹配 这些正则表达式用于匹配一些基本的字符或字符串模式。 匹配任意字…

Linux信号机制探析--信号的产生

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;Linux从入门到进阶 欢迎大家点赞收藏评论&#x1f60a; 目录 &#x1f4da;信号什么是信号&#xff1f;为什么要有信号&#xff1f;查看Linux系统中信号 &#x1f388;信号产生&#x1f4d5;kill…

【计算机网络】网络版本计算器

此前我们关于TCP协议一直写的都是直接recv或者read&#xff0c;有了字节流的概念后&#xff0c;我们知道这样直接读可能会出错&#xff0c;所以我们如何进行分割完整报文&#xff1f;这就需要报头来解决了&#xff01; 但当前我们先不谈这个话题&#xff0c;先从头开始。 将会…

Kubectl命令、初识pod、namespace

文章目录 一、Kubectl简介基础命令1.基本信息命令2.创建和更新资源命令3.删除资源命令4. 查看日志和调试命令5. 端口转发和复制文件命令6. 部署管理命令7. 伸缩命令8. 配置和上下文管理命令9.常用命令 二、Pod简介核心概念pod常见状态调度和初始化阶段容器创建和运行阶段异常状…

Zookeeper服务注册及心跳机制详解

ZooKeeper提供了一种类似于文件目录的结构来保存key值&#xff0c;其提供了四种key类型&#xff0c;分别是持久节点&#xff0c;临时节点&#xff0c;持久有序节点&#xff0c;临时有序节点。其中临时节点的特性是当创建此节点的会话断开时&#xff0c;节点也会被删除。这一特性…

Apache Commons-IO 库

Apache Commons-IO是Apache开源基金组织提供的一组有关IO&#xff08;Input/Output&#xff09;操作的小框架。这个库的主要目的是为了提高IO流的开发效率&#xff0c;减少在进行文件读写、目录遍历等操作时编写的样板代码量。通过使用Commons-IO库&#xff0c;开发者可以更加简…