Redis系列-5 Redis分布式锁

背景:

本文介绍Redis分布式锁的内容,包括Redis相关命令和Lua脚本的介绍,以及操作分布式锁的流程与消息,最后结合Redission源码介绍分布式锁的实现原理。

1.基本命令

1.1 基本键值对的设置

设值: set key value
取值: get key
删除: del key

>set key1 value1
"OK"
>get key1
"value1"
>del key1
"1"

1.2 setnx用法

setnx key value:
当key不存在时,进行设置,返回1(表示操作成功)
当key存在时,不进行设置,返回0(表示操作失败)

>setnx key1 value1
"1"
>setnx key1 value1
"0"

1.3 setex和psetex

setex key seoconds value
等价于原子性地执行了 set key value和expire key seconds
psetex用法与setex相同,区别是setex单位为秒,而psetex是毫秒;

>setex key1 1000 value1
"OK"
>ttl key1
"997"

1.4 set扩展用法

set key value [EX seconds | PX millSeconds] [NX | XX]
seconds EX 表示设置过期时间以秒为单位,millSeconds PX 表示设置过期时间以毫秒为单位;
NX表示当键不存在时执行,并返回OK;否则返回null
XX表示当键存在时执行,并返回OK;否则返回null

>set key1 value1 EX 1000 NX
"OK"
>set key1 value1 EX 1000 NX
null
>set key1 value1 EX 2000 XX
"OK"
>ttl key1
"1996"

2.lua脚本

由于redis是单线程执行的,因此可以原子性地执行lua脚本。因此可通过lua脚本对基本命令进行组合。
格式如下:

EVAL "lua脚本" n KEY... , ARGV...

(1) 通过EVAL命令执行lua脚本;
(2) 可对脚本进行传参,可以传多个KEY和多个ARGV,KEY和ARGV建议使用逗号(,)隔开;
(3) 需要显示指定KEY个数;
(4) lua脚本通过KEYS[i] 和 ARGV[j] 获取传入的参数,下标从1开始;
以下通过案例的方式介绍一下lua脚本的使用。

2.1 加锁

分布式锁的数据结构可以被定义为如下格式:

{
 "lockKey":  {
        "uuid: threadId": num
    }
}

lockKey表示分布式锁:数据库存中存在lockKey键时,表示已有客户端占据了lockKey锁,否则表示lockKey锁未被获取。
uuid: threadId结构包含了UUID唯一字符串,num为获取锁的次数。UUID用于保证上锁和解锁是同一个客户端,num用于实现锁的可重入。
案例:

// 如果锁不存在,则加锁并设置过期时间
if (redis.call('exists', KEYS[1]) == 0) then
    // 设置锁记录锁的获取次数为1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    // 设置锁的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

// 如果锁存在,且为自己,锁+1,并重新设置过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    // 设置锁记录锁的获取次数+1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    // 重置锁的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

// 锁已存在,不是自己,则返回锁到期时间
return redis.call('pttl', KEYS[1]);

说明:
上述LUA脚本返回空,说明锁获取成功;否则获取失败并得到锁的过期时间(毫秒)。

其中,redis.call('exists', KEYS[1])表示KEY[1]键是否存在,存在返回1,不存在返回0;redis.call('hincrby', KEYS[1], ARGV[2], 1)表示对哈希类型数据KEYS[1]和ARGV[2]键对应的值加1;redis.call('pexpire', KEYS[1], ARGV[1])表示设置KEYS[1]键的有效期为ARGV[1],单位毫秒;

在redis客户端进行如下操作:

>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null

>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"1"

>ttl myLock
"54"

>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null

>ttl myLock
"56"

>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"2"

给上述lua脚本的传参为1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

1表示只有一个Key, 其他为ARGV, 即
KEY[1] = myLock
ARGV[1]=60000
ARGV[2]=80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

得到的结果如下:

{
 "myLock":  {
        "80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12": 2
    }
}

表示"myLock"分布式锁已被占用, 获取锁的次数为2次。

2.2 解锁

案例:

// 解锁成功返回1,失败返回0
if (redis.call('del', KEYS[1]) == 1) then 
    // 向Redis发布消息
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1 
else 
    return 0 
end

说明:
解锁成功后,该lua脚本返回1,解锁失败返回0;
其中: redis.call('del', KEYS[1])表示根据KEYS[1]键删除数据;redis.call('publish', KEYS[2], ARGV[1])表示发布消息 KEYS[2], ARGV[1];

2.3 释放一层锁

// 锁不是被自己占有,直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;

// 锁数量-1,如果还大于0,重新设置过期时间;否则删除锁
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    // 重置过期时间
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
return nil;

上述Lua脚本返回1表示删除锁成功,返回0表示锁释放一层,返回空表示释放失败。

2.4 续期

// 锁被自己占用,重新设置过期时间,返回1;否则返回0if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

上述Lua脚本返回1表示续期成功,返回0表示续期失败(当前未获取锁)。

3.Redission用法

分布式锁可以直接使用开源的Redission
引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.1</version>
</dependency>

编码如下:

public static void lock1() {
    RedissonClient redisson = getRedissonClient();
    RLock lock = redisson.getLock("myLock");
    // 获取锁
    lock.lock();
    try {
        // 业务逻辑
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 释放锁
        lock.unlock();
    }
    redisson.shutdown();
    System.out.println("Begin end");
}

// 获取redis客户端实例
private static RedissonClient getRedissonClient() {
    Config config = new Config();
    config.setLockWatchdogTimeout(600*1000);
    config.useSingleServer().setAddress("redis://127.0.0.1:6001").setPassword("xxx");
    RedissonClient redisson = Redisson.create(config);
    return redisson;
}

说明:redisson.getLock(“myLock”)中的myLock即为分布式锁的键,多个客户端实例需要保证键相同。
lock.lock()用于执行获取锁的逻辑,获取成功后直接返回;获取失败后进入等待队列阻塞;lock.unlock();用于手动解锁。
getRedissonClient方法用于获取redis客户端实例,其中的setLockWatchdogTimeout方法用于设置看门狗的超时时间,单位毫秒,默认为30000(30秒)。
使用lock.lock()方法获取锁时不需要设置锁的过期时间,在获取锁成功后,Redisson通过看门狗机制,进行锁的续期,每经过WatchdogTimeout/3时间执行一次续期操作。
当lock.unlock()释放锁时,会同时关闭看门狗。

4.流程和消息

4.1 流程介绍

屏蔽底层Redis对锁的实现方式,仅用Lock和UnLock表示获取锁和释放锁,分布式锁的竞争流程可表示如下图所示:
在这里插入图片描述
[1] 客户端ClientA向Redis发送获取锁的消息,锁key为myLock(自定义);
[2] Redis响应成功,表示占锁成功;
[3] 客户端ClientB向Redis发送获取锁的消息,key为myLock;
[4] 服务器判断此时myLock锁已被ClientA占有,Redis响应失败;
[5] Client B 向Redis发送订阅消息订阅myChannel频道,等待收到通知;
[6-7] ClientA释放锁同时发布消息至Redis的myChannel频道;
[8] Redis收到publish消息后,向所有订阅了myChannel频道的客户端发送message通知消息;
[9] ClientB收到订阅的消息后,知道锁已被释放,再次获取锁;
其中:消息6和消息7是lua脚本执行的,因此具备原子性;当客户端收到message消息时,表明锁已被释放,可以重新竞争锁。
另外,对于客户端ClientA,在消息2-6之间,Redis的看门狗机制会自动为myLock续期。

4.2 消息介绍

Auth消息:

*2
$4
AUTH
$8
Root@123

+OK

其中:*2 表示由两个输入字符串;
$4表示第一个字符串长度为4,即AUTH;
$8表示第二个字符串长度为8,即Root@123;
+OK为Redis返回的结构,表示鉴权成功;
解析后为:

client: AUTH Root@123
Redis: OK

PING/PONG消息:

*1
$4
PING

+PONG

客户端向Redis发送PING心跳消息,Redis响应PONG消息。

QUITE消息:

*1
$4
QUIT

+OK

客户端向Redis发送QUITE退出消息,Redis响应OK消息。

以下分场景介绍Redis消息,包括成功获取锁—锁的续期—锁的释放和发布通知以及获取锁失败—锁的订阅—收到通知消息—取消订阅等,为简化篇幅,将省略AUTH、PING/PONG、AUITE等重复的内容。

4.2.1 成功获取锁

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "exists"过滤条件,得到:

*6
$4 
EVAL
$339 
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.2 获取锁后,锁的续期

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*6
$4
EVAL
$120
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.3 释放锁

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*8
$4
EVAL
$305
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
$1
2
$6
myLock
$31
redisson_lock__channel:{myLock}
$1
0
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 2 myLock redisson_lock__channel:{myLock} 0 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.4 获取锁失败后订阅

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*2
$9
SUBSCRIBE
$31
redisson_lock__channel:{myLock}

解析后为:

EVAL SUBSCRIBE redisson_lock__channel:{myLock}

4.2.5 订阅后收到通知消息

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*3
$7
message
$31
redisson_lock__channel:{myLock}
$1
0

解析后为:

message redisson_lock__channel:{myLock} 0

4.2.6 取消订阅

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*2
$11
UNSUBSCRIBE
$31
redisson_lock__channel:{myLock}

解析后为:

EVAL UNSUBSCRIBE redisson_lock__channel:{myLock}

5.源码

源码介绍围绕下图展开,如果对下图的逻辑线比较属性,直接跳过本章内容。
在这里插入图片描述
在介绍源码前,有必要了解一下两个概念: Redis的订阅发布机制和Semaphore.
Redis订阅和发布机制:
打开两个Redis客户端,分别执行subscribe myChannel订阅myChannel频道的消息:

>subscribe myChannel
切换到推送/订阅模式,关闭标签页来停止接收信息。
1) "subscribe"
2) "myChannel"
3) "1"

再打开一个客户端,执行publish myChannel key1告诉Redis,向订阅了myChannel的客户端发送消息:

>publish myChannel key1
"2"

返回值2表示有两个订阅客户端。

客户端收到Redis的通知消息:

1) "message"
2) "myChannel"
3) "key1"

Semaphore:

说明:由于在线程专题已经详细介绍过AQS,这里涉及AQS的内容不再展开介绍。

Semaphore是JUC中的一个并发工具类,内部维持了一个state的整数记录状态值,并提供了acquireXXX和release方法用于获取和释放锁(共享锁)。当state的值小于acquireXXX时,线程会进入AQS的等待队列,处于阻塞状态; release方法被时,state属性会增加,如果大于0,则从等待队列中唤醒一个。

如下案例中,Semaphore创建时,state设置为0,当客户端ClientA调用acquire方法获取锁时,进入Semaphore的等待队列处于阻塞状态;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7H3sF8Ns-1717209303379)(C:\Users\0216001379\AppData\Roaming\Typora\typora-user-images\1716255100424.png)]
当客户端ClientB调用release方法释放锁时(本质是对state值进行加法运算),此时Semaphore会自动唤醒处于等待队列中的ClientA.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n602PIUM-1717209303380)(C:\Users\0216001379\AppData\Roaming\Typora\typora-user-images\1716255109685.png)]
客户端A唤醒时,会再次尝试获取锁,此时Semaphore拥有共享锁的数量为1,与acquire方法获取数量相同(默认获取1个),获取锁正常。
在这里插入图片描述接下来,根据如下案例进行源码介绍:

public static void main(String[] args) {
    RedissonClient client = getClient();
    RLock lock = client.getLock("myLock");
    System.out.println("Lock1 Begin exec");
    lock.lock();
    try {
        System.out.println("Lock1 Begin...");
        Thread.sleep(1000 * 60);
        System.out.println("Lock1 End.");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
    client.shutdown();
}

5.1 获取锁

源码入口lock.lock()->lock(-1, null, false)

说明:为减少代码重复度,会提取公共部分代码形成模板方法,模板方法相对于提取前的方法因存在扩展逻辑,导致可读性降低(虽然整体可维护性提升)。如lock(long leaseTime, TimeUnit unit, boolean interruptibly)方法支持响应中断与忽略中断两种情况,支持设置超时时间与不设置超时时间两种情况。

说明:是否响应中断(即被中断时抛出异常或者不抛出异常)不影响解析主线逻辑,在介绍源码时,认为interruptibly为false, 对interruptibly=true分为不进行说明。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 long threadId = Thread.currentThread().getId();
    // part-1.执行lua脚本获取锁
 Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
 if (ttl == null) {
  return;
 }

    // part-2.向Redis发布订阅请求
 RFuture<RedissonLockEntry> future = subscribe(threadId);
 commandExecutor.syncSubscription(future);

 try {
        // part-3.while死循环中获取锁
  while (true) {
   ttl = tryAcquire(-1, leaseTime, unit, threadId);
   if (ttl == null) {
    break;
   }

   if (ttl >= 0) {
    try {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    }
   } else {
    future.getNow().getLatch().acquireUninterruptibly();
   }
  }
 } finally {
        // part-4.获取锁成功后,取消订阅
  unsubscribe(future, threadId);
 }
}

上述lock方法的主体逻辑可以分为4个部分:
[1] part-1:执行Lua脚本尝试获取锁,获取锁成功,则直接返回;
[2] part-2: 获取锁失败后,向Redis发布订阅;
[3] part-3: while死循环,获取锁或者抛出异常后退出循环;
[4] part-4: 退出while循环(获取锁成功或抛出异常),向Redis发送取消订阅消息;
整体流程比较清晰,从逻辑上可以切分为两个部分:获取锁成功场景和获取锁失败场景。

5.1.1 获取锁成功:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 long threadId = Thread.currentThread().getId();
    // part-1.执行lua脚本获取锁
 Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
 if (ttl == null) {
  return;
 }
 // Ignore ...
}

tryAcquire(-1, leaseTime, unit, threadId)返回null时,获取锁成功,退出lock方法。进入tryAcquire方法:

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

其中tryAcquireAsync返回一个Future对象,get方法阻塞等待(通过Future的await方法)该Future执行完成并返回结果或者抛出异常(包装后的RedisException)。进入tryAcquireAsync方法:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 1.尝试从Redis获取锁,返回一个异步的Future对象
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // 2.在Future对象添加回调函数,完成时回调
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 3.通过看门狗对锁进行自动续期
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryAcquireAsync方法整体逻辑较为简单:尝试从Redis获取锁,返回一个异步的Future对象;然后在Future对象添加回调逻辑,在Future完成时回调。

tryAcquireAsync方法仍然是个模板方法,支持设置过期时间和不设置过期时间:

[1]不设置时间: 向Redis申请锁时携带看门狗的超时时间(在3.Redission用法章节中通过Config对象的setLockWatchdogTimeout方法设置),之后通过看门狗续期。

[2]设置过期时间: 向Redis申请锁时携带执行的超时时间,超时后自动释放锁,因此不需要看门狗。

这里有两个重点方法需要关注一下:tryLockInnerAsync向Redis申请锁和scheduleExpirationRenewal开启看门狗。

tryLockInnerAsync方法就是将Lua脚本的执行包装为异步执行,返回一个Future对象:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

这里的lua脚本在2.1章节已进行结合,不再赘述。

scheduleExpirationRenewal功能是开启一个看门狗线程,定期(1/3的看门狗超时时间)向Redis续期,主线逻辑如下所示:

protected void scheduleExpirationRenewal(long threadId) {
   //...
   // scheduleExpirationRenewal的核心逻辑在于调用renewExpiration
   renewExpiration();
   //...
}


private void renewExpiration() {
    //...
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {          
            //...
            // 调用lua脚本-为锁续期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {   
                //...
                // 每internalLockLeaseTime/3时间,回调自身,开始循环
                renewExpiration();   
                //...                    
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    //...
}

// 对lua脚本异步执行的封装,与2.4中介绍的lua脚本相同
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

5.1.2 获取锁失败:

当锁已被其他客户端占有,获取锁失败,进入订阅-阻塞等待锁释放流程:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 //...

    // part-2.向Redis发布订阅请求
 RFuture<RedissonLockEntry> future = subscribe(threadId);
 commandExecutor.syncSubscription(future);

 try {
        // part-3.while死循环中获取锁
  while (true) {
   ttl = tryAcquire(-1, leaseTime, unit, threadId);
   if (ttl == null) {
    break;
   }

   if (ttl >= 0) {
    try {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    }
   } else {
    future.getNow().getLatch().acquireUninterruptibly();
   }
  }
 } finally {
        // part-4.获取锁成功后,取消订阅
  unsubscribe(future, threadId);
 }
}

[1] 先看一下向Redis发布订阅请求部分:

// 向lua发送订阅消息,并注册一个监听器监听订阅频道的通知消息
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 与前文结合的get方法逻辑相似,阻塞等待Future执行完成,即等待订阅消息发送给Redis并说道订阅成功消息
commandExecutor.syncSubscription(future);

重点在于第一个方法subscribe(threadId):向Redis发送订阅消息,并注册一个监听器,监听通知消息。
向redis发送订阅消息, 消息内容如章节4.2.4中的SUBSCRIBE redisson_lock__channel:{myLock}, 该消息表示客户端订阅redisson_lock__channel:{myLock}频道, 当Redis服务器收到该频道的通知消息后,会以Message类型的消息通知给订阅的客户端,消息内容为:message redisson_lock__channel:{myLock} 0.
当Redssion客户端收到message redisson_lock__channel:{myLock} 0后,监听器被调用,监听器的注册和监听器的内容后面介绍。
[2] 接着进入while死循环,只有抛出异常或者获取锁成功才会退出:

while (true) {
    // tryAcquire前文已介绍:获取锁成功返回null, 否则返回锁的超时时间
    ttl = tryAcquire(-1, leaseTime, unit, threadId);
    if (ttl == null) {
        break;
    }

    if (ttl >= 0) {
        try {
            // 根据锁的超时时间,定时阻塞等待锁,超时后,自动苏醒
            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
    } else {
        // 当锁没有设置超时时间时,阻塞等待,直到被唤醒
        future.getNow().getLatch().acquireUninterruptibly();
    }
}

这里的future.getNow().getLatch()返回的是一个Semaphore对象,初始化时state设置为0,因此调用acquire方法会陷入等待队列。唤醒逻辑在5.3 订阅和通知章节中介绍。
[3] unsubscribe(future, threadId)用于取消订阅,能进入finnally说明有异常抛出或者已经获取锁,从而不需要再监听redis的通知,unsubscribe核心是删除注册的监听器。

5.2 释放锁

Redssion提供的分布式锁支持可重入,因此多次获取需要多次释放。根据案例的lock.unlock()进入unlock方法:

public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

get是5.1章中结合过,这里直接进入unlockAsync方法,主线逻辑如下:

public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<>();
    //  向Redis发送解锁消息
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // 关闭看门狗
        cancelExpirationRenewal(threadId);
  //...
    });
    return result;
}

核心逻辑在于unlockInnerAsync:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

这里的lua脚本用于释放一层锁,如果释放完一层锁后,锁的数量为0,则删除对应的key(释放锁), 此时还会发布一条消息通知Redis,可参考2.3 释放一层锁

5.3 订阅和通知

章节5.2中介绍了客户端获取锁失败后Redis订阅,然后进入等待队列阻塞;在章节5.3中介绍了释放锁以及向Redis发布通知消息,本章节的内容是将二者衔接起来。

订阅和通知的核心功能是:阻塞在等待队列中的客户端将会因为通知消息而被唤醒。

Redis通知Redission:

客户端与Redis服务器底层的通讯是基于TCP链,Redssion使用Netty进行了封装,在pipeline中添加了CommandPubSubDecoder解码器,该解码器中存在如下逻辑:

protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,Object result) throws IOException {
 //...
 if (result instanceof Message) {
  //...
  if (result instanceof PubSubMessage) {
   pubSubConnection.onMessage((PubSubMessage) result);
  }
  //...
 }
 // ...
}

当接收到Message且是PubSubMessage类型的消息(即前文介绍的message redisson_lock__channel:{myLock} 0消息)时,调用pubSubConnection.onMessage((PubSubMessage) result),进入该方法:

public void onMessage(PubSubMessage message) {
    for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
        redisPubSubListener.onMessage(message.getChannel(), message.getValue());
    }
}

这里会调用注册的监听器,包括5.1.2 获取锁失败章节中注册的监听器。

Redission注册监听器:
继续看一下5.1.2 获取锁失败章节RFuture<RedissonLockEntry> future = subscribe(threadId)逻辑:

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return pubSub.subscribe(getEntryName(), getChannelName());
}

public RFuture<E> subscribe(String entryName, String channelName) {
    //...
    RedisPubSubListener<Object> listener = createListener(channelName, value);
    service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
    //...
}

通过createListener创建监听器,然后将监听器注册到RedisPubSubConnection对象的listeners属性中:

public class RedisPubSubConnection extends RedisConnection {
    // 监听器容器
    final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
    
    // 添加监听器的方法
    public void addListener(RedisPubSubListener<?> listener) {
        listeners.add((RedisPubSubListener<Object>) listener);
    }
}

至于创建listener后,如何调用RedisPubSubConnection的addListener方法可通过代码追踪和Bebug进行了解,不是重点内容;这里重点关注的是这个监听器的内部逻辑,即当这个监听器被调用时触发的逻辑:

private RedisPubSubListener<Object> createListener(String channelName, E value) {
    RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
        @Override
        public void onMessage(CharSequence channel, Object message) {
            if (!channelName.equals(channel.toString())) {
                return;
            }
            PublishSubscribe.this.onMessage(value, (Long) message);
        }

      //...
    };
    return listener;
}

channelName.equals(channel.toString())用于判断收到的消息是否是订阅的消息,如前文介绍的订阅的频道是redisson_lock__channel:{myLock}, 此时会校验channelName。
核心逻辑在于PublishSubscribe.this.onMessage(value, (Long) message):

protected void onMessage(RedissonLockEntry value, Long message) {
    //...
    value.getLatch().release();
    //...
}

value.getLatch()获取的是前文介绍的Semaphore对象,调用release时会修改state值,并唤醒一个等待的线程。
基于上述介绍:线程获取分布式锁失败后,向Redis订阅消息,并注册监听器,然后陷入Semaphore的等待队列;当其他客户端释放锁时,同时会发布通知消息给Redis服务器;Redis服务器收到消息后,向订阅的客户端发送通知消息;客户端收到通知消息后,触发监听器逻辑,监听器基于Semaphore机制,唤醒阻塞的线程。线程被唤醒后再次尝试获取分布式锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/694122.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据分析必备:一步步教你如何用Pandas做数据分析(21)

1、Pandas 可视化 Pandas 可视化是指使用 Pandas 库中的函数和方法来创建数据可视化图表。Pandas 提供了一些基本的绘图功能&#xff0c;例如折线图、柱状图、饼图等&#xff0c;可以通过调用相应的函数来创建这些图表。 2、基本绘图&#xff1a;绘图 Series和DataFrame上的…

【启明智显分享】基于工业级芯片Model3A的7寸彩色触摸屏应用于智慧电子桌牌方案

一场大型会议的布置&#xff0c;往往少不了制作安放参会人物的桌牌。制作、打印、裁剪&#xff0c;若有临时参与人员变更&#xff0c;会务方免不了手忙脚乱更新桌牌。由此&#xff0c;智能电子桌牌应运而生&#xff0c;工作人员通过系统操作更新桌牌信息&#xff0c;解决了传统…

2024.6.9 四

Python的异常处理 在python里,错误和异常是不同的概念 错误: Python 的语法错误或者称之为解析错,大多是因为写代码写错了出现的 异常: 即便 Python 程序的语法是正确的&#xff0c;在运行它的时候&#xff0c;也有可能发生错误。运行期检测到的错误被称为异常。 大多数的异常…

QT串口调试助手V2.0(源码全开源)--上位机+多通道波形显示+数据保存(优化波形显示控件)

首先关于Qt的安装和基本配置这里就不做重复说明了&#xff0c;注&#xff1a;本文在Qt5.14基础上完成 完整的项目开源仓库链接在文章末尾 图形控件——qcustomplot QCustomPlot是一个基于Qt框架的开源绘图库&#xff0c;用于创建高质量的二维图表和数据可视化。 QCustomPlot…

【PL理论】(12) F#:模块 | 命名空间 | 异常处理 | 内置异常 |:? | 相互递归函数

&#x1f4ad; 写在前面&#xff1a;本章我们将介绍 F# 的模块&#xff0c;我们前几章讲的列表、集合和映射都是模块。然后我们将介绍 F# 中的异常&#xff0c;以及内置异常&#xff0c;最后再讲解一下相互递归函数。 目录 0x00 F# 模块&#xff08;Module&#xff09; 0x01…

堡垒机的自动化运维,快速安全提升运维效率

随着信息技术的突飞猛进&#xff0c;企业对于IT系统的依赖程度日益加深&#xff0c;不仅希望可以提高运维效率&#xff0c;也希望能保障IT系统的安全。因此堡垒机与自动化运维技术的结合应运而生&#xff0c;堡垒机的自动化运维&#xff0c;快速安全提升运维效率。今天我们就来…

人工智能和物联网如何结合

欢迎来到 Papicatch的博客 文章目录 &#x1f349;引言 &#x1f349;AI与IoT的结合方式 &#x1f348;数据处理和分析 &#x1f34d;实例 &#x1f348;边缘计算 &#x1f34d;实例 &#x1f348;自动化和自主操作 &#x1f34d;实例 &#x1f348;安全和隐私保护 &…

YOLOv10 超详细解析 | 网络结构、训练策略、论文解读

网络结构 1. Backbone 2. Head 3. 说明 网络结构按 YOLOv10m 绘制&#xff0c;不同 scale 的模型在结构上略有不同&#xff0c;而不是像 YOLOv8 一样仅调整 depth 和 width。Head 有部分后续计算与 YOLOv8 完全相同&#xff0c;上图省略&#xff0c;具体请看此文。YOLOv10 整…

以sqlilabs靶场为例,讲解SQL注入攻击原理【42-53关】

【Less-42】 使用 or 11 -- aaa 密码&#xff0c;登陆成功。 找到注入点&#xff1a;密码输入框。 解题步骤&#xff1a; # 获取数据库名 and updatexml(1,concat(0x7e,(select database()),0x7e),1) -- aaa# 获取数据表名 and updatexml(1,concat(0x7e,(select group_conca…

QT案例 记录解决在管理员权限下QFrame控件获取拖拽到控件上的文件路径

参考知乎问答 Qt管理员权限如何支持拖放操作&#xff1f; 的回答和代码示例。 解决在管理员权限运行下&#xff0c;通过窗体的QFrame子控件获取到拖拽的内容。 目录标题 导读解决方案详解示例详细 【管理员权限】在QFrame控件中获取拖拽内容 【管理员权限】继承 IDropTarget 类…

Invalid JSON text:“Invalid value.“ at position 0 in value for column ‘user.info

你们好&#xff0c;我是金金金。 场景 我正在练习mybatis-plus&#xff0c;在插入一条数据的时候报错了&#xff0c;错误信息如上图 排查 排查之前我先贴一下代码 以下为数据库字段类型 在插入的过程中报错&#xff1a;Data truncation: Invalid JSON text: "Invalid val…

百度高级项目经理洪刘生受邀为第十三届中国PMO大会演讲嘉宾

全国PMO专业人士年度盛会 百度在线网络技术&#xff08;北京&#xff09;有限公司IDG智能驾驶业务部高级项目经理洪刘生先生受邀为PMO评论主办的2024第十三届中国PMO大会演讲嘉宾&#xff0c;演讲议题为“互联网PMO赋能战略项目集管理实战分享”。大会将于6月29-30日在北京举办…

FCN-语义分割中的全卷积网络

FCN-语义分割中的全卷积网络 语义分割 语义分割是计算机视觉中的关键任务之一&#xff0c;现实中&#xff0c;越来越多的应用场景需要从影像中推理出相关的知识或语义&#xff08;即由具体到抽象的过程&#xff09;。作为计算机视觉的核心问题&#xff0c;语义分割对于场景理…

【西瓜书】9.聚类

聚类任务是无监督学习的一种用于分类等其他任务的前驱过程&#xff0c;作为数据清洗&#xff0c;基于聚类结果训练分类模型 1.聚类性能度量&#xff08;有效性指标&#xff09; 分类任务的性能度量有错误率、精度、准确率P、召回率R、F1度量(P-R的调和平均)、TPR、FPR、AUC回归…

流程的控制

条件选择语句 我们一般将条件选择语句分为三类&#xff1a; 单条件双条件多条件 本篇文章将分开诉说着三类。 单条件 单条件的语法很简单&#xff1a; if (条件) {// 代码}条件这里我们需要注意下&#xff0c;可以向里写入两种&#xff1a; 布尔值布尔表达式 当然&…

【算法刷题 | 动态规划08】6.9(单词拆分、打家劫舍、打家劫舍||)

文章目录 21.单词拆分21.1题目21.2解法&#xff1a;动规21.2.1动规思路21.2.2代码实现 22.打家劫舍22.1题目22.2解法&#xff1a;动规22.2.1动规思路22.2.2代码实现 23.打家劫舍||23.1题目23.2解法&#xff1a;动规23.2.1动规思路23.2.2代码实现 21.单词拆分 21.1题目 给你一…

Unity动画录制工具在运行时录制和保存模型骨骼运动的方法录制动画给其他角色模型使用支持JSON、FBX等格式

如果您正在寻找一种在运行时录制和保存模型骨骼运动的方法&#xff0c;那么此插件是满足您需求的完美解决方案。 实时录制角色运动 将录制到的角色动作转为动画文件 将录制好的动作给新的角色模型使用&#xff0c;完美复制 支持导出FBX格式 操作简单&#xff0c;有按钮界面…

Nacos的配置中心

1.前言 除了注册中心和负载均衡之外, Nacos还是⼀个配置中心, 具备配置管理的功能. Namespace 的常用场景之一是不同环境的配置区分隔离&#xff0c; 例如开发测试环境和⽣产环境的配置隔离。 1.1 为什么需要配置中心&#xff1f; 当前项目的配置都在代码中&#xff0c;会存…

网络基础-IP协议

文章目录 前言一、IP报文二、IP报文分片重组IP分片IP分片示例MTUping 命令可以验证MTU大小Windows系统&#xff1a;Linux系统: 前言 基础不牢&#xff0c;地动山摇&#xff0c;本节我们详细介绍IP协议的内容。 一、IP报文 第一行&#xff1a; 4位版本号指定IP协议的版本&#…

原来你长这个样子啊,Java字节码文件

字节码文件 字节码文件是一种二进制文件&#xff0c;扩展名为.class 通过 javac 将源码编译得到&#xff0c;是一种中间形式的代码&#xff0c;这种中间形式的代码让Java有了“一次编译&#xff0c;多次运行”的跨平台特点。 字节码文件的组成 由5大组成部分&#xff1a;基础…