RabbitMQ基础教程

1.什么是消息队列

消息队列(Message Queue),我们一般简称为MQ。消息队列中间件是分布式系统中重要的组件,具有异步性、松耦合、分布式、可靠性等特点。用于实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前主流的消息队列有RocketMQ、Kafka、RabbitMQ、ZeroMQ、MetaMQ等。消息队列在很多业务场景中都会使用到,例如:异步处理、应用解耦、流量消锋、数据同步、日志处理等等。下面是一个消息队列最简单的架构模型。

img

名词解释:

  • Producer:消息的生产者,负责将消息发送到Broker
  • Broker:消息处理中心(内部通常包含多个队列,称之为queue),负责消息的存储等操作
  • Consumer:消息消费者,负责从Broker中获取消息并进行相应处理

2.RabbitMQ

2.1 简介

RabbitMQ是流行的开源消息队列其中的一种,用erlang语言开发。它基于AMQP协议(AMQP是应用层协议的一个开放标准,称为高级消息队列协议,专门为面向消息的中间件设计)的标准实现。RabbitMQ支持多种语言客户端(如:Java、C#、Python、Ruby、C、PHP)等。在易用性、扩展性、高可用性等方面表现都不错。

2.2 安装

由于RabbitMQ是基于erlang语言编写,在安装前先必须安装erlang环境。

官网地址:https://www.erlang.org/downloads。

img

最新版本为22.2,Windows用户可直接下载OPT 22.2 Windows 64-bit Binary直接安装即可。

img

接着去RabbitMQ官网下载最新版本的安装包进行安装。

官网地址:https://www.rabbitmq.com/install-windows.html#chocolatey

img

点击下载rabbitmq-server-3.8.2.exe的Bintray安装包,下载后直接打开安装。

img

如果是macOS用户,可以通过Homebrew直接安装,并且Homebrew在安装RabbitMQ时会自动下载并安装erlang环境。

img

2.3 配置环境变量

将RabbitMQ安装目录下的sbin子目录加入到环境变量的Path中。

img

img

2.4 启动/停止服务

启动或停止服务有应用方式启动和服务启动两种方式。

应用方式启动:

命令说明
rabbitmq-server直接启动,关闭窗口后应用就会停止
rabbitmq-server -detached后台启动,后台独立进程方式运行,关闭窗口后应用不会关闭
rabbitmqctl stop停止应用

示例:

img

服务方式启动:

当安装完后可以在服务列表中查看到RabbitMQ这个服务,可以在这里直接启用或停止。

img

也可以在命令行使用相关命令启动或关闭服务(注意:控制台要以管理员方式运行)

命令说明
rabbitmq-service start启动服务
rabbitmq-service stop停止服务
rabbitmq-service disable禁用服务
rabbitmq-service enable启用服务

示例:

img

2.5 可视化管理插件

RabbitMQ默认提供了一个rabbitmq_management可视化管理插件,方便我们通过web访问的方式来管理和查看RabbitMQ。此插件默认是禁用的,因此需要手动启用它。在命令行使用rabbitmq-plugins来启用插件。如下:

rabbitmq-plugins enable rabbitmq_management

img

启用后可以在浏览器中输入http://localhost:15672来访问登录页面,默认登陆账号和密码都为guest

img

登陆成功后进入功能管理首页。

img

在后续的示例中会讲解这里面的具体内容。

2.6 用户管理

RabbitMQ默认提供了一个guest用户,我们也可以创建新用户并给用户分配相应的权限。创建用户有两种方式,一种是使用rabbitmqctl工具,另一种是使用可视化的方式操作。

使用可视化操作:

在web管理登陆页面登陆后,点击Admin选项,这里会列出所有的用户信息,默认只有一个guest用户,如下:

img

点击下面的Add a user,在展开的页面中填写新用户的姓名、密码以及身份标签,确认无误后点击Add User按钮保存。如下:

img

此时用户列表就会多出一个新建的用户,如下:

img

但这个用户还不能正常使用,因为还未分配访问的虚拟主机(虚拟主机的概念会在下个章节说明)以及权限,所以点击列表中的用户名(也就是wangl)跳转到如下页面:

img

说明:

  • Virtual Host:设置虚拟主机的路径,默认为“/”,因为没有新创建别的虚拟主机,所以只有一个默认的。
  • Configure regexp:设置用户的配置权限,支持正则表达式(.*表示所有)。
  • Write regexp:设置用户的写权限,支持正则表达式(.*表示所有)。
  • Read regexp:设置用户的读权限,支持正则表达式(.*表示所有)。

最后点击Set permission按钮保存,然后回到用户列表,这时新建的用户就能正常使用了

img

登出后使用新用户登陆来访问。

使用rabbitmqctl工具:

在命令行可以使用rabbitmqctl,它是RabbitMQ中间件的一个命令行管理工具。

1.创建用户:

命令:rabbitmqctl add_user username password

示例:rabbitmqctl add_user user1 123

2.删除用户:

命令:rabbitmqctl delete_user username

示例:rabbitmqctl delete_user user1

3.修改密码:

命令:rabbitmqctl change_password username newpassword

示例:rabbitmqctl change_password user1 321

4.列出所有用户:

命令:rabbitmqctl list_users

5.设置用户权限:

命令:rabbitmqctl set_permissions [-p vhostpath] username

示例:rabbitmqctl set_permissions -p / user1 .* .* .*

6.删除用户权限:

命令:rabbitmqctl clear_permissions [-p vhostpath] username

示例:rabbitmqctl clear_permissions -p / user1

2.7 配置文件

不同的操作系统默认存放的配置文件目录是不一样的(也可以通过环境变量指定配置文件的目录),下面列出在不同系统中默认配置文件的存放位置。

img

以Windows为例,我们在C:\Users%USERNAME%\AppData\Roaming\RabbitMQ目录下创建一个名为rabbitmq.conf的配置文件。

img

使用记事本打开添加如下配置信息可以修改默认的配置。

listeners.tcp.default = 5673
management.listener.port = 15673
num_acceptors.tcp = 10

说明:

属性描述默认值
listeners.tcp.defaultAMQP连接的默认监听端口,也就是访问RabbitMQ的默认端口号5672
management.listener.port访问web管理插件的默认端口15672
num_acceptors.tcp接受tcp连接的erlang进程数10

这里我们修改了默认的tcp连接端口以及web管理插件的默认端口,配置完成之后记得要重启RabbitMQ服务,接着重新打开web管理页面,使用修改后的端口进行访问。

img

参考:https://www.linuxidc.com/Linux/2019-03/157354.htm

2.8 AMQP通信模型

img

名词解释:

  • Broker:消息处理中心,也就是RabbitMQ Server。
  • Virutal Host:虚拟主机相当于一个命名空间。用于隔离不同的Exchange和Queue。每个Virutal Host内部有自己的Exchange和Queue,他们之间互不影响。我们可以为不同用户指定不同的Virutal Host,这样不同用户只能访问当前设置的Virutal Host下的Exchange和Queue,而不能访问其他的Virutal Host。在RabbitMQ有一个默认的Virutal Host就是“/”。我们也可以通过可视化插件或者使用rabbitmqctl工具来创建新的Virutal Host。
  • Exchange:Exchange也称之为交换机,核心作用就是将消息生产者(Producer)发送过来的message依据指定的路由规则发送到特定的Queue中。
  • Queue:存放message的队列,消息最终会被消息消费者(Consumer)取出消费。
  • Producer:消息的生产者,负责将消息发送到交换机(Exchange)中。
  • Consumer:消息消费者,负责从Queue中获取消息并进行相应处理。
  • Binding:Binding就是将一个或者多个消息队列(Queue)绑定到交换机(Exchange)上。绑定时会设置一个路由的key(一种路由规则表达式)。这样当Exchange接收到Producer发送的消息时,会根据路由规则将消息发送到具体的Queue中。

3. 基础应用

RabbitMQ支持多种语言的客户端,在这个章节中将使用Java客户端来操作RabbitMQ。新建Maven项目并添加依赖。

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.7.0</version>
</dependency>

3.1 Queue

直接使用Queue是实现消息发布订阅最简单的一种方式,内部会通过一个默认的Exchange(交换机)来将消息路由到Queue中。

img

Producer示例:

public class Producer {

    /**
     * 消息队列名称
     */
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) {
        //创建连接工厂并设置RabbitMQ主机地址,默认端口为5672
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        try (Connection conn = connectionFactory.newConnection();
             //使用连接对象构建一个消息通信的通道
             Channel channel = conn.createChannel()) {
            /**
             * 创建队列
             * 参数一:队列名称
             * 参数二:队列是否持久化(true为持久化)
             * 参数三:是否排他(true为排他),排他性指的是当exclusive为true时,
             *        队列只对首次创建的connection是可见的,false则表示被所有创建的connection都可见
             * 参数四:如果设置为true,表示连接断开时会自动删除此队列
             * 参数五:队列的其他属性设置,一个map集合
             */
             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
             String message = "hello world";
            /**
             * 发布消息
             * 参数一:设置为"",表示未指定交换机的名称,此时会通过一个默认的交换机来路由消息
             * 参数二:队列名称
             * 参数三:消息路由头的其他属性,这里未添添加任何属性,设置为null
             * 参数四:消息体,将其转换为字节数组
             */
             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

运行Producer,打开web管理界面,在Queues的选项里可以查看到新创建一个名为"test_queue"的队列,并且存有一条发布的消息,如下:

img

注意:队列会在第一次使用时创建,如果之前已经创建则不会再创建。

Cosumer示例:

public class Consumer {

    /**
     * 消息队列名称
     */
    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂并设置RabbitMQ主机地址,默认端口为5672
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        Connection connection = connectionFactory.newConnection();
        //创建通信通道
        Channel channel = connection.createChannel();
        //创建队列(如果存在则不再创建)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //接收消息时所需的回调接口
        DeliverCallback callback = (consumerTag, delivery) -> {
            //获取消息体
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ConsumerTag:" + consumerTag);
        };
        //接收消息
        /**
         * 接收消息
         * 参数一:队列名称
         * 参数二:是否自动签收(true为自动签收),自动签收就是
         *        消息处理完后会自动给rabbitmq回馈一条消息,表示这条消息已经处理完毕
         * 参数三:消息的回调接口,也就是上面声明的DeliverCallback,用于接收消息体
         * 参数四:消费者取消订阅时的回调接口,会传入一个consumerTag签收标签
         */
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});

    }
}

注意:Consumer在创建Connection时不要放在try-with-resources语句块中,避免Connection自动关闭导致程序结束。因为Consumer运行后会产生阻塞,需要一直监听队列是否有新的消息,如果有则从队列取出并消费。

运行Consumer,在控制台查看接收的消息。

img

再次查看web管理控制台,此时队列的消息已经被消费掉。

img

大家可以反复运行Producer进行测试。

3.2 Exchange

前面的例子主要是讲解Queue的用法,并通过一个默认的Exchange(交换机)来路由消息。在这个章节中我们主要来了解其他几种Exchange的用法,Exchange的概念在前面的AMQP的通信模型中已经介绍过,它主要是根据路由key将转发消息到绑定的队列(Queue)上。

img

Exchange的类型有Topic、Direct、Fanout、Headers这四种。而Headers类型的交换机使用场景较少,我们主要学习Topic、Direct、Fanout这几种交换机的用法。

3.2.1 Topic

作用:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果匹配(可以通过通配符进行模糊匹配),则发送到该Binding对应的Queue中。

CusumerA示例:

public class ConsumerA {

    /**
     * 定义Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.topic";
    /**
     * 定义一个Queue名称,这里指定为info.queue
     */
    private final static String QUEUE_NAME = "info.queue";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        //声明Exchange,类型指定为为topic, 
        //第三个参数是否持久化,true为持久化,默认值为false
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);
        //声明queue
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        //为queue和exchange绑定路由key(使用"*"进行模糊绑定),表示任意以".info"结尾的key
        //的消息都会发送到这个queue中
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.info");
        //消息回调接口
        DeliverCallback callback = (consumerTag, delivery) -> {
            //获取路由key
            System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());
            //获取消息体
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ConsumerA receive message: " + message);
        };
        //接收消息
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});
    }

CusomerB示例:

public class ConsumerB {

    /**
     * 定义Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.topic";
    /**
     * 定义一个Queue名称,这里指定为error.queue
     */
    private final static String QUEUE_NAME = "error.queue";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        //声明交换机,类型为topic
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明queue
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        //为queue和exchange绑定路由key(使用"*"进行模糊绑定),表示任意以".error"结尾的key
        //的消息都会发送到这个queue中
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.error");
        //接收消息
        DeliverCallback callback = (consumerTag, delivery) -> {
            //获取路由key
            System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());
            //获取消息体
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ConsumerB receive message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});
    }
}

分别运行ConsumerA和ConsumerB,打开web管理页面,在Exchanges的页面中我们可以看到创建了一个名为logs.exchange,类型为topic的Exchange。

img

在Queues的页面中可以看到创建了error.queue和info.queue两个queue。

img

在Exchanges页面的列表中点击logs.topic我们创建的这个exchange,可以查看Exchange和queue的绑定信息,以及路由的key。

img

同样在Queues页面的的列表中点击error.queue或者info.queue,也可以查看相互绑定的信息。

img

Producer示例:

public class Producer {

    /**
     * Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.topic";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址,默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        try(Connection conn = factory.newConnection();
            Channel channel = conn.createChannel()) {
            //创建交换机,类型为topic
            //第三个参数是否持久化,true为持久化,默认值为false
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);
            //定义一个info的message
            String infoMessage = "info message...";
            //定义一个error的message
            String errorMessage = "error message...";
            //将消息发送到交换机,并指定不同路由key
            //第三个参数是否持久化消息,如果需要持久化则设置为MessageProperties.PERSISTENT_TEXT_PLAIN。
            //如果不需要持久化,则设置为null
            channel.basicPublish(EXCHANGE_NAME, "log.error", null, errorMessage.getBytes());
            channel.basicPublish(EXCHANGE_NAME, "log.info", null, errorMessage.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行Producer,将两条消息发送到Exchange,此时Exchange会根据消息中指定的路由key将消息不同的消息发送到不同的Queue中。

结果:

img

img

3.2.2 Direct

作用:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果完全匹配(注意:是完全匹配),则发送到该Binding对应的Queue中。

ConsumerA示例:

public class ConsumerA {

    /**
     * Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.direct";
    /**
     * Queue名称
     */
    private final static String QUEUE_NAME = "info.queue";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        //声明Exchange,类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明queue
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        //为queue和exchange绑定路由key,这里不能使用模糊匹配,direct类型要求路由的key必须完全匹配
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.info");
        //接收消息
        DeliverCallback callback = (consumerTag, delivery) -> {
            System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ConsumerA receive message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});
    }
}

ConsumerB示例:

public class ConsumerB {

    /**
     * 定义Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.direct";
    /**
     * 定义一个Queue名称,这里指定为error.queue
     */
    private final static String QUEUE_NAME = "error.queue";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        //创建交换机,类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //创建queue
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        //为queue和exchange绑定路由key,这里不能使用模糊匹配,direct类型要求路由的key必须完全匹配
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.error");
        //接收消息
        DeliverCallback callback = (consumerTag, delivery) -> {
            System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ConsumerB receive message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});
    }
}

Producer示例:

public class Producer {

    /**
     * Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.direct";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址,默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        try(Connection conn = factory.newConnection();
            Channel channel = conn.createChannel()) {
            //创建交换机,类型为direct
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //定义一个info的message
            String infoMessage = "info message...";
            //定义一个error的message
            String errorMessage = "error message...";
            //将消息发送到交换机,并指定不同路由key
            channel.basicPublish(EXCHANGE_NAME, "log.info", null, infoMessage.getBytes());
            channel.basicPublish(EXCHANGE_NAME, "log.error", null, errorMessage.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行ConsumerA,ConsumerB以及Producer

结果:

img

img

3.2.3 Fanout

说明:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key,直接将消息发送到所有绑定的queue中,因此所有队列都会接收到相同的消息,相当于广播。

ConsumerA示例:

public class ConsumerA {

    /**
     * Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.fanout";
    /**
     * Queue名称
     */
    private final static String QUEUE_NAME = "info.queue";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        //声明Exchange,类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //声明queue
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        //为queue和exchange绑定路由key,这里将路由key可设置为任意字符,通常设置为""
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "aa");
        //接收消息
        DeliverCallback callback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ConsumerA receive message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});
    }
}

ConsumerB示例:

public class ConsumerB {

    /**
     * 定义Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.fanout";
    /**
     * 定义一个Queue名称,这里指定为error.queue
     */
    private final static String QUEUE_NAME = "error.queue";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        //声明Exchange,类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //声明queue
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        //为queue和exchange绑定路由key,这里将路由key可设置为任意字符,通常设置为""
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "bb");
        //接收消息
        DeliverCallback callback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ConsumerB receive message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});
    }
}

Producer示例:

public class Producer {

    /**
     * Exchange名称
     */
    private final static String EXCHANGE_NAME = "logs.fanout";

    public static void main(String[] args) throws Exception {
        //初始化连接工厂,并指定rabbitmq的主机地址,默认端口为5672
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建连接对象,并使用连接对象构建一个消息通信的通道
        try(Connection conn = factory.newConnection();
            Channel channel = conn.createChannel()) {
            //创建交换机,类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //定义一个info的message
            String infoMessage = "info message...";
            //定义一个error的message
            String errorMessage = "error message...";
            //将消息发送到交换机,路由key可任意设置,通常设置为""
            channel.basicPublish(EXCHANGE_NAME, "", null, infoMessage.getBytes());
            channel.basicPublish(EXCHANGE_NAME, "", null, errorMessage.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行ConsumerA,ConsumerB以及Producer

结果:

img

img

ConsumerA和ConsumerB同时都收到info和error的消息。

4. 整合Spring Boot

4.1 示例

添加依赖:

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    # 连接端口,默认5672
    port: 5672
    # 设置登陆认证的账号密码,默认为guest
    username: guest
    password: guest
    # 虚拟主机地址,默认为"/"
    virtual-host: /
    # 设置连接诶超时时间
    connection-timeout: 5000
    # 配置消费者监听设置
    listener:
      simple:
        # 最小消息消费线程数,这里表示每个Listener容器将开启2个线程去处理消息
        # 在2.0版本后可以在@RabbitListener注解中配置该参数
        concurrency: 2
        # 最大消费线程数
        max-concurrency: 5
        # 每个消费线程能从队列获取的消息数量
        # 每个customer会从消息队列中预取一些消息放入自己的LinkedBlockingQueue中进行消费,
        # 注意,每个customer线程都有自己对应的BlockingQueue
        prefetch: 1
        # 消息签收模式
        # none:表示没有任何的应答会被发送
        # manual:表示监听者必须通过调用Channel.basicAck()来告知所有的消息
        # auto:表示自动应答,除非坚挺着抛出异常,这是默认配置方式
        acknowledge-mode: auto
        # 当消费者监听器产生异常时是否将消息重新放回队列,默认值为true
        default-requeue-rejected: true

配置类

在配置类中主要声明Exchange、Queue等Bean的装配

@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "order.exchange";
    public static final String QUEUE_NAME = "order.queue";
    public static final String ROUTER_KEY = "order.*";

    /**
     * 装配Topic类型的Exchange
     * 也可以装配其他类型如:DirectExchange、FanoutExchange
     * TopicExchange构造方法第一个参数指定交换机名称,第二个参数是否持久化交换机,
     * 第三个参数是否自动删除交换机
     */
    @Bean
    public TopicExchange exchange(){
        //return new TopicExchange(EXCHANGE_NAME);
        return new TopicExchange(EXCHANGE_NAME, false, true);
    }

    /**
     * 装配消息队列
     * Queue构造方法第一个参数指定Queue的名称,第二个参数表示是否持久化queue
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    /**
     * 将queue绑定到exchange
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);
    }
}

Consumer示例:

@Service
public class ConsumerService {

    /**
     * 使用@RabbitListener注解进行监听,通过queues属性指定要从哪个queue中消费消息
     * @Payload注解标注的参数为转换后的消息对象
     * @Headers注解标注的参数为消息头
     * @param message 消息体内容
     * @param headers 消息头
     * @param channel 消息通道
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receiveMessage(@Payload String message,
                               @Headers Map<String, Object> headers,
                               Channel channel) throws IOException {
        System.out.println("接收消息:" + message);
    }
}

上面的消费者使用的是自动签收模式,如果设为手动签收,也就是在yml中设置了acknowledge-mode: manual,那么在签收时需要调用Channel的basicAck()方法来确认签收的消息。

//当手动确认签收时,需要自行给rabbitmq回馈一条消息,这条消息已经处理完毕
//从headers获取一个签收标签
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//确认签收,basicAck方法参入一个签收标签,第二个参数表示是否支持批量签收,false表示单个签收
channel.basicAck(deliveryTag, false);

Producer示例:

@Service
public class ProducerService {

    /**
     * 注入RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送文本消息
     * @param message
     */
    public void sendMessage(String message){
        //创建消息的唯一ID
        CorrelationData correlationData = new CorrelationData();
        //这里使用订单ID作为消息的ID
        correlationData.setId(UUID.randomUUID().toString());
        //发送消息
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);
    }
}

测试:

编写单元测试,注入ProducerService来发送消息。

@SpringBootTest
class Ch04ApplicationTests {

    @Autowired
    private ProducerService service;

    @Test
    public void testSendMessage() {
        service.sendMessage("Hello world");
    }

}

先运行SpringBoot启动类,然后执行单元测试,查看ConsumerService的接收结果。

img

4.2 @RabbitListener注解

@RabbitListener可以标注在方法上或者类上,Spring会根据不同的@RabbitListener注解创建并启动不同的监听容器(MessageListenerContainer),并通过queues属性指定需要监听的队列。每个监听容器都有自己的唯一标识,可以通过id属性来标识,如果不指定id属性则会自动创建一个默认的唯一标识。

/**
  * @param message 消息内容
  * @param headers 消息头,需要@Headers或者@Header注解标注(可选参数)
  * @param channel 消息通道(可选参数)
*/
@RabbitListener(id="001", queues = "queue.a")
public void consumerA(String message,
                           @Headers Map<String, Object> headers,
                           Channel channel) {
    ...
}

@RabbitListener(id="002", queues = "queue.b")
public void consumerB(String message,
                           @Headers Map<String, Object> headers,
                           Channel channel) {
    ...
}

除了可以通过配置类来声明交换机、队列与绑定,也可以使用@RabbitListener提供的bindings属性来进行声明绑定。例如:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "order.queue", durable = "true"),
        exchange = @Exchange(name = "order.exchange", type = ExchangeTypes.TOPIC),
        key = "order.*")
public void receive(Long id) {
    ...
}

4.3 @RabbitHandler注解

当消费端需要接收不同的消息类型时,可以结合@RabbitHandler搭配使用。将@RabbitListener注解标注在类上,在不同方法上使用@RabbitHandler标注,这样Listener监听容器会根据消息转换后的类型来调用相应的方法来处理。

@RabbitListener(queues = {"queue.a","queue.b"})
public class ConsumerService {
  
  @RabbitHandler
  public void receiveA(String message) {
      ...
  }
  
  @RabbitHandler
  public void receiveB(User message) {
      ...
  }
  
  @RabbitHandler
  public void receiveC(Student message) {
      ...
  }
  
}  

4.4 自定义消息转换器

Spring默认使用的消息转换器是SimpleMessageConverter,只能处理基于文本的内容,序列化的Java对象和字节数组。

img

当然也可以自定义MessageConverter,例如将发送的一个实体把它序列化成Json,接收时又将Json自动转换为一个实体,那么可以使用Jackson2JsonMessageConverter。

添加依赖:

转换Json时需要用到Jackson

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-json</artifactId>
</dependency>

配置类:

只需在配置类中添加Jackson2JsonMessageConverter的装配

@Configuration
public class RabbitConfig {	
    ...
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

Order示例:

用于Producer将一个Order序列化为Json后发送到MQ,Consumer从MQ接收Json后将其反序列化为一个Order对象。

public class Order {
    /**
     * 订单ID
     */
    private String orderId;
    /**
     * 订单消息
     */
    private String message;

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

Producer示例:

@Service
public class ProducerService {

    /**
     * 注入RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送对象,使用自定义消息转换器转换为json
     * @param order
     */
    public void sendObject(Order order) {
        //创建消息的唯一ID
        CorrelationData correlationData = new CorrelationData();
        //这里使用订单ID作为消息的ID
        correlationData.setId(order.getOrderId());
        //发送消息
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", order, 		correlationData);
    }
}

Consumer示例:

@Service
public class ConsumerService {

    /**
     * 使用自定义消息转换器
     * 使用@RabbitListener注解进行监听,通过queues属性指定要从哪个queue中消费消息
     * @Payload注解标注的参数为转换后的消息对象
     * @Headers注解标注的参数为消息头
     * @param order 转换后的消息对象
     * @param headers 消息头
     * @param channel 消息通道
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receiveObject(@Payload Order order,
                              @Headers Map<String, Object> headers,
                              Channel channel) throws IOException {
        System.out.println("接收消息:");
        System.out.println("订单编号:" + order.getOrderId());
        System.out.println("订单明细:" + order.getMessage());
    }
}

测试:

编写单元测试方法

@Test
public void testSendObject() {
    Order orderDTO = new Order();
    orderDTO.setOrderId("10001");
    orderDTO.setMessage("test order...");
    service.sendObject(orderDTO);
}

先运行SpringBoot启动类,执行单元测试并查看Consumer接收结果:

img

5. ACK机制

ACK (Acknowledge character)是一种应答确认符号。用于在网络通信中,数据接收方成功接收到消息后会给发送方返回一个确认信息。

5.1 发送确认

5.1.1 ConfirmCallback

当消息的发送端发送一条消息到Broker时,为了确保这条消息成功发送到Exchange,因此Broker可以返回一个确认信息给发送端,也就是Producer的Confirm模式。

yml配置:

设置publisher-confirm-type为correlated

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    # 启用ConfirmCallback模式
    publisher-confirm-type: correlated

Producer示例:

public void sendMessage(String message){
        //使用uuid作为消息的唯一ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        //发送消息
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);
        //通过setConfirmCallback设置一个回调来确认消息是否成功发布到Exchange中
        //如果发布成功ack则为true,失败为false
        rabbitTemplate.setConfirmCallback((cdata, ack, cause) -> {
            //获取CorrelationData中的ID
            String eventId = cdata.getId();
            if (ack) {
                System.out.println("投递成功:"+eventId);
            } else {
                System.out.println("投递失败:"+eventId );
            }
        });

}
5.1.2 ReturnsCallback

上面的confrim模式只能确认消息是否正确到达Exchange中,但不能保证消息正确投递到目标 queue里。如果一定要确保消息投递到queue中,就需要使用ReturnCallback。

yml配置:

将publisher-returns和template.mandatory设置为true

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    # 启用ReturnCallback模式
    publisher-returns: true
    # 当mandatory标志位设置为true时,如果exchange根据自身类型和routingKey无法找到一个合适的queue,
    # 那么broker会调用basic.return方法将消息返还给生产者。设置为false时,出现上述情况broker会直接将消       息丢弃
    template:
      mandatory: true

Producer示例:

public void sendMessage(String message){
        //使用uuid作为消息的唯一ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        //发送消息
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);
        //通过setReturnsCallback设置回调来确认消息是否成功发布到queue中
        //注意,只有消息未正确到达queue时才会执行此回调此方法
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            System.out.println("ReturnsCallback=====>");
            System.out.println(returnedMessage.getMessage());
            System.out.println(returnedMessage.getReplyCode());
            System.out.println(returnedMessage.getReplyText());
            System.out.println(returnedMessage.getRoutingKey());
        });
    }

当消息未正确到达queue时,就会执行ReturnCallback。

5.2 消费确认

当消费端在消费一条消息时,Broker会等待消费端返回一条ACK来确认消息是否已成功消费,如果消费成功,那么Broker就会从队列中移除此消息。在Springboot中配置ack有none、auto、manual三种模式。

5.2.1 NONE

none表示不做任何的签收确认(相当于无ack),不管消费者是否正常消费消息,broker都认为消息已经被正常消费,并从broker中移除此消息。这样会导致消费端在处理消息的过程中如果产生异常,那么消息就会丢失。

yml配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    listener:
      simple:
        concurrency: 2
        max-concurrency: 5
        prefetch: 1
        # ack确认机制
        # none:表示不做任何确认签收(相当于无ack)
        acknowledge-mode: none
5.2.2 AUTO

auto表示自动确认,自动确认会根据消费端在处理消息的过程是否抛出异常来决定返回ack或者nack给broker。

yml设置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    listener:
      simple:
        concurrency: 2
        max-concurrency: 5
        prefetch: 1
        # ack确认机制
        # auto:表示自动确认(默认配置)
        acknowledge-mode: auto
        # 当消费者产生异常时是否将消息重新放回队列,默认值为true
        default-requeue-rejected: true

需要注意的是,在自动确认模式下,default-requeue-rejected设置为true并不能完全决定是否重新放回队列,另外一个决定因素是具体装配了哪一个MessageRecoverer(消息回收器)的Bean,它的作用是在消费消息失败后要做什么样的处理。默认使用是RejectAndDontRequeueRecoverer。下面分别说明几种有常见的MessageRecoverer实现。

RejectAndDontRequeueRecoverer:

这是默认使用MessageRecoverer,只要在消费端抛出除AmqpRejectAndDontRequeueException以外的其他异常并且default-requeue-rejected设置为true的情况下,消息都会自动重新投递到队列中,否则就会丢弃。

ImmediateRequeueMessageRecoverer:

这个会在抛出除AmqpRejectAndDontRequeueException以外的其他异常会自动返回nack,会忽略default-requeue-rejected的设置,并立即将消息放回当前队列。

@Configuration
public class RabbitConfig {
    /**
     * 装配ImmediateRequeueMessageRecoverer
     * @return
     */
    @Bean
    public MessageRecoverer messageRecoverer() {
        return new ImmediateRequeueMessageRecoverer();
    }
}  

RepublishMessageRecoverer:

这个会在消费失败后将消息投递到自己指定的一个队列中,由其他订阅的消费者来处理。

@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "error.exchange";
    public static final String QUEUE_NAME = "error.queue";
    public static final String ROUTER_KEY = "error.key";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public DirectExchange exchange(){
        return new DirectExchange(EXCHANGE_NAME, false, true);
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, false, true);
    }

    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);
    }

    /**
     * 装配RepublishMessageRecoverer
     * @return
     */
    @Bean
    public MessageRecoverer messageRecoverer() {
        return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE_NAME, ROUTER_KEY);
    }
}
5.2.3 MANUAL

manual表示手动确认,也就是在消费端的代码中手动调用basicAck方法确认签收。如果产生异常,可以通过basicNack或者basicReject拒绝签收。需要注意的是,当ack模式为manual时,default-requeue-rejected设置是无效的,必须在basicNack或者basicReject拒绝签收时指定是否重新放回队列。

yml配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    listener:
      simple:
        concurrency: 2
        max-concurrency: 5
        prefetch: 1
        # ack确认机制
        # manual:表示手动确认
        acknowledge-mode: manual

手动确认签收:

在消费端通过调用basicAck方法来确认签收

@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void receiveMessage(@Payload String message,
                           @Headers Map<String, Object> headers,
                           Channel channel) throws IOException {
    System.out.println("接收消息:" + message);
    //从headers中获取一个唯一标识
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    //确认签收
    //参数1:消息投递的唯一标识
    //参数2:是否支持批量签收(true表示批量确认,false表示单个确认)
    channel.basicAck(deliveryTag, false);
}

在手动确认时,方法参数多了headers和channel两个参数。header表示消息的头信息,channel表示当前的消息通道。在投递一个消息时,消息头中会包含一个delivery tag,这个值表示本次投递的唯一标识,在同一个Channel中,这个值是唯一的。delivery tag长度为64为,值从1开始,每发送一次消息该值会递增1。消费者端在确认消息时带上此参数,用于告诉RabbitMQ某次投递已经正确应答。通过调用channel的basicAck方法来确认应答。

拒绝签收:

消费端在处理消息时可以依据业务规则来决定是否确认签收或拒绝签收。如果需要拒绝签收,可以调用channel的basicNack或者basicReject方法

//参数1:消息投递的标签
//参数2:是否支持批量拒绝
//参数3:是否重新放回队列(true表示放回)
channel.basicNack(deliveryTag, false, true);
//参数1:消息投递的标签
//参数2:是否重新放回队列(true表示放回)
channel.basicReject(deliveryTag, true);

两个方法区别在于basicReject一次只能拒绝单条消息,basicNack可以拒绝多条。并且这两个方法在拒绝签收时可以设置是否将消息重新放回消息队列。

6. 重试机制

在消息投递或者消费的过程因为网络或异常导致消息不能正常投递和消费时,可以采用重试机制。需要注意的是,这里的重试和RabbitMQ无关,RabbitMQ本身是不提供重试的功能,而是由Spring的retry框架实现,具体可以参考spring-retry模块的使用。

6.1 发送端重试

发送端重试是针对RabbitTemplate,在消息的投递过程中由于网络原因连接失败或者其他的错误导致消息没有正常投递到Broker,那么可以启用template的retry功能。

yml:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    # 发送端重试
    template:
      retry:
        # 启用重试机制
        enabled: true
        # 重试次数
        max-attempts: 3
        # 重试间隔时间(单位:毫秒)
        initial-interval: 2000ms

6.2 消费端重试

消费端重试主要针对的是消费者的Listener。当消费者在处理一条消息时,在这个过程如果Listener抛出异常或其他原因导致消息没有正常被消费,那么可以启用listener的rety功能。需要注意的是,当acknowledge-mode设置为auto并且default-requeue-rejected设置为true时,同时使用的是默认的MessageRecoverer(消息回收器),这样当消费端抛出除AmqpRejectAndDontRequeueException以外的其他异常时会将消息重新放回队列中,此时消费者又会从队列中取出消息进行消费,那么就会导致无限循环消费,这是不合理的。正确的做法是需要指定重试的次数,并且到达该次数后让RabbitMQ将此消息放到死信队列中(死信队列在下个章节讲解)做相应处理或由人工解决。如果未配置死信队列,那么达到次数后该消息将被丢弃。当然也可以配置RepublishMessageRecoverer,到达重试次数后将消息投递到自己指定的交换机和队列来处理,效果是一样的。

yml配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    listener:
      simple:
        concurrency: 2
        max-concurrency: 5
        prefetch: 1
        # 消费端重试
        retry:
          # 启用消费端重试
          enabled: true
          # 重试次数
          max-attempts: 3
          # 重试间隔时间(单位:毫秒)
          initial-interval: 2000ms

7. 死信队列

7.1 概念

在消费端重试时,当到达重试次数后,此时被拒绝的消息就会变为死信(通常一个消息变为死信有几种情况,例如被拒绝的消息、消息达到TLL过期时间、以及队列达到了最大长度等),如果没有相应的处理,那么broker将丢弃此消息。所以当这些重试之后都无法消费的消息,我们就将其放入死信队列中做进一步的处理。而这个死信队列本身也是一个普通的Queue。这个Queue也需要绑定一个Exchange,这个Exchange就称之为死信交换机(DLX)。同样这个Exchange可以是任意类型如Direct、Topic、Fanout的Exchange,与普通的Exchange没有什么差异。因此当我们将一个消息发送到死信队列时,通过这个死信交换机将消息发送到指定的Queue。下面给出一个具体的示例:

7.2 自动确认处理

可以结合Spring的retry进行重试,当到大重试次数后指定将消息投递到死信交换机。

yml配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    listener:
      simple:
        concurrency: 2
        max-concurrency: 5
        prefetch: 1
        # 自动确认
        acknowledge-mode: auto
        # 重试设置(如果使用手动确认建议使用redis来实现重试次数)
        retry:
          # 启用消费端重试监听
          enabled: true
          # 重试次数
          max-attempts: 3
          # 重试间隔时间(单位:毫秒)
          initial-interval: 2000ms

配置类:

@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "order.exchange";
    public static final String QUEUE_NAME = "order.queue";
    public static final String ROUTER_KEY = "order.*";
    //声明死信交换机名称
    public static final String DEAD_EXCHANGE_NAME = "dead.exchange";
    //声明死信队列名称
    public static final String DEAD_QUEUE_NAME = "dead.queue";
    //死信队列路由key
    public static final String DEAD_ROUTER_KEY = "dead.key";

    /**
     * 配置普通业务的Exchange
     */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME, false, true);
    }

    /**
     * 装配死信Exchange(DLX),可以是direct类型也可以是其他类型
     *
     * @return
     */
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE_NAME, false, true);
    }

    /**
     * 配置普通业务的消息队列并关联死信交换机,当这个队列中的消息被拒绝或达到重试次数后,
     * 通过死信路由的key将其发送到对应的死信交换机
     * @return
     */
    @Bean
    public Queue queue() {
        //使用QueueBuilder.nonDurable(QUEUE_NAME)创建不持久化的queue,
        //如果需要创建持久化的queue使用durable(QUEUE_NAME)方法
        return QueueBuilder.nonDurable(QUEUE_NAME)
                //自动删除
                //.autoDelete()
                //设置死信交换机的名称
                .withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_NAME)
                //设置死信队列路由的key
                .withArgument("x-dead-letter-routing-key", DEAD_ROUTER_KEY)
                //消息超过这个时间还未被消费则路由到死信交换机
                //.withArgument("x-message-ttl", 5000)
                .build();
    }

    /**
     * 配置死信队列
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE_NAME, false);
    }

    /**
     * 将queue绑定到exchange
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);
    }

    /**
     * 将死信队列绑定到死信交换机上
     *
     * @return
     */
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTER_KEY);
    }
  	
    /**
     * 装配Jackson2JsonMessageConverter
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

订单实体

public class Order {
    /**
     * 订单ID
     */
    private String id;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

消费端:

@Service
public class ConsumerService {
    /**
     * 在消费者执行中引发一个异常,此时Spring会自动执行retry功能,
     * 当达到retry次数时,该消息会自动路由到DLX中
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receiveObject(@Payload Order order,
                              @Headers Map<String, Object> headers,
                              Channel channel) throws Exception {
        System.out.println("订单编号:" + order.getId());
        //产生异常
        System.out.println(10 / 0);
    }
}

死信队列消费端:

@Service
public class DeadLetterService {

    /**
     * 监听死信队列,如果有消息进入死信队列,将执行此方法做进一步的处理
     * @param message
     */
    @RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME)
    public void receiveDeadLetter(@Payload Order order,
                                  @Headers Map<String, Object> headers,
                                  Channel channel) throws IOException {
        System.out.println("接收到死信消息,订单ID:" + order.getId());
    }
}

发送端:

@Service
public class ProducerService {

    @Autowired
    private RabbitTemplate rabbitTemplate;
		
    public void sendObject(Order order) {
        //创建CorrelationData
        CorrelationData correlationData = new CorrelationData();
        //这里使用订单ID作为消息的ID
        correlationData.setId(order.getId());
        //发送消息
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", order, correlationData);
    }
}

单元测试:

@SpringBootTest
class RabbitApplicationTests {

    @Autowired
    private ProducerService service;

    @Test
    public void testSendObject() {
        Order orderDTO = new Order();
        orderDTO.setId("10001");
        service.sendObject(orderDTO);
    }

}

7.2 手动确认处理

手动处理不需要retry的支持,可以结合Redis来存储重试的次数,当达到重试次数后执行nack并将消息投递到死信交换机中,重点在消费者中的代码实现。

yml配置:

spring:
  # redis配置
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    password: wangl
    connect-timeout: 2s
  # rabbitmq设置
  rabbitmq:
    # rabbitmq服务器地址
    addresses: 127.0.0.1
    # 连接端口,默认是5672
    port: 5672
    # 账号密码
    username: guest
    password: guest
    # 虚拟主机地址,默认为"/"
    virtual-host: /
    # 连接的超时时间
    connection-timeout: 5000
    # 启用ConfirmCallback模式(发送确认),当消息到达交换机后会返回一条ack给发送端
    publisher-confirm-type: correlated
    # 设置发送端重试
    template:
      retry:
        # 启用重试机制
        enabled: true
        # 重试次数
        max-attempts: 3
        # 重试间隔时间(单位:毫秒)
        initial-interval: 2000ms
    # 消费者监听设置
    listener:
      simple:
        # 最小的消费线程数量
        concurrency: 2
        # 最大的消费线程数量
        max-concurrency: 5
        # 限流,每个线程能从队列获取的消息数量
        prefetch: 1
        # 手动确认
        acknowledge-mode: manual

消费端:

@Service
public class ConsumerService {

    /**
     * 重置次数的key前缀
     */
    private static final String ATTEMPTS_PREFIX = "attempts:";
    /**
     * 最大重试次数
     */
    private static final Integer MAX_RETRY = 3;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receiveObject(@Payload Order order,
                              @Headers Map<String, Object> headers,
                              Channel channel) throws Exception {
        //获取一个消息的标签
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            log.info("订单ID: " + order.getOrderId());
            //产生一个异常
            System.out.println(10 / 0);
            //正常执行则手动签收消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            //如果产生异常则拒绝签收并将消息放回队列进行重试操作
            //从redis中获取重试次数,increment会在Redis中执行自增并返回自增的值,这一步是原子操作的
            Long retryTotal = redisTemplate.opsForValue().increment(ATTEMPTS_PREFIX + order.getOrderId());
            //如果大于最大重试次数则放入死信
            if(retryTotal > MAX_RETRY) {
                //拒绝签收,第三个参数设置为false表示不重新放回队列,
                //如果配置了死信队列则直接丢到死信队列中
                channel.basicNack(tag, false, false);
                //删除key
                redisTemplate.delete(ATTEMPTS_PREFIX + order.getOrderId());
            } else {
                //拒绝签收并重新放回队列继续执行重试
                channel.basicNack(tag, false, true);
            }
        }
    }
}

死信队列消费端:

@Service
public class DeadLetterConsumer {

    /**
     * 监听死信队列
     * @param order
     * @param headers
     * @param channel
     */
    @RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME)
    public void receiveDeadLetter(Order order,
                                  @Headers Map<String, Object> headers,
                                  Channel channel) throws IOException {
        log.info("接收到异常订单,编号:" + order.getOrderId());
        //手动确认签收
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(tag, false);
    }
}

8. 延迟队列

所谓延迟队列就是根据我们的业务要求将消息延迟进行处理。

  • 在电商中,用户下单后并没有立即支付,如果在指定的时间内未支付,则取消该订单

  • 在系统发布一个通告,在某时刻之后通知到指定的人

8.1 实现方式

Rabbitmq实现延迟消费通常有两种形式:

  1. 利用自身Time To Live(TTL)以及Dead Letter Exchanges(DLX)的特性实现

    (也就是如果达到TTL时间未消费则投递到死信队列)

  2. 利用Rabbitmq插件rabbitmq_delayed_message_exchange(延迟投递)

rabbitmq_delayed_message_exchange插件的实现方式简单点说就是当发布消息后不会立即进入队列,而是存储在mnesia(一个分布式数据系统)表中,当达到延迟的时间后就立刻将消息投递至目标队列中。需要注意的是,插件能支持的最大延迟时间为(2^32)-1毫秒, 大约49天。

官方说明:

For each message that crosses an "x-delayed-message" exchange, the plugin will try to determine if the message has to be expired by making sure the delay is within range, ie: Delay > 0, Delay =< ?ERL_MAX_T (In Erlang a timer can be set up to (2^32)-1 milliseconds in the future).

8.2 安装插件

在官网https://www.rabbitmq.com/community-plugins.html下载延迟消息插件。

img

注意对应rabbitmq版本,下载后将插件拷贝到rabbitmq的plugins目录,拷贝后在终端使用以下命令可以看插件列表

rabbitmq-plugins list

img

启用插件:

在终端使用以下命令启用延迟插件。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

img

启用插件后重启RabbitMQ服务。

8.3 示例

这里以用户下单后未支付的场景为例,如果在指定的时间内未支付,则取消该订单。

创建订单表:

create table order_info(
	order_id varchar(50) primary key,
	order_status tinyint(1) not null, -- 0:取消订单 1:未支付 2:已支付
	order_message varchar(100)
);

添加依赖:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-json</artifactId>
</dependency>

<dependency>
     <groupId>org.mybatis.spring.boot</groupId>
     <artifactId>mybatis-spring-boot-starter</artifactId>
     <version>2.0.0</version>
</dependency>

<dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
</dependency>

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-test</artifactId>
     <scope>test</scope>
     <exclusions>
          <exclusion>
              <groupId>org.junit.vintage</groupId>
              <artifactId>junit-vintage-engine</artifactId>
          </exclusion>
     </exclusions>
</dependency>

yml配置:

spring:
  # 数据源配置
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/order?serverTimezone=GMT&useUnicode=true&characterEncoding=utf-8
    username: root
    password: root
    # hikari连接池配置
    hikari:
      minimum-idle: 5
      maximum-pool-size: 20
      idle-timeout: 900000
      connection-timeout: 15000
      connection-test-query: select 1
  # 配置RabbitMQ
  rabbitmq:
    addresses: 127.0.0.1
    # 连接端口,默认5672
    port: 5672
    # 设置登陆认证的账号密码,默认为guest
    username: guest
    password: guest
    # 虚拟主机地址,默认为"/"
    virtual-host: /
    # 设置连接诶超时时间
    connection-timeout: 5000
    # 配置消费者监听设置
    listener:
      simple:
        # 最小消息消费线程数
        concurrency: 2
        # 最大消息消费线程数
        max-concurrency: 5
        # 限流,每个消费线程能从队列获取的消息数量
        prefetch: 1
        # 自动应答
        acknowledge-mode: auto
# mybatis配置
mybatis:
  type-aliases-package: edu.nf.ch05.entity
  mapper-locations: classpath:/mappers/*.xml

配置类:

@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "delay.exchange";
    public static final String QUEUE_NAME = "delay.queue";
    public static final String ROUTER_KEY = "order.message";

    /**
     * 自定义Exchange,设置延迟交换机类型为direct,也可以设置为topic等其他类型
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", false, true, params);
    }

    /**
     * 装配消息队列
     * Queue构造方法第二个参数表示是否持久化消息
     * @return
     */
    @Bean
    public Queue queue(){
        return new Queue(QUEUE_NAME, false);
    }

    /**
     * 将queue绑定到exchange
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(delayExchange()).with(ROUTER_KEY).noargs();
    }

    /**
     * 自定义消息转换器
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

Order示例:

@Data
public class Order {

    private String orderId;
    private Integer status;
    private String message;

}

OrderDao示例:

public interface OrderDao {

    /**
     * 根据ID查询订单信息
     * @param orderId
     * @return
     */
    Order getOrderById(String orderId);

    /**
     * 保存订单信息
     * @param order
     */
    void saveOrder(Order order);

    /**
     * 修改订单
     * @param order
     */
    void updateOrder(Order order);
}

Mapper映射配置:

<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="edu.nf.ch05.dao.OrderDao">

    <resultMap id="orderMap" type="order">
        <id property="orderId" column="order_id"/>
        <result property="status" column="order_status"/>
        <result property="message" column="order_message"/>
    </resultMap>

    <select id="getOrderById" parameterType="string" resultMap="orderMap">
        select order_id, order_status, order_message from order_info where order_id = #{orderId}
    </select>

    <insert id="saveOrder" parameterType="order">
        insert into order_info(order_id, order_status, order_message) values(#{orderId}, #{status}, #{message})
    </insert>

    <update id="updateOrder" parameterType="order">
        update order_info set order_status = #{status} where order_id = #{orderId}
    </update>
</mapper>

ProducerService示例:

@Service
public class ProducerService {

    /**
     * 注入RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 注入OrderDao
     */
    @Autowired
    private OrderDao orderDao;

    /**
     * 发送消息
     * @param order 订单对象
     * @param delayTime 延迟消费时长
     */
    public void send(Order order, int delayTime) {
        //创建消息的唯一ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getOrderId());
        //将订单信息入库,此时订单状态1,表示未支付
        orderDao.saveOrder(order);
        //发送消息
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, 
                RabbitConfig.ROUTER_KEY, order, messagePostProcessor -> {
            //通过消息的后置处理器设置延迟放入的时间
            messagePostProcessor.getMessageProperties().setDelay(delayTime);
            return messagePostProcessor;
        }, correlationData);
    }
}

ConsumerService示例:

@Service
@Slf4j
public class ConsumerService {

    /**
     * 注入OrderDao
     */
    @Autowired
    private OrderDao orderDao;

    /**
     * 接收消息
     * 这里会延迟接收,也就是在发送端指定的延迟时间后才才进行接收
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receiveMessage(Order order) {
        log.info("接收消息,订单编号:" + order.getOrderId());
        //依据订单编号查询数据库,如果订单状态为1则将其更新为0,表示取消订单
        order = orderDao.getOrderById(order.getOrderId());
        if(order.getStatus() == 1){
            order.setStatus(0);
            orderDao.updateOrder(order);
            log.info("订单已取消");
        }
    }
}

测试:

运行SpringBoot启动程序:

@SpringBootApplication
@MapperScan("edu.nf.ch05.dao")
public class Ch05Application {

    public static void main(String[] args) {
        SpringApplication.run(Ch05Application.class, args);
    }

}

执行单元测试:

@SpringBootTest
public class ProducerServiceTests {

    @Autowired
    private ProducerService producerService;

    @Test
    void testSend() {
        Order order = new Order();
        order.setOrderId("100001");
        order.setMessage("test order...");
        order.setStatus(1);
        producerService.send(order, 10000);
    }

}

查看数据库,测试会录入一条订单信息,其状态为1。

img

如果在指定的过期时间内未其他服务处理该订单,那么消费者会从队列中取出这条订单信息,根据ID去数据库查询该订单的状态,如果为1(未支付)则自动取消订单,将其状态更新为0。

img

再次查看这条订单记录,此时的状态已更新为0。

img

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/185990.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JVM类加载的过程和JVM垃圾回收机制

文章目录 一、JVM类加载的过程1.1类加载的基本流程1.1.1加载1.1.2验证1.1.3准备1.1.4解析1.1.5初始化 1.2双亲委派模型 二、JVM垃圾回收机制2.1找到垃圾2.1.1引用计数(比如Python&#xff0c;PHP中用到)2.1.2可达性分析(比如Java中用到) 2.2释放垃圾2.2.1标记清除2.2.2复制算法…

RAM模型从数据准备到pretrain、finetune与推理全过程详细说明

提示&#xff1a;RAM模型&#xff1a;环境安装、数据准备与说明、模型推理、模型finetune、模型pretrain等 文章目录 前言一、环境安装二、数据准备与解读1.数据下载2.数据标签内容解读3.标签map内容解读 三、finetune训练1.微调训练命令2.load载入参数问题3.权重载入4.数据加载…

大数据技术之数据安全与网络安全——CMS靶场实训

大数据技术之数据安全与网络安全——CMS靶场实训 在当今数字化时代&#xff0c;大数据技术的迅猛发展带来了前所未有的数据增长&#xff0c;同时也催生了对数据安全和网络安全的更为迫切的需求。本篇博客将聚焦于大数据技术背景下的数据安全与网络安全&#xff0c;并通过CMS&a…

4.操作系统常见面试题(2)

3.4 虚拟内存 直接使⽤物理内存会产⽣⼀些问题 1. 内存空间利⽤率的问题&#xff1a;各个进程对内存的使⽤会导致内存碎⽚化&#xff0c;当要⽤ malloc 分配⼀块很⼤的内存空间时&#xff0c;可能会出现虽然有⾜够多的空闲物理内存&#xff0c;却没有⾜够⼤的连续空闲内存这种…

点大商城V2.5.3分包小程序端+小程序上传提示限制分包制作教程

这几天很多播播资源会员反馈点大商城V2.5.3小程序端上传时提示大小超限&#xff0c;官方默认单个包都不能超过2M&#xff0c;总分包不能超20M。如下图提示超了93KB&#xff0c;如果出现超的不多情况下可采用手动删除一些images目录下不使用的图片&#xff0c;只要删除超过100KB…

82基于matlab GUI的图像处理

基于matlab GUI的图像处理&#xff0c;功能包括图像一般处理&#xff08;灰度图像、二值图&#xff09;&#xff1b;图像几何变换&#xff08;旋转可输入旋转角度、平移、镜像&#xff09;、图像边缘检测&#xff08;拉普拉斯算子、sobel算子、wallis算子、roberts算子&#xf…

unordered_map 与 unordered_set 的模拟实现

unordered_map 与 unordred_set 的模拟实现与 map 与 set 的模拟实现差不多。map 与 set 的模拟实现中&#xff0c;底层的数据结构是红黑树。unordered_map 与 unordered_set 的底层数据结构是哈希表。因此&#xff0c;在模拟实现 unordered_map 与 unordred_set 之前你必须确保…

nodejs微信小程序+python+PHP-青云商场管理系统的设计与实现-安卓-计算机毕业设计

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder

密码&#xff0c;加密&#xff0c;解密 spring-security-crypto-5.7.3.jar /** Copyright 2002-2011 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with t…

HTML新特性【缩放图像、图像切片、平移、旋转、缩放、变形、裁切路径、时钟、运动的小球】(二)-全面详解(学习总结---从入门到深化)

目录 绘制图像_缩放图像 绘制图像_图像切片 Canvas状态的保存和恢复 图形变形_平移 图形变形_旋转 图形变形_缩放 图形变形_变形 裁切路径 动画_时钟 动画_运动的小球 引入外部SVG 绘制图像_缩放图像 ctx.drawImage(img, x, y, width, height) img &#xf…

开源与闭源

我的观点&#xff1a; 开源与闭源软件都有各自的优势和劣势&#xff0c;没有绝对的对错之分。.. 一、开源和闭源的优劣势比较 开源的好处与劣处 优势&#xff1a; 创新与合作&#xff1a;开源软件能够吸引更多的开发者参与到项目中来&#xff0c;促进创新和合作。开放的源代码…

【网易云商】构建高效 SaaS 系统的技术要点与最佳实践

SaaS 是什么 定义 相信大家都对云服务中的 IaaS、PaaS、SaaS 早就有所耳闻&#xff0c;现在更是衍生出了 aPaaS、iPaaS、DaaS 等等的类似概念。对于 SaaS 也有各种各样的定义&#xff0c;本文给出的定义是&#xff1a; SaaS 是一种基于互联网提供服务和软件的交付模式&#xf…

一文彻底看懂Python切片,Python切片理解与操作

1.什么是切片 切片是Python中一种用于操作序列类型(如列表、字符串和元组)的方法。它通过指定起始索引和结束索引来截取出序列的一部分,形成一个新的序列。切片是访问特定范围内的元素,就是一个Area。 说个笑话:切片不是切片,而是切片,但是又是切片。大家理解下呢(末…

80C51单片机----数据传送类指令

目录 一.一般传送指令&#xff0c;即mov指令 1.16位传送&#xff08;仅1条&#xff09; 2.8位传送 &#xff08;1&#xff09;目的字节为A&#xff08;累加器&#xff09; &#xff08;2&#xff09;目的字节为Rn(工作寄存器) &#xff08;3&#xff09;目的字节为direct…

java中的String.format()方法详解

介绍 String.format() 是 Java 中的一个字符串格式化方法&#xff0c;它用于生成指定格式的字符串。这个方法可以接受一个或多个参数&#xff0c;并将它们按照指定的格式插入到字符串中。它使用了类似于 C 语言中的 printf 函数的语法。 String.format() 方法的使用格式如下&…

Tars框架 Tars-Go 学习

Tars 框架安装 网上安装教程比较多&#xff0c;官方可以参数这个 TARS官方文档 (tarsyun.com) 本文主要介绍部署应用。 安装完成后Tars 界面 增加应用amc 部署申请 amc.GoTestServer.GoTestObj 名称不知道的可以参考自己创建的app config 点击刷新可以看到自己部署的应用 服…

微机原理_3

一、单项选择题(本大题共15小题,每小题3分,共45分。在每小题给出的四个备选项中,选出一个正确的答案,请将选定的答案填涂在答题纸的相应位置上。) 在 8086 微机系统中&#xff0c;完成对指令译码操作功能的部件是&#xff08;)。 A. EU B. BIU C. SRAM D. DRAM 使计算机执行某…

【Rust日报】2023-11-22 Floneum -- 基于 Rust 的一款用于 AI 工作流程的图形编辑器

Floneum -- 基于 Rust 的一款用于 AI 工作流程的图形编辑器 Floneum 是一款用于 AI 工作流程的图形编辑器&#xff0c;专注于社区制作的插件、本地 AI 和安全性。 Floneum 有哪些特性&#xff1a; 可视化界面&#xff1a;您无需任何编程知识即可使用Floneum。可视化图形编辑器可…

2023年金融信创行业研究报告

第一章 行业概况 1.1 定义 金融信创是指在金融行业中应用的信息技术&#xff0c;特别是那些涉及到金融IT基础设施、基础软件、应用软件和信息安全等方面的技术和产品。这一概念源于更广泛的“信创 (信息技术应用创新)”&#xff0c;即通过中国国产信息技术替换海外信息技术&a…

某60区块链安全之未初始化的存储指针实战二学习记录

系列文章目录 文章目录 系列文章目录未初始化的存储指针实战二实验目的实验环境实验工具实验原理实验内容实验过程EXP利用 未初始化的存储指针实战二 实验目的 学会使用python3的web3模块 学会分析以太坊智能合约未初始化的存储指针漏洞 找到合约漏洞进行分析并形成利用 实验…