RabbitMQ 应用

在这里插入图片描述

文章目录

  • 前言
  • 1. Simple 简单模式
  • 2. Work Queue 工作队列模式
  • 3. Pubulish/Subscribe 发布/订阅模式
    • Exchange 的类型
  • 4. Routing 路由模式
  • 5. Topics 通配符模式
  • 6. RPC RPC通信
  • 7. Publisher Confirms 发布确认
    • 1. 单独确认
    • 2. 批量确认
    • 3. 异步确认

前言

前面我们学习了 RabbitMQ 的基本使用以及 RabbitMQ 的快速上手,那么这篇文章我将为大家介绍 RabbitMQ 提供的 7 种工作模式,我们上一篇快速入门实现的案例其实就是 7 种工作模式中的简单模式。

第一种:Simple 简单模式
在这里插入图片描述
第二种:Work Queue 工作队列模式
在这里插入图片描述
第三种:Publish/Subscribe 发布/订阅模式
在这里插入图片描述
第四种:Routing 路由模式
在这里插入图片描述
第五种:Topic 通配符模式
在这里插入图片描述
第六种:RPC 模式
在这里插入图片描述

第七种就是 Publisher Confirms 发布确认模式。

1. Simple 简单模式

在这里插入图片描述
简单模式主要由一个 Producer、一个 Queue和一个 Consumer 组成。

简单模式的特点就是:一个生产者 P,一个消费者 C,消息只能被消费一次,也称为点对点(Point-to-Point)模式。

这里的具体实现上一篇文章我就写了,这里就不写了,大家可以去看看。

2. Work Queue 工作队列模式

在这里插入图片描述
工作队列模式由一个生产者 P,一个队列 Queue 和多个消费者 C1、C2…组成,在这种模式下,Work Queue 会将消息分派给不同的消费者,每个消费者都会接收到不同的消息,意思就是 Work Queue 接收到了 10 条消息,那么 Work Queue 会将这 10 条消息分成两部分,每个部分 5 条消息,每条消息都不重复,然后将这五条消息分别发送给 C1 和 C2。

特点:消息不会重复,分配给不同的消费者。

适用场景:集群环境中做异步处理。比如我们平时在银行中办理业务取号的时候,当要办理业务的人取号(生产者)了之后,那么这些号码就会被存放在队列中,银行中的每个窗口(消费者)会给不同号的人办理业务。

在这里插入图片描述
那么我们看看通过 Java 代码如何实现一个工作队列模式。

对于这些经常使用到的变量,我们将其归到一个类中进行管理:

public class Constants {
    public static final String IP = "*.*.*.*";
    public static final Integer PORT = 5672;
    public static final String VIRTUAL_HOST = "test";
    public static final String USER_NAME = "admin";
    public static final String PASSWORD = "xxx";
    public static final String WORK_QUEUE = "work.queue";
}

生产者代码:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
        //发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "work queue" + i;
            channel.basicPublish("",Constants.WORK_QUEUE,null,msg.getBytes());
        }
        System.out.println("消息发送成功");
        channel.close();
        connection.close();
    }
}

消费者1 和消费者 2 的代码是一样的:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
        //消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE,consumer);
        //释放资源
        //channel.close(); 我们这里可以先不释放资源,不然当我们先运行消费者的时候queue中没有消息,consumer的连接就会直接关闭了
        //connection.close();
    }
}

先启动两个消费者,然后再启动生产者:

在这里插入图片描述
在这里插入图片描述

可以看到 Consumer1 和 Consumer2 拿到的消息都是不重复的消息。

3. Pubulish/Subscribe 发布/订阅模式

在这里插入图片描述

Exchange 的类型

可以发现这个模式相较于前面两个模式,多出了一个 X,这个 X 指的是 Exchange 交换机。

交换机的作用是:生产者将消息发送到 Exchange,由交换机将消息按照一定的规则路由到一个或者多个队列中。

AMQP 协议中的交换机的类型有六种:fanout,direct,topic,headers,System和自定义,但是 RabbitMQ 中交换机的类型只有前四种。

  • Fanout:广播,将消息交给所有绑定到交换机的队列(Publish/Subcribe模式)
  • Direct:定向,把消息交给符合指定 routing key 的队列(Routing 模式)
  • Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列(Topic 模式)
  • headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。headers 类型的交换机性能很差,而且也不实用,基本上不会看到它的存在

Exchaneg 只负责转发消息,不具备存储消息的能力,因此如果没有队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息就会丢失。

  • Routing Key:路由键。生产者将消息发送给转换机的时候,指定的一个字符串,用来告诉交换机应该如何处理这个消息
  • Binding Key:绑定。RabbitMQ 中通过 Binding 将交换机和队列关联起来,在绑定的时候一般会指定一个 Binding Key,这样 RabbitMQ 就知道如何正确的将消息路由到队列了。

在这里插入图片描述
当 Exchange 拿到生产者发送来的消息之后,会将消息中带的 Routing Key 和与该交换机绑定的队列的 Binding Key 进行匹配,然后将这个消息发送给 Routing Key 和 Binding Key 匹配的队列。

当知道了交换机的几种类型之后,我们来看看如何使用代码实现出来一个 Publish/Subscribe 模式。

生产者代码:

首先还是与 RabbitMQ-Server 建立连接,开启信道,与前面不同的操作是,前面我们声明交换机的时候因为使用的是默认的交换机,所以就没有显式的声明交换机,但是在涉及到交换机类型的时候,我们就需要显式的声明交换机,虽然 RabbitMQ 默认为我们提供了各个类型的交换机,但是名字可能不好记,所以不如我们自己实现一个:

在这里插入图片描述

Java 中声明一个交换机的方法主要是 exchangeDeclare() 方法,这个方法有很多个重载方法,但是我们主要使用下面的这种方法:

在这里插入图片描述

AMQP.Exchange.DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2, boolean var3) throws IOException;
  • String var1: 这个参数是交换机的名称。它是必须的,用于在RabbitMQ中唯一标识一个交换机。你可以根据需要为这个交换机命名。
  • BuiltinExchangeType var2: 这个参数指定了交换机的类型。该类是一个枚举类,内部枚举了交换器的类型
  • boolean var3: 这个布尔值参数指定交换机是否应该被标记为持久的(即,在RabbitMQ重启后仍然存在)。如果设置为true,交换机将持久化;如果设置为false,交换机则不会持久化。
public enum BuiltinExchangeType {
    DIRECT("direct"),
    FANOUT("fanout"),
    TOPIC("topic"),
    HEADERS("headers");

    private final String type;

    private BuiltinExchangeType(String type) {
        this.type = type;
    }

    public String getType() {
        return this.type;
    }
}

声明完成交换器后,就是声明队列,声明队列之后就是需要绑定交换器和队列了:

绑定交换器和队列使用的方法是 queueBind() 方法,该方法也是有两个重载的方法:

AMQP.Queue.BindOk queueBind(String var1, String var2, String var3) throws IOException;
    
AMQP.Queue.BindOk queueBind(String var1, String var2, String var3, Map<String, Object> var4) throws IOException;
  • String var1: 队列的名称。这是你想要绑定到交换机的队列的唯一标识符。
  • String var2: 交换机的名称。这是你想要将队列绑定到的交换机的名称。
  • String var3: 路由键。当消息发送到交换机时,交换机将使用路由键来确定哪些队列应该接收这个消息。路由键可以是任何字符串,其解释取决于交换机的类型。
  • Map<String, Object> var4: 绑定参数。这是一个可选参数,允许你为绑定指定额外的参数,这些参数将根据交换机和队列的特定需求进行解释。例如,对于某些交换机类型(如headers交换机),绑定参数可能用于定义消息头中的条件。对于大多数用途,这个参数可能为空或未使用。

我们这里没有使用到额外的参数,所以就使用三个参数的方法:

channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);
        //声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
        //绑定交换机和队列
        channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
        channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
        //生产消息
        for (int i = 0; i < 10; i++) {
            String msg = "fanout exchange" + i;
            channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());
        }
        System.out.println("消息发送成功");
        channel.close();
        connection.close();
    }
}

消费者代码:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        //消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1,consumer);
    }
}

两个消费者的代码基本上都一样,就是声明队列的时候声明的是两个不同的队列。

先启动两个消费者,然后再启动生产者:

在这里插入图片描述
在这里插入图片描述
可以看到,Exchange 将收到的生产者生产的消息复制为 N 份发送给了所有与其绑定的队列,然后消费者拿到的消息就是一样的消息。这就是 Publish/Subscribe 模式。

4. Routing 路由模式

在这里插入图片描述
路由模式其实和发布/订阅模式非常相似,它是在发布订阅模式的基础上,增加了路由 key,其实也不算增加,只不过发布/订阅模式交换器和队列的 Binding key 都是一样的,然后生产者发送的消息中携带的 Routing Key 也是和这些 Binding Key 是相匹配的,所有交换器会将收到的消息发送给所有与其绑定的队列。

而路由模式则是交换器和队列的 Binding Key 并不是完全相同的,而是存在差异,这样当交换器接收到 Routing Key 的时候,就会将消息发送给与之匹配的队列。那么我们看看如何使用代码来实现 路由模式。

生产者代码

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明交换器
        channel.exchangeDeclare(Constants.ROUTING_EXCHANGE, BuiltinExchangeType.DIRECT,true);
        //声明队列
        channel.queueDeclare(Constants.ROUTING_QUEUE1,true,false,false,null);
        //绑定交换器和队列
        channel.queueBind(Constants.ROUTING_QUEUE1,Constants.ROUTING_EXCHANGE,"a");
        channel.queueBind(Constants.ROUTING_QUEUE1,Constants.ROUTING_EXCHANGE,"b");
        channel.queueBind(Constants.ROUTING_QUEUE2,Constants.ROUTING_EXCHANGE,"a");
        //生产消息
        String msg = "routing exchange a";
        channel.basicPublish(Constants.ROUTING_EXCHANGE,"a",null,msg.getBytes());
        msg = "routing exchange b";
        channel.basicPublish(Constants.ROUTING_EXCHANGE,"b",null,msg.getBytes());
        System.out.println("消息发送成功");
        channel.close();
        connection.close();
    }
}

消费者代码:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.ROUTING_QUEUE1,true,false,false,null);
        //消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.ROUTING_QUEUE1,consumer);
    }
}

在这里插入图片描述
在这里插入图片描述

5. Topics 通配符模式

在这里插入图片描述
跟 Java、MySQL 一样,我们的 RabbitMQ 也是支持 通配符的,所以我们的 Routing Key 和 Binding Key 也是可以有通配符的。在 RabbitMQ 中的匹配字符有两个 * 和 #。

  • *匹配一个单词
  • #匹配0个或者多个单词

注意 RabbitMQ 中匹配字符匹配的不是字符,而是单词,RabbitMQ 中由 . 分隔一个单词。a.b.c,a b c 都叫做一个单词,aa.bb.cc,aa bb cc 也就做一个单词。

生产者代码:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明交换器
        channel.exchangeDeclare(Constants.TOPICS_EXCHANGE, BuiltinExchangeType.TOPIC,true);
        //声明队列
        channel.queueDeclare(Constants.TOPICS_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.TOPICS_QUEUE2,true,false,false,null);
        //绑定交换器和队列
        channel.queueBind(Constants.TOPICS_QUEUE1,Constants.TOPICS_EXCHANGE,"*.a.*");
        channel.queueBind(Constants.TOPICS_QUEUE1,Constants.TOPICS_EXCHANGE,"c.#");
        channel.queueBind(Constants.TOPICS_QUEUE2,Constants.TOPICS_EXCHANGE,"*.a.*");
        //生产消息
        String msg = "topics exchange *.a.*";
        channel.basicPublish(Constants.TOPICS_EXCHANGE,"hello.a.r",null,msg.getBytes());
        msg = "topics exchange c.#";
        channel.basicPublish(Constants.TOPICS_EXCHANGE,"c.world",null,msg.getBytes());
        channel.close();
        connection.close();
    }
}

消费者代码:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.TOPICS_QUEUE1,true,false,false,null);
        //消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.TOPICS_QUEUE1,consumer);
    }
}

在这里插入图片描述
在这里插入图片描述

6. RPC RPC通信

在这里插入图片描述

在 RPC 通信的过程中,没有生产者和消费者,比较像咱们的 RPC 远程调用,大概就是通过两个队列实现了一个可回调的过程。

在这里插入图片描述
RPC 通信的过程:

  1. 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段指定了一个回调队列,用于接收服务器的响应
  2. 服务端收到请求后,处理请求并发送响应消息到 replyTo 指定的回调队列
  3. 客户端在回调队列上等待消息,一旦收到响应,客户端会检查消息的 replyTo 属性,以确保它是所期望的响应

RPC 通信客户端代码:

public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明交换器 我们这是使用默认的交换器
        //生命队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);
        //发送请求
        String msg = "rpc...";
        //设置请求的唯一标识
        String correlationId = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .correlationId(correlationId)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,properties,msg.getBytes());
        //接收响应
        //通过阻塞队列来接收响应
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String response = new String(body);
                System.out.println("接收到回调消息:" + response);
                if (correlationId.equals(properties.getCorrelationId())) {
                    blockingQueue.offer(response);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,consumer);
        String result = blockingQueue.take();
        System.out.println("[RPC Client 响应结果]:" + result);
    }
}

RPC 服务端代码:

public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);
        //接收请求
        channel.basicQos(1); //这个的作用后面再为大家介绍
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body);
                System.out.println("接收到请求:" + request);
                String response = "针对request:" + request + ",相应成功";
                AMQP.BasicProperties properties1 = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,properties1,response.getBytes());
                //envelope.getDeliveryTag() 每个消息都有一个唯一的deliveryTag
                //false表示是否批量确认消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE,consumer);
    }
}

在这里插入图片描述
在这里插入图片描述

7. Publisher Confirms 发布确认

RabbitMQ的Publisher Confirms(发布确认)机制是一种确保消息从生产者(Publisher)安全发送到RabbitMQ服务器的机制。当生产者向RabbitMQ发送消息时,它可能希望知道消息是否已经被RabbitMQ服务器成功接收并存储起来,以确保消息的可靠性。Publisher Confirms机制就是为了满足这一需求而设计的。

  1. 生产者将 Channel 设置为 confirm 模式 (通过调用 channel.confirmSelect()完成)后,发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便跟踪消息的状态
  2. 当消息被 RabbitMQ 服务器接收并处理之后,服务器会异步的像生产者发送一个确认 ACK 给生产者(包含消息的唯一ID),表明消息已经送达

通过这个 Publisher Confirms 模式,可以避免消息的丢失问题。

消息丢失大概分为三种情况:

  1. 生产者问题,因为程序故障,网络抖动等原因,生产者没有向 Broker 发送消息
  2. 消息中间价问题,也就是我们的 RabbitMQ Broker 出现问题,生产者将消息成功的发送给了 Broker,但是 Broker 没有将消息保存好,导致消息丢失
  3. 消费者问题,Broker 将消息成功发送给了消费者,但是消费者在消费的时候,因为没有处理好,导致消费者这里的消息丢失了,并且 broker 也将消费者失败的消息从队列中删除了

RabbitMQ 对于上面可能出现的三种问题都给出了解决,问题 2 可以通过持久化机制解决,问题 3 可以通过消息应答机制解决。针对问题 1,则可以通过 Publisher Confirms 机制解决。

RabbitMQ 发布确认是 0.9.1 协议的扩展,默认情况下他不会被启用,生产者可以通过 channel.confirmSelect() 将信道设置为 confirm 模式。

RabbitMQ 提供的发布确认有三种策略,那么接下来我们来了解一下这三种策略的优劣。

1. 单独确认

它要求生产者(Publisher)在发送每一个消息后,都等待RabbitMQ服务器的确认(Confirm),确保消息已经被RabbitMQ成功接收并处理,然后再继续发送下一个消息。

2. 批量确认

在 RabbitMQ 的发布确认(Publisher Confirms)机制中,批量确认(Batch Acknowledgment)是一个重要的特性,它允许 RabbitMQ 在一次确认消息中同时确认多个消息。这对于提高性能和减少网络开销非常有帮助。

3. 异步确认

RabbitMQ 发布确认中的异步确认策略是一种高效且可靠的机制,用于在消息发送过程中异步地接收确认回调,以提高生产者的吞吐量和性能。

public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 1000;
    static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        return factory.newConnection();
    }
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //Strategy #1: Publishing Messages Individually
        //单独确认
        publishingMessagesIndividually();
        //Strategy #2: Publishing Messages in Batches
        //批量确认
        publishingMessagesInBatches();

        //Strategy #3: Handling Publisher Confirms Asynchronously
        //异步确认
        handlingPublisherConfirmsAsynchronously();
    }

    /**
     * 单独确认
     * @throws IOException
     * @throws TimeoutException
     * @throws InterruptedException
     */
    private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
        try (Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
            //4. 发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }


    /**
     * 批量确认
     * @throws IOException
     * @throws TimeoutException
     * @throws InterruptedException
     */
    private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {
        try(Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
            //4. 发送消息, 并进行确认
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount==batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if (outstandingMessageCount>0){
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);

        }
    }

    /**
     * 异步确认
     * @throws IOException
     * @throws TimeoutException
     */
    private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException {
        try (Connection connection = createConnection()) {
            //开启信道
            Channel channel = connection.createChannel();
            //设置信道为confirm模式
            channel.confirmSelect();
            //声明转换器 这里我们使用默认的转换器
            //声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);
            //监听confirm
            //记录开始时间
            long start= System.currentTimeMillis();
            //该集合用来存放未确认的消息的ID
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    //deliveryTag 是消息的唯一ID multiple 表示是否批量确认
                    if (multiple) {
                        //如果是批量确认,则将deliveryTag之前的消息ID都删除了
                        confirmSeqNo.headSet(deliveryTag + 1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    //这里为了简单,当RabbitMQ Broker无法正确处理消息的话,我们也认为它处理了
                    if (multiple) {
                        confirmSeqNo.headSet(deliveryTag + 1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                }
            });
            //发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "pulisher confirms" + i;
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略,消息条数:%d,耗时:%d ms \n",MESSAGE_COUNT,end - start);
        }
    }
}

在这里插入图片描述
可以看到单独确认策略所需要的时间是比较多的,而异步策略则能够快速的处理这些。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/872841.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

学习笔记--MybatisPlus

官网&#xff1a;MyBatis-Plus &#x1f680; 为简化开发而生 快速入门 入门案例 引入MybatisPlus的起步依赖 定义Mapper 问题&#xff1a; MybatisPlus中Invalid bound statement (not found): com.itheima.mp.mapper.UserMapper.insert 一定要指定实体类&#xff01;&am…

GDB watch starti i files

watch break starti 在程序的最初开始运行的位置处断下来 ​​ i files 查看程序及加载的 so 的 sections ​​

遍历有向网格链路实现

在实际的业务中&#xff0c;我们可能遇到复杂规则&#xff08;多个或与条件组合&#xff09;&#xff0c;复杂链路等类似场景问题&#xff0c;如&#xff1a;规则引擎相关业务&#xff0c;生产任务排期等。 复杂链路示意图如下&#xff1a; 复杂网路链路场景描述 有一个或多…

机器学习如何用于音频分析?

机器学习如何用于音频分析&#xff1f; 一、说明 近十年来&#xff0c;机器学习越来越受欢迎。事实上&#xff0c;它被用于医疗保健、农业和制造业等众多行业。随着技术和计算能力的进步&#xff0c;机器学习有很多潜在的应用正在被创造出来。由于数据以多种格式大量可用&…

EasyExcel实现复杂Excel的导入

最近项目中遇到一个复杂的Excel的导入&#xff0c;并且数据量较大。因为数据不规则&#xff0c;所以只能使用POI进行自定义读取&#xff0c;但是发现数据量大之后&#xff0c;读取数据非常耗时。后面换成EasyExcel&#xff0c;性能起飞。 1. Excel样板 如上图&#xff0c;需要…

USB - 笔记

1.USB接口区分 2 充电宝 图中提到的各种充电协议都是用于快速充电技术的标准,适用于不同品

Chrome 浏览器插件获取网页 window 对象(方案三)

前言 最近有个需求&#xff0c;是在浏览器插件中获取 window 对象下的某个数据&#xff0c;当时觉得很简单&#xff0c;和 document 一样&#xff0c;直接通过嵌入 content_scripts 直接获取&#xff0c;然后使用 sendMessage 发送数据到插件就行了&#xff0c;结果发现不是这…

JAMA network open|自动化定量评估胃肠道肿瘤中三级淋巴结构的机器学习模型|文献精析·24-09-07

小罗碎碎念 这篇文章报道了一种基于机器学习模型的自动化方法&#xff0c;用于在常规组织病理学图像中检测和分类胃肠道癌症中的三级淋巴结构&#xff0c;并验证了其与患者生存预后的关联。 在这项多中心诊断/预后研究中&#xff0c;开发了一种基于机器学习的计算工具&#xff…

【pyhton】python如何实现将word等文档中的文字转换成语音

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

数据结构基本知识

一、什么是数据结构 1.1、组织存储数据 ---------》内存&#xff08;存储&#xff09; 1.2、研究目的 如何存储数据&#xff08;变量&#xff0c;数组....)程序数据结构算法 1.3、常见保存数据的方法 数组&#xff1a;保存自己的数据指针&#xff1a;是间接访问已经存在的…

移远通信高端5G智能模组SG560D-NA率先通过PTCRB认证

近日&#xff0c;移远通信宣布&#xff0c;其基于高通QCM6490平台打造的高端5G智能模组SG560D-NA顺利通过PTCRB认证。 在此之前&#xff0c;该模组还获得了美国FCC和加拿大IC认证&#xff0c;这意味着&#xff0c;其已完全满足北美地区的相关标准和规定&#xff0c;能够支持相关…

【AI大模型应用开发】2.1 Function Calling连接外部世界 - 入门与实战(1)

Function Calling是大模型连接外部世界的通道&#xff0c;目前出现的插件&#xff08;Plugins &#xff09;、OpenAI的Actions、各个大模型平台中出现的tools工具集&#xff0c;其实都是Function Calling的范畴。时下大火的OpenAI的GPTs&#xff0c;原理就是使用了Function Cal…

C++ | Leetcode C++题解之第355题设计推特

题目&#xff1a; 题解&#xff1a; class Twitter {struct Node {// 哈希表存储关注人的 Idunordered_set<int> followee;// 用链表存储 tweetIdlist<int> tweet;};// getNewsFeed 检索的推文的上限以及 tweetId 的时间戳int recentMax, time;// tweetId 对应发送…

828华为云征文 | 华为云Flexus X实例上实现Docker容器的实时监控与可视化分析

Docker容器监控之 CAdvisorInfluxDBGranfana 需要了解 本文章主要讲述在 华为云Flexus X 实例上搭建开源的容器管理平台&#xff0c;使用的Web UI界面来简化和优化容器及集群的管理和监控选择合适的云服务器&#xff1a; 本文采用的是 华为云服务器 Flexus X 实例&#xff08;…

Prefetch文件分析

目录 介绍步骤 介绍 Prefetch&#xff08;预读取&#xff09;&#xff0c;从Windows XP开始引入&#xff0c;用来加速应用程序启动过程。Prefetch包含可执行文件的名称、文件时间戳、运行次数、上次执行时间、Hash等。Win7上记录最近128个可执行文件的信息&#xff0c;Win8-10…

正点原子STM32F103+ESP8266+DS18B20+DHT11连接阿里云

文章目录 MQTT协议1. 基础知识2. 报文形式3. 连接报文4. 心跳报文5. 订阅报文5.1. 订阅主题报文SUBSCRIBE5.2. 订阅确认SUBACK5.3. 取消订阅UNSUBSCRIBE5.4. 取消订阅确认UNSUBACK 6. 发布报文6.1. 发布消息PUBLISH6.2. 发布确认PUBACK 7. 阿里云账号创建8. 网络调试助手接入阿…

Java | Leetcode Java题解之第389题找不同

题目&#xff1a; 题解&#xff1a; class Solution {public char findTheDifference(String s, String t) {int ret 0;for (int i 0; i < s.length(); i) {ret ^ s.charAt(i);}for (int i 0; i < t.length(); i) {ret ^ t.charAt(i);}return (char) ret;} }

Matplotlib 颜色设置详解

在使用matplotlib进行颜色绘制的时候,如绘制图表、背景色或者对文字设置的时候都可以配置颜色, 以下说明主流的三种颜色使用方法 颜色名称 可以是直接使用颜色名称的字符串对color进行赋值,包括可以使用首字母缩写或者完整拼写的形式,以下为部分颜色的书写形式 缩写版 • …

Spring Boot 多数据源配置(JPA)

目录 前言 前置环境 pom yml Entity Dao Config Controller 演示 前言 一般一个系统至少有一个数据源&#xff0c;用来持久化业务数据以及查询。单个数据源的系统很常见&#xff0c;在 Spring Boot 框架下配置也很简单。在约定大于配置这个思想下&#xff0c;只需要在…

递推,CF 353D - Queue

目录 一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 二、解题报告 1、思路分析 2、复杂度 3、代码详解 一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 353D - Queue 二、解题报告 1、思路分析 手玩一下&#xff0c;我们发现相…