RocketMQ 实战:模拟电商网站场景综合案例(十)
一、RocketMQ 实战:模拟电商网站场景综合案例-- 创建支付订单流程
1、支付订单流程
2、在 shop-pay-service 工程模块中,创建启动类 PayServiceApplication.java
/**
* shop\shop-pay-service\src\main\java\com\itheima\shop\PayServiceApplication.java
*
* 2024-6-11 创建 启动 类 PayServiceApplication.java
*/
package com.itheima.shop;
import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the shop-pay-service module.
 *
 * <p>Dubbo support is switched on through {@code @EnableDubboConfiguration}.
 */
@EnableDubboConfiguration
@SpringBootApplication
public class PayServiceApplication {

    /**
     * Boots the Spring application for this service.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(PayServiceApplication.class, args);
    }
}
3、在 shop-pay-service 工程模块中,创建 支付订单 实现类 PayServiceImpl.java
/**
* shop\shop-pay-service\src\main\java\com\itheima\shop\service\impl\PayServiceImpl.java
*
* 2024-6-11 创建 支付订单 实现类 PayServiceImpl.java
*/
package com.itheima.shop.service.impl;
import com.alibaba.dubbo.config.annotation.Service;
import com.itheima.api.IPayService;
import com.itheima.shop.pojo.TradePay;
import org.springframework.stereotype.Component;
// Skeleton of the payment service implementation; the body only lists the
// steps that creating a payment order is meant to perform.
// NOTE(review): @Slf4j is used but no lombok import appears in the import list above - confirm.
// NOTE(review): no IPayService method is implemented yet, so this skeleton is presumably not compilable as shown - confirm.
@Slf4j
@Component
@Service(interfaceClass = IPayService.class)
public class PayServiceImpl implements IPayService{
//1. Check the payment status of the order
//2. Set the order status to unpaid
//3. Save the payment order
}
二、RocketMQ 实战:模拟电商网站场景综合案例-- 创建支付订单实现
1、在 shop-api 工程模块中,创建 支付订单 接口类 IPayService.java
/**
* shop\shop-api\src\main\java\com\itheima\api\IPayService.java
*
* 2024-6-11 创建 支付订单 接口类 IPayService.java
*/
package com.itheima.api;
import com.itheima.entity.Result;
import com.itheima.shop.pojo.TradePay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
 * Payment operations offered by the pay service.
 */
public interface IPayService {

    /**
     * Creates a payment order.
     *
     * @param tradePay the payment order to create
     * @return the outcome of the operation
     */
    Result createPayment(TradePay tradePay);

    /**
     * Handles the callback that reports the result of a payment.
     *
     * @param tradePay the payment order the callback refers to
     * @return the outcome of the operation
     * @throws InterruptedException declared for the message-sending step
     * @throws RemotingException    declared for the message-sending step
     * @throws MQClientException    declared for the message-sending step
     * @throws MQBrokerException    declared for the message-sending step
     */
    Result callbackPayment(TradePay tradePay)
            throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
}
2、在 shop-pay-service 工程模块中,启动类 PayServiceApplication.java 中,添加 IDWorker
/**
* shop\shop-pay-service\src\main\java\com\itheima\shop\PayServiceApplication.java
*
* 2024-6-11 创建 启动 类 PayServiceApplication.java
*/
package com.itheima.shop;
import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.itheima.utils.IDWorker;
/**
 * Spring Boot entry point of the shop-pay-service module; also registers the
 * {@link IDWorker} bean used by the service.
 */
@EnableDubboConfiguration
@SpringBootApplication
public class PayServiceApplication {

    /**
     * Boots the Spring application for this service.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(PayServiceApplication.class, args);
    }

    /**
     * Exposes an {@link IDWorker} as a Spring bean.
     *
     * <p>NOTE(review): the constructor arguments (1, 2) are presumably worker and
     * data-center identifiers - confirm against IDWorker.
     *
     * @return a new IDWorker built with the arguments (1, 2)
     */
    @Bean
    public IDWorker getBean() {
        IDWorker worker = new IDWorker(1, 2);
        return worker;
    }
}
3、在 shop-pay-service 工程模块中,实现类 PayServiceImpl.java 中,完成 创建支付订单 方法 代码。
/**
* shop\shop-pay-service\src\main\java\com\itheima\shop\service\impl\PayServiceImpl.java
*
* 2024-6-11 创建 支付订单 实现类 PayServiceImpl.java
*/
package com.itheima.shop.service.impl;
import com.alibaba.dubbo.config.annotation.Service;
import com.itheima.api.IPayService;
import com.itheima.constant.ShopCode;
import com.itheima.entity.Result;
import com.itheima.exception.CastException;
import com.itheima.shop.mapper.TradeMqProducerTempMapper;
import com.itheima.shop.mapper.TradePayMapper;
import com.itheima.shop.pojo.TradeMqProducerTemp;
import com.itheima.shop.pojo.TradePay;
import com.itheima.shop.pojo.TradePayExample;
import com.itheima.utils.IDWorker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Dubbo-exported implementation of {@link IPayService}; this revision contains
 * the create-payment operation only.
 *
 * <p>NOTE(review): IPayService also declares callbackPayment, which is not
 * implemented in this revision - confirm the class compiles as shown.
 */
@Slf4j
@Component
@Service(interfaceClass = IPayService.class)
public class PayServiceImpl implements IPayService {

    @Autowired
    private TradePayMapper tradePayMapper;

    @Autowired
    private IDWorker idWorker;

    /**
     * Creates a payment order for the order referenced by {@code tradePay}.
     *
     * <p>The request is rejected through {@code CastException} when it carries no
     * order id or when a paid payment already exists for that order. Otherwise the
     * payment is flagged as unpaid, given a generated id and inserted.
     *
     * @param tradePay the payment order to create; its order id must be set
     * @return a Result built from {@code ShopCode.SHOP_SUCCESS}
     */
    @Override
    public Result createPayment(TradePay tradePay) {
        boolean requestUsable = tradePay != null && tradePay.getOrderId() != null;
        if (!requestUsable) {
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }

        // Step 1: count the payments of this order that are already flagged as paid.
        TradePayExample paidQuery = new TradePayExample();
        TradePayExample.Criteria where = paidQuery.createCriteria();
        where.andOrderIdEqualTo(tradePay.getOrderId());
        where.andIsPaidEqualTo(ShopCode.SHOP_PAYMENT_IS_PAID.getCode());
        if (tradePayMapper.countByExample(paidQuery) > 0) {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }

        // Step 2: a freshly created payment always starts out as unpaid.
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());

        // Step 3: assign a generated id and persist the payment order.
        tradePay.setPayId(idWorker.nextId());
        tradePayMapper.insert(tradePay);

        return new Result(
                ShopCode.SHOP_SUCCESS.getSuccess(),
                ShopCode.SHOP_SUCCESS.getMessage());
    }
}
三、RocketMQ 实战:模拟电商网站场景综合案例-- 支付回调处理流程分析
四、RocketMQ 实战:模拟电商网站场景综合案例-- 支付回调代码实现
1、支付回调代码实现
/**
 * Handles the payment callback.
 *
 * <p>When the callback reports a paid payment, the stored payment order is
 * loaded, flagged as paid and updated. On a successful update a message record
 * is written to the producer temp table and the MQ message is sent from the
 * thread pool; the temp record is removed once the broker reports SEND_OK.
 *
 * @param tradePay the payment order reported by the callback
 * @return a Result built from {@code ShopCode.SHOP_SUCCESS}
 */
public Result callbackPayment(TradePay tradePay) {
    if (tradePay == null) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    // Constant-first equals: a callback without a paid flag is handled as
    // "not paid" instead of failing with a NullPointerException.
    if (ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().equals(tradePay.getIsPaid())) {
        final TradePay pay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());
        if (pay == null) {
            CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
        }
        pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int updated = tradePayMapper.updateByPrimaryKeySelective(pay);
        // Exactly one updated row means the payment was recorded.
        if (updated == 1) {
            // sendMessage takes the key as a String, so the Long pay id is converted once here.
            final String msgKey = String.valueOf(pay.getPayId());
            final String msgBody = JSON.toJSONString(pay);

            final TradeMqProducerTemp mqProducerTemp = new TradeMqProducerTemp();
            mqProducerTemp.setId(String.valueOf(idWorker.nextId()));
            mqProducerTemp.setGroupName("payProducerGroup");
            // Topic and tag go into their own columns; the tag column must not hold the topic.
            mqProducerTemp.setMsgTopic(topic);
            mqProducerTemp.setMsgTag(tag);
            mqProducerTemp.setMsgKey(msgKey);
            mqProducerTemp.setMsgBody(msgBody);
            mqProducerTemp.setCreateTime(new Date());
            mqProducerTempMapper.insert(mqProducerTemp);

            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        SendResult sendResult = sendMessage(topic, tag, msgKey, msgBody);
                        log.info(JSON.toJSONString(sendResult));
                        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                            mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                            log.info("删除消息表成功");
                        }
                    } catch (Exception e) {
                        // The temp record stays in the table, so the failure is only logged here.
                        log.error("send pay message failed, msgKey={}", msgKey, e);
                    }
                }
            });
        } else {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
    }
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
2、在 shop-pay-service 工程模块中,订单支付 实现类 PayServiceImpl.java 中,创建 支付回调 方法。
/**
* shop\shop-pay-service\src\main\java\com\itheima\shop\service\impl\PayServiceImpl.java
*
* 2024-6-11 创建 支付订单 实现类 PayServiceImpl.java
*/
package com.itheima.shop.service.impl;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSON;
import com.itheima.api.IPayService;
import com.itheima.constant.ShopCode;
import com.itheima.entity.Result;
import com.itheima.exception.CastException;
import com.itheima.shop.mapper.TradeMqProducerTempMapper;
import com.itheima.shop.mapper.TradePayMapper;
import com.itheima.shop.pojo.TradeMqProducerTemp;
import com.itheima.shop.pojo.TradePay;
import com.itheima.shop.pojo.TradePayExample;
import com.itheima.utils.IDWorker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Date;
/**
 * Dubbo-exported implementation of {@link IPayService}: creates payment orders
 * and handles the payment callback, sending the "paid" message synchronously.
 */
@Slf4j
@Component
@Service(interfaceClass = IPayService.class)
public class PayServiceImpl implements IPayService {

    @Autowired
    private TradePayMapper tradePayMapper;

    @Autowired
    private TradeMqProducerTempMapper mqProducerTempMapper;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private IDWorker idWorker;

    @Value("${rocketmq.producer.group}")
    private String groupName;

    @Value("${mq.topic}")
    private String topic;

    @Value("${mq.pay.tag}")
    private String tag;

    /**
     * Creates a payment order for the order referenced by {@code tradePay}.
     *
     * @param tradePay the payment order to create; its order id must be set
     * @return a Result built from {@code ShopCode.SHOP_SUCCESS}
     */
    @Override
    public Result createPayment(TradePay tradePay) {
        if (tradePay == null || tradePay.getOrderId() == null) {
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }
        // 1. Reject the request when the order already has a paid payment.
        TradePayExample example = new TradePayExample();
        TradePayExample.Criteria criteria = example.createCriteria();
        criteria.andOrderIdEqualTo(tradePay.getOrderId());
        criteria.andIsPaidEqualTo(ShopCode.SHOP_PAYMENT_IS_PAID.getCode());
        int r = tradePayMapper.countByExample(example);
        if (r > 0) {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
        // 2. A new payment order starts out as unpaid.
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
        // 3. Assign a generated id and save the payment order.
        tradePay.setPayId(idWorker.nextId());
        tradePayMapper.insert(tradePay);
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    }

    /**
     * Handles the payment callback.
     *
     * <p>For a callback that reports "paid", the stored payment order is flagged as
     * paid; on a successful update a message record is persisted, the message is
     * sent to MQ, and the record is deleted once the broker reports SEND_OK.
     * The message key and body are built from the stored payment order, so the
     * consumer receives the order id even if the caller supplied only the pay id.
     *
     * @param tradePay the payment order reported by the callback
     * @return a success Result for a paid callback
     * @throws InterruptedException if sending the message is interrupted
     * @throws RemotingException    if sending the message fails
     * @throws MQClientException    if sending the message fails
     * @throws MQBrokerException    if sending the message fails
     */
    @Override
    public Result callbackPayment(TradePay tradePay) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        log.info("支付回调");
        if (tradePay == null) {
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }
        // 1. Check the reported payment status. Constant-first equals keeps a
        //    missing status from failing with a NullPointerException.
        if (!ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().equals(tradePay.getIsPaid())) {
            CastException.cast(ShopCode.SHOP_PAYMENT_PAY_ERROR);
            return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
        }

        // 2. Load the stored payment order and flag it as paid.
        TradePay pay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());
        if (pay == null) {
            CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
        }
        pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int r = tradePayMapper.updateByPrimaryKeySelective(pay);
        if (r == 1) {
            log.info("支付订单状态改为已支付");
            String msgKey = String.valueOf(pay.getPayId());
            String msgBody = JSON.toJSONString(pay);

            // 3. Build the "payment succeeded" message record.
            TradeMqProducerTemp tradeMqProducerTemp = new TradeMqProducerTemp();
            tradeMqProducerTemp.setId(String.valueOf(idWorker.nextId()));
            tradeMqProducerTemp.setGroupName(groupName);
            tradeMqProducerTemp.setMsgTopic(topic);
            tradeMqProducerTemp.setMsgTag(tag);
            tradeMqProducerTemp.setMsgKey(msgKey);
            tradeMqProducerTemp.setMsgBody(msgBody);
            tradeMqProducerTemp.setCreateTime(new Date());
            // 4. Persist the record before sending.
            mqProducerTempMapper.insert(tradeMqProducerTemp);
            log.info("将支付成功消息持久化到数据库");
            // 5. Send the message to MQ.
            SendResult result = sendMessage(topic, tag, msgKey, msgBody);
            if (SendStatus.SEND_OK.equals(result.getSendStatus())) {
                log.info("消息发送成功");
                // 6. The broker accepted the message, so the persisted copy can go.
                mqProducerTempMapper.deleteByPrimaryKey(tradeMqProducerTemp.getId());
                log.info("持久化到数据库的消息删除");
            }
        } else {
            log.warn("payment order update affected {} rows, payId={}", r, pay.getPayId());
        }
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    }

    /**
     * Sends the "payment succeeded" message.
     *
     * @param topic message topic; must not be empty
     * @param tag   message tag
     * @param key   message key
     * @param body  message body; must not be empty
     * @return the send result reported by the producer
     */
    private SendResult sendMessage(String topic, String tag, String key, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        if (StringUtils.isEmpty(topic)) {
            CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
        }
        if (StringUtils.isEmpty(body)) {
            CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
        }
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        Message message = new Message(topic, tag, key, body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return rocketMQTemplate.getProducer().send(message);
    }
}
五、RocketMQ 实战:模拟电商网站场景综合案例-- 线程池优化消息发送
1、线程池优化消息发送逻辑:创建线程池对象
/**
 * Builds and initializes the thread pool bean.
 *
 * <p>Settings: 4 core threads, at most 8 threads, a queue capacity of 100,
 * 60 seconds keep-alive, thread names prefixed with "Pool-A", and
 * {@code CallerRunsPolicy} as the rejection handler.
 *
 * @return the initialized executor
 */
@Bean
public ThreadPoolTaskExecutor getThreadPool() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

    // Identification and overload behaviour.
    pool.setThreadNamePrefix("Pool-A");
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    // Sizing.
    pool.setCorePoolSize(4);
    pool.setMaxPoolSize(8);
    pool.setQueueCapacity(100);
    pool.setKeepAliveSeconds(60);

    pool.initialize();
    return pool;
}
2、线程池优化消息发送逻辑: 使用线程池
// Thread pool that the message-sending task is submitted to.
@Autowired
private ThreadPoolTaskExecutor executorService;

// Fragment: submits the send step to the pool.
// NOTE(review): presumably sits inside callbackPayment after the temp record has been inserted - confirm.
executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            // sendMessage takes the key as a String, so the Long pay id is converted explicitly.
            SendResult sendResult = sendMessage(topic, tag, String.valueOf(finalTradePay.getPayId()), JSON.toJSONString(finalTradePay));
            log.info(JSON.toJSONString(sendResult));
            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                // The broker accepted the message, so the persisted copy is removed.
                mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                log.info("删除消息表成功");
            }
        } catch (Exception e) {
            // Log through the logger with the cause instead of printing the stack trace.
            log.error("send pay message failed, payId={}", finalTradePay.getPayId(), e);
        }
    }
});
3、在 shop-pay-service 工程模块中,启动 类 PayServiceApplication.java 添加 线程池优化消息发送。
/**
* shop\shop-pay-service\src\main\java\com\itheima\shop\PayServiceApplication.java
*
* 2024-6-11 创建 启动 类 PayServiceApplication.java
*/
package com.itheima.shop;
import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import com.itheima.utils.IDWorker;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Spring Boot entry point of the shop-pay-service module; also registers the
 * {@link IDWorker} bean and the thread pool used for sending messages.
 */
@EnableDubboConfiguration
@SpringBootApplication
public class PayServiceApplication {

    /**
     * Boots the Spring application for this service.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(PayServiceApplication.class, args);
    }

    /**
     * Exposes an {@link IDWorker} as a Spring bean.
     *
     * <p>NOTE(review): the constructor arguments (1, 2) are presumably worker and
     * data-center identifiers - confirm against IDWorker.
     *
     * @return a new IDWorker built with the arguments (1, 2)
     */
    @Bean
    public IDWorker getBean() {
        IDWorker worker = new IDWorker(1, 2);
        return worker;
    }

    /**
     * Builds and initializes the thread pool bean.
     *
     * <p>Settings: 4 core threads, at most 8 threads, a queue capacity of 100,
     * 60 seconds keep-alive, thread names prefixed with "Pool-A", and
     * {@code CallerRunsPolicy} as the rejection handler.
     *
     * @return the initialized executor
     */
    @Bean
    public ThreadPoolTaskExecutor getThreadPool() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Identification and overload behaviour.
        pool.setThreadNamePrefix("Pool-A");
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Sizing.
        pool.setCorePoolSize(4);
        pool.setMaxPoolSize(8);
        pool.setQueueCapacity(100);
        pool.setKeepAliveSeconds(60);

        pool.initialize();
        return pool;
    }
}
4、在 shop-pay-service 工程模块中,支付订单 实现类 PayServiceImpl.java 添加 线程池优化消息发送。
/**
* shop\shop-pay-service\src\main\java\com\itheima\shop\service\impl\PayServiceImpl.java
*
* 2024-6-11 创建 支付订单 实现类 PayServiceImpl.java
*/
package com.itheima.shop.service.impl;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSON;
import com.itheima.api.IPayService;
import com.itheima.constant.ShopCode;
import com.itheima.entity.Result;
import com.itheima.exception.CastException;
import com.itheima.shop.mapper.TradeMqProducerTempMapper;
import com.itheima.shop.mapper.TradePayMapper;
import com.itheima.shop.pojo.TradeMqProducerTemp;
import com.itheima.shop.pojo.TradePay;
import com.itheima.shop.pojo.TradePayExample;
import com.itheima.utils.IDWorker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Date;
/**
 * Dubbo-exported implementation of {@link IPayService}: creates payment orders
 * and handles the payment callback, sending the "paid" message from a thread pool.
 */
@Slf4j
@Component
@Service(interfaceClass = IPayService.class)
public class PayServiceImpl implements IPayService {

    @Autowired
    private TradePayMapper tradePayMapper;

    @Autowired
    private TradeMqProducerTempMapper mqProducerTempMapper;

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private IDWorker idWorker;

    @Value("${rocketmq.producer.group}")
    private String groupName;

    @Value("${mq.topic}")
    private String topic;

    @Value("${mq.pay.tag}")
    private String tag;

    /**
     * Creates a payment order for the order referenced by {@code tradePay}.
     *
     * @param tradePay the payment order to create; its order id must be set
     * @return a Result built from {@code ShopCode.SHOP_SUCCESS}
     */
    @Override
    public Result createPayment(TradePay tradePay) {
        if (tradePay == null || tradePay.getOrderId() == null) {
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }
        // 1. Reject the request when the order already has a paid payment.
        TradePayExample example = new TradePayExample();
        TradePayExample.Criteria criteria = example.createCriteria();
        criteria.andOrderIdEqualTo(tradePay.getOrderId());
        criteria.andIsPaidEqualTo(ShopCode.SHOP_PAYMENT_IS_PAID.getCode());
        int r = tradePayMapper.countByExample(example);
        if (r > 0) {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
        // 2. A new payment order starts out as unpaid.
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
        // 3. Assign a generated id and save the payment order.
        tradePay.setPayId(idWorker.nextId());
        tradePayMapper.insert(tradePay);
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    }

    /**
     * Handles the payment callback.
     *
     * <p>For a callback that reports "paid", the stored payment order is flagged as
     * paid; on a successful update a message record is persisted and the send step
     * is handed to the thread pool. The record is deleted once the broker reports
     * SEND_OK; if sending fails the record is left in place and the failure is logged.
     * The message key and body are built from the stored payment order, so the
     * consumer receives the order id even if the caller supplied only the pay id.
     *
     * @param tradePay the payment order reported by the callback
     * @return a success Result for a paid callback
     * @throws InterruptedException declared by the interface; the send itself runs in the pool
     * @throws RemotingException    declared by the interface; the send itself runs in the pool
     * @throws MQClientException    declared by the interface; the send itself runs in the pool
     * @throws MQBrokerException    declared by the interface; the send itself runs in the pool
     */
    @Override
    public Result callbackPayment(TradePay tradePay) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        log.info("支付回调");
        if (tradePay == null) {
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }
        // 1. Check the reported payment status. Constant-first equals keeps a
        //    missing status from failing with a NullPointerException.
        if (!ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().equals(tradePay.getIsPaid())) {
            CastException.cast(ShopCode.SHOP_PAYMENT_PAY_ERROR);
            return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
        }

        // 2. Load the stored payment order and flag it as paid.
        TradePay pay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());
        if (pay == null) {
            CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
        }
        pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int r = tradePayMapper.updateByPrimaryKeySelective(pay);
        if (r == 1) {
            log.info("支付订单状态改为已支付");
            final String msgKey = String.valueOf(pay.getPayId());
            final String msgBody = JSON.toJSONString(pay);

            // 3. Build the "payment succeeded" message record.
            final TradeMqProducerTemp tradeMqProducerTemp = new TradeMqProducerTemp();
            tradeMqProducerTemp.setId(String.valueOf(idWorker.nextId()));
            tradeMqProducerTemp.setGroupName(groupName);
            tradeMqProducerTemp.setMsgTopic(topic);
            tradeMqProducerTemp.setMsgTag(tag);
            tradeMqProducerTemp.setMsgKey(msgKey);
            tradeMqProducerTemp.setMsgBody(msgBody);
            tradeMqProducerTemp.setCreateTime(new Date());
            // 4. Persist the record before sending.
            mqProducerTempMapper.insert(tradeMqProducerTemp);
            log.info("将支付成功消息持久化到数据库");

            // 5./6. Send from the pool so the callback does not wait for the broker.
            threadPoolTaskExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        SendResult result = sendMessage(topic, tag, msgKey, msgBody);
                        // A failed send used to leave result null and crash the task here.
                        if (result != null && SendStatus.SEND_OK.equals(result.getSendStatus())) {
                            log.info("消息发送成功");
                            mqProducerTempMapper.deleteByPrimaryKey(tradeMqProducerTemp.getId());
                            log.info("持久化到数据库的消息删除");
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag for the pool thread.
                        Thread.currentThread().interrupt();
                        log.error("send pay message interrupted, msgKey={}", msgKey, e);
                    } catch (Exception e) {
                        // The persisted record is left in place when sending fails.
                        log.error("send pay message failed, msgKey={}", msgKey, e);
                    }
                }
            });
        } else {
            log.warn("payment order update affected {} rows, payId={}", r, pay.getPayId());
        }
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    }

    /**
     * Sends the "payment succeeded" message.
     *
     * @param topic message topic; must not be empty
     * @param tag   message tag
     * @param key   message key
     * @param body  message body; must not be empty
     * @return the send result reported by the producer
     */
    private SendResult sendMessage(String topic, String tag, String key, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        if (StringUtils.isEmpty(topic)) {
            CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
        }
        if (StringUtils.isEmpty(body)) {
            CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
        }
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        Message message = new Message(topic, tag, key, body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return rocketMQTemplate.getProducer().send(message);
    }
}
六、RocketMQ 实战:模拟电商网站场景综合案例-- 订单微服务处理支付成功消息
1、订单微服务处理支付成功消息
支付成功后,支付服务 payService 发送 MQ 消息,订单服务、用户服务、日志服务需要订阅消息进行处理。
1) 订单服务修改订单状态为已支付
2) 日志服务记录支付日志
3) 用户服务负责给用户增加积分
2、以下用订单服务为例说明消息的处理情况
2.1、配置 RocketMQ 属性值
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
2.2、消费消息:在订单服务中,配置公共的消息处理类
/**
 * Shared message handling for the order-service consumers.
 */
public class BaseConsumer {

    /**
     * Parses an order message, loads the referenced order and applies the status
     * change selected by {@code code}.
     *
     * @param orderService service used to load and update the order
     * @param messageExt   the received message; its body is JSON decoded as UTF-8
     * @param code         message status code: the cancel code sets the order status to
     *                     cancelled, the is-paid code sets the pay status to paid
     * @return the updated order
     * @throws IllegalArgumentException if the message body yields no order message
     * @throws IllegalStateException    if the referenced order cannot be found
     * @throws Exception                anything raised by the order service
     */
    public TradeOrder handleMessage(IOrderService orderService,
                                    MessageExt messageExt,
                                    Integer code) throws Exception {
        // Decode the message body.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);
        if (orderMq == null) {
            throw new IllegalArgumentException("empty order message, msgId=" + messageExt.getMsgId());
        }

        // Load the order; fail with context instead of a bare NullPointerException.
        TradeOrder order = orderService.findOne(orderMq.getOrderId());
        if (order == null) {
            throw new IllegalStateException("order not found, orderId=" + orderMq.getOrderId());
        }

        if (ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)) {
            order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
        }
        if (ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)) {
            order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        }
        orderService.changeOrderStatus(order);
        return order;
    }
}
2.3、消费消息:接受订单支付成功消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",
consumerGroup = "${mq.pay.consumer.group.name}")
public class PayConsumer extends BaseConsumer implements RocketMQListener<MessageExt> {
@Autowired
private IOrderService orderService;
@Override
public void onMessage(MessageExt messageExt) {
try {
log.info("CancelOrderProcessor receive message:"+messageExt);
TradeOrder order = handleMessage(orderService,
messageExt,
ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode());
log.info("订单:["+order.getOrderId()+"]支付成功");
} catch (Exception e) {
e.printStackTrace();
log.error("订单支付失败");
}
}
}
3、在 shop-pay-service 工程模块的 application.properties 配置文件中,配置 RocketMQ 属性值。注意:此处 mq.topic 的值必须与 shop-order-service 中 mq.pay.topic 的值保持一致(本例均为 payTopic)。
# shop\shop-pay-service\src\main\resources\application.properties
# dubbo
spring.application.name=dubbo-pay-provider
spring.dubbo.application.id=dubbo-pay-provider
spring.dubbo.application.name=dubbo-pay-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20885
# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root
# Mybatis
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml
# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=payProducerGroup
# 配置 RocketMQ 属性值,保持 topic 属性值要一致。
mq.topic=payTopic
mq.pay.tag=paid
4、在 shop-order-service 工程模块的 application.properties 配置文件中,配置 RocketMQ 属性值。注意:此处 mq.pay.topic 的值必须与 shop-pay-service 中 mq.topic 的值保持一致(本例均为 payTopic)。
# shop\shop-order-service\src\main\resources\application.properties
# dubbo
spring.application.name=dubbo-order-provider
spring.dubbo.application.id=dubbo-order-provider
spring.dubbo.application.name=dubbo-order-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20884
# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root
# Mybatis
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml
# RocketMQ
# 下单失败消息发送组
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.cancel=order_cancel
# 配置 RocketMQ 属性值,保持 topic 属性值要一致。
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
5、在 shop-order-service 工程模块中,创建 订单微服务处理支付成功消息 的类 PaymentListener.java
/**
* shop\shop-order-service\src\main\java\com\itheima\shop\mq\PaymentListener.java
*
* 2024-6-12 创建 订单微服务处理支付成功消息 的类 PaymentListener.java
*/
package com.itheima.shop.mq;
import com.alibaba.fastjson.JSON;
import com.itheima.constant.ShopCode;
import com.itheima.shop.mapper.TradeOrderMapper;
import com.itheima.shop.pojo.TradeOrder;
import com.itheima.shop.pojo.TradePay;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * Order-service listener for "payment succeeded" messages: marks the order
 * referenced by the message as paid.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",consumerGroup = "${mq.pay.consumer.group.name}",messageModel = MessageModel.BROADCASTING)
public class PaymentListener implements RocketMQListener<MessageExt> {

    @Autowired
    private TradeOrderMapper orderMapper;

    /**
     * Handles one payment message.
     *
     * @param messageExt the received message; its body is a JSON TradePay in UTF-8
     * @throws IllegalArgumentException if the message carries no order id
     * @throws IllegalStateException    if the referenced order does not exist
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收到支付成功消息");
        // 1. Decode the message body. Using the Charset constant removes the
        //    checked UnsupportedEncodingException that could never occur for UTF-8.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        TradePay tradePay = JSON.parseObject(body, TradePay.class);
        if (tradePay == null || tradePay.getOrderId() == null) {
            throw new IllegalArgumentException(
                    "payment message carries no order id, msgId=" + messageExt.getMsgId());
        }
        // 2. Load the order by id; fail with context instead of a bare NullPointerException.
        TradeOrder tradeOrder = orderMapper.selectByPrimaryKey(tradePay.getOrderId());
        if (tradeOrder == null) {
            throw new IllegalStateException(
                    "order not found for payment message, orderId=" + tradePay.getOrderId());
        }
        // 3. Flag the order as paid.
        tradeOrder.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        // 4. Write the order back to the database.
        orderMapper.updateByPrimaryKey(tradeOrder);
        log.info("更改订单支付状态为已支付");
    }
}
七、RocketMQ 实战:模拟电商网站场景综合案例-- 支付业务服务端测试
1、在 shop-pay-service 工程模块中,创建 测试类 PayServiceTest.java
/**
* shop\shop-pay-service\src\test\java\com\itheima\test\PayServiceTest.java
*
* 2024-6-12 创建 测试类 PayServiceTest.java
*/
package com.itheima.test;
import com.itheima.api.IPayService;
import com.itheima.constant.ShopCode;
import com.itheima.shop.PayServiceApplication;
import com.itheima.shop.pojo.TradePay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.math.BigDecimal;
/**
 * Integration-style tests that drive {@link IPayService} inside the
 * PayServiceApplication Spring context.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PayServiceApplication.class)
public class PayServiceTest {

    @Autowired
    private IPayService payService;

    /**
     * Creates a payment order for a fixed order id with an amount of 880.
     */
    @Test
    public void createPayment() {
        TradePay payment = new TradePay();
        // Id of an order in status 1 (payment confirmed).
        payment.setOrderId(351526299216515072L);
        // Amount to pay = total amount - coupon - balance.
        payment.setPayAmount(BigDecimal.valueOf(880));
        payService.createPayment(payment);
    }

    /**
     * Reports a successful payment for a fixed payment order, then blocks on
     * standard input so the worker thread has time to send the message.
     */
    @Test
    public void callbackPayment() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, IOException {
        TradePay paidCallback = new TradePay();
        paidCallback.setPayId(352516176372441088L);   // id of the payment order
        paidCallback.setOrderId(351526299216515072L); // id of the order
        // Status meaning "payment succeeded".
        paidCallback.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        payService.callbackPayment(paidCallback);
        // Keep the main thread alive so the worker thread can send the message.
        System.in.read();
    }
}
2、启动各个微服务,进行测试。
上一节关联链接请点击:
# RocketMQ 实战:模拟电商网站场景综合案例(九)
环境搭建:数据库表结构介绍–项目工程初始化 查看 请点击:
# RocketMQ 实战:模拟电商网站场景综合案例(三)
mybatis 逆向工程 生成 POJO 类 源文件 和 mapper 映射文件 查看 请点击:
RocketMQ 实战:模拟电商网站场景综合案例(四)