【RabbitMQ】RabbitMQ配置与交换机学习

【RabbitMQ】RabbitMQ配置与交换机学习

文章目录

  • 【RabbitMQ】RabbitMQ配置与交换机学习
    • 简介
    • 安装和部署
        • 1. 安装RabbitMQ
        • 2.创建virtual-host
        • 3. 添加依赖
        • 4.修改配置文件
    • WorkQueues模型
        • 1.编写消息发送测试类
        • 2.编写消息接收(监听)类
        • 3. 实现能者多劳
    • 交换机
      • Fanout交换机
        • 1.消息发送
        • 2.消息监听
      • Direct交换机
        • 1.消息发送
        • 2.消息接收
      • Topic交换机
        • 1.消息发送
        • 2.消息接收
    • 声明队列和交换机
        • 声明队列
        • 声明交换机
        • 绑定队列和交换机
          • 1.fanout示例
          • 2. direct示例
          • 3.基于注解的方式声明队列和交换机
        • 消息转换器
    • 总结

简介

RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)。RabbitMQ支持多种消息传递协议,具有高可靠性、高可用性和高性能等特点。它允许应用程序通过消息队列进行异步通信,从而实现解耦和负载均衡。RabbitMQ的核心概念包括交换机(Exchange)、队列(Queue)和绑定(Binding),它们共同构成了消息的路由和传递机制。

RabbitMQ的架构如图:

47ab6076-c1b1-45e1-b8c4-f6c41a601b21

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

安装和部署

这里以Centos7为例:

1. 安装RabbitMQ
docker run \
 -e RABBITMQ_DEFAULT_USER=shijun \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hm-net\
 -d \
 rabbitmq:3.8-management
  1. -e RABBITMQ_DEFAULT_USER=shijun

    -e参数用于设置环境变量。这行代码用来设置RabbitMQ的默认用户名为shijun

  2. -e RABBITMQ_DEFAULT_PASS=123321

    这行代码用来设置默认密码为123321

  3. -p 15672:15672

    这行代码用来将宿主机的15672端口映射到容器的15672端口,15672端口是RabbitMQ管理控制台的默认端口。

  4. -p 5672:5672

    这行代码用来将宿主机的5672端口映射到容器的5672端口,5672端口是RabbitMQ的默认通信端口。

  5. --network hm-net

    这行代码将容器连接到名为hm-net的网络。

  6. -d

    -d参数表示以后台模式运行容器。

  7. rabbitmq:3.8-management

    这里是我们要运行的rabbitmq的Docker镜像,这里选择的是RabbitMQ 3.8版本,版本需要根据自己的SpringCloud版本来选择。

安装完成后访问:http://虚拟机IP地址:15672

输入刚才的账号密码:shijun 123321,就能进入控制台界面。

image-20240608222043550

2.创建virtual-host

由于RabbitMQ 每秒并发能力为几万,一般项目都不会达到这个规模,因此我们可以让多个项目使用同一个RabbitMQ 。要实现项目直接的隔离需要创建virtual-host,每个项目对应一个virtual-host。

按顺序点击,填入“Name”和“Descrption”,然后点击“Add virtual host”按钮:

image-20240609151722019

然后在右上角切换到创建的virtual-host:

image-20240609152323104

3. 添加依赖
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
4.修改配置文件
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.56.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /mq-demo # 虚拟主机
    username: shijun # 用户名
    password: 123321 # 密码

WorkQueues模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

image-20240609154853698

在控制台创建一个work.queue队列:

image-20240609155329959

1.编写消息发送测试类
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "work.queue";
        // 消息
        String message = "hello, message_";
        for (int i = 0; i < 50; i++) {
            // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }
}
  • RabbitTemplate:Spring AMQP提供的模板类,用于发送和接收消息。
  • rabbitTemplate.convertAndSend(queueName, message + i);:使用RabbitTemplate发送消息到指定的队列。
2.编写消息接收(监听)类
@Component
public class SpringRabbitListener {
    
    /**
     * 监听名为"work.queue"的RabbitMQ队列,接收并处理来自队列的消息。
     * 通过延迟执行模拟消息处理时间
     * 
     * @param msg 从队列中接收到的消息内容,以字符串形式提供
     * @throws InterruptedException 如果线程在睡眠期间被中断,则抛出此异常
     */
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        // 输出接收到消息的时间,以便跟踪消息处理的时间点
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        // 模拟消息处理时间,让线程睡眠20毫秒
        Thread.sleep(20);
    }


    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
}
  • @RabbitListener:此注解用来设置该方法应该作为RabbitMQ消息队列的监听器。这意味着当队列中有消息发布时,Spring框架会自动调用此方法来处理这些消息。
  • queues = "work.queue":指定要监听的RabbitMQ队列的名称,在这个例子中是work.queue。
  • 注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
    • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
    • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

运行后查看结果:

image-20240609163135799

观察可以发现:

  1. 一个消息只会被一个消费者处理
  2. 当有很多消息时,会平均分配(一个消费者负责奇数的消息,一个消费者负责偶数的消息)

但问题是消费者之间的消费能力可能不一样,有的消费能力强,有消费的弱,会出现部分消费者一直空闲而其他消费者一直繁忙的状况,没有充分利用每一个消费者的能力

3. 实现能者多劳

修改配置文件:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次运行,查看结果:

image-20240609164615309

观察可以发现:

消费者2处理了大概5条消息,而消费者1处理了40多条消息,成功实现"能者多劳"。

交换机

在之前的RabbitMQ的架构图中我们可以看到,里面还有一项Exchange没有提到,那就是RabbitMQ中的交换机。

交换机是RabbitMQ中用于接收生产者发送的消息并将这些消息路由到一个或多个队列的组件。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机有不同的类型,常见的有以下几种:

  • Fanout Exchange(扇出交换机):将消息广播到所有绑定的队列,不考虑路由键。
  • Direct Exchange(直连交换机):根据消息的路由键精确匹配队列。
  • Topic Exchange(主题交换机):根据路由键的模式匹配队列。
  • Headers Exchange(头交换机):根据消息的头部信息匹配队列。

Fanout交换机

扇出交换机将消息广播到所有绑定的队列,而不考虑路由键。这种交换机非常适合需要将消息广播到多个消费者的场景。

假如说当订单服务有了一笔新订单之后就要去通知短信服务、商品服务等等,有了交换机之后就只需要将消息发给交换机,然后为每一个微服务创建一个队列并绑定,之后当有消息时,交换机就会把消息发送到所有队列,就能实现一个消息被多个消费者处理了。

image-20240610112130389

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息
  1. 创建Fanout交换机

    image-20240609170548383
  2. 创建两个队列fanout.queue1fanout.queue2

    image-20240609170655932
  3. 点击刚刚创建的交换机,进入:

    image-20240609170907536
  4. 将刚才创建的两个队列绑定到交换机,

image-20240609171002978
1.消息发送

在SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "demo.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

abbitTemplate.convertAndSend(exchangeName, "", message);中的第二个参数用来指定路由键。对于Fanout交换机,路由键没有实际意义,因此可以传递一个空字符串。

2.消息监听

SpringRabbitListener中添加两个方法:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

运行代码,查看结果:

image-20240609173012677

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

直连交换机根据消息的路由键精确匹配队列。只有当消息的路由键与绑定的路由键完全匹配时,消息才会被路由到相应的队列。

image-20240610112328443

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
  1. 创建direct.queue1direct.queue2两个队列,之后创建一个direct类型的交换机:

    image-20240609174620595
  2. 绑定队列到交换机,最终结果如图所示:

    image-20240609184743239

1.消息发送

SpringAmqpTest类中添加测试方法:

@Test
public void testSendDirectExchange1() {
    // 交换机名称
    String exchangeName = "demo.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

@Test
public void testSendDirectExchange2() {
    // 交换机名称
    String exchangeName = "demo.direct";
    // 消息
    String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
2.消息接收

SpringRabbitListener中添加方法:

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

运行测试类中的testSendDirectExchange1,查看结果:

image-20240609185400537

运行测试类中的testSendDirectExchange2,查看结果:

image-20240609185710182

观察可以发现:

  • 当发送的消息的Routing key为red时,两个消息队列都能收到

  • 当发送的消息的Routing key为red时,只有消息队列1才能收到

Topic交换机

Topic交换机(Topic Exchange)是RabbitMQ中一种功能强大的交换机类型,它通过路由键的模式匹配将消息路由到一个或多个队列。Topic交换机允许使用通配符来匹配路由键,从而实现灵活的消息路由。

  • 通配符

    :在绑定键中,可以使用两个特殊字符来实现模式匹配:

    • *:匹配一个单词。
    • #:匹配零个或多个单词。
image-20240610112445760

如图所示,假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

更多范例:

假设我们有以下绑定键:

  • *.orange.*
  • *.*.rabbit
  • lazy.#

我们可以通过以下路由键进行消息路由:

  • 路由键quick.orange.rabbit将匹配第一个和第二个绑定键。
  • 路由键lazy.orange.elephant将匹配第一个和第三个绑定键。
  • 路由键lazy.brown.fox将匹配第三个绑定键。
  • 路由键lazy将匹配第三个绑定键。

按照之前的流程创建Topic交换机和队列并进行绑定,最终结果如下:

1.消息发送

SpringAmqpTest类中添加测试方法:

@Test
public void testSendTopicExchange1() {
    // 交换机名称
    String exchangeName = "demo.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

@Test
public void testSendTopicExchange1() {
    // 交换机名称
    String exchangeName = "demo.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
2.消息接收

SpringRabbitListener中添加方法:

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

运行测试类中的testSendTopicExchange1后观察结果:

image-20240609204043494

观察发现两个消息队列都收到了,说明china.##.news都匹配成功了。

运行测试类中的testSendTopicExchange2后观察结果:

image-20240609204427086

观察可以发现只有消息队列1匹配成功,说明china.#匹配成功。

声明队列和交换机

之前我们创建交换机是通过控制台创建的,然而实际开发中是不推荐使用这种方式,因为可能会出现一些问题,更推荐让程序员通过代码来判断交换机和队列是否存在,如果没有再进行创建。

在实际开发中,RabbitMQ的配置类一般放到消费者包下,生产者一般会关心消息是否发送成功。

声明队列

队列是RabbitMQ中用于存储消息的组件。Spring AMQP通过Queue类来声明队列。队列有以下几个重要属性:

  • name:队列名称。
  • durable:是否持久化。持久化队列在RabbitMQ重启后仍然存在,信息持久到磁盘。
  • exclusive:是否排他。排他队列只能被创建它的连接使用,并且在连接断开时自动删除。
  • autoDelete:是否自动删除。当最后一个消费者断开连接后,自动删除队列。

比如:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQQueueConfig {

    @Bean
    public Queue myQueue() {
        return new Queue("simple.queue");
    }
}

声明了一个名为simple.queue的队列,默认为持久化、非排他、非自动删除。

image-20240610095936480
声明交换机

使用ExchangeBuilder声明交换机,ExchangeBuilder类提供了多种方法来配置交换机的属性。以下是一些常用的方法:

  • durable():声明持久化交换机。
  • autoDelete():声明自动删除交换机。
  • withArgument():添加交换机的自定义参数。
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQExchangeConfig {

    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange("direct.exchange").durable(true).build();
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("fanout.exchange").durable(true).build();
    }

    @Bean
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange("topic.exchange").durable(true).build();
    }

}
绑定队列和交换机

在RabbitMQ中,绑定关系(Binding)是交换机和队列之间的连接。绑定关系告诉交换机如何将消息路由到队列。在Spring AMQP中,我们可以使用BindingBuilder类来声明和配置绑定关系。

BindingBuilder类提供了一些静态方法来创建绑定关系。常用的方法包括:

  • bind():绑定队列到交换机。
  • to():指定交换机。
  • with():指定路由键(用于直连交换机和主题交换机)。
  • where():指定头部信息(用于头交换机)。
1.fanout示例
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("demo.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

在控制台删除demo.fanout交换机和fanout.queue2fanout.queue2这两个队列,再次运行代码会发现删除的又重新出现了:

image-20240610102527887
2. direct示例

direct交换机

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("direct.exchange").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}
3.基于注解的方式声明队列和交换机

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

修改SpringRabbitListener类:


@Component
public class SpringRabbitListener {

    // .......
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "demo.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "demo.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "demo.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "demo.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
    }
    
    // .......

}

@QueueBinding注解包含以下几个主要部分:

  1. value:定义队列,使用@Queue注解。
  2. exchange:定义交换机,使用@Exchange注解。
  3. key:定义路由键,使用字符串数组

删除交换机和队列后再次运行会发现又重新出现:

image-20240610105323455
消息转换器

Spring的消息发送代码接收的消息体是一个Object:

image-20240610105507926

在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

因此我们可以配置JSON转换器来解决这个问题。

  1. 引入Jackson依赖:
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>
  1. 配置消息转换器

    publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

测试:

FanoutConfig类中声明队列:

    @Bean
    public Queue objectQueue() {
        return new Queue("object.queue");
    }

SpringAmqpTest类中添加:

@Test
public void testSendMap() throws InterruptedException {
    // 准备消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "柳岩");
    msg.put("age", 21);
    // 发送消息
    rabbitTemplate.convertAndSend("object.queue", msg);
}

SpringRabbitListener类中添加:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
    System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}

运行测试类查看结果:

image-20240610110529216

总结

本文较为详细的记录了RabbitMQ的安装配置以及交换机学习,希望本文对大家学习RabbitMQ有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/695702.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【深度学习】—— 神经网络介绍

神经网络介绍 本系列主要是吴恩达深度学习系列视频的笔记&#xff0c;传送门&#xff1a;https://www.coursera.org/deeplearning-ai 目录 神经网络介绍神经网络的应用深度学习兴起的原因 神经网络&#xff0c;全称人工神经网络&#xff08;Artificial Neural Network&#xf…

25.逢七必过

上海市计算机学会竞赛平台 | YACSYACS 是由上海市计算机学会于2019年发起的活动,旨在激发青少年对学习人工智能与算法设计的热情与兴趣,提升青少年科学素养,引导青少年投身创新发现和科研实践活动。https://www.iai.sh.cn/problem/363 题目描述 逢七必过的游戏规则如下:对一…

Linux安装Docker | 使用国内镜像

环境 CentOS7 先确认能够上网 curl www.baidu.com返回该输出说明网络OK 步骤一&#xff1a;安装gcc 和 gcc-c yum -y install gccyum -y install gcc-c步骤二&#xff1a;安装Docker仓库 yum install -y yum-utils接下来配置yum的国内镜像 yum-config-manager --add-re…

激活乡村振兴新动能:推动农村产业融合发展,打造具有地方特色的美丽乡村,实现乡村全面振兴

目录 一、推动农村产业融合发展 1、农业产业链条的延伸 2、农业与旅游业的结合 二、挖掘地方特色&#xff0c;打造美丽乡村 1、保护和传承乡村文化 2、发展特色农业 三、加强基础设施建设&#xff0c;提升乡村品质 1、改善农村交通条件 2、提升农村水利设施 四、促进…

大数据湖一体化运营管理建设方案(49页PPT)

方案介绍&#xff1a; 本大数据湖一体化运营管理建设方案通过构建统一存储、高效处理、智能分析和安全管控的大数据湖平台&#xff0c;实现了企业数据的集中管理、快速处理和智能分析。该方案具有可扩展性、高性能、智能化、安全性和易用性等特点&#xff0c;能够为企业数字化…

水滴型锤片粉碎机:多功能粉碎利器

在现代工业生产中&#xff0c;粉碎机作为一种重要的机械设备&#xff0c;广泛应用于饲料、化工、木材等多个领域。其中&#xff0c;水滴型锤片粉碎机凭借其设计和粉碎能力&#xff0c;成为市场上的热门产品。 水滴型锤片粉碎机其设计灵感来源于水滴的形态。这种设计使得机器在…

vmware-17虚拟机安装教程,安装linux centos系统

下载VMware 1.进入VMware官网&#xff1a;https://www.vmware.com/sg/products/workstation-pro.html 2.向下翻找到&#xff0c;如下界面并点击“现在安装” 因官网更新页面出现误差&#xff0c;现提供vmware17安装包网盘链接如下&#xff1a; 链接&#xff1a;https://pan.b…

【SpringBoot + Vue 尚庭公寓实战】基本属性接口实现(七)

【SpringBoot Vue 尚庭公寓实战】基本属性接口实现&#xff08;七&#xff09; 文章目录 【SpringBoot Vue 尚庭公寓实战】基本属性接口实现&#xff08;七&#xff09;1、保存或更新属性名称2、保存或更新属性值3、查询全部属性名称和属性值列表4、根据ID删除属性名称5、根据…

freertos内核拓展DAY2(消息队列)

这节内容是信号量的基础&#xff0c;因为创建以及发送/等待信号量所调用的底层函数&#xff0c;就是创建/发送/接受消息队列时所用到的通用创建函数&#xff0c;这里先补充一下数据结构中关于队列的知识。 目录 1. 队列原理 1.1 顺序队列操作 1.2 循环队列操作 2.消息队列原…

N32G45XVL-STB之移植LVGL(lvgl-8.2.0)

目录 概述 1 软硬件介绍 1.1 软件版本信息 1.2 ST7796-LCD 1.3 MCU IO与LCD PIN对应关系 2 认识LVGL 2.1 LVGL官网 2.2 LVGL库文件下载 3 移植LVGL 3.1 准备移植文件 3.2 添加lvgl库文件到项目 3.2.1 src下的文件 3.2.2 examples下的文件 3.2.3 配置文件路径 3.2…

python-数字黑洞

[题目描述] 给定一个三位数&#xff0c;要求各位不能相同。例如&#xff0c;352是符合要求的&#xff0c;112是不符合要求的。将这个三位数的三个数字重新排列&#xff0c;得到的最大的数&#xff0c;减去得到的最小的数&#xff0c;形成一个新的三位数。对这个新的三位数可以重…

【Web世界探险家】3. CSS美学(二)文本样式

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 |《MySQL探索之旅》 |《Web世界探险家》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更…

Web学习_SQL注入_布尔盲注

盲注就是在SQL注入过程中&#xff0c;SQL语句执行后&#xff0c;查询到的数据不能 回显到前端页面。此时&#xff0c;我们需要利用一些方法进行判断或者尝 试&#xff0c;这个过程称之为盲注。而布尔盲注就是SQL语句执行后&#xff0c;页面 不返回具体数据&#xff0c;数据库只…

32、matlab:基于模板匹配的车牌识别

1、准备工作 1&#xff09;准备材料 车牌字符模板和测试的实验车牌 2&#xff09;车牌字符模板 数字、字母和省份缩写 3&#xff09;测试车牌 四张测试车牌 2、车牌识别实现(已将其嵌入matlab) 1&#xff09;打开APP 找到APP 找到我的APP双击点开 2)界面介绍 包括&am…

神经网络 torch.nn---Non-Linear Activations (ReLU)

ReLU — PyTorch 2.3 documentation torch.nn - PyTorch中文文档 (pytorch-cn.readthedocs.io) 非线性变换的目的 非线性变换的目的是为神经网络引入一些非线性特征&#xff0c;使其训练出一些符合各种曲线或各种特征的模型。 换句话来说&#xff0c;如果模型都是直线特征的…

【数据结构】排序(上)

个人主页~ 堆排序看这篇~ 还有这篇~ 排序 一、排序的概念及应用1、概念2、常见的排序算法 二、常见排序的实现1、直接插入排序&#xff08;1&#xff09;基本思想&#xff08;2&#xff09;代码实现&#xff08;3&#xff09;时间复杂度&#xff08;4&#xff09;空间复杂度 2…

eNSP学习——RIP的路由引入

目录 主要命令 原理概述 实验目的 实验内容 实验拓扑 实验编址 实验步骤 1、基本配置 2、搭建公司B的RIP网络 3、优化公司B的 RIP网络 4、连接公司A与公司B的网络 需要eNSP各种配置命令的点击链接自取&#xff1a;华为&#xff45;NSP各种设备配置命令大全PDF版_ensp…

Windows开始ssh服务+密钥登录+默认启用powershell

文章内所有的命令都在power shell内执行&#xff0c;使用右键单击Windows徽标&#xff0c;选择终端管理员即可打开 Windows下OpenSSH的安装 打开Windows power shell&#xff0c;检查SSH服务的安装状态。会返回SSH客户端和服务器的安装状态&#xff0c;一下是两个都安装成功的…

操作系统教材第6版——个人笔记5

3.2 单连续分区存储管理 3.2.1 单连续分区存储管理 单连续分区存储管理 每个进程占用一个物理上完全连续的存储空间(区域) 单用户连续分区存储管理固定分区存储管理可变分区存储管理 单用户连续分区存储管理 主存区域划分为系统区与用户区设置一个栅栏寄存器界分两个区域…

最新下载:Navicat for MySQL 11软件安装视频教程

软件简介&#xff1a; Navicat for MySQL 是一款强大的 MySQL 数据库管理和开发工具&#xff0c;它为专业开发者提供了一套强大的足够尖端的工具&#xff0c;但对于新用户仍然易于学习。Navicat For Mysql中文网站&#xff1a;http://www.formysql.com/ Navicat for MySQL 基于…