文章目录
- 整合Springboot
- 概述
- 消费者
- 生产者
- 消息可靠性投递
- 故障原因
- 解决方案
- 生产者端消息确认机制(故障情况1)
- 故障情况2解决方案
- 故障情况3解决方案
- 消费端限流
- 概念
- 消息超时
- 概念
- 队列层面:配置队列过期
- 消息本身:配置消息过期
- 死信队列
- 概念
- 创建死信交换机和死信队列
- 创建正常队列,绑定死信队列
- 代码
- 延迟队列
- 方案1:借助消息超时时间+死信队列
- 方案2:给RabbitMQ安装插件
- 检查是否安装
- 测试
整合Springboot
概述
- 搭建环境
- 基础设定:交换机名称、队列名称、绑定关系
- 发送消息:使用RabbitTemplate
- 接收消息:使用@RabbitListener注解
消费者
pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
yml
spring:
rabbitmq:
host: 192.168.217.134
port: 5672
username: guest
password: 123456
virtual-host: /
logging:
level:
com.atguigu.mq.listener.MyMessageListener: info
Listener
@Component
@Slf4j
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
// 写法一:监听 + 在 RabbitMQ 服务器上创建交换机、队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_DIRECT),
key = {ROUTING_KEY}
)
)
// 写法二:监听
// @RabbitListener(queues = {QUEUE_NAME})
public void processMessage(String dataString, Message message, Channel channel) {
log.info("消费端接收到了消息:" + dataString);
}
}
生产者
pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
yml
spring:
rabbitmq:
host: 192.168.217.134
port: 5672
username: guest
password: 123456
virtual-host: /
RabbitTemplate
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test01SendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Hello Rabbit!SpringBoot!");
}
}
消息可靠性投递
故障原因
- 消息没有发送到消息队列上
后果:消费者拿不到消息,业务功能缺失,数据错误 - 消息成功存入消息队列,但是消息队列服务器宕机了
原本保存在内存中的消息也丢失了
即使服务器重新启动,消息也找不回来了
后果:消费者拿不到消息,业务功能缺失,数据错误 - 消息成功存入消息队列,但是消费端出现问题,例如:宕机、抛异常等等
后果:业务功能缺失,数据错误
解决方案
- 故障情况1:消息没有发送到消息队列
- 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认, 如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
- 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机
- 故障情况2:消息队列服务器宕机导致内存中消息丢失
- 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
- 故障情况3:消费端宕机或抛异常导致消息没有成功被消费
- 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
- 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)
关联交换机和备份交换机
生产者端消息确认机制(故障情况1)
故障原因1 解决方案:消息没有发送到消息队列上
pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
YAML
publisher-confirm-type,publisher-returns两个必须要增加的配置,如果没有则功能不生效
# producer
spring:
rabbitmq:
host: 192.168.217.134
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info
创建配置类
// 用于出现推送失败的情况下查看返回值
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 消息发送到交换机成功或失败时调用这个方法
log.info("confirm() 回调函数打印 CorrelationData:" + correlationData);
log.info("confirm() 回调函数打印 ack:" + ack);
log.info("confirm() 回调函数打印 cause:" + cause);
}
@Override
public void returnedMessage(ReturnedMessage returned) {
// 发送到队列失败时才调用这个方法
log.info("returnedMessage() 回调函数 消息主体: " + new String(returned.getMessage().getBody()));
log.info("returnedMessage() 回调函数 应答码: " + returned.getReplyCode());
log.info("returnedMessage() 回调函数 描述:" + returned.getReplyText());
log.info("returnedMessage() 回调函数 消息使用的交换器 exchange : " + returned.getExchange());
log.info("returnedMessage() 回调函数 消息使用的路由键 routing : " + returned.getRoutingKey());
}
}
API说明
①ConfirmCallback接口
这是RabbitTemplate内部的一个接口,源代码如下:
/**
* A callback for publisher confirmations.
*
*/
@FunctionalInterface
public interface ConfirmCallback {
/**
* Confirmation callback.
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
生产者端发送消息之后,回调confirm()方法
- ack参数值为true:表示消息成功发送到了交换机
- ack参数值为false:表示消息没有发送到交换机
②ReturnCallback接口
同样也RabbitTemplate内部的一个接口,源代码如下:
/**
* A callback for returned messages.
*
* @since 2.3
*/
@FunctionalInterface
public interface ReturnsCallback {
/**
* Returned message callback.
* @param returned the returned message and metadata.
*/
void returnedMessage(ReturnedMessage returned);
}
注意:接口中的returnedMessage()方法仅在消息没有发送到队列时调用
ReturnedMessage类中主要属性含义如下:
属性名 | 类型 | 含义 |
---|---|---|
message | org.springframework.amqp.core.Message | 消息以及消息相关数据 |
replyCode | int | 应答码,类似于HTTP响应状态码 |
replyText | String | 应答码说明 |
exchange | String | 交换机名称 |
routingKey | String | 路由键名称 |
故障情况2解决方案
指定队列名称默认自动持久化,还可设置是否自动删除队列
故障情况3解决方案
# consumer
spring:
rabbitmq:
host: 192.168.217.134
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
prefetch: 1 # 每次从队列中取回消息的数量
deliveryTag:交付标签机制,每一个消息进入队列时,broker都会生成一个唯一标识
消息复制到各个队列,但deliveryTag各不相同
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class MyMessageListener {
public static final String QUEUE_NAME = "queue.order";
public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
public static final String QUEUE_DELAY = "queue.test.delay";
public static final String QUEUE_PRIORITY = "queue.test.priority";
@RabbitListener(queues = {QUEUE_NAME})
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
// 获取当前消息的 deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 核心操作
log.info("消费端 消息内容:" + dataString);
System.out.println(10 / 0);
// 核心操作成功:返回 ACK 信息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 获取当前消息是否是重复投递的
// redelivered 为 true:说明当前消息已经重复投递过一次了
// redelivered 为 false:说明当前消息是第一次投递
Boolean redelivered = message.getMessageProperties().getRedelivered();
// 核心操作失败:返回 NACK 信息
// requeue 参数:控制消息是否重新放回队列
// 取值为 true:重新放回队列,broker 会重新投递这个消息
// 取值为 false:不重新放回队列,broker 会丢弃这个消息
if (redelivered) {
// 如果当前消息已经是重复投递的,说明此前已经重试过一次啦,所以 requeue 设置为 false,表示不重新放回队列
channel.basicNack(deliveryTag, false, false);
} else {
// 如果当前消息是第一次投递,说明当前代码是第一次抛异常,尚未重试,所以 requeue 设置为 true,表示重新放回队列在投递一次
// 第二个参数:boolean multiple表示是否一次消费多条消息,false表示只确认该序列号对应的消息,true则表示确认该序列号对应的消息以及比该序列号小的所有消息,比如我先发送2条消息,他们的序列号分别为2,3,并且他们都没有被确认,还留在队列中,那么如果当前消息序列号为4,那么当multiple为true,则序列号为2、3的消息也会被一同确认。
channel.basicNack(deliveryTag, false, true);
}
// reject 表示拒绝
// 辨析:basicNack() 和 basicReject() 方法区别
// basicNack()能控制是否批量操作
// basicReject()不能控制是否批量操作
// channel.basicReject(deliveryTag, true);
}
}
}
消费端限流
概念
一个参数:prefetch
# consumer
spring:
rabbitmq:
host: 192.168.217.134
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
prefetch: 1 # 每次从队列中取回消息的数量
消息超时
概念
- 给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除
- 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这
个队列中的消息全部使用同一个过期时间。 - 消息本身:给具体的某个消息设定过期时间
- 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这
- 如果两个层面都做了设置,那么哪个时间短,哪个生效
队列层面:配置队列过期
5000毫秒过期
消息本身:配置消息过期
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_TIMEOUT = "exchange.test.timeout";
public static final String ROUTING_KEY_TIMEOUT = "routing.key.test.timeout";
@Test
public void test04SendMessage() {
// 创建消息后置处理器对象
MessagePostProcessor postProcessor = message -> {
// 设置消息的过期时间,单位是毫秒
message.getMessageProperties().setExpiration("7000");
return message;
};
rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout", postProcessor);
}
}
死信队列
概念
概念:当一个消息无法被消费,它就变成了死信。
- 死信产生的原因大致有下面三种:
- 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false
- 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储 了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变 成死信
- 超时:消息到达超时时间未被消费
- 死信的处理方式大致有下面三种:
- 丢弃:对不重要的消息直接丢弃,不做处理
- 入库:把死信写入数据库,日后处理
- 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)
创建死信交换机和死信队列
和创建普通队列一样
创建正常队列,绑定死信队列
绑定队列到交换机
代码
@Test
public void testSendMessageButReject() {
rabbitTemplate
.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况1:消息被拒绝");
}
①监听正常队列
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {
// 监听正常队列,但是拒绝消息
log.info("★[normal]消息接收到,但我拒绝。");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
②监听死信队列
@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
// 监听死信队列
log.info("★[dead letter]dataString = " + dataString);
log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
测试超出队列长度进入死信队列
@Test
public void testSendMultiMessage() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况2:消息数量超过队列的最大容量" + i);
}
}
执行循环20次的代码两次
正常队列:最大长度为10
死信队列:没有设置最大长度,所以推送失败的消息都进入死信队列
过一段时间正常队列中消息超时,进入死信队列
延迟队列
- 方案1:借助消息超时时间+死信队列(就是刚刚我们测试的例子)
- 方案2:给RabbitMQ安装插件
方案1:借助消息超时时间+死信队列
方案2:给RabbitMQ安装插件
插件安装
https://www.rabbitmq.com/community-plugins.html
docker inspect rabbitmq
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash
# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出Docker容器
exit
# 重启Docker容器
docker restart rabbitmq
检查是否安装
创建新交换机时可以在type中看到x-delayed-message选项
关于x-delayed-type参数的理解:
原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?
这里就额外使用x-delayed-type来指定交换机本身的类型
测试
生产者
@Test
public void test05SendMessageDelay() {
// 创建消息后置处理器对象
MessagePostProcessor postProcessor = message -> {
// 设置消息过期时间(以毫秒为单位)
// x-delay 参数必须基于 x-delayed-message-exchange 插件才能生效
message.getMessageProperties().setHeader("x-delay", "10000");
return message;
};
// 发送消息
rabbitTemplate.convertAndSend(
EXCHANGE_DELAY,
ROUTING_KEY_DELAY,
"Test delay message by plugin " + new SimpleDateFormat("HH:mm:ss").format(new Date()),
postProcessor);
}
消费者
//已创建队列
@Component
@Slf4j
public class MyDelayMessageListener {
public static final String QUEUE_DELAY = "queue.delay.video";
@RabbitListener(queues = {QUEUE_DELAY})
public void process(String dataString, Message message, Channel channel) throws IOException {
log.info("[生产者]" + dataString);
log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
//未创建情况
@Component
@Slf4j
public class MyDelayMessageListener {
public static final String EXCHANGE_DELAY = "exchange.delay.video";
public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
public static final String QUEUE_DELAY = "queue.delay.video";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),
exchange = @Exchange(
value = EXCHANGE_DELAY,
durable = "true",
autoDelete = "false",
type = "x-delayed-message",
arguments = @Argument(name = "x-delayed-type", value = "direct")),
key = {ROUTING_KEY_DELAY}
))
public void process(String dataString, Message message, Channel channel) throws IOException {
log.info("[生产者]" + dataString);
log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}