本文将深入探讨高并发场景下,分布式事务处理的方案。随着互联网的快速发展,对系统性能和稳定性的需求也日益增长,尤其在高并发场景下,分布式事务成为重中之重。在本文中,我将分享我对分布式事务的理论理解,并结合个别典型业务应用场景,最终通过代码实现来展示解决方案。希望通过这篇博客能够对读者在面对类似挑战时提供一些帮助和启发。
分布式事务基础理论
什么是事务
事务指的就是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作都成功,要么所有的操作都被撤销。分为两种:1)一个是本地事务:本地事物其实可以认为是数据库提供的事务机;2)一个是分布式事务;
什么是分布式事务
指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用。分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
产生的原因
1)业务发展,数据库的拆分-分库分表;
2)SOA和微服务架构的使用;
3)多个微服务之间调用异常:网络异常、请求超时、数据库异常、程序宕机等;
常见分布式事务解决方案
1)2PC 和 3PC:两/三阶段提交, 基于XA协议;
2)TCC:Try、Confirm、Cancel;
3)事务消息:最大努力通知型;
分布式事务分类
1)刚性事务:遵循ACID,加锁概念,事务性比较强,强一致性;
2)柔性事务:遵循BASE理论,最终一致性;
分布式事务框架
1)TX-LCN:支持2PC、TCC等多种模式,更新慢(个人感觉处于停滞状态);
github:https://github.com/codingapi/tx-lcn
2)Seata(免费版):支持 AT、TCC、SAGA 和 XA 多种模式,背靠阿里,专门团队推广;
github:https://github.com/seata/seata
GTS(商业版): 阿里云商业化产品
官网:https://www.aliyun.com/aliware/txc
3)RocketMq:自带事务消息解决分布式事务
github:https://github.com/apache/rocketmq
最终一致性
什么是Base理论
CAP 中的一致性和可用性进行一个权衡的结果,核心思想就是:我们无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性, 来自 ebay 的架构师提出。
基本可用(Basically Available)
假设系统,出现了不可预知的故障,但还是能用, 可能会有性能或者功能上的影响,比如RT是10ms,变成50ms。
软状态(Soft state)
允许系统中的数据存在中间状态,并认为该状态不影响系统的整体可用性,即允许系统在多个不同节点的数据副本存在数据延时。
最终一致性(Eventually consistent)
系统能够保证在没有其他新的更新操作的情况下,数据最终一定能够达到一致的状态,因此所有客户端对系统的数据访问最终都能够获取到最新的值。
关于数据一致性
1)强一致:操作后的能立马一致且可以访问;
2)弱一致:容忍部分或者全部访问不到;
3)最终一致:弱一致性经过多一段时间后,都一致且正常;
CAP权衡的结果
事务消息
消息队列提供类似Open XA的分布式事务功能,通过消息队列事务消息能达到分布式事务的最终一致;
半事务消息
暂不能投递的消息,发送方已经成功地将消息发送到了消息队列服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
交互图
(来源rocketmq官方文档)
目前较为主流的MQ,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ支持事务消息,如果其他队列需要事务消息,可以开发个消息服务,自行实现半消息和回查功能。事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性,同时将传统的大事务可以被拆分为小事务,能提升效率,不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。
缺点:不能实时保证数据一致性,极端情况下需要人工补偿,比如 假如生产者成功处理本地业务,消费者始终消费不成功。
高并发下分布式事务案例
高并发场景下如果通过加锁这种强一致性分布式事务来保证数据一致性,那么其性能将大大降低,这种强一致性的事务更适合于常规的管理后台和并发量不高的场景。这里以短链创建,流量包扣减的业务场景为例,设计其高并发分布式事务可行性方案,其方案理论正式借助于前面所提到的Base理论,确保其最终一致性。
应用场景
短链创建失败,但是流量包已经扣减,怎么解决分布式事务问题?
解决方案 1 :
1)流量包服务扣减库存前保存一个task任务,记录扣减的流量包;
2)使用定时任务定时扫描task任务表,允许一段时间不同步,但需确保最终一致。如果一段时间过后检查发现短链未创建,则回滚流量包;
解决方案 2:
采用延迟队列方式,调用扣减流量包服务,在执行扣减前先将扣减消息放入task任务,并发送一个延迟消息,待一定时间后回查短链写入状态,如写入异常则回滚当天流量包。
解决方案 3:
方案1与方案2的结合,分布式调度为兜底方案。
代码实现
流量包锁定任务表
CREATE TABLE `traffic_task` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`account_no` bigint DEFAULT NULL,
`traffic_id` bigint DEFAULT NULL,
`use_times` int DEFAULT NULL,
`lock_state` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '锁定状态锁定LOCK 完成FINISH-取消CANCEL',
`biz_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '唯一标识',
`gmt_create` datetime DEFAULT CURRENT_TIMESTAMP,
`gmt_modified` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_biz_id` (`biz_id`) USING BTREE,
KEY `idx_release` (`account_no`,`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Lock枚举类
public enum TaskStateEnum {
/**
* 锁定
*/
LOCK,
/**
* 完成
*/
FINISH,
/**
* 取消,释放库存
*/
CANCEL;
}
流量包任务相关Manage
@Component
@Slf4j
public class TrafficTaskManagerImpl implements TrafficTaskManager {
@Autowired
private TrafficTaskMapper trafficTaskMapper;
@Override
public int add(TrafficTaskDO trafficTaskDO) {
return trafficTaskMapper.insert(trafficTaskDO);
}
@Override
public TrafficTaskDO findByIdAndAccountNo(Long id, Long accountNo) {
TrafficTaskDO taskDO = trafficTaskMapper.selectOne(new QueryWrapper<TrafficTaskDO>()
.eq("id", id).eq("account_no", accountNo));
return taskDO;
}
@Override
public int deleteByIdAndAccountNo(Long id, Long accountNo) {
return trafficTaskMapper.delete(new QueryWrapper<TrafficTaskDO>()
.eq("id", id).eq("account_no", accountNo));
}
}
死信队列相关配置
//================流量包扣减,创建短链死信队列配置==================================
// 发送锁定流量包消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue 延迟队列,不能被监听消费
/**
* 第一个队列延迟队列,
*/
private String trafficReleaseDelayQueue = "traffic.release.delay.queue";
/**
* 第一个队列的路由key
* 进入队列的路由key
*/
private String trafficReleaseDelayRoutingKey = "traffic.release.delay.routing.key";
/**
* 第二个队列,被监听恢复流量包的队列
*/
private String trafficReleaseQueue = "traffic.release.queue";
/**
* 第二个队列的路由key
*
* 即进入死信队列的路由key
*/
private String trafficReleaseRoutingKey="traffic.release.routing.key";
/**
* 过期时间,毫秒,1分钟
*/
private Integer ttl = 60000;
/**
* 延迟队列
*/
@Bean
public Queue trafficReleaseDelayQueue(){
Map<String,Object> args = new HashMap<>(3);
args.put("x-message-ttl",ttl);
args.put("x-dead-letter-exchange", trafficEventExchange);
args.put("x-dead-letter-routing-key",trafficReleaseRoutingKey);
return new Queue(trafficReleaseDelayQueue,true,false,false,args);
}
/**
* 死信队列,普通队列,用于被监听
*/
@Bean
public Queue trafficReleaseQueue(){
return new Queue(trafficReleaseQueue,true,false,false);
}
/**
* 第一个队列,即延迟队列的绑定关系建立
* @return
*/
@Bean
public Binding trafficReleaseDelayBinding(){
return new Binding(trafficReleaseDelayQueue,Binding.DestinationType.QUEUE, trafficEventExchange,trafficReleaseDelayRoutingKey,null);
}
/**
* 死信队列绑定关系建立
* @return
*/
@Bean
public Binding trafficReleaseBinding(){
return new Binding(trafficReleaseQueue,Binding.DestinationType.QUEUE, trafficEventExchange,trafficReleaseRoutingKey,null);
}
保存Task表
//先更新,再扣减当前使用的流量包
int rows = trafficManager.addDayUsedTimes(accountNo,useTrafficVO.getCurrentTrafficDO().getId(),1);
TrafficTaskDO trafficTaskDO = TrafficTaskDO.builder().accountNo(accountNo)
.bizId(trafficRequest.getBizId())
.useTimes(1).trafficId(useTrafficVO.getCurrentTrafficDO().getId())
.lockState(TaskStateEnum.LOCK.name()).build();
trafficTaskManager.add(trafficTaskDO);
发送Mq消息
EventMessage usedTrafficEventMessage = EventMessage.builder()
.accountNo(accountNo)
.bizId(trafficTaskDO.getId() + "")
.eventMessageType(EventMessageType.TRAFFIC_USED.name())
.build();
//发送延迟信息,用于异常回滚,数据最终一致性
rabbitTemplate.convertAndSend(rabbitMQConfig.getTrafficEventExchange(),
rabbitMQConfig.getTrafficReleaseDelayRoutingKey(), usedTrafficEventMessage);
return JsonData.buildSuccess();
检查短链创建情况
@FeignClient(name = "link-service")
public interface ShortLinkFeignService {
/**
* 检查短链是否存在
*
* @param shortLinkCode 短链码
* @return
*/
@GetMapping(value = "/api/link/v1/check", headers = {"rpc-token=${rpc.token}"})
JsonData simpleDetail(@RequestParam("shortLinkCode") String shortLinkCode);
}
@Value("${rpc.token}")
private String rpcToken;
/**
* rpc调用获取短链信息
*
* @return
*/
@GetMapping("check")
public JsonData simpleDetail(@RequestParam("shortLinkCode") String shortLinkCode, HttpServletRequest request) {
String requestToken = request.getHeader("rpc-token");
if (rpcToken.equalsIgnoreCase(requestToken)) {
ShortLinkVO shortLinkVO = shortLinkService.parseShortLinkCode(shortLinkCode);
return shortLinkVO == null ? JsonData.buildError("不存在") : JsonData.buildSuccess();
} else {
return JsonData.buildError("非法访问");
}
}
延迟队列消费,回查短链是否创建状态,不成功则恢复流量包
else if(EventMessageType.TRAFFIC_USED.name().equalsIgnoreCase(messageType)){
//流量包使用,检查是否成功使用
//检查task是否存在
//检查短链是否成功
//如果不成功则恢复流量包,删除缓存key
//删除task(也可以更新状态,定时删除也行)
Long trafficTaskId = Long.valueOf(eventMessage.getBizId());
TrafficTaskDO trafficTaskDO = trafficTaskManager.findByIdAndAccountNo(trafficTaskId, accountNo);
//非空 且 是锁定状态
if(trafficTaskDO!=null && trafficTaskDO.getLockState()
.equalsIgnoreCase(TaskStateEnum.LOCK.name())){
JsonData jsonData = shortLinkFeignService.check(trafficTaskDO.getBizId());
if(jsonData.getCode()!=0){
log.error("创建短链失败,流量包回滚");
trafficManager.releaseUsedTimes(accountNo,trafficTaskDO.getTrafficId(),trafficTaskDO.getUseTimes());
}
trafficTaskManager.deleteByIdAndAccountNo(trafficTaskId,accountNo);
}
}