分布式锁的实现,基于Redis实现分布式锁
- 前言
- 基于mysql实现分布式锁
- 基于Reids实现分布式锁
- 基于Redis的具体实现
- 问题1 :线程误删锁
- 解决方案一:在删除锁时进行校验
- 问题2:释放锁的检验和释放不具有原子性
- 解决方案:基于lua脚本使检验和删除操作具有原子性
- 其他问题
前言
对于一些业务来说,比如商品或者优惠卷的抢杀,我们常常需要进行一些加锁的操作比如synchronized等来避免商品的超卖问题,但是对于大型的购物平台,都是使用了分布式的系统,将服务部署在多台服务器中,让请求通过轮询算法发送到多个服务器中,用来减少服务器的压力,但是使用了分布式系统也就会导致单机锁的失效,比如synchronized来讲,它是基于JVM的监视器锁,线程想要进入被其修饰的代码块就要获取JVM的监视器锁,但是对于多个服务器,它们有着不同的JVM,监视器锁也就不同,也就导致了多个服务器的线程可以同时进入被其修饰的代码块中,此时我们就需要来实现一个分布式锁,可以同时阻塞多个服务器的线程。
基于mysql实现分布式锁
我们可以使用mysql的数据库表来实现,添加几个字段,例如id、lock_name、thread_id分表表示锁的id、锁的名称、加锁的服务器的线程id,给lock_name加唯一约束,线程需要获取锁就需要尝试在数据库插入锁,如果数据库中存在相同的锁,就不能插入也就导致获取锁失败,成功插入锁就获取到了锁,这是基于mysql的锁机制(插入数据时会添加排他性的行级锁)来保证并发安全。
特点:
可用性:高可用
性能:一般
安全性:当mysql连接断开时,会发生线程安全问题
基于Reids实现分布式锁
我们还可以基于Redis实现分布式锁,Redis的缓存可以被多个服务器所共享,我们可以基于Redis的set nx实现分布式锁,当要存储的值不存在时才能插入成功,否则插入失败,还可以使用参数ex设置线程持有锁的超时时间,防止线程一直持有锁。
特点:
可用性:高可用
性能:比mysql更好
安全性:好,可以给锁设置过期时间
基于Redis的具体实现
public class RedisLock implements ILock {
private final static String NAME = "lock:";
private String key;
private StringRedisTemplate redisTemplate;
public RedisLock(String key, StringRedisTemplate redisTemplate) {
this.key = key;
this.redisTemplate=redisTemplate
}
@Override
public boolean getLock(Long timeOut) {
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(NAME + key, “1”, timeOut, TimeUnit.SECONDS);
return BooleanUtil.isTrue(aBoolean);
}
@Override
public void releaseLock() {
redisTemplate.delete(NAME_key)
}
}
如上,我们封装了lock+key的锁的获取和释放方法,这样看起来觉得实现非常简单,实际有很多问题存在
问题1 :线程误删锁
有一个用户的不同服务器的不同线程,分别是线程一和线程二,线程一先获取到了锁,线程一由于业务阻塞超过了锁的过期时间,导致锁被释放,此时线程二获取到了锁,开始执行业务,线程一又继续执行业务执行完毕后删除锁,把线程二持有的锁给释放了,其他线程又获取到了锁,就发生了线程安全问题
解决方案一:在删除锁时进行校验
解决问题一很容易想到,我们可以添加锁时给其value值设置成服务器线程的唯一标识,每次删除锁时都会检验是否是当前线程持有的锁
public class RedisLock implements ILock {
private final static String NAME = "lock:";
private String key;
private StringRedisTemplate redisTemplate;
public RedisLock(String key, StringRedisTemplate redisTemplate) {
this.key = key;
this.redisTemplate=redisTemplate
}
@Override
public boolean getLock(Long timeOut) {
String uid = PREFIX_ID + Thread.currentThread().getId();
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(NAME + key, uid, timeOut, TimeUnit.SECONDS);
return BooleanUtil.isTrue(aBoolean);
}
@Override
public void releaseLock() {
String id = PREFIX_ID + Thread.currentThread().getId();
String uid=redisTemplate.opsForValue().get(NAME+key);
if (id.equals(uid)){
redisTemplate.delete(NAME+key)
}
}
问题2:释放锁的检验和释放不具有原子性
在解决方法一和方法二后,其实还有一个问题,显然在释放锁的操作中,校验和释放不具有原子性,这样就可能会发生这样一个问题,例如线程一和线程二是同一个用户的不同的线程请求,线程一获取到了锁,执行完业务准备释放锁,在释放锁校验成功后,发生了长时间的阻塞(比如FullGC会阻塞所有线程),导致线程的持有锁的时间超过了锁的过期时间,导致锁被释放,被线程二成功获取到了锁,开始执行,此时线程一恢复后释放了锁,此时又有其他线程可以获取到锁,就发生了线程安全的问题。
解决方案:基于lua脚本使检验和删除操作具有原子性
创建lua脚本,lua语言具有原子性
if(redis.call('get',KEYS[1]==ARGV[1])) then
--如果相同删除
return redis.call('del',KEYS[1])
end
return 0
public class RedisLock implements ILock {
private final static String NAME = "lock:";
private String key;
private StringRedisTemplate redisTemplate;
private final static String PREFIX_ID = UUID.randomUUID().toString(true) + "_";
private final static DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT=new DefaultRedisScript<Long>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("noLock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public RedisLock(String key, StringRedisTemplate redisTemplate) {
this.key = key;
this.redisTemplate = redisTemplate;
}
@Override
public boolean getLock(Long timeOut) {
String id = PREFIX_ID + Thread.currentThread().getId();
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(NAME + key, id, timeOut, TimeUnit.SECONDS);
return BooleanUtil.isTrue(aBoolean);
}
@Override
public void releaseLock() {
String id = PREFIX_ID + Thread.currentThread().getId();
redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(NAME+key),id);
}
}
其他问题
基于Redis实现的分布式锁还具有其他问题,比如不可重入、不能多次尝试获取锁、超时释放、主从一致性问题,我们在下一篇文章来介绍这些问题产生的原因以及解决方案