SpringCloud微服务 【实用篇】| RabbitMQ快速入门、SpringAMQP

目录

一:初始RabbitMQ

1. 同步和异步通讯

1.1 同步调用

1.2 异步调用

2. MQ常见框架

二:RabbitMQ快速入门

1. RabbitMQ概述和安装

2. 常见消息队列模型

3. 快速入门案例

三:SpringAMQP

1. Basic Queue 简单队列模型

2. Work Queue 工作队列模型

3. 发布订阅模型-Fanout 发布

4. 发布订阅模型-Direct 发布

5. 发布订阅模型-Topic 发布

6. 消息转换器


前些天突然发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,感兴趣的同学可以进行学习人工智能学习

一:初始RabbitMQ

1. 同步和异步通讯

同步通讯和异步通讯理解

生活中就有很多同步和异步的案例,例1:假如你现在与一个妹子聊天,采用同步通信更像是打视频电话,就像直播一样,所得到的信息都能立刻同步过去,具有一定的优势;而异步通信更像是微信聊天,别人不想理你也不知道,时效性不是那么好,但也有自己的优点。例2:假如你现在在和三个妹子聊天,同步通信只能一个妹子聊,就会错失很多良机;异步通信可以多个妹子一块聊,还不会被发现;所以那么牛的技术我们当然要好好学习!

1.1 同步调用

案例:前面学习的微服务间基于Feign的调用就属于同步方式,就存在以下问题:

耦合度高:每次加入新的需求,都要修改原来的代码

对于一个订单业务,我们支付成功后就需要更改订单服务修改订单状态,然后进行发货;支付服务调用订单服务还是存储服务都需要等待对方的响应,是实时的调用。此时一个完整的系统开发好了,如果产品经理需要增加一个短信通知服务等功能,此时就需要在支付服务里增加代码;每次增加一个业务,代码就需要更改,具有很强的耦合性!

性能下降(吞吐量):调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和

假如现在调用支付服务需要50ms,支付服务调用其它服务都需要150ms,支付服务调用每个服务都是同步调用,所以只能进行等待当前调用完成才可以调用其它的服务;所以一个完整的服务调用下来就需要500ms,这相当于1s中只能处理请求。数以十万百万的请求过来根本顶不住,性能下降、吞吐量也下降了!

资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源

在支付服务等待订单服务的过程,CPU和内存都在占用着啥都不干,只有某个服务调用完成才会执行下一个,在等待的过程中浪费大量资源,资源利用的不够充分!

级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障

假如现在存储服务挂了,此时支付服务进行访问,就会一直进入阻塞状态,这个请求就不会被释放,后面阻塞的越来越多,等待资源耗尽,支付服务就进不去了,相当于支付服务也挂了;所以造成整个服务就瘫痪了!

总结同步调用:

优点:时效性强,可以立即得到结果。

缺点:耦合度高、性能和吞吐能力下降、有额外的资源消耗、有级联失联问题。

1.2 异步调用

异步调用常见实现就是事件驱动模式

 在支付服务与其它服务之间引入一个Broker(事件代理者)。一旦有人支付成功就是一个事件,这个事件交给Broker来管理;而订单、仓储等服务就会找Broker这个老大哥,一旦有人支付成功你要通知我们(订阅事件);所以一旦有人支付成功,Broker就会发布支付成功事件(这里通知完就会返回给用户,不会等待其它服务响应完)去通知其它服务有人支付成功了,此时其它服务就会去修改订单状态!

优势一:服务解耦

原来增加业务需要更改业务的代码,现在就不用了;因为现在支付服务不负责调用,只负责发送一个事件到Broker,至于是谁接收?什么时间接收?有没有完成?完全不用管。所以一旦有新的业务只需要订阅新的Broker事件即可(到时候直接大喇嘛一喊,就能通知到你)!注:这样将来增加或删除业务就不需要更改代码,只需要订阅或取消订阅事件即可

优势二:性能提升,吞吐量提升

以前的耗时是总耗时加在一起50+150*3=500ms,现在只要支付成功,支付服务就向Broker发布事件,立刻就能返还给用户支付成功50+10=60ms。而Broker通知其它服务,什么时候去完成?多久去完成?完全不用管。

优势三:服务没有强依赖,不担心级联失败问题,没有资源浪费

支付服务相当于借用Broker去通知而不是调用,此时仓储服务挂了,也和我没关系,只需要重启仓储服务即可。既然没有强的依赖关系,我不调用你,也不需要等待你,所以就没有了资源浪费。

优势四:流量削峰

假设现在有多个用户发出请求,此时Broker就起到一个缓冲的作用,把请求都放到让订单服务、仓储等服务按照自己的能力去处理业务,处理完再去Broker取,现在此时的压力是Broker扛着。

总结异步调用:

优点:耦合度低,性能和吞吐量提升,故障隔离,没有资源消耗,没有级联失联问题,流量消峰。

缺点:依赖于Broker的可靠性、安全性、吞吐能力,架构复杂了,业务没有明显的流程线,不好追踪管理。

2. MQ常见框架

MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Brokeri

MQ常见的四种实现:RabbitMQ、ActiveMQ、RocketMQ、Kafka 

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

二:RabbitMQ快速入门

1. RabbitMQ概述和安装

RabbitMQ概述

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ

RabbitMQ安装

单机部署:基于Centos7虚拟机中使用Docker来安装!

第一步:下载镜像

①在线拉取

docker pull rabbitmq:3-management

②从本地加载,使用本地已经安装的镜像包 

上传到虚拟机目录后(例如tmp目录),使用命令加载镜像即可:

docker load -i mq.tar

第二步:安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \ # -e设置环境变量:用户名和密码
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \ # --hostname配置主机名,集群部署需要配置这个
 -p 15672:15672 \ # 管理平台的端口
 -p 5672:5672 \ # 消息通信的端口
 -d \
 rabbitmq:3-management

第三步:查看状态

docker ps

成功启动

第四步:登录管理品台页面

注:如果出现第二天登录不上的情况,请重启docker,service docker restart

192.168.#.#:15672 # 前面是虚拟机IP,后面是端口

输入设置的账户密码

需要注意的是:每个用户都需要有自己独享的虚拟主机

RabbitMQ的结构和概念

Publisher是消息的发送者,Consumer是消息的消费者。发送者将来会把消息发送到exchange(交换机),交换机会把消息路由到queue(队列),队列负责暂存消息;而后消费者从队列中获取消息,然后处理消息!

注:每创建一个用户都对应一个VirtualHost(虚拟主机),各个虚拟主机之间是相互隔离的,看不到,这样可以避免干扰。

总结RabbitMQ中的几个概念:

①channel:操作MQ的工具; 

②exchange:路由消息到队列中 ;

③queue:缓存消息 ;

④virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组、隔离;

2. 常见消息队列模型

MQ的官方文档中给出了7个MQ的Demo示例,其中与消息发送和接收有关系的就是前5个:

①其中前2个命名为基本消息队列(BasicQueue)工作消息队列(WorkQueue),这两种有一个共同的特征:消息的发送和接收都是基于队列来完成的(没有通过交换机),其中P代表发送者、C代表消费者、中间的红色部分代表消息队列。

②后3个都属于发布订阅(Publish、Subscribe),只是交换机类型不同分为三种:Fanout Exchange(广播)Direct Exchange(路由)Topic Exchange(主题),其中紫色的部分就代表交换机。

3. 快速入门案例

HelloWorld案例---》基本消息队列入门

注:mq-demo是父工程用来做依赖管理,consumer和publisher是两个子工程

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

publisher:消息发布者,将消息发送到队列;

queue:消息队列,负责接受并缓存消息;

consumer:订阅队列,处理队列中的消息;

注:其中queue是由MQ进行管理的,所以我们只需要写publisher和consumer这两部分代码

mq-demo父工程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

publisher消息的发送者

其中启动类和yml文件是SpringBoot工程必备的,没什么好说的,最主要的是测试类

①首先要先创建连接,需要连接工厂ConnectioFactory

②根据连接工厂,去设置连接的信息:连接的地址、端口号、虚拟主机、用户名、密码

③前面连接工厂和参数都准备好了,然后就是调用连接工厂ConnectionFactory的newConnection方法,正式建立连接connection

④正式建立连接后,就需要调用connection的createChannel建立通道channnel,这样生产者和消费者才能完成消息的发送和接收;

⑤通道有了就可以基于通道向队列queue中发送消息了,首先是声明了队列的名称,然后调用通道的queueDeclare方法向队列中发送消息;

⑥有了队列,生产者就可以向队列中发送消息了,把准备的消息发送到队列当中,以字节的形式发送出去。

⑦最后在关闭通道和连接。

注:无论是声明队列还是向队列中发送消息实际上使用的都是通道channel

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672); // 5672是通信的端口,15672是管理的接口
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

1. 正式建立连接connection,管理界面就会有连接的信息 

2. 连接正式建立后,就会创建通道channel供消息的发送和接收使用

3. 根据通道向队列queue发送消息

4. 消息发送到队列后,就关闭通道和连接(发完就不管了,解除了耦合)

①控制台

② 管理的页面queue,都表示消息已经成功发出去

Consumer消息的接收者

①消费者就需要从队列中接收消息,所以也会有创建连接工厂、准备参数、创建通道等操作,这些代码不变;

②值得注意的是在这里我们又创建了队列,这是为什么呢?这是因为我们生产者和消费者的启动顺序是不同的,万一消费者先启动找队列找不到怎办?为了避免这种情况的发生都声明了对列。并且如果这个对列已经创建过了不会再次创建;

③下面实际上就相当于回调函数,调用basicConsume方法,表示消费一条消息,那么去干什么呢,什么行为?这里就采用了匿名内部类对象DefaultConsumer(默认的消费者),重写了handleDelivery方法(处理投递的消息),把处理的行为挂载到队列queueName当中;一旦消息队列中有了消息,这个回调函数就会执行。

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.2.129");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override 
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body); // 发的时候是字节,接的时候也必须是字节,这里在转换为字符串
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

此时的执行结果

此时先打印的是 ”等待接收消息。。。。” ,实际上这就是回调机制,前面的代码只是让回调函数和队列进行绑定,此时的消息还没过来,代码会继续执行,一直到MQ把消息投递过来才会打印。这也再次证明了是异步的!

一旦消息被消费,队列中的就会被删除!

三:SpringAMQP

前面我们使用官方的API实现了简单的MQ程序,但是发现程序非常的麻烦;接下来就学习一下SpringAMQP,大大简化了消息的发送和接收。

什么是SpringAMQP

SpringAmqp的官方地址:Spring AMQP,是应用间消息通信的一种协议,与语言平台无关。

AMQP:在学习SpringAMQP之前需要先了解一下AMQP,Advanced Message Queuing Protocol(高级消息队列协议),是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象spring-rabbit是底层的默认实现

①用于异步处理入站消息的监听器容器;

②用于发送和接收消息的 RabbitTemplate

RabbitAdmin 实现自动化的声明队列、交换和绑定,自动创建队列;

接下来就是用SpringAMQP实现消息队列的五种类型!

1. Basic Queue 简单队列模型

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

第一步:在父工程中引入spring-amqp的依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--SpringBoot的单元测试依赖-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
</dependency>

第二步:在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

①在publisher服务中编写application.yml,添加mq连接信息

以配置的方式制定建立连接的一些信息。

spring:
  rabbitmq:
    host: 192.168.2.129  # IP地址
    port: 5672  # 端口
    virtual-host: /  # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码

②在publisher服务中新建一个测试类,编写测试方法:

直接使用RabbitTemplate工具类发送信息即可。

注:springamqd不会帮你创建队列,只能存在已有的队列中,所以要自己提前在浏览器的控制页面上创建这个对列!

package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    // 注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 调用工具类的方法
    @Test
    public void testSendMessageSimpleQueue(){
        // 第一个参数队列的名称
        String queueName = "simple2.queue";
        // 第二个参数消息
        String message = "hello SpringAMQP!";
        rabbitTemplate.convertAndSend(queueName,message);
    }
}

成功发送

第三步:在consumer服务中编写消费逻辑,绑定simple.queue这个队列,进行监听

①在consumer服务中编写application.yml,添加mq连接信息:

spring:
  rabbitmq:
    host: 192.168.2.129
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321

②在consumer服务中新建一个类,添加@Component注解,类中声明方法添加@RabbitListener注解,编写消费逻辑:

package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component // 纳入Spring管理
public class SpringRabbitListener {
    // 声明监听那个队列
    @RabbitListener(queues = "simple2.queue")
    // 行为,封装成方法
    public void listenSimpleQueueMessage(String msg){ // Spring会把消息传递过来给msg参数
        System.out.println("消费者接收到的消息是:"+msg);
    }
}

运行主函数,启动上面的Bean

2. Work Queue 工作队列模型

前面已经学习了简单队列的发送和接收,一旦有人拿到消息,就会从队列中删除,其它消费者根本拿不到。那如果有多个消息怎么办呢?就可以基于上述的特性让多个消费者合作处理。接下来就学习一下Work queue(工作队列)可以提高消息处理速度,避免队列消息堆积

案例:模拟WorkQueue,实现一个队列绑定多个消费者

第一步:在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    // 注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
   
    // 发送消息
    @Test
    public void testSendMessageWorkQueue(){
        String queueName = "simple2.queue";
        String message = "hello--->";
        // 利用for循环发送50条消息
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName,message+i);
            // 休眠20毫秒
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

第二步:在consumer服务中定义两个消息监听者,都监听simple.queue队列

注:消费者1每秒处理50条消息,消费者2每秒处理10条消息。

package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component // 纳入Spring管理
public class SpringRabbitListener {
    // 消费者1
    @RabbitListener(queues = "simple2.queue")
    // 行为,封装成方法
    public void listenWorkQueue1Message(String msg){
        System.out.println("消费者1接收到的消息是:"+msg+ LocalDateTime.now());
        // 每秒处理50条消息
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // 消费者2
    @RabbitListener(queues = "simple2.queue")
    // 行为,封装成方法
    public void listenWorkQueue2Message(String msg){
        System.out.println("消费者2接收到的消息是---》"+msg+LocalDateTime.now());
        // 每秒处理10条消息
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

执行结果:

理论上1秒处理完,实际上却是2秒才处理完,并没有做到能者多劳,消费者1实际上在1秒内很快就处理完消息,而消费者2因为能力不够却需要2秒。实际上这是因为MQ的预取机制,才开始就优先从队列中拿过来,并没有考虑到消费能力如何!

第三步:消费预取限制

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限!

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机 
    username: itcast # 用户名
    password: 123321 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

执行结果:能者多劳,可以在1秒内完成

3. 发布订阅模型-Fanout 发布

发布订阅模式

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者,实现方式是加入了exchange(交换机)。到底是发给谁?这是由交换机的类型决定的:

①Fanout:广播;

②Direct:路由; 

③Topic:话题;

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失;消息的存储是由队列完成的

发布订阅-Fanout Exchange

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue!

案例:利用SpringAMQP演示FanoutExchange的使用

第一步:在consumer服务中,声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)

①SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:

②在consumer服务创建一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:

package cn.itcast.mq.config;

import com.rabbitmq.client.impl.AMQImpl;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    // 声明交换机fanout.exchange
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout.exchange");
    }

    // 声明队列1 fanout.queue1
    @Bean
    public Queue queue1(){
        return new Queue("fanout.queue1");
    }
    @Bean
    // 声明队列2 fanout.queue2
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }

    // 进行绑定
    @Bean
    public Binding bindingQueue1(FanoutExchange fanoutExchange,Queue queue1){
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    @Bean
    public Binding bindingQueue2(FanoutExchange fanoutExchange,Queue queue2){
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }

}

成功声明交换机

成功声明队列

绑定成功

第二步:在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.LocalTime;

@Component // 纳入Spring管理
public class SpringRabbitListener {
    // 声明监听那个队列
    @RabbitListener(queues = "fanout.queue1")
    // 行为,封装成方法
    public void listenFanoutQueueMessage1(String msg){ // Spring会把消息传递过来给msg参数
        System.out.println("消费者1接收到的消息是:"+msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    // 行为,封装成方法
    public void listenFanoutQueueMessage2(String msg){ // Spring会把消息传递过来给msg参数
        System.out.println("消费者2接收到的消息是:"+msg);
    }

}

第三步:在publisher中编写测试方法,向交换机itcast.fanout发送消息

注:以前是发送到queue,现在是发送到exchange,注意区别!

package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    // 注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 向交换机发送消息
    @Test
    public void testSendFanoutExchange(){
        // 交换机
        String exchangeName = "fanout.exchange";
        // 信息
        String message = "Hello eyeryone";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message); // 中间参数routingKey后面会讲,这里先设置为空
    }
}

执行结果:

4. 发布订阅模型-Direct 发布

发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式。

①每一个Queue都与Exchange设置一个BindingKey;相当于暗号密码!

②发布者发送消息到Exchange时,也要指定一个消息的RoutingKey;与上面的BindingKey对上就发给谁!

③Exchange将消息路由到BindingKey与消息RoutingKey一致的队列;并且一个队列能绑定多个key;如果两个队列的BindingKey都能与RountingKey对上就都会发送(就相当于广播)!

声明单个key

声明多个key 

案例:利用SpringAMQP演示DirectExchange的使用

第一步:在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

注:前面使用Bean方式声明一个个类,发现太麻烦了,所以这里就学习一下使用利用@RabbitListener注解声明Exchange、Queue、RoutingKey。

package cn.itcast.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.LocalTime;

@Component // 纳入Spring管理
public class SpringRabbitListener {
  
    // DirectExchange,使用注解的形式绑定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct.queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void LitenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:["+msg+"]");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void ListenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:["+msg+"]");

    }


}

声明后启动!查看控制页面,成功绑定

第二步:在publisher中编写测试方法,向itcast. direct发送消息

package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    // 注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendDirectExchange(){
        String exchangeName = "itcast.direct";
        String message = "hello blue";
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);
    }
}

此时RoutingKey为blue,只有direct.quque1能接收到:

如果此时RoutingKey为red

@Test
public void testSendDirectExchange(){
    String exchangeName = "itcast.direct";
    String message = "hello red";
    rabbitTemplate.convertAndSend(exchangeName,"red",message);
}

则direct.quque1和direct.queue2都能接收到:

总结:所以相对于Fanout Exchange,Direct Exchange更加的灵活,可以通过key这个标记把消息传递给某一个或者所有,Fanout Exchange可以看做是Direct Exchange的一种特殊存在。

5. 发布订阅模型-Topic 发布

发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以点 “ .” 分割。 例如:china.news 代表中国的新闻消息; japan.news 则代表日本新闻。

Queue与Exchange指定BindingKey时可以使用通配符:

①#:代指0个或多个单词;

②*:代指一个单词;

案例:利用SpringAMQP演示TopicExchange的使用

第一步:利用@RabbitListener声明Exchange、Queue、RoutingKey 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

    // topic Exchange
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue1"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void ListenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:["+msg+"]");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue2"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void ListenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:["+msg+"]");
    }

成功声明与绑定 

第二步:在publisher中编写测试方法,向itcast. topic发送消息

    @Test
    public void testSendTopicExchange(){
        String exchangeName = "itcast.topic";
        String message = "It's a nice day ";
        rabbitTemplate.convertAndSend(exchangeName,"china.weath",message);
    }

此时是topic.queue1接收到消息

总结:Topic Exchange和Direct Exchange的本质相同,Topic Exchange可以指定通配符的方式来表达BindingKey,相对于Direct Exchange灵活度又变高了。

6. 消息转换器

说明:在SpringAMQP的发送方法中,接收消息的类型实际上是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送

案例:测试发送Object类型消息

在consumer中利用@Bean声明一个队列:

package cn.itcast.mq.config;

import com.rabbitmq.client.impl.AMQImpl;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    // 声明一个队列
    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }
}

发送一个Map集合到object.queue队列

package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    // 注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendObjectQueue(){
        // 准备一个Map集合
        Map<String,Object> msg = new HashMap<>();
        msg.put("name", "Jack");
        msg.put("age", 21);
        // 发送
        rabbitTemplate.convertAndSend("object.queue",msg);
    }
}

执行结果:最终的结果是通过JDK序列化转换成字节发送的

注:JDK序列化性能比较差、安全性比较差容易出现注入的情况、数据长度太长了占用额外的内存空间。

消息转换器

①Spring的对消息对象的处理是由org.springframework.amqp.support.converter.

MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化!

②如果要修改只需要定义一个MessageConverter 类型的Bean即可,推荐用JSON方式序列化!步骤如下:

第一步:在父工程中引入jackson依赖

<!--jackson依赖-->
<dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-databind</artifactId>
</dependency>

第二步:在publisher启动类声明MessageConverter,覆盖掉原来的配置

package cn.itcast.mq;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }
    // 覆盖原理的序列化方式
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

执行结果:成功转换成Json格式

第三步:在consumer服务中MessageConverter并监听object.queue队列并消费消息

启动类声明MessageConverter,覆盖掉原来的配置

package cn.itcast.mq;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    // 覆盖原理的序列化方式
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

消费object.queue队列的消息

    @RabbitListener(queues = "object.queue")
    public void ListenOjectQueue(Map<String,Object> msg){
        System.out.println("消费者接收到object.queue的消息:["+msg+"]");
    }

成功消费:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/322642.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高精度PWM脉宽调制信号转模拟信号隔离变送器1Hz~10KHz转0-5V/0-10V/1-5V,0-10mA/0-20mA/4-20mA

主要特性: >>精度等级&#xff1a;0.1级。产品出厂前已检验校正&#xff0c;用户可以直接使用 >>辅助电源&#xff1a;8-32V 宽范围供电 >>PWM脉宽调制信号输入: 1Hz~10KHz >>输出标准信号&#xff1a;0-5V/0-10V/1-5V,0-10mA/0-20mA/4-20mA等&…

统信UOS_麒麟KYLINOS与Windows通过Open SSH实现文件传输

原文链接&#xff1a;统信UOS/麒麟KYLINOS与Windows通过Open SSH实现文件传输 hello&#xff0c;大家好啊&#xff01;今天我要给大家介绍的是在统信UOS或麒麟KYLINOS操作系统与Windows系统之间通过Open SSH实现文件传输的方法。在日常工作中&#xff0c;我们经常需要在不同操作…

各种排序算法学习笔记

Docshttps://r0dhfl3ujy9.feishu.cn/docx/XFlEdnqv9oCEoVx7ok8cpc4knnf?fromfrom_copylink如果你认为有错误&#xff0c;欢迎指出&#xff01;

【深度学习:构建医学图像】如何构建医学图像的 QA 工作流程

【深度学习&#xff1a;构建医学图像】如何构建医学图像的 QA 工作流程 第 1 步&#xff1a;选择并划分数据集第 2 步&#xff1a;准备使用多个盲注进行注释第三步&#xff1a;建立图像标注协议第 4 步&#xff1a;在少数样本上练习医学图像注释第 5 步&#xff1a;发布第一批图…

debug之pycharm调试:出现Collecting data......

pycharm调试时&#xff0c;出现Collecting data… 一直在这个界面很久&#xff0c;这是新版本的Pycharm的bug&#xff0c;通常在多线程的情况下发生。 解决方法&#xff1a; File->Setting->Build,Execution,Deployment->Python Debugger。把Gevent compatible勾选…

当前页面一键回关

CSDN博客关注页面当前页面一键回关所有fans代码 f12打开控制台&#xff0c;输入以下代码 // 获取所有的button元素&#xff0c;根据它们的属性进行筛选 var buttons document.querySelectorAll("button[data-v-0947769e][data-ref^li_][data-id][classattention-btn]&qu…

基于ssm的毕业生学历证明系统+vue论文

内容摘要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统毕业生学历信息管理难度大&#xff0c;容错率低&#…

构建基于RHEL8系列(CentOS8,AlmaLinux8,RockyLinux8等)的MySQL8.0.32的RPM包

本文适用&#xff1a;rhel8系列&#xff0c;或同类系统(CentOS8,AlmaLinux8,RockyLinux8等) 文档形成时期&#xff1a;2023年 因系统版本不同&#xff0c;构建部署应略有差异&#xff0c;但本文未做细分&#xff0c;对稍有经验者应不存在明显障碍。 因软件世界之复杂和个人能力…

mySQL 汇总

登录MySQL winR 打开查询命令 输入 cmd 输入net start MySQL 打开mysql 报错:系统错误&#xff0c;拒绝访问 &#xff08;没权限&#xff01;&#xff09; 解决办法&#xff1a;搜索栏查询‘cmd’ 使用管理员身份运行 &#xff08;或鼠标右键‘开始’&#xff0c;windows po…

RoSA: 一种新的大模型参数高效微调方法

随着语言模型不断扩展到前所未有的规模&#xff0c;对下游任务的所有参数进行微调变得非常昂贵&#xff0c;PEFT方法已成为自然语言处理领域的研究热点。PEFT方法将微调限制在一小部分参数中&#xff0c;以很小的计算成本实现自然语言理解任务的最先进性能。 (RoSA)是一种新的P…

预约上门按摩系统概述

预约上门按摩系统是一种基于H5或者APP的服务平台&#xff0c;为用户提供预约上门按摩服务。该系统通过集成用户端、技师端、渠道商端、城市代理端、分销商端、总后台管理端&#xff0c;实现了用户与技师之间的快速连接&#xff0c;提供在线预约、支付、评价等服务。 1.用户通过…

中国数据库市场的领军黑马——亚信安慧AntDB数据库

自2008年问世以来&#xff0c;亚信科技AntDB数据库一直在中国国产数据库市场中崭露头角&#xff0c;尤其在信创政策的大力支持下&#xff0c;成为这一领域的一匹黑马。经过多次迭代&#xff0c;AntDB已经发展到了7.0版本&#xff0c;为超高强度和密度的业务需求提供了强有力的解…

Java多线程并发篇----第十五篇

系列文章目录 文章目录 系列文章目录前言一、偏向锁二、分段锁三、锁优化四、线程基本方法前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 一、偏向锁 Hotspot 的…

实时云渲染与离线渲染的区别是哪些?

实时云渲染是通过把3D渲染过程放到云端完成&#xff0c;从而打破用户设备限制的方式&#xff0c;它与离线渲染有着显著差异。 1、渲染过程 实时云渲染是在云服务器上进行的渲染&#xff0c;它能生成实时画面&#xff0c;方便用户访问和操作&#xff0c;而离线渲染不用响应用户操…

保障系统稳定,中电金信混沌工程再结硕果

在为金融行业提供优质服务的同时&#xff0c;中电金信积极参与行业标准制定。今年&#xff0c;公司先后参与了由中国信通院牵头发起的《一云多芯稳定性度量评估模型》、中国通信标准化协会牵头发起的《分布式系统稳定性成熟度模型》和《证券基金期货重要系统稳定性保障模型》的…

继承、修饰符、工具类、jar包

目录 1.继承 2.修饰符 3.工具类 4.jar包的制作与使用 1.继承 是什么 1.面向对象的三大特征之一&#xff08;封装、继承、多态&#xff09; 2.可以使得子类具有父类的属性和方法&#xff0c;还可以在子类中重新定义&#xff0c;追加属性和方法。 继承的格式 public class F…

一文详解JAVA的File类,FileInputStream和FileOutputStream

目录 一、File类介绍 二、FileInputStream类 三、FileOutputStream类 一、File类介绍 Java的File类是用于操作文件和目录的类。它提供了一组方法来创建、删除、重命名、复制、移动文件和目录&#xff0c;以及查询文件和目录的属性。 File类的常用方法有&#xff1a; exis…

QT上位机开发(知识产权ip保护)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 大部分看我们文章的网友&#xff0c;本身就是搞技术出身的&#xff0c;很少是做生意&#xff0c;或者是做销售的。但是技术本身&#xff0c;它又是…

一款 StarRocks 客户端工具,支持可视化建表、数据编辑

什么是 StarRocks&#xff1f; StarRocks 是新一代极速全场景 MPP (Massively Parallel Processing) 数据库。StarRocks 的愿景是能够让用户的数据分析变得更加简单和敏捷。用户无需经过复杂的预处理&#xff0c;就可以用 StarRocks 来支持多种数据分析场景的极速分析。 为了…

代码随想录 Leetcode383. 赎金信

题目&#xff1a; 代码&#xff08;首刷自解 2024年1月15日&#xff09;&#xff1a; class Solution { public:bool canConstruct(string ransomNote, string magazine) {vector<int> v(26);for(auto letter : magazine) {v[letter - a];}for(auto letter : ransomNote…