一、案例介绍
1.1、业务分析
模拟电商网站购物场景中的【下单】和【支付】业务
1.1.1、下单
流程
-
用户请求订单系统下单
-
订单系统通过RPC调用订单服务下单
-
订单服务调用优惠券服务,扣减优惠券
-
订单服务调用调用库存服务,校验并扣减库存
-
订单服务调用用户服务,扣减用户余额
-
订单服务完成确认订单
1.1.2、支付
流程
- 用户请求支付系统
- 支付系统调用第三方支付平台API进行发起支付流程
- 用户通过第三方支付平台支付成功后,第三方支付平台回调通知支付系统
- 支付系统调用订单服务修改订单状态
- 支付系统调用积分服务添加积分
- 支付系统调用日志服务记录日志
1.2、问题分析
1.2.1、问题1
用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。
如何保证数据的完整性?
使用MQ保证在下单失败后系统数据的完整性
1.2.2、问题2
用户通过第三方支付平台(支付宝、微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户支付结果,支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。
商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付凭条做出回应?
通过MQ进行数据分发,提高系统处理性能
二.、技术分析
2.1、技术选型
- SpringBoot
- Dubbo
- Zookeeper
- RocketMQ
- Mysql
2.2、SpringBoot整合RocketMQ
下载rocketmq-spring项目
将rocketmq-spring安装到本地仓库
mvn install -Dmaven.skip.test=true
2.2.1、消息生产者
2.2.1.1、添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-producer</artifactId>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<spring-boot-version>2.3.2.RELEASE</spring-boot-version>
</properties>
<dependencies>
<!-- 配置web启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- rocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<!-- lombok插件 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot-version}</version>
</plugin>
</plugins>
</build>
</project>
2.2.1.2、配置文件
server:
port: 8094
spring:
application:
name: rocketmq-producer
rocketmq:
name-server: 127.0.0.1:9876 # rocketMQ的名称服务器,格式为:' host:port;host:port '。
# 生产端配置
producer:
group: ${spring.application.name} # 生产着组名
#access-key: access-key # rocketMQ服务端配置acl授权信息,没有则不需要
#secret-key: secret-key # rocketMQ服务端配置acl授权信息,没有则不需要
# 消费端配置
# consumer:
# access-key: access-key #如果开启了acl,一定要配置。否则集群模式下会正常,广播模式消费端会失效!
# secret-key: secret-key #如果开启了acl,一定要配置。否则集群模式下会正常,广播模式消费端会失效!
2.2.1.3、消息发送
/**
* <p>
* RocektMQ 事务消息监听器
* </p>
*
**/
@Slf4j
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 检测半消息,在该方法中,执行本地事务
*
* @param msg 发送消息
* @param arg 外部参数
* @return commit:提交事务,它允许消费者消费此消息。bollback:回滚事务,它代表该消息将被删除,不允许被消费。 unknown:中间状态,它代表需要检查消息队列来确定状态(checkLocalTransaction方法)。
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info(">>>> MQ事务执行器,执行本地事务 message={},args={} <<<<", msg, arg);
try {
String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
OrderPaidEvent payload = JSON.parseObject(jsonString, OrderPaidEvent.class);
//模拟业务操作,当paidMoney >5 则提交,否则等事务会查
if (payload.getPaidMoney().compareTo(new BigDecimal("5")) > 0) {
//提交事务
log.info("MQ提交事务啦!payload ={} ", payload);
return RocketMQLocalTransactionState.COMMIT;
}
//不知道状态,转 checkLocalTransaction 回查执行
log.info("MQ无法确定,等回查!payload ={} ", payload);
return RocketMQLocalTransactionState.UNKNOWN;
} catch (Exception e) {
log.error("事务消息出错啦~ e:{}", e.getMessage(), e);
//回滚
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 该方法时MQ进行消息事务状态回查、
* <p>
*
* @param msg
* @return bollback, commit or unknown
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info(">>>> MQ事务执行器,事务状态回查 message={} <<<<", msg);
try {
String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
OrderPaidEvent payload = JSON.parseObject(jsonString, OrderPaidEvent.class);
log.info("事务回查:checkLocalTransaction提交事务啦!payload ={} ", payload);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("回调的事务出错啦~ e:{}", e.getMessage(), e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
/**
* <p>
* RocektMQ生产者常用发送消息方法
* 最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
* </p>
*
**/
public interface IRocketMQService {
/**
* 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
* <p>
* (send消息方法只要不抛异常,就代表发送成功。但不保证100%可靠投递(所有消息都一样,后面不在叙述)。
* 要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
* 解析看:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
* )
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessage(String destination, Object msg);
/**
* 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
*
* @param topicName 主题名 topicName
* @param tags 标签 tags
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessage(String topicName, String tags, Object msg);
/**
* 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
*
* @param topicName 主题名 topicName
* @param tags 标签 tags
* @param key 唯一标识码要设置到keys字段,方便将来定位消息丢失问题
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessage(String topicName, String tags, String key, Object msg);
/**
* 发送同步消息-SQL92模式
* 需要配置RocketMQ服务器 vim conf/broker.conf ##支持sql语句过滤 enablePropertyFilter=true
* 在console控制台查看集群状态 enablePropertyFilter=true 才正常
*
* @param topicName 主题名 topicName
* @param map 自定义属性
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg);
/**
* 发送同步消息-SQL92模式
* 需要配置RocketMQ服务器 vim conf/broker.conf ##支持sql语句过滤 enablePropertyFilter=true
* 在console控制台查看集群状态 enablePropertyFilter=true 才正常
*
* @param topicName 主题名 topicName
* @param map 自定义属性
* @param key 唯一标识码要设置到keys字段,方便将来定位消息丢失问题
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg);
/**
* 发生异步消息(异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。)
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param sendCallback 异步回调函数
*/
void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback);
/**
* 发送单向消息(这种方式主要用在不特别关心发送结果的场景,例如日志发送。)
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
*/
void sendOneway(String destination, Object msg);
/**
* 发送批量消息(发送超过1MB,做了自动分割,超时时间设置30s(默认3s)),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB
*
* @param destination 主题名:标签 topicName:tags
* @param list 批量消息
*/
void sendBatchMessage(String destination, List<?> list);
/**
* 发送批量消息(发送超过1MB,做了自动分割。),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB
*
* @param topicName 主题名 topicName
* @param tags 标签 tags
* @param timeout 超时时间,空则默认设为30s
* @param list 批量消息
*/
void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list);
/**
* 发送延时消息(超时时间,设置30s(默认3s))
* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param delayTimeLevel 延时等级(从1开始)
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel);
/**
* 发送延时消息
* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param timeout 超时时间(单位毫秒)
* @param delayTimeLevel 延时等级(从1开始)
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel);
/**
* 发送顺序消息(分区有序,多个queue参与,即相对每个queue,消息都是有序的。)
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param hashKey 根据其哈希值取模后确定发送到哪一个queue队列
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendInOrder(String destination, Object msg, String hashKey);
/**
* 发送事务消息
* 事务消息使用上的限制
* 1:事务消息不支持延时消息和批量消息。
* 2:为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
* 3:事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
* 4:事务性消息可能不止一次被检查或消费。
* 5:提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
* 6:事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param arg arg
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessageInTransaction(String destination, Object msg, Object arg);
}
/**
* <p>
* RocektMQ生产者常用发送消息方法
* 最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
* </p>
*
*/
@Slf4j
@Service
public class RocketMQServiceImpl implements IRocketMQService {
@Autowired
private RocketMQTemplate rocketMqTemplate;
/**
* 原生的producer
*/
@Autowired
private DefaultMQProducer producer;
@Override
public SendResult sendMessage(String destination, Object msg) {
String[] split = destination.split(":");
if (split.length == 2) {
return this.sendMessage(split[0], split[1], msg);
}
return this.sendMessage(destination, null, msg);
}
@Override
public SendResult sendMessage(String topicName, String tags, Object msg) {
return this.sendMessage(topicName, tags, null, msg);
}
@Override
public SendResult sendMessage(String topicName, String tags, String key, Object msg) {
MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
//设置key,唯一标识码要设置到keys字段,方便将来定位消息丢失问题
if (StringUtils.isNotBlank(key)) {
messageBuilder.setHeader(MessageConst.PROPERTY_KEYS, key);
}
Message<?> message = messageBuilder.build();
SendResult sendResult = this.rocketMqTemplate.syncSend(StringUtils.isBlank(tags) ? topicName : (topicName + ":" + tags), message);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("MQ发送同步消息成功,topicName={},tags={},msg={},sendResult={}", topicName, tags, msg, sendResult);
} else {
log.warn("MQ发送同步消息不一定成功,topicName={},tags={},msg={},sendResult={}", topicName, tags, msg, sendResult);
}
return sendResult;
}
@Override
public SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg) {
return this.sendMessageBySql(topicName, map, null, msg);
}
@Override
public SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg) {
MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
//设置key,唯一标识码要设置到keys字段,方便将来定位消息丢失问题
if (StringUtils.isNotBlank(key)) {
messageBuilder.setHeader(MessageConst.PROPERTY_KEYS, key);
}
//设置自定义属性
if (map != null && !map.isEmpty()) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
messageBuilder.setHeader(entry.getKey(), entry.getValue());
}
}
Message<?> message = messageBuilder.build();
SendResult sendResult = this.rocketMqTemplate.syncSend(topicName, message);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("发送同步消息-SQL92模式成功,topicName={},map={},msg={},sendResult={}", topicName, map, msg, sendResult);
} else {
log.warn("发送同步消息-SQL92模式不一定成功,topicName={},map={},msg={},sendResult={}", topicName, map, msg, sendResult);
}
return sendResult;
}
@Override
public void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback) {
this.rocketMqTemplate.asyncSend(destination, msg, sendCallback);
log.info("MQ发送异步消息,destination={} msg={}", destination, msg);
}
@Override
public void sendOneway(String destination, Object msg) {
this.rocketMqTemplate.sendOneWay(destination, msg);
log.info("MQ发送单向消息,destination={} msg={}", destination, msg);
}
@Override
public void sendBatchMessage(String destination, List<?> list) {
String topicName = destination;
String tags = "";
String[] split = destination.split(":");
if (split.length == 2) {
topicName = split[0];
tags = split[1];
}
this.sendBatchMessage(topicName, tags, 30000L, list);
}
@Override
public void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list) {
//转为message
List<org.apache.rocketmq.common.message.Message> messages = list.stream().map(x ->
new org.apache.rocketmq.common.message.Message(topicName, tags,
//String类型不需要转JSON,其它类型都要转为JSON模式
x instanceof String ? ((String) x).getBytes(StandardCharsets.UTF_8) : JSON.toJSONBytes(x))
).collect(Collectors.toList());
//自动分割发送,把大的消息分裂成若干个小的消息(每次发送最大只能4MB)
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<org.apache.rocketmq.common.message.Message> listItem = splitter.next();
SendResult sendResult = producer.send(listItem, timeout == null ? 30000L : timeout);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("MQ发送批量消息成功,topicName={} tags={}, size={},sendResult={}", topicName, tags, listItem.size(), sendResult);
} else {
log.warn("MQ发送批量消息不一定成功,topicName={} tags={}, size={},sendResult={}", topicName, tags, listItem.size(), sendResult);
}
} catch (Exception e) {
//处理error
log.error("MQ发送批量消息失败,topicName={} tags={},,errorMessage={}", topicName, tags, e.getMessage(), e);
throw new RuntimeException("MQ发送批量消息失败,原因:" + e.getMessage());
}
}
}
@Override
public SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel) {
return this.sendDelayLevel(destination, msg, 30000, delayTimeLevel);
}
@Override
public SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel) {
Message<?> message = MessageBuilder
.withPayload(msg)
.build();
SendResult sendResult = this.rocketMqTemplate.syncSend(destination, message, timeout, delayTimeLevel);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("MQ发送延时消息成功,destination={} msg={} sendResult={}", destination, message, sendResult);
} else {
log.warn("MQ发送延时消息不一定成功,destination={} msg={} sendResult={}", destination, message, sendResult);
}
return sendResult;
}
@Override
public SendResult sendInOrder(String destination, Object msg, String hashKey) {
Message<?> message = MessageBuilder
.withPayload(msg)
.build();
//hashKey: 根据其哈希值取模后确定发送到哪一个队列
SendResult sendResult = this.rocketMqTemplate.syncSendOrderly(destination, message, hashKey);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("MQ发送顺序消息成功,destination={} msg={} sendResult={}", destination, message, sendResult);
} else {
log.warn("MQ发送顺序消息不一定成功,destination={} msg={} sendResult={}", destination, message, sendResult);
}
return sendResult;
}
@Override
public SendResult sendMessageInTransaction(String destination, Object msg, Object arg) {
Message<?> message = MessageBuilder
//转为JSON格式
.withPayload(msg instanceof String ? msg : JSON.toJSONString(msg))
.build();
TransactionSendResult sendResult = rocketMqTemplate.sendMessageInTransaction(destination, message, arg);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("MQ发送事务消息成功,destination={} msg={} sendResult={}", destination, message, sendResult);
} else {
log.warn("MQ发送事务消息不一定成功,destination={} msg={} sendResult={}", destination, message, sendResult);
}
return sendResult;
}
}
/**
* <p>
* 消息列表分割,复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
* </p>
*
**/
public class ListSplitter implements Iterator<List<Message>> {
/**
* 最大4MB,这里每次只发送1MB。(为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB)
*/
private final int SIZE_LIMIT = 1024 * 1024 * 1;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while (tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
/**
* 计算消息字节长度
*/
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
return tmpSize;
}
}
/**
* <p>
* 批量消息实体
* </p>
*
**/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BatchDto implements Serializable {
private static final long serialVersionUID = 1L;
private Integer id;
private String message;
}
/**
* <p>
* 订单支付事件
* </p>
*
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderPaidEvent implements Serializable {
private static final long serialVersionUID = 1L;
private String orderId;
private BigDecimal paidMoney;
private String msg;
}
/**
* <p>
* 顺序的步骤
* </p>
*
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderStep {
private long orderId;
private String desc;
}
2.2.1.4、测试类
/**
* <p>
* 测试类
* </p>
*
**/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketMQProducerApplication.class)
public class RocketMQProducerTest {
@Autowired
private IRocketMQService rocketMQService;
/**
* 发送同步消息
*/
@Test
public void sendMessage() {
for (int i = 0; i < 10; i++) {
//广播消费端下的同步消息
rocketMQService.sendMessage("Consumer_Broadcast", "广播同步消息" + i);
//集群消费端下的同步消息
rocketMQService.sendMessage("Consumer_Cluster", "集群同步消息" + i);
// rocketMQService.sendMessage("Consumer_Cluster", null, i + "", "集群同步消息" + i);
}
}
/**
* 发送同步消息-Tag过滤模式
*/
@Test
public void sendMessageByTag() {
rocketMQService.sendMessage("Consumer_Tag", "tag1", "过滤同步消息tag1");
rocketMQService.sendMessage("Consumer_Tag", "tag2", "过滤同步消息tag2");
rocketMQService.sendMessage("Consumer_Tag", "tag3", "过滤同步消息tag3");
}
/**
* 发送同步消息-发送同步消息-SQL92模式
*/
@Test
public void sendMessageBySql() {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put("a", i);
map.put("b", i % 2 == 0 ? "sql" : "notSql");
rocketMQService.sendMessageBySql("Consumer_SQL", map, "SQL92模式消息 map=" + map);
}
}
/**
* 发送异步消息
*/
@SneakyThrows
@Test
public void sendAsyncMessage() {
for (int i = 0; i < 10; i++) {
rocketMQService.sendAsyncMessage("Consumer_Cluster", "集群异步消息" + i, new SendCallback() {
//发送成功
@Override
public void onSuccess(SendResult sendResult) {
log.info("集群异步消息发送成功啦!sendResult={}", sendResult);
}
//发送失败
@Override
public void onException(Throwable e) {
log.error("集群异步消息发送失败啦!原因={}", e.getMessage(), e);
}
});
}
//先睡20秒,避免还没发送完毕就关闭了
TimeUnit.SECONDS.sleep(20L);
}
/**
* 发送单向消息
*/
@Test
public void sendOneway() {
for (int i = 0; i < 10; i++) {
rocketMQService.sendOneway("Consumer_Cluster", "集群单向消息" + i);
}
}
/**
* 发送批量消息(小批量)
*/
@Test
public void sendBatchMessage() {
//演示发送实体类型
List<BatchDto> batchDtoList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
batchDtoList.add(
BatchDto.builder()
.id(i)
.message("发送批量消息Dto类型" + i)
.build());
}
rocketMQService.sendBatchMessage("Consumer_Batch", batchDtoList);
log.info("=================发送Consumer_Batch主题完毕=================");
//演示发送String类型
List<String> stringList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
stringList.add("发送批量消息String类型" + i);
}
// rocketMQService.sendBatchMessage("Consumer_Cluster","A",null,stringList);
rocketMQService.sendBatchMessage("Consumer_Cluster", stringList);
log.info("=================发送Consumer_Cluster完毕=================");
}
/**
* 发送批量消息(大批量,超过4MB)
*/
@Test
@SneakyThrows
public void sendBatchMessage2() {
//演示发送实体类型
List<BatchDto> batchDtoList = new ArrayList<>(1000000);
for (int i = 0; i < 1000000; i++) {
batchDtoList.add(
BatchDto.builder()
.id(i)
.message("发送批量消息Dto类型" + i)
.build());
}
rocketMQService.sendBatchMessage("Consumer_Batch", batchDtoList);
log.info("=================发送Consumer_Batch主题完毕=================");
Thread.sleep(200000);
}
/**
* 发送延时消息
*/
@Test
public void sendDelayLevel() {
this.rocketMQService.sendDelayLevel("Consumer_Cluster", "集群延时消息", 4);
}
/**
* 发送顺序消息-分区有序
*/
@Test
public void sendInOrder() {
// 订单列表
List<OrderStep> orderList = this.buildOrders();
//循环一下,增加测试样本
// for (int i = 0; i < 10; i++) {
for (OrderStep orderStep : orderList) {
this.rocketMQService.sendInOrder("Consumer_InOrder", orderStep, orderStep.getOrderId() + "");
}
// }
}
/**
* 发送事务消息(一定要做幂等性处理(其实所有消息都要做幂等性。。))
*/
@Test
@SneakyThrows
public void sendMessageInTransaction() {
OrderPaidEvent build = OrderPaidEvent.builder()
.orderId("123")
.msg("事务消息-开始支付")
.paidMoney(new BigDecimal(2))
.build();
this.rocketMQService.sendMessageInTransaction("Consumer_Transaction", build, "test");
//先睡200秒,避免还没发送完毕就关闭了
TimeUnit.SECONDS.sleep(200L);
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
2.2.2、消息消费者
2.2.2.1、添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-consumer</artifactId>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<spring-boot-version>2.3.2.RELEASE</spring-boot-version>
</properties>
<dependencies>
<!-- 配置web启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- rocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<!-- lombok插件 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot-version}</version>
</plugin>
</plugins>
</build>
</project>
2.2.2.2、配置文件
server:
port: 8096
spring:
application:
name: rocketmq-consumer
rocketmq:
name-server: 127.0.0.1:9876 # rocketMQ的名称服务器,格式为:' host:port;host:port '。
# 生产端配置
producer:
group: ${spring.application.name} # 生产着组名
#access-key: access-key # rocketMQ服务端配置acl授权信息,没有则不需要
#secret-key: secret-key # rocketMQ服务端配置acl授权信息,没有则不需要
# 消费端配置
# consumer:
# access-key: access-key #如果开启了acl,一定要配置。否则集群模式下会正常,广播模式消费端会失效!
# secret-key: secret-key #如果开启了acl,一定要配置。否则集群模式下会正常,广播模式消费端会失效!
2.2.2.3、消息监听器
2.2.2.3.1、批量消息消费
/**
* <p>
* 批量消息消费
* </p>
*
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Batch",//主题
consumerGroup = "Consumer_Batch_group"//消费组 唯一
)
public class ConsumerBatch implements RocketMQListener<BatchDto>, RocketMQPushConsumerLifecycleListener {
/**
* 消费者
* 程序报错则进行重试
*
* @param message 接收的消息
*/
@Override
public void onMessage(BatchDto message) {
log.info("ConsumerCluster 批量消息消费 message: {} ", message);
}
/**
* consumer配置都是通过这个
*
* @param consumer consumer配置
*/
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设最大重试次数,默认16次
//距离上一次重试间隔
//第1次:10s 第2次:30s 第3次:1min 第4次:2min 第5次:3min 第6次:4min 第7次:5min 第8次:6min
//第9次:7min 第10次:8min 第11次:9min 第12次:10min 第13次:20min 第14次:30min 第15次:1h 第16次:2h 16次以后:都是2h
//某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
consumer.setMaxReconsumeTimes(3);
//关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
//时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
}
}
2.2.2.3.2、广播模式消费
/**
* <p>
* 广播模式消费,每个消费者消费的消息都是相同的
* </p>
*
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Broadcast",//主题
consumerGroup = "Consumer_Broadcast_group",//消费组 唯一
messageModel = MessageModel.BROADCASTING //消费模式 默认CLUSTERING集群 BROADCASTING:广播(接收所有信息)
)
public class ConsumerBroadcast implements RocketMQListener<String> {
//todo 广播消费模式宽带消费仍然确保消息至少被消费一次,但是没有提供重发选项。
/**
* 消费者
* 程序报错则进行重试
*
* @param message 接收的消息
*/
@Override
public void onMessage(String message) {
try {
//模拟业务逻辑处理中...
log.info("ConsumerBroadcast 广播模式消费 message: {} ", message);
TimeUnit.SECONDS.sleep(10);
//模拟出错,触发重试
// int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
2.2.2.3.3、集群模式消费
/**
* <p>
* 集群模式消费,负载均衡模式消费(最常用!!!)
* </p>
*
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Cluster",//主题
consumerGroup = "Consumer_Cluster_group",//消费组 唯一
messageModel = MessageModel.CLUSTERING //消费模式 默认CLUSTERING集群 BROADCASTING:广播(接收所有信息)
)
public class ConsumerCluster implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
/**
* 消费者
* 程序报错则进行重试
*
* @param message 接收的消息
*/
@Override
public void onMessage(String message) {
try {
//模拟业务逻辑处理中...
log.info("ConsumerCluster 集群模式消费 message: {} ", message);
TimeUnit.SECONDS.sleep(10);
//模拟出错,触发重试
// int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* consumer配置都是通过这个
*
* @param consumer consumer配置
*/
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设最大重试次数,默认16次
//距离上一次重试间隔
//第1次:10s 第2次:30s 第3次:1min 第4次:2min 第5次:3min 第6次:4min 第7次:5min 第8次:6min
//第9次:7min 第10次:8min 第11次:9min 第12次:10min 第13次:20min 第14次:30min 第15次:1h 第16次:2h 16次以后:都是2h
//某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
consumer.setMaxReconsumeTimes(3);
//关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
//时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
}
}
2.2.2.3.4、顺序消息消费
/**
* <p>
* 顺序消息消费-分区有序
* </p>
*
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_InOrder",//主题
consumerGroup = "Consumer_InOrder_group",//消费组
consumeMode = ConsumeMode.ORDERLY//消费类型 ORDERLY 一个队列一个线程,即分区有序
)
public class ConsumerInOrder implements RocketMQListener<OrderStep>, RocketMQPushConsumerLifecycleListener {
/**
* 消费者
* 程序报错则进行重试
*
* @param msg
*/
@Override
public void onMessage(OrderStep msg) {
log.info("Receive message: {} ThreadName: {}", msg, Thread.currentThread().getName());
//模拟出错,触发重试
// int i = 1 / 0;
}
/**
* consumer配置都是通过这个
*
* @param consumer consumer配置
*/
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//程序报错,顺序消息这里不会等待重试,会立即执行。不设最大重试次数,会一直不断重试执行。
consumer.setMaxReconsumeTimes(3);
//关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
//时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
}
}
2.2.2.3.5、过滤消息消费
/**
* <p>
* 过滤消息消费,SQL92模式
* 需要配置RocketMQ服务器 vim conf/broker.conf ##支持sql语句过滤 enablePropertyFilter=true
* 在console控制台查看集群状态 enablePropertyFilter=true 才正常
* </p>
* <p>
* <p>
* 数值比较,比如:>,>=,<,<=,BETWEEN,=;
* 字符比较,比如:=,<>,IN;
* IS NULL 或者 IS NOT NULL;
* 逻辑符号 AND,OR,NOT;
* <p>
* 常量支持类型为:
* <p>
* 数值,比如:123,3.1415;
* 字符,比如:'abc',必须用单引号包裹起来;
* NULL,特殊的常量
* 布尔值,TRUE 或 FALSE
*
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_SQL",//主题
consumerGroup = "Consumer_SQL_group",//消费组 唯一
/* 下面都有默认值,可选 */
selectorType = SelectorType.SQL92,//过滤选项类型 默认TAG 还有 SQL92,Brro
selectorExpression = "a between 0 and 3 or b='sql'"
)
public class ConsumerSQL implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
/**
* 消费者
* 程序报错则进行重试
*
* @param message 接收的消息
*/
@Override
public void onMessage(String message) {
try {
//模拟业务逻辑处理中...
log.info("ConsumerSQL 消费 message: {} ", message);
TimeUnit.SECONDS.sleep(10);
//模拟出错,触发重试
// int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* consumer配置都是通过这个
*
* @param consumer consumer配置
*/
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设最大重试次数,默认16次
//距离上一次重试间隔
//第1次:10s 第2次:30s 第3次:1min 第4次:2min 第5次:3min 第6次:4min 第7次:5min 第8次:6min
//第9次:7min 第10次:8min 第11次:9min 第12次:10min 第13次:20min 第14次:30min 第15次:1h 第16次:2h 16次以后:都是2h
//某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
consumer.setMaxReconsumeTimes(3);
//关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
//时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
}
}
2.2.2.3.6、过滤消息消费,Tag模式
/**
* <p>
* 过滤消息消费,Tag模式
* </p>
*
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Tag",//主题
consumerGroup = "Consumer_Tag1_group",//消费组 唯一
selectorType = SelectorType.TAG,//过滤选项类型 默认TAG
selectorExpression = "tag1"//过滤选项 默认* Tag多个时,"tag1 || tag2 || tag3"
)
public class ConsumerTag1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
/**
* 消费者
* 程序报错则进行重试
*
* @param message 接收的消息
*/
@Override
public void onMessage(String message) {
try {
//模拟业务逻辑处理中...
log.info("ConsumerTag1 消费 message: {} ", message);
TimeUnit.SECONDS.sleep(10);
//模拟出错,触发重试
// int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* consumer配置都是通过这个
*
* @param consumer consumer配置
*/
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设最大重试次数,默认16次
//距离上一次重试间隔
//第1次:10s 第2次:30s 第3次:1min 第4次:2min 第5次:3min 第6次:4min 第7次:5min 第8次:6min
//第9次:7min 第10次:8min 第11次:9min 第12次:10min 第13次:20min 第14次:30min 第15次:1h 第16次:2h 16次以后:都是2h
//某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
consumer.setMaxReconsumeTimes(3);
//关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
//时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
}
}
2.2.2.3.7、过滤消息消费,Tag模式
/**
* <p>
* 过滤消息消费,Tag模式
* </p>
*
* @author MrWen
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Tag",//主题
consumerGroup = "Consumer_Tag2_group",//消费组 唯一
selectorType = SelectorType.TAG,//过滤选项类型 默认TAG
selectorExpression = "tag1||tag2"//过滤选项 默认* Tag多个时,"tag1 || tag2 || tag3"
)
public class ConsumerTag2 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
/**
* 消费者
* 程序报错则进行重试
*
* @param message 接收的消息
*/
@Override
public void onMessage(String message) {
try {
//模拟业务逻辑处理中...
log.info("ConsumerTag2 消费 message: {} ", message);
TimeUnit.SECONDS.sleep(10);
//模拟出错,触发重试
// int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* consumer配置都是通过这个
*
* @param consumer consumer配置
*/
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设最大重试次数,默认16次
//距离上一次重试间隔
//第1次:10s 第2次:30s 第3次:1min 第4次:2min 第5次:3min 第6次:4min 第7次:5min 第8次:6min
//第9次:7min 第10次:8min 第11次:9min 第12次:10min 第13次:20min 第14次:30min 第15次:1h 第16次:2h 16次以后:都是2h
//某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
consumer.setMaxReconsumeTimes(3);
//关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
//时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
}
}
2.2.2.3.8、事务消息演示
/**
* <p>
* 事务消息演示
* <p>
* 事务消息使用上的限制
* 1:事务消息不支持延时消息和批量消息。
* 2:为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
* 3:事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
* 4:事务性消息可能不止一次被检查或消费。
* 5:提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
* 6:事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
* </p>
*
* @author MrWen
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Transaction",//主题
consumerGroup = "Consumer_Transaction_group"//消费组 唯一
)
public class ConsumerTransaction implements RocketMQListener<OrderPaidEvent>, RocketMQPushConsumerLifecycleListener {
/**
* 消费者
* 程序报错则进行重试
*
* @param orderPaidEvent 接收的消息,订单支付事件
*/
@Override
public void onMessage(OrderPaidEvent orderPaidEvent) {
try {
//模拟业务逻辑处理中...
log.info("ConsumerTransaction 事务消息消费 message: {} ", orderPaidEvent);
TimeUnit.SECONDS.sleep(10);
//模拟出错,触发重试
// int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* consumer配置都是通过这个
*
* @param consumer consumer配置
*/
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设最大重试次数,默认16次
//距离上一次重试间隔
//第1次:10s 第2次:30s 第3次:1min 第4次:2min 第5次:3min 第6次:4min 第7次:5min 第8次:6min
//第9次:7min 第10次:8min 第11次:9min 第12次:10min 第13次:20min 第14次:30min 第15次:1h 第16次:2h 16次以后:都是2h
//某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
consumer.setMaxReconsumeTimes(3);
//关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
//时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
}
}
2.2.2.3.9、测试
/**
* <p>
* 测试
* </p>
*
*/
@Slf4j
//@Component
@RocketMQMessageListener(topic = "synchronously",//主题
consumerGroup = "synchronously_group",//消费组 唯一
/* 下面都有默认值,可选 */
selectorType = SelectorType.TAG,//过滤选项类型 默认TAG 还有 SQL92
selectorExpression = "*",//过滤选项 默认* Tag多个时,"tag1 || tag2 || tag3"
consumeMode = ConsumeMode.CONCURRENTLY,//消费类型 默认CONCURRENTLY同步 还有有序 ORDERLY 一个队列一个线程
messageModel = MessageModel.CLUSTERING, //消费模式 默认CLUSTERING集群 还有 广播CLUSTERING(接收所有信息)
consumeThreadMax = 64,//最大线程数,默认64
consumeTimeout = 15L//超时时间,以分钟为单位,默认15L
)
public class Consumer implements RocketMQListener<OrderPaidEvent>, RocketMQPushConsumerLifecycleListener {
/**
* 消费者
* 程序报错则进行重试
*
* @param message 接收的消息
*/
@Override
public void onMessage(OrderPaidEvent message) {
log.info("Receive message: {} ThreadName: {}", message, Thread.currentThread().getName());
}
/**
* consumer配置都是通过这个
*
* @param consumer consumer配置
*/
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设最大重试次数,默认16次
//距离上一次重试间隔
//第1次:10s 第2次:30s 第3次:1min 第4次:2min 第5次:3min 第6次:4min 第7次:5min 第8次:6min
//第9次:7min 第10次:8min 第11次:9min 第12次:10min 第13次:20min 第14次:30min 第15次:1h 第16次:2h 16次以后:都是2h
//某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
consumer.setMaxReconsumeTimes(3);
//关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
//时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
}
}
/**
* <p>
* 批量消息实体
* </p>
*
**/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BatchDto implements Serializable {
private static final long serialVersionUID = 1L;
private Integer id;
private String message;
}
/**
* <p>
* 订单支付事件
* </p>
*
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderPaidEvent implements Serializable {
private static final long serialVersionUID = 1L;
private String orderId;
private BigDecimal paidMoney;
private String msg;
}
/**
* <p>
* 顺序的步骤
* </p>
*
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderStep {
private long orderId;
private String desc;
}
2.3、SpringBoot整合Dubbo
下载dubbo-spring-boot-starter依赖包
将dubbo-spring-boot-starter
安装到本地仓库
mvn install -Dmaven.skip.test=true
2.3.1、搭建Zookeeper集群
2.3.1.1、准备工作
- 安装JDK
- 将Zookeeper上传到服务器
- 解压Zookeeper,并创建data目录,将conf下的zoo_sample.cfg文件改名为zoo.cfg
- 建立
/user/local/zookeeper-cluster
,将解压后的Zookeeper复制到以下三个目录
/usr/local/zookeeper-cluster/zookeeper-1
/usr/local/zookeeper-cluster/zookeeper-2
/usr/local/zookeeper-cluster/zookeeper-3
-
配置每一个 Zookeeper 的 dataDir(zoo.cfg) clientPort 分别为 2181 2182 2183
修改
/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data
修改/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data
修改/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data
2.3.1.2、配置集群
-
在每个 zookeeper 的 data 目录下创建一个 myid 文件,内容分别是 1、2、3 。这个文件就是记录每个服务器的 ID
-
在每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表。
集群服务器 IP 列表如下
server.1=192.168.25.140:2881:3881
server.2=192.168.25.140:2882:3882
server.3=192.168.25.140:2883:3883
解释:server.服务器 ID=服务器 IP 地址:服务器之间通信端口:服务器之间投票选举端口
2.3.1.3、启动集群
启动集群就是分别启动每个实例。
2.3.2、RPC服务接口
public interface IUserService {
public String sayHello(String name);
}
2.3.3、服务提供者
2.3.3.1、添加依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<!--dubbo-->
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<!--spring-boot-stater-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j-to-slf4j</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--API-->
<dependency>
<groupId>com.angyan.demo</groupId>
<artifactId>dubbo-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
2.3.3.2、配置文件
# application.properties
spring.application.name=dubbo-demo-provider
spring.dubbo.application.id=dubbo-demo-provider
spring.dubbo.application.name=dubbo-demo-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20880
2.3.3.3、启动类
@EnableDubboConfiguration
@SpringBootApplication
public class ProviderBootstrap {
public static void main(String[] args) throws IOException {
SpringApplication.run(ProviderBootstrap.class,args);
}
}
2.3.3.4、服务实现
@Component
@Service(interfaceClass = IUserService.class)
public class UserServiceImpl implements IUserService{
@Override
public String sayHello(String name) {
return "hello:"+name;
}
}
2.3.4、服务消费者
2.3.4.1、添加依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--dubbo-->
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j-to-slf4j</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--API-->
<dependency>
<groupId>com.angyan.demo</groupId>
<artifactId>dubbo-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
2.3.4.2、配置文件
# application.properties
spring.application.name=dubbo-demo-consumer
spring.dubbo.application.name=dubbo-demo-consumer
spring.dubbo.application.id=dubbo-demo-consumer
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
2.3.4.3、启动类
@EnableDubboConfiguration
@SpringBootApplication
public class ConsumerBootstrap {
public static void main(String[] args) {
SpringApplication.run(ConsumerBootstrap.class);
}
}
2.3.4.4、Controller
@RestController
@RequestMapping("/user")
public class UserController {
@Reference
private IUserService userService;
@RequestMapping("/sayHello")
public String sayHello(String name){
return userService.sayHello(name);
}
}
三. 环境搭建
3.1、数据库
3.1.1、优惠券表
Field | Type | Comment |
---|---|---|
coupon_id | bigint(50) NOT NULL | 优惠券ID |
coupon_price | decimal(10,2) NULL | 优惠券金额 |
user_id | bigint(50) NULL | 用户ID |
order_id | bigint(32) NULL | 订单ID |
is_used | int(1) NULL | 是否使用 0未使用 1已使用 |
used_time | timestamp NULL | 使用时间 |
3.1.2、商品表
Field | Type | Comment |
---|---|---|
goods_id | bigint(50) NOT NULL | 主键 |
goods_name | varchar(255) NULL | 商品名称 |
goods_number | int(11) NULL | 商品库存 |
goods_price | decimal(10,2) NULL | 商品价格 |
goods_desc | varchar(255) NULL | 商品描述 |
add_time | timestamp NULL | 添加时间 |
3.1.3、订单表
Field | Type | Comment |
---|---|---|
order_id | bigint(50) NOT NULL | 订单ID |
user_id | bigint(50) NULL | 用户ID |
order_status | int(1) NULL | 订单状态 0未确认 1已确认 2已取消 3无效 4退款 |
pay_status | int(1) NULL | 支付状态 0未支付 1支付中 2已支付 |
shipping_status | int(1) NULL | 发货状态 0未发货 1已发货 2已退货 |
address | varchar(255) NULL | 收货地址 |
consignee | varchar(255) NULL | 收货人 |
goods_id | bigint(50) NULL | 商品ID |
goods_number | int(11) NULL | 商品数量 |
goods_price | decimal(10,2) NULL | 商品价格 |
goods_amount | decimal(10,0) NULL | 商品总价 |
shipping_fee | decimal(10,2) NULL | 运费 |
order_amount | decimal(10,2) NULL | 订单价格 |
coupon_id | bigint(50) NULL | 优惠券ID |
coupon_paid | decimal(10,2) NULL | 优惠券 |
money_paid | decimal(10,2) NULL | 已付金额 |
pay_amount | decimal(10,2) NULL | 支付金额 |
add_time | timestamp NULL | 创建时间 |
confirm_time | timestamp NULL | 订单确认时间 |
pay_time | timestamp NULL | 支付时间 |
3.1.4、订单商品日志表
Field | Type | Comment |
---|---|---|
goods_id | int(11) NOT NULL | 商品ID |
order_id | varchar(32) NOT NULL | 订单ID |
goods_number | int(11) NULL | 库存数量 |
log_time | datetime NULL | 记录时间 |
3.1.5、用户表
Field | Type | Comment |
---|---|---|
user_id | bigint(50) NOT NULL | 用户ID |
user_name | varchar(255) NULL | 用户姓名 |
user_password | varchar(255) NULL | 用户密码 |
user_mobile | varchar(255) NULL | 手机号 |
user_score | int(11) NULL | 积分 |
user_reg_time | timestamp NULL | 注册时间 |
user_money | decimal(10,0) NULL | 用户余额 |
3.1.6、用户余额日志表
Field | Type | Comment |
---|---|---|
user_id | bigint(50) NOT NULL | 用户ID |
order_id | bigint(50) NOT NULL | 订单ID |
money_log_type | int(1) NOT NULL | 日志类型 1订单付款 2 订单退款 |
use_money | decimal(10,2) NULL | 操作金额 |
create_time | timestamp NULL | 日志时间 |
3.1.7、订单支付表
Field | Type | Comment |
---|---|---|
pay_id | bigint(50) NOT NULL | 支付编号 |
order_id | bigint(50) NULL | 订单编号 |
pay_amount | decimal(10,2) NULL | 支付金额 |
is_paid | int(1) NULL | 是否已支付 1否 2是 |
3.1.8、MQ消息生产表
Field | Type | Comment |
---|---|---|
id | varchar(100) NOT NULL | 主键 |
group_name | varchar(100) NULL | 生产者组名 |
msg_topic | varchar(100) NULL | 消息主题 |
msg_tag | varchar(100) NULL | Tag |
msg_key | varchar(100) NULL | Key |
msg_body | varchar(500) NULL | 消息内容 |
msg_status | int(1) NULL | 0:未处理;1:已经处理 |
create_time | timestamp NOT NULL | 记录时间 |
3.1.9、MQ消息消费表
Field | Type | Comment |
---|---|---|
msg_id | varchar(50) NULL | 消息ID |
group_name | varchar(100) NOT NULL | 消费者组名 |
msg_tag | varchar(100) NOT NULL | Tag |
msg_key | varchar(100) NOT NULL | Key |
msg_body | varchar(500) NULL | 消息体 |
consumer_status | int(1) NULL | 0:正在处理;1:处理成功;2:处理失败 |
consumer_times | int(1) NULL | 消费次数 |
consumer_timestamp | timestamp NULL | 消费时间 |
remark | varchar(500) NULL | 备注 |
3.2、项目初始化
shop系统基于Maven进行项目管理
3.2.1、工程浏览
- 父工程:shop-parent
- 订单系统:shop-order-web
- 支付系统:shop-pay-web
- 优惠券服务:shop-coupon-service
- 订单服务:shop-order-service
- 支付服务:shop-pay-service
- 商品服务:shop-goods-service
- 用户服务:shop-user-service
- 实体类:shop-pojo
- 持久层:shop-dao
- 接口层:shop-api
- 工具工程:shop-common
共12个系统
3.2.2、工程关系
3.3、Mybatis逆向工程使用
3.3.1、代码生成
使用Mybatis逆向工程针对数据表生成CURD持久层代码
3.3.2、代码导入
- 将实体类导入到shop-pojo工程
- 在服务层工程中导入对应的Mapper类和对应配置文件
3.4 公共类介绍
-
ID生成器
IDWorker:Twitter雪花算法
-
异常处理类
CustomerException:自定义异常类
CastException:异常抛出类
-
常量类
ShopCode:系统状态类
-
响应实体类
Result:封装响应状态和响应信息
四. 下单业务
4.1、下单基本流程
4.1.1、接口定义
- IOrderService
public interface IOrderService {
/**
* 确认订单
* @param order
* @return Result
*/
Result confirmOrder(TradeOrder order);
}
4.1.2、业务类实现
@Slf4j
@Component
@Service(interfaceClass = IOrderService.class)
public class OrderServiceImpl implements IOrderService {
@Override
public Result confirmOrder(TradeOrder order) {
//1.校验订单
//2.生成预订单
try {
//3.扣减库存
//4.扣减优惠券
//5.使用余额
//6.确认订单
//7.返回成功状态
} catch (Exception e) {
//1.确认订单失败,发送消息
//2.返回失败状态
}
}
}
4.1.3、校验订单
private void checkOrder(TradeOrder order) {
//1.校验订单是否存在
if(order==null){
CastException.cast(ShopCode.SHOP_ORDER_INVALID);
}
//2.校验订单中的商品是否存在
TradeGoods goods = goodsService.findOne(order.getGoodsId());
if(goods==null){
CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
}
//3.校验下单用户是否存在
TradeUser user = userService.findOne(order.getUserId());
if(user==null){
CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
}
//4.校验商品单价是否合法
if(order.getGoodsPrice().compareTo(goods.getGoodsPrice())!=0){
CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);
}
//5.校验订单商品数量是否合法
if(order.getGoodsNumber()>=goods.getGoodsNumber()){
CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
}
log.info("校验订单通过");
}
4.1.4、生成预订单
private Long savePreOrder(TradeOrder order) {
//1.设置订单状态为不可见
order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());
//2.订单ID
order.setOrderId(idWorker.nextId());
//核算运费是否正确
BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());
if (order.getShippingFee().compareTo(shippingFee) != 0) {
CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);
}
//3.计算订单总价格是否正确
BigDecimal orderAmount = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));
orderAmount.add(shippingFee);
if (orderAmount.compareTo(order.getOrderAmount()) != 0) {
CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);
}
//4.判断优惠券信息是否合法
Long couponId = order.getCouponId();
if (couponId != null) {
TradeCoupon coupon = couponService.findOne(couponId);
//优惠券不存在
if (coupon == null) {
CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);
}
//优惠券已经使用
if ((ShopCode.SHOP_COUPON_ISUSED.getCode().toString())
.equals(coupon.getIsUsed().toString())) {
CastException.cast(ShopCode.SHOP_COUPON_INVALIED);
}
order.setCouponPaid(coupon.getCouponPrice());
} else {
order.setCouponPaid(BigDecimal.ZERO);
}
//5.判断余额是否正确
BigDecimal moneyPaid = order.getMoneyPaid();
if (moneyPaid != null) {
//比较余额是否大于0
int r = order.getMoneyPaid().compareTo(BigDecimal.ZERO);
//余额小于0
if (r == -1) {
CastException.cast(ShopCode.SHOP_MONEY_PAID_LESS_ZERO);
}
//余额大于0
if (r == 1) {
//查询用户信息
TradeUser user = userService.findOne(order.getUserId());
if (user == null) {
CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
}
//比较余额是否大于用户账户余额
if (user.getUserMoney().compareTo(order.getMoneyPaid().longValue()) == -1) {
CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
}
order.setMoneyPaid(order.getMoneyPaid());
}
} else {
order.setMoneyPaid(BigDecimal.ZERO);
}
//计算订单支付总价
order.setPayAmount(orderAmount.subtract(order.getCouponPaid())
.subtract(order.getMoneyPaid()));
//设置订单添加时间
order.setAddTime(new Date());
//保存预订单
int r = orderMapper.insert(order);
if (ShopCode.SHOP_SUCCESS.getCode() != r) {
CastException.cast(ShopCode.SHOP_ORDER_SAVE_ERROR);
}
log.info("订单:["+order.getOrderId()+"]预订单生成成功");
return order.getOrderId();
}
4.1.5、扣减库存
- 通过dubbo调用商品服务完成扣减库存
private void reduceGoodsNum(TradeOrder order) {
TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
goodsNumberLog.setGoodsId(order.getGoodsId());
goodsNumberLog.setOrderId(order.getOrderId());
goodsNumberLog.setGoodsNumber(order.getGoodsNumber());
Result result = goodsService.reduceGoodsNum(goodsNumberLog);
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
}
log.info("订单:["+order.getOrderId()+"]扣减库存["+order.getGoodsNumber()+"个]成功");
}
- 商品服务GoodsService扣减库存
@Override
public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
if (goodsNumberLog == null ||
goodsNumberLog.getGoodsNumber() == null ||
goodsNumberLog.getOrderId() == null ||
goodsNumberLog.getGoodsNumber() == null ||
goodsNumberLog.getGoodsNumber().intValue() <= 0) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
if(goods.getGoodsNumber()<goodsNumberLog.getGoodsNumber()){
//库存不足
CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
}
//减库存
goods.setGoodsNumber(goods.getGoodsNumber()-goodsNumberLog.getGoodsNumber());
goodsMapper.updateByPrimaryKey(goods);
//记录库存操作日志
goodsNumberLog.setGoodsNumber(-(goodsNumberLog.getGoodsNumber()));
goodsNumberLog.setLogTime(new Date());
goodsNumberLogMapper.insert(goodsNumberLog);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
}
4.1.6、扣减优惠券
- 通过dubbo完成扣减优惠券
private void changeCoponStatus(TradeOrder order) {
//判断用户是否使用优惠券
if (!StringUtils.isEmpty(order.getCouponId())) {
//封装优惠券对象
TradeCoupon coupon = couponService.findOne(order.getCouponId());
coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
coupon.setUsedTime(new Date());
coupon.setOrderId(order.getOrderId());
Result result = couponService.changeCouponStatus(coupon);
//判断执行结果
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
//优惠券使用失败
CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
}
log.info("订单:["+order.getOrderId()+"]使用扣减优惠券["+coupon.getCouponPrice()+"元]成功");
}
}
- 优惠券服务CouponService更改优惠券状态
@Override
public Result changeCouponStatus(TradeCoupon coupon) {
try {
//判断请求参数是否合法
if (coupon == null || StringUtils.isEmpty(coupon.getCouponId())) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
//更新优惠券状态为已使用
couponMapper.updateByPrimaryKey(coupon);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
} catch (Exception e) {
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
4.1.7、扣减用户余额
- 通过用户服务完成扣减余额
private void reduceMoneyPaid(TradeOrder order) {
//判断订单中使用的余额是否合法
if (order.getMoneyPaid() != null && order.getMoneyPaid().compareTo(BigDecimal.ZERO) == 1) {
TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
userMoneyLog.setOrderId(order.getOrderId());
userMoneyLog.setUserId(order.getUserId());
userMoneyLog.setUseMoney(order.getMoneyPaid());
userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
//扣减余额
Result result = userService.changeUserMoney(userMoneyLog);
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
}
log.info("订单:["+order.getOrderId()+"扣减余额["+order.getMoneyPaid()+"元]成功]");
}
}
- 用户服务UserService,更新余额
@Override
public Result changeUserMoney(TradeUserMoneyLog userMoneyLog) {
//判断请求参数是否合法
if (userMoneyLog == null
|| userMoneyLog.getUserId() == null
|| userMoneyLog.getUseMoney() == null
|| userMoneyLog.getOrderId() == null
|| userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
//查询该订单是否存在付款记录
TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
userMoneyLogExample.createCriteria()
.andUserIdEqualTo(userMoneyLog.getUserId())
.andOrderIdEqualTo(userMoneyLog.getOrderId());
int count = userMoneyLogMapper.countByExample(userMoneyLogExample);
TradeUser tradeUser = new TradeUser();
tradeUser.setUserId(userMoneyLog.getUserId());
tradeUser.setUserMoney(userMoneyLog.getUseMoney().longValue());
//判断余额操作行为
//【付款操作】
if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_PAID.getCode())) {
//订单已经付款,则抛异常
if (count > 0) {
CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
}
//用户账户扣减余额
userMapper.reduceUserMoney(tradeUser);
}
//【退款操作】
if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_REFUND.getCode())) {
//如果订单未付款,则不能退款,抛异常
if (count == 0) {
CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
}
//防止多次退款
userMoneyLogExample = new TradeUserMoneyLogExample();
userMoneyLogExample.createCriteria()
.andUserIdEqualTo(userMoneyLog.getUserId())
.andOrderIdEqualTo(userMoneyLog.getOrderId())
.andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
count = userMoneyLogMapper.countByExample(userMoneyLogExample);
if (count > 0) {
CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
}
//用户账户添加余额
userMapper.addUserMoney(tradeUser);
}
//记录用户使用余额日志
userMoneyLog.setCreateTime(new Date());
userMoneyLogMapper.insert(userMoneyLog);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
}
4.1.8、确认订单
private void updateOrderStatus(TradeOrder order) {
order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());
order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
order.setConfirmTime(new Date());
int r = orderMapper.updateByPrimaryKey(order);
if (r <= 0) {
CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);
}
log.info("订单:["+order.getOrderId()+"]状态修改成功");
}
4.1.9、小结
@Override
public Result confirmOrder(TradeOrder order) {
//1.校验订单
checkOrder(order);
//2.生成预订单
Long orderId = savePreOrder(order);
order.setOrderId(orderId);
try {
//3.扣减库存
reduceGoodsNum(order);
//4.扣减优惠券
changeCoponStatus(order);
//5.使用余额
reduceMoneyPaid(order);
//6.确认订单
updateOrderStatus(order);
log.info("订单:["+orderId+"]确认成功");
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
} catch (Exception e) {
//确认订单失败,发送消息
...
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
4.2、失败补偿机制
4.2.1、消息发送方
- 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel
- 注入模板类和属性值信息
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${mq.order.topic}")
private String topic;
@Value("${mq.order.tag.cancel}")
private String cancelTag;
- 发送下单失败消息
@Override
public Result confirmOrder(TradeOrder order) {
//1.校验订单
//2.生成预订
try {
//3.扣减库存
//4.扣减优惠券
//5.使用余额
//6.确认订单
} catch (Exception e) {
//确认订单失败,发送消息
CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
cancelOrderMQ.setOrderId(order.getOrderId());
cancelOrderMQ.setCouponId(order.getCouponId());
cancelOrderMQ.setGoodsId(order.getGoodsId());
cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
cancelOrderMQ.setUserId(order.getUserId());
cancelOrderMQ.setUserMoney(order.getMoneyPaid());
try {
sendMessage(topic,
cancelTag,
cancelOrderMQ.getOrderId().toString(),
JSON.toJSONString(cancelOrderMQ));
} catch (Exception e1) {
e1.printStackTrace();
CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
}
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
private void sendMessage(String topic, String tags, String keys, String body) throws Exception {
//判断Topic是否为空
if (StringUtils.isEmpty(topic)) {
CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
}
//判断消息内容是否为空
if (StringUtils.isEmpty(body)) {
CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
}
//消息体
Message message = new Message(topic, tags, keys, body.getBytes());
//发送消息
rocketMQTemplate.getProducer().send(message);
}
4.2.2、消费接收方
- 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
- 创建监听类,消费消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",
consumerGroup = "${mq.order.consumer.group.name}",
messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{
@Override
public void onMessage(MessageExt messageExt) {
...
}
}
4.2.2.1、回退库存
- 流程分析
- 消息消费者
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{
@Value("${mq.order.consumer.group.name}")
private String groupName;
@Autowired
private TradeGoodsMapper goodsMapper;
@Autowired
private TradeMqConsumerLogMapper mqConsumerLogMapper;
@Autowired
private TradeGoodsNumberLogMapper goodsNumberLogMapper;
@Override
public void onMessage(MessageExt messageExt) {
String msgId=null;
String tags=null;
String keys=null;
String body=null;
try {
//1. 解析消息内容
msgId = messageExt.getMsgId();
tags= messageExt.getTags();
keys= messageExt.getKeys();
body= new String(messageExt.getBody(),"UTF-8");
log.info("接受消息成功");
//2. 查询消息消费记录
TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
primaryKey.setMsgTag(tags);
primaryKey.setMsgKey(keys);
primaryKey.setGroupName(groupName);
TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
if(mqConsumerLog!=null){
//3. 判断如果消费过...
//3.1 获得消息处理状态
Integer status = mqConsumerLog.getConsumerStatus();
//处理过...返回
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){
log.info("消息:"+msgId+",已经处理过");
return;
}
//正在处理...返回
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){
log.info("消息:"+msgId+",正在处理");
return;
}
//处理失败
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){
//获得消息处理次数
Integer times = mqConsumerLog.getConsumerTimes();
if(times>3){
log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");
return;
}
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
//使用数据库乐观锁更新
TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());
criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());
criteria.andGroupNameEqualTo(groupName);
criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
if(r<=0){
//未修改成功,其他线程并发修改
log.info("并发修改,稍后处理");
}
}
}else{
//4. 判断如果没有消费过...
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog.setMsgTag(tags);
mqConsumerLog.setMsgKey(keys);
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
mqConsumerLog.setMsgBody(body);
mqConsumerLog.setMsgId(msgId);
mqConsumerLog.setConsumerTimes(0);
//将消息处理信息添加到数据库
mqConsumerLogMapper.insert(mqConsumerLog);
}
//5. 回退库存
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
Long goodsId = mqEntity.getGoodsId();
TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());
goodsMapper.updateByPrimaryKey(goods);
//记录库存操作日志
TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
goodsNumberLog.setOrderId(mqEntity.getOrderId());
goodsNumberLog.setGoodsId(goodsId);
goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
goodsNumberLog.setLogTime(new Date());
goodsNumberLogMapper.insert(goodsNumberLog);
//6. 将消息的处理状态改为成功
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
mqConsumerLog.setConsumerTimestamp(new Date());
mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
log.info("回退库存成功");
} catch (Exception e) {
e.printStackTrace();
TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
primaryKey.setMsgTag(tags);
primaryKey.setMsgKey(keys);
primaryKey.setGroupName(groupName);
TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
if(mqConsumerLog==null){
//数据库未有记录
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog.setMsgTag(tags);
mqConsumerLog.setMsgKey(keys);
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
mqConsumerLog.setMsgBody(body);
mqConsumerLog.setMsgId(msgId);
mqConsumerLog.setConsumerTimes(1);
mqConsumerLogMapper.insert(mqConsumerLog);
}else{
mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);
mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
}
}
}
}
4.2.2.2、回退优惠券
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{
@Autowired
private TradeCouponMapper couponMapper;
@Override
public void onMessage(MessageExt message) {
try {
//1. 解析消息内容
String body = new String(message.getBody(), "UTF-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
log.info("接收到消息");
//2. 查询优惠券信息
TradeCoupon coupon = couponMapper.selectByPrimaryKey(mqEntity.getCouponId());
//3.更改优惠券状态
coupon.setUsedTime(null);
coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode());
coupon.setOrderId(null);
couponMapper.updateByPrimaryKey(coupon);
log.info("回退优惠券成功");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("回退优惠券失败");
}
}
}
4.2.2.3、回退余额
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{
@Autowired
private IUserService userService;
@Override
public void onMessage(MessageExt messageExt) {
try {
//1.解析消息
String body = new String(messageExt.getBody(), "UTF-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
log.info("接收到消息");
if(mqEntity.getUserMoney()!=null && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO)>0){
//2.调用业务层,进行余额修改
TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
userMoneyLog.setUseMoney(mqEntity.getUserMoney());
userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
userMoneyLog.setUserId(mqEntity.getUserId());
userMoneyLog.setOrderId(mqEntity.getOrderId());
userService.updateMoneyPaid(userMoneyLog);
log.info("余额回退成功");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("余额回退失败");
}
}
}
4.2.2.4、取消订单
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), "UTF-8");
String msgId = messageExt.getMsgId();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
log.info("CancelOrderProcessor receive message:"+messageExt);
CancelOrderMQ cancelOrderMQ = JSON.parseObject(body, CancelOrderMQ.class);
TradeOrder order = orderService.findOne(cancelOrderMQ.getOrderId());
order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
orderService.changeOrderStatus(order);
log.info("订单:["+order.getOrderId()+"]状态设置为取消");
return order;
}
4.3、测试
4.3.1、准备测试环境
- 用户数据
- 商品数据
- 优惠券数据
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ShopOrderServiceApplication.class)
public class OrderTest {
@Autowired
private IOrderService orderService;
}
4.3.2、测试下单成功流程
@Test
public void add(){
Long goodsId=XXXL;
Long userId=XXXL;
Long couponId=XXXL;
TradeOrder order = new TradeOrder();
order.setGoodsId(goodsId);
order.setUserId(userId);
order.setGoodsNumber(1);
order.setAddress("北京");
order.setGoodsPrice(new BigDecimal("5000"));
order.setOrderAmount(new BigDecimal("5000"));
order.setMoneyPaid(new BigDecimal("100"));
order.setCouponId(couponId);
order.setShippingFee(new BigDecimal(0));
orderService.confirmOrder(order);
}
执行完毕后,查看数据库中用户的余额、优惠券数据,及订单的状态数据
4.3.3、测试下单失败流程
代码同上。
执行完毕后,查看用户的余额、优惠券数据是否发生更改,订单的状态是否为取消。
五. 支付业务
5.1、创建支付订单
public Result createPayment(TradePay tradePay) {
//查询订单支付状态
try {
TradePayExample payExample = new TradePayExample();
TradePayExample.Criteria criteria = payExample.createCriteria();
criteria.andOrderIdEqualTo(tradePay.getOrderId());
criteria.andIsPaidEqualTo(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
int count = tradePayMapper.countByExample(payExample);
if (count > 0) {
CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
}
long payId = idWorker.nextId();
tradePay.setPayId(payId);
tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
tradePayMapper.insert(tradePay);
log.info("创建支付订单成功:" + payId);
} catch (Exception e) {
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
5.2、支付回调
5.2.1、流程分析
5.2.2、代码实现
- 创建线程池对象
@Bean
public ThreadPoolTaskExecutor getThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("Pool-A");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Autowired
private ThreadPoolTaskExecutor executorService;
@Override
public Result callbackPayment(TradePay tradePay) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
log.info("支付回调");
//1. 判断用户支付状态
if(tradePay.getIsPaid().intValue()==ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().intValue()){
//2. 更新支付订单状态为已支付
Long payId = tradePay.getPayId();
TradePay pay = tradePayMapper.selectByPrimaryKey(payId);
//判断支付订单是否存在
if(pay==null){
CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
}
pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
int r = tradePayMapper.updateByPrimaryKeySelective(pay);
log.info("支付订单状态改为已支付");
if(r==1){
//3. 创建支付成功的消息
TradeMqProducerTemp tradeMqProducerTemp = new TradeMqProducerTemp();
tradeMqProducerTemp.setId(String.valueOf(idWorker.nextId()));
tradeMqProducerTemp.setGroupName(groupName);
tradeMqProducerTemp.setMsgTopic(topic);
tradeMqProducerTemp.setMsgTag(tag);
tradeMqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
tradeMqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
tradeMqProducerTemp.setCreateTime(new Date());
//4. 将消息持久化数据库
mqProducerTempMapper.insert(tradeMqProducerTemp);
log.info("将支付成功消息持久化到数据库");
//在线程池中进行处理
threadPoolTaskExecutor.submit(new Runnable() {
@Override
public void run() {
//5. 发送消息到MQ
SendResult result = null;
try {
result = sendMessage(topic, tag, String.valueOf(tradePay.getPayId()), JSON.toJSONString(tradePay));
} catch (Exception e) {
e.printStackTrace();
}
if(result.getSendStatus().equals(SendStatus.SEND_OK)){
log.info("消息发送成功");
//6. 等待发送结果,如果MQ接受到消息,删除发送成功的消息
mqProducerTempMapper.deleteByPrimaryKey(tradeMqProducerTemp.getId());
log.info("持久化到数据库的消息删除");
}
}
});
}
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
}else{
CastException.cast(ShopCode.SHOP_PAYMENT_PAY_ERROR);
return new Result(ShopCode.SHOP_FAIL.getSuccess(),ShopCode.SHOP_FAIL.getMessage());
}
}
5.2.3、处理消息
支付成功后,支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行处理
- 订单服务修改订单状态为已支付
- 日志服务记录支付日志
- 用户服务负责给用户增加积分
以下用订单服务为例说明消息的处理情况
5.2.3.1、配置RocketMQ属性值
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
5.2.3.2、消费消息
- 在订单服务中,配置公共的消息处理类
public class BaseConsumer {
public TradeOrder handleMessage(IOrderService
orderService,
MessageExt messageExt,Integer code) throws Exception {
//解析消息内容
String body = new String(messageExt.getBody(), "UTF-8");
String msgId = messageExt.getMsgId();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);
//查询
TradeOrder order = orderService.findOne(orderMq.getOrderId());
if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)){
order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
}
if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)){
order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
}
orderService.changeOrderStatus(order);
return order;
}
}
- 接受订单支付成功消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",
consumerGroup = "${mq.pay.consumer.group.name}")
public class PayConsumer extends BaseConsumer implements RocketMQListener<MessageExt> {
@Autowired
private IOrderService orderService;
@Override
public void onMessage(MessageExt messageExt) {
try {
log.info("CancelOrderProcessor receive message:"+messageExt);
TradeOrder order = handleMessage(orderService,
messageExt,
ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode());
log.info("订单:["+order.getOrderId()+"]支付成功");
} catch (Exception e) {
e.printStackTrace();
log.error("订单支付失败");
}
}
}
六、整体联调
通过Rest客户端请求shop-order-web和shop-pay-web完成下单和支付操作
6.1、准备工作
6.1.1、配置RestTemplate类
@Configuration
public class RestTemplateConfig {
@Bean
@ConditionalOnMissingBean({ RestOperations.class, RestTemplate.class })
public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
RestTemplate restTemplate = new RestTemplate(factory);
// 使用 utf-8 编码集的 conver 替换默认的 conver(默认的 string conver 的编码集为"ISO-8859-1")
List<HttpMessageConverter<?>> messageConverters = restTemplate.getMessageConverters();
Iterator<HttpMessageConverter<?>> iterator = messageConverters.iterator();
while (iterator.hasNext()) {
HttpMessageConverter<?> converter = iterator.next();
if (converter instanceof StringHttpMessageConverter) {
iterator.remove();
}
}
messageConverters.add(new StringHttpMessageConverter(Charset.forName("UTF-8")));
return restTemplate;
}
@Bean
@ConditionalOnMissingBean({ClientHttpRequestFactory.class})
public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
// ms
factory.setReadTimeout(15000);
// ms
factory.setConnectTimeout(15000);
return factory;
}
}
6.1.2、配置请求地址
- 订单系统
server.host=http://localhost
server.servlet.path=/order-web
server.port=8080
shop.order.baseURI=${server.host}:${server.port}${server.servlet.path}
shop.order.confirm=/order/confirm
- 支付系统
server.host=http://localhost
server.servlet.path=/pay-web
server.port=9090
shop.pay.baseURI=${server.host}:${server.port}${server.servlet.path}
shop.pay.createPayment=/pay/createPayment
shop.pay.callbackPayment=/pay/callbackPayment
6.2、下单测试
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopOrderWebApplication.class)
@TestPropertySource("classpath:application.properties")
public class OrderTest {
@Autowired
private RestTemplate restTemplate;
@Value("${shop.order.baseURI}")
private String baseURI;
@Value("${shop.order.confirm}")
private String confirmOrderPath;
@Autowired
private IDWorker idWorker;
/**
* 下单
*/
@Test
public void confirmOrder(){
Long goodsId=XXXL;
Long userId=XXXL;
Long couponId=XXXL;
TradeOrder order = new TradeOrder();
order.setGoodsId(goodsId);
order.setUserId(userId);
order.setGoodsNumber(1);
order.setAddress("北京");
order.setGoodsPrice(new BigDecimal("5000"));
order.setOrderAmount(new BigDecimal("5000"));
order.setMoneyPaid(new BigDecimal("100"));
order.setCouponId(couponId);
order.setShippingFee(new BigDecimal(0));
Result result = restTemplate.postForEntity(baseURI + confirmOrderPath, order, Result.class).getBody();
System.out.println(result);
}
}
6.3、支付测试
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopPayWebApplication.class)
@TestPropertySource("classpath:application.properties")
public class PayTest {
@Autowired
private RestTemplate restTemplate;
@Value("${shop.pay.baseURI}")
private String baseURI;
@Value("${shop.pay.createPayment}")
private String createPaymentPath;
@Value("${shop.pay.callbackPayment}")
private String callbackPaymentPath;
@Autowired
private IDWorker idWorker;
/**
* 创建支付订单
*/
@Test
public void createPayment(){
Long orderId = 346321587315814400L;
TradePay pay = new TradePay();
pay.setOrderId(orderId);
pay.setPayAmount(new BigDecimal(4800));
Result result = restTemplate.postForEntity(baseURI + createPaymentPath, pay, Result.class).getBody();
System.out.println(result);
}
/**
* 支付回调
*/
@Test
public void callbackPayment(){
Long payId = 346321891507720192L;
TradePay pay = new TradePay();
pay.setPayId(payId);
pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
Result result = restTemplate.postForEntity(baseURI + callbackPaymentPath, pay, Result.class).getBody();
System.out.println(result);
}
}