RabbitMQ 七种工作模式介绍

目录

1.简单模式队列

2.WorkQueue(⼯作队列)

3 Publish/Subscribe(发布/订阅)

4 Routing(路由模式)

5.Topics(通配符模式)

6 RPC(RPC通信)

7 Publisher Confirms(发布确认)


RabbitMQ 共提供了7种⼯作模式供我们进⾏消息传递,接下来一一介绍它的实现与目的

1.简单模式队列

P为生产者  发送信息中间(消息队列)C作为消费者 直接消费消息队列里面的内容

特点:⼀个⽣产者P,⼀个消费者C,消息只能被消费⼀次.也称为点对点(Point-to-Point)模式.

 生产者:

1.创建连接工厂

2.设置工厂参数

3.创建channel

4.声明queue

5 通过channel发送到queue

6. 资源释放

public static void main(String[] args) throws IOException,TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
            //2.设置工厂参数
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //3.创建channel
        Channel channel = connection.createChannel();
        //4.声明queue
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         * Map<String, Object> arguments)
         * 1.队列名称
         * 2.durable 可持久化 true为持久化
         * 3.exclusive 是否独占 false
         * 4. autoDelete 是否自动删除 false
         * arguments 参数
         */
        //如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
        channel.queueDeclare("simple",true,false,false,null);
        //5 通过channel发送到queue
        /**
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props,
         * byte[] body)
         * 1.exchange 交换机名称 ,简单情况下 一般默认的情况为""
         * 2.routingKey 路由名称=队列名称
         * 3.props 配置信息
         * 4.body 发现信息的数据
         */
        for (int i = 0; i < 10; i++) {
            String msg = "hello 简单队列~"+i;
            channel.basicPublish("","simple", null, msg.getBytes());
        }
        System.out.println("信息发送成功!");
        //6. 资源释放
        channel.close();
        connection.close();
    }

消费者:

1.创建连接工厂

2.设置工厂参数

3.创建channel

4.声明queue

5.消费数据

6.资源释放

public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置工厂参数
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();

        //3.创建channel
        Channel channel = connection.createChannel();
        //4.声明queue
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         * Map<String, Object> arguments)
         * 1.队列名称
         * 2.durable 可持久化 true为持久化
         * 3.exclusive 是否独占 false
         * 4. autoDelete 是否自动删除 false
         * arguments 参数
         */
        //如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
        channel.queueDeclare("simple",true,false,false,null);
        //5.接收信息 并消费
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue :队列名称
         * autoAck:是否自动确认 消费者接收信息与MQ
         * callback :回调对象
         */
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /*
            回调方法 当收到信息 自动执行该方法
            consumerTag
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到信息:"+new String(body));
            }
        };
        channel.basicConsume("simple",true,consumer);
        // 释放资源
        channel.close();
        connection.close();
    }


2.WorkQueue(⼯作队列)

 一个生产者(P) 多个消费者(C) 消息队列会平均分配给消费者

特点:消息不会重复,分配给不同的消费者.
适⽤场景:集群环境中做异步处理

生产者:

跟简单模式类似 或一个队列名称

 public static void main(String[] args) throws IOException,TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
            //2.设置工厂参数
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //3.创建channel
        Channel channel = connection.createChannel();
        //4.声明queue
        channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
        //5 通过channel发送到queue
        for (int i = 0; i < 10; i++) {
            String msg = "hello 工作队列~"+i;
            channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());
        }
        System.out.println("信息发送成功!");
        //6. 资源释放
        channel.close();
        connection.close();
    }

消费者1:

public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置工厂参数
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //3.创建channel
        Channel channel = connection.createChannel();
        //4.声明queue
        //如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
        //5.接收信息 并消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到信息:"+new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE,true,consumer);
        // 释放资源
        channel.close();
        connection.close();
    }

 消费者2 也是同样的代码


3 Publish/Subscribe(发布/订阅)

Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)

X作为交换机  将消息复制多份 并且发送多个消费者 并且每个消费者收到相同的信息

比如 P发送了10条消息  C1和C2得消费10条信息

适合场景: 消息需要被多个消费者同时接收的场景. 如: 实时通知或者⼴播消息

 

Exchange(交换机)只负责转发消息, 不具备存储消息的能⼒, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失

RoutingKey: 路由键.⽣产者将消息发给交换器时, 指定的⼀个字符串, ⽤来告诉交换机应该如何处理这个消息.

Binding Key:绑定. RabbitMQ中通过Binding(绑定)将交换器与队列关联起来, 在绑定的时候⼀般会指定⼀个Binding Key, 这样RabbitMQ就知道如何正确地将消息路由到队列了.

 

生产者:

    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
        //4. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
        //5. 交换机和队列绑定
        channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
        channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
        //6. 发布消息
        for (int i = 0; i < 10; i++) {
            String msg = "hello 发布订阅队列~"+i;
            channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
        }
        System.out.println("消息发送成功");
        //7. 释放资源
        channel.close();
        connection.close();
    }

 

 消费者1

public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
    }

消费者2同理 

 


4 Routing(路由模式)

路由模式是发布订阅模式的变种, 在发布订阅基础上, 增加路由key

发布订阅模式是⽆条件的将所有消息分发给所有消费者, 路由模式是Exchange根据RoutingKey的规则, 将数据筛选后发给对应的消费者队列
适合场景: 需要根据特定规则分发消息的场景.

⽐如系统打印⽇志, ⽇志等级分为error, warning, info,debug, 就可以通过这种模式,把不同的⽇志发送到不同的队列, 最终输出到不同的⽂件

生产者:

    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        //4. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
        //5. 交换机和队列绑定
        channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");
        //6. 发布消息
        String msg_a = "hello 路由队列~ my routingKey is a...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg_a.getBytes());
        String msg_b = "hello 路由队列~ my routingKey is b...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());
        String msg_c = "hello 路由队列~ my routingKey is c...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());

        System.out.println("消息发送成功");
        //7. 释放资源
        channel.close();
        connection.close();
    }

消费者1

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置工厂参数
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //3.创建channel
        Channel channel = connection.createChannel();
        //4.声明queue
        //如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
        //5.接收信息 并消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到信息:"+new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);
    }

消费者 2同理


5.Topics(通配符模式)

 

路由模式的升级版, 在routingKey的基础上,增加了通配符的功能, 使之更加灵活.

Topics和Routing的基本原理相同,即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列. 类似于正则表达式的⽅式来定义Routingkey的模式.

适合场景: 需要灵活匹配和过滤消息的场景

生产者:

public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
        //4. 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
        //5. 交换机和队列绑定
        channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*");
        channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#");
        //6. 发布消息
        String msg_a = "hello 路由队列~ my routingKey is ae.a.f...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg_a.getBytes());
        String msg_b = "hello 路由队列~ my routingKey is ef.a.b...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes());
        String msg_c = "hello 路由队列~ my routingKey is c.ef.d ...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());

        System.out.println("消息发送成功");
        //7. 释放资源
        channel.close();
        connection.close();
    }

消费者1: 只接收*.a.*

 

 

消费者2: 接收*.*.b c.#

 


6 RPC(RPC通信)

 

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC远程调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程.

 

客户端:1.发送请求(携带replyTo,correlationId )2.接收响应(验证correlationId)

服务器:1.接收请求 进行响应 2.发送响应(按客户端指定的replyTo,设置correlationId)

 客户端:

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());

        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);
    }

 

客户端:

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());

        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);
    }

 


7 Publisher Confirms(发布确认)

消息丢失其中一种情况 ⽣产者问题. 因为应⽤程序故障, ⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息
可以采⽤发布确认(Publisher Confirms)机制实现

发送⽅确认机制最⼤的好处在于它是异步的, ⽣产者可以同时发布消息和等待信道返回确认消息.

1. 当消息最终得到确认之后, ⽣产者可以通过回调⽅法来处理该确认消息.

2. 如果RabbitMQ因为⾃⾝内部错误导致消息丢失, 就会发送⼀条nack(Basic.Nack)命令, ⽣产者同样 可以在回调⽅法中处理该nack命令.

使⽤发送确认机制, 必须要信道设置成confirm(确认)模式.
 

发布确认有3种策略, 接下来我们来学习这三种策略

(1) Publishing Messages Individually(单独确认)

    private static void publishingMessagesIndividually() throws Exception {
        try(Connection connection = createConnection()) {
            //1.开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3.声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null);
            //4.发送消息,并等待确认
            Long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            Long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }

(2)Publishing Messages in Batches(批量确认) 

private static void publishingMessagesInBatches() throws Exception {
        try(Connection connection = createConnection()) {
            //1.开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3.声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2,true,false,false,null);
            //4. 发送消息, 并进行确认
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2,null,msg.getBytes());
                outstandingMessageCount++;
                if(outstandingMessageCount == batchSize) {
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if(outstandingMessageCount > 0) {
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }

(3)Handling Publisher Confirms Asynchronously(异步确认)

    private static void handlingPublisherConfirmsAsynchronously() throws Exception {
        try(Connection connection = createConnection()) {
            //1.开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3.声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);
            //4. 监听confirm
            //集合中存储的是未确认的消息ID
            long start = System.currentTimeMillis();
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple) {
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    } else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple) {
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else  {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
                }
            });
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()) {
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }

 消息数越多, 异步确认的优势越明显


 小结:1-5工作模式重点学习 6-7工作模式了解即可

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/907077.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

自动化测试类型与持续集成频率的关系

持续集成是敏捷开发的一个重要实践&#xff0c;可是究竟多频繁的集成才算“持续”集成&#xff1f; 一般来说&#xff0c;持续集成有3种常见的集成频率&#xff0c;分别是每分钟集成、每天集成和每迭代集成。项目组应当以怎样的频率进行集成&#xff0c;这取决于测试策略&…

操作系统期中复习2-4单元

Chapter-2 第一个图形界面——Xerox Alto 早期操作系统&#xff1a;规模小&#xff0c;简单&#xff0c;功能有限&#xff0c;无结构(简单结构)。&#xff08;MS-DOS,早期UNIX&#xff09; 层次结构&#xff1a;最底层为硬件&#xff0c;最高层为用户层&#xff0c;自下而上构…

2-141 怎么实现ROI-CS压缩感知核磁成像

怎么实现ROI-CS压缩感知核磁成像&#xff0c;这个案例告诉你。基于matlab的ROI-CS压缩感知核磁成像。ROI指在图像中预先定义的特定区域或区域集合&#xff0c;选择感兴趣的区域&#xff0c;通过减少信号重建所需的数据来缩短信号采样时间&#xff0c;减少计算量&#xff0c;并在…

Android中同步屏障(Sync Barrier)介绍

在 Android 中&#xff0c;“同步屏障”&#xff08;Sync Barrier&#xff09;是 MessageQueue 中的一种机制&#xff0c;允许系统临时忽略同步消息&#xff0c;以便优先处理异步消息。这在需要快速响应的任务&#xff08;如触摸事件和动画更新&#xff09;中尤为重要。 在 An…

【tomcat系列漏洞利用】

Tomcat 服务器是一个开源的轻量级Web应用服务器&#xff0c;在中小型系统和并发量小的场合下被普遍使用。主要组件&#xff1a;服务器Server&#xff0c;服务Service&#xff0c;连接器Connector、容器Container。连接器Connector和容器Container是Tomcat的核心。一个Container…

【压力测试】如何确定系统最大并发用户数?

一、明确测试目的与了解需求 明确测试目的&#xff1a;首先需要明确测试的目的&#xff0c;即为什么要确定系统的最大并发用户数。这通常与业务需求、系统预期的最大用户负载以及系统的稳定性要求相关。 了解业务需求&#xff1a;深入了解系统的业务特性&#xff0c;包括用户行…

深入理解Redis的四种模式

Redis是一个内存数据存储系统&#xff0c;支持多种不同的部署模式。以下是Redis的四种主要部署模式。 1、单机模式 单机模式是最简单的部署模式&#xff0c;Redis将数据存储在单个节点上。这个节点包括一个Redis进程和一个持久化存储。单机模式非常适合小型应用程序或者开发和…

【多态】析构函数的重写

析构函数的重写&#xff08;面试常见题&#xff09; 基类的析构函数为虚函数&#xff0c;此时派生类析构函数只要定义&#xff0c;⽆论是否加virtual关键字&#xff0c;都与基类的析构函数构成重写。 虽然基类与派⽣类析构函数名字不同看起来不符合重写的规则&#xff0c;实际…

合并区间 leetcode56

合并区间leetcode 目录一、题目二、踩坑过程三、上官方解答四、含泪体会彩蛋 目录 一、题目 二、踩坑过程 一开始想使用一个数组来标记区间&#xff0c;但是仔细想不好实现&#xff0c;单纯把区间里出现的设置为1&#xff0c;不好体现重叠的概念&#xff0c;如果使用三种状态…

机器人领域中的scaling law:通过复现斯坦福机器人UMI——探讨数据规模化定律(含UMI的复现关键)

前言 在24年10.26/10.27两天&#xff0c;我司七月在线举办的七月大模型机器人线下营时&#xff0c;我们带着大家一步步复现UMI「关于什么是UMI&#xff0c;详见此文&#xff1a;UMI——斯坦福刷盘机器人&#xff1a;从手持夹持器到动作预测Diffusion Policy(含代码解读)」&…

MybatisPlus入门(六)MybatisPlus-空值处理

一、MybatisPlus-空值处理 1.1&#xff09;问题引入&#xff1a; 在查询中遇到如下情况&#xff0c;有部分筛选条件没有值&#xff0c;如商品价格有最大值和最小值&#xff0c;商品价格部分时候没有值。 1.2&#xff09;解决办法&#xff1a; 步骤一&#xff1a;新建查询实体…

3.2链路聚合

1、链路聚合手动配置 将交换机S1、S2的GE0/0/1、GE0/0/2口来进行链路聚合。 交换机S1配置命令; [S1]interface eth-trunk 1 [S1-Eth-Trunk1]trunkport GigabitEthernet 0/0/1 to 0/0/2 [S1-Eth-Trunk1]port link-type trunk [S1-Eth-Trunk1]port trunk allow-pass vlan all …

Pinctrl子系统中Pincontroller构造过程驱动分析:imx_pinctrl_soc_info结构体

往期内容 本专栏往期内容&#xff1a; Pinctrl子系统和其主要结构体引入Pinctrl子系统pinctrl_desc结构体进一步介绍Pinctrl子系统中client端设备树相关数据结构介绍和解析 input子系统专栏&#xff1a; 专栏地址&#xff1a;input子系统input角度&#xff1a;I2C触摸屏驱动分析…

基于YOLO11/v10/v8/v5深度学习的维修工具检测识别系统设计与实现【python源码+Pyqt5界面+数据集+训练代码】

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

jenkins搭建及流水线配置

1.安装docker curl https://mirrors.aliyun.com/repo/Centos-7.repo >> CentOS-Base-Aliyun.repomv CentOS-Base-Aliyun.repo /etc/yum.repos.d/yum -y install yum-utils device-mapper-persistent-data lvm2yum-config-manager --add-repo http://mirrors.aliyun.com/…

vue项目安装组件失败解决方法

1.vue项目 npm install 失败 删除node_modules文件夹、package-lock.json 关掉安装对话框 重新打开对话框 npm install

WPF中如何解决DataGrid的Header没有多余的一行

将最后一行设置DataGridTemplateColumn Width"*" 使其自适应

ros与mqtt相互转换

vda5050 VDA5050协议介绍 和 详细翻译-CSDN博客 ros与mqtt相互转换 如何转换的&#xff0c;通过某个中转包&#xff0c;获取ros的消息然后以需要的格式转换为mqtt 需要的参数 ros相关 parameters[ (ros_subscriber_type, vda5050_msgs/NodeState), (ros_subscriber_queue…

一些硬件知识【2024/11/2】

当需要提供功率型的输出信号的时候&#xff0c;可以在信号发生器外接功率放大器&#xff0c;这样可以提高输出功率 信号的调幅&#xff08;AM&#xff09;、调频&#xff08;FM&#xff09;与调相&#xff08;PM&#xff09;&#xff1a; 调制信号&#xff1a;控制高频振荡的低…

YOLO即插即用---PKIBlock

Poly Kernel Inception Network for Remote Sensing Detection 论文地址 1. 解决的问题 2. 解决方案 3. 解决问题的具体方法 4. 模块的应用 5. 在目标检测任务中的添加位置 6.即插即用代码 论文地址 2403.06258https://arxiv.org/pdf/2403.06258 1. 解决的问题 遥感图…