Redission分布式锁 watch dog 看门狗机制

为了避免Redis实现的分布式锁超时,Redisson中引入了watch dog的机制,他可以帮助我们在Redisson实例被关闭前,不断的延长锁的有效期。

  • 自动续租:当一个Redisson客户端实例获取到一个分布式锁时,如果没有指定锁的超时时间,Watchdog会基于Netty的时间轮启动一个后台任务,定期向Redis发送命令,重新设置锁的过期时间,通常是锁的租约时间的1/3。这确保了即使客户端处理时间较长,所持有的锁也不会过期。
  • 每次续期的时长:默认情况下,每10s钟做一次续期,续期时长是30s。
  • 停止续期:当锁被释放或者客户端实例被关闭时,Watchdog会自动停止对应锁的续租任务。

💖 底层实现

👨‍🏫 RedissonBaseLock.renewExpiration()

protected void scheduleExpirationRenewal(long threadId) {
     // 创建一个新的过期续期条目
     ExpirationEntry entry = new ExpirationEntry();
     // 尝试将新的过期续期条目放入到过期续期映射中,如果已存在则不替换
     ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
     if (oldEntry != null) {
         // 如果已存在,则将线程ID添加到旧的过期续期条目中
         oldEntry.addThreadId(threadId);
     } else {
         // 如果是新的过期续期条目,则添加线程ID,并尝试续期
         entry.addThreadId(threadId);
         try {
             // 尝试续期
             renewExpiration();
         } finally {
             // 如果当前线程被中断,则取消续期
             if (Thread.currentThread().isInterrupted()) {
                 cancelExpirationRenewal(threadId);
             }
         }
     }
}

// 定时任务执行续期
private void renewExpiration() {
     // 从过期续期映射中获取当前的过期续期条目
     ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
     if (ee == null) {
         // 如果没有找到,则直接返回
         return;
     }
     
     // 创建一个新的定时任务,用于执行续期逻辑
     Timeout task = getServiceManager().newTimeout(new TimerTask() {
         @Override
         public void run(Timeout timeout) throws Exception {
             // 再次检查过期续期条目是否仍然存在
             ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
             if (ent == null) {
                 return;
             }
             // 获取线程ID
             Long threadId = ent.getFirstThreadId();
             if (threadId == null) {
                 return;
             }
             
             // 使用LUA脚本异步续期
             CompletionStage<Boolean> future = renewExpirationAsync(threadId);
             future.whenComplete((res, e) -> {
                 if (e != null) {
                     // 如果有异常发生,记录错误并从映射中移除过期续期条目
                     log.error("Can't update lock {} expiration", getRawName(), e);
                     EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                     return;
                 }
                 
                 if (res) {
                     // 如果续期成功,则重新调度续期任务
                     renewExpiration();
                 } else {
                     // 如果续期失败,则取消续期
                     cancelExpirationRenewal(null);
                 }
             });
         }
     }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
     
     // 将定时任务与过期续期条目关联
     ee.setTimeout(task);
}

// 使用LUA脚本,进行续期
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
     // 使用evalWriteAsync方法异步执行LUA脚本,用于续期
     return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
             "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                     "return 1; " +
                     "end; " +
                     "return 0;",
             Collections.singletonList(getRawName()),
             internalLockLeaseTime, getLockName(threadId));
}

可以看到,上面的代码的主要逻辑就是用了一个TimerTask来实现了一个定时任务,设置了internalLockLeaseTime / 3的时长进行一次锁续期。默认的超时时长是30s,那么他会每10s进行一次续期,通过LUA脚本进行续期,再续30s

不过,这个续期也不是无脑续,他也是有条件的,其中ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());这个值得我们关注,他会从EXPIRATION_RENEWAL_MAP中尝试获取一个KV对,如果查不到,就不续期了。

EXPIRATION_RENEWAL_MAP这个东西,会在unlock的时候操作的,对他进行remove,所以一个锁如果被解了,那么就不会再继续续期了

@Override
public void unlock() {
     try {
         // 异步执行解锁操作
         get(unlockAsync(Thread.currentThread().getId()));
     } catch (RedisException e) {
         // 检查异常是否是由于线程没有持有锁导致的
         if (e.getCause() instanceof IllegalMonitorStateException) {
             // 如果是,则抛出原始的 IllegalMonitorStateException异常
             throw (IllegalMonitorStateException) e.getCause();
         } else {
             // 如果不是,则抛出原始的RedisException异常
             throw e;
         }
     }
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
     // 使用getServiceManager执行解锁操作
     return getServiceManager().execute(() -> unlockAsync0(threadId));
}

private RFuture<Void> unlockAsync0(long threadId) {
     // 异步执行解锁操作
     CompletionStage<Boolean> future = unlockInnerAsync(threadId);
     // 处理异步操作的结果或异常
     CompletionStage<Void> f = future.handle((opStatus, e) -> {
         // 取消续期任务
         cancelExpirationRenewal(threadId);

         if (e != null) {
             // 如果有异常发生,抛出CompletionException
             if (e instanceof CompletionException) {
                 throw (CompletionException) e;
             }
             throw new CompletionException(e);
         }
         if (opStatus == null) {
             // 如果解锁操作失败,抛出IllegalMonitorStateException
             IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                     + id + " thread-id: " + threadId);
             throw new CompletionException(cause);
         }

         return null;
     });

     // 将CompletableFuture包装为RFuture
     return new CompletableFutureWrapper<>(f);
}

protected void cancelExpirationRenewal(Long threadId) {
     // 从过期续期映射中获取过期续期条目
     ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
     if (task == null) {
         // 如果没有找到,则直接返回
         return;
     }
     
     if (threadId != null) {
         // 如果线程ID不为空,则从过期续期条目中移除该线程ID
         task.removeThreadId(threadId);
     }

     if (threadId == null || task.hasNoThreads()) {
         // 如果线程ID为空或者过期续期条目中没有线程ID,则取消定时任务
         Timeout timeout = task.getTimeout();
         if (timeout != null) {
             timeout.cancel();
         }
         // 从过期续期映射中移除过期续期条目
         EXPIRATION_RENEWAL_MAP.remove(getEntryName()); // 取消续期关键点
     }
}

核心:EXPIRATION_RENEWAL_MAP.remove(getEntryName());

一次unlock过程中,对EXPIRATION_RENEWAL_MAP进行移除,进而取消下一次锁续期的实现细节。

并且在unlockAsync方法中,不管unlockInnerAsync是否执行成功,还是抛了异常,都不影响cancelExpirationRenewal的执行,也可以理解为,只要unlock方法被调用了,即使解锁未成功,那么也可以停止下一次的锁续期。

💖 续期

加锁代码

/**
 * 尝试异步获取分布式锁。
 *
 * @param waitTime      等待获取锁的最大时间,如果设置为-1,则表示无限等待。
 * @param leaseTime     锁的过期时间,如果设置为-1,则表示使用默认的过期时间。
 * @param unit          时间单位,用于将leaseTime转换为毫秒。
 * @param threadId      当前线程的唯一标识符。
 * @return              一个RFuture对象,表示异步操作的结果,如果成功获取锁,则返回剩余的过期时间(毫秒)。
 * @throws InterruptedException 如果当前线程在等待过程中被中断。
 */
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
     // 尝试获取锁的异步方法
     RFuture<Long> ttlRemainingFuture;
     // 如果锁的过期时间大于0,则使用指定的过期时间
     if (leaseTime > 0) {
         ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
     } else {
         // 如果锁的过期时间不大于0,则使用内部锁的过期时间
         ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                 TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
     }
     // 处理没有同步获取锁的情况
     CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
     // 将处理后的CompletionStage包装为RFuture
     ttlRemainingFuture = new CompletableFutureWrapper<>(s);

     // 当ttlRemainingFuture完成时,如果ttlRemaining为null,则表示锁已成功获取
     CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
         // 锁已获取
         if (ttlRemaining == null) {
             // 如果锁的过期时间大于0,则设置锁的过期时间
             if (leaseTime > 0) {
                 internalLockLeaseTime = unit.toMillis(leaseTime);
             } else {
                 // 如果锁的过期时间不大于0,则安排锁的过期时间续期
                 scheduleExpirationRenewal(threadId);
             }
         }
         // 返回ttlRemaining,如果为null,则表示锁已获取
         return ttlRemaining;
     });
     // 将处理后的CompletionStage包装为RFuture
     return new CompletableFutureWrapper<>(f);
}

在这里插入图片描述

💖 停止续期

如果一个锁的unlock方法被调用了,那么就会停止续期。

那么,取消续期的核心代码如下:

/**
 * 取消与锁关联的自动续期任务。
 *
 * @param threadId 如果不为null,则只取消与特定线程ID关联的续期任务。
 */
protected void cancelExpirationRenewal(Long threadId) {
     // 从过期续期映射中获取当前的过期续期条目
     ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
     if (task == null) {
         // 如果没有找到对应的续期条目,则直接返回
         return;
     }
     
     if (threadId != null) {
         // 如果提供了线程ID,则从续期条目中移除该线程ID
         task.removeThreadId(threadId);
     }

     if (threadId == null || task.hasNoThreads()) {
         // 如果没有提供线程ID,或者续期条目中没有其他线程ID,则取消定时任务
         Timeout timeout = task.getTimeout();
         if (timeout != null) {
             // 取消定时任务
             timeout.cancel();
         }
         // 从过期续期映射中移除过期续期条目
         EXPIRATION_RENEWAL_MAP.remove(getEntryName());
     }
}

主要就是通过 EXPIRATION_RENEWAL_MAP.remove来做的。那么cancelExpirationRenewal还有下面一处调用:

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

也就是说,在尝试开启续期的过程中,如果线程被中断了,那么就会取消续期动作了。

目前,Redisson是没有针对最大续期次数和最大续期时间的支持的。所以,正常情况下,如果没有解锁,是会一直续期下去的。


💖 客户端挂了,锁会不会一直续期?

Redission 是 redis 的客户端

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/596136.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

笔记86:关于【#ifndef + #define + #endif】的用法

当你在编写一个头文件&#xff08;例如 pid_controller.h&#xff09;时&#xff0c;你可能会在多个源文件中包含它&#xff0c;以便在这些源文件中使用该头文件定义的函数、类或其他声明。如果你在多个源文件中都包含了同一个头文件&#xff0c;那么当你将整个工程统一编译&am…

银行卡实名认证API接口快速对接

银行卡实名认证API接口又叫银行卡核验类API接口、银行卡验证类API接口、银联核验类API接口,根据入参字段不同&#xff0c;分银行卡二要素验证API接口&#xff0c;银行卡三要素验证API接口&#xff0c;银行卡四要素验证API接口。其中&#xff0c;银行卡二要素验证API接口是验证开…

锂电池SOH估计 | Matlab实现基于ALO-SVR模型的锂电池SOH估计

目录 预测效果基本介绍程序设计参考资料 预测效果 基本介绍 锂电池SOH估计 | Matlab实现基于ALO-SVR模型的锂电池SOH估计 蚁狮优化支持向量机锂电池健康状态SOH估计&#xff1b; 具体流程如下&#xff1b; 1、分析锂离子电池老化数据集&#xff0c;从中选取具有代表电池性能衰减…

【自用】了解移动存储卡的基本信息

前言 本文是看B站视频做的一个简单笔记&#xff0c;方便日后自己快速回顾&#xff0c;内容主要介绍了存储卡基本参数&#xff0c;了解卡面上的数字、图标代表的含义。对于日后如何挑选判断一张存储卡的好坏、判别一张存储卡是否合格有一定帮助。 视频参考链接&#xff1a;【硬…

深入剖析Tomcat(六) Tomcat各组件的生命周期控制

Catalina中有很多组件&#xff0c;像上一章提到的四种容器&#xff0c;载入器&#xff0c;映射器等都是一种组件。每个组件在对外提供服务之前都需要有个启动过程&#xff1b;组件在销毁之前&#xff0c;也需要有个关闭过程&#xff1b;例如servlet容器关闭时&#xff0c;需要调…

OpenNJet应用引擎——云原生时代的Web服务新选择

文章目录 OpenNJet应用引擎——云原生时代的Web服务新选择引言&#xff1a;数字化转型的推动力&#xff1a;OpenNJet应用引擎为什么选择OpenNJet&#xff1f; OpenNJet的核心优势1. 云原生功能增强2. 安全加固3. 代码重构与性能优化4. 动态加载机制5. 多样化的产品形态6. 易于集…

产业空间集聚DO指数计算

1.前言 创始人 :Duranton and Overman&#xff08;2005&#xff09; 目前应用较多的产业集聚度量指数主要基于两类&#xff0c;一是根据不同空间地理单元中产业经济规模的均衡性进行构造&#xff0c;如空间基尼系数与EG指数&#xff1b;二是基于微观企业地理位置信息形成的产业…

嵌入式系统应用-拓展-FLASH之操作 SFUD (Serial Flash Universal Driver)之KEIL应用

这里已经假设SFUD代码已经移植到工程下面成功了&#xff0c;如果读者对SFUD移植还不了解。可以参考笔者这篇文章&#xff1a;SFUD (Serial Flash Universal Driver)之KEIL移植 这里主要介绍测试和应用 1 硬件设计 这里采用windbond 的W25Q32这款芯片用于SFUD测试。 W25Q32是…

LLM⊗KG范式下的知识图谱问答实现框架思想阅读

分享一张有趣的图&#xff0c;意思是在分类场景下&#xff0c;使用大模型和fasttext的效果&#xff0c;评论也很逗。 这其实背后的逻辑是&#xff0c;在类别众多的分类场景下&#xff0c;尤其是在标注数据量不缺的情况下&#xff0c;大模型的收益是否能够比有监督模型的收益更多…

[渗透利器]全能工具=信息收集->漏洞扫描->EXP调用

前言 hxd开发的工具&#xff0c;大致模块有&#xff08;信息收集&#xff0c;漏洞扫描&#xff0c;暴力破解&#xff0c;POC/EXP&#xff0c;常用编码&#xff09; 工具使用 下载后解压 安装环境 pip install -r requirements.txt 注意&#xff0c;该工具继承了两种不同的使…

HTML_CSS学习:定位

一、相对定位 相关代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>相对定位</title><style>.outer{width: 500px;background-color: #999ff0;border: 1px solid #000;p…

OpenHarmony实战开发-上传文件

Web组件支持前端页面选择文件上传功能&#xff0c;应用开发者可以使用onShowFileSelector()接口来处理前端页面文件上传的请求。 下面的示例中&#xff0c;当用户在前端页面点击文件上传按钮&#xff0c;应用侧在onShowFileSelector()接口中收到文件上传请求&#xff0c;在此接…

不考408的985,不想考408的有福了!吉林大学计算机考研考情分析

吉林大学&#xff08;Jilin University&#xff09;简称吉大&#xff0c;位于吉林长春&#xff0c;始建于1946年&#xff0c;是中华人民共和国教育部直属的综合性全国重点大学&#xff0c;国家“双一流”、“211工程”、“985工程”、“2011计划”重点建设的著名学府&#xff0…

我是如何带团队从0到1做了AI中台

经历心得 我从18年初就开始带这小团队开始做项目&#xff0c;比如最初的数字广东的协同办公项目&#xff0c;以及粤信签小程序等&#xff0c;所以&#xff0c;在团队管理&#xff0c;人员安排&#xff0c;工作分工&#xff0c;项目拆解等方面都有一定的经验。 19年中旬&#…

基于TL431和CSA的恒压与负压输出

Hello uu们,51去那里玩了呀?该收心回来上班了,嘿嘿! 为什么会有这个命题,因为我的手头只有这些东西如何去实现呢?让我们一起来看电路图吧.电路图如下图1所示 图1:CSA恒压输出电路 图1中,R1给U2提供偏置,Q1给R1提供电流,当U1-VOUT输出大于2.5V时候,U2内部的三极管CE导通,使得…

Kalign 3:大型数据集的多序列比对

之前一直用的是muscle&#xff0c;看到一个文章使用了Kalign&#xff0c;尝试一下吧 安装 wget -c https://github.com/TimoLassmann/kalign/archive/refs/tags/v3.4.0.tar.gz tar -zxvf v3.4.0.tar.gz cd kalign-3.4.0 mkdir build cd build cmake .. make make test su…

JVM之内存分配的详细解析

内存分配 两种方式 不分配内存的对象无法进行其他操作&#xff0c;JVM 为对象分配内存的过程&#xff1a;首先计算对象占用空间大小&#xff0c;接着在堆中划分一块内存给新对象 如果内存规整&#xff0c;使用指针碰撞&#xff08;Bump The Pointer&#xff09;。所有用过的内…

图片四张的时候两个一排 图片三张 五张的时候三个一排 css 如何实现

实现的效果如下图 1、html <view v-if"item.photo_list && item.photo_list.length ! 0" :class"getImageClass(item.photo_list.length)"><view v-for"(j,ind) in item.photo_list" :key"photoind" class"imag…

[python]texthero安装后测试代码

测试环境&#xff1a; anaconda3python3.8 texthero1.1.0 测试代码来自官方&#xff1a;https://github.com/jbesomi/texthero 代码&#xff1a; import texthero as hero import pandas as pddf pd.read_csv("https://gitee.com/FIRC/texthero/raw/master/dataset/…

自动化运维管理工具-------------Ansible

目录 一、自动化运维工具有哪些&#xff1f; 1.1Chef 1.2puppet 1.3Saltstack 二、Ansible介绍 2.1Ansible简介 2.2Ansible特点 2.3Ansible工作原理及流程 2.3.1内部流程 2.3.2外部流程 三、Ansible部署 3.1环境准备 3.2管理端安装 ansible 3.3Ansible相关文件 …