高性能消息中间件 RabbitMQ

一、RabbitMQ概念

1.1 MQ是什么

消息队列

MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信

  • 同步通信相当于两个人当面对话,你一言我一语。必须及时回复:

  • 异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。

消息

两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

队列

数据结构中概念。在队列中,数据先进先出,后进后出。

 1.2  MQ的优势

应用解耦

在电商平台中,用户下订单需要调用订单系统,此时订单系统还需要调用库存系统、支付系统、物流系统完成业务。此时会产生两个问题:

  1. 如果库存系统出现故障,会造成整个订单系统崩溃。
  2. 如果需求修改,新增了一个X系统,此时必须修改订单系统的代码。

如果在系统中引入MQ,即订单系统将消息先发送到MQ中,MQ再转发到其他系统,则会解决以下问题:

  1. 由于订单系统只发消息给MQ,不直接对接其他系统,如果库存系统出现故障,不影响整个订单。
  2. 如果需求修改,新增了一个X系统,此时无需修改订单系统的代码,只需修改MQ将消息发送给X系统即可。

异步提速

如果订单系统同步访问每个系统,则用户下单等待时长如下:

削峰填谷

假设我们的系统每秒只能承载1000请求,如果请求瞬间增多到每秒5000,则会造成系统崩溃。此时引入mq即可解决该问题

 使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

1.3 MQ的劣势 

  • 系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
  • 系统复杂度提高 MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
  • 一致性问题 A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,则会造成数据处理的不一致。

 1.4 MQ的应用场景

  • 抢红包、秒杀活动、抢火车票等

这些业务场景都是短时间内需要处理大量请求,如果直接连接系统处理业务,会耗费大量资源,有可能造成系统瘫痪。

 而使用MQ后,可以先让用户将请求发送到MQ中,MQ会先保存请求消息,不会占用系统资源,且MQ会进行消息排序,先请求的秒杀成功,后请求的秒杀失败。

  • 消息分发

如电商网站要推送促销信息,该业务耗费时间较多,但对时效性要求不高,可以使用MQ做消息分发。

  • 数据同步

假如我们需要将数据保存到数据库之外,还需要一段时间将数据同步到缓存(如Redis)、搜索引擎(如Elasticsearch)中。此时可以将数据库的数据作为消息发送到MQ中,并同步到缓存、搜索引擎中。

  • 异步处理

在电商系统中,订单完成后,需要及时的通知子系统(进销存系统发货,用户服务积分,发送短信)进行下一步操作。为了保证订单系统的高性能,应该直接返回订单结果,之后让MQ通知子系统做其他非实时的业务操作。这样能保证核心业务的高效及时。

  • 离线处理

在银行系统中,如果要查询近十年的历史账单,这是非常耗时的操作。如果发送同步请求,则会花费大量时间等待响应。此时使用MQ发送异步请求,等到查询出结果后获取结果即可。

 1.5 AMQP协议

 RabbitMQ是由Erlang语言编写的基于AMQP的MQ产品。

AMQP

即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受不同中间件产品,不同开发语言等条件的限制。2006年AMQP规范发布,类比HTTP。

AMQP工作过程

生产者(Publisher)将消息发布到交换机(Exchange),交换机根据规则将消息分发给交换机绑定的队列(Queue),队列再将消息投递给订阅了此队列的消费者。

 就好比是报纸印刷厂(Publisher)将印刷出来的报纸交给报社(Exchange),报社再将报纸交给不同的邮递员(Queue),邮递员再将报纸交给用户(Consumer)。

1.6 RabbitMQ工作原理

  • Producer

    消息的生产者。也是一个向交换机发布消息的客户端应用程序。也就是Java代码。

  • Connection

    连接。生产者/消费者和RabbitMQ服务器之间建立的TCP连接。

  • Channel

    信道。是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。

  • Broker

    消息队列服务器实体。即RabbitMQ服务器

  • Virtual host

    虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个RabbitMQ服务器时,可以划分出多个虚拟主机。RabbitMQ默认的虚拟主机路径是/

  • Exchange

    交换机。用来接收生产者发送的消息,并根据分发规则,将这些消息分发给服务器中的队列中。不同的交换机有不同的分发规则。

  • Queue

    消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。消息一直在队列里面,等待消费者链接到这个队列将其取走。

  • Binding

    消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中,作为消息的分发依据。

  • Consumer

    消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。 

(面试)RabbitMQ为什么使用信道而不直接使用TCP连接通信?

TCP连接的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。高峰时每秒成千上万条TCP连接的创建会造成资源巨大的浪费。而且操作系统每秒处理TCP连接数也是有限制的,会造成性能瓶颈。而如果一条线程使用一条信道,一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

 二、RabbitMQ安装

 2.1 安装Erlang

RabbitMQ是使用Erlang语言编写的,所以在安装RabbitMQ前需要先安装Erlang环境 .

1、安装Erlang所需的依赖

yum install -y epel-release

2、添加存储库条目

wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm 
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

3、安装Erlang

yum install -y erlang24.2.1

4、查看Erlang是否安装成功

erl -version

2.2 安装RabbitMQ并启动

1、为了外部能够正常访问RabbitMQ服务,先关闭防火墙

# 关闭运行的防火墙
systemctl stop firewalld.service
# 禁止防火墙自启动
systemctl disable firewalld.service

2、RabbitMQ是通过主机名进行访问的,必须给服务器添加主机名

# 修改文件
vim /etc/sysconfig/network
# 添加如下内容
NETWORKING=yes
HOSTNAME=zj


# 修改文件
vim /etc/hosts
# 添加如下内容
服务器ip zj

3.使用xftp上传RabbitMQ压缩文件到根目录下

4.安装RabbitMQ

# 解压RabbitMQ
tar xf rabbitmq-server-generic-unix-3.9.13.tar.xz


# 重命名:
mv rabbitmq_server-3.9.13 rabbitmq


# 移动文件夹:
mv rabbitmq /usr/local/

5.配置环境变量

# 编辑/etc/profile文件
vim /etc/profile


#添加如下内容
export PATH=$PATH:/usr/local/rabbitmq/sbin


# 运行文件,让修改内容生效
source /etc/profile

6.开启管控台插件

rabbitmq-plugins enable rabbitmq_management

7.后台运行

#启动rabbitmq
rabbitmq-server -detached


#停止rabbitmq
rabbitmqctl stop

8.通过管控台访问RabbitMQ

路径:http://ip地址:15672,用户名:guest,密码:guest

管控台的端口是15672,MQ的端口是5672

 9.此时会提示guest账户只允许本地使用,我们可以配置允许使用guest远程访问

# 创建配置文件夹
mkdir -p /usr/local/rabbitmq/etc/rabbitmq
# 创建配置文件
vim /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.conf
# 添加如下内容
loopback_users=none


# 重启RabbitMQ
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

此时就能进入控制台了。

 2.3 账户管理

guest账户默认只允许本地使用,我们可以创建新账户远程访问RabbitMQ(如2.2中),但是不推荐远程使用MQ.

1、创建账户

# 创建账户
rabbitmqctl add_user MQzhang(用户名) MQzhang(密码)

2、给用户授予管理员角色

rabbitmqctl set_user_tags 用户名 administrator

3、给用户授权

# "/"表示虚拟机
# zj表示用户名
# ".*" ".*" ".*" 表示完整权限
rabbitmqctl set_permissions -p "/" MQzhang".*" ".*" ".*"

4、通过管控台访问rabbitmq即可。

2.4 管控台

2.5 Docker安装

 1、关闭RabbitMQ服务

rabbitmqctl stop

2、在Centos7中安装docker

# 安装Docker
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun


# 启动docker
systemctl start docker

3、拉取镜像

docker pull rabbitmq

4、启动MQ

docker run -d --hostname zj--name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

三、RabbitMQ工作模式

RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用)

3.1 简单模式

简介

特点

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。

 项目搭建

接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。 

JMS

由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

1、启动RabbitMQ

# 开启管控台插件
rabbitmq-plugins enable rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

2、创建普通maven项目,添加RabbitMQ依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
  </dependency>
</dependencies>

3、编写生产者

package com.zj.mq.Simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数*/
        channel.queueDeclare("simpleQueue",false,false,false,null);
        //5.发送消息
        String msg ="hello rabbitMQ";
        /*
         * 参数1:交换机名,""表示默认交换机
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        channel.basicPublish("","simpleQueue",null,msg.getBytes());
        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
        System.out.println("OK");
    }
}

 4.编写消费者

package com.zj.mq.Simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("simpleQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接受消息为:"+message);
            }
        });
    }
}

 消费者随时在监听队列只要队列有消息就会被消费。

3.2 工作队列模式

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者。

1、编写生产者,并产生大量消息

package com.zj.mq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数*/
        channel.queueDeclare("WorkQueue",false,false,false,null);
        //5.发送大量消息
        for (int i = 0; i < 100; i++) {
            /*
             * 参数1:交换机名,""表示默认交换机
             * 参数2:路由键,简单模式就是队列名
             * 参数3:表示该消息是持久化消息,即保存到内存也会保存到磁盘
             * 参数4:要传递的消息字节数组
             */
            channel.basicPublish("","WorkQueue", MessageProperties.PERSISTENT_TEXT_PLAIN,("这是第"+i+"个消息").getBytes());
        }

        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
    }
}

 2.编写消费者

 编写三个消费者,他们都监听的是一个队列。

package com.zj.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*消费者*/
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者1接受消息为:"+message);
            }
        });
    }
}
package com.zj.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*消费者*/
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者2接受消息为:"+message);
            }
        });
    }
}
package com.zj.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*消费者*/
public class Consumer3 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者3接受消息为:"+message);
            }
        });
    }
}
消费者1接受消息为:这是第0个消息
消费者1接受消息为:这是第3个消息
消费者1接受消息为:这是第6个消息
消费者1接受消息为:这是第9个消息
消费者1接受消息为:这是第12个消息
消费者1接受消息为:这是第15个消息
消费者1接受消息为:这是第18个消息
消费者1接受消息为:这是第21个消息
……


消费者2接受消息为:这是第1个消息
消费者2接受消息为:这是第4个消息
消费者2接受消息为:这是第7个消息
消费者2接受消息为:这是第10个消息
消费者2接受消息为:这是第13个消息
消费者2接受消息为:这是第16个消息
消费者2接受消息为:这是第19个消息
消费者2接受消息为:这是第22个消息
消费者2接受消息为:这是第25个消息
……


消费者3接受消息为:这是第2个消息
消费者3接受消息为:这是第5个消息
消费者3接受消息为:这是第8个消息
消费者3接受消息为:这是第11个消息
消费者3接受消息为:这是第14个消息
消费者3接受消息为:这是第17个消息
消费者3接受消息为:这是第20个消息
消费者3接受消息为:这是第23个消息
消费者3接受消息为:这是第26个消息
消费者3接受消息为:这是第29个消息
消费者3接受消息为:这是第32个消息

3.3 发布订阅模式 

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe) 

特点

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

 1、编写生产者

package com.zj.mq.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机fanout
        /*
        * 参数一:交换机名称
        * 参数二:交换机类型
        * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
        channel.exchangeDeclare("exchangeFanout", BuiltinExchangeType.FANOUT,false);
        //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
        channel.queueDeclare("mailQueue", false,false,false,null);
        channel.queueDeclare("messageQueue", false,false,false,null);
        channel.queueDeclare("stationQueue", false,false,false,null);
        //6.交换机绑定队列
        /*
        * 参数一:队列名称
        * 参数二:交换机名称
        * 参数三:路由关键字,发布订阅模式不存在路由关键字*/
        channel.queueBind("mailQueue","exchangeFanout","");
        channel.queueBind("messageQueue","exchangeFanout","");
        channel.queueBind("stationQueue","exchangeFanout","");
        //7.往交换机发送消息
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("exchangeFanout","",null,("你好,MQ"+i).getBytes());
        }
        //8.关闭资源
        channel.close();
        connection.close();

    }
}

 2、站内信消费者(其他同理)

package com.zj.mq.publish;

import com.rabbitmq.client.*;
import com.sun.deploy.ui.AboutDialog;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*站内信消费者*/
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionaFactory = new ConnectionFactory();
        connectionaFactory.setHost("192.168.66.100");
        connectionaFactory.setPort(5672);
        connectionaFactory.setUsername("MQzhang");
        connectionaFactory.setPassword("MQzhang");
        connectionaFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionaFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
         * 参数一:监听的队列名
         * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
         * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
         * */
        channel.basicConsume("stationQueue",true,new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("发送站内信:"+message);
            }
        });
    }
}
发送站内信:你好,MQ0
发送站内信:你好,MQ1
发送站内信:你好,MQ2
发送站内信:你好,MQ3
发送站内信:你好,MQ4
发送站内信:你好,MQ5
发送站内信:你好,MQ6
发送站内信:你好,MQ7
发送站内信:你好,MQ8
发送站内信:你好,MQ9

 当然也能创建多个消费者来监听同一个队列来提高消费速度。

 3.4 路由模式

 使用发布订阅模式时,所有消息都会发送到绑定的队列中(发送到绑定到交换机上的每个队列,队列再发送给消费者),但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。

特点

  1. 每个队列绑定路由关键字RoutingKey
  2. 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。

编写生产者

package com.zj.mq.route;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机fanout
        /*
        * 参数一:交换机名称
        * 参数二:交换机类型
        * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
        channel.exchangeDeclare("exchangeRoute", BuiltinExchangeType.DIRECT,false);
        //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数
         * */
        channel.queueDeclare("mailQueue", false,false,false,null);
        channel.queueDeclare("messageQueue", false,false,false,null);
        channel.queueDeclare("stationQueue", false,false,false,null);
        //6.交换机绑定队列
        /*
        * 参数一:队列名称
        * 参数二:交换机名称
        * 参数三:路由关键字,一个队列可以有多个路由关键字
        * */
        channel.queueBind("mailQueue","exchangeRoute","import");
        channel.queueBind("messageQueue","exchangeRoute","normal");
        channel.queueBind("stationQueue","exchangeRoute","import");
        //7.往交换机发送消息,路由关键字是import,表示交换机会将消息发送到带有import关键字的队列。
        channel.basicPublish("exchangeRoute","import",null,("你好,import MQ").getBytes());
        //路由关键字是normal表示交换机会将消息发送到带有normal关键字的队列
        channel.basicPublish("exchangeRoute","normal",null,("你好,normal MQ").getBytes());
        //8.关闭资源
        channel.close();
        connection.close();

    }
}

 编写消费者

消费者还是和其他模式的消费者是一样的。这里以mailQuene举例子。

package com.zj.mq.route;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        *
        */
        channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接受消息为:"+message);
            }
        });
    }
}

3.5 通配符模式

通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。

通配符规则:

  1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
  2. 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。

编写生产者

package com.zj.mq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机fanout
        /*
        * 参数一:交换机名称
        * 参数二:交换机类型
        * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
        channel.exchangeDeclare("exchangeTopic", BuiltinExchangeType.TOPIC,false);
        //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
        channel.queueDeclare("mailQueue", false,false,false,null);
        channel.queueDeclare("messageQueue", false,false,false,null);
        channel.queueDeclare("stationQueue", false,false,false,null);
        //6.交换机绑定队列
        /*
        * 参数一:队列名称
        * 参数二:交换机名称
        * 参数三:路由关键字,【#.mail.#】 表示:mail前后可以匹配多个单词*/
        channel.queueBind("mailQueue","exchangeTopic","#.mail.#");
        channel.queueBind("messageQueue","exchangeTopic","#.message.#");
        channel.queueBind("stationQueue","exchangeTopic","#.station.#");
        //7.往交换机发送消息到三个队列
        channel.basicPublish("exchangeTopic","mail.message.station",null,("你好,MQ").getBytes());
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

 编写消费者

也是和其他模式的消费者是一样的只需要监听消费者。

package com.zj.mq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        *
        */
        channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接受消息为:"+message);
            }
        });
    }
}

四、SpringBoot整合RabbitMQ

 之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。

1.创建SpringBoot项目,引入RabbitMQ起步依赖(springboot版本是2.7.0)

<!-- RabbitMQ起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.编写配置文件

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /


#日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

3.创建队列和交换机

 SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:

package com.zj.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    private final String EXCHANGE_NAME = "boot_topic_exchange";
    private final String QUEUE_NAME = "boot_queue";


    // 创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange getExchange() {
        return ExchangeBuilder
                .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
                .durable(true) // 是否持久化
                .build();
    }


    // 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return new Queue(QUEUE_NAME); // 队列名
    }


    // 交换机绑定队列
    @Bean
    public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
                                    @Qualifier(QUEUE_NAME) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("#.message.#")
                .noargs();
    }
}

4.编写生产者 

 SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息。

package com.zj;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;

    @Test
    public void testProducer(){
        /*
        * 参数一:交换机名称
        * 参数二:路由关键字
        * 参数三:要发送的消息
        */
        rabbitTemplate.convertAndSend("boot_topic_exchange","message","hello MQ");
    }

}

 5.编写消费者

 我们编写另一个SpringBoot项目作为RabbitMQ的消费者,因为在同一个项目中的话直接方法调用就可以。

1、创建项目导入依赖。

2、编写配置文件,和生产者的相同

3、编写消费者,监听队列

@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String message){
        System.out.println("发送短信:"+message);
    }
}

4、运行项目。观察管控台队列和控制台

五、消息的可靠性投递

5.1 概念

RabbitMQ消息投递的路径为:

生产者--->交换机--->队列--->消费者

在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /


#日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

在生产者的配置类创建交换机和队列:

package com.zj.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    private final String EXCHANGE_NAME = "boot_topic_exchange";
    private final String QUEUE_NAME = "boot_queue";


    // 创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange getExchange() {
        return ExchangeBuilder
                .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
                .durable(true) // 是否持久化
                .build();
    }


    // 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return new Queue(QUEUE_NAME); // 队列名
    }


    // 交换机绑定队列
    @Bean
    public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
                                    @Qualifier(QUEUE_NAME) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("#.message.#")
                .noargs();
    }
}

创建生产者

@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String message){
        System.out.println("发送短信:"+message);
    }
}

5.2 确认模式

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:

1、生产者配置文件开启确认模式

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated

2、生产者定义确认模式的回调方法,并模拟向不存在的交换机aaa发送消息。

package com.zj;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;

    @Test
    public void testProducer(){

        //定义确认模式的回调方法,当消息向交换机发送后会调用confirm方法。
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             * @param correlationData 相关配置信息
             * @param ack  交换机是否收到消息
             * @param cause  失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                  if(ack){
                      System.out.println("消息接受成功");
                  }else {
                      System.out.println("消息接受失败:"+cause);
                      //做一些处理让消息再次发送
                  }

            }
        });

        rabbitTemplate.convertAndSend("aaa","message","hello MQ");
    }

}

3、运行结果

消息接受失败:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aaa' in vhost '/', class-id=60, method-id=40)

5.3 退回模式

退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:

 1、生产者配置文件开启退回模式

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated
      # 开启回退模式
    publisher-returns: true

2、生产者定义退回模式的回调方法,模拟向不存在的队列bbb发送消息。


@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;

    @Test
    public void testProducer(){

        //定义退回模式的回调方法,只有交换机将消息发送到队列失败后才会执行该方法。
         rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
             /**
              *
              * @param returnedMessage 失败后将失败信息封装到该参数
              */
             @Override
             public void returnedMessage(ReturnedMessage returnedMessage) {
                 System.out.println("消息对象:"+returnedMessage);
                 System.out.println("错误码:"+returnedMessage.getReplyCode());
                 System.out.println("错误信息:"+returnedMessage.getReplyText());
                 System.out.println("交换机:"+returnedMessage.getExchange());
                 System.out.println("路由键:"+returnedMessage.getRoutingKey());

                 //处理消息……
             }
         });

        rabbitTemplate.convertAndSend("boot_topic_exchange","bbb","hello MQ");
    }
}
消息对象:ReturnedMessage [message=(Body:'hello MQ' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=boot_topic_exchange, routingKey=bbb]
错误码:312
错误信息:NO_ROUTE
交换机:boot_topic_exchange
路由键:bbb

5.4 Ack手动签收

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。

消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge="none"
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge="manual"

 1.消费者配置开启手动签收

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
      # 开启手动签收
    listener:
      simple:
        acknowledge-mode: manual

2、消费者处理消息时定义手动签收和拒绝签收的情况

package com.zj.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class ACKConsumer {

    // 监听队列
    /**
     *
     * @param message  消息对象
     * @param channel   信道对象,用于手动接受消息
     */
    @RabbitListener(queues = "boot_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
        //deliveryTag:消息投递序号,每次投递该值都会+1.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //签收消息
            /*
             * 参数一:消息投递序号
             * 参数二:一次是否可以签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
             //拒签消息
            /*
             * 参数一:消息投递序号
             * 参数二:一次是否可以签收多条消息
             * 参数三:拒签后消息是否重回队列(处在队列中的消息会不断的再向消费者发送消息)
             */
            channel.basicNack(deliveryTag,true,true);
            System.out.println("消息消费失败");
        }

    }
}

六、RabbitMQ高级特性

6.1 消费端限流

之前我们说MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:

1.生产者批量发送消息

@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;

    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

2.消费端配置限流机制

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取20条消息消费,签收后不满20条才会继续拉取消息。
        prefetch: 20

3、消费者接受消息

package com.zj.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class OosConsumer {
    @RabbitListener(queues = "boot_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        //1.获取消息
        System.out.println("当前时间:"+new String(message.getBody()));
        //2.模拟业务处理
        Thread.sleep(2000);
        //3.签收消息
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,true);
    }
}

20230619

6.2 限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

 使用方法如下:

1.生产者批量发送消息


@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;

    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

2.消费端配置不公平分发

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
        prefetch: 1

3、编写两个消费者消费相同的队列信息

package com.zj.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "boot_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception {
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }


    // 消费者2
    @RabbitListener(queues = "boot_queue")
    public void listenMessage2(Message message, Channel channel) throws Exception {
        //1.获取消息
        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(3000);// 消费者2处理慢
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
消费者2:send message...1
消费者1:send message...0
19:53:21.676 INFO  ---  [main           ] com.zj.DemoApplication                            :Started DemoApplication in 0.867 seconds (JVM running for 1.259)
消费者1:send message...3
消费者1:send message...4
消费者1:send message...2
消费者1:send message...5
消费者1:send message...6
消费者2:send message...7
消费者1:send message...8
消费者1:send message...9

发现消费者1消费的要比消费者2消费的多。能者多劳。

6.3 设置队列所有消息存活时间

 RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

1、在创建队列时设置其存活时间:

    // 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(QUEUE_NAME)// 队列名
                .ttl(10000)      //队列存活时间10s单位毫秒
                .build();
    }

2、生产者生产消息

@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;

    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

十秒后,未被消费的消息会被移除。

6.4 设置单条消息存活时间

1、在消息发送的时候设置发送时间

    /*发送消息并设置消息的存活时间*/
    @Test
    public void testSend() {
        //1.创建消息属性
        MessageProperties messageProperties = new MessageProperties();

        //2.设置存活时间,单位毫秒
        messageProperties.setExpiration("10000");

        //3.创建消息对象
        Message message = new Message(("send message……").getBytes(), messageProperties);

        //4.发送消息
        rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
    }

注意:

  1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。

  2. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

6.5 优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

 优先级队列用法如下:

1、设置队列的优先级

    // 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(QUEUE_NAME)// 队列名
               // .ttl(10000)      //队列中消息存活时间10s单位毫秒
                .maxPriority(10)  //设置队列的优先级越大优先级越高,最大255,推荐最大不超过10
                .build();
    }

2、编写生产者发送有优先级的消息

@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;


    @Test
    public void testSend() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) { // i为5时消息的优先级较高
                //1.创建消息属性
                MessageProperties messageProperties = new MessageProperties();
                //2.设置消息优先级
                messageProperties.setPriority(9);
                //3.创建消息对象
                Message message = new Message(("send message……" + i).getBytes(), messageProperties);
                rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
            }else {
                rabbitTemplate.convertAndSend("boot_topic_exchange","message","send message……" + i);
            }
        }
    }
}

3、编写消费者测试是否是第五条消息最先被消费

package com.zj.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
       //1.获取消息
        System.out.println(new String(message.getBody()));
        //2.手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.0)

17:47:14.858 INFO  ---  [main           ] com.zj.DemoApplication                            :Starting DemoApplication using Java 1.8.0_341 on ZHANGJIN with PID 26080 (D:\Java\code\springbootcode\sb_rabbitMQ_consumer\target\classes started by 张锦 in D:\Java\code\springbootcode\sb_rabbitMQ)
17:47:14.858 INFO  ---  [main           ] com.zj.DemoApplication                            :No active profile set, falling back to 1 default profile: "default"
17:47:15.482 INFO  ---  [main           ] o.s.a.rabbit.connection.CachingConnectionFactory  :Attempting to connect to: [192.168.66.100:5672]
17:47:15.498 INFO  ---  [main           ] o.s.a.rabbit.connection.CachingConnectionFactory  :Created new connection: rabbitConnectionFactory#2f2bf0e2:0/SimpleConnection@27f0ad19 [delegate=amqp://MQzhang@192.168.66.100:5672/, localPort= 53985]
17:47:15.529 INFO  ---  [main           ] com.zj.DemoApplication                            :Started DemoApplication in 0.893 seconds (JVM running for 1.338)
send message……5
send message……0
send message……1
send message……2
send message……3
send message……4
send message……6
send message……7
send message……8
send message……9

第五条消息首先被消费。

七、RabbitMQ死信队列

7.1 概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

 7.2 代码实现

1、创建死信交换机和死信队列

package com.zj.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    //死信交换机和死信队列
    private final String DEAD_EXCHANGE = "dead_exchange";
    private final String DEAD_QUEUE = "dead_queue";


    //普通交换机和普通队列
    private final String NORMAL_EXCHANGE = "normal_exchange";
    private final String NORMAL_QUEUE = "normal_queue";


    // 创建死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(DEAD_EXCHANGE) //死信交换机类型和名称
                .durable(false)   //是否持久化
                .build();
    }

    // 创建死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)   //死信队列名称
                .build();
    }

    // 死信交换机绑定死信队列
    @Bean
    public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,
                                 @Qualifier(DEAD_QUEUE) Queue queue){
         return BindingBuilder
                 .bind(queue)
                 .to(exchange)
                 .with("dead")     //交换机路由键
                 .noargs();
    }

    //创建普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(NORMAL_EXCHANGE) //普通交换机类型和名称
                .durable(false)   //是否持久化
                .build();
    }
    //创建普通队列
    @Bean(NORMAL_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(NORMAL_QUEUE)   //普通信队列名称
                .deadLetterExchange(DEAD_EXCHANGE)  //绑定死信交换机,因为队列中的无法消费的信息会被放到死信交换机上。
                .deadLetterRoutingKey("dead")   //死信队列路由关键字
                .ttl(10000)  //消息存活时间
                .maxLength(10)  //消息最大长度
                .build();
    }
    //普通交换机绑定普通对列
    @Bean
    public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
                                 @Qualifier(NORMAL_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("normal")
                .noargs();
    }
}

2.创建生产者发送消息(测试存活时间过期变成死信)

    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.存活时间过期
        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
    }

十秒后

消息全部去了死信队列。

2.创建生产者(超过队列长度变成死信)

@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;


    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.存活时间过期
//        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
        //2.超过队列长度变成死信
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
        }
    }
}

因为设置了普通队列的长度,所以超出队列长度的那部分就去了死信队列。也设置了队列的存活时间,因此普通队列的消息在10秒后变成了死信。

 2.创建生产者和消费者(超过队列长度变成死信)

@SpringBootTest
class DemoApplicationTests {


    @Resource
    public RabbitTemplate rabbitTemplate;


    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.生产者拒签消息,消息变成死信。
        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
    }
}




@Component
public class Consumer {

    // 监听队列
    @RabbitListener(queues = "normal_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
       //拒签消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
    }
}

拒签消息,消息变成了死信。

 八、RabbitMQ延迟队列

8.1 概念

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。

 但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。

8.2 死信队列实现延迟队列

 1.创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

2.编写配置文件

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
   


#日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

 3.创建队列和交换机

package com.zj.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    // 订单交换机和队列
    private final String ORDER_EXCHANGE = "order_exchange";
    private final String ORDER_QUEUE = "order_queue";
    // 过期订单交换机和队列(死信交换机和死信队列)
    private final String EXPIRE_EXCHANGE = "expire_exchange";
    private final String EXPIRE_QUEUE = "expire_queue";

    // 过期订单交换机
    @Bean(EXPIRE_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(EXPIRE_EXCHANGE)
                .durable(false)
                .build();
    }
    // 过期订单队列
    @Bean(EXPIRE_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(EXPIRE_QUEUE)
                .build();
    }
    // 将过期订单队列绑定到交换机
    @Bean
    public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("expire_routing")
                .noargs();
    }

    // 订单交换机
    @Bean(ORDER_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(ORDER_EXCHANGE)
                .durable(false)
                .build();
    }

    // 订单队列
    @Bean(ORDER_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(ORDER_QUEUE)
                .ttl(10000) // 存活时间为10s,模拟30min
                .deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
                .deadLetterRoutingKey("expire_routing") // 死信交换机的路由关键字
                .build();
    }
    // 将订单队列绑定到交换机
    @Bean
    public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,
                                   @Qualifier(ORDER_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("order_routing")
                .noargs();
    }
}

4.编写下单的控制器方法,下单后向订单交换机发送消息

@RestController
public class OrderController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //下单
    @GetMapping("/place/{id}")
    public String placeOrder(@PathVariable("id") String id){
        System.out.println("处理订单数据");
        //将订单id发送到订单队列
        rabbitTemplate.convertAndSend("order_exchange","order_routing",id);
        return "下单成功,修改库存。";
    }
}

 5.编写监听死信队列的消费者

@Component
public class Consumer {

    // 监听过期队列
    @RabbitListener(queues = "expire_queue")
    public void listen_message(String id)  {
        System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
    }
}

8.3 插件实现延迟队列

 在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。

RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。

1、使用xftpj将延迟插件上传到虚拟机

2.安装插件

# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.重启RabbitMQ服务

#停止rabbitmq
rabbitmqctl stop


#启动rabbitmq
rabbitmq-server restart -detached

 此时登录管控台可以看到交换机类型多了延迟消息:

 4、创建延迟交换机和延迟队列

package com.zj.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class RabbitConfig {

    // 创建延迟交换机和延迟队列
    private final String DELAYED_EXCHANGE = "delayed_exchange";
    private final String DELAYED_QUEUE = "delayed_queue";

    // 延迟交换机,ExchangeBuilder只能创建普通的交换机例如:topic、direct、fanout交换机。要创建延迟交换机只能创建自定义交换机。
    @Bean(DELAYED_EXCHANGE)
    public Exchange deadExchange(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type","topic"); //topic:延迟交换机的实际类型。
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",false,true,args);
    }
    // 延迟队列
    @Bean(DELAYED_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DELAYED_QUEUE)
                .build();
    }

    // 将延迟队列绑定到延迟交换机
    @Bean
    public Binding bindExchangeQueue(@Qualifier(DELAYED_EXCHANGE) Exchange exchange,
                                     @Qualifier(DELAYED_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("delayed-routing")
                .noargs();
    }
    
}

5.编写下单的控制器方法

package com.zj.controller;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class OrderController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //下单
    @GetMapping("/place/{id}")
    public String placeOrder(@PathVariable("id") String id){
        System.out.println("处理订单数据");
        //设置消息的延迟时间为10s
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(10000);
                return message;
            }
        };

        rabbitTemplate.convertAndSend("delayed_exchange","delayed-routing",id,messagePostProcessor);
        return "下单成功,修改库存。";
    }
}

6.编写延迟队列的消费者

package com.zj.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {

    // 监听延迟队列
    @RabbitListener(queues = "delayed_queue")
    public void listen_message(String id)  {
        System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 ");
    }
}

7、下单测试

 延迟队列中没有消息是因为消费者将消息消费了。

 九、RabbitMQ集群(暂略)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/34127.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于Arduino UNO的循迹小车

目录 1.analogWrite函数的使用 2.红外循迹模块介绍 3.循迹小车代码实现 4.实物示例 1.analogWrite函数的使用 用analogWrite来替换digitalWrite 说明 将一个模拟数值写进Arduino引脚。这个操作可以用来控制LED的亮度, 或者控制电机的转速. 在Arduino UNO控制器中&#…

查询Oracle当前用户下,所有数据表的总条数

需求&#xff1a;查询Oracle当前用户下&#xff0c;所有数据表的总条数 方法&#xff1a;存储过程 右键点击Procedures&#xff0c;点击New 点击OK 把存储过程写进去&#xff0c;然后点击编译运行&#xff1a; create or replace procedure tables_count ist_count numbe…

xshell安装jdk1.8环境

xshell安装jdk1.8环境 大家好&#xff0c;今天我们来学习一下xshell安装jdk1.8环境&#xff0c;好好看&#xff0c;好好学&#xff0c;超详细的 第一步 进入xshell官网下载 第二步 打开xshell新建一个会话&#xff0c;如下图&#xff1a; 第三步 输入你的名称、主机ip、端口号(…

CSS中伪类详解和用法例子详解

文章目录 一、伪类介绍二、伪类选择器1.动态伪类2.结构伪类3.否定伪类4.状态伪类5.目标伪类 一、伪类介绍 用于已有元素处于某种状态时&#xff08;滑动、点击等&#xff09;为其添加对应的样式&#xff0c;这个状态是根据用户行为而动态变化的。 二、伪类选择器 动态伪类作…

Arduino uno 环境配置 for Mac

1、IDE 在官网下载 官网地址&#xff1a;https://www.arduino.cc/en/software 看到钱&#x1f4b0;不要怕&#xff0c;只是问你捐不捐款&#xff0c;不收钱&#xff0c;你直接安装就行 &#xff08;你也可以捐一点&#xff5e;&#xff09; 安装之后 2、安装驱动 地址 &…

互联网+洗鞋店预约小程序新模式;

互联网洗鞋店预约小程序 1、线上线下业务的结合。 传统的线下业务消费者到店可以向其推介线上的预约到家服务&#xff0c;让线下的消费者成为小程序内的会员&#xff0c;留存客户之后线上可直接触达&#xff0c;减少与消费者的距离&#xff0c;从等待客户到可以主动出击&…

黑客是什么?想成为黑客需要学习什么?

什么是黑客 在《黑客辞典》里有不少关于“黑客”的定义, 大多和“精于技术”或“乐于解决问题并超越极限”之类的形容相关。然而&#xff0c;若你想知道如何成为一名黑客&#xff0c;只要牢记两点即可。 这是一个社区和一种共享文化&#xff0c;可追溯到那群数十年前使…

6.Mysql主从复制

文章目录 Mysql主从复制读写分离概念&#xff1a;读写分离的作用&#xff1a;读写分离的作用条件&#xff1a;主从复制与读写分离mysq支持的复制类型主从复制的工作过程配置时间同步主服务器配置从服务器配置 读写分离MySQL 读写分离原理目前较为常见的 MySQL 读写分离分为以下…

A核与M核异构通信过程解析

现在越来越多的产品具有M core和A core的异构架构&#xff0c;既能达到M核的实时要求&#xff0c;又能满足A核的生态和算力。比如NXP的i.MX8系列、瑞萨的RZ/G2L系列以及TI的AM62x系列等等。虽然这些处理器的品牌及性能有所不同&#xff0c;但多核通信原理基本一致&#xff0c;都…

Linux - 那些年测试服务器带宽的 3 种方式

方式一 speedtest-cli wget -O speedtest-cli https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest.pychmod x speedtest-cliorcurl -Lo speedtest-cli https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest.pychmod x speedtest-c…

重生之我测阿里云U1实例(通用算力型实例)

官方福利&#xff01;&#xff01;&#xff01;&#xff01;大厂羊毛你确定不薅&#xff1f;&#xff1f;&#xff1f; 参与ECSU实例评测&#xff0c;申请免费体验机会&#xff1a;https://developer.aliyun.com/mission/review/ecsu 参与ECSU实例评测&#xff0c;申请免费体验…

一些零散的查询知识

一、all any some 表&#xff1a; all大于所有的值&#xff1a; any some:大于任意一个即可 例题&#xff1a; 大于50部门所有员工工资的人&#xff1a; 等价于&#xff1a; 二、exists关键字 1、exists查询 exists(子查询) 如果有满足条件的记录&#xff0c;那么exi…

C/C++内存管理(内存分布、动态内存分配、动态内存分配与释放、内存泄漏等)

喵~ 内存之5大区&#xff08;栈区、堆区、静态区、常量区、代码区&#xff09;C/C中各自的内存分配操作符内存泄露?内存泄漏检测方法 内存之5大区&#xff08;栈区、堆区、静态区、常量区、代码区&#xff09; 1、栈区&#xff08;stack&#xff09;&#xff1a;由编译器自动分…

【从删库到跑路】详细讲解MySQL的函数和约束作用

&#x1f38a;专栏【MySQL】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 大一同学小吉&#xff0c;欢迎并且感谢大家指出我的问题&#x1f970; 文章目录 &#x1f354;函数⭐字符串函数&#x1f388;字符串拼接函数&…

LabVIEW开发呼吸分析系统

LabVIEW开发呼吸分析系统 在日常生活中&#xff0c;许多人都在练习调息法&#xff0c;但大多数人都不知道如何以完美的方式做。不当的做法不会带来适当的结果。一种使用LabVIEW分析呼吸信号的方法&#xff0c;以使人们以完美的方式练习调息。这有助于从业者按照系统指定的说明…

Vue2封装一个全局通知组件并发布到NPM

✍&#x1f3fc;作者&#xff1a;周棋洛&#xff0c;计算机学生 ♉星座&#xff1a;金牛座 &#x1f3e0;主页&#xff1a;点击查看更多 &#x1f310;关键&#xff1a;vue2 组件封装 npm发包 文章目录 1. 前言 &#x1f343;2. 我为什么要封装通知插件 ❓3. 初始化vue空项目 &…

Learn Mongodb DB数据库部署 ②

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; PHP MYSQL &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &#x1f44…

【STM32】软件I2C控制频率

在上一篇文章中&#xff0c;我们已经介绍了整个软件I2C的实现原理&#xff0c;但是也遗留了一个问题&#xff0c;那就是I2C速率的控制&#xff0c;其实就是控制SCL信号的频率。 微秒级延时 在上篇文章中&#xff0c;我们使用了SysTick进行延时&#xff0c;具体如下&#xff1…

Ubuntu下编译VTK

1.先安装QT&#xff0c;不知道不装行不行&#xff0c;我们项目需要。 2.去VTK官网下载VTK源码。 3.解压源码。 4.编译需要用cmake-gui&#xff0c;装QT的一般都有&#xff0c;但需要把路径添加到PATH才能用。 5.打开cmake-gui&#xff0c;设置源码路径&#xff0c;编译输出路…

Java开发 - Canal的基本用法

前言 今天给大家带来的是Canal的基本用法&#xff0c;Canal在Java中常被我们用来做数据的同步&#xff0c;当然不是MySQL与MySQL&#xff0c;Redis与Redis之间了&#xff0c;如果是他们&#xff0c;那就好办了&#xff0c;我们可以直接通过配置来完成他们之间的主从、主主&…