👨🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:Redis:原理速成+项目实战——Redis实战9(秒杀优化)
📚订阅专栏:Redis:原理速成+项目实战
希望文章对你们有所帮助
上一节已经实现了异步秒杀,也就是将秒杀分为两个环节:
1、判断是否有抢单资格(库存量是否充足、是否满足一人一单)、
2、下单操作(优惠券表中的库存量-1,订单表增加相应信息)
其中,第一步的操作放在了Redis中,可以有效提高效率,而真正大幅度提高效率的点还是因为我们将下单的操作交给了另一个开辟的线程,因为对数据库的操作并不需要什么时效性。
异步执行所需要的信息被封装并保存到了阻塞队列中,上一节分析了这会造成的问题:
1、内存限制问题
2、数据安全问题
消息队列可以解决这个问题,一般建议用专业的消息中间件来使用,最主流的当然就是RabbitMQ了,但是这边也讲解一下用Redis里面的一些数据结构来模拟出消息队列的效果,实现的话我感觉也挺容易的,只演示基于Stream消息队列实现异步秒杀。
Redis消息队列实现异步秒杀
- 认识消息队列
- 基于List实现消息队列
- PubSub实现消息队列
- Stream的单消费模式
- Stream的消费者组模式
- 基于Stream消息队列实现异步秒杀
认识消息队列
消息队列,也就是存放消息的队列,最简单的消息队列包括3个角色:
(1)消息队列(代理):存储和管理信息
(2)生产者:发消息到消息队列
(3)消费者:从消息队列中获取消息并处理
因此,异步秒杀的思路为:
这个思路与上一节用阻塞队列的思路是差不多的,但是有2点重要区别:
1、消息队列是JVM以外的独立服务,不受JVM内存的限制
2、消息队列不仅仅做数据存储,还确保了数据安全,存到消息队列中的消息会做持久化处理,并要求消费者要做出消息的确认,否则会持续将消息传递给消费者,确保消息至少被“签收”一次
基于List实现消息队列
List是一种双向链表,很容易模拟出队列。
需要注意的是,当消息队列中没有消息的时候,我们应当要让线程等待,而不是直接返回Null,因此这儿要用BRPOP与BLPOP来实现阻塞效果(B表示阻塞)
优点:
(1)利用Redis存储,不受限于JVM内存上限
(2)基于Redis的持久化机制,保证数据安全性
(3)满足消息有序性
缺点:
(1)无法避免消息丢失(消息会从队列直接移除)
(2)只支持单消费者
PubSub实现消息队列
PubSub(发布订阅)是Redis2.0引入的,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
优点:采用发布订阅模型,支持多生产、多消费
缺点:
(1)不支持数据持久化
(2)无法避免消息丢失
(3)消息堆积有上线,超出时数据丢失
Stream的单消费模式
Stream是Redis5.0引入的一种新数据类型,可以实现功能完善的消息队列。
例如:
读取消息:XREAD
例如,用XREAD读第一个消息:
XREAD COUNT 1 STREAMS users 0
用XREAD阻塞方式读取最新消息:
XREAD COUNT 1 BLOCK STREAMS users $
所以,在开发的时候,可以循环调用XREAD阻塞方式来查询最新消息,从而实现持久监听队列。
但是,当指定起始ID为$读取最新消息,处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取也只能获取到最新的一条,会出现消息漏读。
特点:
(1)消息可回溯
(2)一个消息可以被多个消费者读取
(3)可以阻塞读取
(4)有消息漏读的风险
Stream的消费者组模式
这一部分命令还是麻烦了,理解就行,要使用就去看文档就好了。
消费者组可以解决消息漏读的问题。
消费者组:将多个消费者划分到一个组中,监听同一个队列。
特点:
1、消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理速度
2、消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后去读取消息,确保了每一个消息都会被消费
3、消息确认:消费者获取消息后,消息处于pending状态,存入pending-list,当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移出,可以解决消息丢失的问题
创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
删除指定消费者组
XGROUP DESTROY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupName consumerName
从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key ID
其中,ID表示获取消息的起始ID:
(1)“>”:从下一个未消费的消息开始
(2)其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中第一个消息开始
基于Stream消费者组,我们利用消费者监听消息的基本思路:
1、使用阻塞模式尝试监听队列,没消息就继续监听,有消息就开线程处理消息,并在完成后ACK。
2、若没有成功ACK,抛出异常,那么消息就会留在padding-list中,这时候就需要读取padding-list获取异常消息并处理。
STREAM类型消息队列的XREADGROUP命令特点:
1、消息可回溯
2、可以多消费者争抢消息,加快消费速度
3、可以阻塞读取
4、没有消息漏读的风险
5、有消息确认机制,保证消息至少被消费一次
基于Stream消息队列实现异步秒杀
1、创建Stream类型的消息队列stream.orders和消费者组:
XGROUP CREATE stream.orders g1 0 MKSTREAM # 组名g1,起始位置为0
2、修改之前秒杀下单的Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId:
-- 1 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]
-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3 脚本业务
-- 3.1 判断库存是够充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2 库存不足,返回1
return 1
end
-- 3.2 判断用户是否下单,即判断用户id是不是这个set集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.2 存在,说明重复下单
return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,orderId的key指定为Id更好,因为订单实体类是这么定义的
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'Id', orderId)
return 0
与上次的代码相比,我们多增加了一个参数,所以我们要修改一下函数的调用:
这个参数的增加,在后续的编写中会省去一些麻烦。
3、项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单,整体的业务流程的代码如下:
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
//注入秒杀优惠券的service
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Resource
private StringRedisTemplate stringRedisTemplate;
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//线程任务,用户随时都能抢单,所以应该要在这个类被初始化的时候马上开始执行
@PostConstruct //该注解表示在当前类初始化完毕以后立即执行
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//获取用户,用户id不能再从UserHolder中取了,因为现在是从线程池获取的全新线程,不是主线程
Long userId = voucherOrder.getUserId();
//创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁
boolean isLock = lock.tryLock();
//判断是否获取锁成功
if(!isLock){
log.error("不允许重复下单");//理论上不会发生
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
IVoucherOrderService proxy;
private class VoucherOrderHandler implements Runnable{
String queueName = "stream.orders";
@Override
public void run() {
while (true){
try {
//获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 BLOCK 2000 STREAMS stream.orders
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//判断消息获取是否成功
if(list == null || list.isEmpty()) {//获取失败,说明没有消息,继续下一次循环
continue;
}
//解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
//将其转变为VoucherOrder对象,忽略异常
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//获取成功,下单
handleVoucherOrder(voucherOrder);
//ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
//异常则表示没有被ACK确认,剩下的操作都是针对pending-list的
log.error("处理订单异常", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true){
try {
//获取pending-list中的订单信息 XREADGROUP g1 c1 COUNT 1 STREAMS stream.orders 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), //消费者信息
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
if(list == null || list.isEmpty()) {
//获取失败,说明没有消息,结束循环
break;
}
//解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
//将其转变为VoucherOrder对象,忽略异常
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//获取成功,下单
handleVoucherOrder(voucherOrder);
//ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理pending-list订单异常", e);
}
}
}
}
//秒杀优化,调用Lua的代码
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
//获取订单id
long orderId = redisIdWorker.nextId("order");
//执行Lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
//判断结果是否为0
int r = result.intValue();
if(r != 0){
//不为0,没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//返回订单id
return Result.ok(orderId);
}
@Transactional(rollbackFor = Exception.class)
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
//查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//判断是否存在
if (count > 0) {
log.error("不可重复购买");
}
//扣减库存
boolean success = seckillVoucherService.update().
setSql("stock = stock - 1").
eq("voucher_id", voucherId).
gt("stock", 0).
update();
if (!success) {
log.error("库存不足");
}
//保存订单
this.save(voucherOrder);
}
}
我觉得真的还是太麻烦了。。。而且我遇到了很多次bug,反正都跟线程池有关系,自己修改bug的能力一般,耽误了不少时间,这方面能力要提高。