文章目录
- RRateLimiter 介绍
- 代码实现
- Lua 脚本
- 现实场景
- 1. 初始化限流器
- 2. 限流器应用场景(客人申请游玩流程)
RRateLimiter 介绍
在分布式系统中,限流(Rate Limiting)是保障系统稳定性、避免过载的重要机制。Redisson 作为一个功能强大的 Redis 客户端,不仅提供了广泛使用的分布式锁,还包含了许多其他实用的分布式工具。其中,RRateLimiter 是 Redisson 提供的分布式限流器,功能强大。本文将详细解析 RRateLimiter 的原理,深入理解其工作机制。
代码实现
首先,通过一个简单的示例了解如何使用 RRateLimiter,它创建了一个限流器并启动多个线程来获取令牌:
import org.redisson.Redisson; // 导入 Redisson 的核心类,用于创建 Redisson 客户端
import org.redisson.api.RRateLimiter; // 导入 RRateLimiter 接口,用于实现分布式限流
import org.redisson.api.RedissonClient; // 导入 RedissonClient 接口,用于与 Redis 进行交互
import org.redisson.config.Config; // 导入 Redisson 的配置类,用于配置 Redis 连接
import java.util.concurrent.CountDownLatch; // 导入 CountDownLatch 类,用于控制线程同步
public class RateLimiterDemo { // 定义一个公共类 RateLimiterDemo
public static void main(String[] args) throws InterruptedException { // 主方法,程序入口,可能抛出 InterruptedException
RRateLimiter rateLimiter = createRateLimiter(); // 创建一个 RRateLimiter 实例
int totalThreads = 20; // 定义总线程数为 20
CountDownLatch latch = new CountDownLatch(totalThreads); // 创建一个 CountDownLatch 实例,初始计数为 totalThreads
long startTime = System.currentTimeMillis(); // 记录开始时间,用于计算总耗时
for (int i = 0; i < totalThreads; i++) { // 循环创建并启动 20 个线程
new Thread(() -> { // 创建一个新线程
rateLimiter.acquire(1); // 每个线程尝试获取 1 个令牌,若令牌不足则阻塞等待
latch.countDown(); // 线程完成后,调用 countDown() 方法减少计数器
}).start(); // 启动线程
}
latch.await(); // 主线程等待,直到所有子线程完成
System.out.println("Total elapsed time: " + (System.currentTimeMillis() - startTime) + " ms"); // 打印总耗时
}
/**
* 创建并配置 RRateLimiter 的方法
*
* @return 配置好的 RRateLimiter 实例
*/
private static RRateLimiter createRateLimiter() { // 创建并配置 RRateLimiter 的方法
Config config = new Config(); // 创建一个新的 Redisson 配置实例
config.useSingleServer() // 配置使用单一 Redis 服务器
.setAddress("redis://127.0.0.1:6379") // 设置 Redis 服务器地址
.setTimeout(1000000); // 设置连接超时时间(毫秒)
RedissonClient redisson = Redisson.create(config); // 根据配置创建一个 Redisson 客户端实例
RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter"); // 获取名为 "myRateLimiter" 的 RRateLimiter 实例
rateLimiter.trySetRate(RRateLimiter.RateType.OVERALL, 1, 1, RateIntervalUnit.SECONDS); // 初始化限流器,设置全局速率为每秒 1 个令牌
return rateLimiter; // 返回配置好的限流器实例
}
}
Lua 脚本
为了更深入地理解 RRateLimiter 的工作原理,将进一步解析其底层的 Lua 脚本,实现分布式限流的核心逻辑。以下内容将逐行解释 Lua 脚本的功能和实现细节。
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]); -- 将速率设置到哈希表中,只有当 'rate' 字段不存在时才设置
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]); -- 将时间区间设置到哈希表中,只有当 'interval' 字段不存在时才设置
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]); -- 将类型设置到哈希表中,只有当 'type' 字段不存在时才设置,并返回结果
-- ARGV[1] 为请求令牌数
-- ARGV[2] 为请求时间戳
-- ARGV[3] 为请求类型
-- 获取限流器的速率、时间区间和类型
local rate = redis.call("hget", KEYS[1], "rate") -- 从哈希表中获取速率
local interval = redis.call("hget", KEYS[1], "interval") -- 获取时间区间(毫秒)
local type = redis.call("hget", KEYS[1], "type") -- 获取限流器的类型(单机或集群)
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized") -- 确保限流器已初始化
-- 默认情况下,使用 {name}:value 和 {name}:permits
local valueName = KEYS[2] -- 当前令牌数的键名
local permitsName = KEYS[4] -- 记录请求的有序集合键名
-- 如果类型为 "1"(单机模式),则使用不同的键名
if type == "1" then
valueName = KEYS[3] -- 单机模式下的令牌数键名
permitsName = KEYS[5] -- 单机模式下的有序集合键名
end
-- 确保请求的令牌数不超过限流器的速率
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
-- 获取当前剩余的令牌数
local currentValue = redis.call("get", valueName)
-- 第一次请求直接走else
-- 第二次请求因为 valueName 更新有值,走if
if currentValue ~= false then
-- 获取已过期的请求(初始时间 至 (当前时间(ARGV[2])-时间间隔(interval)) 准备清理失效的令牌数据
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0 -- 初始化拟新增失效令牌数
-- 遍历过期的请求,释放相应的令牌
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("fI", v)
released = released + permits
end
-- 如果有释放的令牌,更新当前可用令牌数并移除过期的请求
if released > 0 then
redis.call("zrem", permitsName, unpack(expiredValues)) -- 清除 permitsName 中包含 expiredValues 的数据
currentValue = tonumber(currentValue) + released -- 清理失效令牌后计算总可用令牌数
redis.call("set", valueName, currentValue) -- 更新可用令牌
end
-- 如果当前令牌数不足以满足请求
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 计算需要等待的时间
local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1) -- 找到最近一次的请求时间 nearest
local random, permits = struct.unpack("fI", nearest[1]) -- 解压为时间戳+请求令牌数
-- 返回等待时间,也可以写为 tonumber(nearest[2])+interval-tonumber(ARGV[2])
return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval) -- nearest[2] 为上行的 random
else
-- 当前可用令牌数足够,记录此次请求并减少可用令牌数
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) -- 记录请求记录
redis.call("decrby", valueName, ARGV[1]) -- 更新可用令牌数 valueName -= ARGV[1](请求令牌数)
return nil -- 成功获取令牌
end
else
-- 第一次请求,初始化令牌数和有序集合
redis.call("set", valueName, rate) -- 设置当前令牌数为最大速率值
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) -- 记录请求记录
redis.call("decrby", valueName, ARGV[1]) -- 更新可用令牌数 valueName -= ARGV[1](请求令牌数)
return nil -- 成功获取令牌
end
现实场景
冰雪大世界的热门项目每天吸引着络绎不绝的顾客。为了避免人流过于集中,影响顾客的体验和项目的正常运行,管理团队制定了以下规则:
- 每小时只接待6位客人。
- 每位客人在进入项目游玩,一个小时后自动将入场票归还到废票区,确保不影响后续客人的入场。
限流机制的设置
1. 初始化限流器
项目每天一开始,第一位客人进入游玩时,系统会进行以下操作:
- 统计系统剩余票数量:记录为(valueName),代表同一时间段内的最大客容量。
- 记录每次申请的客人及进场时间:存储在(permitsName)中。
- 刷新实际剩余票数量:更新为(currentValue = valueName),确保系统实时掌握当前剩余的入场票数。
通过这些步骤,系统为当天的限流工作做好了准备。
2. 限流器应用场景(客人申请游玩流程)
当一位客人申请游玩项目时,系统会按照以下流程操作:
步骤一:查询可用票
- 计算实际剩余票(currentValue = valueName)。
步骤二:回收废票
- 从废票区 根据入场记录(permitsName)计算(当前时间-时间间隔)之前的所有废弃入场票(released),这意味着已进入游玩的客人在系统时间间隔后已不再影响项目后续游客的体验,归还的票可以重新使用。
- 更新实际剩余票(currentValue):将回收的票数加到实际剩余票(currentValue += released)
- 更新系统剩余票(valueName):(valueName = currentValue),确保系统知道当前有多少可用的入场票,反映最新的入场票状态。
步骤三:判断票数是否足够
- 检查实际剩余票 (currentValue):与当前游客申请票数(tonumber(ARGV[1]))进行比较。
-
如果票够用:
- 记录此次请求:将客人的申请信息和进场时间记录到(permitsName)中。
- 更新系统剩余票:(valueName -= 申请票数)中扣除相应的票数。
- 允许客人进入:客人成功进入项目游玩。
-
如果票不够:
- 计算等待时间:根据上一位客人的入场时间和设定的时间间隔,计算出客人需要等待时间(上一位客人的入场时间+间隔时间-当前时间)。
- 告知客人:将计算出的等待时间返回给客人,游客异步再尝试进入。
-