RabbitMQ进阶篇

文章目录

  • 发送者的可靠性
    • 生产者重试机制
    • 实现生产者确认
  • MQ的可靠性
    • 数据持久化
      • 交换机持久化
      • 队列持久化
      • 消息持久化
    • Lazy Queue(可配置~)
      • 控制台配置Lazy模式
      • 代码配置Lazy模式
      • 更新已有队列为lazy模式
  • 消费者的可靠性
    • 消费者确认机制
    • 失败重试机制
    • 失败处理策略
  • 业务幂等性
    • 唯一消息ID
    • 业务判断
    • 兜底方案
  • 延迟消息
    • 死信交换机(不推荐使用)
    • 延迟消息
    • DelayExchange插件(推荐)
      • 插件下载地址:
      • 安装:
      • 声明延迟交换机
      • 发送延迟消息
    • 超时订单问题
      • 定义常量
      • 抽取共享mq配置
      • 改造下单业务
      • 编写查询支付状态接口
      • 消息监听

发送者的可靠性

保证一个消息发送出去,至少被消费一次。
image.png
可能在多个步骤中给消息弄丢了

生产者重试机制

不建议使用, 会增加网络和资源的消耗

第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断
当RabbitTemplate与MQ连接超时后,多次重试
修改publisher模块的application.yaml文件,添加下面的内容:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

实现生产者确认

image.png

  • ConfirmCallback为发送Exchange(交换器)时回调,成功或者失败都会触发;
  • ReturnCallback为路由不到队列时触发,成功则不触发;

Rabbitmq之ConfirmCallback与ReturnCallback使用_rabbitmq returncallback-CSDN博客
在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

定义ReturnCallback:
image.png

package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

image.png
image.png
定义ConfirmCallback:
由于每个消息发送时的处理逻辑不一定相同,因此**ConfirmCallback需要在每次发消息时定义。**具体来说,是在调用RabbitTemplate中的convertAndSend方法 时,多传递一个参数:

image.png

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

image.png

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback:

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData,需要一个UUID,回调的时候通过id识别
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

image.png

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了

image.png

MQ的可靠性

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化
image.png

  • 交换机持久化
  • 队列持久化
  • 消息持久化

数据持久化

交换机持久化

image.png

设置为Durable就是持久化模式,Transient就是临时模式。

队列持久化

image.png

消息持久化

image.png
image.png

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

当内存占满, page out会影响MQ阻塞

Lazy Queue(可配置~)

image.png

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。官方推荐所有队列都为LazyQueue模式。

控制台配置Lazy模式

image.png

代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

@Bean
public Queue lazyQueue(){
    return QueueBuilder
    .durable("lazy.queue")
    .lazy() // 开启Lazy模式
    .build();
}

当然,我们也可以基于注解来声明队列并设置为Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(
    name = "lazy.queue",
    durable = "true",					
    arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

更新已有队列为lazy模式

可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

当然,也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:
image.png

消费者的可靠性

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

消费者确认机制

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回

    • 如果是业务异常,会自动返回nack;
    • 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 不做处理

失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

失败处理策略

Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

在consumer服务中定义处理失败消息的交换机和队列
定义一个RepublishMessageRecoverer,关联队列和交换机
image.png

/**
 * 错误消息配置类,用于配置 RabbitMQ 错误消息处理相关的 Bean。
 * 当前类会根据 spring.rabbitmq.listener.simple.retry.enabled 属性的值来决定是否创建相关的 Bean。
 * 如果该属性值为 true,则会创建错误消息交换机、错误队列和绑定关系,并配置消息恢复器。
 */
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {

    /**
     * 创建错误消息交换机
     */
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    /**
     * 创建错误队列
     */
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    /**
     * 创建错误队列与错误消息交换机的绑定关系,"error"是路由键
     */
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    /**
     * 创建消息恢复器
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

消费者如何保证消息一定被消费?

  • 开启消费者确认机制为auto, 由spring确认消息处理成功后返回ack, 异常时返回nack
  • 开启消费者失败重试机制, 并设置MessageRecoverer, 多次重试失败后将信息投递到异常交换机

业务幂等性

image.png
保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可
以Jackson的消息转换器为例( 加在启动类 ):

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);// 在底层会自动创建一个UUID
    return jjmc;
}

在Spring AMQP中,当使用Jackson2JsonMessageConverter并开启setCreateMessageIds(true)功能时,底层会自动在消息的属性中添加一个名为amqp_messageId的字段,其值为自动生成的UUID。
具体来说,UUID会成为消息的一部分,保存在消息的AMQP(Advanced Message Queuing Protocol)属性中。这些属性是与消息一起传递的元数据,包含了关于消息的一些信息。amqp_messageId字段就是用于唯一标识消息的UUID。
当消息被发送给消费者时,消费者可以通过**message.getMessageProperties().getMessageId()**方法来获取消息的ID,然后根据业务需求将该ID保存到数据库中。在处理相同消息时,消费者可以在数据库中查询是否存在相同的消息ID,以判断是否为重复消息。

业务判断

image.png
image.png
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

image.png

兜底方案

既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

image.png

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

image.png

死信交换机(不推荐使用)

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

死信交换机有什么作用呢?

  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息
  • 收集因TTL(有效期)到期的消息

延迟消息

RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。

当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

DelayExchange插件(推荐)

插件下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安装:

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:
image.png

[
  {
    "CreatedAt": "2024-01-19T09:22:59+08:00",
    "Driver": "local",
    "Labels": null,
    "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
    "Name": "mq-plugins",
    "Options": null,
    "Scope": "local"
  }
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,启用插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delay.queue", durable = "true"),
    exchange = @Exchange(name = "delay.direct", delayed = "true"),
    key = "delay"	
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
        .directExchange("delay.direct") // 指定交换机类型和名称
        .delayed() // 设置delay的属性为true
        .durable(true) // 持久化
        .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }

    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

可以写一个延迟时间的类, 不用每次都new一个,具体代码如下:
image.png

/**
 * @author Ccoo
 * 2024/1/22
 */
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {

	private final int delay;

	@Override
	public Message postProcessMessage(Message message) throws AmqpException {
		message.getMessageProperties().setDelay(delay);
		return message;
	}
}

最后上面的业务代码变为:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message,
                                 new DelayMessageProcessor(message.removeNextDelay().intValue());
} 

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

超时订单问题

image.png

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体,处于通用性考虑,我们将其定义到hm-common模块下:

image.png

@Data
public class MultiDelayMessage<T> {
	/**
	 * 消息体
	 */
	private T data;
	/**
	 * 记录延迟时间的集合
	 */
	private List<Long> delayMillis;

	public MultiDelayMessage(T data, List<Long> delayMillis) {
		this.data = data;
		this.delayMillis = delayMillis;
	}
	public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
		return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
	}

	/**
	 * 获取并移除下一个延迟时间
	 * @return 队列中的第一个延迟时间
	 */
	public Long removeNextDelay(){
		return delayMillis.remove(0);
	}

	/**
	 * 是否还有下一个延迟时间
	 */
	public boolean hasNextDelay(){
		return !delayMillis.isEmpty();
	}
}

定义常量

image.png

/**
 * @author Ccoo
 * 2024/1/22
 */
public interface MqConstants {
	String DELAY_EXCHANGE = "trade.delay.topic";
	String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
	String DELAY_ORDER_ROUTING_KEY = "order.query";
}

抽取共享mq配置

在nacos中定义一个名为shared-mq.xml的配置文件,内容如下:

spring: 
  rabbitmq:
    host: ${hm.mq.host:192.168.164.128} # 主机名
    port: ${hm.mq.port:5672} # 端口
    virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机
    username: ${hm.mq.un:itheima} # 用户名
    password: ${hm.mq.pw:123321} # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在trade-service模块添加共享配置:
image.png

改造下单业务

  1. 引入依赖

在trade-service模块的pom.xml中引入amqp的依赖:

<!--amqp-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  1. 改造下单业务

image.png

编写查询支付状态接口

首先,在hm-api模块定义三个类:

image.png

说明:

  • PayOrderDTO:支付单的数据传输实体
  • PayClient:支付系统的Feign客户端
  • PayClientFallback:支付系统的fallback逻辑
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {
	@ApiModelProperty("id")
	private Long id;
	@ApiModelProperty("业务订单号")
	private Long bizOrderNo;
	@ApiModelProperty("支付单号")
	private Long payOrderNo;
	@ApiModelProperty("支付用户id")
	private Long bizUserId;
	@ApiModelProperty("支付渠道编码")
	private String payChannelCode;
	@ApiModelProperty("支付金额,单位分")
	private Integer amount;
	@ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")
	private Integer payType;
	@ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")
	private Integer status;
	@ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")
	private String expandJson;
	@ApiModelProperty("第三方返回业务码")
	private String resultCode;
	@ApiModelProperty("第三方返回提示信息")
	private String resultMsg;
	@ApiModelProperty("支付成功时间")
	private LocalDateTime paySuccessTime;
	@ApiModelProperty("支付超时时间")
	private LocalDateTime payOverTime;
	@ApiModelProperty("支付二维码链接")
	private String qrCodeUrl;
	@ApiModelProperty("创建时间")
	private LocalDateTime createTime;
	@ApiModelProperty("更新时间")
	private LocalDateTime updateTime;
}
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {
	/**
	 * 根据交易订单id查询支付单
	 * @param id 业务订单id
	 * @return 支付单信息
	 */
	@GetMapping("/pay-orders/biz/{id}")
	PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {
	@Override
	public PayClient create(Throwable cause) {
		return new PayClient() {
			@Override
			public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {
				return null;
			}
		};
	}
}

最后,在pay-service模块的PayController中实现该接口:

@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){
    PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();
    return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

消息监听

接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:
image.png

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusListener {

    private final IOrderService orderService;

    private final PayClient payClient;

    private final RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
            exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),
            key = MqConstants.DELAY_ORDER_ROUTING_KEY
    ))
    public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {
        // 1.获取消息中的订单id
        Long orderId = msg.getData();
        // 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭
        Order order = orderService.getById(orderId);
        if (order == null || order.getStatus() > 1) {
            // 订单不存在或交易已经结束,放弃处理
            return;
        }
        // 3.可能是未支付,查询支付服务
        PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);
        if (payOrder != null && payOrder.getStatus() == 3) {
            // 支付成功,更新订单状态
            orderService.markOrderPaySuccess(orderId);
            return;
        }
        // 4.确定未支付,判断是否还有剩余延迟时间
        if (msg.hasNextDelay()) {
            // 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值
            int delayVal = msg.removeNextDelay().intValue();
            // 4.2.发送延迟消息
            rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
                    message -> {
                        message.getMessageProperties().setDelay(delayVal);
                        return message;
                    });
            return;
        }
        // 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单
        orderService.cancelOrder(orderId);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/766895.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

layui-页面布局

1.布局容器 分为固定和完整宽度 class layui-container 是固定宽度 layui-fluid是完整宽度

傻瓜交换机多网段互通组网、设备无法配置网关案例

记录一下&#xff1a; 一、傻瓜交换机多网段互通组网 1、客户在核心交换机上创建了VLAN10&#xff0c;VLAN20。 VLAN10&#xff1a;IP192.168.10.254 VLAN20&#xff1a;IP192.168.20.254 在核心交换机下挂了一台傻瓜交换机&#xff0c;傻瓜交换机接入了一台OA服务器IP&#…

从零开始:在Windows上部署大型模型

这是一个超详细安装教程&#xff0c;介绍了在 Window 电脑上如何部署 Qwen1.5 大模型。本文还涉及到 Python 及其环境的配置。 适合对象&#xff1a;有点后端编程基础&#xff0c;没有 Python 基础。 需要环境&#xff1a;Window10/11&#xff0c;支持 Cuda 的 Nvidia 显卡。…

数据结构与算法-动态规划-最长回文子串

最长回文子串 给你一个字符串 s&#xff0c;找到 s 中最长的回文子串。 示例 1&#xff1a; 输入&#xff1a;s "babad" 输出&#xff1a;"bab" 解释&#xff1a;"aba" 同样是符合题意的答案。示例 2&#xff1a; 输入&#xff1a;s "…

识图ACWP.BCWS.BCWP

将三个概念想象成三个角色&#xff08;如&#xff1a;勇士、法师、盗贼&#xff09;&#xff0c;其中&#xff1a; ACWP是勇士&#xff0c;代表实际力量&#xff08;实际成本&#xff09;&#xff1b;BCWS是法师&#xff0c;代表预期魔法&#xff08;预算成本工作量预测&#x…

vscode移动侧边栏到右边

vscode移动侧边栏到右边&#xff0c;的简单办法 直接在侧栏上单击右键&#xff0c;选择向右移动主侧栏

有哪些好的 Stable Diffusion 提示词(Prompt)可以参考?

Docker 作图咒语生成器 docker-prompt-generator 是一个开源项目&#xff0c;可以利用模型反推出提示词&#xff0c;让你偷偷懒&#xff0c;无需琢磨怎么写prompt&#xff0c;只需要找一个差不多的模型反推一下&#xff0c;直接用就好了&#xff0c;支持支持 MidJourney、Stab…

Go - 9.struct 使用指南

目录 一.引言 二.struct 定义 三.struct 实践 1. 初始化 struct 2. 嵌套 struct 3. func 与 struct 四.struct 进阶 1.Json Tags 2.Other Tags 五.总结 一.引言 在编程中&#xff0c;结构体&#xff08;struct&#xff09;是一种聚合数据类型&#xff0c;用于将多个…

文献解读-长读长测序-第十四期|《作为了解棉花驯化的资源,印度棉(Gossypium herbaceum L. Wagad)基因组》

关键词&#xff1a;基因组&#xff1b;长读长测序&#xff1b;棉花基因组&#xff1b; 文献简介 标题&#xff08;英文&#xff09;&#xff1a;The Gossypium herbaceum L. Wagad genome as a resource for understanding cotton domestication标题&#xff08;中文&#xff…

【HTML入门】列表与表格

文章目录 前言一、列表与表格是什么&#xff1f;列表表格 二、使用标签列表标签表格标签 三、组合情况列表的组合表格的组合 四、示例代码总结 好的&#xff0c;以下是一个关于HTML列表与表格的文章示例&#xff1a; 前言 随着网页开发的普及&#xff0c;HTML成为了构建网页的…

零基础学习MySQL---MySQL入门

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C从入门到精通》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、什么是数据库 问&#xff1a;存储数据用文件就可以了&#xff0c;为什么还要弄个数据库呢&#xff1f; 这就不得不提…

Java面试八股文

一、Redis 1. 使用场景 &#xff08;1&#xff09;Redis的数据持久化策略有哪些 RDB&#xff1a;全称Redis Database Backup file&#xff08;Redis数据备份文件&#xff09;&#xff0c;也被叫作Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故…

Chapter9 更复杂的光照——Shader入门精要学习笔记

Chapter9 更复杂的光照 一、Unity的渲染路径1.渲染路径的概念2.渲染路径的类型①前向渲染路径a. 前向渲染路径的原理b. Unity中的前向渲染c. 两种Pass ②延迟渲染路径a. 延迟渲染路径的原理b. Unity中的延迟渲染c. 两种Pass ③顶点照明渲染路径 二、Unity的光源类型1.光源类型①…

毫秒级相应逆流检测电表安科瑞防逆流电能表

家庭储能系统 生态环境与人们的日常生活密切相关&#xff0c;越来越多的家庭开始重视居住环境的绿色、环保、智能及可持续性&#xff0c;并采取具体行动。截至2023年&#xff0c;欧洲太阳能发电容量已超200GW&#xff0c;家庭储能系统的安装量呈爆炸性增长。 用户痛点及需求 …

前端基础:JavaaScript(篇二)

目录 内置对象 String字符串 属性 代码 运行 方法 代码 运行 日期 代码 运行 Math 代码 运行 数组 定义 属性 代码 运行 方法 join(分隔符>) &#xff1a; 代码 运行 reverse()&#xff1a; 代码 运行 sort() &#xff1a; 代码 运行 事件 …

有哪些手持小风扇品牌推荐?五大手持小风扇诚意推荐!

在炎炎夏日&#xff0c;一款便携且高效的手持小风扇无疑是消暑的必备神器。为了帮助大家轻松应对酷暑&#xff0c;我们精心挑选了五大手持小风扇品牌进行诚意推荐。这些品牌不仅拥有出色的降温效果&#xff0c;更在外观设计、便携性、续航能力及操作便捷性上表现卓越。接下来&a…

电子邮件OTP验证身份认证接口API服务商比较

电子邮件OTP验证身份认证接口API服务商如何正确选择&#xff1f; 电子邮件OTP验证是一种广泛应用且安全的身份认证方式。AokSend将比较几家主要的电子邮件OTP验证身份认证接口API服务商&#xff0c;帮助企业选择合适的解决方案。 电子邮件OTP&#xff1a;验证优势 可以为用户…

软考高级-系统分析师知识点100条速记!

宝子们&#xff01;上半年软考已经结束一段时间了&#xff0c;准备备考下半年软考高级-系统分析师的小伙伴可以开始准备了&#xff0c;毕竟高级科目的难度可是不低的&#xff0c;相信参加过上半年系分的小伙伴深有体会。 这里给大家整理了100条系分知识点&#xff0c;涵盖全书9…

【SPIE独立出版】第四届智能交通系统与智慧城市国际学术会议(ITSSC 2024)

第四届智能交通系统与智慧城市国际学术会议&#xff08;ITSSC 2024&#xff09;将于2024年8月23-25日在中国西安举行。本次会议主要围绕智能交通、交通新能源、无人驾驶、智慧城市、智能家居、智能生活等研究领域展开讨论&#xff0c; 旨在为该研究领域的专家学者们提供一个分享…