文章目录
- 1. 分布式锁
- 1.1 基本原理和实现方式对比
- synchronized锁在集群模式下的问题
- 多jvm使用同一个锁监视器
- 分布式锁概念
- 分布式锁须满足的条件
- 分布式锁的实现
- 1.2 基于Redis的分布式锁
- 获取锁&释放锁
- 操作示例
- 基于Redis实现分布式锁初级版本
- ILock接口
- SimpleRedisLock
- 使用示例
- Redis分布式锁误删问题(锁超时释放)
- 问题描述
- 解决方式
- 改进Redis的分布式锁
- SimpleRedisLock改进版
- Redis分布式锁误删问题2(原子性问题)
- Lua脚本解决多条命令原子性问题
- Redis的Lua脚本
- Redis提供的调用函数
- redis.call(..)函数调用及传参
- 示例
- lua脚本改进Redis的分布式锁
- lua脚本解决redis命令原子性问题
- SimpleRedisLock改进版
- 测试
- 总结
- 2. redisson
- 2.1 setnx实现的分布式锁存在的问题
- 2.2 Redisson简介
- 2.3 redisson快速入门
- 引入依赖
- 配置Redisson客户端
- 使用Redisson的分布式锁
- 2.4 redisson的可重入原理
- 可重入原理分析
- 获取锁的lua脚本
- 释放锁的lua脚本
- redisson获取锁&释放锁源码
- 2.5 redisson的锁重试和锁超时解决方式
- 图解
- 代码
- 总结
- 2.6 联锁
- 分布式锁主从一致性问题
- redisson解决主从一致问题
- 联锁使用示例
- 配置3个RedissonClient
- 使用RedissonMultiLock
- 总结
1. 分布式锁
1.1 基本原理和实现方式对比
synchronized锁在集群模式下的问题
在集群模式下,synchronized的锁失效了,synchronized只能保证单个jvm内部的多个线程之间的互斥,而没有办法让集群下的多个jvm进程之间互斥,如果要解决这个问题,就要用到分布式锁。synchornized就是利用jvm内部的锁监视器来控制线程的,在jvm的内部因为各线程共享同1个锁监视器,所以只会有1个线程获取锁,可以实现线程间的互斥。但是当有多个jvm进程之后,就会有多个锁监视器,就会有多个线程获取到锁,这样就没有办法实现多jvm进程之间的互斥了。因此,集群模式下就不能使用jvm内部的锁监视器了。
多jvm使用同一个锁监视器
我们要让多个jvm使用同一个锁监视器,这个锁监视器一定是1个在jvm外部的,多个jvm进程都可以看到的。这样多个jvm进程中的线程中只会有1个线程能够获取到这把锁。这样就可以实现集群模式下多jvm进程中的各线程互斥了。
如下图,在线程1获取到jvm进程外的锁监视器,当它获取到该锁成功之后,就可以执行业务查询订单,如果订单不存在,则插入新订单,然后释放锁。假设在这个过程中,线程3也来获取这个jvm进程外的锁监视器,因为线程1已经拿到了这个锁监视器,因此就会失败,然后一直等待这把锁。等到线程1执行完业务,并释放锁之后,线程3才会获取锁成功,此时来查询订单的话,肯定能查到线程1插入的订单,就不会插入新的订单了,这样就避免了安全问题的发生了。
分布式锁概念
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
分布式锁须满足的条件
-
可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
-
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
-
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
-
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
-
安全性:安全也是程序中必不可少的一环
除了上述基本条件,还有一些特性,比如:是否可重入、获取锁时是阻塞的还是非阻塞的、公平锁或者非公平锁
分布式锁的实现
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
mysql分布式锁实现:mysql数据库具备事务机制,在事务执行的时候(或者说在执行写操作的时候),mysql会自动分配1个互斥的锁,这样一来,在多个事务之间是互斥的,只有1个事务能够执行。可以利用这个原理来实现分布式锁。我们在业务执行前,先去mysql里申请1个互斥锁,然后执行我们的业务,业务执行完之后,去提交事务,这样锁就释放了。当业务抛出异常后,它会自动的触发回滚,这样锁也释放了。
redis分布式锁实现:利用redis中的setnx命令,只有当redis中的key不存在时,这个命令才会执行成功。如果已经存在,则会执行失败。因此当多个线程去执行setnx时,只会有1个能够成功,其它都会失败。这样就实现了互斥。
zookeeper分布式锁实现:利用zk内部的节点机制,zk内部可以创建节点,同时节点具备唯一性和有序性,并且还可以创建临时节点。唯一性指的是,创建的节点不能重复。有序性指的是,每次创建的节点的id都是递增的。可以利用有序性来实现互斥,假设很多线程在zk中创建节点,这样每个线程创建的节点的id都是递增的,我们约定节点的id最小的那个,它是算获取锁成功,这样就实现了互斥,因为最小的只有1个。如果要释放锁,则可以删除自己创建的节点,这样一来,它就不是最小的了,另外的节点就变成最小的了。也可以使用唯一性,每个线程创建的节点名称都是一样的,这样只会有1个能够创建成功。
1.2 基于Redis的分布式锁
获取锁&释放锁
实现分布式锁时需要实现的两个基本方法:
-
获取锁:
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
- 阻塞式:获取锁失败之后,等待,一直到能够获取到锁,或者到指定超时时间为止(相对于非阻塞式比较耗CPU,实现起来比较复杂)
# 添加锁,利用setnx的互斥特性 SETNX lock thread1 # 添加锁过期时间,避免服务宕机引起的死锁 EXPIRE lock 10
由于上述命令不具备原子性,可以使用[help set]查看set命令的详细使用。
# 添加锁,NX是互斥、EX是设置超时时间 SET lock thread1 NX EX 10
-
释放锁:
- 手动释放
- 超时释放
操作示例
基于Redis实现分布式锁初级版本
ILock接口
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
SimpleRedisLock
redis分布式锁初级版本实现
public class SimpleRedisLock implements ILock {
// 业务的名称, 即锁的名称
private String name;
private StringRedisTemplate stringRedisTemplate;
// 锁统一前缀
private static final String KEY_PREFIX = "lock:";
// 传入业务名称和stringRedisTemplate
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate
.opsForValue()
.setIfAbsent(
KEY_PREFIX + name,
threadId,
// 可以指定超时时间(这里实际上是假设获取锁成功之后, 同时指定锁的过期时间。
// 并不是阻塞式获取锁失败后的等待时间)
timeoutSec,
TimeUnit.SECONDS
);
// 避免自动拆装箱出现null的情况(防止success为null)
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
使用示例
上面的redis分布式锁初级版本SimpleRedisLock的使用示例
public Result testRedis() {
Long voucherId = 100L;
Long userId = UserHolder.getUser().getId();
// 对每个用户使用锁控制并发访问
SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
// 尝试获取锁, 并指定如果获取锁成功时, 设置的锁的过期时间
boolean isLock = lock.tryLock(5000);
// 判断是否获取锁成功
if (!isLock) {
// 获取锁失败, 返回错误或重试
return Result.fail("不允许重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
}
}
Redis分布式锁误删问题(锁超时释放)
问题描述
问题描述:如下图所示,使用上面的SimpleRedisLock,线程1先获取到锁(锁设置了过期时间),然后执行业务,可是由于在执行业务的过程中阻塞了,导致锁过期释放了。此时,线程2来拿锁,因为锁已经释放了,所以线程2能够成功拿到锁,然后线程2开始执行自己的业务。恰巧在这个时候,线程1又开始执行了,线程1就去释放锁(把锁给删掉了)。此时,线程3又来获取锁,因为锁已经被删了,所以线程3能够成功拿到锁。但现在问题是:线程2也在执行业务,线程3也在执行业务,这样又出现了并发问题。原因在于线程1删除了不属于自己的锁(线程1拿到锁之后,由于锁超时而被自动释放掉,从而让线程2拿到了锁,而线程1超时阻塞之后,恢复运行,删除了线程2的锁)
解决方式
上面问题出现的根本原因在于:线程1持有锁,但锁由于超时而释放,锁被其它线程争抢了,但线程1恢复运行后,删除了已经被其它线程获取的锁。也就是:线程1删除了当前已经不属于自己的锁了。因此,线程1在释放锁时,需要判断一下,当前持有这把锁的线程是不是自己。如果是,才能删除;如果不是,则不能删除。同时,还需要在获取锁的时候,存入当前线程自己的标识,这样才能在释放锁的时候才能判断当前持有这把锁的线程是不是自己。
改进Redis的分布式锁
需求:修改之前的分布式锁实现,满足:
- 在获取锁时存入线程标示(可以用UUID表示)
- 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致,则释放锁
- 如果不一致,则不释放锁
SimpleRedisLock改进版
如下实现,但也存在问题
public class SimpleRedisLock implements ILock {
// 业务的名称, 即锁的名称
private String name;
private StringRedisTemplate stringRedisTemplate;
// 锁统一前缀
private static final String KEY_PREFIX = "lock:";
// 引入uuid, 用于区分多个jvm(因为线程id是递增的, 防止多个jvm的线程id出现重复的情况)
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
// 传入业务名称和stringRedisTemplate
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate
.opsForValue()
.setIfAbsent(
KEY_PREFIX + name,
ID_PREFIX + threadId,
// 可以指定超时时间(这里实际上是假设获取锁成功之后, 同时指定锁的过期时间。
// 并不是阻塞式获取锁失败后的等待时间)
timeoutSec,
TimeUnit.SECONDS
);
// 避免自动拆装箱出现null的情况(防止success为null)
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
Redis分布式锁误删问题2(原子性问题)
前面,我们在释放锁的时候,添加了1个判断,线程在释放锁之前,会执行1个判断,来判断当前持有这把锁的线程是不是当前线程(通过判断锁标识来实现)。如果是,才去释放锁;如果不是,则不去释放锁。但这样仍然存在问题,假设线程1获取到锁之后,执行完业务,然后也进行了判断当前持有锁是不是自己,这个时候的确是自己,线程1判断完成之后,开始去释放锁。比如恰巧这个时候,发生了FULL-GC,所有代码都被阻塞,并且时间还比较长,超过了锁的过期时间,锁被释放了,此时线程2就可以拿到锁,线程2拿到锁之后,就开始执行自己的业务,但是这个时候,线程1就去释放锁了,线程1又一次释放了当前不属于自己的锁,又发生了误删的问题。假设此时线程3过来拿锁,因为锁已经被线程1给释放掉了,因此线程3就拿到了锁,开始执行业务,此时发现,线程2和线程3都在执行业务了,它们并没有被并发控制。
出现这个问题的根本原因在于:判断锁标识是否是自己 和 释放锁 是2个动作,不具备原子性。
因此,如果要解决这个问题,判断锁标识 与 删除锁 必须是原子操作,不能被间隔。我们可以使用lua脚本来解决这个问题。
Lua脚本解决多条命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
Redis的Lua脚本
Redis提供的调用函数
这里重点介绍Redis提供的调用函数,语法如下:
# 执行redis命令
redis.call('命令名称', 'key', '其它参数', ...)
示例
例如,我们要执行set name jack,则脚本是这样:
# 执行 set name jack
redis.call('set', 'name', 'jack')
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
# 先执行 set name jack
redis.call('set', 'name', 'jack')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
redis.call(…)函数调用及传参
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
示例
lua脚本改进Redis的分布式锁
lua脚本解决redis命令原子性问题
前面,我们提到由于redis执行命令:判断锁标识是否是自己 和 释放锁 是2个动作,不具备原子性,现在使用lua脚本解决执行多个redis命令原子性问题。
释放锁的业务流程是这样的:
- 获取锁中的线程标示
- 判断是否与指定的标示(当前线程标示)一致
- 如果一致则释放锁(删除)
- 如果不一致则什么都不做
初步写
-- 锁的key
local key = "lock:order:5"
-- 当前线程标识
local threadId = "xxxx-33"
-- 获取锁中的线程标识 get key
local id = redis.call('get', key)
-- 比较线程标识 与 锁中的标识 是否一致
if(id == threadId) then
-- 释放锁 del key
return redis.call('del', key)
end
return 0
将变量替换为从数组中取
-- 锁的key
local key = KEYS[1]
-- 当前线程标识
local threadId = ARGV[1]
-- 获取锁中的线程标识 get key
local id = redis.call('get', key)
-- 比较线程标识 与 锁中的标识 是否一致
if(id == threadId) then
-- 释放锁 del key
return redis.call('del', key)
end
return 0
简化
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁(成功删除锁, 则返回1)
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
SimpleRedisLock改进版
unlock.lua
-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0
SimpleRedisLock
public class SimpleRedisLock implements ILock {
// 业务的名称, 即锁的名称
private String name;
private StringRedisTemplate stringRedisTemplate;
// 锁统一前缀
private static final String KEY_PREFIX = "lock:";
// 引入uuid, 用于区分多个jvm(因为线程id是递增的, 防止多个jvm的线程id出现重复的情况)
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
//(提前加载好unlock.lua脚本)
//(其中泛型为返回值)
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 指定类路径下的unlock.lua文件
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 指定返回值类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
// 传入业务名称和stringRedisTemplate
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate
.opsForValue()
.setIfAbsent(
KEY_PREFIX + name,
ID_PREFIX + threadId,
// 可以指定超时时间(这里实际上是假设获取锁成功之后, 同时指定锁的过期时间。
// 并不是阻塞式获取锁失败后的等待时间)
timeoutSec,
TimeUnit.SECONDS
);
// 避免自动拆装箱出现null的情况(防止success为null)
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
/*
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
*/
// 调用lua脚本(这里会由redis保证判断和删除这2个操作的原子性)
//(由原来的在代码中判断线程标识与锁中的标识, 如果标识一致, 则删除。改为执行lua脚本命令, 让这2个命令具备原子性)
stringRedisTemplate.execute(
UNLOCK_SCRIPT, // lua脚本
Collections.singletonList(KEY_PREFIX + name), // 锁的key
ID_PREFIX + Thread.currentThread().getId() // 线程标识
);
// 这里在释放锁。如果当前锁是自己的, 则会释放成功; 如果当前锁不是自己的, 则不会释放。
// 并且判断锁是否是自己的与释放锁是具备原子性的。
}
}
测试
将应用启动2次(idea勾选允许并行运行),发送2个请求分别到这2个应用上,先让其中1个应用获取到锁,然后走删除锁的逻辑这里打上断点,然后让锁超时,此时让第2个应用获取到锁,第2个应用能够成功拿到锁。此时,让第1个应用执行释放锁的逻辑,发现第1个应用没有删除第2个应用拿到的锁,因为此时锁已经不是第1个应用的。因此,锁是没有被误删的。同时,由于redis提供的lua脚本功能让判断锁标识与释放锁具备原子性,不会出现线程安全的漏洞。
总结
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
2. redisson
2.1 setnx实现的分布式锁存在的问题
基于setnx实现的分布式锁存在下面的问题:
2.2 Redisson简介
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson
redisson的wiki文档:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0
2.3 redisson快速入门
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config
.useSingleServer()
.setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
使用Redisson的分布式锁
@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,可以空参, 也可以有参
// 有参的参数分别是:waitTime - 获取锁的最大等待时间(期间会重试),
// leaseTime - 锁自动释放时间,
// timeUnit - 时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断释放获取成功
if (isLock) {
try {
System.out.println("执行业务");
} finally {
// 释放锁
lock.unlock();
}
}
}
2.4 redisson的可重入原理
可重入原理分析
前面我们时使用redis提供的set命令,并且指定nx命令参数(当key不存在时,执行set命令),和ex命令参数(当key设置成功时,指定key的过期时间)。如下图,method1方法在第一次获取锁时,由于key不存在,获取锁会成功。当method1调用method2时,method2方法也要取获取锁,由于在method1已经获取了锁,此时再用setnx命令获取就会失败,因此,这种方式获取的锁是不可重入的。
为了实现锁的可重入,我们参考jdk中可重入锁的实现原理。我们在获取锁的时候,不仅要在锁中记录当前线程标识,还需要记录当前的重入次数。显然, 这个时候,使用redis的string数据结构就不满足需求了。因此可以采用hash数据结构(注意hash结构未提供setnx ex等命令参数),如下图所示,KEY记录锁标识,field记录线程标识,value记录锁的重入次数。在获取锁时,先判断锁是否存在,如果已经存在,先不立刻失败,而是查看当前锁中的线程标识是否是自己,如果是自己,则重入次数加1,如果不是自己,才失败。再说释放锁,在释放锁的时候,先去查看当前这个锁的线程标识是否是自己,如果是,才能释放,不是的话,就不能释放,并且释放的时候并不是直接删除,而是把锁重入次数减1,如果减为0了,才把这把锁给删掉,如果还不为0(说明还有其它业务),就要重置有效期,给后面的业务留够充足的时间。
获取锁的lua脚本
释放锁的lua脚本
redisson获取锁&释放锁源码
2.5 redisson的锁重试和锁超时解决方式
图解
代码
public class RedissonLock extends RedissonExpirable implements RLock {
// ...
// waitTime - 最大锁等待时间
// leaseTime - 获取锁成功时, 设置的失效时间
// 1. 锁重试问题:
// waitTime就是指定的等待时间,
// 里面用到了redis的发布订阅机制, redisson在释放锁的lua脚本中, 释放锁时会发布消息, 这里会订阅消息
// 不断计算剩余等待时间 + redis发布订阅 实现锁重试
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
}
// ...
// 2. 锁超时问题:
// 当leaseTime为-1时, 才会开启看门狗机制, 每隔一段时间就去重置有效期。在释放锁的时候, 会取消看门狗这个任务。
private <T> RFuture<Long> tryAcquireAsync(long waitTime,
long leaseTime,
TimeUnit unit,
long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager()
.getCfg()
.getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS,
threadId,
RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
// 获取锁成功之后, 会自动续约
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// 获取的entryName实际上是跟锁是一对一的
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 续约
renewExpiration();
}
}
// 续约方法
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// 完成之后, 继续续约, 重置有效期
//(无限延续下去, 因此在释放锁的时候, 会取消这个续约任务)
renewExpiration();
}
});
}
// 看门狗默认时间为30s, 因此, 这里是10s
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
// 重置锁的有效期
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
}
总结
Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间
2.6 联锁
分布式锁主从一致性问题
为了提高redis的可用性,在生产中往往会搭建redis主从模式,它由多台redis组成,只不过它们的角色会有不同,会有一台作为主节点,其它的作为从节点。主从的职责也会不一样,往往会做读写分离,主节点用来处理所有发向redis的写的操作(增删改),从节点处理redis的读的操作。既然数据都写在了主节点,那么从节点没有数据,又是怎么处理读的请求呢?因此,主节点和从节点需要做数据的同步。主节点会不断的把数据同步给从节点,来确保主节点和从节点的数据是一致的。但是主从同步会有一定的延迟,所以数据同步也会有延迟,尽管延迟很短,但它客观存在。redisson分布式锁主从一致性问题正是由这个延迟导致的。
当1个java应用来获取锁,执行1个写操作命令来获取锁,此时主节点执行成功,保存了该数据。而后,主节点会向从节点同步数据,恰在此时,主节点发生故障,同步尚未完成,这个时候redis中会有哨兵监控集群状态,当发现主节点宕机后,会在从节点中选出1台作为新的主节点,但因为之前的主节点尚未把数据同步过来(也就是锁已经丢失了)。当java应用再来访问这个新的主节点时,就会发现锁已经没有了(锁失效了),假设此时其它线程也来获取锁,因为所以经丢失了,所以其它线程也能够获取到锁了,此时就发生了并发的安全问题。
redisson解决主从一致问题
既然主从关系是导致出现主从一致问题的原因,干脆就不要主从了,所有的节点都看作是独立的redis节点,相互之间没有任何关系,都可以做读写。此时,获取锁的方式就变了,之前只需要向master执行写操作获取锁就可以了,但现在必须依次的向多个redis节点都去执行写操作获取锁,都保存了这个锁标识,才算获取锁成功。因为现在没有主从,所以没有主从一致性问题。假设有一台redis节点宕机了,但此时redis仍然是可用的,只要有节点还存活着,redis的锁仍然有效。
为了继续提高redis的可用性,我们也可以给每个redis节点建立主从关系,如下图所示。假设其中1个redis节点发生故障,假设它并未完成同步,那么它的slave上就没有锁的标识,同时这个slave也会成为新的主节点(但它没有锁标识)。此时,1个线程过来拿锁,是获取不了的,因为必须每1个节点都拿到锁,才算拿到锁成功。尽管这个主节点能拿到成功,但其它节点仍保存了锁标识,因此,只要有1个节点存活者,那么其它线程就不可能拿到锁,就不会出现锁失效的问题。这样的方案保留了主从同步机制,确保了整个redis集群高可用的特性,同时也避免了主从一致引发的锁失效问题,这种方案在redis中叫MultiLock,即联锁。
联锁使用示例
配置3个RedissonClient
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient1(){
// 配置
Config config = new Config();
config
.useSingleServer()
.setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient1(){
// 配置
Config config = new Config();
config
.useSingleServer()
.setAddress("redis://192.168.150.101:6380")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient1(){
// 配置
Config config = new Config();
config
.useSingleServer()
.setAddress("redis://192.168.150.101:6381")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
使用RedissonMultiLock
@Resource
private RedissonClient redissonClient1;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;
private RLock lock;
@Test
public void testMultiLock(){
RLock lock1 = redissonClient1.getLock("order");
RLock lock2 = redissonClient2.getLock("order");
RLock lock3 = redissonClient3.getLock("order");
// 创建联锁
//(这里使用redissonClient1 或 redissonClient2 或 redissonClient3 都是一样的)
lock = redissonClient1.getMultiLock(lock1, lock2, lock3);
}
总结
1)不可重入Redis分布式锁:
- 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
- 缺陷:不可重入、无法重试、锁超时失效
2)可重入的Redis分布式锁:
- 原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:redis宕机引起锁失效问题
3)Redisson的multiLock:
- 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷:运维成本高、实现复杂