原文连接:可重入分布式锁有哪些应用场景 https://mp.weixin.qq.com/s/MTPS9V8jn5J91wr-UD4DyA
之前发过的一篇实现Redis分布式锁的8大坑中,有粉丝留言说,分布式锁的可重入特性在工作中有哪些应用场景,那么我们这篇文章就来看一下分布式锁的可重入特性。
实现Redis分布式锁的8大坑
一、可重入场景有哪些?
场景一:创建订单之后,处理其他的逻辑异常了,需要回滚取消订单,此时取消订单的逻辑中需要获取到当前订单的分布式锁,此时也是需要可重入的特性的。
场景二:商城的支付,当第一次对订单进行支付时获取订单的分布式锁,如果此时你退出了,在用另一个客户端对同一个订单进行支付是否还可以呢?如果因为网络异常或者其他原因,当前发起订单的客户端还是可以再次进入支付流程进行支付。
场景三:分布式系统的缓存,缓存在客户端1更新过程中,客户端1发生异常无法继续执行,在客户端1获取的分布式锁还没有过期的这段时间,其他的客户端是无法获取到分布式锁的。假如客户端1在锁过期之前恢复了,再次执行该逻辑时可以继续重入该分布式锁继续执行操作。
场景四:在主线程完成任务的情况下,异步处理另一个任务,此时可以先释放锁,异步任务完成之后再次获取锁。
除了上述描述的四种场景外,只要是涉及到分布式锁的,都是有可能会有可重入的特性了。对于可重入的理解是,在维基百科中是这样描述的。
若一个程序或子程序可以“在任意时刻被中断然后操作系统调度执行另一段代码,这段代码又使用了该副程序不会出错”,则称其为可重入(reentrant 或 re-entrant)的。即当该副程序正在运作时,执行线程可以再次进入并执行它,仍然可得到符合设计时所预期的结果。与多线程并发执行的线程安全不同,可重入强调对单一线程执行时重新进入同一个子程序仍然是安全的。
可重入概念是在单线程操作系统的时代提出的。一个子程序的重入,可能由于自身原因,如执行了jmp或者call,类似于子程序的递归调用;或者由于作业系统的中断回应。UNIX系统的signal的处理,即子程序被中断处理程序或者signal处理程序调用。所以,可重入也可称作“异步信号安全”。这里的异步是指信号中断可发生在任意时刻。 重入的子程序,按照后进先出线性序依次执行。
所以对于现在的可重入,大部分的场景就是系统异常之后再次执行或者递归调用。
二、Java中有哪些可重入的锁
在Java中,Synchronized
与 ReentrantLock
都是可重入的锁。
1、Synchronized
:应用于方法或者代码块。当一个线程持有某个对象的锁时,它可以重复的进入任何其他由该对象保护的Synchronized
方法或者代码块。
package com.zuiyu.client1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SyncDemo {
public static final Logger log = LoggerFactory.getLogger(SyncDemo.class);
private int count = 0;
public synchronized void increment() {
count++;
log.info("increment count {}",count);
decrement(); // 调用自身的另一个 synchronized 方法
}
public synchronized void decrement() {
count--;
log.info("decrement count {}",count);
}
public static void main(String[] args) {
SyncDemo syncDemo = new SyncDemo();
syncDemo.increment();
}
}
执行结果如下:
2、ReentrantLock
:提供了 lock()
与 unlock()
方法控制锁的获取和释放。与 synchronized
不同的是,ReentrantLock
允许在同一个线程中多次调用 lock()
方法而不被阻塞,只要每次调用 lock()
都有相应的 unlock()
来释放锁就可以。
package com.zuiyu.client1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
public static final Logger log = LoggerFactory.getLogger(ReentrantLockDemo.class);
//锁
private static ReentrantLock lock = new ReentrantLock();
public void doSomething(int n){
//进入递归第一件事:加锁
try{
lock.lock();
log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
log.info("--------递归{}次--------",n);
if(n<=2){
this.doSomething(++n);
}else{
return;
}
}finally {
lock.unlock();
log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
}
}
public ReentrantLock getLock(){
return lock;
}
public static void main(String[] args) {
ReentrantLockDemo reentrantLockDemo=new ReentrantLockDemo();
reentrantLockDemo.doSomething(1);
log.info("执行完doSomething方法 是否还持有锁:{}",lock.isLocked());
}
}
执行结果如下:
三、ReentrantLock 如何实现的可重入
我们通过代码 debug
可以找到 ReentrantLock
代码中的 nonfairTryAcquire
方法。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//先判断,c(state)是否等于0,如果等于0,说明没有线程持有锁
if (c == 0) {
//通过cas方法把state的值0替换成1,替换成功说明加锁成功
if (compareAndReentrantLock代码中的SetState(0, acquires)) {
//如果加锁成功,设置持有锁的线程是当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//判断当前持有锁的线程是否是当前线程
//如果是当前线程,则state值加acquires,代表了当前线程加锁了多少次
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
所以 ReentrantLock
的加锁流程就是:
1、先判断是否有线程持有锁,没有就进行加锁。
2、如果加锁成功,则设置持有锁的线程为当前线程。
3、如果有线程已经持有了锁,则在判断是否是当前线程持有的锁。
4、如果是当前线程持有的锁,则加锁数量+1
。
5、如果不是当前当前线程持有的锁,返回false
,加锁失败。
释放锁的流程如下:
/**
* 释放锁
* @param releases
* @return
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//state-1 减加锁次数
//如果持有锁的线程,不是当前线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//如果c==0了说明当前线程,已经要释放锁了
free = true;
setExclusiveOwnerThread(null);//设置当前持有锁的线程为null
}
setState(c);//设置c的值
return free;
}
1、每次释放锁对计数进行减1
。
2、当c
为0
的时候,说明锁重入的次数为0
了。
3、最终设置当前持有锁的线程为 NULL
,state
设置为0
,锁也就释放了。
四、Redisson 实现分布式锁
通过上面 ReentrantLock
的加锁释放锁学习,我们已经知道了锁的可重入的原理了,所以使用 Redis 实现分布式锁我们只需要实现如下两点即可。
1、如何保存当前的线程。
2、加锁次数的保存维护。
所以结合上一篇文章中说过的 Redisson
的可重入特性,也就知道如何使用 Redis 来实现一个分布式锁了。
文章地址在这,可以点进去看看,我下面也把关键地方截图放过来。
Redis 实现分布式锁的8大坑
https://mp.weixin.qq.com/s/j69OLgLIo6R2VI80alJF0Q
那么这里在对这些代码在进行一个说明,在对代码说明之前还是先来个demo。
@Service
public class RedissonLockDemo {
public final Logger log = LoggerFactory.getLogger(getClass());
private RedissonClient redissonClient;
String rKey = "lock1";
public RedissonLockDemo(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public void lock(){
RLock lock1 = redissonClient.getLock(rKey);
lock1.lock();
log.info("thread {} method lock,lock1:{}={}",Thread.currentThread().getName(),lock1.getName(),lock1.getHoldCount());
lock2();
lock1.unlock();
}
public void lock2(){
RLock lock2 = redissonClient.getLock(rKey);
lock2.lock();
log.info("thread {} method lock,lock2:{}={}",Thread.currentThread().getName(),lock2.getName(),lock2.getHoldCount());
lock2.unlock();
}
}
执行结果如下:
通过 debug
代码中lock.lock()
可以看到,发现它最终调用的是
RedissonLock#tryLockInnerAsync
。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0)
or (redis.call('hexists', KEYS[1], ARGV[2]) == 1))
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
加锁流程如下:
1、 判断key
是否存在,返回0
代表key
不存在,代表没有加锁。
2、或者判断field
是否在hash
中,返回1
代表当前线程加进程的ID
已经获取到锁了。
3、hincrby
对 key
中的 ARGV[2]
加 1
。
4、对整个key
设置过期时间。
为了校验执行的命令下面截图是 RedissonBaseLock#evalWriteSyncedAsync
。具体如下:
在这个脚本中,用到的命令我们来说一下
-
exists
:校验key
是否存在。 -
hexists
:校验field
是否存在hash
中。 -
hincrby
:将hash中指定的值增加给定的数字。 -
pexpire
:设置key的有效期,以毫秒为单位。 -
pttl
: 判断key的有效毫秒数。
解锁的代码在 RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local val = redis.call('get', KEYS[3]); " +
"if val ~= false then " +
"return tonumber(val);" +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
"return 1; " +
"end; ",
Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}
解锁的流程如下:
1、if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
判断锁是否存在。
2、redis.call('hincrby', KEYS[1], ARGV[3], -1)
加锁次数原子自减。
3、if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2])
; 自减后当前线程还持有锁(counter > 0),更新下锁的过期时间。
4、counter < 0
走 else
逻辑解锁完成,删除该锁。
加锁解锁流程相对于上一篇文章中所述有所变化,本文
Redisson
版本为3.29.0
。
五、总结
对于工作中用到分布式锁的场景,都要考虑是否可以重入,防止死锁的发生。
锁的可重入,两点需要我们注意,一个是保存当前持有锁的线程,另一个就是锁的加锁次数。
好了本文到这就结束了,如果读完感觉有所收获,欢迎三连。
大家都要一起进步。