1. Redisson锁与Redis订阅与发布模式的联系:
Redisson锁中,使用订阅发布模式去通知等待锁的客户端:锁已经释放,可以进行抢锁。
- publish channel_name message:将消息发送到指定频道
- 解锁时,在Lua解锁脚本中使用:场景1:锁不存在时,发送消息到解锁频道;场景2:锁重入记数hash等于0时,发送消息到解锁频道。
- subscribe channel_name ... :客户端订阅一个或多个频道消息
- 加锁时,没有争抢锁成功的客户端,需要订阅解锁频道 -> RedissonLock#tryLock中订阅频道后,会用await()阻塞等待锁释放。直到获取推送的频道解锁消息,才取消等待。
- unsubsribe channel_name ... :客户端退订一个或多个频道消息
- 加锁时,客户端最后都会去退订解锁频道,不管加锁成功与失败。
2. RedissonLock锁:
数据结构:
- Hash结构:key -> name 、field -> lockName (id+":"+threadId -> UUID:threadId) 、 value -> 可重入次数
加锁伪脚本:
// 如果不存在锁:则新增锁,并设置锁重入计数为1、设置锁过期时间,返回nil结束
exists name == 0 -> name:加锁的key名称
hset name lockName 1 -> lockName:当前客户端标识
pexpire name internalLockLeaseTime -> internalLockLeaseTime:锁租期
return nil
// 如果存在锁,且唯一标识也匹配,表示当前锁可重入,重入计数+1,并重新设置锁过期时间,返回nil结束
hexists name lockName = 1
hincrby name lockName 1
pexpire name internalLockLeaseTime
return nil
// 如果存在锁,且唯一标识不匹配,表示锁是被其他线程占用,返回剩余时间结束
return pttl name
解锁伪脚本:
// 如果锁不存在:则直接广播解锁消息,返回1,结束
exists name == 0
publish channelName 0 -> channelName:频道名称 -> redisson_lock__channel:{name}
return 1
// 如果锁存在,但是但唯一标识不匹配:表示锁被其他线程占用,当前线程不允许解锁,返回nil,结束
hexists name lockName == 0
return nil
// 如果锁存在,并且唯一标识匹配:则先将锁重入计数 -1
counter = hincrby name lockName -1
// 如果锁重入计数-1后,还大于0,重新设置有效期,返回0,结束
pexpire name internalLockLeaseTime
return 0
// 如果锁重入计数-1后,等于0,直接删除key,并广播解锁消息(即唤醒其它争抢锁的被阻塞的线程),返回1,结束
del name
publish channelName 0
return 1
参考文档:https://www.cnblogs.com/huangwentian/p/14622441.html
3. RedissonFairLock锁:
数据结构:
- Hash结构:key -> name 、field -> lockName (UUID:threadId)、 value -> 可重入次数
- List结构:key -> redisson_lock_queue:{lockName} 、vaule -> lockName (UUID:threadId)
- 作用:实现公平锁,按顺序记录线程争抢锁的顺序。
- Zset结构:key -> redisson_lock_timeout:{lockName}、 score -> timeout(过期时间,它是时间戳) member -> lockName (id+":"+threadId)
- 作用1:利用Zset分值的功能,清除等待队列中,等待加锁,但是已经过期的线程。
- 作用2:利用Zset分值的功能,在加锁不成功后,计算需要等待的时间(ttl)和更新Zset分值(timeout)
加锁伪脚本:
/ 删除列表中第一个位置是过期的线程
// 死循环
while true do
// list中获取第一元素,如果firstThreadId2为空,直接结束循环
firstThreadId2 = lindex redisson_lock_queue:{lockName} 0
if firstThreadId2 == false
break;
// 如果firstThreadId2不为空,从Zset中获取分值timeout
timeout = zscore redisson_lock_timeout:{lockName} firstThreadId2
// 如果 timeout <= 当前时间戳,从Zset中删除该成员,从List弹出该元素
if timeout <= currentTime
zrem redisson_lock_timeout:{lockName} firstThreadId2
lpop redisson_lock_queue:{lockName}
// 如果 timeout > 当前时间戳,结束循环
break;
// 如果不存在name的key,并且不存队列或者队列第一个元素是lockName时,当前线程可以获得锁
if exists name == 0 && (exists redisson_lock_timeout:{lockName} == 0 || lindex redisson_lock_queue:{lockName} 0 == lockName)
lpop redisson_lock_queue:{lockName}
zrem redisson_lock_timeout:{lockName} lockName
hset name lockName 1
pexpire name internalLockLeaseTime
return nil
// 如果存在name的key,并且hash结构的field也是lockname时,表示当前线程已经获得锁,此时处理锁重入
if hexists name lockName == 1
hincrby name lockName 1
pexpire name internalLockLeaseTime
return nil
// 获取列表第一个元素,计算当前的ttl,然后再重新计算timeout赋值给Zset中,Zset添加成功后,再rpush到队列,返回ttl
firstThreadId = lindex redisson_lock_queue:{lockName} 0
ttl
// 如果列表第一元素不为空,并且不是lockName时,从Zset中获取lockName的timeout,计算ttl
if firstThreadId ~= false and firstThreadId ~= lockName
ttl = (zscore redisson_lock_timeout:{lockName} lockName) - currentTime
// 列表第一元素是lockName时,获取ttl
ttl = pttl name
// 计算 timeout
timeout = ttl + (currentTime + threadWaitTime);
// 如果 zadd 添加成功,则 rpush
if (zadd redisson_lock_timeout:{lockName} timeout lockName) == 1
rpush redisson_lock_queue:{lockName}
return ttl
解锁伪脚本:
// 删除列表中第一个位置是过期的线程
... ...
// 1. 如果不存name时,获取列表第一个元素,并且第一元素不为空,向解锁频道发送通知
if exists name == 0
nextThreadId = lindex redisson_lock_queue:{lockName} 0
if nextThreadId ~= false
publish channel_name .. ':' .. nextThreadId 0
return 1
// 2. 如果 hexists name lockName 不存在时,直接返回nil
if hexists name lockName == 0
return nil
// 3. 如果 hexists name lockName 存在时,处理锁可重入,或删除key,通知下一个元素
counter = hincrby name lockName -1
// 如果counter大于0,设置过期时间
if counter > 0
pexpire name internalLockLeaseTime
return 0
// 如果counter小于等于0,删除key
del name
// 获取列表第一个元素,如果不为空,发送解锁通知
nextThreadId = lindex redisson_lock_queue:{lockName} 0
if nextThreadId ~= false
publish channel_name .. ':' .. nextThreadId 0
return 1
4. 延迟队列:
两个API概念:
- RBlockingQueue 就是目标队列
- RDelayedQueue 就是中转队列
- 我们实际操作的是RBlockingQueue队列,并不是RDelayedQueue队列,RDelayedQueue主要是提供中间转发的一个队列。
- 我们都是基于RBlockingQueue目标队列在进行消费,而RDelayedQueue就是会把过期的消息放入到我们的目标队列中。
- 我们只要从RBlockingQueue队列中取数据即可。
数据结构:
- ZSet结构:key -> redisson_delay_queue_timeout:{name}、score -> System.currentTimeMillis() + delayInMs、member ->数据使用 struct.pack()包装,包含 'dLc0'、randomId、string.len(数据)、数据
- 作用:该队列实现了获取延迟数据的功能。
- List结构:key-> redisson_delay_queue:{name} 、value -> 数据使用 struct.pack()包装,包含 'dLc0'、randomId、string.len(数据)、数据
- 作用:该队列记录了所有数据的加入顺序。
- List结构:key -> 出入阻塞队列的名称,上边的{name}、value -> 数据
处理流程(展示一条主线):
Redisson延迟队列剖析_why redissondelayedqueue use list-CSDN博客
- 实例化RedissonDelayedQueue时:
- 设置频道名称(redisson_delay_queue_channel:{name})、队列名称(redisson_delay_queue:{name})、延迟队列名称(redisson_delay_queue_timeout:{name})
- 设一个“将到期的任务推送阻塞队列”的任务(i.获取到期数据,到阻塞队列的Lua脚本;ii.设置发布订阅的主题)
- 启动该任务,并且添加相关监听(i.添加监听订阅发布的事件 ->pushTask() ;ii.添加监听消息的事件->scheduleTask(startTime))
- 添加消息时:
- 向timeoutSet延迟队列添加数据
- 向queue队列添加数据
- 获取timeoutSet延迟队列的队头数据,如果队头数据等于刚加入的数据,则向频道发送消息
- 触发频道监听事件->scheduleTask(startTime):
- 计算延迟时间 -> delay = startTime - System.currentTimeMillis()
- 如果延迟大于10,启动延迟任务
- 否则立即执行启动 -> pushTask()
- 将到期的任务推送到阻塞队列 -> pushTask():
- 调用实例化时,定义的pushTaskAsync
- 添加监听操作完成的事件,在执行成功后,判断返回值是否为空,不为空调用scheduleTask(future.getNow())