之前项目中用到RabbitMQ的场景主要是订单信息的传递,还有就是利用RabbitMQ的死信队列属性设置,实现延迟队列效果,实现超时支付取消功能,以及在两个不同项目中传递数据等场景。
最近几年的工作中都是一直用的RabbitMQ,几年前简单的写过一些实践的总结。现在再做个稍微全面一点的总结。算是复习。
浏览器的收藏夹里边保留的有RabbitMQ的管理控制台。之前在2019年写的博客中有讲到安装部署RabbitMQ的教程:消息队列RabbitMQ的使用-CSDN博客
RabbitMQ是一个高级的消息代理(也称为消息队列)。它支持多种消息协议,提供可靠的消息传递服务,并具有易于使用的管理界面。
目录
1.RabbitMQ基本概念
2. RabbitMQ 消息模式
3. RabbitMQ交换机类型
4.RabbitMQ 消息传递保障
5.RabbitMQ消息持久化
6.RabbitMQ 集群与高可用性
7.RabbitMQ 性能优化
8. RabbitMQ 安全性
9. RabbitMQ 管理与监控
10. RabbitMQ 消息协议支持
11.RabbitMQ 消息确认与拒绝
12. RabbitMQ 死信队列
13.RabbitMQ 延迟队列
14.RabbitMQ 集群搭建
15.RabbitMQ 消息追踪
16. RabbitMQ 消息序列化
17.RabbitMQ 与微服务
18.RabbitMQ,RocketMQ与Kafka特点对比
19.Spring Boot整合RabbitMQ
1.RabbitMQ基本概念
概念 | 描述 |
---|---|
消息代理 | RabbitMQ是一个消息代理,用于在分布式系统中传递消息。 |
消息队列 | 消息队列是消息的集合,按照它们被发送的顺序排列。 |
交换机 | 交换机将消息路由到队列。 |
绑定 | 绑定是交换机和队列之间的关联,定义了消息如何从交换机路由到队列。 |
连接 | 连接是客户端和RabbitMQ服务器之间的网络连接。 |
通道 | 通道是连接中的一个虚拟通道,用于消息传递。 |
消费者 | 消费者是一个从队列中接收消息的客户端。 |
生产者 | 生产者是一个向队列发送消息的客户端。 |
2. RabbitMQ 消息模式
模式 | 描述 |
---|---|
简单模式 | 生产者发送消息,消费者接收消息,无需复杂的路由。 |
工作模式 | 多个消费者可以公平地接收同一条消息进行处理。 |
发布/订阅模式 | 消息可以被多个订阅者接收,但每个订阅者只接收一次。 |
路由模式 | 消息根据路由键(routing key)发送到特定的队列。 |
拓扑模式 | 结合了路由模式的特点,可以定义更复杂的路由逻辑。 |
3. RabbitMQ交换机类型
交换机类型 | 描述 | 使用场景 |
---|---|---|
Direct | 根据消息的路由键将消息发送到与之完全匹配的队列。 | 当需要精确匹配路由键时使用。 |
Fanout | 将消息广播到与之绑定的所有队列,忽略路由键。 | 当需要将消息发送到多个队列时使用。 |
Topic | 根据消息的路由键与队列绑定时指定的路由键模式匹配程度,将消息路由到一个或多个队列。 | 当需要根据模式匹配路由键时使用。 |
Headers | 根据消息的标头信息来决定消息的路由。 | 当需要根据消息的标头信息来路由消息时使用。 |
4.RabbitMQ 消息传递保障
保障级别 | 描述 |
---|---|
无保障 | 消息可能会丢失。 |
持久性 | 消息被存储在磁盘上,提高消息的持久性。 |
确认 | 消息确认机制确保消息被正确处理。 |
死信队列 | 消息在无法被消费时,会被发送到死信队列。 |
优先级队列 | 消息根据优先级进行排序,高优先级的消息先被消费。 |
5.RabbitMQ消息持久化
持久化项 | 描述 |
---|---|
队列持久化 | 当队列被声明为持久化时,即使RabbitMQ服务器重启,队列也不会丢失。 |
消息持久化 | 当消息被标记为持久化时,即使RabbitMQ服务器重启,消息也不会丢失。但请注意,持久化消息会降低性能。 |
交换机持久化 | 交换机本身通常不需要持久化,因为交换机只定义了消息路由的逻辑,而不是存储消息。但某些交换机类型(如topic交换机)的绑定可能需要持久化。 |
持久化策略 | 可以设置持久化策略来指定哪些队列应该被持久化,以及何时将内存中的消息持久化到磁盘。 |
事务 | 使用事务机制保证消息传递的原子性。在channel上开启。 |
6.RabbitMQ 集群与高可用性
概念 | 描述 |
---|---|
集群 | 多个RabbitMQ节点协同工作,提供负载均衡和故障转移。 |
镜像队列 | 队列在多个节点上镜像,提高消息的可用性。 |
故障转移 | 当一个节点失败时,其他节点可以接管其工作。 |
持久化队列 | 队列和消息都被持久化存储,确保在节点故障时消息不会丢失。 |
7.RabbitMQ 性能优化
优化策略 | 描述 |
---|---|
预取限制 | 控制消费者一次从队列中获取的消息数量。 |
消息批处理 | 批量处理消息以减少网络开销。 |
索引 | 对队列中的消息进行索引,加快消息检索速度。 |
负载均衡 | 通过集群和镜像队列实现负载均衡。 |
资源监控 | 监控节点和队列的资源使用情况,及时调整配置。 |
8. RabbitMQ 安全性
安全特性 | 描述 |
---|---|
用户认证 | 使用用户名和密码进行认证。 |
访问控制 | 根据用户权限控制对资源的访问。 |
传输层安全 | 使用TLS加密客户端和服务器之间的通信。 |
网络隔离 | 将RabbitMQ部署在受保护的网络区域中。 |
审计日志 | 记录重要的安全事件和系统操作。 |
9. RabbitMQ 管理与监控
管理工具 | 描述 |
---|---|
管理界面 | 提供一个Web界面用于管理RabbitMQ节点和队列。 |
命令行工具 | 使用RabbitMQ提供的命令行工具进行管理。 |
监控插件 | 安装监控插件,如Prometheus,进行性能监控。 |
日志文件 | 查看RabbitMQ的日志文件以了解系统状态和问题诊断。 |
10. RabbitMQ 消息协议支持
协议 | 描述 |
---|---|
AMQP | 高级消息队列协议,RabbitMQ的核心协议。 |
STOMP | 一个简单的文本协议,用于客户端消息传递。 |
MQTT | 轻量级的消息传输协议,适用于物联网场景。 |
HTTP | 通过HTTP协议进行消息传递。 |
11.RabbitMQ 消息确认与拒绝
机制 | 描述 |
---|---|
手动确认 | 消费者在处理完消息后手动确认。 |
自动确认 | 消费者接收到消息后自动确认。 |
消息拒绝 | 消费者无法处理消息时,可以选择拒绝该消息。 |
幂等性 | 确保消息即使重复传递也不会被多次处理。 |
12. RabbitMQ 死信队列
死信队列配置 | 描述 |
---|---|
死信队列 | 当消息无法被消费时,会被发送到死信队列。 |
死信路由 | 定义消息成为死信的条件,如TTL(生存时间)过期。 |
死信交换机 | 死信队列可以绑定到特定的交换机。 |
13.RabbitMQ 延迟队列
延迟队列配置 | 描述 |
---|---|
延迟消息 | 消息在指定的延迟时间后才会变为可用。 |
延迟插件 | 使用延迟插件来实现延迟队列。或者利用死信队列属性特点,实现延迟队列的效果 |
定时任务 | 使用外部定时任务触发延迟消息的可用性。 |
14.RabbitMQ 集群搭建
集群组件 | 描述 |
---|---|
节点 | RabbitMQ集群中的一个独立服务器。 |
集群形成 | 通过特定的协议和端口使节点相互发现和通信。 |
集群形成 | 通过特定的协议和端口使节点相互发现和通信。 |
镜像队列 | 队列在多个节点上进行镜像,以提高可用性和负载均衡。 |
故障转移 | 集群中的节点可以自动接管失败节点的任务。 |
元数据同步 | 集群中的节点会同步元数据,以保持一致性。 |
15.RabbitMQ 消息追踪
追踪机制 | 描述 |
---|---|
消息ID | 每条消息都有一个唯一的ID,用于追踪。 |
插件 | 使用如rabbitmq_tracing 插件来追踪消息。 |
日志文件 | 查看RabbitMQ的日志文件,了解消息的传递过程。 |
管理界面 | 通过RabbitMQ的管理界面查看消息状态。 |
16. RabbitMQ 消息序列化
序列化格式 | 描述 |
---|---|
JSON | 轻量级的数据交换格式,易于人阅读和编写。 |
XML | 标记语言,用于存储和传输数据。 |
MessagePack | 高效的二进制格式,比JSON更紧凑。 |
Protobuf | 由Google开发的高性能序列化框架。 |
17.RabbitMQ 与微服务
微服务应用 | 描述 |
---|---|
服务解耦 | 微服务之间通过RabbitMQ传递消息,实现解耦。 |
异步处理 | 服务通过RabbitMQ异步处理任务,提高效率。 |
流量削峰 | 使用RabbitMQ缓存消息,平滑处理高流量。 |
服务发现 | 服务通过RabbitMQ实现发现和通信。 |
18.RabbitMQ,RocketMQ与Kafka特点对比
特性/中间件 | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|
官网 | rabbitmq.com | rocketmq.apache.org | kafka.apache.org |
开发语言 | Erlang | Java | Scala/Java |
消息模型 | 队列、发布/订阅 | 队列、发布/订阅、事务、顺序消息 | 队列、发布/订阅、流处理 |
持久性 | 支持 | 支持 | 支持 |
路由功能 | 支持 | 支持 | 有限支持 |
集群支持 | 支持 | 支持 | 支持 |
事务 | 支持 | 支持 | 不支持 |
消息确认 | 支持 | 支持 | 支持 |
死信队列 | 支持 | 支持 | 支持 |
优先级队列 | 支持 | 支持 | 支持 |
消息追踪 | 插件支持 | 支持 | 社区插件支持 |
消息大小限制 | 约2MB | 4~8MB | 约1MB |
性能 | 高吞吐量,低延迟 | 高吞吐量,低延迟 | 极高吞吐量,延迟较高 |
使用场景 | 通用消息队列 | 金融级消息队列 | 大数据实时处理 |
社区活跃度 | 高 | 高 | 非常高 |
事务性消息 | 支持 | 支持 | 不支持 |
顺序消息 | 不原生支持 | 支持 | 不原生支持 |
批处理能力 | 一般 | 一般 | 优秀 |
数据倾斜处理 | 一般 | 优秀 | 优秀 |
跨机房同步 | 支持 | 支持 | 支持 |
注意,上表中提到的“消息大小限制”是一个大致的数字,具体限制可能会因配置和中间件的版本不同而有所变化。此外,每个中间件的具体特性和最佳实践可能随时间而发展,因此建议查阅最新的官方文档以获得最准确的信息。
19.Spring Boot整合RabbitMQ
下边用一个示例,演示利用死信队列的特性实现延迟队列的效果;
19.1 添加RabbitMQ依赖
首先,确保你的Spring Boot项目中已经添加了RabbitMQ的依赖。如果你使用Maven,可以在pom.xml
文件中添加如下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
19.2 配置RabbitMQ连接信息
在application.properties
或application.yml
中配置RabbitMQ连接信息:
spring.rabbitmq.addresses=your_rabbitmq_server_address
spring.rabbitmq.username=your_username
spring.rabbitmq.password=your_password
spring.rabbitmq.virtual-host=your_virtual_host
19.3 定义死信队列和延迟队列
其实没有专门的queue类型是死信队列,都是普通的queue类型,只是在普通的queue中设置了x-dead-letter-exchange,然后指定另一个队列来接收这个队列超时未消费的消息,接收的那个队列就称作死信队列,这个设置了超时的队列,可以暂且叫它延迟队列吧。死信队列(DLQ)是用于存储无法被正常消费的消息。可以创建一个普通的队列作为延迟队列,然后配置其TTL(生存时间)和死信交换机,使得当消息在延迟队列中生存时间超过TTL后,消息会被发送到死信队列。
@Bean
public Queue delayQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "deadLetterExchange");
arguments.put("x-dead-letter-routing-key", "deadLetterRoutingKey");
arguments.put("x-message-ttl", 5000); // 设置消息的TTL为5000毫秒
return new Queue("delayQueue", true, false, false, arguments);
}
@Bean
public Queue deadLetterQueue() {
return new Queue("deadLetterQueue", true, false, false);
}
@Bean
public Exchange deadLetterExchange() {
return new DirectExchange("deadLetterExchange", true, false);
}
19.4绑定死信队列
将死信队列绑定到死信交换机,并指定路由键:
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, Exchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("deadLetterRoutingKey");
}
19.5 发送消息到延迟队列
发送消息到delayQueue
,消息将被存储直到TTL过期:
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayedMessage(String message) {
rabbitTemplate.convertAndSend("delayQueue", message);
}
19.6 消费死信队列中的消息
创建一个消费者监听死信队列,以处理过期的消息:
@Component
public class DeadLetterConsumer {
@RabbitListener(queues = "deadLetterQueue")
public void receiveDeadLetter(String message) {
// 处理死信消息
System.out.println("Received dead letter message: " + message);
}
}
通过以上步骤,在Spring Boot应用中整合RabbitMQ,并通过死信队列特性实现延迟队列的效果。消息在延迟队列中超过预设的TTL后,将自动成为死信,并被发送到配置的死信队列中,从而实现了延迟处理消息的需求。
个人感觉RabbitMQ比RocketMQ和Kafka要简单,而且管理起来也方便,而且RabbitMQ性能也不错,而且功能也能满足绝大部分企业项目要求,可以作为首选。