RabbitMQ死信队列
1、过期时间TTL
过期时间TTL
表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被
删除。RabbitMQ
可以对消息和队列设置TTL
,目前有两种方法可以设置:
-
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
-
第二种方法是对消息进行单独设置,每条消息
TTL
可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者TTL
较小的那个数值为准。消息在队列的生存时间一旦超
过设置的TTL
值,就称为dead message
被投递到死信队列,消费者将无法再收到该消息。
1.1 设置队列TTL
pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>spring-boot-rabbitmq-ttl</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-rabbitmq-ttl</name>
<description>spring-boot-rabbitmq-ttl</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置类
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TTLRabbitMQConfiguration {
// 1.声明注册direct模式的交换机
@Bean
public DirectExchange ttldirectExchange() {
return new DirectExchange("ttl_direct_exchange", true, false);
}
// 2.队列的过期时间
@Bean
public Queue directttlQueue() {
//设置过期时间
Map<String, Object> args = new HashMap<>();
//这里一定是int类型
args.put("x-message-ttl", 5000);
return new Queue("ttl.direct.queue", true, false, false, args);
}
@Bean
public Binding ttlBingding() {
return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");
}
}
Service
package com.example.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟用户下单
public void makeOrder() {
//1.根据商品id查询库存是否足够
//2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功:" + orderId);
//3.通过MQ来完成消息的分发
//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
String exchangeName = "ttl_direct_exchange";
String routingKey = "ttl";
// 队列中会产生一条消息并且5秒钟后会消失
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
}
}
启动类
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDemoApplication.class, args);
}
}
配置文件
# RabbitMQ基本配置
# RabbitMQ的主机地址(默认为:localhost)
spring.rabbitmq.host=localhost
# 指定该用户要连接到的虚拟host端(注:如果不指定,那么默认虚拟host为“/”)
spring.rabbitmq.virtual-host = /
# amqp协议端口号:5672; 集群端口号:25672;http端口号:15672;
spring.rabbitmq.port=5672
# 登录到RabbitMQ的用户名、密码
spring.rabbitmq.username=zsx242030
spring.rabbitmq.password=zsx242030
测试
package com.example;
import com.example.service.OrderService;
import com.example.service.OrderService1;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest {
@Autowired
private OrderService orderService;
@Test
public void test() {
orderService.makeOrder();
}
}
订单生产成功:8a965457-330b-4e2b-9087-a40cbfdee033
1.2 设置消息TTL
配置类
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TTLRabbitMQConfiguration1 {
//1.声明注册direct模式的交换机
@Bean
public DirectExchange ttlMessageDirectExchange() {
return new DirectExchange("ttl_message_direct_exchange", true, false);
}
@Bean
public Queue directttlMessageQueue() {
return new Queue("ttl.message.direct.queue", true, false, false);
}
@Bean
public Binding ttlMessageBingding() {
return BindingBuilder.bind(directttlMessageQueue()).to(ttlMessageDirectExchange()).with("ttlmessage");
}
}
Service
package com.example.service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class OrderService1 {
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟用户下单
public void makeOrder() {
//1.根据商品id查询库存是否足够
//2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功:" + orderId);
//3.通过MQ来完成消息的分发
//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
String exchangeName = "ttl_message_direct_exchange";
String routingKey = "ttlmessage";
//给消息设置过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
public Message postProcessMessage(Message message) {
//这里就是字符串
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, messagePostProcessor);
}
}
测试类
package com.example;
import com.example.service.OrderService1;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest1 {
@Autowired
private OrderService1 orderService1;
@Test
public void test1() {
orderService1.makeOrder();
}
}
订单生产成功:9eb9e120-1379-4e54-bc43-1944b3c22713
2、死信队列
DLX
,全称 Dead-Letter-Exchange
,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变
成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX
,绑定DLX
的队列就称之为死信队列。消
息变成死信,可能是由于以下原因:
-
消息被拒绝
-
消息过期
-
队列达到最大长度
DLX
也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队
列的属性,当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由
到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可。
pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>spring-boot-rabbitmq-dlx</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-rabbitmq-dlx</name>
<description>spring-boot-rabbitmq-dlx</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置类
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadRabbitMqConfiguration {
//1.声明注册direct模式的交换机
@Bean
public DirectExchange deadDirect() {
return new DirectExchange("dead_direct_exchange", true, false);
}
//2.队列的过期时间
@Bean
public Queue deadQueue() {
return new Queue("dead.direct.queue", true);
}
@Bean
public Binding deadbinds() {
return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
}
}
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TTLRabbitMQConfiguration {
//1.声明注册direct模式的交换机
@Bean
public DirectExchange ttldirectExchange() {
return new DirectExchange("ttl_direct_exchange", true, false);
}
//2.队列的过期时间
@Bean
public Queue directttlQueue() {
//设置过期时间
Map<String, Object> args = new HashMap<>();
// ttl队列最大可以接受5条消息,超过的条数也是会被移入死信队列,过期之后依然会被移入死信队列
// args.put("x-max-length",5);
// 这里一定是int类型
args.put("x-message-ttl", 5000);
// 死信队列的交换机
args.put("x-dead-letter-exchange", "dead_direct_exchange");
// fanout不需要配置
// 路由
args.put("x-dead-letter-routing-key", "dead");
return new Queue("ttl.direct.queue", true, false, false, args);
}
@Bean
public Binding ttlBingding() {
return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");
}
}
Service
package com.example.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟用户下单
public void makeOrder() {
//1.根据商品id查询库存是否足够
//2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功:" + orderId);
//3.通过MQ来完成消息的分发
//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
String exchangeName = "ttl_direct_exchange";
String routingKey = "ttl";
// 队列中会产生一条消息并且5秒钟后会消失
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
}
}
启动类
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDemoApplication.class, args);
}
}
配置文件
# RabbitMQ基本配置
# RabbitMQ的主机地址(默认为:localhost)
spring.rabbitmq.host=localhost
# 指定该用户要连接到的虚拟host端(注:如果不指定,那么默认虚拟host为“/”)
spring.rabbitmq.virtual-host = /
# amqp协议端口号:5672; 集群端口号:25672;http端口号:15672;
spring.rabbitmq.port=5672
# 登录到RabbitMQ的用户名、密码
spring.rabbitmq.username=zsx242030
spring.rabbitmq.password=zsx242030
测试类
package com.example;
import com.example.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest {
@Autowired
private OrderService orderService;
@Test
public void test() {
orderService.makeOrder();
}
}
订单生产成功:74adb096-734c-4484-8947-032ef1ee7c5d