Redis高并发分布锁实战
问题场景
场景一: 没有捕获异常
// 仅仅加锁
// 读取 stock=15
Boolean ret = stringRedisTemplate.opsForValue().setIfAbsent("lock_key", "1"); // jedis.setnx(k,v)
// TODO 业务代码 stock--
stringRedisTemplate.delete("lock_key");
- **
问题
**- 以上场景在代码出现异常的时候,会出现死锁,导致后面的线程无法获取锁,会阻塞所有线程
场景二: 线程间交互删除锁
// 加锁,且设置锁过期时间
// 读取 stock = 15
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key", "1", 10, TimeUnit.SECONDS);
// TODO 业务代码 stock--
stringRedisTemplate.delete(key);
问题
- 相对于场景一多了锁的
过期时间
- 假如线程A执行业务代码的时间是15s,而锁的时间是10s,那么锁过期后自动会被删除,此时线程B获取锁,执行业务代码时间为8s,而这个时候线程A刚好执行完业务代码了,就会
出现线程A把线程B的锁删除掉
- 相对于场景一多了锁的
// 加锁,且(给每个线程)设置锁过期时间, 删除锁时判断是否当前线程
// 读取 stock = 15
String uuid = UUID.getUuid;
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key", uuid, 10, TimeUnit.SECONDS);
// TODO 业务代码 stock-- 15 -> 14
// 判断是否当前线程
if (uuid.equals(stringRedisTemplate.opsForValue().get(key)) {
// 极端场景下:(执行时间定格在9.99秒)突然卡顿 10ms or redis服务宕机!!!
// 此时刚好锁过期,自动删除
// 其他线程获取锁,然后会把上个线程的锁删除,又会出现bug
stringRedisTemplate.delete(key);
}
问题
- 当线程A持有锁,执行完扣减库存后,假设锁过期时间是10s,
恰好此时在执行9.99s的时候出现卡顿
,等服务器反应过来之间,锁过期自动删除了,这个时候线程B获取锁,然后执行业务代码,此时线程A刚好反应过来,执行锁删除
,这样就会把线程B的锁删除,要知道此时线程B是没有执行完业务代码的,锁删除后,线程C又获取锁,此时线程B执行完,又会把线程C的锁删除,依次类推
- 当线程A持有锁,执行完扣减库存后,假设锁过期时间是10s,
解决方案
方案: 使用Redisson分布式锁
@Autowire
public Redisson redisson;
public void stock () {
String key = "key";
RLock lock = redisson.getLock(key);
try {
lock.lock();
// TODO: 业务代码
} catch(Exception e) {
lock.unlock();
}
}
优点
-
自带
锁续命
功能,默认30s过期时间,可以自行调整过期时间 -
LUA脚本模拟商品减库存
//模拟一个商品减库存的原子操作
//lua脚本命令执行方式:redis-cli --eval /tmp/test.lua , 10
jedis.set("product_stock_10016", "15"); // 初始化商品10016的库存
String script = " local count = redis.call('get', KEYS[1]) " +
" local a = tonumber(count) " +
" local b = tonumber(ARGV[1]) " +
" if a >= b then " +
" redis.call('set', KEYS[1], a-b) " +
// 模拟语法报错回滚操作
" bb == 0 " +
" return 1 " +
" end " +
" return 0 ";
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));
System.out.println(obj);
Redisson实现
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
-
LUA脚本适合用于做原子操作,在Redisson分布式锁实现中,就有用到LUA脚本实现创建/获取锁的操作,而Redis的事务机制(multi/exec)非常鸡肋,
可以对相同的key通过不同的数据结构做修改,比如事务开启后,将String类型的key,再次使用hset修改,而且还能修改成功,这就意味着事务已失效,而且不支持事务回滚
-
Redisson分布式锁流程
-
高并发下Lua脚本保证了原子性
-
Schedule定期锁续命
-
未获取锁的线程先Subscribe channel
-
自旋,再次尝试获取锁
-
如果还是未获取锁,则通过Semaphore->tryAcquire(ttl.TimeUnit)阻塞所有进入自旋代码块的线程(
这样做的目的是为了不让其他线程因为不停的自旋而给服务器造成压力,所以让其他线程先阻塞一段时间,等阻塞时间结束,再次自旋
) -
获取锁的线程解锁后,使用Redis的发布功能进行发布消息,订阅消息的线程调用release方法释放阻塞的线程,再次尝试获取锁
-
如果是调用Redisson的tryAcquire(1000,TimeUnit.SECONDS)方法,那么未获取到锁的线程不用进行自旋,因为时间一到,未获取到锁的线程就会自动往下走进入业务代码块
-
总结
- Redis分布式锁自己去实现
可能会出现几个问题
- 没有在finally显示释放锁,当客户端挂掉了,锁没有被及时删除,这样会导致死锁问题,它这个是
需要我们显示的释放锁
- 假如此时我们设置过期时间,但是我们用的是同一个key,就可能出现下一个线程删除上一个线程的锁,但是上一个线程还没有执行完,它这个
需要key是不能重复的
- 假如我们既设置了过期时间也指定了不同的key,此时可能因为网络延迟出现上一个线程删除下一个线程的锁,也就是说业务执行的时间超过了锁过期的时间,它这个
需要一个锁续命的功能
- 没有在finally显示释放锁,当客户端挂掉了,锁没有被及时删除,这样会导致死锁问题,它这个是
- 对于Redis它也有事务,但是它的事务非常鸡肋,仅仅只能保证多个指令按照顺序执行,并不能保证原子性,而且key还能被其他指令修改对应的数据结构,所以我们选择Redisson来进行分布式锁的实现,因为它提供了锁续命的功能以及通过lua脚本保证了多个指令的原子操作,
主要流程
是这样的- 当线程抢到了锁,假如业务没执行完,会定时去进行锁续命,而其他线程会订阅这个抢到锁的线程的channel,然后自旋一定时间去尝试获取锁,如果获取锁失败,会被安排进入队列中阻塞,一旦线程释放锁,他们会被通知到,然后继续去自旋一定时间去尝试获取锁,重复此操作