作者:后端小肥肠
🍇 我写过的文章中的相关代码放到了gitee,地址:xfc-fdw-cloud: 公共解决方案
🍊 有疑问可私信或评论区联系我。
🥑 创作不易未经允许严禁转载。
目录
1. 前言
2. 为何要使用分布式锁?
2.1. 单机场景里的锁
2.2. 分布式场景里的锁
3. Redis分布式锁实现
3.1. SpringBoot实现分布式锁
3.2. 看门狗方案实现
3.2.1. 开门狗方案原理
3.2.2. 看门狗方案核心代码
4. 如何使用Redission分布式锁
4.1. Redission简介
4.2. SpringBoot使用Redission分布式锁
5. 结语
1. 前言
在当今快速发展的分布式系统中,多个节点之间的协调和一致性成为了一个日益重要的挑战。随着云计算、微服务架构和大数据处理的普及,系统的复杂性显著增加,这使得并发操作的管理愈发困难。在这样的背景下,分布式锁作为一种重要的机制,能够有效地防止数据竞争和不一致性问题,确保系统的稳定与可靠。本文将深入探讨分布式锁的原理、实现方式以及在实际应用中的重要性。
2. 为何要使用分布式锁?
在系统开发中,尤其是高并发场景下,多个线程同时操作共享资源是常见的需求。例如,多个线程同时售票、更新库存、扣减余额等。这些操作如果没有妥善管理,很容易导致资源竞争、数据不一致等问题。在单机环境中,我们可以通过锁机制(如 synchronized
或 ReentrantLock
)解决这些问题,但在分布式环境中,这些机制无法直接使用,需要更复杂的分布式锁方案。
2.1. 单机场景里的锁
在单机环境中,可以使用线程安全的操作来避免多线程竞争。在以下代码中,我们通过三种方式逐步引入锁机制来保障线程安全。
以普通的售票代码为例,原始代码:
public class SaleTicket {
public static void main(String[] args) throws Exception {
Ticket ticket = new Ticket();
for (int j = 0; j < 5; j++) { // 创建5个线程模拟并发
new Thread(() -> { // 每个线程执行售票操作
for (int i = 1; i <= 10000; i++) {
ticket.sale();
}
}).start();
}
Thread.sleep(5000); // 等待线程执行完成
ticket.print(); // 打印剩余票数
}
}
// 无锁资源类
class Ticket {
// 总票数
private Integer number = new Integer(50000);
// 售票方法,无线程安全保障
public void sale() {
if (number > 0) {
number--;
}
}
public void print() {
System.out.println("剩余票:" + number);
}
}
运行以上代码,可能会出现以下问题:
- 票数不一致:多个线程可能同时读取和修改
number
,导致最终票数小于 0 或大于实际值。 - 数据竞争:线程之间没有同步机制,数据容易被破坏。
解决这一问题的关键在于引入锁机制,下面我们介绍三种常见的单机锁实现方式。
1. 使用 AtomicInteger
AtomicInteger
是 Java 提供的线程安全类,使用 CAS(Compare-And-Swap)原子操作实现多线程数据一致性。它适合简单场景,例如递增、递减等操作。
代码示例如下:
import java.util.concurrent.atomic.AtomicInteger;
public class SaleTicket {
public static void main(String[] args) throws Exception {
Ticket ticket = new Ticket();
for (int j = 0; j < 5; j++) {
new Thread(() -> {
for (int i = 1; i <= 10000; i++) {
ticket.sale();
}
}).start();
}
Thread.sleep(5000); // 等待线程完成
ticket.print(); // 打印剩余票数
}
}
class Ticket {
private AtomicInteger number = new AtomicInteger(50000); // 线程安全的票数
public void sale() {
if (number.get() > 0) {
number.decrementAndGet(); // 原子操作
}
}
public void print() {
System.out.println("剩余票:" + number.get());
}
}
优点:
- 原子操作,无需显式加锁。
- 性能较高,适合简单的并发场景。
缺点:
- 不适合复杂业务逻辑,例如多个共享资源需要同时操作的场景。
2. 使用 synchronized
Synchronized
是 Java 提供的关键字,可以用来保证方法或代码块的线程安全。它通过内部锁(Monitor)机制,确保同一时间只有一个线程能够执行加锁的代码。
代码示例如下:
public class SaleTicket {
public static void main(String[] args) throws Exception {
Ticket ticket = new Ticket();
for (int j = 0; j < 5; j++) {
new Thread(() -> {
for (int i = 1; i <= 10000; i++) {
ticket.sale();
}
}).start();
}
Thread.sleep(5000); // 等待线程完成
ticket.print(); // 打印剩余票数
}
}
class Ticket {
private Integer number = new Integer(50000); // 总票数
public synchronized void sale() {
if (number > 0) {
number--; // 在锁保护下操作
}
}
public void print() {
System.out.println("剩余票:" + number);
}
}
优点:
- 简单易用,内置关键字,便于开发者理解和使用。
- 适合多线程复杂操作。
缺点:
- 性能较低,因为线程竞争会导致阻塞。
- 粒度较大,可能降低系统并发性。
3. 使用 ReentrantLock
ReentrantLock
是 Java 并发包中的显式锁,与 synchronized
相比,它提供了更丰富的功能,例如支持公平锁、非公平锁、条件变量等。
代码示例如下:
import java.util.concurrent.locks.ReentrantLock;
public class SaleTicket {
public static void main(String[] args) throws Exception {
Ticket ticket = new Ticket();
for (int j = 0; j < 5; j++) {
new Thread(() -> {
for (int i = 1; i <= 10000; i++) {
ticket.sale();
}
}).start();
}
Thread.sleep(5000); // 等待线程完成
ticket.print(); // 打印剩余票数
}
}
class Ticket {
private Integer number = new Integer(50000); // 总票数
private final ReentrantLock lock = new ReentrantLock(); // 显式锁
public void sale() {
lock.lock(); // 加锁
try {
if (number > 0) {
number--; // 线程安全操作
}
} finally {
lock.unlock(); // 确保释放锁
}
}
public void print() {
System.out.println("剩余票:" + number);
}
}
优点:
- 灵活,支持公平锁、非公平锁等特性。
- 更适合复杂的并发场景。
缺点:
- 必须显式加锁和释放锁,代码复杂度较高。
- 需要正确处理异常,防止死锁。
在单机场景中,使用锁机制可以有效解决线程安全问题。对于简单的操作(如计数器),可以优先使用 AtomicInteger
;如果需要保护复杂的业务逻辑,可以选择 synchronized
或 ReentrantLock
。
2.2. 分布式场景里的锁
在单机环境中,使用线程锁(如 synchronized
、JUC)可以有效地管理并发操作,保证数据的一致性。但在分布式系统中,多个节点可能并行执行相同的操作,访问的是共享资源(如数据库、缓存、队列等)。这就带来了一个新的问题:如何在不同的节点之间协调资源的访问?
光说可能你不是很理解,我来举个例子:
假设你有一个售票系统,多个用户同时请求购买同一张票。如果没有分布式锁,可能会发生如下情况:
- 用户 A 和用户 B 同时查询到有票可买。
- 用户 A 和用户 B 分别进行扣款操作,且系统仍认为票数未减少,这会导致“超卖”情况。
使用分布式锁后,只有一个请求可以修改票数,其他请求将被阻塞或等待,直到锁被释放,从而避免超卖问题。
在分布式环境中,多个服务或节点可能并发访问同一份数据。如果没有适当的机制来管理这些并发操作,就会发生资源竞争和数据不一致等问题。因此,分布式锁应运而生,用于控制不同节点对共享资源的访问,确保同一时刻只有一个节点能够执行某项操作。
常见的分布式锁应用场景:
- 防止超卖:比如多个用户请求同时购买同一票,或者多个服务同时修改同一份数据。通过分布式锁,确保只有一个请求能够操作共享资源,从而避免超卖。
- 避免缓存穿透:多个服务可能同时访问缓存失效的数据,使用分布式锁可以确保只有一个请求去查询数据库,其他请求需要等待。
- 确保数据一致性:多个微服务可能会并发修改同一份数据,通过分布式锁来确保同一时刻只有一个服务能够修改数据,从而避免数据不一致的风险。
如何在分布式环境中实现锁?
分布式锁的目标是确保不同节点对共享资源的访问不冲突。以下是几种常见的分布式锁实现方式:
-
基于 Redis 的分布式锁: Redis 提供了高效的键值存储,可以通过
SETNX
命令(Set if Not eXists)来创建分布式锁。该命令只有在锁不存在时才会成功设置,从而保证了只有一个节点可以获取锁。示例:
SETNX lock_key value
该命令如果成功设置,表示当前节点获得了锁;如果失败,表示其他节点已获得锁。
-
基于 ZooKeeper 的分布式锁: ZooKeeper 是一个分布式协调服务,提供了可靠的锁机制。通过在 ZooKeeper 中创建临时节点,当一个节点成功创建锁节点时,其他节点无法重复创建,从而实现分布式锁。(这部分会放到ZooKeeper系列说)
-
基于数据库的分布式锁: 通过在数据库中创建锁表,使用数据库行锁来控制并发访问。虽然简单易用,但性能较低,适合低并发场景。(本文不讲)
-
基于 Redisson 的分布式锁: Redisson 是一个 Java 客户端,提供了高效的分布式锁功能,支持多种锁类型,如公平锁、读写锁等。它封装了 Redis 的原子操作,并提供了更易用的 API,使得在分布式系统中实现锁机制更加方便。
在实践中,分布式锁可以应用于多个场景,如防止超卖、确保数据一致性和避免缓存穿透等问题。在下一节中,我们将详细介绍分布式锁的具体技术实现,包括redis、Redission、Zookeeper的具体实现技术细节。
3. Redis分布式锁实现
想要实现分布式锁,必须要求 Redis 有互斥的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。
两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。
客户端 1 申请加锁,加锁成功:
客户端 2 申请加锁,因为它后到达,加锁失败:
此时,加锁成功的客户端,就可以去操作共享资源,例如,修改 数据库 的某一行数据,或者调用一个 API 请求。
操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?
也很简单,直接使用 DEL 命令删除这个 key 即可,这个逻辑非常简单。
但是,它存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成死锁:
1、程序处理业务逻辑异常,没及时释放锁
2、进程挂了,没机会释放锁
这时,这个客户端就会一直占用这个锁,而其它客户端就永远拿不到这把锁了。怎么解决这个问题呢?
如何避免死锁?
我们很容易想到的方案是,在申请锁时,给这把锁设置一个租期。
在 Redis 中实现时,就是给这个 key 设置一个过期时间。这里我们假设,操作共享资源的时间不会超过 10s,那么在加锁时,给这个 key 设置 10s 过期即可:
SETNX lock 1 // 加锁
EXPIRE lock 10 // 10s后自动过期
这样一来,无论客户端是否异常,这个锁都可以在 10s 后被自动释放,其它客户端依旧可以拿到锁。
但现在还是有问题:
现在的操作,加锁、设置过期是 2 条命令,有没有可能只执行了第一条,第二条却来不及执行的情况发生呢?例如:
-
SETNX 执行成功,执行EXPIRE 时由于网络问题,执行失败
-
SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行
-
SETNX 执行成功,客户端异常崩溃,EXPIRE也没有机会执行
总之,这两条命令不能保证是原子操作(一起成功),就有潜在的风险导致过期时间设置失败,依旧发生死锁问题。
在 Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以了:
SET lock 1 EX 10 NX
锁被别人释放怎么办?
上面的命令执行时,每个客户端在释放锁时,都是无脑操作,并没有检查这把锁是否还归自己持有,所以就会发生释放别人锁的风险,这样的解锁流程,很不严谨!如何解决这个问题呢?
解决办法是:客户端在加锁时,设置一个只有自己知道的唯一标识进去。
例如,可以是自己的线程 ID,也可以是一个 UUID(随机且唯一),这里我们以UUID 举例:
SET lock $uuid EX 20 NX
之后,在释放锁时,要先判断这把锁是否还归自己持有,伪代码可以这么写:
if redis.get("lock") == $uuid:
redis.del("lock")
这里释放锁使用的是 GET + DEL 两条命令,这时,又会遇到我们前面讲的原子性问题了。这里可以使用lua脚本来解决。
安全释放锁的 Lua 脚本如下:
if redis.call("GET",KEYS[1]) == ARGV[1]
then
return redis.call("DEL",KEYS[1])
else
return 0
end
好了,这样一路优化,整个的加锁、解锁的流程就更严谨了。
这里我们先小结一下,基于 Redis 实现的分布式锁,一个严谨的的流程如下:
1、加锁
SET lock_key $unique_id EX $expire_time NX
2、操作共享资源
3、释放锁:Lua 脚本,先 GET 判断锁是否归属自己,再DEL 释放锁
3.1. SpringBoot实现分布式锁
只贴核心代码,redis配置和maven依赖就不贴了:
/**
* 分布式锁的实现
*/
@Component
public class RedisDistLock implements Lock {
private final static int LOCK_TIME = 5*1000;//失效时间
private final static String RS_DISTLOCK_NS = "tdln:"; //加锁的key的前缀
/*
if redis.call('get',KEYS[1])==ARGV[1] then
return redis.call('del', KEYS[1])
else return 0 end
*/
//释放锁的时候,确保原子。lua脚本:确保 释放锁的线程就是加锁的线程,不能被线程的线程无脑调用释放
private final static String RELEASE_LOCK_LUA =
"if redis.call('get',KEYS[1])==ARGV[1] then\n" +
" return redis.call('del', KEYS[1])\n" +
" else return 0 end";
/*保存每个线程的独有的ID值*/
private ThreadLocal<String> lockerId = new ThreadLocal<>();
/*解决锁的重入*/
private Thread ownerThread;
private String lockName = "lock";
@Autowired
private JedisPool jedisPool;
public String getLockName() {
return lockName;
}
public void setLockName(String lockName) {
this.lockName = lockName;
}
public Thread getOwnerThread() {
return ownerThread;
}
public void setOwnerThread(Thread ownerThread) {//加锁成功,就会把抢到锁的线程进行保存
this.ownerThread = ownerThread;
}
@Override
public void lock() { //redis的分布式锁
while(!tryLock()){
try {
Thread.sleep(100); //每隔100ms 都会去尝试加锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new UnsupportedOperationException("不支持可中断获取锁!");
}
@Override
public boolean tryLock() {
Thread t = Thread.currentThread();
if(ownerThread==t){/*说明本线程持有锁*/
return true;
}else if(ownerThread!=null){/*本进程里有其他线程持有分布式锁*/
return false;
}
Jedis jedis = jedisPool.getResource();
try {
String id = UUID.randomUUID().toString();
SetParams params = new SetParams();
params.px(LOCK_TIME);
params.nx();
synchronized (this){/*线程们,本地抢锁*/
if((ownerThread==null)&&
"OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))){
lockerId.set(id);
setOwnerThread(t);
return true;
}else{
return false;
}
}
} catch (Exception e) {
throw new RuntimeException("分布式锁尝试加锁失败!");
} finally {
jedis.close();
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException("不支持等待尝试获取锁!");
}
@Override
public void unlock() {
if(ownerThread!=Thread.currentThread()) {
throw new RuntimeException("试图释放无所有权的锁!");
}
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,
Arrays.asList(RS_DISTLOCK_NS+lockName),
Arrays.asList(lockerId.get()));
if(result.longValue()!=0L){
System.out.println("Redis上的锁已释放!");
}else{
System.out.println("Redis上的锁释放失败!");
}
} catch (Exception e) {
throw new RuntimeException("释放锁失败!",e);
} finally {
if(jedis!=null) jedis.close();
lockerId.remove();
setOwnerThread(null);
System.out.println("本地锁所有权已释放!");
}
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("不支持等待通知操作!");
}
}
这段代码实现了一个基于 Redis 的分布式锁 (RedisDistLock
) 类。它实现了 Lock
接口,用于在分布式环境中对资源进行加锁和解锁,确保同一时间只有一个线程可以操作共享资源。主要功能包括:
- 加锁 (
lock
方法):尝试获取分布式锁,如果当前线程未持有锁,则进入循环每 100 毫秒重试,直到成功获得锁。 - 释放锁 (
unlock
方法):只有持有锁的线程才能释放锁,利用 Lua 脚本确保锁的释放操作是原子的,避免锁被错误释放。 - 锁的重入:通过检查当前线程是否已经持有锁,来支持锁的重入机制,避免死锁。
- 线程唯一标识:每个线程持有一个唯一的
lockerId
,用于标识和验证锁的拥有者。 - 不支持中断和等待超时的加锁:该实现不支持可中断的加锁操作,也不支持在指定时间内尝试获取锁。
该锁使用了 Redis 的 SET
命令来加锁,并且使用 Lua 脚本确保释放锁时不会被其他线程误释放。
3.2. 看门狗方案实现
3.2.1. 开门狗方案原理
在分布式系统中,使用 Redis 锁时,如果业务逻辑执行时间超过锁的过期时间,可能会引发锁的提前释放问题,进而导致并发冲突。为了避免这种情况,可以引入 看门狗机制。
未加看门狗机制时分布式场景的工作流程:
- 加锁:客户端(如客户端 C)请求加锁,Redis 设置
lock_key
并附加一个过期时间(例如 10 秒)。 - 业务执行:客户端在锁的保护下,执行业务逻辑。
- 解锁:业务逻辑执行完毕后,客户端主动释放锁。
潜在问题:
- 如果业务逻辑的执行时间超过锁的过期时间(如大于 10 秒),在客户端释放锁之前,锁已经因为过期而自动释放。
- 锁释放后,其他客户端(如客户端 A 或 B)可能抢到锁,导致多个客户端同时执行相同的业务逻辑,发生并发冲突。
如下图所示:
为了解决锁提前释放的问题,可以引入 看门狗机制,通过定期续期保证锁在业务逻辑执行完成前不会被自动释放。
看门狗机制的工作原理:
-
加锁后启动看门狗:
- 客户端C加锁后,启动一个守护线程(看门狗)。
- 守护线程会定期检查锁的过期时间。
-
定期续期:
- 守护线程每隔一段时间(例如 5 秒)检查锁是否即将过期。
- 如果锁没有被解锁且仍然属于当前客户端,则向 Redis 请求续期(如将锁的过期时间从 10 秒延长到 20 秒)。
- 这样,即使业务逻辑的执行时间超过初始过期时间,锁也不会过期。
-
主动释放锁:
- 客户端在业务逻辑执行完毕后,主动释放锁。
- 此时,看门狗线程会停止续期,锁正常释放。
如下图展示了看门狗机制的具体流程:
3.2.2. 看门狗方案核心代码
只贴核心代码:
RedisDistLockWithDog
@Component
public class RedisDistLockWithDog implements Lock {
private final static int LOCK_TIME = 1*1000;
private final static String LOCK_TIME_STR = String.valueOf(LOCK_TIME);
private final static String RS_DISTLOCK_NS = "tdln2:";
/*
if redis.call('get',KEYS[1])==ARGV[1] then
return redis.call('del', KEYS[1])
else return 0 end
*/
private final static String RELEASE_LOCK_LUA =
"if redis.call('get',KEYS[1])==ARGV[1] then\n" +
" return redis.call('del', KEYS[1])\n" +
" else return 0 end";
/*还有并发问题,考虑ThreadLocal*/
private ThreadLocal<String> lockerId = new ThreadLocal<>();
private Thread ownerThread;
private String lockName = "lock";
@Autowired
private JedisPool jedisPool;
public String getLockName() {
return lockName;
}
public void setLockName(String lockName) {
this.lockName = lockName;
}
public Thread getOwnerThread() {
return ownerThread;
}
public void setOwnerThread(Thread ownerThread) {
this.ownerThread = ownerThread;
}
@Override
public void lock() {
while(!tryLock()){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new UnsupportedOperationException("不支持可中断获取锁!");
}
@Override
public boolean tryLock() {
Thread t=Thread.currentThread();
/*说明本线程正在持有锁*/
if(ownerThread==t) {
return true;
}else if(ownerThread!=null){/*说明本进程中有别的线程正在持有分布式锁*/
return false;
}
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
/*每一个锁的持有人都分配一个唯一的id,也可采用snowflake算法*/
String id = UUID.randomUUID().toString();
SetParams params = new SetParams();
params.px(LOCK_TIME); //加锁时间1s
params.nx();
synchronized (this){
if ((ownerThread==null)&&
"OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {
lockerId.set(id);
setOwnerThread(t);
if(expireThread == null){//看门狗线程启动
expireThread = new Thread(new ExpireTask(),"expireThread");
expireThread.setDaemon(true);
expireThread.start();
}
//往延迟阻塞队列中加入元素(让看门口可以在过期之前一点点的时间去做锁的续期)
delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockName,id)));
System.out.println(Thread.currentThread().getName()+"已获得锁----");
return true;
}else{
System.out.println(Thread.currentThread().getName()+"无法获得锁----");
return false;
}
}
} catch (Exception e) {
throw new RuntimeException("分布式锁尝试加锁失败!",e);
} finally {
jedis.close();
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException("不支持等待尝试获取锁!");
}
@Override
public void unlock() {
if(ownerThread!=Thread.currentThread()) {
throw new RuntimeException("试图释放无所有权的锁!");
}
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,
Arrays.asList(RS_DISTLOCK_NS+lockName),
Arrays.asList(lockerId.get()));
System.out.println(result);
if(result.longValue()!=0L){
System.out.println("Redis上的锁已释放!");
}else{
System.out.println("Redis上的锁释放失败!");
}
} catch (Exception e) {
throw new RuntimeException("释放锁失败!",e);
} finally {
if(jedis!=null) jedis.close();
lockerId.remove();
setOwnerThread(null);
}
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("不支持等待通知操作!");
}
/*看门狗线程*/
private Thread expireThread;
//通过delayDog 避免无谓的轮询,减少看门狗线程的轮序次数 阻塞延迟队列 刷1 没有刷2
private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();
//续锁逻辑:判断是持有锁的线程才能续锁
private final static String DELAY_LOCK_LUA =
"if redis.call('get',KEYS[1])==ARGV[1] then\n" +
" return redis.call('pexpire', KEYS[1],ARGV[2])\n" +
" else return 0 end";
private class ExpireTask implements Runnable{
@Override
public void run() {
System.out.println("看门狗线程已启动......");
while(!Thread.currentThread().isInterrupted()) {
try {
LockItem lockItem = delayDog.take().getData();//只有元素快到期了才能take到 0.9s
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Long result = (Long)jedis.eval(DELAY_LOCK_LUA,
Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey ()),
Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));
if(result.longValue()==0L){
System.out.println("Redis上的锁已释放,无需续期!");
}else{
delayDog.add(new ItemVo<>((int)LOCK_TIME,
new LockItem(lockItem.getKey(),lockItem.getValue())));
System.out.println("Redis上的锁已续期:"+LOCK_TIME);
}
} catch (Exception e) {
throw new RuntimeException("锁续期失败!",e);
} finally {
if(jedis!=null) jedis.close();
}
} catch (InterruptedException e) {
System.out.println("看门狗线程被中断");
break;
}
}
System.out.println("看门狗线程准备关闭......");
}
}
// @PostConstruct
// public void initExpireThread(){
//
// }
@PreDestroy
public void closeExpireThread(){
if(null!=expireThread){
expireThread.interrupt();
}
}
}
针对3.1小节中的分布式锁而言,看门狗方案主要做了以下改进 :
1. 锁的过期时间设计
private final static int LOCK_TIME = 1*1000; // 锁的过期时间为1秒
- 相比RedisDistLock的5秒,这里故意设置较短的过期时间
- 通过看门狗机制来自动续期,避免业务执行时间过长导致锁过期
2. 看门狗机制的核心实现
// 看门狗相关的成员变量
private Thread expireThread;
private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();
// 续期的Lua脚本
private final static String DELAY_LOCK_LUA =
"if redis.call('get',KEYS[1])==ARGV[1] then\n" +
" return redis.call('pexpire', KEYS[1],ARGV[2])\n" +
" else return 0 end";
- 使用DelayQueue来实现延迟任务,避免无效的轮询
- 通过Lua脚本保证续期操作的原子性
3. 加锁时启动看门狗
if ((ownerThread==null) && "OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {
lockerId.set(id);
setOwnerThread(t);
if(expireThread == null){ // 启动看门狗线程
expireThread = new Thread(new ExpireTask(),"expireThread");
expireThread.setDaemon(true);
expireThread.start();
}
// 添加续期任务到延迟队列
delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockName,id)));
return true;
}
4. 自动续期的实现
public void run() {
while(!Thread.currentThread().isInterrupted()) {
try {
LockItem lockItem = delayDog.take().getData(); // 阻塞等待直到快过期
// 执行续期
Long result = (Long)jedis.eval(DELAY_LOCK_LUA,
Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey()),
Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));
if(result.longValue()!=0L){
// 续期成功,继续添加下一次续期任务
delayDog.add(new ItemVo<>((int)LOCK_TIME,
new LockItem(lockItem.getKey(),lockItem.getValue())));
}
} catch (InterruptedException e) {
break;
}
}
}
5. 优化的续期时机
public ItemVo(long expirationTime, T data) {
// 提前100ms进行续期
this.activeTime = expirationTime+System.currentTimeMillis()-100;
this.data = data;
}
主要优势在于:
- 解决了长时间业务导致锁过期的问题
- 使用DelayQueue避免了轮询带来的性能开销
- 自动续期机制更加可靠
- 优雅关闭机制(通过@PreDestroy注解)
这种实现类似于Redis官方客户端Redisson的实现原理,更适合实际生产环境使用。
4. 如何使用Redission分布式锁
4.1. Redission简介
Redisson 是一个基于 Redis 的高性能工具库,它简化了 Redis 的使用,并提供了丰富的分布式工具支持,如分布式锁、分布式集合、队列等。在分布式锁场景下,Redisson 封装了锁的创建、续期和释放等逻辑,并内置了看门狗机制,大大提升了分布式锁的可靠性和开发效率。
Redisson 的核心功能:
- 提供可重入锁(ReentrantLock)和公平锁(FairLock)。
- 内置看门狗机制,自动续期防止锁过期。
- 支持集群、哨兵模式和单节点模式。
- 与 Spring Boot 集成简单,支持注解形式使用。
4.2. SpringBoot使用Redission分布式锁
1. 新增 Maven 依赖
在 Spring Boot 项目的 pom.xml
文件中,添加 Redisson 的 Maven 依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.5</version> <!-- 使用最新版本 -->
</dependency>
此依赖可以帮助我们轻松将 Redisson 集成到 Spring Boot 项目中。
2. 配置类编写
为 Redisson 创建一个配置类,用于初始化 Redis 连接。以下是一个基于单机模式的示例:
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// 配置单节点 Redis 地址
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword(null) // 如果有密码则填写
.setConnectionMinimumIdleSize(10)
.setConnectionPoolSize(64);
return Redisson.create(config);
}
}
如果你的 Redis 部署为集群模式或哨兵模式,可以使用以下方法:
- 集群模式:
config.useClusterServers().addNodeAddress(...)
- 哨兵模式:
config.useSentinelServers().addSentinelAddress(...)
3. 使用 Redisson 分布式锁
下面通过一个具体的业务场景(如库存扣减)来演示如何使用 Redisson 分布式锁。
示例代码:
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class StockService {
@Autowired
private RedissonClient redissonClient;
public void deductStock(String productId) {
// 获取锁实例
RLock lock = redissonClient.getLock("lock:stock:" + productId);
try {
// 加锁,等待时间 5 秒,锁超时时间 10 秒
if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
try {
// 模拟业务逻辑:扣减库存
System.out.println("扣减库存逻辑执行中...");
Thread.sleep(8000); // 模拟耗时操作
System.out.println("库存扣减成功!");
} finally {
lock.unlock(); // 释放锁
System.out.println("锁释放成功!");
}
} else {
System.out.println("获取锁失败,可能有其他线程在执行!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代码使用 Redisson 实现分布式锁,通过 tryLock
尝试获取商品的锁,设置等待时间为 5 秒、锁超时时间为 10 秒。获取锁后,在锁保护下模拟扣减库存的操作,完成后释放锁。如果未能获取锁,则提示可能有其他线程在执行。
5. 结语
本文从分布式锁的核心原理入手,结合手写实现的 Redis 分布式锁与看门狗机制,深入剖析了解决锁过期问题的设计思路。同时,我们还介绍了 Redisson 的使用方法,通过它的封装与内置的看门狗机制,可以更高效地实现分布式锁,减少开发成本,提升可靠性。分布式锁的实现既是分布式系统中的基础问题,也是解决高并发和数据一致性挑战的重要工具。希望本文的讲解能为你在实际开发中提供帮助!