专栏导航
RabbitMQ入门指南
从零开始了解大数据
目录
专栏导航
前言
一、MQ数据持久化
1.交换机持久化
2.队列持久化
3.消息持久化
4.生产者确认机制
二、LazyQueue
1.LazyQueue模式介绍
2.管理控制台配置Lazy模式
3.代码配置Lazy模式
4.更新已有队列为lazy模式
总结
前言
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了MQ数据持久化、LazyQueue模式、管理控制台配置Lazy模式、代码配置Lazy模式、更新已有队列为lazy模式等内容。
一、MQ数据持久化
在消息队列(MQ)中,数据的持久化至关重要。如果MQ不能及时保存消息,可能会导致数据丢失。为了确保数据的可靠性,必须配置数据持久化。
1.交换机持久化
在RabbitMQ管理控制台新建交换机时可以配置交换机的Durability参数:
- Durable:持久化模式
- Transient:临时模式
代码声明交换机时,默认为持久化模式,DirectExchange源码:
public DirectExchange(String name) {
super(name);
}
public DirectExchange(String name, boolean durable, boolean autoDelete) {
super(name, durable, autoDelete);
}
public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
super(name, durable, autoDelete, arguments);
}
AbstractExchange源码(DirectExchange extends AbstractExchange):
public AbstractExchange(String name) {
this(name, true, false);
}
public AbstractExchange(String name, boolean durable, boolean autoDelete) {
this(name, durable, autoDelete, (Map)null);
}
public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
super(arguments);
this.name = name;
this.durable = durable;
this.autoDelete = autoDelete;
}
2.队列持久化
在RabbitMQ管理控制台新建队列时可以配置队列的Durability参数:
- Durable:持久化模式
- Transient:临时模式
代码声明队列时,默认为持久化模式,源码如下:
public Queue(String name) {
this(name, true, false, false);
}
public Queue(String name, boolean durable) {
this(name, durable, false, false, (Map)null);
}
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, (Map)null);
}
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) {
super(arguments);
Assert.notNull(name, "'name' cannot be null");
this.name = name;
this.actualName = StringUtils.hasText(name) ? name : Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration";
this.durable = durable;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
}
3.消息持久化
在控制台发送消息的时候,可以通过配置消息的属性来实现消息的持久化。消息持久化是将消息保存在磁盘上,即使MQ重启,消息也不会丢失。在发送消息时,可以选择Delivery mode参数来配置消息的持久化属性。
查看消息:
4.生产者确认机制
在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执给生产者,进一步确保消息的可靠性。为了减少IO次数并提高性能,MQ并不会逐条将消息持久化到数据库,而是每隔一段时间批量进行持久化。因此,建议生产者确认全部采用异步方式,以避免ACK的延迟。
二、LazyQueue
1.LazyQueue模式介绍
在 RabbitMQ 的默认设置中,为了降低消息收发的延迟,它会将接收到的信息保存在内存中。然而,在某些特殊情况下,这种设置可能会导致消息积压。以下是几种可能导致消息积压的情况:
- 消费者宕机或出现网络故障:当消费者无法正常处理消息时,RabbitMQ 将无法将消息传递给消费者,从而导致消息积压。
- 消息发送量激增,超过了消费者处理速度:当消息发送速度超过消费者的处理速度时,消息将在 RabbitMQ 的队列中积压。
- 消费者处理业务发生阻塞:如果消费者在处理消息时遇到业务阻塞,例如等待某些资源或进行耗时的操作,那么它可能会花费更长的时间来处理消息,从而导致消息积压。
当消息堆积问题出现时,RabbitMQ 的内存占用会逐渐增加,直到触发内存预警上限。此时,RabbitMQ 将开始将内存中的消息刷写到磁盘上,这个过程称为“PageOut”。“PageOut”过程会耗费一定的时间,并且会阻塞队列进程。因此,在这个过程中,RabbitMQ 将无法处理新的消息,导致生产者的所有请求都被阻塞。
为了解决这个问题,从 RabbitMQ 3.6.0 版本开始,引入了 Lazy Queues(惰性队列)模式。惰性队列具有以下特征:
- 接收到消息后直接存入磁盘而非内存:在惰性队列中,消息在接收到后不会立即加载到内存中,而是直接存储在磁盘上。
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载):当消费者需要消费消息时,它才会从磁盘中读取消息并加载到内存中。这样可以避免在消费者不活跃时浪费内存资源。
- 支持数百万条的消息存储:惰性队列支持大量的消息存储,即使在没有内存限制的情况下也能处理大量的消息。
在RabbitMQ 3.12 版本之后,LazyQueue 已经成为所有队列的默认格式。这种模式可以有效地解决消息积压问题,提高 RabbitMQ 的性能和稳定性。
2.管理控制台配置Lazy模式
在新建队列的时候,添加x-queue-mode=lazy参数设置队列为Lazy模式(RabbitMQ 3.12 版本之前):
3.代码配置Lazy模式
在使用Spring AMQP声明队列的时候,添加x-queue-mod=lazy参数设置队列为Lazy模式:
@Bean
public Queue lazyQueue() {
// .lazy()开启Lazy模式
return QueueBuilder.durable("lazy.queue").lazy().build();
}
源码如下:
public QueueBuilder lazy() {
return this.withArgument("x-queue-mode", "lazy");
}
基于注解来声明队列并设置为Lazy模式:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")))
public void listenLazyQueue(String msg){
log.info("lazy.queue:{}", msg);
}
4.更新已有队列为lazy模式
基于命令行设置policy实现将已有队列更新为lazy模式:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
rabbitmqctl | RabbitMQ的命令行工具 |
set_policy | 添加一个策略 |
Lazy | 策略名称 |
"^lazy-queue$" | 用正则表达式匹配队列的名字 |
'{"queue-mode":"lazy"}' | 设置队列模式为lazy模式 |
--apply-to queues | 策略的作用对象,是所有的队列 |
管理控制台配置policy(RabbitMQ 3.12 版本之前):
总结
RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了MQ数据持久化、LazyQueue模式、管理控制台配置Lazy模式、代码配置Lazy模式、更新已有队列为lazy模式等内容,希望对大家有所帮助。