一、使用Redisson步骤
Redisson各个锁基本所用Redisson各个锁基本所用Redisson各个锁基本所用
二、源码解析
lock锁
1) 基本思想:
lock有两种方法 一种是空参 另一种是带参
* 空参方法:会默认调用看门狗的过期时间30*1000(30秒)
* 然后在正常运行的时候,会启用定时任务调用重置时间的方法(间隔为开门看配置的默认过期时间的三分之一,也就是10秒)
* 当出现错误的时候就会停止续期,直到到期释放锁或手动释放锁
* 带参方法:手动设置解锁时间,到期后自动解锁,或者业务完成后手动解锁,不会自动续期
源码:
Lock
调用lockInterruptibly()方法会默认传入lease 为-1,该值再后面起作用
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//获取该锁的过期时间,如果该锁没被持有,会返回一个null,如果被持有 会返回一个过期时间
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl != null) {
//ttl不为null,说明锁已经被抢占了
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
//开始循环获取锁
while(true) {
//刚进如循环先尝试获取锁,获取成功返回null,跳出循环,获取失败,则继续往下走
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
//如果过期时间大于0,则调用getLatch
// 返回一个信号量,开始进入阻塞,阻塞时长为上一次锁的剩余过期时长,并且让出cup
//有阻塞必然有唤醒,位于解锁操作中
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//如果leaseTime != -1,即不等于默认值,则表示手动设置了过期时间
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//如果leaseTime = -1,表示使用默认方式,即使用看门狗默认实现自动续期
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
//如果tryLockInnerAsync执行成功
if (future.isSuccess()) {
//获取过期时间
Long ttlRemaining = (Long)future.getNow();
//过期时间为空,表示加锁成功
if (ttlRemaining == null) {
//开启刷新重置过期时间步骤
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
// lua脚本尝试抢占锁,失败返回锁过期时间
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
//直接使用lua脚本发起命令
//通过lua脚本可以看出,redisson加锁除了使用自定义的名字以外,还要使用uuid
// 加上当前线程的threadId组合,以自定义名字作hash的key,使用
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
//如果该锁未被占有,则设置锁,设置过期时间,过期时间为 internalLockLeaseTime ,然后返回null
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]);return nil; end; " +
//如果锁已经被占有,判断是否是重入锁,如果是重入锁,则将value增加1 ,代表重入,并且设置过期时间,返回null。
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; " +
//如果已经被站有所,且不是重入锁,则返回过期时间
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
看门狗续命
//看门狗续命机制
private void scheduleExpirationRenewal(final long threadId) {
//首先会判断该线程是否已经再重置时间的map中,仅仅第一次进来是空的。
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
//使用了看门狗默认的时间(30秒) 除以3 ,也就是延迟10秒后执行
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
//判断是否该线程是否还持有锁,如果持有,返回1,并且设置过期时间,如果没持有,返回0
RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",
Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
//从map中移除该线程,这样下次再调用该方法仍然可以执行
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
if ((Boolean)future.getNow()) {
//当lua脚本返回1表是true,也就是仍然持有锁,则递归调用该方法,
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
task.cancel();
}
}
}
2、unlock
源码
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise();
//调用lua脚本释放锁
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
} else {
Boolean opStatus = (Boolean)future.getNow();
//如果锁状态为null,表示存在异常,为正常释放锁之前,被别人占领锁了
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + RedissonLock.this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
//如果返回0.为false 表示可重入锁,不取消重置过期时间,
//返回1 为true,表示已解锁,取消重置过期时间
if (opStatus) {
RedissonLock.this.cancelExpirationRenewal();
}
//解锁
result.trySuccess((Object)null);
}
}
}
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//当key不存在,表示锁未被持有,说明不用解锁了,返回1 ,1在后续表示取消重置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;" +
//key存在,但是持有锁的线程不是当前线程,返回null,后面会提出一个异常
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; " +
//锁状态-1后仍然大于0,表示可重入锁,仍处于锁定状态,返回0,0在后续表示 不做处理,仍然重置过期时间
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; " +
//返回锁状态不大于0,正常解锁,返回1,1在后续表示取消重置过期时间
"else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; " +
"return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}