Redisson
参考: 原文链接
定义:Redisson 是一个用于与 Redis 进行交互的 Java 客户端库
优点:很多
1. 入门
1.1 安装
<!--redission-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.31.0</version>
</dependency>
<!--starter-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.18.0</version>
</dependency>
1.2 配置
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.133.136:6379")
.setPassword("111111");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
---------------------------------使用yml配置-----------------------------------------
redisson:
singleServerConfig:
address: "redis://192.168.133.136:6379" # Redis 服务器地址
password: "111111" # 如果有密码,填入密码
connectionMinimumIdleSize: 10 # 最小空闲连接数
connectionPoolSize: 64 # 连接池大小
idleConnectionTimeout: 10000 # 空闲连接最大存活时间
connectTimeout: 10000 # 连接超时
timeout: 10000 # 请求超时
retryAttempts: 3 # 重试次数
retryInterval: 1500 # 重试间隔(毫秒)
1.3 使用
@Autowired
private RedissonClient redissonClient;
@Test
void testRedisson() throws Exception {
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("sanjin");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
//判断获取锁成功
if (isLock) {
try {
System.out.println("执行业务");
} finally {
//释放锁
lock.unlock();
}
}
}
----------------结果------------------
执行业务
2. 可重入锁原理
2.1 加锁
这是可重入锁接口,
public interface RLock extends Lock, RLockAsync {
String getName();
void lockInterruptibly(long var1, TimeUnit var3) throws InterruptedException;
// 有等待时间
boolean tryLock(long var1, long var3, TimeUnit var5) throws InterruptedException;
// 无等待时间
void lock(long var1, TimeUnit var3);
boolean forceUnlock();
boolean isLocked();
boolean isHeldByThread(long var1);
boolean isHeldByCurrentThread();
int getHoldCount();
long remainTimeToLive();
}
看一下lock实现方法, 明白大体逻辑即可
- 如果成功,立即返回,如果失败,订阅锁的释放事件
- 在锁释放时,重新尝试获取锁,如果仍未成功(又被抢了),根据 TTL 再次等待,直到获取锁成功
- 在方法退出前,取消对锁释放事件的订阅,避免资源浪费
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 保证只有锁的持有线程可以释放锁
long threadId = Thread.currentThread().getId();
// -1通常表示锁的等待时间设置为无限制(立即尝试获取锁)
// 如果返回 null,表示成功获取到锁, ttl表示锁的剩余过期时间
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
// 如果未获取到锁,订阅锁的释放事件
CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId);
// 设置订阅超时,防止由于网络或其他问题导致的长时间等待
this.pubSub.timeout(future);
RedissonLockEntry entry;
// 阻塞等待订阅结果
if (interruptibly) {
entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future);
} else {
entry = (RedissonLockEntry)this.commandExecutor.get(future);
}
try {
// 进入一个无限循环,不断尝试重新获取锁,直到成功为止
while(true) {
// 再次尝试获取锁
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
// 如果返回 null,表示成功获取锁,直接退出方法
if (ttl == null) {
return;
}
// 如果返回一个非 null 的值 ttl,表示锁仍被占用,需要根据剩余时间等待
if (ttl >= 0L) {
try {
// 使用计数器(CountDownLatch 的一种实现)来等待锁的释放通知
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var14) {
if (interruptibly) {
throw var14;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
// 如果 TTL 为负数(可能表示锁的持有者未设置自动过期时间),线程将等待一个释放通知
} else if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
} finally {
// 无论锁是否成功获取,最终都会释放订阅,以避免资源泄漏
this.unsubscribe(entry, threadId);
}
}
}
具体加锁逻辑tryAcquire
KEYS[1]
:锁的 Redis 键,通常为锁的唯一标识ARGV[1]
:锁的过期时间(毫秒)ARGV[2]
:当前线程的唯一标识(value)
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// 异步执行 Redis 的 EVAL 命令
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
// 判断锁是否已存在,如果锁不存在,创建锁并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 使用 HINCRBY 创建锁并设置当前线程的持有次数为 1,利用hash结构
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 设置锁的过期时间,确保锁在持有者崩溃后释放
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
// 返回 nil 表示锁已成功获取
"return nil; " +
"end; " +
// 锁已被当前线程持有的情况, 判断锁是否被当前线程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 增加持有次数(支持可重入)
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 更新锁的过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 返回锁的剩余过期时间(毫秒),用于通知调用者等待或重试
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getRawName()),
new Object[]{unit.toMillis(leaseTime),
this.getLockName(threadId)});
}
具体加锁逻辑如下图
2.2 续锁
主要就是根据leaseTime判断如何操作
指定了leaseTime:设置过期时间为leaseTime,不启用看门狗
不指定leaseTime:设置默认过期时间(30s),并且启用看门狗
指不指定是看 lock.lock() 是否传入了值
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
// 尝试获取锁
if (leaseTime > 0L) {
// 使用指定的过期时间尝试获取锁, 适合短期锁场景,过期后无需续期
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 使用内部默认的锁租约时间 internalLockLeaseTime(30秒), 适合长期锁场景,通常需要续期机制
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 处理锁获取结果
CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
// 如果 ttlRemaining == null,说明锁成功获取
if (ttlRemaining == null) {
if (leaseTime > 0L) {
// 将 internalLockLeaseTime 更新为指定的租约时间,后续 Redis 锁命令会使用该值设置锁的过期时间
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 看门狗机制续租
// 调用 scheduleExpirationRenewal 方法,开启后台续期任务,确保锁不会因过期时间耗尽而释放
this.scheduleExpirationRenewal(threadId);
}
}
// 表示锁已被其他线程持有,返回锁的剩余有效时间
return ttlRemaining;
});
return new CompletableFutureWrapper(f);
}
2.3 解锁
最外层的解锁方法
public void unlock() {
try {
// 异步调用解锁方法,并等待其执行完成
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
解锁并处理解锁后的步骤(取消看门狗机制…)
public RFuture<Void> unlockAsync(long threadId) {
// 异步调用解锁方法
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
// 使用 handle() 方法处理解锁结果和异常
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 取消锁的续期
this.cancelExpirationRenewal(threadId);
// 如果异步操作抛出异常,包装并抛出 CompletionException
if (e != null) {
throw new CompletionException(e);
} else if (opStatus == null) {
// 如果解锁操作状态为 null,说明当前线程未持有锁,抛出 IllegalMonitorStateException
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
throw new CompletionException(cause);
} else {
// 如果操作成功,则返回 null
return null;
}
});
return new CompletableFutureWrapper(f);
}
具体解锁的redis执行步骤
参数:
KEYS[1]
:锁的键名。KEYS[2]
:用于发布解锁事件的频道名。ARGV[1]
:解锁事件消息。ARGV[2]
:锁的过期时间。ARGV[3]
:锁的名称(用于计数器)。
步骤:
-
检查当前线程是否持有锁
-
减少锁计数器
-
如果计数器值大于零,更新锁的过期时间
-
如果计数器值为零,删除锁并发布解锁事件
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 检查指定的锁是否存在。如果不存在(0),返回 nil,表示当前线程没有持有锁。
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end;" +
// 将锁的计数器减一。如果当前线程持有锁,这个操作会减少锁的计数器值
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
// 如果锁计数器值仍大于零,说明有其他线程持有锁,需要更新锁的过期时间
"if (counter > 0) then " +
// 更新锁的过期时间
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
// 表示解锁操作成功但锁仍被其他线程持有
"return 0; " +
"else " +
// 如果计数器值为零,说明当前线程是最后一个持有锁的线程
// 删除锁
"redis.call('del', KEYS[1]); " +
// 发布解锁事件通知其他等待的线程
"redis.call('publish', KEYS[2], ARGV[1]); " +
// 表示解锁成功且锁已被完全释放
"return 1; " +
"end; " +
// 如果条件不满足,返回 nil
"return nil;",
Arrays.asList(this.getRawName(), this.getChannelName()),
new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
图解:
3. 其他锁
3.1 红锁和多锁的区别
RedLock 是一种更为复杂的分布式锁实现,保证了分布式环境中的高可用性和容错性,但需要多个 Redis 实例进行协调
多锁 的实现简单,但可靠性差,容易受到单点故障的影响,不适合对安全性和可靠性要求较高的应用
特点 | RedLock | 多锁(Multiple Locks) |
---|---|---|
实现方式 | 使用多个独立的 Redis 实例,保证多数节点成功 | 每个 Redis 实例独立设置锁 |
容错性 | 高,支持在大多数节点上获取锁 | 低,不能保证一致性和容错性 |
锁的获取 | 需要在大多数实例中成功获取 | 在任意一个实例上获取锁即可 |
安全性 | 提供了更高的安全性和可靠性 | 相对简单,但不适用于复杂场景 |
网络分区容忍性 | 可以容忍部分节点失败,但不是所有 | 不适合面对网络分区或节点故障的场景 |
3.2 简单演示
public static void main(String[] args) {
String lockKey = "myLock";
Config config = new Config();
config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6379");
Config config2 = new Config();
config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6380");
Config config3 = new Config();
config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6381");
RLock lock = Redisson.create(config).getLock(lockKey);
RLock lock2 = Redisson.create(config2).getLock(lockKey);
RLock lock3 = Redisson.create(config3).getLock(lockKey);
RedissonRedLock redLock = new RedissonRedLock(lock, lock2, lock3);
try {
redLock.lock();
} finally {
redLock.unlock();
}
}
3.3 CAP之间的取舍
CAP 原则又称 CAP 定理, 指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性), 三者不可得兼
一致性© : 在分布式系统中的所有数据备份, 在同一时刻是否同样的值(等同于所有节点访问同一份最新的数据副本)
可用性(A): 在集群中一部分节点故障后, 集群整体是否还能响应客户端的读写请求(对数据更新具备高可用性)
分区容忍性§: 以实际效果而言, 分区相当于对通信的时限要求. 系统如果不能在时限内达成数据一致性, 就意味着发生了分区的情况, 必须就当前操作在 C 和 A 之间做出选择
4. Redisson的限流功能
常见的限流功能:固定窗口算法、滑动窗口算法、漏桶算法、令牌桶算法
利用Redisson的令牌桶限流
@Test
void testLimiter() {
// 创建一个限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter("sanjin");
// 初始化最大流速为每秒10个令牌
rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);
for (int i = 0; i < 20; i++) {
// 尝试获取一个令牌
boolean b = rateLimiter.tryAcquire();
if (b) {
System.out.println("成功获取第"+ i +"个令牌");
} else {
System.out.println("被第" + i + "次限流了");
}
}
}
---------------------------结果-----------------------------------
成功获取第0个令牌
成功获取第1个令牌
成功获取第2个令牌
成功获取第3个令牌
成功获取第4个令牌
成功获取第5个令牌
成功获取第6个令牌
成功获取第7个令牌
成功获取第8个令牌
成功获取第9个令牌
被第10次限流了
被第11次限流了
被第12次限流了
被第13次限流了
被第14次限流了
被第15次限流了
被第16次限流了
被第17次限流了
被第18次限流了
被第19次限流了