架构图
配置
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadLetterConfig {
public String Nomarl_Exchange = "normal_exchange";
public String Normal_Queue = "normal_queue";
public String Normal_RoutingKey = "normal.#";
public String Dead_Exchange = "dead_exchange";
public String Dead_Queue = "dead_queue";
public String Dead_RoutingKey = "dead.#";
@Bean
public Exchange normalExchange(){
return ExchangeBuilder.topicExchange(Nomarl_Exchange).build();
}
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(Normal_Queue)
.deadLetterExchange(Dead_Exchange)
.deadLetterRoutingKey("dead.sss.a")
.build();
}
@Bean
public Binding bindingNormalKey(Exchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with(Normal_RoutingKey).noargs();
}
@Bean
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange(Dead_Exchange).build();
}
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(Dead_Queue)
.build();
}
@Bean
public Binding bindingDeadKey(Exchange deadExchange,Queue deadQueue){
return BindingBuilder.bind(deadQueue).to(deadExchange).with(Dead_RoutingKey).noargs();
}
}
1.被消费者拒绝,并且requeue值设置为false
package com.example.demo.consumer;
import com.example.demo.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class RejectConsumer {
@RabbitListener(queues = DeadLetterConfig.Normal_Queue)
public void rejectOrBicNack(String str, Channel channel, Message message) throws IOException {
System.out.println("接收到消息"+str);
//1.进行channel进行basicNack,记得将requeue设置为false
// channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//记得在配置文件配置 acknowledge-mode: manual #开启手动ACK
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
//以上方式二选一即可
}
}
2.消息过期或者队列存储消息过期
public void publishExpire(){
String msg = "dead dlx test expire";
rabbitTemplate.convertAndSend(DeadLetterConfig.Nomarl_Exchange, "normal.211313",msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");//设置过期时间
return message;
}
});
}
消息过期
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(Normal_Queue)
.deadLetterExchange(Dead_Exchange)
.deadLetterRoutingKey("dead.sss.a")
.ttl(10000)
.build();
}
给队列存储消息设置最大时间,超过这个时间,消息将会通过设置的这个routingkey从死信交换机转发给对应的死信队列。
3.队列消息达到最大长度
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(Normal_Queue)
.deadLetterExchange(Dead_Exchange)
.deadLetterRoutingKey("dead.sss.a")
//.ttl(10000)
.maxLength(1)
.build();
}
通过maxLength属性设置最大数量,这里设置属性最大为1
4.设置延迟交换机
延迟交换机下载地址
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
public static final String Delayed_Exchange = "delayed_exchange";
public static final String Delayed_Queue = "delayed_queue";
public static final String Delayed_RoutingKey = "delayed_routingLey";
@Bean
public Exchange buildDealayedExchange(){
Map<String,Object>arguments =new HashMap<>();
arguments.put("x-delayed-type","topic");
String type = "x-delayed-type";
Exchange exchange =new CustomExchange(Delayed_Exchange,type,true,false,arguments);
return exchange;
}
@Bean
public Queue buildDealyedQueue(){
return QueueBuilder.durable(Delayed_Queue).build();
}
@Bean
public Binding bindingDelayed(Exchange buildDealayedExchange,Queue buildDealyedQueue){
return BindingBuilder.bind(buildDealyedQueue).to(buildDealayedExchange).with(Delayed_RoutingKey).noargs();
}
}
这样设置之后,发送的消息会在交换机中待够设置的过期时间后才会到相应的队列。
如果消息过期时间一致,可以只不设置延迟交换机,当过期时间类型过多的时候,就可以通过设置延迟交换机来满足不同过期时间的类型。
注意,这里有个参数,arguments 类型为Map<String,Object> 注意要在这个参数里面设置交换机类型,并且放入CustomExchange的构造函数中,不然交换机会创建失败。