在多线程环境下,为了保证数据的线程安全,我们通常用加锁的方式,使同一时刻只有一个线程可以对这个共享资源进行操作,在单服务系统我们常用JVM锁——Synchronized、ReentrantLock等。然而在多台服务系统的情况下,JVM锁就无法在多个服务器之间生效了,这时候我们就需要用分布式锁来解决线程安全的问题。
分布式锁的实现方式有很多,主流的就是基于数据库、zookeeper以及redis,当然使用redis的居多,由于篇幅原因,本节就详细介绍一下使用redis实现分布式锁的几种方式。
一、SETNX实现
ps:本文重点使Redisson实现分布式锁,咱就不从SETNX+EXPIRE、SETNX+LUA脚本...什么的逐步演进了,本身就是一回事,直接一步到位,用set ex px nx+唯一校验+LUA脚本删除等操作实现。
利用Redis的单线程特性,在多个Redis客户端通过SETNX,如果返回1表示获取锁成功,反之失败。因为Redis的单线程机制,所以可以保证一个客户端成功获取后,其它客户端都会获取失败。伪代码如下:
public class RedisLock {
private Jedis jedis;
private void init(){
//建立连接
jedis = JedisPoolFactory.getJedis();
}
/**
* 获取锁
* @param lockKey 锁的键值
* @param requestId 唯一标识
* @param expireTime 过期时间
* @return 是否获取锁 成功返回true,反之false
*/
public boolean tryLock(String lockKey,String requestId,int expireTime){
//2、加锁
String result = jedis.set(lockKey,requestId,"NX","EX",expireTime);
return "OK".equals(result);
}
/**
* 释放锁
* @param lockKey 锁的键值
* @param requestId 唯一标识
* @return 成功true,失败false
*/
public boolean unlock(String lockKey,String requestId){
//LUA脚本:判断当前锁的值是否等于请求标识requestId,如果是则删除锁并返回true,反之返回false。
String scripts = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) else return 0 end";
Object result = jedis.eval(scripts, Collections.singletonList(lockKey), Collections.singletonList(requestId));
return Long.parseLong(result.toString())==1L;
}
}
存在的问题:
锁无法续期:假设线程A获取了锁,但是由于网络原因,执行时间超过了设置的过期时间,这是锁被释放了,线程B获取锁成功,此时线程A和B都会执行临界区的代码,这是绝对不允许的。
二、Redisson实现分布式锁
在使用SETNX实现的分布式锁中,存在锁无法续期导致并发冲突的问题。不过这个问题在Redisson中用看门狗的机制巧妙地解决了,这也是我们实现分布式锁最常用的方式。
2.1 整体类图
标黄的两个类就是我们今天的重点,看门狗续期的实现逻辑在RedissionBaseLock类中,加锁逻辑在RedissionLock类中。
2.2 大致流程
在深入代码前,我们先看下加锁、看门狗续期大致的流程,有个大致印象。
2.3 加锁流程源码分析
下面我们就按上面的流程图,走走源码。
2.3.1 lock()---加锁入口
-
lock方法,一个没设置过期时间,一个设置了过期时间。
解析:
- 第一个红框:尝试加锁,会返回null或者具体的值,返回null表示加锁成功,反之有线程持有该锁,加锁失败。
- 第二个红框:加锁失败,while循环不断尝试。
2.3.2 tryAcquire()---执行加锁LUA脚本并判断是否要进行锁续期
第一个红框:执行加锁LUA脚本,返回null说明加锁成功,反之失败。
如果设置了过期时间,第二个参数就传设置的时间。
反之,使用默认的internallockLeaseTime时间。
第二个红框:如果加锁成功(null),且设置了过期时间,将设置过期时间赋值给internallockLeaseTime,如果没设置,则执行scheduleExpirationRenewal方法(看门狗)。返回结果。
ps:internallockLeaseTime默认就是30s。
2.3.3 tryLockInnerAsync()-----选择slot槽并执行lua脚本
我们先看如何执行LUA加锁脚本的,这里面有点深。。。
slot槽这里就不多讲了。。。我们回到LUA脚本。
- 首先检查锁的key是否存在,如果不存在,判断是否是同一线程再次过来对同一个key进行加锁,也就是当前key是否被当前线程持有(可重入性)。
- 如果上述两个条件任意一个成立,则对当前key执行自增和设置过期时间操作,并返回null表示加锁成功。
- 反之,返回当前锁的过期时间,表示加锁失败。
2.4 watch dog源码分析
2.4.1 scheduleExpirationRenewal()--锁续期入口
当加锁成功,且没有设置过期时间,执行scheduleExpirationRenewal()方法,这也是我们常说的"看门狗"的实现逻辑。
- 第一个红框:EXPIRATION_RENEWAL_MAP存放续期任务,get有值说明当前锁需要续期,为null则不需要再续期了。
- 第二个红框,执行续期操作。
2.4.2 renewExpiration()----执行锁续期操作
这个方法用netty的时间轮进行续期。
第一个红框:首先会从EXPIRATION_RENEWAL_MAP中获取一个值,如果为null,就不续期了,说明这个锁可能已经被释放或过期了。
第二个红框:基于TimerTask实现一个定时任务,设置internalLockLeaseTime / 3的时长进行一次锁续期,也就是每10s进行一次续期。
这里也会从EXPIRATION_RENEWAL_MAP里获取一个值,检查锁是否被释放或者过期了。
如果不为null,则获取第一个等待该锁的线程,如果没有等待也就说明此时没有竞争,也同样不需要续期了。
如果有等待的线程,说明需要续期,它会异步调用renewExpirationAsync(threadId)方法来实现续期。
当异步续期操作完成,会调用whenComplete方法来处理结果,如果有异常,则将该锁从EXPIRATION_RENEWAL_MAP中移除。如果续期成功,则会重新调用renewExpiration()方法进行下一次续期,如果续期失败,则调用cancelExpirationRenewal()方法取消续期。
2.4.3 renewExpirationAsync()--执行锁续期LUA脚本
如果当前key存在,说明当前锁还被该线程持有,那么就重置过期时间为30s,并返回true,表示续期成功,反之返回false。
2.4.4 cancelExpirationRenewal---取消锁续期
- 还是从这个map里获取键值对,如果为null,说明续期任务不存在,也没必要进行下去了,直接返回。
- 如果threadId不为null,直接将这个续期任务从task里移除。
- 如果threadId为null或者task中不再有任何线程在等待续期,此时就调用cancel方法来取消定时任务,然后在从EXPIRATION_RENEWAL_MAP中移除该续期任务。
ps:当unlock的时候也会调该方法,来执行取消锁续期的操作。
2.5 小结
2.5.1 什么时候会进行锁续期
加锁时,如果没有指定过期时间,则默认过期时间为30s且每隔10s进行锁续期操作。
ps:参考2.3.2和2.4.2小节。
2.5.2 什么情况会停止续期
锁被释放。
没有其它线程竞争当前锁资源。
续期时发生异常。
执行锁续期LUA脚本失败。
Redission的续期时Netty时间轮(TimerTask、TimeOut、Timer)的,并且操作都是基于JVM,所以当应用宕机、下线或重启后,续期任务也没有了。
ps:参考2.4.3小节。
2.6 lock()和trylock()的区别
讲了半天忘了说使用redission实现分布式锁的示例了😂,索性就在这补充一下吧。
lock():
RLock lock = redisson.getLock("MyLock");
lock.lock();//阻塞方法,知道获取到锁
try {
//业务代码
}finally {
//当前锁存在且被当前线程持有
if(lock.isLocked() && lock.isHeldByCurrentThread()){
//释放锁
lock.unlock();
}
}
lock的原理是以阻塞的方式获取锁,如果获取失败则一直等待,直到获取成功。
ps:可以参考2.3.1小节。
trylock():
RLock lock = redisson.getLock("MyLock");
boolean b = lock.tryLock();//非阻塞方法,立即返回获取结果
if(b){
try {
//业务代码
}finally {
//当前锁存在且被当前线程持有
if(lock.isLocked() && lock.isHeldByCurrentThread()){
//释放锁
lock.unlock();
}
}
}else {
//获取锁失败,处理逻辑
}
tryLock是尝试获取锁,如果能获取直接返回true,如果无法获取,它会按照我们指定的超时时间进行阻塞,这个时间内还会尝试获取锁,如果超过这个时间还没获取到,直接返回false。如果没有指定超时时间,就如我们的示例,那获取不到的话直接就返回false。
我们看下源码:
这是没有指定超时时间执行的方法,方法名也很见名知意,就是尝试一次加锁。指定了超时时间的这里就不介绍了,无非是在超时时间内执行while循环尝试获取锁。
三、Redission公平锁(FairLock)、联锁(MultiLock)、读写锁的使用
ps:这几种锁都不常用,所以就不细讲了,知道有这个事就行。
3.1 公平锁(FairLock)
RLock lock = redisson.getFairLock("MyLock");
lock.lock();
3.2 联锁(MultiLock)
RLock lock1 = redisson.getLock("MyLock1");
RLock lock2 = redisson.getLock("MyLock2");
RLock lock3 = redisson.getLock("MyLock3");
RedissonMultiLock lock=new RedissonMultiLock(lock1,lock2,lock3);
//同时加锁lock1、lock2、lock3
//所有的锁都上锁成功才算成功
lock.lock();
//...
lock.unlock();
3.3 读写锁
RReadWriteLock lock = redisson.getReadWriteLock("myLock");
//读锁
lock.readLock().lock();
//写锁
lock.writeLock().lock();
ps:还有个RedLock,这个可以细讲一下,但是由于篇幅原因,就放在下一篇文章吧。
四、Redission实现分布式锁存在的问题
Redission使用看门狗续期的方案在大多数场景下是挺不错的,但在极端情况下还是会存在问题,比如:
- 线程1首先获取锁成功,将键值对写入redis的master节点。
- 在redis将master数据同步到slave节点之前,master故障了。
- 此时会触发故障转移,将其中一个slave升级为master。
- 但新的master并没有线程1写入的键值对,因此如果此时来个线程2,也同样可以获取到锁,这就违背了锁的初衷。
这个场景就是我们常说的集群脑裂(网络分区)问题。
那么比较主流的解决方案就是Redis作者提出的Redlock和Zookeeper实现的分布式锁,这个我们下节再讲。
End:希望对大家有所帮助,如果有纰漏或者更好的想法,请您一定不要吝啬你的赐教🙋。