引言
在之前的文章中,我们介绍了如何使用 Redis 和 Lua 脚本来应对秒杀活动中的高并发请求,并通过引入阻塞队列实现异步下单来提升系统性能。然而,在高并发场景下,阻塞队列的容量和处理速度可能会成为瓶颈。这篇文章将介绍如何使用 Redis Stream 队列进一步优化秒杀系统,提升整体性能和稳定性。
方案设计
基本思路
- 用户发起秒杀请求,通过 Redis Lua 脚本进行资格判断和库存扣减。
- Lua 脚本将订单信息写入 Redis Stream 队列。
- 后台线程从 Redis Stream 队列中读取订单信息并处理订单,确保数据库操作的线程安全和高效。
- 返回订单 ID 给用户。
具体实现
Lua 脚本
Lua 脚本负责判断秒杀资格、扣减库存,并将订单信息写入 Redis Stream 队列。
-- 1.参数列表
-- 1.1.优惠券ID
local voucherId = ARGV[1]
-- 1.2.用户ID
local userId = ARGV[2]
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本逻辑
-- 3.1.判断库存是否足够
if (tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
-- 3.2.判断用户是否已经抢购过
if (redis.call('sismember', orderKey, userId) == 1) then
return 2
end
-- 3.3.减少库存
redis.call('incrby', stockKey, -1)
-- 3.4.记录用户下单
redis.call('sadd', orderKey, userId)
-- 3.5.将订单信息写入Redis Stream
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
Java 代码
在 Java 代码中,我们通过 Redis Stream 队列实现异步下单,并利用 Redisson 分布式锁确保订单操作的线程安全。
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
// 从Redis Stream队列中获取订单信息
List<MapRecord<String, Object, Object>> list =
stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed()));
// 判断是否成功获取订单信息
if (list == null || list.isEmpty()) {
continue;
}
// 解析订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 处理订单
handleVoucherOrder(voucherOrder);
// ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 从Pending List中获取未处理的订单信息
List<MapRecord<String, Object, Object>> list =
stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0")));
// 判断是否成功获取订单信息
if (list == null || list.isEmpty()) {
break;
}
// 解析订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 处理订单
handleVoucherOrder(voucherOrder);
// ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理Pending List订单异常", e);
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),
userId.toString(), String.valueOf(orderId));
int r = result.intValue();
if (r != 0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}
@Override
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
log.error("不允许重复下单!");
return;
}
boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id",
voucherOrder.getVoucherId())
.gt("stock", 0).update();
if (!success) {
log.error("库存不足!");
return;
}
save(voucherOrder);
}
}
代码详解
初始化和启动订单处理线程
我们在服务启动时,通过 @PostConstruct
注解确保订单处理线程被初始化和启动。
订单处理逻辑
通过 VoucherOrderHandler
类,从 Redis Stream 队列中读取订单信息并处理订单,确保订单的创建和库存的扣减是原子操作。
订单处理方法
在订单处理方法 handleVoucherOrder
中,我们通过 Redisson 分布式锁确保同一用户不会重复下单。
结论
通过引入 Redis Stream 队列,我们进一步优化了秒杀系统的性能和稳定性。Stream 队列不仅解决了阻塞队列容量的限制问题,还提供了更强大的消息处理能力,适用于各种高并发场景。这种优化方法不仅适用于秒杀活动,还可以推广到其他需要高性能处理的系统中。希望这些改进对你的系统设计有所帮助。