三十分钟学会RabbitMQ

1、初识MQ

1.1 MQ是什么?

MQ(message queue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

1.2 为什么要用MQ

使用消息队列(MQ)的主要原因包括解耦、异步处理、削峰填谷、提高系统响应速度、支持分布式一致性需求、以及点对点消费模式。

  1. 解耦:MQ允许系统之间实现解耦,一个系统产生的数据可以通过MQ发布,其他系统可以订阅该消息并消费,而无需直接与数据产生系统进行交互。这种解耦方式降低了系统之间的依赖性,减少了代码维护成本。
  2. 异步处理:MQ支持异步通信模式,当一个系统调用其他系统的API时,通过使用MQ进行异步化,系统可以将调用请求发送到队列中,然后继续处理其他任务,从而大幅缩短调用时间,提高高延时接口的速度。
  3. 削峰填谷:在高峰期,大量请求涌入系统时,如果直接将请求发送到数据库等后端存储,可能会导致系统崩溃。通过使用MQ,系统可以将请求先写入队列中,在低谷时逐个消费请求,从而避免系统崩溃。
  4. 提高系统响应速度:对于同步接口调用导致响应时间长的问题,使用MQ之后,将同步调用改成异步,能够显著减少系统响应时间。
  5. 支持分布式一致性需求:MQ可以支持分布式系统中的一致性需求。通过使用合适的MQ,系统可以保证各个模块执行的结果一致性,避免不同模块之间的数据不一致问题。
  6. 点对点消费模式:MQ支持点对点的消息消费模式,可以确保消息只被一个消费者接收和处理。这对于一些需要确保消息只被一个接收者处理的场景非常有用。

然而,引入MQ也可能带来一些问题,如可用性降低和复杂性增加。因此,根据不同的业务需求和技术实力,选择适合的MQ是非常重要的。

1.3 MQ 的分类

目比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

据统计,目前国内消息队列使用最多的还是RabbitMQ

2、安装RabbitMQ

这里使用docker安装RabbitMQ镜像

docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=admin \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://localhost:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

登录后即可看到管理控制台总览页面:

在这里插入图片描述

可以在控制台进行创建用户、虚拟机、队列等一系列操作

3、RabbitMQ的java客户端

3.1 前期准备

1、创建一个springboot项目

2、导入依赖坐标

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

3、在springboot项目中创建两个模块、继承父项目依赖

在这里插入图片描述

3.2 小试牛刀
3.2.1 在publisher模块中创建测试类用于发送消息

注意:包路径需要再启动类所在包下 或启动类的子包下

package com.itheima.publisher.amqp;
/*
 *@Author:张泽阳
 *@Date:2024年06月02日 21:14
 *Software: IntelliJ IDEA
 *@Description:
 * */

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleQueueTest(){
        // 队列名
        String queueName = "simple.queue";
        // 消息
        String message = "hhhhhhh";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName,message);

    }
}

运行测试方法发送消息

3.2.2 在consumer模块监听消息

在consumer创建listener包在其中创建SpringRabbitListener类

package com.itheima.consumer.listener;
/*
 *@Author:张泽阳
 *@Date:2024年06月02日 21:20
 *Software: IntelliJ IDEA
 *@Description:
 * */

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
    // 方法体中的参数就是监听到的消息
    @RabbitListener(queues = "simple.queue")
    public void springListener(String msg){
        System.out.println("监听到的消息:" + msg );
    }
}

在这里插入图片描述

这样就完成了RabbitMQ简单的消息发送与监听

3.3 Works Queues 模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

接下来,我们就来模拟这样的场景。

首先,我们在控制台创建一个新的队列,命名为work.queue

在这里插入图片描述

3.3.1 发送消息

在publisher模块中用for循环发送50条消息

    @Test
    public void workQueueTest(){
        // 队列名
        String queueName = "work.queue";
        // 消息
        String msg = "消息体==> ";
        for (int i = 1; i <= 51; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend(queueName,msg + i);
        }
    }
3.3.2 监听消息

在consumer模块中新增两个方法用来监听work.queue中的消息

    @RabbitListener(queues = "work.queue")
    public void workQueueListener1(String msg){
        System.out.println("监听者 1 得到的消息:" + msg );
    }

    @RabbitListener(queues = "work.queue")
    public void workQueueListener2(String msg){
        System.out.println("监听者 2 得到的消息:" + msg );
    }

在这里插入图片描述

在RabbitMQ的workQueue模型中默认采用轮询方式将所有消息平均分配给每个监听者,这样一来两个机器都要消费相同数量的消息,如果有一个机器非常慢,速度快的机器就要等速度慢的机器消费完。

3.3.3 改变消费速度
    @RabbitListener(queues = "work.queue")
    public void workQueueListener1(String msg) throws InterruptedException {
        System.out.println("监听者 1 得到的消息:" + msg );
        Thread.sleep(25);
    }

    @RabbitListener(queues = "work.queue")
    public void workQueueListener2(String msg) throws InterruptedException {
        System.out.println("监听者 2 得到的消息:" + msg );
        Thread.sleep(200);
    }

在这里插入图片描述

在这里插入图片描述

在RabbitMQ的workQueue模型中默认采用轮询方式将所有消息平均分配给每个监听者,这样一来两个机器都要消费相同数量的消息,如果有一个机器非常慢,速度快的机器就要等速度慢的机器消费完。

3.3.4 修改配置文件

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.204.20 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1	# 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:

监听者 1 得到的消息:消息体==> 1 2024-06-03T14:52:27.133554900
监听者 2 得到的消息:消息体==> 2 2024-06-03T14:52:27.133554900
监听者 1 得到的消息:消息体==> 3 2024-06-03T14:52:27.162577900
监听者 1 得到的消息:消息体==> 4 2024-06-03T14:52:27.190329800
监听者 1 得到的消息:消息体==> 5 2024-06-03T14:52:27.217329400
监听者 1 得到的消息:消息体==> 6 2024-06-03T14:52:27.244262600
监听者 1 得到的消息:消息体==> 7 2024-06-03T14:52:27.272310600
监听者 1 得到的消息:消息体==> 8 2024-06-03T14:52:27.299311
监听者 1 得到的消息:消息体==> 9 2024-06-03T14:52:27.327377900
监听者 2 得到的消息:消息体==> 10 2024-06-03T14:52:27.336259100
监听者 1 得到的消息:消息体==> 11 2024-06-03T14:52:27.354458700
监听者 1 得到的消息:消息体==> 12 2024-06-03T14:52:27.381099300
监听者 1 得到的消息:消息体==> 13 2024-06-03T14:52:27.409382100
监听者 1 得到的消息:消息体==> 14 2024-06-03T14:52:27.436237
监听者 1 得到的消息:消息体==> 15 2024-06-03T14:52:27.464754200
监听者 1 得到的消息:消息体==> 16 2024-06-03T14:52:27.491437900
监听者 1 得到的消息:消息体==> 17 2024-06-03T14:52:27.517241200
监听者 2 得到的消息:消息体==> 18 2024-06-03T14:52:27.537599400
监听者 1 得到的消息:消息体==> 19 2024-06-03T14:52:27.546405100
监听者 1 得到的消息:消息体==> 20 2024-06-03T14:52:27.574655400
监听者 1 得到的消息:消息体==> 21 2024-06-03T14:52:27.600656300
监听者 1 得到的消息:消息体==> 22 2024-06-03T14:52:27.627640400
监听者 1 得到的消息:消息体==> 23 2024-06-03T14:52:27.655109100
监听者 1 得到的消息:消息体==> 24 2024-06-03T14:52:27.682036500
监听者 1 得到的消息:消息体==> 25 2024-06-03T14:52:27.710680200
监听者 2 得到的消息:消息体==> 26 2024-06-03T14:52:27.738643100
监听者 1 得到的消息:消息体==> 27 2024-06-03T14:52:27.740556800
监听者 1 得到的消息:消息体==> 28 2024-06-03T14:52:27.769239800
监听者 1 得到的消息:消息体==> 29 2024-06-03T14:52:27.797490300
监听者 1 得到的消息:消息体==> 30 2024-06-03T14:52:27.824438700
监听者 1 得到的消息:消息体==> 31 2024-06-03T14:52:27.854432500
监听者 1 得到的消息:消息体==> 32 2024-06-03T14:52:27.880662
监听者 1 得到的消息:消息体==> 33 2024-06-03T14:52:27.908320800
监听者 1 得到的消息:消息体==> 34 2024-06-03T14:52:27.937883400
监听者 2 得到的消息:消息体==> 35 2024-06-03T14:52:27.941885700
监听者 1 得到的消息:消息体==> 36 2024-06-03T14:52:27.964396500
监听者 1 得到的消息:消息体==> 37 2024-06-03T14:52:27.990140600
监听者 1 得到的消息:消息体==> 38 2024-06-03T14:52:28.017677
监听者 1 得到的消息:消息体==> 39 2024-06-03T14:52:28.047443500
监听者 1 得到的消息:消息体==> 40 2024-06-03T14:52:28.074392800
监听者 1 得到的消息:消息体==> 41 2024-06-03T14:52:28.101285800
监听者 1 得到的消息:消息体==> 42 2024-06-03T14:52:28.127155800
监听者 2 得到的消息:消息体==> 43 2024-06-03T14:52:28.142776
监听者 1 得到的消息:消息体==> 44 2024-06-03T14:52:28.155621600
监听者 1 得到的消息:消息体==> 45 2024-06-03T14:52:28.182376
监听者 1 得到的消息:消息体==> 46 2024-06-03T14:52:28.209322300
监听者 1 得到的消息:消息体==> 47 2024-06-03T14:52:28.235621600
监听者 1 得到的消息:消息体==> 48 2024-06-03T14:52:28.263368300
监听者 1 得到的消息:消息体==> 49 2024-06-03T14:52:28.290364100
监听者 1 得到的消息:消息体==> 50 2024-06-03T14:52:28.318363400
监听者 1 得到的消息:消息体==> 51 2024-06-03T14:52:28.346327500

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

3.4 交换机类型

在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

在这里插入图片描述

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。
3.5 Fanout交换机

在这里插入图片描述

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

创建一个名为 my.fanout的交换机,类型是Fanout

创建两个队列fanout.queue1fanout.queue2,绑定到交换机my.fanout

3.5.1创建交换机和队列

创建交换机

在这里插入图片描述

创建对列

在这里插入图片描述

将队列绑定到交换机上

在这里插入图片描述

在这里插入图片描述

3.5.2.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

    @Test
    public void funoutExchangeTest(){
        // 交换机名
        String exchangeName = "my.fanout";
        // 消息
        String message = "hello";
        // 发送消息 三个参数:交换机名、队列名、消息
        rabbitTemplate.convertAndSend(exchangeName,null,message);
    }
3.5.3 消息接收
    @RabbitListener(queues = "fanout.queue1")
    public void exchangeListener1(String msg) throws InterruptedException {
        System.out.println("消费者1监听到fanout消息:" + msg + " " + LocalDateTime.now() );
    }

    @RabbitListener(queues = "fanout.queue2")
    public void exchangeListener2(String msg) throws InterruptedException {
        System.out.println("消费者2监听到fanout消息:" + msg + " " + LocalDateTime.now() );
    }

在这里插入图片描述

两个消费者都监听到了funoutExchangeTest()发布的消息

3.6 Direct 交换机
3.6.1 创建交换机和队列

创建两个队列:direct.queue1和direct.queue2

在这里插入图片描述

创建一个direct类型的交换机: my.direct

在这里插入图片描述

将两个队列绑定到my.direct交换机上并给队列指定Routing key

在这里插入图片描述

3.6.2 消息发送
    @Test
    public void directExchangeTest(){
        // 交换机名
        String exchangeName = "my.direct";
        // Routing key
        String routingKey = "red";
        // 发送消息 三个参数:交换机名、队列名、消息
        rabbitTemplate.convertAndSend(exchangeName,routingKey,"红色请接收");
    }
3.6.2 消息接收
    @RabbitListener(queues = "direct.queue1")
    public void directListener1(String msg) throws InterruptedException {
        System.out.println("Routing key:red、blue 接收到消息=======>" + msg);
    }

    @RabbitListener(queues = "direct.queue2")
    public void directListener2(String msg) throws InterruptedException {
        System.out.println("Routing key:red、yellow 接收到消息=======>" + msg);
    }

在这里插入图片描述

此时routing key 为 red 两个消费者都监听到了消息

    @Test
    public void directExchangeTest(){
        // 交换机名
        String exchangeName = "my.direct";
        // Routing key
        String routingKey = "blue";
        // 发送消息 三个参数:交换机名、队列名、消息
        rabbitTemplate.convertAndSend(exchangeName,routingKey,"蓝色请接收");
    }

在这里插入图片描述

将routing key 改为blue后只有绑定key 为含blue的消费监听到了

3.7 Topic 交换机

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词
3.7.1 创建交换机和队列

创建两个队列分别为:topic.queue1 和 topic.queue2

在这里插入图片描述

创建topic类型的交换机 my.topic

在这里插入图片描述

利用通配符绑定队列和交换机。

在这里插入图片描述
在这里插入图片描述

image-20240603193507889

3.7.2 发送消息
    @Test
    public void topicExchangeTest(){
        // 交换机名
        String exchangeName = "my.toptic";
        // Routing key
        String routingKey = "china.news";
        // 发送消息 三个参数:交换机名、队列名、消息
        rabbitTemplate.convertAndSend(exchangeName,routingKey,"台湾回归祖国怀抱!!!");
    }
3.7.3 消息接收
    @RabbitListener(queues = "topic.queue1")
    public void topicListener1(String msg) throws InterruptedException {
        System.out.println("china.# 接收到消息=======>" + msg);
    }

    @RabbitListener(queues = "topic.queue2")
    public void topicListener2(String msg) throws InterruptedException {
        System.out.println("#.news 接收到消息=======>" + msg);
    }

在这里插入图片描述

3.8 基于bean声明队列交换机

SpringAMQP提供了一个Queue类,用来创建队列:

在这里插入图片描述

在这里插入图片描述

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

在这里插入图片描述

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

在这里插入图片描述

3.8.1 fanout示例

先把之前在控制台创建的fanout.queue队列和my.fanout交换机删除

package com.itheima.consumer.config;
/*
 *@Author:张泽阳
 *@Date:2024年06月03日 20:02
 *Software: IntelliJ IDEA
 *@Description:
 * */

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    /**
     * 创建交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        // return new FanoutExchange("my.fanout");
        return ExchangeBuilder.fanoutExchange("my.fanout").build();
    }

    /**
     * 创建队列
     */
    @Bean
    public Queue queue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 将队列绑定到交换机
     */
    @Bean
    public Binding bindingQueue1(Queue queue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
}

3.8.2 基于注解声明

删除之前在控制台创建的direct.queue队列和my.direct交换机

创建队列

package com.itheima.consumer.config;
/*
 *@Author:张泽阳
 *@Date:2024年06月03日 20:30
 *Software: IntelliJ IDEA
 *@Description:
 * */

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("my.direct");
    }

    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue1");
    }
}

消费者监听消息

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "my.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void directListener1(String msg) throws InterruptedException {
        System.out.println("Routing key:red、blue 接收到消息=======>" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "my.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void directListener2(String msg) throws InterruptedException {
        System.out.println("Routing key:red、yellow 接收到消息=======>" + msg);
    }
3.9 消息转换器

Spring的消息发送代码接收的消息体是一个Object:

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

我们来测试一下。

3.9.1 测试默认转换器

创建队列

package com.itheima.consumer.config;
/*
 *@Author:张泽阳
 *@Date:2024年06月03日 21:00
 *Software: IntelliJ IDEA
 *@Description:
 * */

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ObjectQueueConfig {

    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }
}

发送消息

    @Test
    public void objectQueueTest(){
        // 队列名
        String queueName = "object.queue";
        // 创建消息
        Map<String,Integer> map = new HashMap<>(2);
        map.put("jack",18);
        map.put("tom",20);
        // 发送消息
        rabbitTemplate.convertAndSend(queueName,map);
    }

在控制台查看消息

在这里插入图片描述

消息类型为:application/x-java-serialized-object

3.9.2 配置json消息转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

在父工程的pom文件中引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:

package com.itheima.consumer;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;


@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public MessageConverter messageConverter(){
        // 1、定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        return jackson2JsonMessageConverter;
    }
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

此时,我们到MQ控制台删除object.queue中的旧的消息。然后再次执行刚才的消息发送的代码,到MQ的控制台查看消息结构:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/725530.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud Alibaba Sentinel基础入门与安装

GitHub地址&#xff1a;https://github.com/alibaba/Sentinel 中文文档&#xff1a;https://sentinelguard.io/zh-cn/docs/introduction.html 下载地址&#xff1a;https://github.com/alibaba/Sentinel/releases Spring Cloud Alibaba 官方说明文档&#xff1a;Spring Clou…

前端根据环境变量配置网页的title和favicon

前端根据环境变量配置网页的title和favicon 前言流程步骤一、设置environment文件二、在入口文件中配置三、删除index.html中的title和 icon link四、使用对应的打包命令进行部署 注意事项一、angular中&#xff0c;需要在angular.json添加favicon.ico额外的构建 前言 有些项目…

大学物理绪论组收集和分析

目录 ​编辑 随机误差的估计 算术平均值的标准偏差 不确定度&#xff08;Uncertainty&#xff09;是测量学中的一个重要概念&#xff0c;用于表示测量结果的可靠程度。它反映了测量值可能偏离真值&#xff08;即被测量的客观真实值&#xff09;的程度。 A类不确定度的计算方…

Reverse-Proxy微软开源:高效构建HTTP反向代理的利器

Reverse-Proxy&#xff1a; 简化你的网络架构&#xff0c;用微软的反向代理加速你的服务。- 精选真开源&#xff0c;释放新价值。 概览 微软的reverse-proxy项目是一个高性能的HTTP反向代理应用程序开发工具包。它提供了一种灵活的方式来构建能够处理大量并发连接的代理服务。…

centos 7.8 安装sql server 2019

1.系统环境 centos 7.8 2.数据库安装文件准备 下载 SQL Server 2019 (15.x) Red Hat 存储库配置文件 sudo curl -o /etc/yum.repos.d/mssql-server.repo https://packages.microsoft.com/config/rhel/7/mssql-server-2019.repo 采用yum源进行不安装下载,这时yum 会自动检测…

Javase.抽象类和接口

抽象类和接口 【本节目标】1.抽象类1.1抽象类的概念1.2 抽象类语法1.3 抽象类特性1.4 抽象类的作用 2. 接口2.1 接口的概念2.2 语法规则2.3 接口使用2.4 接口特性2.5 实现多个接口2.6 接口间的继承2.7 接口使用实例2.8Clonable 接口和深拷贝2.9 抽象类和接口的区别 3. Object类…

《Windows API每日一练》5.1 键盘基础

本节我们讲述关于键盘的一些基础知识。当我们按下一个键盘按键时&#xff0c;会产生一个键盘按键消息。这一点你能确定吗&#xff1f;假如是一个菜单快捷键消息&#xff0c;或者是一个子窗口控件消息呢&#xff1f;这就超出了本节讨论的范围&#xff0c;我们将在菜单和子窗口控…

[Qt的学习日常]--窗口

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、窗口的分…

无引擎游戏开发(2):最简游戏框架 | EasyX制作井字棋小游戏I

一、EasyX中的坐标系 不同于数理中的坐标系&#xff0c;EasyX中的y轴是竖直向下的 二、渲染缓冲区 之前的程序添加了这三个函数改善了绘图时闪烁的情况: 小球在"画布“上移动的过程就是我们在调用绘图函数&#xff0c;这个”画布“就是渲染缓冲区&#xff0c;先绘制的内…

【漏洞复现】致远OA webmail.do 任意文件下载 (CNVD-2020-62422)

免责声明&#xff1a; 本文内容旨在提供有关特定漏洞或安全漏洞的信息&#xff0c;以帮助用户更好地了解可能存在的风险。公布此类信息的目的在于促进网络安全意识和技术进步&#xff0c;并非出于任何恶意目的。阅读者应该明白&#xff0c;在利用本文提到的漏洞信息或进行相关测…

摄影构图:人像摄影和风景摄影的一些建议

写在前面 博文内容涉及摄影中人像摄影和风景摄影的简单介绍《高品质摄影全流程解析》 读书笔记整理理解不足小伙伴帮忙指正 &#x1f603; 生活加油 不必太纠结于当下&#xff0c;也不必太忧虑未来&#xff0c;当你经历过一些事情的时候&#xff0c;眼前的风景已经和从前不一样…

C++ | Leetcode C++题解之第167题两数之和II-输入有序数组

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<int> twoSum(vector<int>& numbers, int target) {int low 0, high numbers.size() - 1;while (low < high) {int sum numbers[low] numbers[high];if (sum target) {return {low …

前端技术栈三(vue+Axios)

一、Vue 1 基本介绍 1.1 Vue 是什么? Vue (读音 /vjuː/&#xff0c;类似于 view) 是一个前端框架, 易于构建用户界面 Vue 的核心库只关注视图层&#xff0c;不仅易于上手&#xff0c;还便于与第三方库或项目整合 支持和其它类库结合使用 开发复杂的单页应用非常方便 Vue 是…

海外优青ppt美化_海优ppt录音视频制作

海外优青 优秀青年科学基金项目&#xff08;海外&#xff09;旨在吸引和鼓励在自然科学、工程技术等方面已取得较好成绩的海外优秀青年学者&#xff08;含非华裔外籍人才&#xff09;回国&#xff08;来华&#xff09;工作&#xff0c;自主选择研究方向开展创新性研究&#xf…

在 Visual Studio 2022 (Visual C++ 17) 中使用 Visual Leak Detector

在 Visual C 2022 中使用 Visual Leak Detector 1 问题描述1.1 内存泄漏的困扰和解决之道1.2 内存泄漏检测工具的选择1.3 VLD的现状 2 安装和设置VLD的环境变量2.1 安装VLD文件2.2 VLD安装后的目录和文件说明2.2.1 include子目录说明2.2.2 lib子目录说明2.2.2.1 目录整理 2.2.3…

centOS Stream9配置NAT8网络

首先将VMware关机&#xff0c;添加网络适配器 启动虚拟机&#xff0c;查看ens192是否打开连接 安装的图形化需要查看右上角电源处网卡是否连接 最小化安装一般不会出现未连接的状态 使用ip a 查看 配置网卡文件 cd /etc/NetworkManager/system-connections/cd到当前目录下…

【stm32-新建工程-HAL库版本】

stm32-新建工程-HAL库版本 ■ HAL库版本目录■ Drivers■ Middlewares 文件夹&#xff0c; 同寄存器版本一样。■ Output 文件夹&#xff0c; 同寄存器版本一样。■ Projects 文件夹&#xff0c; 同寄存器版本一样。■ User 文件夹 ■ HAL库版本目录 ■ Drivers ① &#xff0c…

leetcode119 杨辉三角②

给定一个非负索引 rowIndex&#xff0c;返回「杨辉三角」的第 rowIndex 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: rowIndex 3 输出: [1,3,3,1]示例 2: 输入: rowIndex 0 输出: [1]示例 3: 输入: rowIndex 1 输出: [1,1] pub…

【实战指南】SpringBoot结合Zookeeper/Nacos构建Dubbo微服务

1、微服务架构 微服务架构是一种设计复杂应用程序的方法&#xff0c;它提倡将单一应用程序开发为一组小型、独立的服务&#xff0c;每个服务运行在其自己的进程中&#xff0c;并通过轻量级通信&#xff08;通常是HTTP协议&#xff09;进行交互。每个服务都是围绕业务功能构建的…

绘唐3官网体验入口绘唐官网

绘唐3官网体验入口绘唐官网 绘唐3官网体验入口是指进入绘唐3官网的入口。绘唐是一款中国传统绘画软件&#xff0c;绘唐3是绘唐系列的最新版本。通过绘唐3官网&#xff0c;用户可以了解绘唐3的功能特点、下载安装包、查看新闻动态、参与社区讨论等。以下是绘唐3官网的体验入口&…