掌握RabbitMQ:全面知识点汇总与实践指南

前言

RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。

特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。

作用:服务间异步通信;顺序消费;定时任务;请求削峰;

1、AMQP协议定义

AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一个高效的、跨平台的应用层协议
MQTT(Message Queuing Telemetry Transport)消息队列遥测传输

特性AMQPMQTT
适用场景大型企业级应用、金融交易、云服务物联网、移动应用、智能家居
通信模式生产者-消费者发布-订阅
消息大小较大,适合复杂的消息结构小型,适合简单的消息
QoS 级别支持,但不如 MQTT 精细详细的 QoS 级别,特别是针对 IoT 场景
性能要求对性能有一定要求,但更注重可靠性和安全性极低的带宽消耗和资源占用
安全性强调端到端的安全性支持基本的安全特性,适用于资源受限环境

2、AMQP机制

1>AMQP生产者、消费者工作机制
AMQP高级消息队列协议,基于生产者消费者模式,消息基于交换器Exchange、队列Queue、绑定Binding进行路由。

  • 生产者发送消息到Broker消息代理服务
  • 交换器接收生产者发送的消息,根据预定义规则,分发给一个或多个队列
  • 队列存储消息,直到消费者取走消息
  • 消费者,读取队列中的消息

AMQP定义了严格的消息结构,使用了类型化数据表示法描述消息内容来兼容不同的系统。

类型化数据表示法(Typed Representation of Data)是指在计算机编程语言中,数据和其相关联的类型信息一起被表示的方法。

2>AMQP消息传递方式

特性点对点模式 (P2P)发布/订阅模式 (Pub/Sub)
消息传递方式每条消息仅被一个消费者处理一条消息可以被多个消费者同时接收
队列数量单个队列每个消费者有自己的队列
生产者行为直接发送到队列发送到交换器,由交换器负责路由
消费者行为从同一队列中竞争消费各自独立消费自己的队列中的消息
适用场景任务分配、工作流管理广播通知、日志记录、事件驱动架构
扩展性受限于单个队列的吞吐量可以通过增加更多的消费者来提高整体吞吐量
复杂度较低,易于理解和实现需要考虑交换器类型、路由规则等因素,稍微复杂

在这里插入图片描述

  • 1、点对点
    生产者将消息发送到一个特定的队列中,而消费者则从该队列中获取消息
    每个消息只会被一个消费者处理,即使有多个消费者监听同一个队列。

竞争消费:多个实例尝试处理同一个消息时,可能出现重复消费或消息未及时得到处理的情况。
(1)竞争消费问题
在k8s部署多实例场景下,虽然提升了系统的吞吐量,通过调度器实现了负载均衡,多个实例从一个队列中读取消息,但是并发场景客观存在竞争消费的情况,导致重复消费消息。
(2)解决建议
合理配置消息队列、业务方法幂等性设计、分布式锁控制、增加监控告警和自动恢复动作。

// 生产者代码片段
Channel channel = connection.createChannel();
channel.queueDeclare("task_queue", true, false, false, null);
String message = "Task to be processed";
channel.basicPublish("", "task_queue", null, message.getBytes());

// 消费者代码片段
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    // 执行任务...
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> {});

  • 2、发布订阅
    生产者将消息发送到一个交换器(Exchange),而不是直接发送到队列。交换器根据预定义的路由规则(Binding Key)将消息转发给一个或多个匹配的队列。每个队列可以有多个消费者订阅,所有订阅者都能收到相同的消息副本

(1)主题分区
为不同类型的时间,创建不同的主题或分区,来减少不必要的复制。实例只订阅感兴趣的主题,降低资源开销
(2)限流
避免过载,限制单位之间内消费的最大消息

// 生产者代码片段
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_exchange", "fanout");
String message = "Info log message";
channel.basicPublish("logs_exchange", "", null, message.getBytes());

// 消费者代码片段
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs_exchange", "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    // 处理日志...
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

3、AMQP消息只被消费一次

  • 1、合理配置消息队列ACK机制
    大多数消息队列都提供了手动确认(ACK)的功能,允许消费者成功处理后,主动通知消息代理
// 使用 RabbitMQ 的手动确认示例
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
// 处理完成后发送 ACK
channel.basicAck(envelope.getDeliveryTag(), false);
  • 2、合理配置消息队列预取数量
    防止一次性去除较多的未处理消息。
// 设置预取计数
channel.basicQos(prefetchCount);
  • 3、消费者幂等性设计
    针对消息全局唯一的ID,入库,每次收到消息时先检查是否已入库
    确保同一条消息多次处理的结果是一致的,避免重复的消息执行两次结果不一致
    增加补偿机制,比如退款,退积分等概念的操作
  • 4、分布式锁
    借助Redis 的 Redlock 算法协调多个消费者实例之间的消息处理,只有获取到锁的消息可以处理,其他的放弃或等待。
  • 5、监控告警机制
    监控消息队列服务健康情况,针对可能重复消费的消息及时告警到服务负责人介入处理。
  • 6、事务性消息
    指的是消息和业务操作,一起成功或一起失败的机制。
    (1)本地事务+补偿机制
    (2)二阶段提交
    引入协调者和参与者的概念。
    客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出提交事务命令,否则全部回滚。每个参与者返回ack结果,协调者汇总执行结果,释放占用的资源。
    (3)三阶段提交
    针对二阶段提交完善事务性消息机制。
    首先客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出预执行事务命令。各参与者收到命令,执行事务,但不提交。并返回ack,等待最终命令。协调者收到全部准备好,则发出提交事务命令。

4、AMQP 消息顺序消费

  • 单实例独占队列,可以保证顺序消费,但是分布式高可用场景一般都是多实例部署,独占队列无法解决消息顺序消费的问题。
  • 为了保证顺序消费,通常建议针对预取消息数量Prefetch Count设置为1:channel.basicQos(1);
  • 可以使用分布式锁确保消息消费是同步操作,并发安全,在成功处理消息后,手动发送ack确认到消息代理。
  • 另外使用幂等性设计来避免重复消费。
  • 增加补偿机制来处理幂等性设计无法保证的场景,比如退款等操作
  • 增加监控告警到服务负责人。
  • 可以对消息根据业务类型或特定的前缀规则,将不同的消息分到不同的分区或队列中,每个队列和分区内部是遵循先进先出规则来保证顺序消费的。

5、AMQP消息可靠性

  • 事务支持
    允许一组操作作为一个整体提交或回滚。
  • ACK确认机制
    当消息成功投递后,接收方会向发送方发送 ACK 确认;如果发生错误,则发送 NACK 拒绝。
  • 持久化选项
    可以选择是否将消息保存到磁盘上,以防服务中断时丢失重要数据。

6、RabbitMQ配置ACK

1>rabbitmq.conf或rabbitmq.ini开启配置

# 启用自动恢复功能,确保在网络中断后能够自动重连
connection.cached = true
# 设置心跳检测间隔,防止长时间无通信导致连接断开
heartbeat = 60
# 启用 Publisher Confirms,允许生产者收到消息确认
publisher_confirms = on

2>消费者手动确认
声明队列,确保队列存在
设置预取计数,限制每次从队列中拉取的消息数量为 1,以避免过载
开启手动确认模式,通过 channel.basicConsume 方法中的第二个参数 false 来关闭自动确认,改为手动确认
发送 ACK 确认,在成功处理完消息后,调用 channel.basicAck 方法发送确认

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class RabbitMQConsumer {
    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列,确保它存在
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 设置预取计数为 1,确保每次只处理一条消息
            channel.basicQos(1);

            // 开启手动确认模式
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 模拟任务处理时间
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println(" [x] Done");
                    // 手动发送 ACK 确认
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };

            // 开始消费消息
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }
}

3>配置 Publisher Confirms和Transaction
允许生产者在发送消息后等待消息代理的确认

// 开启 Publisher Confirms 模式
channel.confirmSelect();
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    if (!channel.waitForConfirmsOrDie(timeout)) {
        // 处理未确认的消息...
    }
} catch (Exception e) {
    // 处理异常情况...
}
Channel channel = connection.createChannel();
// 开启 Publisher Confirms 模式
channel.confirmSelect();
// 发送消息并等待确认
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    if (!channel.waitForConfirmsOrDie(timeout)) {
        // 处理未确认的消息...
    }
} catch (Exception e) {
    // 处理异常情况...
}

// 开启事务模式
channel.txSelect();
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    channel.txCommit();
} catch (Exception e) {
    channel.txRollback();
}

7、RabbitMQ配置协议

1>rabbitmq.conf
RabbitMQ默认是AMQP 0-9-1协议。支持设置监听端口。
支持启用SSL认证提高安全性。
支持设置心跳保证客户端和服务端连接保持活跃。

# 设置 AMQP 0-9-1 的监听端口
listeners.tcp.default = 5672
# 确保 AMQP 插件已启用,AMQP 0-9-1 是默认启用的
enabled_plugins = [rabbitmq_amqp1_0]

# 启用 SSL/TLS 支持
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile   = /path/to/server_certificate.pem
ssl_options.keyfile    = /path/to/private_key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true
# 设置 SSL/TLS 监听端口
listeners.ssl.default = 5671

# 设置心跳间隔时间为 60 秒
heartbeat = 60

8、RabbitMQ消息持久化

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PersistentExample {
    private final static String QUEUE_NAME = "persistent_queue";
    private final static String EXCHANGE_NAME = "persistent_exchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 创建持久化的交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            // 创建持久化的队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 绑定队列到交换器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing_key");

            // 发送持久化的消息
            String message = "Persistent message!";
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // 2 表示持久化
                    .build();
            channel.basicPublish(EXCHANGE_NAME, "routing_key", props, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
  • 1、持久化队列
    channel.queueDeclare("durable_queue", true, false, false, null);
  • 2、交换器持久化
    确保在 RabbitMQ 启动时已经预声明了所有必要的交换器和队列绑定,以避免消息丢失
    channel.exchangeDeclare("durable_exchange", "direct", true);
  • 3、消息持久化
    delivery_mode 参数:设置为 2 表示持久化消息;设置为 1(默认)则表示非持久化消息
    channel.basicPublish("exchange_name", "routing_key", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());

9、RabbitMQ自动重连

网络中断或其他异常情况下自动重新连接到 RabbitMQ 并恢复之前的连接状态

ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);

10、RabbitMQ组件

组件名称说明
Producer生产者负责生成并发送消息的应用程序。
Consumer消费者负责接收并处理消息的应用程序。
Message消息承载业务数据的基本单元,包含消息体(Body)、属性(Properties)等信息。
Exchange交换机用于接收来自生产者的消息,并根据路由规则将其分发到一个或多个队列中。
Queue队列存储待处理消息的地方,消费者从中拉取消息进行处理。
Binding绑定定义了交换机和队列之间的关系,包括路由键等参数。
Virtual Host虚拟主机类似于命名空间的概念,用于隔离不同的应用环境,每个虚拟主机都有自己独立的一套用户、权限、交换机、队列等资源。

11、RabbitMQ核心组件交换器和路由键

交换器(Exchange)和路由键(Routing Key)是消息传递系统的核心组件,它们共同决定了消息如何从生产者传递到正确的队列。

消息提供方生产消息,根据预定规则,路由至匹配的一个或多个队列。

消息创建时设定路由键,消息发布到交换器时,通过队列路由键,把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配。

若队列至少有一个消费者订阅,消息将以轮询方式发给消费者。

交换器说明应用场景
Direct精确匹配路由键只有当路由键完全匹配时,消息才会被发送到对应的队列。适用于一对一的消息分发。
Topic基于通配符模式匹配路由键适用于灵活的消息过滤和多条件匹配。
Fanout广播所有消息给所有绑定的队列适用于需要将相同消息发送给多个消费者的场景。
Headers根据消息头属性进行路由适用于复杂的消息路由需求,例如根据多个字段组合来决定消息去向。

1>Direct Exchange 精准匹配路由键交换器
根据路由键完全匹配队列,如果找不到匹配的队列,则消息会被丢弃。

  • 生产者
// 创建 Direct Exchange
channel.exchangeDeclare("direct_logs", "direct");

// 绑定队列到 Exchange,并指定 Binding Key
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "info");
channel.queueBind(queueName, "direct_logs", "warning");
channel.queueBind(queueName, "direct_logs", "error");

// 发送消息时指定 Routing Key
channel.basicPublish("direct_logs", "info", null, "Info log message".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class DirectConsumer {
    private final static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Binding Key
            if (argv.length < 1) {
                System.err.println("Usage: DirectConsumer [info] [warning] [error]");
                System.exit(1);
            }
            for (String severity : argv) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

2>Fanout Exchange广播交换器
广播所有消息给所有绑定的队列

  • 生产者
// 创建 Fanout Exchange
channel.exchangeDeclare("logs", "fanout");

// 绑定队列到 Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");

// 发送消息时不指定 Routing Key
channel.basicPublish("logs", "", null, "Broadcast log message".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class FanoutConsumer {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

3>Topic Exchange 通配符路由器
*:匹配一个单词;#:匹配零个或多个单词

  • 生产者
// 创建 Topic Exchange
channel.exchangeDeclare("topic_logs", "topic");

// 绑定队列到 Exchange,并指定 Binding Key 模式
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.orange.*");
channel.queueBind(queueName, "topic_logs", "*.*.rabbit");
channel.queueBind(queueName, "topic_logs", "lazy.#");

// 发送消息时指定符合模式的 Routing Key
channel.basicPublish("topic_logs", "quick.orange.rabbit", null, "Quick orange rabbit".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class TopicConsumer {
    private final static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Binding Key 模式
            if (argv.length < 1) {
                System.err.println("Usage: TopicConsumer [binding_key_pattern]");
                System.exit(1);
            }
            for (String bindingKey : argv) {
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            }

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

4>Headers Exchange 根据消息头属性进行路由
不依赖路由键,当消息的 headers 完全匹配时,才会将消息发送到对应的队列。

  • 生产者
// 创建 Headers Exchange
channel.exchangeDeclare("headers_exchange", "headers");

// 绑定队列到 Exchange,并指定 Headers 匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("user_id", "12345");
headers.put("order_status", "pending");
AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, "headers_exchange", "", headers);

// 发送带有 Headers 的消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .headers(headers)
        .build();
channel.basicPublish("headers_exchange", "", props, "Message with specific headers".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class HeadersConsumer {
    private final static String EXCHANGE_NAME = "headers_exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "headers");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Headers 匹配规则
            Map<String, Object> headers = new HashMap<>();
            headers.put("user_id", "12345");
            headers.put("order_status", "pending");
            AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, EXCHANGE_NAME, "", headers);

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

12、RabbitMQ核心方法及参数说明

1>newConnection 创建连接工程并开启连接

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();

2>createChannel 创建信道
RabbitMQ 使用信道的方式来传输数据

信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接可以创建多个信道,每个信道都是独立的通信线路,可以并发地发送和接收消息。

Channel channel = connection.createChannel();

3>exchangeDeclare 交换器声明

channel.exchangeDeclare("my_exchange", "direct", true, false, null);

exchange: 交换器名称。
type: 交换器类型(如 “direct”, “fanout”, “topic”, “headers”)。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
autoDelete: 自动删除标志,true 表示当最后一个队列断开时自动删除交换器。
internal: 内部交换器标志,true 表示该交换器只能被其他交换器使用,不能直接由生产者发布消息。
arguments: 其他可选参数,例如死信交换器、过期时间等。

4>queueDeclare 队列声明

// 创建临时队列
String queueName = channel.queueDeclare().getQueue();

queue: 队列名称,为空字符串时表示创建临时队列。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
exclusive: 排他性标志,true 表示仅当前连接可用,连接关闭后自动删除。
autoDelete: 自动删除标志,true 表示当最后一个消费者断开时自动删除队列。
arguments: 其他可选参数,例如死信队列、过期时间等。

5>queueBind 队列绑定
将队列绑定到指定的交换器上,并提供路由键或匹配规则

channel.queueBind(queueName, "my_exchange", "routing_key");

queue: 队列名称。
exchange: 交换器名称。
routingKey: 路由键,对于某些类型的交换器(如 Direct 或 Topic),这个值是必须的;对于 Fanout 类型,通常为空字符串。
arguments: 可选参数,主要用于 Headers Exchange 的匹配规则

6>basicPublish 发布消息
向指定的交换器发布一条消息

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .contentType("text/plain")
        .deliveryMode(2) // 2 表示持久化
        .build();
channel.basicPublish("my_exchange", "routing_key", props, message.getBytes());

exchange: 交换器名称。
routingKey: 路由键。
props: 消息属性,包括内容类型、编码、持久化模式等。
body: 消息体,即要发送的数据。

7>basicConsume 消费消息
费来自指定队列的消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

queue: 队列名称。
autoAck: 自动确认标志,true 表示收到消息后立即确认,false 表示手动确认。
deliverCallback: 回调函数,用于处理接收到的消息。
cancelCallback: 取消回调函数,当消费者的取消通知到达时调用

8>basicAck 消息确认
手动确认模式下,当消费者成功处理完消息后,需要调用此方法来确认消息已被消费

channel.basicAck(envelope.getDeliveryTag(), false);

9>basicNack 消息丢弃
当消费者无法处理某条消息时,可以拒绝这条消息,并决定是否重新入队或者丢弃

// 第三个参数表示是否重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);

13、RabbitMQ镜像集群模式

搭建RabbitMQ保证消息队列的高可用。
创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue。
优点:高可用,单个节点挂掉,其他节点仍可用
缺点:高负载,如果某个队列消息很重,则镜像复制的实例下也会很重,性能开销大。

参考博客:消息队列中点对点与发布订阅区别
Powered by niaonao

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/949017.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

国内Ubuntu环境Docker部署Stable Diffusion入坑记录

国内Ubuntu环境Docker部署Stable Diffusion入坑记录 本文旨在记录使用dockerpython进行部署 stable-diffusion-webui 项目时遇到的一些问题&#xff0c;以及解决方案&#xff0c;原项目地址: https://github.com/AUTOMATIC1111/stable-diffusion-webui 问题一览&#xff1a; …

SpringBoot3-深入理解自动配置类的原理(尚硅谷SpringBoot3-雷神)

文章目录 目录了解自动配置 一、导入对应场景的Mean依赖&#xff1a;1、引入依赖**找到自动配置类的所有配置都存放在哪里** 二、编写主程序&#xff1a;SpringBootApplication观察源码时所需要知道的几个核心注解&#xff1a;1、观察SpringBootApplication源码都做了什么 三、…

【沉默的羔羊心理学】汉尼拔的“移情”游戏:操纵与理解的艺术,精神分析学视角下的角色互动

终极解读《沉默的羔羊》&#xff1a;弗洛伊德精神分析学视角下的深层剖析 关键词 沉默的羔羊弗洛伊德精神分析学角色心理意识与潜意识性别与身份 弗洛伊德精神分析学简介 弗洛伊德的精神分析学是心理学的一个重要分支&#xff0c;主要关注人类行为背后的无意识动机和冲突。…

字玩FontPlayer开发笔记3 性能优化 大量canvas渲染卡顿问题

字玩FontPlayer开发笔记3 性能优化 大量canvas渲染卡顿问题 字玩FontPlayer是笔者开源的一款字体设计工具&#xff0c;使用Vue3 ElementUI开发&#xff0c;源代码&#xff1a; github: https://github.com/HiToysMaker/fontplayer gitee: https://gitee.com/toysmaker/fontpl…

javaEE-网络编程-3 UDP

目录 Socaket套接字 UDP数据报套字节编程 1.DatagrameSocket类 DatagramSocaket构造方法: DatagramSocaket常用方法&#xff1a; 2.DatagramPacket类 DatagramPacket构造方法&#xff1a; UDP回显服务器实现 UDP服务端实现&#xff1a; 创建一个Socket类对象&#xf…

Linux:操作系统不朽的传说

操作系统是计算机的灵魂&#xff0c;它掌控着计算机的硬件和软件资源&#xff0c;为用户和应用程序提供了一个稳定、高效、安全的运行环境。 在众多操作系统中&#xff0c;Linux 的地位举足轻重。它被广泛应用于服务器、云计算、物联网、嵌入式设备等领域。Linux 的成功离不开…

模拟出一个三维表面生成表面点,计算体积,并处理边界点

python代码 生成表面点,计算体积,并处理边界点,最终模拟出一个三维表面。 步骤: 初始参数设置: initial_fixed_point:一个初始固定点的坐标。 slop_thre:坡度阈值。 v_thre:体积阈值。 slope_rad:将坡度从度转换为弧度。 step_size:步长。 lam_x, lam_y:泊松分布的…

STM32拓展 低功耗案例1:睡眠模式 (register)

需求描述 让MCU进入睡眠模式&#xff0c;然后通过串口发送消息来唤醒MCU退出睡眠模式。观察LED在进入休眠模式后是否仍然开启。 思考 首先睡眠模式&#xff0c;唤醒的条件是中断&#xff0c;外部内部都可以&#xff0c;这里的串口接收中断时内部中断。 拓展&#xff1a;中断…

vue 基础参数增加多语言配置

js 对数组的增删改查 字段在数据库存储为nvarchar &#xff0c;varchar存储波斯语会乱码 数组格式&#xff1a; {"en": [{"type": "10","value": "Confirm","color": ""},{"type": "…

[桌面运维]windows自动设置浅深色主题

设置自动浅色/深色主题 我看很多up主的教程过于繁琐&#xff0c;需要添加四个功能&#xff0c;并且有些还不能生效&#xff01; 大多数都是教程&#xff1a; 自动任务栏浅色 add HKCUSOFTWAREMicrosoftWindowsCurrentVersionThemesPersonalize/v SystemUsesLightTheme /t …

[ubuntu-22.04]ubuntu不识别rtl8153 usb转网口

问题描述 ubuntu22.04插入rtl8153 usb转网口不识别 解决方案 安装依赖包 sudo apt-get install libelf-dev build-essential linux-headers-uname -r sudo apt-get install gcc-12 下载源码 Realtek USB FE / GBE / 2.5G / 5G Ethernet Family Controller Softwarehttps:/…

基于Python的考研学习系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

图漾相机基础操作

1.客户端概述 1.1 简介 PercipioViewer是图漾基于Percipio Camport SDK开发的一款看图软件&#xff0c;可实时预览相机输出的深度图、彩色图、IR红外图和点云图,并保存对应数据&#xff0c;还支持查看设备基础信息&#xff0c;在线修改gain、曝光等各种调节相机成像的参数功能…

Yocto 项目中的包管理系统详细解析

1. 包管理系统概念 包管理系统是用于管理软件包的工具和机制&#xff0c;包括创建、分发和安装软件包。Yocto 项目支持以下三种主要的包管理系统及其相关包格式&#xff1a; IPK (Itsy Package System)&#xff1a;适合轻量级嵌入式应用&#xff0c;通过 OPKG 管理。RPM (Red…

RISC-V学习笔记

1.RISC ISA1个基本整数指令集多个可选的扩展指令集&#xff0c;如RV32I表示支持32位整数指令集。I表示基本指令集&#xff0c;M表示整数乘法与除法指令集&#xff0c;A表示存储器原子指令集&#xff0c;F表示单精度浮点指令集&#xff0c;D表示双精度浮点指令集等&#xff0c;C…

第四届计算机、人工智能与控制工程

第四届计算机、人工智能与控制工程 The 4th International Conference on Computer, Artificial Intelligence and Control Engineering 重要信息 大会官网&#xff1a;www.ic-caice.net 大会时间&#xff1a;2025年1月10-12日 大会地点&#xff1a;中国合肥 (安徽大学磬苑…

Docker安装易有云(casaos安装易有云)

无法拉取易有云&DDNSTO Docker镜像&#xff1f; 官方视频 Docker方式安装易有云&#xff0c;包括并不限于Unraid/爱快/群晖等&#xff0c;只要有Docker的设备都成&#xff0c;包括一些Linux发行版等。 铁威马&#xff1a;首先在应用中心里安装Docker(TOS 4.0及更高的系统…

【计算机视觉技术 - 人脸生成】2.GAN网络的构建和训练

GAN 是一种常用的优秀的图像生成模型。我们使用了支持条件生成的 cGAN。下面介绍简单 cGAN 模型的构建以及训练过程。 2.1 在 model 文件夹中新建 nets.py 文件 import torch import torch.nn as nn# 生成器类 class Generator(nn.Module):def __init__(self, nz100, nc3, n…

手机投屏到电视的3种选择:无线本地投屏,无线远程投屏,AirPlay投屏

现在大部分手机投屏都要求连接相同的WiFi&#xff0c;这就意味着手机投屏到电视必须是近距离投屏&#xff0c;稍微远一点就会脱离WiFi连接范围&#xff0c;投屏失败。 如果想将手机远程投屏到安卓电视&#xff0c;要怎样做&#xff1f; 第一步&#xff0c;在手机和安卓电视都安…

zookeeper 数据类型

文章目录 引言I Znodezonde stat (状态信息)znode类型临时\永久序列化特性引言 在结构上与标准文件系统非常类似,拥有一个层次的命名空间,都是采用树形层次结构 Zookeeper树中的每个节点被称为:Znode,没有文件和目录之分。Znode兼具文件和目录两种特点Znode存储数据大小有…