为什么需要限流
系统的维护使用是需要成本的,用户可能使用科技疯狂刷量,消耗系统资源,出现额外的经济开销
问题:
- 控制成本=>限制用户的调用次数
- 用户在短时间内疯狂使用,导致服务器资源被占满,其他用户无法使用=>限流
那么限流阈值多大合适?比如限制单个用户在每秒只能使用1次。
限流的算法
推荐阅读:https://juejin.cn/post/6967742960540581918
1)固定窗口限流
单位时间内允许部分操作
1小时只允许10个用户操作
优点:最简单
缺点:可能出现流量突刺
比如:前59分钟没有1个操作,第59分钟来了10个操作;第1小时又来了10个操作。相当于2分钟内执行了20个操作,服务器仍然有高峰危险。
2)滑动窗口限流
单位时间内允许部分操作,但是这个单位时间是滑动的,需要指定一个滑动单位。
滑动窗口与固定窗口相比,将一个时间段又划分成了几个更小的切片。随着时间的前进,滑动窗口不断向前加载切片,向后遗弃切片,但是切片的总长度是固定的,滑动窗口保证了自己时间段内所有的访问不会超过阈值。
优点:能够解决流量突刺问题,第59分钟和第1小时分为了两个切片,但是属于一个时间段,更多的操作会被拒绝
缺点:实现相对复杂,限流效果和滑动单位有关,滑动单位越小,限流效果越好,但往往很难选取到一个特别合适的滑动单位。
3)漏桶限流(推荐)
以固定的速率处理请求(漏水),当请求桶满了后,拒绝请求。
每秒处理10个请求,同的容量是10,每0.1秒固定处理一次请求,如果1秒来了10个请求;都可以处理完,但如果1秒内来了11个请求,最后那个请求就会溢出桶,被拒绝。
优点:能够一定程度上应对流量突刺,能够以固定速率处理请求,保证服务器的安全
缺点:没有办法迅速处理一批请求,只能一个一个按顺序来处理(固定速率的缺点)
4)令牌桶限流(推荐)
管理员先生成一批令牌,每秒生成10个令牌;当用户要操作前,先去拿到一个令牌,有令牌的人就有资格执行操作、能同时执行操作;拿不到令牌就等着
优点:能够并发处理同时的请求,并发性能会更高
需要考虑的问题:还是存在时间单位选取的问题
想看这些算法的具体实现的话,可以参考这篇文章:4种经典限流算法讲解
在实际开发中,我们一般调用第三方库,无需关注这些算法的具体实现,只需要理解上边我说的这些算法的思想就行
限流粒度
- 针对某个方法限流,即单位时间内最多允许同时XX个操作使用这个方法
- 针对某个用户限流,比如单个用户单位时间内最多执行XX次操作
- 针对某个用户X方法限流,比如单个用户单位时间内最多执行XX次这个方法
限流的实现
1)本地限流(单机限流)
每个服务器单独限流,一般适用于单体项目,就是项目只有一个服务器
在Java中,使用Guava库实现单机限流:
Guava库提供了
RateLimiter
类,它使用令牌桶算法来控制请求的速率。
- 添加依赖:首先需要在项目的
pom.xml
文件中添加Guava库的依赖。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version> <!-- 请使用最新版本 -->
</dependency>
- 创建RateLimiter实例:创建一个
RateLimiter
实例,设置每秒可以处理的请求数。
import com.google.common.util.concurrent.RateLimiter;
RateLimiter rateLimiter = RateLimiter.create(5.0); // 每秒5个请求
- 限流操作:在需要限流的方法调用前,调用
rateLimiter.acquire()
来获取一个令牌。
public void doSomething() {
if (rateLimiter.tryAcquire()) {
// 执行操作
} else {
// 限流逻辑,如等待或返回错误
}
}
4.**如何在第3点中的else里写等待逻辑:**可以实现几种不同的等待逻辑:
- 主动等待:使用
Thread.sleep()
或其他同步等待方法等待一段时间后再次尝试获取令牌。 - 被动等待:实现一个循环,不断尝试获取令牌,直到成功为止。
- 随机等待:使用随机等待时间以避免多个请求同时触发。
- 指数退避:等待时间逐渐增加,直到成功获取令牌。
- 返回错误:如果等待时间过长,可以选择返回错误,避免无限期的等待。
以下是几种等待逻辑的示例代码:
主动等待
解释public void doSomething() {
if (rateLimiter.tryAcquire()) {
// 执行操作
} else {
try {
// 等待一段时间后再次尝试,例如等待500毫秒
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 可以选择返回错误或继续执行
}
// 再次尝试获取令牌
if (rateLimiter.tryAcquire()) {
// 执行操作
} else {
// 如果再次获取失败,可以选择返回错误或继续等待
}
}
}
被动等待(不断尝试)
解释public void doSomething() {
while (!rateLimiter.tryAcquire()) {
// 可以选择在这里实现一个短暂的等待,如Thread.sleep(10),以避免CPU占用过高
}
// 执行操作
}
随机等待
解释import java.util.Random;
public void doSomething() {
Random random = new Random();
while (!rateLimiter.tryAcquire()) {
try {
// 随机等待一段时间,例如1到500毫秒之间
int waitTime = random.nextInt(500);
Thread.sleep(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 可以选择返回错误或继续执行
}
}
// 执行操作
}
指数退避
复制解释public void doSomething() {
int retries = 0;
while (!rateLimiter.tryAcquire()) {
try {
// 指数退避等待时间
Thread.sleep((int) (Math.pow(2, retries) * 100));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 可以选择返回错误或继续执行
}
retries++;
if (retries > MAX_RETRIES) {
// 超过最大重试次数,可以选择返回错误
break;
}
}
// 执行操作
}
在实际应用中,选择哪种等待逻辑取决于你的具体需求和场景。例如,如果你希望避免重试对系统造成额外压力,可以选择随机等待或指数退避。如果你希望确保请求最终能够被处理,可以选择被动等待。如果你希望快速失败,可以选择返回错误。
2)分布式限流(多机限流)
如果项目有多个服务器,比如微服务,那么建议使用分布式限流。
- 把用户的使用频率等数据放到一个集中的存储进行统计,比如Redis,这样无论用户的请求落到了哪台服务器,都以集中的数据存储内的数据为准(Redisson - 是一个操作Redis的工具库)
- 在网关集中进行限流和统计(比如Sentinel、Spring Cloud Gateway)
import org.redisson.Redisson;
import org.redisson.api.RSemaphore;
import org.redisson.api.RedissonClient;
public static void main(String[] args){
//创建RedissonClient
RedissonClient redisson = Redisson.create();
//获取限流器
RRateLimiter semaphore = redisson.getRateLimiter("mySemaphore");
//尝试获取许可证
boolean result = semaphore.tryAcquire();
if(result){
//处理请求
}else{
//超过流量限制,需要做何处理
}
}
Redisson限流实现
Redisson内置了一个限流工具类,可以借助Redis来存储统计。
官方仓库:https://github.com/redisson/redisson
看不懂方法的参数含义怎么办?
- 看官方文档
- 下载源码
点进任意一个包里的代码,然后点击下载源代码
即可自动下载
步骤:
- 安装Redis
- 引入Redisson代码包:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.21.3</version>
</dependency>
- 创建RedissonConfig配置块,用于初始化RedissonClient对象单例
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedissonConfig {
private Integer database;
private String host;
private Integer port;
private String password;
@Bean
public RedissonClient getRedissonClient() {
Config config = new Config();
config.useSingleServer()
.setDatabase(database)
.setAddress("redis://" + host + ":" + port)
.setPassword(password);
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
- 编写RedisLimiterManager
什么是Manager?提供了通用的能力,可以放到任何一个项目里
/**
* 专门提供RedisLimiter限流基础服务(提供了通用的能力)
*/
@Service
public class RedisLimiterManager {
@Resource
private RedissonClient redissonClient;
/**
* 限流操作
* @param key 区分不同的限流器,比如不同的用户id应该分别统计
*/
public void doRateLimit(String key) {
//创建一个名称为key的限流器,每秒最多访问2次
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);
rateLimiter.trySetRate(RateType.OVERALL, 2, 1, RateIntervalUnit.SECONDS);
//每当一个操作来了之后,请求一个令牌
boolean canOp = rateLimiter.tryAcquire(1);
if (!canOp) {
throw new BusinessException(ErrorCode.TOO_MANY_REQUEST);
}
}
}
- 单元测试:
@SpringBootTest
class RedisLimiterManagerTest {
@Resource
private RedisLimiterManager redisLimiterManager;
@Test
void doRateLimit() throws InterruptedException {
String userId = "1";
for (int i = 0; i < 2; i++) {
redisLimiterManager.doRateLimit(userId);
System.out.println("成功");
}
Thread.sleep(2000);
for (int i = 0; i < 5; i++) {
redisLimiterManager.doRateLimit(userId);
System.out.println("成功");
}
}
}
- 应用到要限流的方法中,比如智能分析接口:
//必须登录
User loginUser = userService.getLoginUser(request);
//限流判断,每个用户一个限流器
redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());