由于synchronized跟ReetrantLock是JVM级别的锁,在分布式情况下失效,这时候我们通常会选择redisson基于redis封装好的分布式锁。下面我们一起来分析以下redisson的源码。
使用方式
流程
getLock源码
- 给命令执行器赋值
- 给看门狗时间赋值,默认30秒
- 给发布订阅器赋值
-生成UUID
tryLock源码
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
//获取当前线程ID
long threadId = Thread.currentThread().getId();
//尝试获取锁,成功返回null,失败返回剩余等待时间
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
//获取锁成功,返回true
return true;
} else {
//剩余等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
//获取锁超时失败
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
//当前线程订阅频道redisson_lock__channel加锁的名字
}
CompletableFuture subscribeFuture = this.subscribe(threadId);
try {
//等待
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException | ExecutionException var20) {
//超时取消订阅
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
this.unsubscribe(res, threadId);
}
});
}
//获取失败
this.acquireFailed(waitTime, unit, threadId);
return false;
}
try {
//进入到这里说明订阅到消息
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
//超时
this.acquireFailed(waitTime, unit, threadId);
boolean var22 = false;
return var22;
} else {
boolean var16;
//循环获取锁直到超时或成功
do {
long currentTime = System.currentTimeMillis();
//尝试获取锁
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
//取消订阅
this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
}
}
}
}
tryAcquire方法
-执行lua脚本尝试获取锁,成功获取锁或者锁重入返回nil,失败返回锁的剩余时间
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0)
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);"
, Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
- 执行lua脚本,它首先检查锁是否存在,如果不存在则创建锁并设置过期时间;如果锁已经存在,则尝试获取锁并将锁的计数器加一;最后返回锁的剩余生存时间。
- 看门狗
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;"
,
Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
- 当前线程持有锁,刷新锁的有效时间,递归开启延迟任务继续刷新锁的有效时间即看门狗
- 当前线程不持有锁时,lua脚本返回0,取消看门狗
释放锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0)
then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;"
,
Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
- lua脚本减少锁的重入次数,如果为0,删除锁并发布消息到频道