RabbitMQ入门指南

在这里插入图片描述

人生永没有终点。只有等到你瞑目的那一刻,才能说你走完了人生路,在此之前,新的第一次始终有,新的挑战依然在,新的感悟不断涌现。

文章目录

  • 一、MQ与RabbitMQ概述
    • 1. MQ简述
    • 2. MQ的优势
    • 3. MQ的劣势
    • 4. 常见的MQ产品
    • 5. RabbitMQ(兔子MQ😀)
  • 二、RabbitMQ安装与配置
    • 1. 基于docker快速安装RabbitMQ
    • 2. 创建用户和虚拟机
  • 三、RabbitMQ快速入门
    • 1. 基础环境搭建
    • 2. publisher消息发布者实现
    • 3. consumer消费者实现
  • 四、SpringAMQP与RabbitMQ工作模型
    • 1. SpringAMQP概述
    • 2. BasicQueue 基本模型(简单模型)
    • 3. WorkQueue 工作模型
    • 4. Publish、Subscribe 发布订阅模型
      • 4.1 Fanout 广播模型
      • 4.2 Direct 路由模型
      • 4.3 Topic 主题模型
    • 5. 消息转换器
      • 5.1 使用默认消息转换器发送Object类型消息
      • 5.2 使用Jackson消息转换器收发JSON消息
      • 5.3 使用默认消息转换器收发JSON消息


一、MQ与RabbitMQ概述


1. MQ简述


MQ(全称:Message Queue)直译是消息队列,是基础数据结构中 “先进先出” 的一种数据结构,也是在消息的传输过程中保存消息的容器(中间件),多用于分布式系统之间进行通信。

一般MQ用来解决系统耦合、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。(AP架构)

在这里插入图片描述

总结:

  • 消息队列(MQ),是一种中间件,用于存储和传递消息。

  • 分布式系统有两种通信方式:直接远程调用(如OpenFeign) 和 借助第三方完成间接通信(如RabbitMQ)。

  • 发送方称为生产者,接收方称为消费者。


2. MQ的优势


MQ的优势:(应用解耦、异步、削峰)

  • 应用解耦:提高系统容错性和可维护性;
  • 异步提速:提升用户体验和系统吞吐量;
  • 削峰填谷:提高系统稳定性。

1、应用解耦

在这里插入图片描述

在这里插入图片描述


2、异步提速

在这里插入图片描述

在这里插入图片描述


3、削峰填谷(秒杀)

在这里插入图片描述

在这里插入图片描述

使用MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。

但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”,从而提升系统的稳定性。


3. MQ的劣势


引入MQ会遇到下列问题:

  • 消息可靠性问题(如何确保发送的消息至少被消费者消费一次,避免消息丢失问题)
  • 延迟消息问题 (如何实现消息的延迟投递,解决方案:使用延时队列、TTL、延迟队列插件实现)
  • 高可用问题(如何避免单点MQ故障而导致的不可用问题,解决方案:搭建MQ集群)
  • 消息堆积问题(如何解决数百万消息堆积,无法及时消费的问题)

4. 常见的MQ产品


市面上有很多MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ、EMQ(物联网) 等,也有直接使用Redis充当消息队列的场景。在实际技术选型时,需要结合自身需求及MQ产品特点来综合考虑。

在这里插入图片描述

  • Kafka: 是一个高可用性、高吞吐量的分布式消息系统。它具有持久化、持续性、可扩展性和副本机制,并支持多分区和多消费者组。Kafka适用于大规模的数据流处理,如日志聚合、流处理和实时数据流。在追求可用性和高吞吐能力方面,Kafka是一个不错的选择。
  • RocketMQ: 是一个低延迟、高吞吐量的分布式消息队列系统。它提供了可靠的消息传递机制,支持高并发和高可用性的消息发布和订阅。RocketMQ适用于大规模的消息处理和异步通信场景。在追求可用性、可靠性和吞吐能力方面,RocketMQ是一个较好的选择。
  • RabbitMQ: 是一个可靠性较高、低延迟的开源消息队列系统。它采用AMQP协议,支持多种消息模式和消息确认机制。RabbitMQ适用于可靠性要求较高的任务和通信场景。在追求可用性、可靠性和低延迟方面,RabbitMQ是一个合适的选择。

追求可用性(高->低):Kafka、 RocketMQ 、RabbitMQ;

追求可靠性:RabbitMQ、RocketMQ;

追求吞吐能力:RocketMQ、Kafka;

追求消息低延迟:RabbitMQ、Kafka。


5. RabbitMQ(兔子MQ😀)


RabbitMQ官网:http://www.rabbitmq.com/

在这里插入图片描述

RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。

AMQP (全称Advanced Message Queuing Protocol,表示高级消息队列协议),是一个网络协议,是应用层协议的一个开放标、准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。(同类的协议还有MQTT用于物联网场景下)


在RabbitMQ中,有以下一些角色:

  • Producer(生产者):生产者是指发送消息到RabbitMQ的应用程序。它创建消息并将其发送到交换器。

  • Consumer(消费者):消费者是指从RabbitMQ接收消息的应用程序。它基于订阅的方式从队列中获取消息并进行处理。

  • Exchange(交换器):交换器是消息的路由中心。当生产者发送消息时,通过交换器将消息路由到一个或多个队列。

  • Queue(队列):队列是RabbitMQ中存储消息的地方。消费者从队列中接收消息,并进行处理。

  • Binding(绑定):绑定将交换器与队列相关联。它定义了消息从交换器到队列的路由规则。

  • Broker(代理服务器):代理服务器是RabbitMQ的核心组件,负责接收和传递消息。它负责处理交换器、队列、消息的路由和转发。

  • Channel(信道):信道是RabbitMQ使用的通信通道,生产者和消费者通过信道与代理服务器进行交互。

  • Virtual Host(虚拟主机):虚拟主机在RabbitMQ中用于将不同的应用隔离开来。每个虚拟主机具有自己的交换器、队列和绑定。

这些角色共同组成了RabbitMQ的基本架构。生产者发送消息到交换器,通过绑定将消息路由到队列,消费者从队列中接收消息并进行处理。代理服务器负责消息的接收、传递和路由。信道用于生产者和消费者与代理服务器的通信,而虚拟主机提供了应用隔离的环境。
在这里插入图片描述

RabbitMQ工作模式:

文档地址:https://www.rabbitmq.com/getstarted.html

RabbitMQ提供了6种工作模型,但是我们常用的只有5种:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。(第6种RPC远程调用不属于MQ)

在这里插入图片描述

JMS (Sun公司提供一套Java操做消息队列的接口)

  • JMS(JavaMessage Service),Java消息服务应用程序接口,即Java操作消息中间件的API;
  • JMS是JavaEE规范的一种,类比JDBC;
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有提供。

二、RabbitMQ安装与配置


1. 基于docker快速安装RabbitMQ


扩展:docker-compose安装rabbitmq:https://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml

1、拉取镜像

docker pull rabbitmq:3.8-management

在这里插入图片描述

2、运行容器

在这里插入图片描述

 docker run -di \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name rabbitmq \
 --hostname my-rabbit \
 -p 15672:15672 \
 -p 5672:5672 \
 --restart=always \
 rabbitmq:3.8-management
  • \ 代表换行
  • -e 指定环境变量
  • -e RABBITMQ_DEFAULT_USER=admin 用户名
  • -e RABBITMQ_DEFAULT_PASS=123456 密码
  • -v 挂载数据卷
  • -p 15672:15672 用于web管理页面使用的端口 (管理员页面,端口15672)
  • -p 5672:5672 用于生产和消费端使用的端口(通信端口,也就是在代码里要使用的)
  • -di ,d后台运行,i打开控制台交互
  • –name mq 容器名字
  • –hostname mq (这个参数在单机版mq配不配置都可以,用来设置主机名,搭建集群会用到);

扩展:启动xxx插件(后面会用到这个命令)

# 进入容器
docker exec -it rabbitmq /bin/bash

# 启动xxx插件
rabbitmq-plugins enable xxx

RabbitMQ管理端:

管理端访问地址:http://192.168.150.103:15672/

在这里插入图片描述

在这里插入图片描述


2. 创建用户和虚拟机


1、添加一个新用户:

在这里插入图片描述

添加成功后列表会显示该用户,但是这个用户没有操作权限,需要为他创建一个虚拟机:

在这里插入图片描述


2、创建虚拟机

在这里插入图片描述

为指定用户授权:

在这里插入图片描述

最后该用户就可以操作这个虚拟机了:

在这里插入图片描述


三、RabbitMQ快速入门


使用传统写法完成简单模的消息传递:(特点:一条消息只能被一个消费者消费)

在这里插入图片描述

官方的HelloWorld示例是基于简单消息队列模来实现的,其中包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue;
  • queue:消息队列,负责接受并缓存消息;
  • consumer:订阅队列,处理队列中的消息。

1. 基础环境搭建


1、创建maven工程,并在pom文件中导入如下依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--SpringAMQP依赖,可以操作RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

2、创建子模块publisher(生产者)、consumer(消费者),并编写启动类和yml配置文件:

# 日志输出格式配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

在这里插入图片描述


2. publisher消息发布者实现


消息收发流程:Connection连接、Channel通道、queue队列和exchange 交换机。

publisher消息发布者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel

1、编写publisher测试代码:

package cn.aopmin.mq.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者(传统写法)
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
public class PublisherTest {

    /**
     * 发送消息
     *
     * @throws IOException
     * @throws TimeoutException
     */
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost虚拟主机、用户名、密码
        factory.setHost("192.168.150.103");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue"; // 队列名称
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

在这里插入图片描述


2、在建立连接处打断点,并以debug方式启动(方便观察每个组件的创建)

在这里插入图片描述

查看连接信息:

在这里插入图片描述


回到IDEA继续按F8,查看通道信息:

在这里插入图片描述

在这里插入图片描述


继续按F8,查看队列信息:
在这里插入图片描述
在这里插入图片描述


最后直接放行程序,查看队列中的消息:

在这里插入图片描述
在这里插入图片描述


3. consumer消费者实现


consumer消费者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 订阅消息

1、编写消费者代码

package cn.aopmin.mq.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者(传统写法)
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.103");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            // 接收消息的回调函数
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

在这里插入图片描述

2、测试(消费者启动程序后会一直执行,不用的时候将程序结束即可)

在这里插入图片描述
在这里插入图片描述


四、SpringAMQP与RabbitMQ工作模型


1. SpringAMQP概述


AMQP是消息中间件收发消息的协议(规范),具体实现由各个消息中间厂商实现;(例如 RabbitMQ)

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP的官方地址:https://spring.io/projects/spring-amqp

在这里插入图片描述

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系;
  • 基于注解的监听器模式,异步接收消息;
  • 封装了RabbitTemplate工具,用于发送消息 。

RabbitMQ工作模型:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。

在这里插入图片描述


2. BasicQueue 基本模型(简单模型)


使用SpringAMQP实现简单模型的消息收发:

在这里插入图片描述

1、在父工程中引入spring-amqp起步依赖:

<!--SpringAMQP:可以操作RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

完整的pom.xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.aopmin</groupId>
    <artifactId>rabbitmq02-BasicQueue</artifactId>
    <version>1.0.0</version>
    <packaging>pom</packaging>
    <description>springAMQP实现简单模型消息传递</description>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath/>
    </parent>

    <dependencies>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- SpringAMQP:可以操作RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- 单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>


    <!-- 打包插件 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2、消息发送

2.1、在publisher服务的application.yml中添加rabbitmq配置:

# RabbitMQ配置
spring:
  rabbitmq:
    host: 192.168.150.103 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 日志配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

2.2、在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package cn.aopmin.mq.test;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * 使用SpringAMQP实现简单模型的消息发送
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@SpringBootTest
// @RequiredArgsConstructor // 生成构造方法(构造器注入,要求注入的字段必须final修饰)
public class SpringAmqpTest {

    /**
     * RabbitTemplate是SpringAMQP中的核心类,用于实现消息的发送和接收
     */
    @Autowired
    private  RabbitTemplate rabbitTemplate;

    /**
     * 测试简单模型的消息发送
     */
    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

在这里插入图片描述


3、消息接收

3.1、在consumer服务的application.yml中添加rabbitmq配置:

# RabbitMQ配置
spring:
  rabbitmq:
    host: 192.168.150.103 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 日志配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

3.2、在consumer服务的com.baidou.mq.listener包中创建SpringRabbitListener类:

package cn.aopmin.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听类
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class SpringRabbitListener {

    /**
     * 订阅消息
     *
     * @param msg 消息
     * @throws InterruptedException
     */
    @RabbitListener(queues = "simple.queue")  // 配置要监听的队列: simple.queue
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("消费者接收到消息:【" + msg + "】");
    }
}

4、测试

先启动consumer服务(启动类),然后再运行publisher服务中发送消息的测试代码。
在这里插入图片描述
在这里插入图片描述


3. WorkQueue 工作模型


工作队列模型(Work Queue Mode):消息按照一定的策略分配给多个消费者来解决消息堆积问题,适用于任务分发和负载均衡场景。

角色:生产者、队列、消费者

在这里插入图片描述

使用SpringAMQP实现工作队列模型的消息收发:

1、在消费者监听类中编写两个方法,监听同一个队列,模拟多个消费者。

package cn.aopmin.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听类
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class SpringRabbitListener {

    /*
       编写两个方法监听同一个队列,可以实现多个消费者同时消费一个队列的消息
     */
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }
}

在这里插入图片描述

2、模拟生产者发多条消息:

/**
* 测试工作模型发消息
*/
@Test
public void testWork() {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello,rabbitmq";

    // 模拟发送100条消息
    for (int i = 1; i <= 100; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i);
    }
    System.out.println("消息发送完毕!");
}

在这里插入图片描述

3、测试:先启动消费者服务,再执行生产者发送消息的代码。

在这里插入图片描述


消费者预取消息限制:

工作模型默认一人一半消息,可以通过修改消费者application.yml文件,配置prefetch属性,控制消费者预取消息的上限:

# RabbitMQ配置
spring:
  rabbitmq:
    host: 192.168.150.103 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码
    listener:
      simple:
        prefetch: 1  # 消息预取策略(每次获取一条消息,处理完后再获取下一条)

prefetch属性用于指定消费者一次从RabbitMQ服务器预取的消息数量。通过限制预取消息的数量,你可以控制每个消费者同时处理的消息数量,从而实现负载均衡和资源控制。


4. Publish、Subscribe 发布订阅模型


发布订阅模型特点: 可以通过交换机(exchange)将一条消息发给多个队列(消费者)进行处理。

常见的exchange类型包括:Fanout广播、Direct路由、Topic主题。

交换机的主要作用:

  • 接收生产者发送的消息
  • 将消息按照规则路由到绑定过的队列中
  • 它不能缓存消息,路由失败,消息丢失
    在这里插入图片描述

SpringAMQP提供了一个Exchange接口,来表示所有不同类型的交换机:

在这里插入图片描述


4.1 Fanout 广播模型


Fanout Exchange,交换机会把收到的消息发送给绑定过的所有队列。(队列需要与交换机建立关系,然后才能收到对应消息)

在这里插入图片描述


接下来使用SpringAMQP演示Fanout Exchange收发消息:

1、在consumer服务中,利用代码声明队列、交换机,并将两者绑定

package cn.aopmin.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 在消费端声明队列、交换机、绑定关系,这样就不用在rabbitmq管理页面手动创建了,这样在服务启动后springAMQP会自动创建
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Configuration
public class MqConfig {

    /**
     * 声明Fanout交换机
     * 交换机名: exchange.fanout
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("exchange.fanout");
    }


    /**
     * 声明队列
     * 队列名: fanout.queue1
     */
    @Bean
    public Queue queue1() {
        return new Queue("fanout.queue1");
    }


    /**
     * 声明队列
     * 队列名: fanout.queue2
     */
    @Bean
    public Queue queue2() {
        return new Queue("fanout.queue2");
    }


    /**
     * 绑定关系
     * 将队列1绑定到Fanout交换机上
     */
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange, Queue queue1) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    /**
     * 绑定关系
     * 将队列2绑定到Fanout交换机上
     */
    @Bean
    public Binding binding2(FanoutExchange fanoutExchange, Queue queue2) {//参数注入,即参数名就是bean的名字
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

消费者application.yml配置:

# RabbitMQ配置
spring:
  rabbitmq:
    host: 192.168.150.103 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 日志配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

2、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

package cn.aopmin.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听类
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class FanoutListener {

    /*
       编写两个方法,分别监听队列1和队列2
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenerFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenerFanoutQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }
}

编写监听类后,启动消费者服务会自动创建交换机和队列组件:

在这里插入图片描述

3、在publisher的测试类中编写向exchange.fanout发消息的代码:

/**
 * 测试广播模式发送消息
 */
@Test
public void testFanout() {
    // 交换机名称
    String exchangeName = "exchange.fanout";
    // 消息
    String message = "hello,rabbitmq";

    // 发送消息
    // 第一个参数是交换机名称
    // 第二个参数是routingKey(路由key),在广播模式下不需要指定
    // 第三个参数是消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);
    System.out.println("消息发送完毕!");
}

执行测试方法,查看运行结果:

在这里插入图片描述


4.2 Direct 路由模型


Direct exchange,会将接收到的消息按照规则(Routing key)转发到指定的队列,因此称为路由模式(routes)

在交换机上做了一层规则判断操作。

Fanou模型要求:

  • 每一个Queue都与Exchange设置一个BindingKey;
  • 发布者发送消息时,指定消息的RoutingKey;
  • Exchange会将消息路由到BindingKey与消息RoutingKey一致的队列上。

在这里插入图片描述

基于AMQP演示Direct模型::

1、在consumer服务的监听类中,编写两个消费者方法,并在方法上通过@RabbitListener组合注解声明Exchange、Queue、RoutingKey,然后分别监听direct.queue1和direct.queue2队列中的消息:

package cn.aopmin.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.xml.ws.BindingType;

/**
 * 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class DirectListener {

    /**
     * 在监听方法上通过注解的方式声明交换机和队列、及绑定关系
     * 队列: 通过@Queue注解创建队列
     * 交换机: 通过@Exchange注解创建交换机
     * 绑定关系: 通过bindingkey绑定{"blue", "red"}
     *
     * @param msg
     * @throws InterruptedException
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),//创建队列
            exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机
            key = {"blue", "red"} // bindingkey
    ))
    public void listenDirect1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),//创建队列
            exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机
            key = {"yellow", "red"} // bindingkey
    ))
    public void listenDirect2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }
}

在这里插入图片描述


2、在publisher中编写测试方法,向exchange.direct发送消息

/**
* 测试路由模式发送消息
*/
@Test
public void testDirect() {
    // 交换机名称
    String exchangeName = "exchange.direct";
    // 消息
    String message = "helloworld!";
    
    // 发送消息
    // 第一个参数是交换机名称
    // 第二个参数是routingKey(路由key,发消息时候用的),在路由模式下需要指定
    // 第三个参数是消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", "routingKey:blue ---" + message);
    rabbitTemplate.convertAndSend(exchangeName, "red", "routingKey:red ---" + message);
    System.out.println("消息发送完毕!");
}

在这里插入图片描述


3、测试:启动消费者服务创建交换机和队列,然后执行生产者发消息方法

在这里插入图片描述
在这里插入图片描述


小节:

1、Direct与Fanout交换机的区别?

  • Fanout相对于Direct更灵活些。

  • Fanout交换机不做判断,收到消息就会广播给绑定的队列。

  • Direct交换机会根据RouthingKey判断,然后路由给满足规则的队列。

  • 在Direct模型中,如果多个队列都有相同的RouthingKey,则与Fanout功能类似。

2、基于@RabbitListeneri注解声明队列和交换机有哪些常见注解?

  • @QueueBinding 绑定关系
  • @Queue 队列
  • @Exchange 交换机

4.3 Topic 主题模型


Topic Exchange 与 Direct Exchange类似,区别在与RoutingKey必须是多个单词组成,并且以==.== 分割。(用的最多)

队列与交换机指定BindingKey时可以使用通配符:

  • #:表示匹配0或多个单词;例如 china.# 、#.new
  • *:表示匹配1个单词;

在这里插入图片描述

  • Queue1:绑定的是java.# ,因此凡是以 java.开头的routing key 都会被匹配到,例如 java.news、java.blog;
  • Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配到,例如java.news、weather.news、heihe.weather.news。

基于AMQP演示Topic 模型:

1、在消费端的监听方法上声明交换机和队列、及绑定关系

package cn.aopmin.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.xml.ws.BindingType;

/**
 * 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class TopicListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),//创建队列
            exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机
            key = "java.#" // bindingkey
    ))
    public void listenTopic1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),//创建队列
            exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机
            key = "#.news" // bindingkey
    ))
    public void listenTopic2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }
}

2、在生产端,编写发消息方法

/**
 * 测试主题模式发送消息
 */
@Test
public void testTopic() {
    // 交换机名称
    String exchangeName = "exchange.topic";
    // 消息
    String message = "helloworld!";

    // 发送消息
    // 第一个参数是交换机名称
    // 第二个参数是routingKey(路由key),在路由模式下需要指定
    // 第三个参数是消息
    rabbitTemplate.convertAndSend(exchangeName, "java.blog", message);
    rabbitTemplate.convertAndSend(exchangeName, "java.news", message);
    System.out.println("消息发送完毕!");
}

3、测试

在这里插入图片描述


小节

1、Direct和Topic交换机的区别?

  • 相同点:两个交换机都会key进行判断。(即消息的路由key与队列的绑定key进行比较)
  • 不同点:Topic队列的绑定key支持通配符更加灵活。Direct队列的绑定key不支持通配符,只能匹配具体key的消息。

5. 消息转换器


默认情况下,Spring会帮我们把发送的任意对象类型消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
在这里插入图片描述

但是,Spring默认使用的是JDK序列化,JDK序列化会存在一些问题:

  • 数据体积过大;
  • 有安全隐患;
  • 可读性差。

5.1 使用默认消息转换器发送Object类型消息


1、声明队列、编写监听方法

package cn.aopmin.mq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 声明队列
 *
 * @author 白豆五
 * @version 2023/07/3
 * @since JDK8
 */
@Configuration
public class MqConfig {

    @Bean
    public Queue ObjectQueue() {
        return new Queue("object.queue");
    }
}
@Component
public class ObjectListener {
    @RabbitListener(queues = "object.queue")
    public void listenObj(Object obj) {
        System.out.println("收到消息:" + obj.toString());
    }
}

2、编写发消息代码

/**
 * 测试默认消息转换器收发消息(JDK序列化)
 */
@Test
public void testDefault() {
    // 队列名称
    String queueName = "object.queue";
    // 对象消息
    Map<String, Object> message = new HashMap<>();
    message.put("name", "张三");
    message.put("age", 18);

    // 发送消息
    rabbitTemplate.convertAndSend(queueName, message);
    System.out.println("消息发送完毕!");
}

测试,查看队列数据:(默认情况下JDK序列化的结果不直观,可以把消息转成json格式发送)

在这里插入图片描述


5.2 使用Jackson消息转换器收发JSON消息


Spring提供了org.springframework.amqp.support.converter.MessageConverter接口来处理对象消息的转换。在AMQP中默认实现是SimpleMessageConverter,而SimpleMessageConverter它基于JDK的ObjectOutputStream完成序列化。

如果我们不想使用默认的消息转换器,只需在生产端和消费端配置MessageConverter类型的Bean即可。

1、生产端和消费端都引入jackson依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

2、生产端和消费的都配置消息转换器

package cn.aopmin.mq;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

//生产端
@SpringBootApplication
public class PublisherApp {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApp.class, args);
    }

    // 使用json序列化机制,进行消息转换
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
package cn.aopmin.mq;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

//消费端
@SpringBootApplication
public class ConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class, args);
    }
    // 使用json序列化机制,进行消息转换
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

3、修改消费端的监听方法

@Component
public class ObjectListener {
    @RabbitListener(queues = "object.queue")
    public void listenObj(Map<String,Object> msg) {
        System.out.println("收到消息:" + msg);
    }
}

4、测试

在这里插入图片描述


5.3 使用默认消息转换器收发JSON消息


上一种方案,配来配去非常麻烦,而且一旦消息转换器不一样,就不能达到想要的结果。默认情况下,对于字符串类型的消息,默认的JDK消息转换器会使用UTF-8编码将字符串转换为字节数组,并将其作为消息体进行发送。

这样我们在发消息的时候手工将对象序列化为json字符串,在接收消息时再序列化为Java对象即可。

JSON工具类:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.76</version>
</dependency>

常用方法:

  • 序列化:JSON.toJSONString(xxx);
  • 反序列化:JSONObject(str,Xxx.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/34373.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 数据库

文章目录 数据库的基本概念数据表数据库数据库管理系统数据库系统 数据库的发展史当今主流数据库介绍SQL Server &#xff08;微软公司产品&#xff09;Oracle &#xff08;甲骨文公司产品&#xff09;DB2 &#xff08;IBM公司产品&#xff09;MySQL &#xff08;甲骨文公司收购…

Spring Boot日志文件

目录 前言&#x1f36d; 一、日志的作用&#x1f36d; 1、日志真实使用案例&#xff1a;&#x1f349; 二、日志怎么用&#x1f36d; 1、自定义日志打印&#x1f349; Ⅰ、在程序中得到日志对象&#x1f353; 常见的日志框架说明&#xff08;了解&#xff09;&#x1f35…

保姆级AT32F437 配置RT-Thread 以太网(UDP/TCP Server)

好记性不如烂笔头&#xff0c;既然不够聪明&#xff0c;就乖乖的做笔记&#xff0c;温故而知新。 本文档用于本人对知识点的梳理和记录。 一、前言 ENV版本&#xff1a;v1.3.5 rt-thread版本&#xff1a;V5 开发板&#xff1a;雅特力AT32F437 AT-START-F437 二、ENV配置 E…

java--类变量与实例变量--实验设计--村庄种树

目录 设计要求 设计流程图 程序代码 类Village代码 类MainClass代码 可以直接运行的代码 运行结果 类变量与实例变量的区别和类方法与实例方法的区别 idea的详细使用方法 设计要求 编写程序模拟两个村庄共同拥有一片森林&#xff1b;编写一个Village类&#xff0c;该类…

python爬虫:爬取网站所有页面上某些内容

举例场景&#xff1a;爬取腾讯课堂中&#xff0c;查询python的所有课程的封面图、课程标题、课程数量、课程价格&#xff0c;这4个部分的内容。 代码如下&#xff1a; import requests # import lxml # 导入用于请求的包lxml from bs4 import BeautifulSoup # 导入用于请求…

Tomcat 8.5 环境搭建指南

文章目录 导言环境搭建总结 导言 欢迎阅读本篇博客&#xff0c;本文将为您提供关于如何搭建Tomcat 8.5环境的详细指南。Tomcat是一个流行的开源Java Servlet容器&#xff0c;它提供了一个运行Java Web应用程序的平台。无论您是新手还是有经验的开发人员&#xff0c;本文都将帮…

4、架构:Canvas VS DOM

在可视化搭建的低代码平台中&#xff0c;设计器是一个非常关键的模块&#xff0c;可以帮助用户通过拖拽、配置等方式快速搭建应用界面。 在技术选型方面&#xff0c;目前市面上主流的设计器技术包括基于 HTML/CSS/JavaScript 的 Web 设计器。 在渲染方案方面&#xff0c;主流…

王道《计算机网络》思维导图汇总

第一章 1.1.1 概念与功能 1.1.2 组成与分类 1.1.3 标准化工作及相关组织 1.1.4 性能指标 速率 带宽 吞吐量 时延 时延带宽积 往返时延RTT 利用率 1.2.1 分层结构、协议、接口、服务 1.2.2 OSI参考模型 应用层 表示层 会话层 传输层 网络层 数据链路层 物理层 1.2.4 TCP/IP 参…

Linux下的分布式迁移工具dsync使用

一、Linux下的分布式迁移工具dsync使用 dsync 是一个在 Linux 系统上用于迁移文件和目录的工具。它可以在不同的存储设备之间进行数据同步和迁移操作。dsync 是 rsync 工具的一个衍生版本&#xff0c;专注于在本地文件系统之间进行数据迁移。 这里是官网&#xff1a;dsync 1、…

游泳买耳机买什么的比较好,列举几款实战性好的游泳耳机

对于运动用户来说&#xff0c;在运动时都会选择听一些节奏感比较强的音乐&#xff0c;让自己运动是更有活力。现在已经是三伏天中的前伏期间&#xff0c;不少人会选择在三伏天的日子里进行减肥瘦身&#xff0c;耳游泳已经成为很多人都首选运动&#xff0c;游泳是非常好的有氧运…

吐血整理!可免费使用的国产良心软件分享,几乎满足你办公需求

在这个信息化时代&#xff0c;软件已经成为我们办公和生活的必备工具。然而&#xff0c;市面上的大部分国产软件都需要付费才能使用&#xff0c;给我们的经济负担增加了不少。幸运的是&#xff0c;国内有一些良心软件&#xff0c;它们质量上乘&#xff0c;功能强大&#xff0c;…

2023牛客网秋招国内大厂最牛的 Java 面试八股文合集(全彩版)

秋收即将来临&#xff0c;找工作的小伙伴比比皆是&#xff0c;很对小伙伴早早的就开始储备技术&#xff0c;准备秋招面试了。 为了帮助小伙伴更好的应对面试&#xff0c;我拉来十几个大佬&#xff0c;汇总一线大厂的情况&#xff0c;给你整了一套超全的面试资料&#xff1a; 16…

深度学习项目实战二: LetNet5网络结构搭建

深度学习项目实战二: LetNet5网络结构搭建 文章目录 深度学习项目实战二: LetNet5网络结构搭建@[TOC](文章目录)一、卷积基本运算公式二、LetNet5网络1. 网络结构![在这里插入图片描述](https://img-blog.csdnimg.cn/0008fe6e5886414eac09eed49556ad99.png)2. 导入相关包3. 代码…

熔断与降级 Hystrix

一、Hystrix(豪猪)简介 1、Hystrix的设计目的 &#xff08;1&#xff09;对依赖服务调用时出现的调用延迟和调用失败进行控制和容错保护。 &#xff08;2&#xff09;阻止某一个依赖服务的故障在整个系统中蔓延&#xff0c;服务A->服务B->服务C&#xff0c;服务C故障了…

四、交换网络实验4——单臂路由配置

更多网络基础内容可见: 网络基础学习目录及各章节指引 4.6.4 配置单臂路由器 实验目的 学习配置单臂路由 实验工具 Cisco Packet Tracer 实验环境 安装模拟器的Windows系统 实验步骤 第一步:根据拓扑配置基本信息

【Linux修炼】开发工具使用

&#x1f307;个人主页&#xff1a;平凡的小苏 &#x1f4da;学习格言&#xff1a;命运给你一个低的起点&#xff0c;是想看你精彩的翻盘&#xff0c;而不是让你自甘堕落&#xff0c;脚下的路虽然难走&#xff0c;但我还能走&#xff0c;比起向阳而生&#xff0c;我更想尝试逆风…

无人机遥感在农林信息提取中的实现方法与GIS融合制图教程

详情点击链接&#xff1a;无人机遥感在农林信息提取中的实现方法与GIS融合制图 遥感技术作为一种空间大数据手段&#xff0c;能够从多时、多维、多地等角度&#xff0c;获取大量的农情数据。数据具有面状、实时、非接触、无伤检测等显著优势&#xff0c;是智慧农业必须采用的重…

Vue指令案例

案例需求&#xff1a; 将Vue数据模型中的数据以表格的形式渲染展示 具体代码如下&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-wid…

《MySQL》对表进行操作(DDL语句)

文章目录 &#x1f4a1;创建表&#x1f4a1;修改表&#x1f4a1;删除表 在了解下列语句前&#xff0c;先掌握一下指令 # 查看表内容 desc [表名] # 详细查看表内容 show create table [表名] \G&#x1f4a1;创建表 # 创建表 create table [表名] ([字段1] [类型1],[字段2] […

ChatGPT实战:高考志愿填报

近期&#xff0c;随着各地陆续发布高考成绩&#xff0c;高考志愿填报市场随之升温&#xff0c;“高报师”再次成为“香饽饽”。填报志愿对中学生来说太难&#xff0c;在一个懵懂的年纪做这样一个决策&#xff0c;份量是比较重的。当普通人没很多的信息做参考的时候&#xff0c;…