RabbitMQ之延迟消息实战
- 使用死信交换机实现延迟消息
使用死信交换机的过期时间以及没有消费者进行消费,时间到了就会到死信队列中,由此可以实现延迟消息 - 使用延迟消息插件
前提:需要mq配置插件
- 延时信息案例实战
把一个30分钟的延迟消息可以拆分成各个小时间段来优化MQ大量信息的压力
代码实现
(1)定义消息体对象
@Data
public class MultyDelayMessage<T>{
// 消息体
private T data;
// 记录延迟时间的集合
private List<Long> delayMillis;
public MultiDelayMessage(T data List<Long> delayMilis){
this.data = data;
this.delayMillis = delayMillis;
}
public static <T> MultiDelayMessage<T> of(T data,Long ... delayMillis){
return new MultiDelayMessage<>(data,Collutils.newArrayList(delayMilis));
}
//获取并移除下一个延迟时间,返回队列中的第一个延时时间
public Long removeNextDelay{
return delayMillis.remove(0);
}
// 判断是否有下一个延时时间
public bollean hasNextDelay(){
return !delayMilis.isEmpty();
}
}
(2)定义队列名称
public interface MqConstants{
String DELAY_EXCHANGE = "trade.delay.topic";
String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
String DELAY_ORDER_ROUTING_KEY = "order.query"-;
}
(3)创建订单发送延迟消息
MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(),10000L,10000L,10000L,15000L,15000L,30000L,30000L);
rabbitTemplate.convertAndSend(MqConstants,DELAY_EXCHANGE,DELAY_ORDER_ROUTING_KEY,msg,new MessagePostProcess{
@Override
public Message postProcessMessage(Message message) throw AmqpException{
message.getMessageProperties().setDelay(msg.removeNextDelay().intValue());
}
});
(4)监听到延迟消息处理订单状态
@Component
public class OrderStatusChechListenter{
@Autowrid
private OrderService orderService;
@RabbitListener(@QueueBinding(value=@Queue(value=MqConstants.DELAY_ORDER_QUEUE,durable="true"),exchange=@Exchange(value=MqConstants.DELAY_EXCHANGE,delayed="true",type=ExchangeTypes.TOPIC),key=MqConstants.DELAY_ORDER_ROUTING_KEY))
public void listenOrderDelayMessage(MultiDelayMessage message){
// 查询订单支持状态
Order order = orderService.getById(message.getData());
if(order == null || order.getStatus == 2){// 已支付
return;
}
// 查询支付服务该订单的支付状态
boolean isPay = payServive.getById(order.getBussesesId());
if(isPay){
// 已支付
orderService.markOrderPaySuccess(order.getId());
return;
}
// 未支付,并且有延迟时间
if(msg.hasNextDelay()){
//重发延迟消息
Long nextDelay = msg.removeNextDelay();
rabbitTemplate.convertAndSend(MqConstants,DELAY_EXCHANGE,DELAY_ORDER_ROUTING_KEY,msg,new MessagePostProcess{
@Override
public Message postProcessMessage(Message message) throw AmqpException{
message.getMessageProperties().setDelay(nextDelay.intValue()); }
});
return;
}
// 没有延迟消息,取消订单
orderService.cancleOrder(order.getId());
//恢复库存
orderService.updateStore(order.getId());
}
}