在高并发的系统中,往往需要在系统中做限流,一方面是为了防止大量的请求使服务器过载,导致服务不可用,另一方面是为了防止恶意网络攻击
文章目录
- 一、常见限流场景
- 1.1 限流的对象
- 1.2 限流的处理
- 1.3 限流的架构
- 二、常见的限流算法
- 2.1 计数器算法
- 2.2 漏桶算法(Leaky Bucket)
- 2.3 令牌桶算法(Token Bucket)
- 三、Gateway令牌桶限流
- 3.1 添加依赖jar包
- 3.2 yml添加配置
- 3.3 添加redis配置类
- 3.4 定义KeyResolver的bean对象
- 3.5自定义返回信息配置类
- 3.6 测试
- 四、Gateway令牌桶限流源码解析
一、常见限流场景
缓存、降级 和 限流 被称为高并发、分布式系统的三驾马车,网关作为整个分布式系统中的第一道关卡,限流功能自然必不可少。通过限流,可以控制服务请求的速率,从而提高系统应对突发大流量的能力,让系统更具弹性。限流有着很多实际的应用场景,比如双十一的秒杀活动, 12306 的抢票等。这里讨论网关限流。
1.1 限流的对象
通过上面的介绍,我们对限流的概念可能感觉还是比较模糊,到底限流限的是什么?顾名思义,限流就是限制流量,但这里的流量是一个比较笼统的概念。如果考虑各种不同的场景,限流是非常复杂的,而且和具体的业务规则密切相关,可以考虑如下几种常见的场景:
a):请求频率限流(Request rate limiting):限制某个接口的一分钟内的请求数
b):并发量限流(Concurrent requests limiting):限制某个服务的处理请求数
c):传输速率限流(Transmission rate limiting):限制下载文件速率
d):限制黑名单用户访问
e):限制某个IP请求
1.2 限流的处理
a):拒绝服务:请求直接抛出异常,如:gateway返回429状态码
b):排队等待:请求放入队列中,等待处理
c):服务降级:请求返回兜底数据等
最简单的做法是拒绝服务,直接抛出异常,返回错误信息(比如返回 HTTP 状态码 429 Too Many Requests),或者给前端返回 302 重定向到一个错误页面,提示用户资源没有了或稍后再试。但是对于一些比较重要的接口不能直接拒绝,比如秒杀、下单等接口,我们既不希望用户请求太快,也不希望请求失败,这种情况一般会将请求放到一个消息队列中排队等待,消息队列可以起到削峰和限流的作用。第三种处理方式是服务降级,当触发限流条件时,直接返回兜底数据,比如查询商品库存的接口,可以默认返回有货。
1.3 限流的架构
针对不同的系统架构,需要使用不同的限流方案。如下图所示,服务部署的方式一般可以分为单机模式和集群模式:
单机模式的限流非常简单,可以直接基于内存就可以实现,而集群模式的限流必须依赖于某个“中心化”的组件,比如网关或 Redis,从而引出两种不同的限流架构:网关层限流 和 中间件限流。
网关作为整个分布式系统的入口,承担了所有的用户请求,所以在网关中进行限流是最合适不过的。网关层限流有时也被称为 接入层限流。除了我们使用的 Spring Cloud Gateway,最常用的网关层组件还有 Nginx,可以通过它的 ngx_http_limit_req_module 模块,使用 limit_conn_zone、limit_req_zone、limit_rate 等指令很容易的实现并发量限流、请求频率限流和传输速率限流。这里不对 Nginx 作过多的说明,关于这几个指令的详细信息可以 参考 Nginx 的官方文档。
另一种限流架构是中间件限流,可以将限流的逻辑下沉到服务层。但是集群中的每个服务必须将自己的流量信息统一汇总到某个地方供其他服务读取,一般来说用 Redis 的比较多,Redis 提供的过期特性和 lua 脚本执行非常适合做限流。除了 Redis 这种中间件,还有很多类似的分布式缓存系统都可以使用,如 Hazelcast、Apache Ignite、Infinispan 等。
我们可以更进一步扩展上面的架构,将网关改为集群模式,虽然这还是网关层限流架构,但是由于网关变成了集群模式,所以网关必须依赖于中间件进行限流,这和上面讨论的中间件限流没有区别。
二、常见的限流算法
2.1 计数器算法
计数器算法采用计数器实现限流有点简单粗暴,一般我们会限制一秒钟的能够通过的请求数,比如限流qps为100,算法的实现思路就是从第一个请求进来开始计时,在接下去的1s内,每来一个请求,就把计数加1,如果累加的数字达到了100,那么后续的请求就会被全部拒绝。等到1s结束后,把计数恢复成0,重新开始计数。
如上图所示,假定1min内限制请求数量为100,一个请求则计数器counter+1。当counter大于100且还在当前的1min内,触发限流;1min过后,counter重新计数。计数器算法存在临界问题,如下图所示:假设有一个恶意用户,他在 0:59 时,瞬间发送了 100 个请求,并且 1:00 又瞬间发送了 100 个请求,那么其实这个用户在 1 秒里面,瞬间发送了 200 个请求。这个临界点上,可以压垮服务。
2.2 漏桶算法(Leaky Bucket)
除了计数器算法,另一个很自然的限流思路是将所有的请求缓存到一个队列中,然后按某个固定的速度慢慢处理,这其实就是漏桶算法(Leaky Bucket)。漏桶算法假设将请求装到一个桶中,桶的容量为 M,当桶满时,请求被丢弃。在桶的底部有一个洞,桶中的请求像水一样按固定的速度(每秒 r 个)漏出来。我们用下面这个形象的图来表示漏桶算法:
桶的上面是个水龙头,我们的请求从水龙头流到桶中,水龙头流出的水速不定,有时快有时慢,这种忽快忽慢的流量叫做 Bursty flow。如果桶中的水满了,多余的水就会溢出去,相当于请求被丢弃。从桶底部漏出的水速是固定不变的,可以看出漏桶算法可以平滑请求的速率。
漏桶算法可以通过一个队列来实现,如下图所示:
当请求到达时,不直接处理请求,而是将其放入一个队列,然后另一个线程以固定的速率从队列中读取请求并处理,从而达到限流的目的。注意的是这个队列可以有不同的实现方式,比如设置请求的存活时间,或将队列改造成 PriorityQueue,根据请求的优先级排序而不是先进先出。当然队列也有满的时候,如果队列已经满了,那么请求只能被丢弃了。漏桶算法有一个缺陷,在处理突发流量时效率很低。比如双十一抢购、秒杀活动
2.3 令牌桶算法(Token Bucket)
令牌桶算法(Token Bucket)是目前应用最广泛的一种限流算法,它的基本思想由两部分组成:生成令牌 和 消费令牌。
生产令牌:固定容量的令牌桶,按固定的速率(N/s)往桶中放入令牌,桶满时不再放入;
消费令牌:每个请求需要从桶中拿取令牌,当消费速率低于生产速率时,直至桶中令牌满而触发限流,此时请求可以放入缓冲队列或直接拒绝。
令牌桶算法的图示如下:
在上面的图中,我们将请求放在一个缓冲队列中,可以看出这一部分的逻辑和漏桶算法几乎一模一样,只不过在处理请求上,一个是以固定速率处理,一个是从桶中获取令牌后才处理。
仔细思考就会发现,令牌桶算法有一个很关键的问题,就是桶大小的设置,正是这个参数可以让令牌桶算法具备处理突发流量的能力。譬如将桶大小设置为 100,生成令牌的速度设置为每秒 10 个,那么在系统空闲一段时间的之后(桶中令牌一直没有消费,慢慢的会被装满),突然来了 50 个请求,这时系统可以直接按每秒 50 个的速度处理,随着桶中的令牌很快用完,处理速度又会慢慢降下来,和生成令牌速度趋于一致。这是令牌桶算法和漏桶算法最大的区别,漏桶算法无论来了多少请求,只会一直以每秒 10 个的速度进行处理。当然,处理突发流量虽然提高了系统性能,但也给系统带来了一定的压力,如果桶大小设置不合理,突发的大流量可能会直接压垮系统。
总结三种算法特点
类别 | 特点 | 缺点 |
计数器算法 | 1.结构简单-计数器; 2.临界问题; | 出现临界问题 |
漏斗算法 | 1.固定速率处理请求; 2.保护服务; | 无法处理突发流量 |
令牌桶算法 | 1.固定速率生产令牌; 2.设置容量大小; 3.处理突发流量; | 容量设置不合理,可能压垮服务 |
三、Gateway令牌桶限流
Spring Cloud Gateway官方就提供了RequestRateLimiterGatewayFilterFactory这个类,适用Redis和lua脚本实现了令牌桶的方式。具体实现逻辑在RequestRateLimiterGatewayFilterFactory类中,lua脚本在如下图所示的文件夹中:
接下来我们使用redis作为令牌桶来实现限流:
步骤如下:
引入依赖
创建限流标识
配置限流速率
3.1 添加依赖jar包
SpringBoot和cloud版本
<!-- SpringBoot 依赖配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.7.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Springcloud 依赖配置 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR9</version>
<type>pom</type>
<scope>import</scope>
</dependency>
限流依赖
<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--基于Redis实现限流-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
<version>2.2.13.RELEASE</version>
</dependency>
3.2 yml添加配置
server:
# 服务器的HTTP端口,默认为80
port: 9000
# Spring配置
spring:
application:
#微服务名称
name: microservice-gateway
cloud:
gateway:
discovery: #配置网关发现机制
locator: #配置网关处理机制
enabled: false #开启网关自动映射
lower-case-service-id: false #服务名称大小写转换:true开启,false 关闭
routes:
- id: routed2
uri: lb://CONSUMER80
predicates:
- Path=/api/*/*
filters:
# 截断一位url请求前缀
#- StripPrefix=1
- name: GatewayRequestRateLimiter
args:
# 指定限流标识
key-resolver: '#{@ipKeyResolver}' #SpringEL表达式,从spring容器中找对象并赋值 '#{@beanName}'
# 生产令牌速度,每秒多少个令牌
redis-rate-limiter.replenishRate: 1
# 令牌桶的总容量
redis-rate-limiter.burstCapacity: 5
# eureka客户端配置
eureka:
instance:
#向注册中心注册服务ID
instance-id: ${spring.cloud.client.ip-address}:${server.port}
prefer-ip-address: true #显示IP地址
# Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-renewal-interval-in-seconds: 30
#Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
lease-expiration-duration-in-seconds: 30
server:
# 设置eureka是否启动自我保护
enable-self-preservation: true
# 剔除服务的时间间隔毫秒数(单位:毫秒,默认60秒)
eviction-interval-timer-in-ms: 5000
client:
#表示是否向Eureka注册中心注册自己
register-with-eureka: true
fetch-registry: true # false表示自己就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
service-url:
#defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
defaultZone: http://192.168.10.130:7001/eureka/,http://192.168.10.130:7002/eureka/
redis:
host: 127.0.0.1
port: 6379
password: 123456
database: 0
在上面的配置文件,配置了RequestRateLimiter的限流过滤器,该过滤器需要配置三个参数:
burstCapacity,令牌桶总容量。
replenishRate,令牌桶每秒填充平均速率。
key-resolver,用于限流的键的解析器的 Bean 对象的名字。它使用 SpEL 表达式根据#{@beanName}从 Spring 容器中获取 Bean 对象。
3.3 添加redis配置类
为什么要配置这个,是因为 reactive 使用了 lettuce连接池
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@Order(-1)
//@AutoConfigureBefore({RedisAutoConfiguration.class, RedisReactiveAutoConfiguration.class})
public class RedisConfig {
@Value("${redis.host}")
private String redisHost;
@Value("${redis.port}")
private int redisPort;
@Value("${redis.password}")
private String redisPassword;
@Bean
public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
RedisSerializationContext<Object, Object> serializationContext = RedisSerializationContext
.newSerializationContext(RedisSerializer.string())
.value(RedisSerializer.json())
.build();
return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, serializationContext);
}
@Primary
@Bean
public ReactiveRedisConnectionFactory lettuceConnectionFactory() {
RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration(redisHost, redisPort);
redisConfig.setPassword(RedisPassword.of(redisPassword));
return new LettuceConnectionFactory(redisConfig);
}
}
3.4 定义KeyResolver的bean对象
配置中key-resolver: “#{@ipKeyResolver}”,其中ipKeyResolver对应的是下面方法的名称
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* 定义KeyResolver的bean对象
*/
@Slf4j
@Configuration
public class KeyResolverConfig {
/**
* 基于url
*/
@Bean
public KeyResolver pathKeyResolver() {
System.out.println("基于url限流");
return exchange -> Mono.just(
exchange.getRequest().getPath().toString()
);
}
/**
* 基于用户限流
*/
@Bean
KeyResolver userKeyResolver() {
System.out.println("基于用户限流");
//按用户限流
return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
}
// @Bean
// @Primary
// KeyResolver ipKeyResolver() {
// System.out.println("基于IP来限流");
// //按IP来限流
// return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
// }
/**
* 基于IP来限流
*/
@Primary
@Bean
public KeyResolver ipKeyResolver() {
return new KeyResolver() {
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
String remoteAddr = request.getRemoteAddress().getAddress().getHostAddress();
// 这里根据请求【URI】进行限流
log.info("这里根据url请求 {}", remoteAddr);
return Mono.just(remoteAddr);
}
};
}
}
3.5自定义返回信息配置类
因为源码的过滤器RequestRateLimiterGatewayFilterFactory中,会将限流拦截的请求的http status code设置为429,但是具体的内容格式却不是JSON格式,导致我们看到的响应结果如上图所示。
at org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory.lambda$null$0(RequestRateLimiterGatewayFilterFactory.java:120)
源码如下:
这里有往header里加参数,但是提示显示,是 ReadOnlyHttpHeaders.add,只读的headers,是不可以添加操作的,所以抛出了UnsupportedOperationException的异常:
所以,这个问题基本都是由于源代码的过滤器所导致,这里要解决问题,我们可以自定义一个过滤器替代一下,代码如下:
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
/***
* 自定义限流处理异常信息
*
*/
@Slf4j
@Component
public class GatewayRequestRateLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory {
private final RateLimiter defaultRateLimiter;
private final KeyResolver defaultKeyResolver;
public GatewayRequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
super(defaultRateLimiter, defaultKeyResolver);
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
log.info("限流自定义返回加载");
}
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.getKeyResolver(), defaultKeyResolver);
RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), defaultRateLimiter);
return (exchange, chain) -> resolver.resolve(exchange).flatMap(key -> {
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
String finalRouteId = routeId;
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
log.warn("已限流: {}", finalRouteId);
ServerHttpResponse httpResponse = exchange.getResponse();
//修改code为500
httpResponse.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
if (!httpResponse.getHeaders().containsKey("Content-Type")) {
httpResponse.getHeaders().add("Content-Type", "application/json");
}
Instant end = Instant.now();
String dateTimeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
//此处无法触发全局异常处理,手动返回
DataBuffer buffer = httpResponse.bufferFactory().wrap(("{"
+ " \"code\": \"429\","
+ " \"message\": \"服务器限流\","
+ " \"data\": \"Server throttling\","
+ " \"time\": " + dateTimeStr + ","
+ " \"success\": false"
+ "}").getBytes(StandardCharsets.UTF_8));
return httpResponse.writeWith(Mono.just(buffer));
});
});
}
private <T> T getOrDefault(T configValue, T defaultValue) {
return (configValue != null) ? configValue : defaultValue;
}
}
3.6 测试
jmter开启十个线程,每秒钟请求1次,
限制请求后,网关直接返回429状态码
查看redis
四、Gateway令牌桶限流源码解析
Spring Cloud Gateway 中定义了关于限流的一个接口 RateLimiter,如下
package org.springframework.cloud.gateway.filter.ratelimit;
public interface RateLimiter<C> extends StatefulConfigurable<C> {
Mono<RateLimiter.Response> isAllowed(String routeId, String id);
}
这个接口就一个方法 isAllowed,第一个参数 routeId 表示请求路由的 ID,根据 routeId 可以获取限流相关的配置,第二个参数 id 表示要限流的对象的唯一标识,可以是用户名,也可以是 IP,或者其他的可以从 ServerWebExchange 中得到的信息。我们看下 RequestRateLimiterGatewayFilterFactory 中对 isAllowed 的调用逻辑:
public GatewayFilter apply(RequestRateLimiterGatewayFilterFactory.Config config) {
// 从配置中得到 KeyResolver
KeyResolver resolver = (KeyResolver)this.getOrDefault(config.keyResolver, this.defaultKeyResolver);
// 从配置中得到 RateLimiter
RateLimiter<Object> limiter = (RateLimiter)this.getOrDefault(config.rateLimiter, this.defaultRateLimiter);
boolean denyEmpty = (Boolean)this.getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
HttpStatusHolder emptyKeyStatus = HttpStatusHolder.parse((String)this.getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
return (exchange, chain) -> {
return resolver.resolve(exchange).defaultIfEmpty("____EMPTY_KEY__").flatMap((key) -> {
// 通过KeyResolver得到key,作为唯一标识id传入isAllowed()方法
if ("____EMPTY_KEY__".equals(key)) {
if (denyEmpty) {
ServerWebExchangeUtils.setResponseStatus(exchange, emptyKeyStatus);
return exchange.getResponse().setComplete();
} else {
return chain.filter(exchange);
}
} else {
// 获取当前路由ID,作为routeId参数传入isAllowed()方法
String routeId = config.getRouteId();
if (routeId == null) {
Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
return limiter.isAllowed(routeId, key).flatMap((response) -> {
Iterator var4 = response.getHeaders().entrySet().iterator();
while(var4.hasNext()) {
Entry<String, String> header = (Entry)var4.next();
exchange.getResponse().getHeaders().add((String)header.getKey(), (String)header.getValue());
}
if (response.isAllowed()) {
// 请求允许,直接走到下一个 filter
return chain.filter(exchange);
} else {
// 请求被限流,返回设置的 HTTP 状态码(默认是 429) ServerWebExchangeUtils.setResponseStatus(exchange, config.getStatusCode());
return exchange.getResponse().setComplete();
}
});
}
});
};
}
从上面的的逻辑可以看出,通过实现 KeyResolver 接口的 resolve 方法就可以自定义要限流的对象了。
public interface KeyResolver {
Mono<String> resolve(ServerWebExchange exchange);
}
比如下面的 HostAddrKeyResolver 可以根据 IP 来限流:
public class HostAddrKeyResolver implements KeyResolver {
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}
}
我们继续看 Spring Cloud Gateway 的代码发现,RateLimiter 接口只提供了一个实现类 RedisRateLimiter:
很显然是基于 Redis 实现的限流,虽说通过 Redis 也可以实现单机限流,但是总感觉有些大材小用,而且对于那些没有 Redis 的环境很不友好。所以,我们要实现真正的本地限流。
我们从 Spring Cloud Gateway 的 pull request 中发现了一个新特性 Feature/local-rate-limiter,而且看提交记录,这个新特性很有可能会合并到 3.0.0 版本中。我们不妨来看下这个 local-rate-limiter 的实现:LocalRateLimiter.java,可以看出它是基于 Resilience4有意思的是,这个类 还有一个早期版本,是基于 Bucket4j 实现的:
public Mono<Response> isAllowed(String routeId, String id) {
Config routeConfig = loadConfiguration(routeId);
// How many requests per second do you want a user to be allowed to do?
int replenishRate = routeConfig.getReplenishRate();
// How many seconds for a token refresh?
int refreshPeriod = routeConfig.getRefreshPeriod();
// How many tokens are requested per request?
int requestedTokens = routeConfig.getRequestedTokens();
final io.github.resilience4j.ratelimiter.RateLimiter rateLimiter = RateLimiterRegistry
.ofDefaults()
.rateLimiter(id, createRateLimiterConfig(refreshPeriod, replenishRate));
final boolean allowed = rateLimiter.acquirePermission(requestedTokens);
final Long tokensLeft = (long) rateLimiter.getMetrics().getAvailablePermissions();
Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
return Mono.just(response);
}
实现分布式请求频率限流
上面介绍了如何实现单机请求频率限流,接下来再看下分布式请求频率限流。这个就比较简单了,因为上面说了,Spring Cloud Gateway 自带了一个限流实现,就是 RedisRateLimiter,可以用于分布式限流。它的实现原理依然是基于令牌桶算法的,不过实现逻辑是放在一段 lua 脚本中的,我们可以在 src/main/resources/META-INF/scripts 目录下找到该脚本文件 request_rate_limiter.lua:
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }
这段代码和上面介绍令牌桶算法时用 Java 实现的那段经典代码几乎是一样的。这里使用 lua 脚本,主要是利用了 Redis 的单线程特性,以及执行 lua 脚本的原子性,避免了并发访问时可能出现请求量超出上限的现象。想象目前令牌桶中还剩 1 个令牌,此时有两个请求同时到来,判断令牌是否足够也是同时的,两个请求都认为还剩 1 个令牌,于是两个请求都被允许了。
有两种方式来配置 Spring Cloud Gateway 自带的限流。第一种方式是通过配置文件,比如下面所示的代码,可以对某个 route 进行限流:
spring:
cloud:
gateway:
routes:
- id: test
uri: lb://Provider # 路由定义对应的微服务的转发地址:lb;负载均衡 + 服务名称
filters:
- name: RequestRateLimiter
args:
key-resolver: '#{@hostAddrKeyResolver}'
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 3
其中,key-resolver 使用 SpEL 表达式 #{@beanName} 从 Spring 容器中获取 hostAddrKeyResolver 对象,burstCapacity 表示令牌桶的大小,replenishRate 表示每秒往桶中填充多少个令牌,也就是填充速度。
第二种方式是通过下面的代码来配置:
@Bean
public RouteLocator myRoutes(RouteLocatorBuilder builder) {
return builder.routes()
.route(p -> p
.path("/get")
.filters(filter -> filter.requestRateLimiter()
.rateLimiter(RedisRateLimiter.class, rl -> rl.setBurstCapacity(3).setReplenishRate(1)).and())
.uri("http://httpbin.org:80"))
.build();
}
这样就可以对某个 route 进行限流了。但是这里有一点要注意,Spring Cloud Gateway 自带的限流器有一个很大的坑,replenishRate 不支持设置小数,也就是说往桶中填充的 token 的速度最少为每秒 1 个,所以,如果我的限流规则是每分钟 10 个请求(按理说应该每 6 秒填充一次,或每秒填充 1/6 个 token),这种情况 Spring Cloud Gateway 就没法正确的限流。网上也有人提了 issue,support greater than a second resolution for the rate limiter,但还没有得到解决。