为了避免Redis实现的分布式锁超时,Redisson中引入了watch dog的机制,他可以帮助我们在Redisson实例被关闭前,不断的延长锁的有效期。
- 自动续租:当一个Redisson客户端实例获取到一个分布式锁时,如果没有指定锁的超时时间,Watchdog会基于Netty的时间轮启动一个后台任务,定期向Redis发送命令,重新设置锁的过期时间,通常是锁的租约时间的1/3。这确保了即使客户端处理时间较长,所持有的锁也不会过期。
- 每次续期的时长:默认情况下,每10s钟做一次续期,续期时长是30s。
- 停止续期:当锁被释放或者客户端实例被关闭时,Watchdog会自动停止对应锁的续租任务。
💖 底层实现
👨🏫 RedissonBaseLock.renewExpiration()
protected void scheduleExpirationRenewal(long threadId) {
// 创建一个新的过期续期条目
ExpirationEntry entry = new ExpirationEntry();
// 尝试将新的过期续期条目放入到过期续期映射中,如果已存在则不替换
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 如果已存在,则将线程ID添加到旧的过期续期条目中
oldEntry.addThreadId(threadId);
} else {
// 如果是新的过期续期条目,则添加线程ID,并尝试续期
entry.addThreadId(threadId);
try {
// 尝试续期
renewExpiration();
} finally {
// 如果当前线程被中断,则取消续期
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
// 定时任务执行续期
private void renewExpiration() {
// 从过期续期映射中获取当前的过期续期条目
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
// 如果没有找到,则直接返回
return;
}
// 创建一个新的定时任务,用于执行续期逻辑
Timeout task = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 再次检查过期续期条目是否仍然存在
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 获取线程ID
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 使用LUA脚本异步续期
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
// 如果有异常发生,记录错误并从映射中移除过期续期条目
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// 如果续期成功,则重新调度续期任务
renewExpiration();
} else {
// 如果续期失败,则取消续期
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// 将定时任务与过期续期条目关联
ee.setTimeout(task);
}
// 使用LUA脚本,进行续期
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
// 使用evalWriteAsync方法异步执行LUA脚本,用于续期
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
可以看到,上面的代码的主要逻辑就是用了一个TimerTask
来实现了一个定时任务,设置了internalLockLeaseTime / 3
的时长进行一次锁续期。默认的超时时长是30s,那么他会每10s进行一次续期,通过LUA脚本进行续期,再续30s
不过,这个续期也不是无脑续,他也是有条件的,其中ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
这个值得我们关注,他会从EXPIRATION_RENEWAL_MAP
中尝试获取一个KV对,如果查不到,就不续期了。
EXPIRATION_RENEWAL_MAP
这个东西,会在unlock的时候操作的,对他进行remove,所以一个锁如果被解了,那么就不会再继续续期了:
@Override
public void unlock() {
try {
// 异步执行解锁操作
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
// 检查异常是否是由于线程没有持有锁导致的
if (e.getCause() instanceof IllegalMonitorStateException) {
// 如果是,则抛出原始的 IllegalMonitorStateException异常
throw (IllegalMonitorStateException) e.getCause();
} else {
// 如果不是,则抛出原始的RedisException异常
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
// 使用getServiceManager执行解锁操作
return getServiceManager().execute(() -> unlockAsync0(threadId));
}
private RFuture<Void> unlockAsync0(long threadId) {
// 异步执行解锁操作
CompletionStage<Boolean> future = unlockInnerAsync(threadId);
// 处理异步操作的结果或异常
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 取消续期任务
cancelExpirationRenewal(threadId);
if (e != null) {
// 如果有异常发生,抛出CompletionException
if (e instanceof CompletionException) {
throw (CompletionException) e;
}
throw new CompletionException(e);
}
if (opStatus == null) {
// 如果解锁操作失败,抛出IllegalMonitorStateException
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
// 将CompletableFuture包装为RFuture
return new CompletableFutureWrapper<>(f);
}
protected void cancelExpirationRenewal(Long threadId) {
// 从过期续期映射中获取过期续期条目
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
// 如果没有找到,则直接返回
return;
}
if (threadId != null) {
// 如果线程ID不为空,则从过期续期条目中移除该线程ID
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
// 如果线程ID为空或者过期续期条目中没有线程ID,则取消定时任务
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
// 从过期续期映射中移除过期续期条目
EXPIRATION_RENEWAL_MAP.remove(getEntryName()); // 取消续期关键点
}
}
核心:EXPIRATION_RENEWAL_MAP.remove(getEntryName());
一次unlock过程中,对EXPIRATION_RENEWAL_MAP进行移除,进而取消下一次锁续期的实现细节。
并且在unlockAsync方法中,不管unlockInnerAsync是否执行成功,还是抛了异常,都不影响cancelExpirationRenewal的执行,也可以理解为,只要unlock方法被调用了,即使解锁未成功,那么也可以停止下一次的锁续期。
💖 续期
加锁代码
/**
* 尝试异步获取分布式锁。
*
* @param waitTime 等待获取锁的最大时间,如果设置为-1,则表示无限等待。
* @param leaseTime 锁的过期时间,如果设置为-1,则表示使用默认的过期时间。
* @param unit 时间单位,用于将leaseTime转换为毫秒。
* @param threadId 当前线程的唯一标识符。
* @return 一个RFuture对象,表示异步操作的结果,如果成功获取锁,则返回剩余的过期时间(毫秒)。
* @throws InterruptedException 如果当前线程在等待过程中被中断。
*/
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 尝试获取锁的异步方法
RFuture<Long> ttlRemainingFuture;
// 如果锁的过期时间大于0,则使用指定的过期时间
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 如果锁的过期时间不大于0,则使用内部锁的过期时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 处理没有同步获取锁的情况
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
// 将处理后的CompletionStage包装为RFuture
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
// 当ttlRemainingFuture完成时,如果ttlRemaining为null,则表示锁已成功获取
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// 锁已获取
if (ttlRemaining == null) {
// 如果锁的过期时间大于0,则设置锁的过期时间
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 如果锁的过期时间不大于0,则安排锁的过期时间续期
scheduleExpirationRenewal(threadId);
}
}
// 返回ttlRemaining,如果为null,则表示锁已获取
return ttlRemaining;
});
// 将处理后的CompletionStage包装为RFuture
return new CompletableFutureWrapper<>(f);
}
💖 停止续期
如果一个锁的unlock方法被调用了,那么就会停止续期。
那么,取消续期的核心代码如下:
/**
* 取消与锁关联的自动续期任务。
*
* @param threadId 如果不为null,则只取消与特定线程ID关联的续期任务。
*/
protected void cancelExpirationRenewal(Long threadId) {
// 从过期续期映射中获取当前的过期续期条目
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
// 如果没有找到对应的续期条目,则直接返回
return;
}
if (threadId != null) {
// 如果提供了线程ID,则从续期条目中移除该线程ID
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
// 如果没有提供线程ID,或者续期条目中没有其他线程ID,则取消定时任务
Timeout timeout = task.getTimeout();
if (timeout != null) {
// 取消定时任务
timeout.cancel();
}
// 从过期续期映射中移除过期续期条目
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
主要就是通过 EXPIRATION_RENEWAL_MAP.remove
来做的。那么cancelExpirationRenewal
还有下面一处调用:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
也就是说,在尝试开启续期的过程中,如果线程被中断了,那么就会取消续期动作了。
目前,Redisson是没有针对最大续期次数和最大续期时间的支持的。所以,正常情况下,如果没有解锁,是会一直续期下去的。
💖 客户端挂了,锁会不会一直续期?
Redission 是 redis 的客户端