【RabbitMQ】RabbitMQ 7种工作模式简单使用示例

目录

1. 简单模式

2. Work Queues(作队列) 

3. Publish/Subscribe(发布/订阅)

4. Routing(路由模式)

5. Topics(通配符模式)

6. RPC(RPC通信)

7. Publisher Confirms(发布确认)

7.1Publishing Messages Individually(单独确认) 

7.2 Publishing Messages in Batches(批量确认)

7.3 Handling Publisher Confirms Asynchronously(异步确认) 


上一篇文章中, 我们简单介绍了RabbitMQ 7种工作模式: 

【RabbitMQ】RabbitMQ 的七种工作模式介绍-CSDN博客

在这篇文章中, 将会对这7种工作模式进行代码演示

这篇文章代码中用到的常量:

public class Constants {
    public static final String HOST = "8.130.35.237";
    public static final Integer PORT = 5672;
    public static final String USER_NAME = "study";
    public static final String PASSWORD = "study";
    public static final String VIRTUAL_HOST = "test";
    //工作队列模式
    public static final String WORK_QUEUE = "work.queue";

    //发布订阅模式
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    //路由模式
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";

    //通配符模式
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";

    //rpc模式
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

    //publisher confirms
    public static final String PUBLISH_CONFIRMS_QUEUE1 = "publish.confirms.queue1";
    public static final String PUBLISH_CONFIRMS_QUEUE2 = "publish.confirms.queue2";
    public static final String PUBLISH_CONFIRMS_QUEUE3 = "publish.confirms.queue3";
}

咱们在前面学习了简单模式的写法, 接下来学习另外几种工作模式的写法 

1. 简单模式

在第一篇文章中的入门程序就是简单模式. 此处就省略啦~~

第一篇文章的地址:【RabbitMQ】RabbitMQ 的概念以及使用RabbitMQ编写生产者消费者代码-CSDN博客

2. Work Queues(作队列) 

简单模式的增强版, 和简单模式的区别就是: 简单模式有一个消费者, 工作队列模式支持多个消费者接收消息, 消费者之间是竞争关系, 每个消息只能被一个消费者接收

编写生产者代码
工作队列模式和简单模式区别是有多个消费者, 所以生产者消费者代码差异不大
相比简单模式, 生产者的代码基本一样, 为了能看到多个消费者竞争的关系, 我们一次发送10条消息
我们把发送消息的地方, 改为一次发送10条消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明队列

        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                                Map<String, Object> arguments)
         * 参数说明:
         * queue:队列名称
         * durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
         * exclusive:是否独占,只能有⼀个消费者监听队列
         * autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
         * arguments 参数
         */
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        //4.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明
         * exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
         * routingKey: 内置交换机, routingKey和队列名称保持一致
         * props: 属性配置
         * body: 消息
         */
        for (int i = 0; i < 10; i++) {
            String msg = "Hello work queue... " + i;
            channel.basicPublish("",Constants.WORK_QUEUE,null,msg.getBytes());
            System.out.println(msg + "消息发送成功!");
        }

        //6.资源释放
        channel.close();
        connection.close();
    }
}

编写消费者代码
消费者代码和简单模式一样, 只是复制两份. 两个消费者代码可以是一样的

Consumer1:

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明队列

        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                                Map<String, Object> arguments)
         * 参数说明:
         * queue:队列名称
         * durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
         * exclusive:是否独占,只能有⼀个消费者监听队列
         * autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
         * arguments 参数
         */
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        //4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1 收到消息: " + new String(body));
            }
        };

        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        //5.释放资源
//        channel.close();
//        connection.close();
    }
}

Consumer2: 

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明队列

        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                                Map<String, Object> arguments)
         * 参数说明:
         * queue:队列名称
         * durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
         * exclusive:是否独占,只能有⼀个消费者监听队列
         * autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
         * arguments 参数
         */
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        //4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2 收到消息: " + new String(body));
            }
        };

        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        //5.释放资源
//        channel.close();
//        connection.close();
    }
}

运行程序,观察结果

先启动两个消费者运行,再启动生产者
如果先启动生产者,在启动消费者,由于消息较少,处理较快,那么第一个启动的消费者就会瞬间把10条消息消费掉,所以我们先启动两个消费者,再启动生产者
1. 启动2个消费者
2. 启动生产者

可以看到两个消费者都打印了消费信息

可以看到管理界面上显示两个消费者

3. Publish/Subscribe(发布/订阅)

在发布/订阅模型中,多了一个Exchange角色
Exchange 常见有三种类型,分别代表不同的路由规则

a) Fanout: 广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe模式)
b) Direct: 定向,把消息交给符合指定routing key的队列 (Routing模式)
c)Topic: 通配符,把消息交给符合routing pattern(路由模式)的队列 (Topics模式)

也就分别对应不同的工作模式

我们来看看Publish/Subscribe 模式

步骤:
1.引入依赖
2.编写生产者代码
3.编写消费者代码

编写生产者代码

和前面两个的区别是:
需要创建交换机,并且绑定队列和交换机

创建交换机

声明两个队列
后面验证是否两个队列都能收到消息

绑定队列和交换机

完整代码:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
        //4.申明队列

        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                                Map<String, Object> arguments)
         * 参数说明:
         * queue:队列名称
         * durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
         * exclusive:是否独占,只能有⼀个消费者监听队列
         * autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
         * arguments 参数
         */
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

        //5.交换机和队列绑定
        channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
        channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");

        //6.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明
         * exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
         * routingKey: 内置交换机, routingKey和队列名称保持一致
         * props: 属性配置
         * body: 消息
         */
        for (int i = 0; i < 10; i++) {
            String msg = "Hello fanout queue... " + i;
            channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());
            System.out.println(msg + "消息发送成功!");
        }

        //6.资源释放
        channel.close();
        connection.close();
    }
}

编写消费者代

交换机和队列的绑定关系及声明已经在生产方写完,所以消费者不需要再写了
去掉声明队列的代码就可以了

1.创建Channel
2.接收消息,并处理

完整代码
消费者1

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.申明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        //4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2 收到消息: " + new String(body));
            }
        };

        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
    }
}

消费者2

把队列名称改一下就可以了.此处省略

运行程序,观察结果
1.运行生产者
a) 可以看到两个队列分别有了一条消息

b) Exchange多了队列绑定关系

2.运行消费者

4. Routing(路由模式)

队列和交换机的绑定, 不能是任意的绑定了, 而是要指定一个BindingKey(RoutingKey的一种)
消息的发送方在向Exchange发送消息时, 也需要指定消息的RoutingKey

Exchange也不再把消息交给每一个绑定的key, 而是根据消息的RoutingKey进行判断,只有队列绑定时的BindingKey和发送消息的RoutingKey完全一致, 才会接收到消息

编写生产者代码

和发布订阅模式的区别是: 交换机类型不同, 绑定队列的BindingKey不同

创建交换机, 定义交换机类型为BuiltinExchangeType.DIRECT

声明队列

绑定交换机和队列

完整代码:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        //4.申明队列

        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                                Map<String, Object> arguments)
         * 参数说明:
         * queue:队列名称
         * durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
         * exclusive:是否独占,只能有⼀个消费者监听队列
         * autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
         * arguments 参数
         */
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);

        //5.交换机和队列绑定
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");

        //6.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明
         * exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
         * routingKey: 内置交换机, routingKey和队列名称保持一致
         * props: 属性配置
         * body: 消息
         */
        String msg = "hello direct, my routingKey is a ...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg.getBytes());

        String msg_b = "hello direct, my routingKey is b ...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());

        String msg_c = "hello direct, my routingKey is c ...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());

        System.out.println("消息发送成功!");

        //6.资源释放
        channel.close();
        connection.close();
    }
}

编写消费者代码

Routing模式的消费者代码和Publish/Subscribe 代码一样,同样复制出来两份

消费者1:DirectRabbitmqConsumer1
消费者2: DirectRabbitmgConsumer2

修改消费的队列名称就可以

完整代码:

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.申明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        //4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2 收到消息: " + new String(body));
            }
        };

        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
    }
}

运行程序, 观察结果:

5. Topics(通配符模式)

Topics 和Routing模式的区别是:

1. topics 模式使用的交换机类型为topic(Routing模式使用的交换机类型为direct)

2. topic 类型的交换机在匹配规则上进行了扩展, Binding Key支持通配符匹配(direct类型的交换机路由规则是BindingKey和RoutingKey完全匹配).

编写生产者代码

和路由模式,发布订阅模式的区别是: 交换机类型不同,绑定队列的RoutingKey不同

创建交换机

定义交换机类型为BuiltinExchangeType.TOPIC

声明队列

绑定交换机和队列

完整代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
        //4.申明队列

        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                                Map<String, Object> arguments)
         * 参数说明:
         * queue:队列名称
         * durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
         * exclusive:是否独占,只能有⼀个消费者监听队列
         * autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
         * arguments 参数
         */
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

        //5.交换机和队列绑定
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");

        //6.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明
         * exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
         * routingKey: 内置交换机, routingKey和队列名称保持一致
         * props: 属性配置
         * body: 消息
         */
        String msg = "hello direct, my routingKey is sae.a.fa ...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "sae.a.fa", null, msg.getBytes()); //转发到Q1

        String msg_b = "hello direct, my routingKey is ef.a.b ...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());//转发到Q1,Q2

        String msg_c = "hello direct, my routingKey is c.ef.d ...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes());//转发到Q2

        System.out.println("消息发送成功!");

        //6.资源释放
        channel.close();
        connection.close();
    }
}

编写消费者代码

Routing模式的消费者代码和Routing模式代码一样,修改消费的队列名称即可
同样复制出来两份
消费者1:TopicRabbitmqConsumerl
消费者2: TopicRabbitmqConsumer2

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.申明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        //4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2 收到消息: " + new String(body));
            }
        };

        channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
    }
}

代码运行结果:

6. RPC(RPC通信)

RPC(Remote Procedure Cal),即远程过程调用.它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术.类似于Http远程调用.

RabbitMQ实现RPC通信的过程,大概是通过两个队列实现一个可回调的过程

大概流程如下:
1.客户端发送消息到一个指定的队列, 并在消息属性中设置 replyTo 字段, 这个字段指定了一个回调队列, 服务端处理后, 会把响应结果发送到这个队列.

2.服务端接收到请求后, 处理请求并发送响应消息到 replyTo 指定的回调队列

3.客户端在回调队列上等待响应消息. 一旦收到响应,客户端会检查消息的correlationld属性,以确
保它是所期望的响应.

编写客户端代码

客户端代码主要流程如下:

1.声明两个队列, 包含回调队列 replyQueueName, 声明本次请求的唯一标志 corrld
2.将 replyQueueName 和 corrld 配置到要发送的消息队列中
3.使用阻塞队列来阻塞当前进程, 监听回调队列中的消息, 把请求放到阻塞队列中
4.阻塞队列有消息后, 主线程被唤醒,打印返回内容

申明队列:

使用内置交换机发送消息:

//3.发送请求
String msg = "hello rpc...";
//设置请求的唯一标识
String correlationID = UUID.randomUUID().toString();
//设置请求的相关属性
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
        .correlationId(correlationID)
        .replyTo(Constants.RPC_RESPONSE_QUEUE)
        .build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

使用阻塞队列, 来存储回调结果:

//4.接收响应
//使用阻塞队列, 来存储响应
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String respMsg = new String(body);
        System.out.println("接收到回调信息: " + respMsg);
        if(correlationID.equals(properties.getCorrelationId())) {
            //如果 correlationID 校验一致
            response.offer(respMsg);
        }
    }
};

channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);

获取回调结果:

String result = response.take();
System.out.println("[RPC Client 响应结果]: " + result);

完整代码:

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

/**
 * rpc 客户端
 * 1.发送请求
 * 2.接收响应
 */
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false,null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false,null);
        //3.发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

        //4.接收响应
        //使用阻塞队列, 来存储响应
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调信息: " + respMsg);
                if(correlationID.equals(properties.getCorrelationId())) {
                    //如果 correlationID 校验一致
                    response.offer(respMsg);
                }
            }
        };

        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]: " + result);
    }
}

编写服务端代码

服务端代码主要流程如下:
1.接收消息
2.根据消息内容进行响应处理,把应答结果返回到回调队列中

设置同时最多只能获取一个消息

如果不设置 basicQos, RabbitMQ 会使用默认的 OoS 设置, 其 prefetchcount 默认值为0. 当
prefetchCount为0 时,RabbitMO 会根据内部实现和当前的网络状况等因素,可能会同时发送多条
消息给消费者. 这意味着在默认情况下,消费者可能会同时接收到多条消息, 但具体数量不是严格保
证的,可能会有所波动

在RPC模式下,通常期望的是一对一的消息处理,即一个请求对应一个响应,消费者在处理完一个消息并确认之后,才会接收到下一条消息.

接收消息, 并做出相应的处理

DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String request = new String(body);
        System.out.println("接收到请求: " + request);
        String response = "针对request:" + request + " , 响应成功";
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                                .build();
        channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);

RabbitMQ 消息确定机制

在RabbitMO中,basicConsume方法的autoAck参数用于指定消费者是否应该自动向消息队列确认

消息自动确认(autoAck=true):  消息队列在将消息发送给消费者后, 会立即从内存中删除该消息. 这意味着, 如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费

手动确认(autoAck=false):  消息队列在将消息发送给消费者后,需要消费者显式地调用basicAck
方法来确认消息. 手动确认提供了更高的可靠性, 确保消息不会被意外丢失, 适用于消息处理重要且需要确保每个消息都被正确处理的场景.

完整代码:

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RPC Server
 * 1.接收请求
 * 2.发送响应
 */
public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.接收请求
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body);
                System.out.println("接收到请求: " + request);
                String response = "针对request:" + request + " , 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                                .correlationId(properties.getCorrelationId())
                                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}

运行结果: 

7. Publisher Confirms(发布确认)

作为消息中间件,都会面临消息丢失的问题.

消息丢失大概分为三种情况:

生产者问题. 因为应用程序故障, 网络抖动等各种原因, 生产者没有成功向broker发送消息

消息中间件自身问题, 生产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失

消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费
失败的消息从队列中删除了

RabbitMO也对上述问题给出了相应的解决方案,问题2可以通过持久化机制.问题3可以采用消息应答机制.

针对问题1,可以采用发布确认(Publisher Confirms)机制实现

发布确认 属于RabbitMQ的七大工作模式之一

生产者将信道设置成 confirm(确认) 模式, 一旦信道进入confirm模式, 所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始), 一旦消息被投递到所有匹配的队列之后, RabbitMO就会发送一个确认给生产者(包含消息的唯一ID), 这就使得生产者知道消息已经正确到达目的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写入磁盘之后发出. broker回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号, 此外 broker也可以设置 channel.basicAck 方法中的 multiple参数, 表示到这个序号之前的所有消息都已经得到了处理.

发送方确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息

1.当消息最终得到确认之后, 生产者可以通过回调方法来处理该确认消息.
2.如果 RabbitMO 因为自身内部错误导致消息丢失, 就会发送一条nack(Basic.Nack)命令, 生产者同样可以在回调方法中处理该nack命令

使用发送确认机制, 必须要信道设置成 confirm(确认) 模式

发布确认是 AMOP 0.9.1协议的扩展, 默认情况下它不会被启用. 生产者通过channel.confirmSelect() 将信道设置为confirm模式.

发布确认有3种策略, 接下来我们来学习这三种策略

7.1Publishing Messages Individually(单独确认) 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 200;
    static Connection createConnection() throws Exception {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws Exception{
        //Publishing Messages Individually(单独确认)
        publishingMessagesIndividually();
        //Publishing Messages in Batches(批量确认)
        publishingMessagesInBatches();
        //Handling Publisher Confirms Asynchronously(异步确认)
        handlingPublisherConfirmsAsynchronously();
    }

    private static void handlingPublisherConfirmsAsynchronously() {
    }

    private static void publishingMessagesInBatches() {

    }

    /**
     * 单独确认
     */
    private static void publishingMessagesIndividually() throws Exception{
        try (Connection connection = createConnection()){
            //1.开启信道
            Channel channel = connection.createChannel();
            //2.设置信道为confirm模式
            channel.confirmSelect();
            //3.申明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1, true, false, false, null);
            //4.发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for(int i = 0; i<MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms " + i;
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE1, null, msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
        }
    }
}

代码运行结果: 

可以发现,发送200条消息,耗时很长

观察上面代码, 会发现这种策略是每发送一条消息后就调用channel.waitForConfirmsOrDie方法, 之后等待服务端的确认, 这实际上是一种串行同步等待的方式, 尤其对于持久化的消息来说, 需要等待消息确认存储在磁盘之后才会返回(调用Linux内核的fsync方法).

但是发布确认机制是支持异步的. 可以一边发送消息, 一边等待消息确认.

由此进行了改进, 接下来看另外两种策略:

Publishing Messages in Batches(批量确认): 每发送一批消息后, 调用channel.waitForConfirms方
法, 等待服务器的确认返回.

Handling Publisher Confirms Asynchronously(异步确认): 提供一个回调方法, 服务端确认了一条
或者多条消息后客户端会回这个方法进行处理

7.2 Publishing Messages in Batches(批量确认)

核心代码: 

private static void publishingMessagesInBatches() throws Exception{
try (Connection connection = createConnection()){
    //1.开启信道
    Channel channel = connection.createChannel();
    //2.设置信道为confirm模式
    channel.confirmSelect();
    //3.申明队列
    channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2, true, false, false, null);
    //4.发送消息, 并进行确认
    long start = System.currentTimeMillis();
    int batchSize = 100;
    int outstandingMessageCount = 0;
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String msg = "hello publisher confirms " + i;
        channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE2, null, msg.getBytes());
        outstandingMessageCount++;
        if(outstandingMessageCount == batchSize) {
            //等待确认
            channel.waitForConfirmsOrDie(5000);
            outstandingMessageCount = 0;
        }
    }
    if(outstandingMessageCount > 0) {
        //等待确认
        channel.waitForConfirmsOrDie(5000);
        outstandingMessageCount = 0;
    }
    long end = System.currentTimeMillis();
    System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}

代码运行结果: 

可以观察到, 性能提高了很多

相比于单独确认策略, 批量确认极大地提升了confirm的效率, 缺点是出现Basic,Nack或者超时时, 我们不清楚具体哪条消息出了问题.客户端需要将这一批次的消息全部重发, 这会带来明显的重复消息数量.

当消息经常丢失时, 批量确认的性能应该是不升反降的.

7.3 Handling Publisher Confirms Asynchronously(异步确认) 

异步confirm方法的编程实现最为复杂. Channel接口提供了一个方法addConfirmListener. 这个方法
可以添加ConfirmListener回调接口.

ConfirmListener接口中包含两个方法:handleAck(long deliveryTag,booleanmultiple) 和 handleNack(long deliveryTag,boolean multiple), 分别对应处理RabbitMO发送给生产者的ack和nack.

deliveryTag 表示发送消息的序号. multiple 表示是否批量确认

我们需要为每一个Channel维护一个已发送消息的序号集合, 当收到RabbitMO的 confirm 回调时,从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带一个从1开始递增的deliveryTag序号. 我们可以使用SortedSet 的有序性来维护这个已发消息的集合

1. 当收到ack时,从序列中删除该消息的序号,如果为批量确认消息,表示小于等于当前序号deliveryTag的消息都收到了,则清除对应集合

2. 当收到nack时,处理逻辑类似, 不过需要结合具体的业务情况, 进行消息重发等操作

/**
     * 异步确认
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception{
        try (Connection connection = createConnection()){
            //1.开启信道
            Channel channel = connection.createChannel();
            //2.设置信道为confirm模式
            channel.confirmSelect();
            //3.申明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3, true, false, false, null);
            //4.监听confirm
            //有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    //multiple 批量
                    //confirmSet.headSet(n)⽅法返回当前集合中⼩于n的集合
                    if(multiple) {
                        //批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰
                        //这批序号的消息都已经被ack了
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        //单条确认:将当前的deliveryTag从集合中移除
                        confirmSeqNo.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple) {
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //业务要根据实际场景进行处理, 比如重发, 此处省略
                }
            });

            //5.发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for(int i = 0; i<MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms " + i;
                //得到下次发送消息的序号, 从1开始
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE1, null, msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()) {
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
        }
    }

三种策略对比, 完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 200;
    static Connection createConnection() throws Exception {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME); //账号
        connectionFactory.setPassword(Constants.PASSWORD); //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws Exception{
        //Publishing Messages Individually(单独确认)
        publishingMessagesIndividually();
        //Publishing Messages in Batches(批量确认)
        publishingMessagesInBatches();
        //Handling Publisher Confirms Asynchronously(异步确认)
        handlingPublisherConfirmsAsynchronously();
    }

    /**
     * 异步确认
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception{
        try (Connection connection = createConnection()){
            //1.开启信道
            Channel channel = connection.createChannel();
            //2.设置信道为confirm模式
            channel.confirmSelect();
            //3.申明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3, true, false, false, null);
            //4.监听confirm
            //有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    //multiple 批量
                    //confirmSet.headSet(n)⽅法返回当前集合中⼩于n的集合
                    if(multiple) {
                        //批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰
                        //这批序号的消息都已经被ack了
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        //单条确认:将当前的deliveryTag从集合中移除
                        confirmSeqNo.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple) {
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //业务要根据实际场景进行处理, 比如重发, 此处省略
                }
            });

            //5.发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for(int i = 0; i<MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms " + i;
                //得到下次发送消息的序号, 从1开始
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE1, null, msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()) {
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
        }
    }

    private static void publishingMessagesInBatches() throws Exception{
        try (Connection connection = createConnection()){
            //1.开启信道
            Channel channel = connection.createChannel();
            //2.设置信道为confirm模式
            channel.confirmSelect();
            //3.申明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2, true, false, false, null);
            //4.发送消息, 并进行确认
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms " + i;
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE2, null, msg.getBytes());
                outstandingMessageCount++;
                if(outstandingMessageCount == batchSize) {
                    //等待确认
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if(outstandingMessageCount > 0) {
                //等待确认
                channel.waitForConfirmsOrDie(5000);
                outstandingMessageCount = 0;
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
        }
    }

    /**
     * 单独确认
     */
    private static void publishingMessagesIndividually() throws Exception{
        try (Connection connection = createConnection()){
            //1.开启信道
            Channel channel = connection.createChannel();
            //2.设置信道为confirm模式
            channel.confirmSelect();
            //3.申明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1, true, false, false, null);
            //4.发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for(int i = 0; i<MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms " + i;
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE1, null, msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/893932.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【实战篇】用SkyWalking排查线上[xxl-job xxl-rpc remoting error]问题

一、组件简介和问题描述 SkyWalking 简介 Apache SkyWalking 是一个开源的 APM&#xff08;应用性能管理&#xff09;工具&#xff0c;专注于微服务、云原生和容器化环境。它提供了分布式追踪、性能监控和依赖分析等功能&#xff0c;帮助开发者快速定位和解决性能瓶颈和故障。…

Tbox编译注意问题

Tbox是一个强大的开源库&#xff0c;感谢做为ruki的无私奉献。 tbox: 跨平台的c开发库&#xff0c;提供asio、stream、容器、算法、xml/json/plist解析、数据库等常用模块 在使用tbox开源库的数据库模块时&#xff0c;没有使用xmake进行编译&#xff0c;而是使用make编译的。…

Golang | Leetcode Golang题解之第474题一和零

题目&#xff1a; 题解&#xff1a; func findMaxForm(strs []string, m, n int) int {dp : make([][]int, m1)for i : range dp {dp[i] make([]int, n1)}for _, s : range strs {zeros : strings.Count(s, "0")ones : len(s) - zerosfor j : m; j > zeros; j--…

数据库血缘工具学习,使用以及分享

一.血缘关系是什么&#xff1f;为什么要分析血缘关系&#xff1f; 首先&#xff0c;什么是血缘关系&#xff1f; 是指在数据的全生命周期中&#xff0c;从数据的产生、处理、加工、融合、流转到最终消亡&#xff0c;数据之间自然形成的一种类似人类血缘的关联关系。 说的再简…

PyTorch 2.5 发布带来一些新特性和改进

官网&#xff1a;https://github.com/pytorch/pytorchGitHub&#xff1a;https://github.com/pytorch/pytorch原文&#xff1a;https://github.com/pytorch/pytorch/releases/tag/v2.5.0 主要亮点 (Highlights)] SDPA CuDNN 后端&#xff1a;为 torch.nn.functional.scaled_d…

Zico 2 靶机 - 详细流程

✨ 准备工作 靶机 && kali 环境要求 机器名网络配置靶机Zico 2NAT 模式攻击机kaliNAT 模式 靶机下载链接&#xff1a;zico2: 1 ~ VulnHub 打开 VMware&#xff0c;将 zico2.ova 拖拽到 VMware 中 设置 虚拟机名称(A) - 存储路径(P)- 导入 若是&#xff0c;…

3DsMax删除FBX 导出的预设

3DsMax删除FBX 导出的预设 文档 https://help.autodesk.com/view/3DSMAX/2025/CHS/?guidGUID-9939F041-5E2D-4AA8-A732-6C2A1DFB5314删除静态FBX 这个预设 使用everything 搜索预设文件的后缀.fbxexportpreset &#xff0c;然后 文件路径 C:\Users\GoodCooking\Documents\3…

C++标准模板库--vector

vector 介绍 vector&#xff08;向量&#xff09;是一种序列容器&#xff0c;表示为可以改变大小的数组。vector中的元素使用连续的存储位置&#xff0c;这意味着也可以使用指向其元素的常规指针偏移量来访问任意元素&#xff0c;且与数组一样高效。但与数组不同的是&#xff…

React01 开发环境搭建

React 开发环境搭建 一、创建 React 项目二、项目精简 一、创建 React 项目 执行下述命令创建 react 项目 blu-react-basis npx create-react-app blu-react-basis项目目录结构如下&#xff1a; 执行下述命令启动项目 npm run start启动效果如下&#xff1a; 二、项目精简 …

51单片机的万年历【proteus仿真+程序+报告+原理图+演示视频】

1、主要功能 该系统由AT89C51/STC89C52单片机LCD1602显示模块时钟模块按键蜂鸣器等模块构成。适用于电子万年历、数字时钟万年历等相似项目。 可实现功能: 1、LCD1602实时显示年月日星期和北京时间&#xff0c;具备闰年判断功能 2、按键可设置闹钟时间 3、按键可修改当前时…

案例-登录认证

案例-登录认证 在前面的课程中&#xff0c;我们已经实现了部门管理、员工管理的基本功能&#xff0c;但是大家会发现&#xff0c;我们并没有登录&#xff0c;就直接访问到了Tlias智能学习辅助系统的后台。 这是不安全的&#xff0c;所以我们今天的主题就是登录认证。 最终我们…

redo文件误删除后通过逻辑备份进行恢复

问题描述 开发同事让在一个服务器上查找下先前库的备份文件是否存在&#xff0c;如果存在进行下恢复。翻了服务器发现备份文件存在&#xff0c;多愁了一眼竟翻到了该备份文件于2024.6.17日恢复过的日志&#xff0c;赶紧和开发沟通说2024.6.17号已经恢复过了为啥还要恢复&#x…

空间大数据的数据变换与价值提炼

在数字化时代&#xff0c;空间大数据正成为推动社会经济发展的关键因素。空间大数据不仅体量巨大&#xff0c;而且具有高速流转、多样类型和真实性等特点&#xff0c;它们在获取、存储、管理、分析方面超出了传统数据库软件工具的能力范围。地理信息系统&#xff08;GIS&#x…

AWS账号与邮箱的关系解析

在当今数字化时代&#xff0c;云计算服务的普及使得越来越多的企业和个人用户开始使用亚马逊网络服务&#xff08;AWS&#xff09;。作为全球领先的云服务平台&#xff0c;AWS为用户提供了丰富的计算、存储和数据库服务。然而&#xff0c;对于许多新用户来说&#xff0c;关于AW…

openresty通过header_filter_by_lua记录特定的请求头和特定的响应头到日志文件

有时我们希望记录特定的请求头信息和特定的响应头信息,以便能够通过关联请求信息和响应头信息,来实现记录请求和响应的对应关系。这里通过逐步尝试和优化的方式进行尝试。具体包括将需要的请求头和响应头组织到一条日志记录,输出到单独的错误日志文件记录等的配置尝试。 1.…

C语言中的文件操作:从基础到深入底层原理

文件操作是几乎所有应用程序的重要组成部分&#xff0c;特别是在系统级编程中。C语言因其高效、灵活以及接近硬件的特点&#xff0c;成为了文件操作的理想选择。本文将全面深入地探讨C语言中的文件操作&#xff0c;从文件系统的概念到具体的文件操作函数&#xff0c;再到底层的…

c++的哈希表、哈希桶的介绍与实现

目录 前言 哈希概念 哈希冲突 哈希函数 哈希冲突解决 闭散列 —— 开放定址法 开散列 —— 链地址法&#xff08;拉链法、哈希桶&#xff09; 哈希表的闭散列实现 哈希表的结构 哈希表的仿函数 哈希表的插入 哈希表的查找 哈希表的删除 哈希表的开散列实现&#xff…

如何查看默认网关地址:详细步骤

在日常的网络配置与故障排查中&#xff0c;了解并正确查看默认网关地址是一项基础且至关重要的技能。默认网关是连接本地网络与外部网络&#xff08;如互联网&#xff09;的关键节点&#xff0c;它扮演着数据包转发的重要角色。无论是家庭网络、办公室网络还是更复杂的网络环境…

MySQL:基于Spring监听Binlog日志

binlog的三种模式 MySQL 的二进制日志&#xff08;binlog&#xff09;有三种不同的格式&#xff0c;通常被称为 binlog 模式。这三种模式分别是 Statement 模式、Row 模式和Mixed 模式。 Statement 模式&#xff1a; 在 Statement 模式下&#xff0c;MySQL 记录每个会更改数…