解密分布式锁:保障系统一致性的关键

作者:后端小肥肠

🍇 我写过的文章中的相关代码放到了gitee,地址:xfc-fdw-cloud: 公共解决方案

🍊 有疑问可私信或评论区联系我。

🥑  创作不易未经允许严禁转载。

目录

1. 前言

2. 为何要使用分布式锁?

2.1. 单机场景里的锁

2.2. 分布式场景里的锁

3. Redis分布式锁实现

3.1. SpringBoot实现分布式锁

3.2. 看门狗方案实现

3.2.1. 开门狗方案原理

3.2.2. 看门狗方案核心代码

4. 如何使用Redission分布式锁

4.1. Redission简介

4.2. SpringBoot使用Redission分布式锁

5. 结语


1. 前言

在当今快速发展的分布式系统中,多个节点之间的协调和一致性成为了一个日益重要的挑战。随着云计算、微服务架构和大数据处理的普及,系统的复杂性显著增加,这使得并发操作的管理愈发困难。在这样的背景下,分布式锁作为一种重要的机制,能够有效地防止数据竞争和不一致性问题,确保系统的稳定与可靠。本文将深入探讨分布式锁的原理、实现方式以及在实际应用中的重要性。

2. 为何要使用分布式锁?

在系统开发中,尤其是高并发场景下,多个线程同时操作共享资源是常见的需求。例如,多个线程同时售票、更新库存、扣减余额等。这些操作如果没有妥善管理,很容易导致资源竞争、数据不一致等问题。在单机环境中,我们可以通过锁机制(如 synchronized ReentrantLock)解决这些问题,但在分布式环境中,这些机制无法直接使用,需要更复杂的分布式锁方案。

2.1. 单机场景里的锁

在单机环境中,可以使用线程安全的操作来避免多线程竞争。在以下代码中,我们通过三种方式逐步引入锁机制来保障线程安全。

以普通的售票代码为例,原始代码:

public class SaleTicket {
    public static void main(String[] args) throws Exception {
        Ticket ticket = new Ticket();
        for (int j = 0; j < 5; j++) {  // 创建5个线程模拟并发
            new Thread(() -> {  // 每个线程执行售票操作
                for (int i = 1; i <= 10000; i++) {
                    ticket.sale();
                }
            }).start();
        }
        Thread.sleep(5000);  // 等待线程执行完成
        ticket.print();  // 打印剩余票数
    }
}

// 无锁资源类
class Ticket {
    // 总票数
    private Integer number = new Integer(50000);

    // 售票方法,无线程安全保障
    public void sale() {
        if (number > 0) {
            number--;
        }
    }

    public void print() {
        System.out.println("剩余票:" + number);
    }
}

运行以上代码,可能会出现以下问题:

  • 票数不一致:多个线程可能同时读取和修改 number,导致最终票数小于 0 或大于实际值。
  • 数据竞争:线程之间没有同步机制,数据容易被破坏。

解决这一问题的关键在于引入锁机制,下面我们介绍三种常见的单机锁实现方式。

1. 使用 AtomicInteger

AtomicInteger 是 Java 提供的线程安全类,使用 CAS(Compare-And-Swap)原子操作实现多线程数据一致性。它适合简单场景,例如递增、递减等操作。

代码示例如下:

import java.util.concurrent.atomic.AtomicInteger;

public class SaleTicket {
    public static void main(String[] args) throws Exception {
        Ticket ticket = new Ticket();
        for (int j = 0; j < 5; j++) {
            new Thread(() -> {
                for (int i = 1; i <= 10000; i++) {
                    ticket.sale();
                }
            }).start();
        }
        Thread.sleep(5000); // 等待线程完成
        ticket.print(); // 打印剩余票数
    }
}

class Ticket {
    private AtomicInteger number = new AtomicInteger(50000); // 线程安全的票数

    public void sale() {
        if (number.get() > 0) {
            number.decrementAndGet(); // 原子操作
        }
    }

    public void print() {
        System.out.println("剩余票:" + number.get());
    }
}

优点

  • 原子操作,无需显式加锁。
  • 性能较高,适合简单的并发场景。

缺点

  • 不适合复杂业务逻辑,例如多个共享资源需要同时操作的场景。

2. 使用 synchronized

Synchronized 是 Java 提供的关键字,可以用来保证方法或代码块的线程安全。它通过内部锁(Monitor)机制,确保同一时间只有一个线程能够执行加锁的代码。

代码示例如下:

public class SaleTicket {
    public static void main(String[] args) throws Exception {
        Ticket ticket = new Ticket();
        for (int j = 0; j < 5; j++) {
            new Thread(() -> {
                for (int i = 1; i <= 10000; i++) {
                    ticket.sale();
                }
            }).start();
        }
        Thread.sleep(5000); // 等待线程完成
        ticket.print(); // 打印剩余票数
    }
}

class Ticket {
    private Integer number = new Integer(50000); // 总票数

    public synchronized void sale() {
        if (number > 0) {
            number--; // 在锁保护下操作
        }
    }

    public void print() {
        System.out.println("剩余票:" + number);
    }
}

 优点

  • 简单易用,内置关键字,便于开发者理解和使用。
  • 适合多线程复杂操作。

缺点

  • 性能较低,因为线程竞争会导致阻塞。
  • 粒度较大,可能降低系统并发性。

3. 使用 ReentrantLock

ReentrantLock 是 Java 并发包中的显式锁,与 synchronized 相比,它提供了更丰富的功能,例如支持公平锁、非公平锁、条件变量等。

代码示例如下:

import java.util.concurrent.locks.ReentrantLock;

public class SaleTicket {
    public static void main(String[] args) throws Exception {
        Ticket ticket = new Ticket();
        for (int j = 0; j < 5; j++) {
            new Thread(() -> {
                for (int i = 1; i <= 10000; i++) {
                    ticket.sale();
                }
            }).start();
        }
        Thread.sleep(5000); // 等待线程完成
        ticket.print(); // 打印剩余票数
    }
}

class Ticket {
    private Integer number = new Integer(50000); // 总票数
    private final ReentrantLock lock = new ReentrantLock(); // 显式锁

    public void sale() {
        lock.lock(); // 加锁
        try {
            if (number > 0) {
                number--; // 线程安全操作
            }
        } finally {
            lock.unlock(); // 确保释放锁
        }
    }

    public void print() {
        System.out.println("剩余票:" + number);
    }
}

优点

  • 灵活,支持公平锁、非公平锁等特性。
  • 更适合复杂的并发场景。

缺点

  • 必须显式加锁和释放锁,代码复杂度较高。
  • 需要正确处理异常,防止死锁。

在单机场景中,使用锁机制可以有效解决线程安全问题。对于简单的操作(如计数器),可以优先使用 AtomicInteger;如果需要保护复杂的业务逻辑,可以选择 synchronized ReentrantLock

2.2. 分布式场景里的锁

在单机环境中,使用线程锁(如 synchronizedJUC)可以有效地管理并发操作,保证数据的一致性。但在分布式系统中,多个节点可能并行执行相同的操作,访问的是共享资源(如数据库、缓存、队列等)。这就带来了一个新的问题:如何在不同的节点之间协调资源的访问?

光说可能你不是很理解,我来举个例子:

假设你有一个售票系统,多个用户同时请求购买同一张票。如果没有分布式锁,可能会发生如下情况:

  • 用户 A 和用户 B 同时查询到有票可买
  • 用户 A 和用户 B 分别进行扣款操作,且系统仍认为票数未减少,这会导致“超卖”情况。

使用分布式锁后,只有一个请求可以修改票数,其他请求将被阻塞或等待,直到锁被释放,从而避免超卖问题。

在分布式环境中,多个服务或节点可能并发访问同一份数据。如果没有适当的机制来管理这些并发操作,就会发生资源竞争和数据不一致等问题。因此,分布式锁应运而生,用于控制不同节点对共享资源的访问,确保同一时刻只有一个节点能够执行某项操作。

常见的分布式锁应用场景:

  1. 防止超卖比如多个用户请求同时购买同一票,或者多个服务同时修改同一份数据。通过分布式锁,确保只有一个请求能够操作共享资源,从而避免超卖。
  2. 避免缓存穿透多个服务可能同时访问缓存失效的数据,使用分布式锁可以确保只有一个请求去查询数据库,其他请求需要等待。
  3. 确保数据一致性多个微服务可能会并发修改同一份数据,通过分布式锁来确保同一时刻只有一个服务能够修改数据,从而避免数据不一致的风险。

如何在分布式环境中实现锁?

分布式锁的目标是确保不同节点对共享资源的访问不冲突。以下是几种常见的分布式锁实现方式:

  1. 基于 Redis 的分布式锁 Redis 提供了高效的键值存储,可以通过 SETNX 命令(Set if Not eXists)来创建分布式锁。该命令只有在锁不存在时才会成功设置,从而保证了只有一个节点可以获取锁。

    示例:

    SETNX lock_key value

    该命令如果成功设置,表示当前节点获得了锁;如果失败,表示其他节点已获得锁。

  2. 基于 ZooKeeper 的分布式锁 ZooKeeper 是一个分布式协调服务,提供了可靠的锁机制。通过在 ZooKeeper 中创建临时节点,当一个节点成功创建锁节点时,其他节点无法重复创建,从而实现分布式锁。(这部分会放到ZooKeeper系列说

  3. 基于数据库的分布式锁: 通过在数据库中创建锁表,使用数据库行锁来控制并发访问。虽然简单易用,但性能较低,适合低并发场景。(本文不讲)

  4. 基于 Redisson 的分布式锁Redisson 是一个 Java 客户端,提供了高效的分布式锁功能,支持多种锁类型,如公平锁、读写锁等。它封装了 Redis 的原子操作,并提供了更易用的 API,使得在分布式系统中实现锁机制更加方便。

在实践中,分布式锁可以应用于多个场景,如防止超卖、确保数据一致性和避免缓存穿透等问题。在下一节中,我们将详细介绍分布式锁的具体技术实现,包括redis、Redission、Zookeeper的具体实现技术细节。

3. Redis分布式锁实现

想要实现分布式锁,必须要求 Redis 有互斥的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。

两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。

客户端 1 申请加锁,加锁成功:

客户端 2 申请加锁,因为它后到达,加锁失败:

image.png

此时,加锁成功的客户端,就可以去操作共享资源,例如,修改 数据库 的某一行数据,或者调用一个 API 请求。

操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?

也很简单,直接使用 DEL 命令删除这个 key 即可,这个逻辑非常简单。

image.png

但是,它存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成死锁

1、程序处理业务逻辑异常,没及时释放锁

2、进程挂了,没机会释放锁

这时,这个客户端就会一直占用这个锁,而其它客户端就永远拿不到这把锁了。怎么解决这个问题呢?

如何避免死锁?

我们很容易想到的方案是,在申请锁时,给这把锁设置一个租期

在 Redis 中实现时,就是给这个 key 设置一个过期时间。这里我们假设,操作共享资源的时间不会超过 10s,那么在加锁时,给这个 key 设置 10s 过期即可:

SETNX lock 1    // 加锁
EXPIRE lock 10  // 10s后自动过期

image.png

这样一来,无论客户端是否异常,这个锁都可以在 10s 后被自动释放,其它客户端依旧可以拿到锁。

但现在还是有问题:

现在的操作,加锁、设置过期是 2 条命令,有没有可能只执行了第一条,第二条却来不及执行的情况发生呢?例如:

  • SETNX 执行成功,执行EXPIRE 时由于网络问题,执行失败

  • SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行

  • SETNX 执行成功,客户端异常崩溃,EXPIRE也没有机会执行

总之,这两条命令不能保证是原子操作(一起成功),就有潜在的风险导致过期时间设置失败,依旧发生死锁问题。

在 Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以了:

SET lock 1 EX 10 NX

image.png

锁被别人释放怎么办?

上面的命令执行时,每个客户端在释放锁时,都是无脑操作,并没有检查这把锁是否还归自己持有,所以就会发生释放别人锁的风险,这样的解锁流程,很不严谨!如何解决这个问题呢?

解决办法是:客户端在加锁时,设置一个只有自己知道的唯一标识进去。

例如,可以是自己的线程 ID,也可以是一个 UUID(随机且唯一),这里我们以UUID 举例:

SET lock $uuid EX 20 NX

之后,在释放锁时,要先判断这把锁是否还归自己持有,伪代码可以这么写:

if redis.get("lock") == $uuid:
    redis.del("lock")

这里释放锁使用的是 GET + DEL 两条命令,这时,又会遇到我们前面讲的原子性问题了。这里可以使用lua脚本来解决。

安全释放锁的 Lua 脚本如下:

if redis.call("GET",KEYS[1]) == ARGV[1]
then
    return redis.call("DEL",KEYS[1])
else
    return 0
end

好了,这样一路优化,整个的加锁、解锁的流程就更严谨了。

这里我们先小结一下,基于 Redis 实现的分布式锁,一个严谨的的流程如下:

1、加锁

SET lock_key $unique_id EX $expire_time NX

2、操作共享资源

3、释放锁:Lua 脚本,先 GET 判断锁是否归属自己,再DEL 释放锁

3.1. SpringBoot实现分布式锁

只贴核心代码,redis配置和maven依赖就不贴了:

/**
 * 分布式锁的实现
 */
@Component
public class RedisDistLock implements Lock {

    private final static int LOCK_TIME = 5*1000;//失效时间
    private final static String RS_DISTLOCK_NS = "tdln:"; //加锁的key的前缀
    /*
     if redis.call('get',KEYS[1])==ARGV[1] then
        return redis.call('del', KEYS[1])
    else return 0 end
     */
    //释放锁的时候,确保原子。lua脚本:确保  释放锁的线程就是加锁的线程,不能被线程的线程无脑调用释放
    private final static String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('del', KEYS[1])\n" +
                    "    else return 0 end";
    /*保存每个线程的独有的ID值*/
    private ThreadLocal<String> lockerId = new ThreadLocal<>();

    /*解决锁的重入*/
    private Thread ownerThread;
    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    public void setOwnerThread(Thread ownerThread) {//加锁成功,就会把抢到锁的线程进行保存
        this.ownerThread = ownerThread;
    }

    @Override
    public void lock() { //redis的分布式锁
        while(!tryLock()){
            try {
                Thread.sleep(100); //每隔100ms 都会去尝试加锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    @Override
    public boolean tryLock() {
        Thread t = Thread.currentThread();
        if(ownerThread==t){/*说明本线程持有锁*/
            return true;
        }else if(ownerThread!=null){/*本进程里有其他线程持有分布式锁*/
            return false;
        }
        Jedis jedis = jedisPool.getResource();
        try {
            String id = UUID.randomUUID().toString();
            SetParams params = new SetParams();
            params.px(LOCK_TIME);
            params.nx();
            synchronized (this){/*线程们,本地抢锁*/
                if((ownerThread==null)&&
                "OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))){
                    lockerId.set(id);
                    setOwnerThread(t);
                    return true;
                }else{
                    return false;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("分布式锁尝试加锁失败!");
        } finally {
            jedis.close();
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    @Override
    public void unlock() {
        if(ownerThread!=Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS+lockName),
                    Arrays.asList(lockerId.get()));
            if(result.longValue()!=0L){
                System.out.println("Redis上的锁已释放!");
            }else{
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!",e);
        } finally {
            if(jedis!=null) jedis.close();
            lockerId.remove();
            setOwnerThread(null);
            System.out.println("本地锁所有权已释放!");
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }

}

这段代码实现了一个基于 Redis 的分布式锁 (RedisDistLock) 类。它实现了 Lock 接口,用于在分布式环境中对资源进行加锁和解锁,确保同一时间只有一个线程可以操作共享资源。主要功能包括:

  1. 加锁 (lock 方法):尝试获取分布式锁,如果当前线程未持有锁,则进入循环每 100 毫秒重试,直到成功获得锁。
  2. 释放锁 (unlock 方法):只有持有锁的线程才能释放锁,利用 Lua 脚本确保锁的释放操作是原子的,避免锁被错误释放。
  3. 锁的重入:通过检查当前线程是否已经持有锁,来支持锁的重入机制,避免死锁。
  4. 线程唯一标识:每个线程持有一个唯一的 lockerId,用于标识和验证锁的拥有者。
  5. 不支持中断和等待超时的加锁:该实现不支持可中断的加锁操作,也不支持在指定时间内尝试获取锁。

该锁使用了 Redis 的 SET 命令来加锁,并且使用 Lua 脚本确保释放锁时不会被其他线程误释放。

3.2. 看门狗方案实现

3.2.1. 开门狗方案原理

在分布式系统中,使用 Redis 锁时,如果业务逻辑执行时间超过锁的过期时间,可能会引发锁的提前释放问题,进而导致并发冲突。为了避免这种情况,可以引入 看门狗机制。

未加看门狗机制时分布式场景的工作流程:

  1. 加锁客户端(如客户端 C)请求加锁,Redis 设置 lock_key 并附加一个过期时间(例如 10 秒)。
  2. 业务执行客户端在锁的保护下,执行业务逻辑。
  3. 解锁业务逻辑执行完毕后,客户端主动释放锁。

潜在问题:

  • 如果业务逻辑的执行时间超过锁的过期时间(如大于 10 秒),在客户端释放锁之前,锁已经因为过期而自动释放。
  • 锁释放后,其他客户端(如客户端 A 或 B)可能抢到锁,导致多个客户端同时执行相同的业务逻辑,发生并发冲突。

如下图所示:

为了解决锁提前释放的问题,可以引入 看门狗机制,通过定期续期保证锁在业务逻辑执行完成前不会被自动释放。

看门狗机制的工作原理:

  1. 加锁后启动看门狗

    • 客户端C加锁后,启动一个守护线程(看门狗)。
    • 守护线程会定期检查锁的过期时间。
  2. 定期续期

    • 守护线程每隔一段时间(例如 5 秒)检查锁是否即将过期。
    • 如果锁没有被解锁且仍然属于当前客户端,则向 Redis 请求续期(如将锁的过期时间从 10 秒延长到 20 秒)。
    • 这样,即使业务逻辑的执行时间超过初始过期时间,锁也不会过期。
  3. 主动释放锁

    • 客户端在业务逻辑执行完毕后,主动释放锁。
    • 此时,看门狗线程会停止续期,锁正常释放。

如下图展示了看门狗机制的具体流程:

3.2.2. 看门狗方案核心代码

只贴核心代码:

RedisDistLockWithDog

@Component
public class RedisDistLockWithDog implements Lock {

    private final static int LOCK_TIME = 1*1000;
    private final static String LOCK_TIME_STR = String.valueOf(LOCK_TIME);
    private final static String RS_DISTLOCK_NS = "tdln2:";
    /*
     if redis.call('get',KEYS[1])==ARGV[1] then
        return redis.call('del', KEYS[1])
    else return 0 end
     */
    private final static String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('del', KEYS[1])\n" +
                    "    else return 0 end";
    /*还有并发问题,考虑ThreadLocal*/
    private ThreadLocal<String> lockerId = new ThreadLocal<>();

    private Thread ownerThread;
    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }

    @Override
    public void lock() {
        while(!tryLock()){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    @Override
    public boolean tryLock() {
        Thread t=Thread.currentThread();
        /*说明本线程正在持有锁*/
        if(ownerThread==t) {
            return true;
        }else if(ownerThread!=null){/*说明本进程中有别的线程正在持有分布式锁*/
            return false;
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            /*每一个锁的持有人都分配一个唯一的id,也可采用snowflake算法*/
            String id = UUID.randomUUID().toString();

            SetParams params = new SetParams();
            params.px(LOCK_TIME); //加锁时间1s
            params.nx();
            synchronized (this){
                if ((ownerThread==null)&&
                        "OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {
                    lockerId.set(id);
                    setOwnerThread(t);
                    if(expireThread == null){//看门狗线程启动
                        expireThread = new Thread(new ExpireTask(),"expireThread");
                        expireThread.setDaemon(true);
                        expireThread.start();
                    }
                    //往延迟阻塞队列中加入元素(让看门口可以在过期之前一点点的时间去做锁的续期)
                    delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockName,id)));
                    System.out.println(Thread.currentThread().getName()+"已获得锁----");
                    return true;
                }else{
                    System.out.println(Thread.currentThread().getName()+"无法获得锁----");
                    return false;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("分布式锁尝试加锁失败!",e);
        } finally {
            jedis.close();
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    @Override
    public void unlock() {
        if(ownerThread!=Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS+lockName),
                    Arrays.asList(lockerId.get()));
            System.out.println(result);
            if(result.longValue()!=0L){
                System.out.println("Redis上的锁已释放!");
            }else{
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!",e);
        } finally {
            if(jedis!=null) jedis.close();
            lockerId.remove();
            setOwnerThread(null);
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }

    /*看门狗线程*/
    private Thread expireThread;
    //通过delayDog 避免无谓的轮询,减少看门狗线程的轮序次数   阻塞延迟队列   刷1  没有刷2
    private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();
    //续锁逻辑:判断是持有锁的线程才能续锁
    private final static String DELAY_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('pexpire', KEYS[1],ARGV[2])\n" +
                    "    else return 0 end";
    private class ExpireTask implements Runnable{

        @Override
        public void run() {
            System.out.println("看门狗线程已启动......");
            while(!Thread.currentThread().isInterrupted()) {
                try {
                    LockItem lockItem = delayDog.take().getData();//只有元素快到期了才能take到  0.9s
                    Jedis jedis = null;
                    try {
                        jedis = jedisPool.getResource();
                        Long result = (Long)jedis.eval(DELAY_LOCK_LUA,
                                Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey ()),
                                Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));
                        if(result.longValue()==0L){
                            System.out.println("Redis上的锁已释放,无需续期!");
                        }else{
                            delayDog.add(new ItemVo<>((int)LOCK_TIME,
                                    new LockItem(lockItem.getKey(),lockItem.getValue())));
                            System.out.println("Redis上的锁已续期:"+LOCK_TIME);
                        }
                    } catch (Exception e) {
                        throw new RuntimeException("锁续期失败!",e);
                    } finally {
                        if(jedis!=null) jedis.close();
                    }
                } catch (InterruptedException e) {
                    System.out.println("看门狗线程被中断");
                    break;
                }
            }
            System.out.println("看门狗线程准备关闭......");
        }
    }

//    @PostConstruct
//    public void initExpireThread(){
//
//    }

    @PreDestroy
    public void closeExpireThread(){
        if(null!=expireThread){
            expireThread.interrupt();
        }
    }
}

针对3.1小节中的分布式锁而言,看门狗方案主要做了以下改进 :

1. 锁的过期时间设计

private final static int LOCK_TIME = 1*1000; // 锁的过期时间为1秒

- 相比RedisDistLock的5秒,这里故意设置较短的过期时间

  • 通过看门狗机制来自动续期,避免业务执行时间过长导致锁过期

2. 看门狗机制的核心实现

// 看门狗相关的成员变量
private Thread expireThread;
private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();

// 续期的Lua脚本
private final static String DELAY_LOCK_LUA =
        "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                "        return redis.call('pexpire', KEYS[1],ARGV[2])\n" +
                "    else return 0 end";

- 使用DelayQueue来实现延迟任务,避免无效的轮询

  • 通过Lua脚本保证续期操作的原子性

3. 加锁时启动看门狗

if ((ownerThread==null) && "OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {

    lockerId.set(id);

    setOwnerThread(t);

    if(expireThread == null){  // 启动看门狗线程

        expireThread = new Thread(new ExpireTask(),"expireThread");

        expireThread.setDaemon(true);

        expireThread.start();

    }

    // 添加续期任务到延迟队列

    delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockName,id)));

    return true;

}

4. 自动续期的实现

public void run() {

    while(!Thread.currentThread().isInterrupted()) {

        try {

            LockItem lockItem = delayDog.take().getData();  // 阻塞等待直到快过期

            // 执行续期

            Long result = (Long)jedis.eval(DELAY_LOCK_LUA,

                    Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey()),

                    Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));

            if(result.longValue()!=0L){

                // 续期成功,继续添加下一次续期任务

                delayDog.add(new ItemVo<>((int)LOCK_TIME,

                        new LockItem(lockItem.getKey(),lockItem.getValue())));

            }

        } catch (InterruptedException e) {

            break;

        }

    }

}

5. 优化的续期时机

public ItemVo(long expirationTime, T data) {

    // 提前100ms进行续期

    this.activeTime = expirationTime+System.currentTimeMillis()-100;

    this.data = data;

}

主要优势在于:

  • 解决了长时间业务导致锁过期的问题
  • 使用DelayQueue避免了轮询带来的性能开销
  • 自动续期机制更加可靠
  • 优雅关闭机制(通过@PreDestroy注解)

这种实现类似于Redis官方客户端Redisson的实现原理,更适合实际生产环境使用。

4. 如何使用Redission分布式锁

4.1. Redission简介

Redisson 是一个基于 Redis 的高性能工具库,它简化了 Redis 的使用,并提供了丰富的分布式工具支持,如分布式锁、分布式集合、队列等。在分布式锁场景下,Redisson 封装了锁的创建、续期和释放等逻辑,并内置了看门狗机制,大大提升了分布式锁的可靠性和开发效率。

Redisson 的核心功能:

  • 提供可重入锁(ReentrantLock)和公平锁(FairLock)。
  • 内置看门狗机制,自动续期防止锁过期。
  • 支持集群、哨兵模式和单节点模式。
  • 与 Spring Boot 集成简单,支持注解形式使用。

4.2. SpringBoot使用Redission分布式锁

1. 新增 Maven 依赖

在 Spring Boot 项目的 pom.xml 文件中,添加 Redisson 的 Maven 依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.23.5</version> <!-- 使用最新版本 -->
</dependency>

此依赖可以帮助我们轻松将 Redisson 集成到 Spring Boot 项目中。

2. 配置类编写

为 Redisson 创建一个配置类,用于初始化 Redis 连接。以下是一个基于单机模式的示例:

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        // 配置单节点 Redis 地址
        config.useSingleServer()
              .setAddress("redis://127.0.0.1:6379")
              .setPassword(null) // 如果有密码则填写
              .setConnectionMinimumIdleSize(10)
              .setConnectionPoolSize(64);
        return Redisson.create(config);
    }
}

如果你的 Redis 部署为集群模式或哨兵模式,可以使用以下方法:

  • 集群模式config.useClusterServers().addNodeAddress(...)
  • 哨兵模式config.useSentinelServers().addSentinelAddress(...)

3. 使用 Redisson 分布式锁

下面通过一个具体的业务场景(如库存扣减)来演示如何使用 Redisson 分布式锁。

示例代码:

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class StockService {

    @Autowired
    private RedissonClient redissonClient;

    public void deductStock(String productId) {
        // 获取锁实例
        RLock lock = redissonClient.getLock("lock:stock:" + productId);
        try {
            // 加锁,等待时间 5 秒,锁超时时间 10 秒
            if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
                try {
                    // 模拟业务逻辑:扣减库存
                    System.out.println("扣减库存逻辑执行中...");
                    Thread.sleep(8000); // 模拟耗时操作
                    System.out.println("库存扣减成功!");
                } finally {
                    lock.unlock(); // 释放锁
                    System.out.println("锁释放成功!");
                }
            } else {
                System.out.println("获取锁失败,可能有其他线程在执行!");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述代码使用 Redisson 实现分布式锁,通过 tryLock 尝试获取商品的锁,设置等待时间为 5 秒、锁超时时间为 10 秒。获取锁后,在锁保护下模拟扣减库存的操作,完成后释放锁。如果未能获取锁,则提示可能有其他线程在执行。 

5. 结语

本文从分布式锁的核心原理入手,结合手写实现的 Redis 分布式锁与看门狗机制,深入剖析了解决锁过期问题的设计思路。同时,我们还介绍了 Redisson 的使用方法,通过它的封装与内置的看门狗机制,可以更高效地实现分布式锁,减少开发成本,提升可靠性。分布式锁的实现既是分布式系统中的基础问题,也是解决高并发和数据一致性挑战的重要工具。希望本文的讲解能为你在实际开发中提供帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/934810.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS-高级(一)

文章目录 一次开发、多端部署自由流转 &#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;HarmonyOS专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年12月09日12点19分 一次开发、多端部署 布局能力 自适应布局 拉伸能力均分能力占比能力缩放…

河工oj第七周补题题解2024

A.GO LecturesⅠ—— Victory GO LecturesⅠ—— Victory - 问题 - 软件学院OJ 代码 统计 #include<bits/stdc.h> using namespace std;double b, w;int main() {for(int i 1; i < 19; i ) {for(int j 1; j < 19; j ) {char ch; cin >> ch;if(ch B) b …

开源架构安全深度解析:挑战、措施与未来

开源架构安全深度解析&#xff1a;挑战、措施与未来 一、引言二、开源架构面临的安全挑战&#xff08;一&#xff09;代码漏洞 —— 隐藏的定时炸弹&#xff08;二&#xff09;依赖项安全 —— 牵一发而动全身&#xff08;三&#xff09;社区安全 —— 开放中的潜在危机 三、开…

Ubuntu上使用system()函数运行不需要输入密码

使用system()运行一些终端命令的时候&#xff0c;需要sudo权限&#xff0c;也就是必须输入密码&#xff0c;那么在程序自启动的时候就无法成功启动。如果设置Ubuntu下所有操作都不需要密码&#xff0c;安全性太低&#xff0c;所以我们可以将需要用到的终端指令给予无需输入密码…

HBuilderX(uni-app)Vue3路由传参和接收路由参数!!

uni-app搭建小程序时候Vue3语法接收路由参数&#xff0c;去官方文档查看&#xff0c;是onLoad的option接收参数&#xff0c;我试过&#xff0c;接收不到&#xff0c;上网查各种方法也是不太行&#xff0c;最后自己琢磨出来了&#xff0c;这参数藏得还挺深&#xff01;&#xff…

操作系统(1)OS的基本概念

一、定义 操作系统&#xff08;OS&#xff09;是控制和管理整个计算机系统的硬件与软件资源&#xff0c;并合理地组织、调度计算机的工作与资源的分配&#xff0c;进而为用户和其他软件提供方便接口与环境的程序集合。它是计算机系统中最基本的系统软件。 二、功能 资源管理&am…

gridcontrol多行表头

效果如下 只需这样做,设置该属性为对应的值

Formality:set_svf命令

相关阅读 Formalityhttps://blog.csdn.net/weixin_45791458/category_12841971.html?spm1001.2014.3001.5482 svf文件的全称是Setup Verification for Formality&#xff0c;即Design Compiler提供给Formality的设置验证文件&#xff0c;它的作用是为Formality的指导模式(Gui…

【蓝桥杯每日一题】重新排序

重新排序 2024-12-8 蓝桥杯每日一题 重新排序 前缀和 差分 题目大意 给定一个数组 A 和一些查询 L i , R i Li_,R_i Li,​Ri​, 求数组中第 L i L_i Li​至第 R i R_i Ri​个元素之和。 小蓝觉得这个问题很无聊, 于是他想重新排列一下数组, 使得最终每个查 询结果的和尽可能…

LabelImg使用教程

(yolov5scondaPython3123) D:\PyCharm20240724\20240724PyCharmProject>conda.bat deactivate D:\PyCharm20240724\20240724PyCharmProject>conda activate labelimg_env (labelimg_env) D:\PyCharm20240724\20240724PyCharmProject> labelimg 创建快捷键方式

洛谷 P1179 [NOIP2010 普及组] 数字统计 C语言

题目&#xff1a; https://www.luogu.com.cn/problem/P1179 思路&#xff1a;直接暴力过 代码&#xff1a; #include<iostream> using namespace std; int cnt(int x) {int sum 0;while(x){int temp x %10;if(temp 2){sum;}x x/10;}return sum; } int main(void) …

练9:进制转换

欢迎大家订阅【蓝桥杯Python每日一练】 专栏&#xff0c;开启你的 Python数据结构与算法 学习之旅&#xff01; 文章目录 1 进制转换2 例题分析 1 进制转换 ①任意制转为十进制 【示例】 ②十进制转为任意制 【法一】 【法二】 2 例题分析 题目地址&#xff1a;https:/…

【ComfyUI+多视图生成】MV-Adapter:多视图一致性图片生成(2024.12.09基于SDXL开源)

源码&#xff1a;https://github.com/huanngzh/MV-Adapter ComfyUI扩展&#xff1a;https://github.com/huanngzh/ComfyUI-MVAdapter 项目主页&#xff1a;https://huanngzh.github.io/MV-Adapter-Page/ 论文&#xff1a;2412.MV-Adapter: Multi-view Consistent Image Generat…

《机器学习》2.4假设检验 t分布 F分布

目录 t发布 注意是这个东西服从t分布 数据服从t分布通常是在以下情况下&#xff1a; 以下是一些具体的例子&#xff0c;说明在何种情况下数据会服从t分布&#xff1a; t检验 交叉验证t检验 样本方差​编辑 F分布&#xff08;fisher Friedman检验是一种非参数统计方法&a…

图像识别 | Matlab基于卷积神经网络(CNN)的宝可梦识别源程序,GUI界面。附详细的运行说明。

图像识别 | Matlab基于卷积神经网络(CNN)的宝可梦识别源程序&#xff0c;GUI界面。附详细的运行说明。 目录 图像识别 | Matlab基于卷积神经网络(CNN)的宝可梦识别源程序&#xff0c;GUI界面。附详细的运行说明。预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab基…

全国青少年信息学奥林匹克竞赛(信奥赛)备考实战之计数器与累加器(一)

学习背景&#xff1a; 在现实生活中一些需要计数的场景下我们会用到计数器&#xff0c;如空姐手里记录乘客的计数器&#xff0c;跳绳手柄上的计数器等。累加器是累加器求和&#xff0c;以得到最后的结果。计数器和累加器它们虽然是基础知识&#xff0c;但是应用广泛&#xff0…

计算机毕业设计Python动物图像识别分类系统 机器学习 深度学习 数据可视化 爬虫 卷积神经网络CNN 预测算法 图像识别

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

ASP.NET Core API + MySql

环境 数据库&#xff1a; mysql8.0 后端&#xff1a; vs2022 ASP.NET Core API .net 8 前端&#xff1a; Hbuilderx bootstrap 5.3.0 jquery v3.7.1 bootstrap-table 1.23.5 创建项目 添加资源包 AutoMapper Microsoft.EntityFrameworkCore.Tools 8.0.0 Pomelo.EntityFramew…

vmware vsphere5---部署vCSA(VMware vCenter Server)附带第二阶段安装报错解决方案

声明 因为这份文档我是边做边写的&#xff0c;遇到问题重新装了好几次所以IP会很乱 ESXI主机为192.168.20.10 VCSA为192.168.20.7&#xff0c;后台为192.168.20.7:5480 后期请自行对应&#xff0c;后面的192.168.20.57请对应192.168.20.7&#xff0c;或根据自己的来 第一阶段…

【QT】:QT(介绍、下载安装、认识 QT Creator)

背景 &#x1f680; 在我们的互联网中的核心岗位主要有以下几种 开发&#xff08;程序员&#xff09;测试运维&#xff08;管理机器&#xff09;产品经理&#xff08;非技术岗位&#xff0c;提出需求&#xff09; 而我们这里主要关注的是开发方向&#xff0c;开发岗位又分很…