1 基于业务场景掌握RocketMQ5.0
本篇文章主要结合聚合支付系统中的业务场景来落地RocketMQ中间件的应用,聚合支付系统主要在支付系统超时订单和商户支付结果异步通知场景中会使用到RocketMQ消息中间件。本文使用到了RocketMQ中的延迟消息知识点,RocketMQ延迟消息投递等级一共为18个等级,具体投递等级和延迟时间如下
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
RocketMQ消息类型主要有普通消息、顺序消息、延迟消息、批量消息、事务消息
接下来我们一起来分析延迟消息在具体业务场景中的应用吧
1.1 超时订单(下游商户未支付)
在聚合支付系统和下游商户系统业务场景中,下游商户在调用聚合支付系统支付接口后,用户实际并没有对该笔订单进行支付。所以这时我们需要对未支付的订单进行订单关闭操作,那么这时我们使用消息队列中的延迟消息实现该业务功能 ,以下为具体的业务流程
1.2 下游商户支付结果通知
下游商户调用聚合支付系统统一支付功能后,用户完成支付,这时支付渠道需要把用户支付结果返回给聚合支付系统,聚合支付系统再通过RocketMQ延迟消息通知给下游商户系统。该业务功能同样是基于消息队列的延迟消息进行技术实现,系统第一次通知采用RocketMQ普通实时消息进行支付结果通知,在没有收到商户系统支付通知响应结果时聚合支付系统会采用延迟消息每隔10s通知,循环通知5次。
2 RocketMQ延迟消息核心代码实现
本技术文档采用SpringCloud2021.x和RocketMQ5.0进行代码实现
Spring Cloud Alibaba Version | Spring Cloud Version | Spring Boot Version |
---|---|---|
2021.x | Spring Cloud 2021.x | 2.7.18 |
2.1 在SpringCloud中集成RocketMQ流程
2.1.1 引入RocketMQ Stream Starter
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
2.1.2 修改application.properties配置文件
server.port=1000
spring.application.name=rocketmq-delay-consume-pay
spring.cloud.stream.function.definition=consumer;
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.group=output_1
spring.cloud.stream.bindings.producer-out-0.destination=delay
spring.cloud.stream.bindings.consumer-in-0.destination=delay
spring.cloud.stream.bindings.consumer-in-0.group=delay-group
logging.level.org.springframework.context.support=debug
2.1.3 代码实现
1 生产者
package cn.itbeien.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
/**
* @author beien
* @date 2024-06-19 23:00
* Copyright© 2024 beien
*/
@Service
@Slf4j
public class ProducerService {
@Autowired
private StreamBridge streamBridge;
/**
* 生产者
* @return
*/
@Bean
public void producerDelay() {
String key = "KEY01";
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, "1001");
headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3);//10秒
SimpleMsg simpleMsg = new SimpleMsg();
simpleMsg.setOrderId("10001");
Message<SimpleMsg> msg = new GenericMessage(simpleMsg, headers);
streamBridge.send("producer-out-0", msg);
}
}
2 消费者
package cn.itbeien.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
/**
* @author beien
* @date 2024-06-19 23:05
* Copyright© 2024 beien
*/
@Service
@Slf4j
public class ConsumerService {
/**
* 消费者
*/
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getOrderId());
};
}
}
3 关注我
欢迎关注我的视频号和公众号,视频号有相关技术和业务视频可学习支付业务/文旅行业数字化。探讨技术(系统架构、微服务、容器化、云原生、云原生),支付系统实战。