前言
RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。
特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。
作用:服务间异步通信;顺序消费;定时任务;请求削峰;
1、AMQP协议定义
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一个高效的、跨平台的应用层协议
MQTT(Message Queuing Telemetry Transport)消息队列遥测传输
特性 | AMQP | MQTT |
---|---|---|
适用场景 | 大型企业级应用、金融交易、云服务 | 物联网、移动应用、智能家居 |
通信模式 | 生产者-消费者 | 发布-订阅 |
消息大小 | 较大,适合复杂的消息结构 | 小型,适合简单的消息 |
QoS 级别 | 支持,但不如 MQTT 精细 | 详细的 QoS 级别,特别是针对 IoT 场景 |
性能要求 | 对性能有一定要求,但更注重可靠性和安全性 | 极低的带宽消耗和资源占用 |
安全性 | 强调端到端的安全性 | 支持基本的安全特性,适用于资源受限环境 |
2、AMQP机制
1>AMQP生产者、消费者工作机制
AMQP高级消息队列协议,基于生产者消费者模式,消息基于交换器Exchange、队列Queue、绑定Binding进行路由。
- 生产者发送消息到Broker消息代理服务
- 交换器接收生产者发送的消息,根据预定义规则,分发给一个或多个队列
- 队列存储消息,直到消费者取走消息
- 消费者,读取队列中的消息
AMQP定义了严格的消息结构,使用了类型化数据表示法描述消息内容来兼容不同的系统。
类型化数据表示法(Typed Representation of Data)是指在计算机编程语言中,数据和其相关联的类型信息一起被表示的方法。
2>AMQP消息传递方式
特性 | 点对点模式 (P2P) | 发布/订阅模式 (Pub/Sub) |
---|---|---|
消息传递方式 | 每条消息仅被一个消费者处理 | 一条消息可以被多个消费者同时接收 |
队列数量 | 单个队列 | 每个消费者有自己的队列 |
生产者行为 | 直接发送到队列 | 发送到交换器,由交换器负责路由 |
消费者行为 | 从同一队列中竞争消费 | 各自独立消费自己的队列中的消息 |
适用场景 | 任务分配、工作流管理 | 广播通知、日志记录、事件驱动架构 |
扩展性 | 受限于单个队列的吞吐量 | 可以通过增加更多的消费者来提高整体吞吐量 |
复杂度 | 较低,易于理解和实现 | 需要考虑交换器类型、路由规则等因素,稍微复杂 |
- 1、点对点
生产者将消息发送到一个特定的队列中,而消费者则从该队列中获取消息
每个消息只会被一个消费者处理,即使有多个消费者监听同一个队列。
竞争消费:多个实例尝试处理同一个消息时,可能出现重复消费或消息未及时得到处理的情况。
(1)竞争消费问题
在k8s部署多实例场景下,虽然提升了系统的吞吐量,通过调度器实现了负载均衡,多个实例从一个队列中读取消息,但是并发场景客观存在竞争消费的情况,导致重复消费消息。
(2)解决建议
合理配置消息队列、业务方法幂等性设计、分布式锁控制、增加监控告警和自动恢复动作。
// 生产者代码片段
Channel channel = connection.createChannel();
channel.queueDeclare("task_queue", true, false, false, null);
String message = "Task to be processed";
channel.basicPublish("", "task_queue", null, message.getBytes());
// 消费者代码片段
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 执行任务...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> {});
- 2、发布订阅
生产者将消息发送到一个交换器(Exchange),而不是直接发送到队列。交换器根据预定义的路由规则(Binding Key)将消息转发给一个或多个匹配的队列。每个队列可以有多个消费者订阅,所有订阅者都能收到相同的消息副本
(1)主题分区
为不同类型的时间,创建不同的主题或分区,来减少不必要的复制。实例只订阅感兴趣的主题,降低资源开销
(2)限流
避免过载,限制单位之间内消费的最大消息
// 生产者代码片段
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_exchange", "fanout");
String message = "Info log message";
channel.basicPublish("logs_exchange", "", null, message.getBytes());
// 消费者代码片段
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs_exchange", "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 处理日志...
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
3、AMQP消息只被消费一次
- 1、合理配置消息队列ACK机制
大多数消息队列都提供了手动确认(ACK)的功能,允许消费者成功处理后,主动通知消息代理
// 使用 RabbitMQ 的手动确认示例
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
// 处理完成后发送 ACK
channel.basicAck(envelope.getDeliveryTag(), false);
- 2、合理配置消息队列预取数量
防止一次性去除较多的未处理消息。
// 设置预取计数
channel.basicQos(prefetchCount);
- 3、消费者幂等性设计
针对消息全局唯一的ID,入库,每次收到消息时先检查是否已入库
确保同一条消息多次处理的结果是一致的,避免重复的消息执行两次结果不一致
增加补偿机制,比如退款,退积分等概念的操作 - 4、分布式锁
借助Redis 的 Redlock 算法协调多个消费者实例之间的消息处理,只有获取到锁的消息可以处理,其他的放弃或等待。 - 5、监控告警机制
监控消息队列服务健康情况,针对可能重复消费的消息及时告警到服务负责人介入处理。 - 6、事务性消息
指的是消息和业务操作,一起成功或一起失败的机制。
(1)本地事务+补偿机制
(2)二阶段提交
引入协调者和参与者的概念。
客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出提交事务命令,否则全部回滚。每个参与者返回ack结果,协调者汇总执行结果,释放占用的资源。
(3)三阶段提交
针对二阶段提交完善事务性消息机制。
首先客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出预执行事务命令。各参与者收到命令,执行事务,但不提交。并返回ack,等待最终命令。协调者收到全部准备好,则发出提交事务命令。
4、AMQP 消息顺序消费
- 单实例独占队列,可以保证顺序消费,但是分布式高可用场景一般都是多实例部署,独占队列无法解决消息顺序消费的问题。
- 为了保证顺序消费,通常建议针对预取消息数量Prefetch Count设置为1:
channel.basicQos(1);
- 可以使用分布式锁确保消息消费是同步操作,并发安全,在成功处理消息后,手动发送ack确认到消息代理。
- 另外使用幂等性设计来避免重复消费。
- 增加补偿机制来处理幂等性设计无法保证的场景,比如退款等操作
- 增加监控告警到服务负责人。
- 可以对消息根据业务类型或特定的前缀规则,将不同的消息分到不同的分区或队列中,每个队列和分区内部是遵循先进先出规则来保证顺序消费的。
5、AMQP消息可靠性
- 事务支持
允许一组操作作为一个整体提交或回滚。 - ACK确认机制
当消息成功投递后,接收方会向发送方发送 ACK 确认;如果发生错误,则发送 NACK 拒绝。 - 持久化选项
可以选择是否将消息保存到磁盘上,以防服务中断时丢失重要数据。
6、RabbitMQ配置ACK
1>rabbitmq.conf或rabbitmq.ini开启配置
# 启用自动恢复功能,确保在网络中断后能够自动重连
connection.cached = true
# 设置心跳检测间隔,防止长时间无通信导致连接断开
heartbeat = 60
# 启用 Publisher Confirms,允许生产者收到消息确认
publisher_confirms = on
2>消费者手动确认
声明队列,确保队列存在
设置预取计数,限制每次从队列中拉取的消息数量为 1,以避免过载
开启手动确认模式,通过 channel.basicConsume 方法中的第二个参数 false 来关闭自动确认,改为手动确认
发送 ACK 确认,在成功处理完消息后,调用 channel.basicAck 方法发送确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class RabbitMQConsumer {
private final static String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列,确保它存在
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 设置预取计数为 1,确保每次只处理一条消息
channel.basicQos(1);
// 开启手动确认模式
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 模拟任务处理时间
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
} finally {
System.out.println(" [x] Done");
// 手动发送 ACK 确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 开始消费消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
}
3>配置 Publisher Confirms和Transaction
允许生产者在发送消息后等待消息代理的确认
// 开启 Publisher Confirms 模式
channel.confirmSelect();
try {
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
if (!channel.waitForConfirmsOrDie(timeout)) {
// 处理未确认的消息...
}
} catch (Exception e) {
// 处理异常情况...
}
Channel channel = connection.createChannel();
// 开启 Publisher Confirms 模式
channel.confirmSelect();
// 发送消息并等待确认
try {
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
if (!channel.waitForConfirmsOrDie(timeout)) {
// 处理未确认的消息...
}
} catch (Exception e) {
// 处理异常情况...
}
// 开启事务模式
channel.txSelect();
try {
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
7、RabbitMQ配置协议
1>rabbitmq.conf
RabbitMQ默认是AMQP 0-9-1协议。支持设置监听端口。
支持启用SSL认证提高安全性。
支持设置心跳保证客户端和服务端连接保持活跃。
# 设置 AMQP 0-9-1 的监听端口
listeners.tcp.default = 5672
# 确保 AMQP 插件已启用,AMQP 0-9-1 是默认启用的
enabled_plugins = [rabbitmq_amqp1_0]
# 启用 SSL/TLS 支持
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/private_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# 设置 SSL/TLS 监听端口
listeners.ssl.default = 5671
# 设置心跳间隔时间为 60 秒
heartbeat = 60
8、RabbitMQ消息持久化
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class PersistentExample {
private final static String QUEUE_NAME = "persistent_queue";
private final static String EXCHANGE_NAME = "persistent_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 创建持久化的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
// 创建持久化的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定队列到交换器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing_key");
// 发送持久化的消息
String message = "Persistent message!";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2 表示持久化
.build();
channel.basicPublish(EXCHANGE_NAME, "routing_key", props, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
- 1、持久化队列
channel.queueDeclare("durable_queue", true, false, false, null);
- 2、交换器持久化
确保在 RabbitMQ 启动时已经预声明了所有必要的交换器和队列绑定,以避免消息丢失
channel.exchangeDeclare("durable_exchange", "direct", true);
- 3、消息持久化
delivery_mode 参数:设置为 2 表示持久化消息;设置为 1(默认)则表示非持久化消息
channel.basicPublish("exchange_name", "routing_key", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());
9、RabbitMQ自动重连
网络中断或其他异常情况下自动重新连接到 RabbitMQ 并恢复之前的连接状态
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
10、RabbitMQ组件
组件 | 名称 | 说明 |
---|---|---|
Producer | 生产者 | 负责生成并发送消息的应用程序。 |
Consumer | 消费者 | 负责接收并处理消息的应用程序。 |
Message | 消息 | 承载业务数据的基本单元,包含消息体(Body)、属性(Properties)等信息。 |
Exchange | 交换机 | 用于接收来自生产者的消息,并根据路由规则将其分发到一个或多个队列中。 |
Queue | 队列 | 存储待处理消息的地方,消费者从中拉取消息进行处理。 |
Binding | 绑定 | 定义了交换机和队列之间的关系,包括路由键等参数。 |
Virtual Host | 虚拟主机 | 类似于命名空间的概念,用于隔离不同的应用环境,每个虚拟主机都有自己独立的一套用户、权限、交换机、队列等资源。 |
11、RabbitMQ核心组件交换器和路由键
交换器(Exchange)和路由键(Routing Key)是消息传递系统的核心组件,它们共同决定了消息如何从生产者传递到正确的队列。
消息提供方生产消息,根据预定规则,路由至匹配的一个或多个队列。
消息创建时设定路由键,消息发布到交换器时,通过队列路由键,把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配。
若队列至少有一个消费者订阅,消息将以轮询方式发给消费者。
交换器 | 说明 | 应用场景 |
---|---|---|
Direct | 精确匹配路由键 | 只有当路由键完全匹配时,消息才会被发送到对应的队列。适用于一对一的消息分发。 |
Topic | 基于通配符模式匹配路由键 | 适用于灵活的消息过滤和多条件匹配。 |
Fanout | 广播所有消息给所有绑定的队列 | 适用于需要将相同消息发送给多个消费者的场景。 |
Headers | 根据消息头属性进行路由 | 适用于复杂的消息路由需求,例如根据多个字段组合来决定消息去向。 |
1>Direct Exchange 精准匹配路由键交换器
根据路由键完全匹配队列,如果找不到匹配的队列,则消息会被丢弃。
- 生产者
// 创建 Direct Exchange
channel.exchangeDeclare("direct_logs", "direct");
// 绑定队列到 Exchange,并指定 Binding Key
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "info");
channel.queueBind(queueName, "direct_logs", "warning");
channel.queueBind(queueName, "direct_logs", "error");
// 发送消息时指定 Routing Key
channel.basicPublish("direct_logs", "info", null, "Info log message".getBytes());
- 消费者
import com.rabbitmq.client.*;
public class DirectConsumer {
private final static String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到 Exchange,并指定 Binding Key
if (argv.length < 1) {
System.err.println("Usage: DirectConsumer [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
}
2>Fanout Exchange广播交换器
广播所有消息给所有绑定的队列
- 生产者
// 创建 Fanout Exchange
channel.exchangeDeclare("logs", "fanout");
// 绑定队列到 Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
// 发送消息时不指定 Routing Key
channel.basicPublish("logs", "", null, "Broadcast log message".getBytes());
- 消费者
import com.rabbitmq.client.*;
public class FanoutConsumer {
private final static String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到 Exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
}
3>Topic Exchange 通配符路由器
*:匹配一个单词;#:匹配零个或多个单词
- 生产者
// 创建 Topic Exchange
channel.exchangeDeclare("topic_logs", "topic");
// 绑定队列到 Exchange,并指定 Binding Key 模式
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.orange.*");
channel.queueBind(queueName, "topic_logs", "*.*.rabbit");
channel.queueBind(queueName, "topic_logs", "lazy.#");
// 发送消息时指定符合模式的 Routing Key
channel.basicPublish("topic_logs", "quick.orange.rabbit", null, "Quick orange rabbit".getBytes());
- 消费者
import com.rabbitmq.client.*;
public class TopicConsumer {
private final static String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到 Exchange,并指定 Binding Key 模式
if (argv.length < 1) {
System.err.println("Usage: TopicConsumer [binding_key_pattern]");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
}
4>Headers Exchange 根据消息头属性进行路由
不依赖路由键,当消息的 headers 完全匹配时,才会将消息发送到对应的队列。
- 生产者
// 创建 Headers Exchange
channel.exchangeDeclare("headers_exchange", "headers");
// 绑定队列到 Exchange,并指定 Headers 匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("user_id", "12345");
headers.put("order_status", "pending");
AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, "headers_exchange", "", headers);
// 发送带有 Headers 的消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
channel.basicPublish("headers_exchange", "", props, "Message with specific headers".getBytes());
- 消费者
import com.rabbitmq.client.*;
public class HeadersConsumer {
private final static String EXCHANGE_NAME = "headers_exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "headers");
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到 Exchange,并指定 Headers 匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("user_id", "12345");
headers.put("order_status", "pending");
AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, EXCHANGE_NAME, "", headers);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
}
12、RabbitMQ核心方法及参数说明
1>newConnection 创建连接工程并开启连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
2>createChannel 创建信道
RabbitMQ 使用信道的方式来传输数据
信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接可以创建多个信道,每个信道都是独立的通信线路,可以并发地发送和接收消息。
Channel channel = connection.createChannel();
3>exchangeDeclare 交换器声明
channel.exchangeDeclare("my_exchange", "direct", true, false, null);
exchange: 交换器名称。
type: 交换器类型(如 “direct”, “fanout”, “topic”, “headers”)。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
autoDelete: 自动删除标志,true 表示当最后一个队列断开时自动删除交换器。
internal: 内部交换器标志,true 表示该交换器只能被其他交换器使用,不能直接由生产者发布消息。
arguments: 其他可选参数,例如死信交换器、过期时间等。
4>queueDeclare 队列声明
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
queue: 队列名称,为空字符串时表示创建临时队列。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
exclusive: 排他性标志,true 表示仅当前连接可用,连接关闭后自动删除。
autoDelete: 自动删除标志,true 表示当最后一个消费者断开时自动删除队列。
arguments: 其他可选参数,例如死信队列、过期时间等。
5>queueBind 队列绑定
将队列绑定到指定的交换器上,并提供路由键或匹配规则
channel.queueBind(queueName, "my_exchange", "routing_key");
queue: 队列名称。
exchange: 交换器名称。
routingKey: 路由键,对于某些类型的交换器(如 Direct 或 Topic),这个值是必须的;对于 Fanout 类型,通常为空字符串。
arguments: 可选参数,主要用于 Headers Exchange 的匹配规则
6>basicPublish 发布消息
向指定的交换器发布一条消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2) // 2 表示持久化
.build();
channel.basicPublish("my_exchange", "routing_key", props, message.getBytes());
exchange: 交换器名称。
routingKey: 路由键。
props: 消息属性,包括内容类型、编码、持久化模式等。
body: 消息体,即要发送的数据。
7>basicConsume 消费消息
费来自指定队列的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
queue: 队列名称。
autoAck: 自动确认标志,true 表示收到消息后立即确认,false 表示手动确认。
deliverCallback: 回调函数,用于处理接收到的消息。
cancelCallback: 取消回调函数,当消费者的取消通知到达时调用
8>basicAck 消息确认
手动确认模式下,当消费者成功处理完消息后,需要调用此方法来确认消息已被消费
channel.basicAck(envelope.getDeliveryTag(), false);
9>basicNack 消息丢弃
当消费者无法处理某条消息时,可以拒绝这条消息,并决定是否重新入队或者丢弃
// 第三个参数表示是否重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
13、RabbitMQ镜像集群模式
搭建RabbitMQ保证消息队列的高可用。
创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue。
优点:高可用,单个节点挂掉,其他节点仍可用
缺点:高负载,如果某个队列消息很重,则镜像复制的实例下也会很重,性能开销大。
参考博客:消息队列中点对点与发布订阅区别
Powered by niaonao