目标:
了解熟悉RabbitMQ的高级特性
学习步骤:
高级特性主要分为以下几点, 官网介绍
1、消息可靠性投递 【confirm 确认模式、return 退回模式】2、Consumer ACK 【acknowledge】
3、消费端限流 【prefetch】
4、TTL过期时间 【time to live】
5、死信队列 【Dead Letter Exchange】
6、延迟队列 【rabbitmq-delayed-message-exchange】
7、优先级队列 【x-max-priority】
前戏:项目搭建
1、创建两个module,一个为生产者,一个为消费者
分别添加如下依赖【或者将依赖放置在父工程下,两个module作为子工程引用即可】
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2、 配置RabbitMQ 的基本信息 [application.yml]
spring:
rabbitmq:
host: 服务器IP
port: 5672 # 端口默认为 5672
username: guest # 默认账号有guest 密码一致
password: guest
virtual-host: /
3、编写配置类RabbitMQConfig,注册队列、交换机、以及绑定关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "csnz_queue";
public static final String EXCHANGE_NAME = "csnz_exchange";
// 1、注册队列
@Bean("CSNZQueue")
public Queue getQueue(){
// 使用QueueBuilder构建一个队列,设置队列持久化,以及自动删除。
return QueueBuilder.durable(QUEUE_NAME).autoDelete().build();
}
// 2、注册交换机
@Bean("CSNZExchange")
public Exchange getExchange(){
// 使用ExchangeBuilder构建一个交换机(类型可选,此处为通配符交换机),设置持久化和自动删除
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
}
// 3、绑定队列和交换机
@Bean("CSNZBind")
public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
// 使用BindingBuilder 将刚刚声明的队列和交换机绑定并设置绑定的路由key
return BindingBuilder.bind(queue).to(exchange).with("csnz.#").noargs();
}
}
一、消息可靠性投递
RabbitMQ提供了两种模式来控制消息的投递
(生产者发送的)可靠性
- confirm 确认模式
- return 退回模式
因为消息投递过程是从 生产者 到 Broker[exchange -> queue] 再到 消费者的
两种模式过程:
-
1:message从producer到exchange成功时会返回一个 confirmCallback
-
2:message从exchange到queue失败时会返回一个 returnCallback
只要利用这俩个callback就可以控制消息的 可靠性投递了
demo演示
确认模式:
1、在配置中开启 publisher-confirms 为 true
2、在rabbitTemplate定义 confirmCallBack 回调函数
/*
确认模式:
1、在配置中开启 publisher-confirms为true
2、在rabbitTemplate定义confirmCallBack回调函数
*/
@Test
public void testConfirm(){
// 定义confirmCallBack回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
/*
CorrelationData:相关的配置信息【在convertAndSend重载方法中有包含此信息】
ack;exchange交换机,是否成功收到了信息。
cause:失败原因。如果成功接收则此值为null
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息成功发送");
}else{
System.out.println("发送失败原因:" + cause);
// 重新发起或其他操作
}
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testConfirm","测试确认回调模式");
}
此时,如果发送消息时指定的交换机和路由键都是正确的,即代码块第27行参数正确,则第17行的 ack值为true,执行 消息成功发送到交换机后的逻辑代码
这里发送失败就是因为我们把交换机的名称写错了,换成正确的交换机名称就好
回退模式:
当消息发送给Exchange后,Exchange路由到queue失败时才会执行 ReturnCallBack
1、在配置中开启 publisher-returns 为 true
2、设置ReturnCallBack
3、设置Exchange处理消息的模式:
@Test
public void testReturn(){
// 设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
// 设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
/*
Message:消息对象
replyCode:错误码
replyText:错误信息
exchange:交换机
routingKey:路由键
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message);
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"error.testReturn","测试回退模式");
}
此时,如果发送消息时指定的交换机和路由键发生找不到的情况,即代码块第20行参数错误
情况一:没写第四行设置交换机处理失败消息的模式为true,则不会执行第15行的代码,因为消息失败默认是丢弃模式
因为默认此值就是false
情况二:设置交换机处理失败消息的模式为true,则会执行第15行的代码块,消息发送失败时,消息会通过回调返回,此时就可以查看消息发送失败的具体原因
二、Consumer Ack
Ack为Acknowledge,顾名思义,指的是消费者
收到消息后的确认模式。
分为三种模式
- 自动确认:acknowledge=“none”(默认)
- 当消息一旦被consumer接收了,则自动确认收到,并移除消息缓存中的信息
- 手动确认:acknowledge=“manual”
- 手动ACK
- 手动NACK
- 根据异常情况判断是否确认:acknowledge=“auto”
实际业务情况:
一般不会使用 自动确认模式,因为收到消息后,很可能在进行业务处理时出现异常,造成数据丢失。真就啪一下没了。
一般都是使用 手动确认模式
- 即在业务处理成功后,调用 channel.basicAck() 进行手动确认,会发送给 broker 一个应答,代表消息处理成功。
- 如果在进行业务处理时发生异常,则调用 channel.basicNack() 方法【如果设置了重回队列,broker 就会将没有成功处理的消息重新发送。否则将该消息从队列中剔除】。
demo演示
自动确认模式:
1、定义一个监听器:AckListener 实现 MessageListener 接口
2、在onMessage方法上绑定要监听的队列
@Component
public class AckListener implements MessageListener {
@Override
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void onMessage(Message message) throws Exception {
System.out.println(new String(message.getBody()));
}
}
测试
手动确认模式:
1、设置手动接收:acknowledge-mode: manual
2、定义一个监听器:AckListener 实现 ChannelAwareMessageListener 接口 :(因为此接口才有返回channel参数)
3、在onMessage方法上绑定要监听的队列
4、消息成功处理:调用 channel.basicAck() 接收
5、消息处理失败:调用channel.basicNack()拒绝接收,让broker重新发送给consumer
application.yml配置文件
# 配置RabbitMQ 的基本信息 IP 端口 username pass
spring:
rabbitmq:
host: 服务器IP
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual
监听器:AckListener
@Component
public class AckListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener
@Override
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000); // 模拟业务时间
// 传递标签:该字段为MQ server 用于消息确认的标记
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1、接收转换消息
System.out.println(new String(message.getBody()));
// 2、处理业务逻辑
Thread.sleep(1000); // 模拟业务时间
// 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
// 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
// ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(deliveryTag,true,true);
}
}
}
测试正常情况下的手动接收代码:
测试异常情况下的手动接收代码(在处理业务逻辑时加上错误即可):可以看见消息一直被重新入队进行消费
三、消费端限流
限流机制
- 设置手动接收:acknowledge-mode: manual
- 设置消费端每次消费消息的条数
- prefetch = 1
- 表示消费端每次从MQ拉取一条消息来消费,直至手动确认消费完毕后,才会继续拉取下一条数据
- 监听器类实现 ChannelAwareMessageListener 接口
- 消息成功处理:调用 channel.basicAck() 接收
- 处理失败:调用channel.basicNack()拒绝接收,让broker重新发送给consumer
demo演示
监听器类LimitListener
@Component
public class LimitListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener
@Override
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000); // 模拟业务时间
// 传递标签:该字段为MQ server 用于消息确认的标记
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1、接收转换消息
System.out.println(new String(message.getBody()));
// 2、处理业务逻辑
Thread.sleep(1000); // 模拟业务时间
// int i = 1/0;
// 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
// 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
// ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(deliveryTag,true,true);
}
}
}
消费端:application.yml配置文件
# 配置RabbitMQ 的基本信息 IP 端口 username pass
spring:
rabbitmq:
host: 服务器IP
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual
direct:
prefetch: 1
生产者测试代码
循环发送数据
@Test
public void testSend(){
// 设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
// 设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
/*
Message:消息对象
replyCode:错误码
replyText:错误信息
exchange:交换机
routingKey:路由键
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(message);
}
});
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testSend","测试限流-手动接收:"+i);
}
}
消息发送完毕后,观察MQ工作台
消费者测试代码
@Test
public void testLimitAck(){
System.out.println("执行 限流Ack模式");
while(true){
}
}
观察消费端控制台打印
可以看到消息是一条一条消费的(设置了业务时间为2秒),这就是所谓的限流消费。
四、TTL
TTL 称为 time to live,也就是存活时间,也称过期时间
它的用处是当消息到达存活时间后,如果还没有被消费掉,则会自动清除。此用途也经常被用来做订单的延时付款。
- 可以对消息设置过期时间(使用参数:expiration,单位:毫秒,且当该消息在队列头部时,才会单独判断这一消息是否过期)
- 可以对整个队列所有消息设置过期时间(时间一到,队列内全部消息清空,使用参数:x-message-ttl,单位:毫秒)
- 如果两者都进行了设置,则以时间短的为主。(因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的)
且RabbitMQ不保证消息会在精确的TTL时间后立即被删除。这是因为RabbitMQ使用一种基于时间戳的方式来检查消息的过期时间,并且该方式是有一定的误差的
问题一:如果一个rabbitmq队列中同时设置了A消息过期时间,以及队列总体消息过期时间,且A消息设置的过期时间比较短,那么是A先过期还是消息总体一起过期,A消息的过期时间是否会被设置的总体过期时间所覆盖
答:
A 消息会先过期,而不是队列中所有消息一起过期。因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的。当 A 消息过期时,它会被从队列中删除,而不受队列总体消息过期时间的影响。队列总体消息过期时间只会影响那些没有设置过期时间的消息。因此,A 消息的过期时间不会被设置的总体过期时间所覆盖。
问题二:如果一个rabbitmq队列中同时设置了A消息过期时间,以及队列总体消息过期时间,且A消息设置的过期时间比较长,那么A会不会比队列中的其他消息后过期
答:
A 消息会比队列中的其他消息后过期。因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的。当队列中的消息的过期时间早于 A 消息的过期时间时,这些消息会先被删除,而 A 消息会继续存在于队列中,直到其过期时间到达后才会被删除。因此,A 消息会比队列中的其他消息后过期。
问题三:rabbitmq设置队列过期时间 和 设置队列消息过期时间的区别
答:
设置队列过期时间,是指在队列空闲一段时间之后(即没有消费者消费该队列,也没有新消息进入该队列),队列会自动被删除。这个过期时间是应用于整个队列的,而不是具体某一条消息。
设置队列消息过期时间,是指在每一条消息入队时,设置消息的过期时间,当消息在队列中等待时间超过其过期时间时,该消息会被自动删除。这个过期时间是应用于具体某一条消息的,而不是整个队列。
设置队列消息过期使用x-message-ttl
参数,而设置队列过期使用x-expires
参数
在实际应用中,根据不同的需求,我们可以选择设置队列过期时间或设置队列消息过期时间。如果我们希望在一段时间内没有消费者消费该队列时,自动删除该队列,那么可以设置队列过期时间。如果我们希望在一条消息在队列中存活的时间超过一定时间后自动被删除,那么可以设置队列消息过期时间。
demo演示
1、单独对一个消息设置过期时间,用到MessagePostProcessor后置处理器
@Test
public void testOneTtl(){
// 消息后置处理器,可以设置一些参数
MessagePostProcessor processor = new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置message的信息
message.getMessageProperties().setExpiration("5000");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testTTL","测试TTL过期:",processor);
}
返回观察rabbitMQ控制台,是已经进了1条数据了
让我们等他个5秒,看他拜拜了没
2、对整个队列所有消息设置过期时间, 首先需要修改我们之前声明队列的地方(生产者),给它加点参数
在这里新增了一个args参数,作用是声明队列消息TTL以及过期时间
// 1、注册队列
@Bean("CSNZQueue")
public Queue getQueue(){
// 设置过期时间参数
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
}
其他的就不需要修改了,直接写测试
@Test
public void testTtl(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testTTL","测试TTL过期:"+i);
}
}
返回观察rabbitMQ控制台,是已经进了10条数据了
让我们等他个5秒,看他拜拜了没
官网还告诉我们:The original expiry time of a message is preserved if it is requeued (for example due to the use of an AMQP method that features a requeue parameter, or due to a channel closure).
我知道你们英语四级没过🤭,把翻译也丢过来了:
如果消息被重新排队(例如,由于使用了具有requeue参数的AMQP方法,或者由于通道关闭),则保留消息的原始到期时间。
也就是说如果 rabbitMQ设置A消息过期时间为10秒 此时5秒后A消息因为某种原因被重新排队 那么A消息的剩余过期时间会被重置为10秒,而不是5秒,因为消息过期时间是从消息第一次被发送到队列开始计算的,而不是从消息第一次被消费开始计算的。
五、死信队列(弥补RabbitMQ3.0以前支持的immediate参数的功能)
概述:在所有MQ产品里,此队列都叫死信队列,在RabbitMQ中也不例外,但是它又有点特殊,因为只有RabbitMQ才有交换机的概念,所有在RabbitMQ中又称死信队列为 DLX(Dead Letter Exchange 死信交换机),
当消息称为死信息
后,可以被重新发送到另一个交换机,此时另一个交换机就称之为死信交换机(DLX)。
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性
消息变成死信的三种情况:
- 1、队列消息长度到达上限,导致消息被丢弃
- 2、消费者拒绝接收消息,并且不让消息重新入队
- 3、消息设置的TTL已经到达 超时时间 而仍未被消费
来自官方提示:Note that expiration of a queue will not dead letter the messages in it.
请注意,队列到期不会使其中的消息成为死信。(是队列到期,而不是队列消息到期)
如何将队列绑定至 死信交换机?
To set the dead letter exchange for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
要为队列设置死信交换,请在声明队列时指定可选的x-dead-letter-exchange参数。该值必须是同一虚拟主机中的exchange名称
You may also specify a routing key to be used when dead-lettering messages. If this is not set, the message’s own routing keys will be used.
您还可以指定在死信消息时使用的路由关键字。如果没有设置,将使用消息自己的路由关键字 args.put(“x-dead-letter-routing-key”, “some-routing-key”);
队列设置参数:
- 1、x-dead-letter-exchange
- 2、x-dead-letter-routing-key
注意:只有当原来的队列绑定了死信交换机后,原队列发生消息变成死信消息,此消息才会被死信交换机重新路由到死信队列
demo演示
1、再创建一套DLX队列和交换机
@Bean("DLXQueue")
public Queue DLXQueue(){
return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
}
@Bean("DLXExchange")
public Exchange DLXExchange(){
return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
}
@Bean
public Binding bindDLX(){
return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
}
2、原有的队列添加参数,让他绑定DLX交换机和DLX路由键
@Bean("CSNZQueue")
public Queue getQueue(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key","DLX.#");
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
}
3、发送一条没设置过期时间的信息和一条设置10秒过期时间的信息
@Test
public void testDLX(){
// 消息后置处理器,可以设置一些参数
MessagePostProcessor processor = new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置message的信息
message.getMessageProperties().setExpiration("10000");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testDLX","测试DLX", processor);
}
4、观察RabbitMQ控制台情况
发现一开始 原队列中有两条信息,过了十秒后其中设置了过期时间的那条信息转移到了DLX队列,所以最后是各自一条信息
5、测试队列设置整体消息过期时间,是否会发生信息转移情况
队列设置参数:
- x-message-ttl
@Bean("CSNZQueue")
public Queue getQueue(){
// 设置过期时间参数
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000);
args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key","DLX.#");
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
}
观察控制台,发现数据照样转移
6、再试试 队列消息长度到达上限,导致消息被丢弃变成死信的情况
队列设置参数:
- x-max-length
@Bean("CSNZQueue")
public Queue getQueue(){
// 设置过期时间参数
HashMap<String, Object> args = new HashMap<>();
args.put("x-max-length",5);
args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key","DLX.#");
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
}
直接测试10条数据,只有5条在原队列,另外5条去了死信队列
7、测试 设置队列到期时间,是不是跟官网讲的一样 => 队列到期不会使其中的消息成为死信
队列设置参数:
- x-expires
修改原队列,设置队列的过期时间,我这设置队列的过期时间为20秒!以及配置死信队列,让原队列绑定死信交换机
@Bean("CSNZQueue")
public Queue getQueue(){
// 设置过期时间参数
HashMap<String, Object> args = new HashMap<>();
args.put("x-expires",20000);
args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key","DLX.#");
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
}
// 2、注册交换机
@Bean("CSNZExchange")
public Exchange getExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
// return ExchangeBuilder.directExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
}
// 3、绑定队列和交换机
@Bean("CSNZBind")
public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("Delay.#").noargs();
}
/*
以下声明 死信交换机
*/
@Bean("DLXQueue")
public Queue DLXQueue(){
return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
}
@Bean("DLXExchange")
public Exchange DLXExchange(){
return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
}
@Bean
public Binding bindDLX(){
return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
}
编写一个测试方法 => 测试队列过期是否其中的信息会成为死信
@Test
public void testDelayMessageWithExpires(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"Delay.testDelay","测试队列过期是否其中的信息会成为死信",message -> {
return message;
});
}
运行测试方法,结果如下,原队列中有一条我们刚刚录入的信息
等待二十秒,看是否此信息会从原队列 去往死信队列
发现原队列确实过期自动删除了,但是原队列中的信息并没有去往死信队列,证明官网没骗人😁
队列到期不会使其中的消息成为死信。(是队列到期,而不是队列消息到期)
六、延迟队列
所谓延迟队列,就是消息进入队列之后不会立即被消费掉,而是等到指定时间后,才会被消费
相关场景:某多多下单后,30分钟内客户未支付,则自动取消此订单,库存回滚。
但是RabbitMQ中并没有提供延迟队列这一功能,有两种方式可以实现:
-
1、靠 TTL + 死信队列 组合实现延迟队列的效果。
- 缺点:两个交换机、两个队列,且RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,但是如果第一个消息设置的过期时间很长,而后续消息设置的过期时间很短,则会导致后续的消息一直不会被过期处理(不会按时过期)
- 缺点:两个交换机、两个队列,且RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,但是如果第一个消息设置的过期时间很长,而后续消息设置的过期时间很短,则会导致后续的消息一直不会被过期处理(不会按时过期)
-
2、使用 rabbitmq-delayed-message-exchange 插件来实现(RabbitMQ 3.5.7及以上的版本、Erlang/OPT 18.0及以上)
- 优点: 只需要一个交换机、一个队列,且不会出现只判断第一个消息的情况,会根据过期时间优先处理。
- 优点: 只需要一个交换机、一个队列,且不会出现只判断第一个消息的情况,会根据过期时间优先处理。
demo演示
1、靠 TTL + 死信队列 组合实现延迟队列的效果
监听器监听死信队列
@Component
public class DelayListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener
@Override
@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE_NAME)
public void onMessage(Message message, Channel channel) throws Exception {
// 传递标签:该字段为MQ server 用于消息确认的标记
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1、接收转换消息
System.out.println(new String(message.getBody()));
// 2、处理业务逻辑
Thread.sleep(1000); // 模拟业务时间
// 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
// 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
// ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(deliveryTag,true,true);
}
}
}
注册正常队列、死信队列,并让正常队列绑定死信交换机,且设置正常队列的过期时间为10秒
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "csnz_queue";
public static final String EXCHANGE_NAME = "csnz_exchange";
public static final String DLX_QUEUE_NAME = "DLX_QUEUE";
public static final String DLX_EXCHANGE_NAME = "DLX_EXCHANGE";
// 1、注册队列
@Bean("CSNZQueue")
public Queue getQueue(){
// 设置过期时间参数
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",10000);
args.put("x-max-length",5);
args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key","DLX.#");
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
}
// 2、注册交换机
@Bean("CSNZExchange")
public Exchange getExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
}
// 3、绑定队列和交换机
@Bean("CSNZBind")
public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("csnz.#").noargs();
}
/*
以下声明 死信交换机
*/
@Bean("DLXQueue")
public Queue DLXQueue(){
return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
}
@Bean("DLXExchange")
public Exchange DLXExchange(){
return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
}
@Bean
public Binding bindDLX(){
return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
}
}
编写测试类
生产者端:
@Test
public void testDelay() throws InterruptedException {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testDelay","测试延迟队列");
for (int i = 0; i < 15; i++) {
System.out.println(i);
Thread.sleep(1000);
}
}
消费者端:
@Test
public void testDelay(){
System.out.println("Delay模式");
while(true){
}
}
运行即可发现,发送消息过了10秒后,消息会到死信队列中,此时再在死信队列中执行逻辑代码即可实现延迟队列功能。
2、使用 rabbitmq-delayed-message-exchange 插件来实现(RabbitMQ 3.5.7及以上的版本、Erlang/OPT 18.0及以上)
RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送
插件官网下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases(记得适配版本号)
下载完成后将其解压在plugins文件夹下
运行cmd切换到rabbitMQ的sbin目录下执行:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启一下rabbitMQ服务
使用管理员运行CMD
去RabbitMQ的控制台看看,插件有没有加载成功
注册队列,交换机
@Bean("DelayQueue")
public Queue DelayQueue(){
return QueueBuilder.durable(DELAY_QUEUE_NAME).autoDelete().build();
}
@Bean("DelayExchange")
public CustomExchange DelayExchange(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type","direct");
return new CustomExchange(DELAY_EXCHANGE_NAME," x-delayed-message",true,true,args);
}
@Bean
public Binding bindDelay(){
return BindingBuilder.bind(DelayQueue()).to(DelayExchange()).with("Delay.#").noargs();
}
生产者代码
@Test
public void testDelayMessage(){
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME,"Delay.testDelay","测试延迟插件队列",message -> {
// setHeader 为延时时间 不填及为及时发送
message.getMessageProperties().setDelay(5000);
return message;
});
}
最终实现的效果就是发送的消息得等到5秒后才进入延迟队列,此时一步到位
此时需要注意的是,延迟队列插件的实现方式是通过在消息发布时设置消息的过期时间来实现的,因此在发送消息时,其实是MQ自动将消息的过期时间设置为当前时间加上延迟时间。
七、优先级队列(3.5.0以上版本)
人如其名,优先级队列即优先级比较高,优先被消费
一般通过x-max-priority
参数设置优先级队列的最大值
官方推荐参数设置:支持有限数量的优先级:255。建议使用1到10之间的值,数字越大表示优先级越高,没有设置优先级的消息被视为优先级为0
队列需要设置为优先级队列,消息需要设置优先级(在MQ出现消息堆积情况下、及消费速度小于生产速度,优先级才有意义)
交换机设置最大优先级参数x-max-priority
@Bean("DelayExchange")
public Exchange DelayExchange(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type","topic");
args.put("x-max-priority", 10);
return new CustomExchange(DELAY_EXCHANGE_NAME," x-delayed-message",true,true,args);
}
测试发送的消息设置优先级setPriority
级别 ,这里可以去掉设置优先级,然后多发几条,最后发现有设置优先级的消息最先被消费
@Test
public void testFirstMessage(){
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME,"Delay.testFirst","测试延迟插件队列",message -> {
// setPriority 为设置此条数据的优先级
message.getMessageProperties().setPriority(5);
return message;
});
}