通过Redis zset实现滑动窗口限流算法
在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。限流可以认为服务降级的一种,限流通过限制请求的流量以达到保护系统的目的。
一般来说,系统的吞吐量是可以计算出一个阈值的,为了保证系统的稳定运行,一旦达到这个阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。否则,很容易导致服务器的宕机。
滑动窗口算法
滑动窗口算法思想就是记录一个滑动的时间窗口内的操作次数,操作次数超过阈值则进行限流。
通过zset实现滑动窗口算法思路
指定时间T内,只允许发生N次。我们可以将这个指定时间T,看成一个滑动时间窗口(定宽)。我们采用Redis的zset基本数据类型的score来圈出这个滑动时间窗口。在实际操作zset的过程中,我们只需要保留在这个滑动时间窗口以内的数据,其他的数据不处理即可。
- 每个用户的行为采用一个zset存储,score为毫秒时间戳,value也使用毫秒时间戳(比UUID更加节省内存)
- 只保留滑动窗口时间内的行为记录,如果zset为空,则移除zset,不再占用内存(节省内存)
SpringBoot内实现限流
效果:某个接口1分钟只允许访问n次。
我们可以通过Spring的Aop来实现解耦,通过Before通知校验接口是否达到限流阈值,如果达到直接抛异常。
限流工具类
@Slf4j
@Component
public class SlidingWindowCounter implements ApplicationContextAware {
private static RedisTemplate<String, Object> redisTemplate;
/**
* 数据统计-判断数量是否超过最大限定值
*
* @param key redis key
* @param windowTime 窗口时间,单位:秒
* @param maxNum 最大数量
*
* @return true-超过 false-未超过
*/
public static boolean countOver(String key, long windowTime, long maxNum) {
// 窗口结束时间
long windowEndTime = System.currentTimeMillis();
// 窗口开始时间
long windowStartTime = windowEndTime - windowTime;
// 按score统计key的value中的有效数量
Long count = redisTemplate.opsForZSet().count(key, windowStartTime, windowEndTime);
if (count == null) {
return false;
}
return count > maxNum;
}
/**
* 数据统计、数据上报同步处理,判断数量是否超过最大限定值
*
* @param key redis key
* @param windowTime 窗口时间,单位:秒
* @param maxNum 最大数量
*
* @return true-超过 false-未超过
*/
public static boolean incrementAndGet(String key, long windowTime, long maxNum) {
// 窗口结束时间
long windowEndTime = System.currentTimeMillis();
// 窗口开始时间
long windowStartTime = windowEndTime - windowTime;
// 清除窗口过期成员
redisTemplate.opsForZSet().removeRangeByScore(key,0,windowStartTime);
// 按score统计key的value中的有效数量
Long count = redisTemplate.opsForZSet().count(key, windowStartTime, windowEndTime);
boolean limit = count!=null && count >= maxNum;
//如果没限流,设置zset值
if(!limit){
// 添加当前时间 value=当前时间戳 score=当前时间戳
redisTemplate.opsForZSet().add(key,windowStartTime,windowEndTime);
}
return limit;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
redisTemplate = applicationContext.getBean("redisTemplate",RedisTemplate.class);
}
}
自定义限流注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisRateLimiter {
/**
* 窗口时间
*/
long windows() default 60;
/**
* 窗口时间内允许访问次数
*/
long count() default 1;
/**
* 时间单位
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
}
定义AOP实现校验
@Component
@Aspect
public class RedisRateLimiterAop {
@Autowired
private RedissonClient redissonClient;
@Pointcut(value = "@annotation(redisRateLimiter)")
public void pointCut(RedisRateLimiter redisRateLimiter){}
@Before("pointCut(redisRateLimiter)")
public void around(JoinPoint joinPoint,RedisRateLimiter redisRateLimiter){
String key = joinPoint.getSignature().getDeclaringTypeName()+"@"+joinPoint.getSignature().getName();
long windows = redisRateLimiter.timeUnit().toMillis(redisRateLimiter.windows());
long count = redisRateLimiter.count();
//我们自己实现的滑动窗口限流
redissonTokenBucket(key, windows, count);
}
/**
* 调用我们写的工具类判断是否超过阈值
*/
private void slidingWindowLimit(String key, long windows, long count) {
boolean limit = SlidingWindowCounter.incrementAndGet(key, windows, count);
if(limit){
throw new RuntimeException("限流");
}
}
/**
* 如果使用了Redisson,可以直接使用令牌桶来实现限流
*/
private void redissonTokenBucket(String key, long windows, long count) {
// 1、 声明一个限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);
// 2、 设置速率,5秒中产生3个令牌
rateLimiter.trySetRate(RateType.OVERALL, count, windows, RateIntervalUnit.MILLISECONDS);
// 3、试图获取一个令牌,获取到返回true
boolean hasToken = rateLimiter.tryAcquire(1);
if(!hasToken){
throw new RuntimeException("限流");
}
}
}
使用自定义注解实现限流
@RestController
public class TestController {
@RequestMapping("/test")
@RedisRateLimiter(windows = 5,count = 1,timeUnit = TimeUnit.SECONDS)
public String test(){
return "success";
}
}
Redisson提供的令牌桶工具类
redisson内提供了RateLimiter工具类,我们可以通过RateLimiter定义令牌桶来实现现限流。
代码和上面的aop方法类似,只需要将 slidingWindowLimit 方法替换为 redissonTokenBucket方法即可。
private void redissonTokenBucket(String key, long windows, long count) {
// 1、 声明一个限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);
// 2、 设置速率,5秒中产生3个令牌
rateLimiter.trySetRate(RateType.OVERALL, count, windows, RateIntervalUnit.MILLISECONDS);
// 3、试图获取一个令牌,获取到返回true
boolean hasToken = rateLimiter.tryAcquire(1);
if(!hasToken){
throw new RuntimeException("限流");
}
}