基于redisson实现分布式锁
之前背过分布式锁几种实现方案的八股文,但是并没有真正自己实操过。现在对AOP有了更深一点的理解,就自己来实现一遍。
1、分布式锁的基础知识
分布式锁是相对于普通的锁的。普通的锁在具体的方法层面去锁,单体应用情况下,各个进入的请求都只能进入到一个应用里面,也就能达到锁住方法的效果。
而分布式的系统,将一个项目部署了多个实例,通过nginx去做请求转发将请求分到各个实例。不同实例之间共用代码,共用数据库这些。比如一个请求进入A实例,获得了锁;如果继续有请求进入A实例,则会排队等待。但如果请求进入的是B实例呢?B实例的锁和A实例没有关系,那么进入B实例的请求也会获取到锁,然后进入方法。这样锁的作用就没有达到。这种情况下,就引出了分布式锁,这是专门为了解决分布式系统的并发问题的。做法是让不同的实例都能使用同一个锁。比如redis,redis内部是单线程的,把锁放在redis,这样就可以多个实例共用一个锁。
2、Redisson
Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。
利用Redisson可以很轻松就实现分布式锁。
3、实现的几个点
如何动态将拼接key所需的参数值传入到切面?
有两种方案,一种是通过给参数加上注解来标识那些参数是作为key使用的。在切面内通过获取方法的参数上的注解来获取所需的参数值。
另一种是通过定义key的参数名,在切面获取方法参数的参数名进行对比,一样的就是所需的参数。
获取锁失败如何重试?
Redisson提供了重试的能力,以及重试等待时长等。
4、代码实现
前置操作:项目引入Redis
引入Redisson依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
项目yaml文件
spring:
#reids配置
redis:
database: 1
host: 127.0.0.1
port: 6379
# redisson配置
redisson:
# 数据库编号
database: 1
# 节点地址
address: redis://127.0.0.1:6379
编写Redisson配置文件
package com.yumoxuan.myapp.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Value("${spring.redis.redisson.address}")
public String address;
@Value("${spring.redis.redisson.database}")
public Integer database;
@Bean
public RedissonClient getRedissonClient(){
Config config=new Config();
config.useSingleServer().setAddress(address).setDatabase(database);
return Redisson.create(config);
}
}
定义分布式锁注解
package com.yumoxuan.myapp.core.aspect.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
/**
* 锁的名称
* @return
*/
String name() default "";
/**
* 锁的参数key
* @return
*/
String[] key() default {};
/**
* 锁过期时间
* @return
*/
int expireTime() default 30;
/**
* 锁过期时间单位
* @return
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
/**
* 等待获取锁超时时间
* @return
*/
int waitTime() default 10;
}
切面实现
package com.yumoxuan.myapp.core.aspect;
import com.yumoxuan.myapp.core.aspect.annotation.DistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
@Slf4j
@Aspect
@Component
public class DistributedLockAspect {
@Resource
public RedissonClient redissonClient;
@Pointcut("@annotation(com.yumoxuan.myapp.core.aspect.annotation.DistributedLock)")
public void pointcut() {
}
@Around("pointcut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
DistributedLock annotation = method.getAnnotation(DistributedLock.class);
String name = annotation.name();
String key = name;
String[] keys = annotation.key();
int expireTime = annotation.expireTime();
TimeUnit timeUnit = annotation.timeUnit();
int waitTime = annotation.waitTime();
Object[] args = point.getArgs(); // 参数值
String[] paramNames = signature.getParameterNames(); // 参数名
for (int i = 0; i < args.length; i++) {
String paramName = paramNames[i];
for (int j = 0; j < keys.length; j++) {
if (paramName.equals(keys[j])) {
key = key + ":" + args[i];
}
}
}
log.info(Thread.currentThread().getId()+" "+"key:"+key);
RLock lock = redissonClient.getLock(key);
boolean res = false;
Object obj = null;
try {
if (waitTime == -1) {
res = true;
lock.lock(expireTime, timeUnit);
} else {
res = lock.tryLock(waitTime, expireTime, timeUnit);
}
if (res) {
obj = point.proceed();
} else {
log.error("分布式锁获取异常");
}
} finally {
//释放锁
if (res) {
lock.unlock();
}
}
return obj;
}
}
验证效果
@ApiOperation(value="my_app_user-添加", notes="my_app_user-添加")
@PostMapping(value = "/add")
@DistributedLock(name = "添加分布式锁",key = {"userId"})
public Result<?> add(@RequestBody MyAppUser myAppUser, String userId) {
log.info(Thread.currentThread().getId()+" "+new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Result.ok();
}
如下图,76.77的key是一样的,从打印的时间上看,两次打印时间相差了5秒,说明后者被阻塞。而68的线程key和另外两个不一样,所以没有被阻塞,在76和77的中间就运行了。符合分布式锁的效果。
5、拓展
要实现一个功能完善的分布式锁,其实还有很多内容。
比如锁要过期了,但事务未执行完毕,则需要进行锁续期,这个可以通过守护线程实现。