Gateway网关限流

在高并发的系统中,往往需要在系统中做限流,一方面是为了防止大量的请求使服务器过载,导致服务不可用,另一方面是为了防止恶意网络攻击

文章目录

  • 一、常见限流场景
    • 1.1 限流的对象
    • 1.2 限流的处理
    • 1.3 限流的架构
  • 二、常见的限流算法
    • 2.1 计数器算法
    • 2.2 漏桶算法(Leaky Bucket)
    • 2.3 令牌桶算法(Token Bucket)
  • 三、Gateway令牌桶限流
    • 3.1 添加依赖jar包
    • 3.2 yml添加配置
    • 3.3 添加redis配置类
    • 3.4 定义KeyResolver的bean对象
    • 3.5自定义返回信息配置类
    • 3.6 测试
  • 四、Gateway令牌桶限流源码解析

一、常见限流场景

缓存、降级 和 限流 被称为高并发、分布式系统的三驾马车,网关作为整个分布式系统中的第一道关卡,限流功能自然必不可少。通过限流,可以控制服务请求的速率,从而提高系统应对突发大流量的能力,让系统更具弹性。限流有着很多实际的应用场景,比如双十一的秒杀活动, 12306 的抢票等。这里讨论网关限流。

1.1 限流的对象

通过上面的介绍,我们对限流的概念可能感觉还是比较模糊,到底限流限的是什么?顾名思义,限流就是限制流量,但这里的流量是一个比较笼统的概念。如果考虑各种不同的场景,限流是非常复杂的,而且和具体的业务规则密切相关,可以考虑如下几种常见的场景:

a):请求频率限流(Request rate limiting):限制某个接口的一分钟内的请求数
b):并发量限流(Concurrent requests limiting):限制某个服务的处理请求数
c):传输速率限流(Transmission rate limiting):限制下载文件速率
d):限制黑名单用户访问
e):限制某个IP请求

1.2 限流的处理

a):拒绝服务:请求直接抛出异常,如:gateway返回429状态码
b):排队等待:请求放入队列中,等待处理
c):服务降级:请求返回兜底数据等

最简单的做法是拒绝服务,直接抛出异常,返回错误信息(比如返回 HTTP 状态码 429 Too Many Requests),或者给前端返回 302 重定向到一个错误页面,提示用户资源没有了或稍后再试。但是对于一些比较重要的接口不能直接拒绝,比如秒杀、下单等接口,我们既不希望用户请求太快,也不希望请求失败,这种情况一般会将请求放到一个消息队列中排队等待,消息队列可以起到削峰和限流的作用。第三种处理方式是服务降级,当触发限流条件时,直接返回兜底数据,比如查询商品库存的接口,可以默认返回有货。

1.3 限流的架构

针对不同的系统架构,需要使用不同的限流方案。如下图所示,服务部署的方式一般可以分为单机模式和集群模式:

在这里插入图片描述单机模式的限流非常简单,可以直接基于内存就可以实现,而集群模式的限流必须依赖于某个“中心化”的组件,比如网关或 Redis,从而引出两种不同的限流架构:网关层限流 和 中间件限流。

在这里插入图片描述网关作为整个分布式系统的入口,承担了所有的用户请求,所以在网关中进行限流是最合适不过的。网关层限流有时也被称为 接入层限流。除了我们使用的 Spring Cloud Gateway,最常用的网关层组件还有 Nginx,可以通过它的 ngx_http_limit_req_module 模块,使用 limit_conn_zone、limit_req_zone、limit_rate 等指令很容易的实现并发量限流、请求频率限流和传输速率限流。这里不对 Nginx 作过多的说明,关于这几个指令的详细信息可以 参考 Nginx 的官方文档。
另一种限流架构是中间件限流,可以将限流的逻辑下沉到服务层。但是集群中的每个服务必须将自己的流量信息统一汇总到某个地方供其他服务读取,一般来说用 Redis 的比较多,Redis 提供的过期特性和 lua 脚本执行非常适合做限流。除了 Redis 这种中间件,还有很多类似的分布式缓存系统都可以使用,如 Hazelcast、Apache Ignite、Infinispan 等。
我们可以更进一步扩展上面的架构,将网关改为集群模式,虽然这还是网关层限流架构,但是由于网关变成了集群模式,所以网关必须依赖于中间件进行限流,这和上面讨论的中间件限流没有区别。

在这里插入图片描述

二、常见的限流算法

2.1 计数器算法

计数器算法采用计数器实现限流有点简单粗暴,一般我们会限制一秒钟的能够通过的请求数,比如限流qps为100,算法的实现思路就是从第一个请求进来开始计时,在接下去的1s内,每来一个请求,就把计数加1,如果累加的数字达到了100,那么后续的请求就会被全部拒绝。等到1s结束后,把计数恢复成0,重新开始计数。

在这里插入图片描述
如上图所示,假定1min内限制请求数量为100,一个请求则计数器counter+1。当counter大于100且还在当前的1min内,触发限流;1min过后,counter重新计数。计数器算法存在临界问题,如下图所示:假设有一个恶意用户,他在 0:59 时,瞬间发送了 100 个请求,并且 1:00 又瞬间发送了 100 个请求,那么其实这个用户在 1 秒里面,瞬间发送了 200 个请求。这个临界点上,可以压垮服务。

在这里插入图片描述

2.2 漏桶算法(Leaky Bucket)

除了计数器算法,另一个很自然的限流思路是将所有的请求缓存到一个队列中,然后按某个固定的速度慢慢处理,这其实就是漏桶算法(Leaky Bucket)。漏桶算法假设将请求装到一个桶中,桶的容量为 M,当桶满时,请求被丢弃。在桶的底部有一个洞,桶中的请求像水一样按固定的速度(每秒 r 个)漏出来。我们用下面这个形象的图来表示漏桶算法:

在这里插入图片描述
桶的上面是个水龙头,我们的请求从水龙头流到桶中,水龙头流出的水速不定,有时快有时慢,这种忽快忽慢的流量叫做 Bursty flow。如果桶中的水满了,多余的水就会溢出去,相当于请求被丢弃。从桶底部漏出的水速是固定不变的,可以看出漏桶算法可以平滑请求的速率。
漏桶算法可以通过一个队列来实现,如下图所示:

在这里插入图片描述
当请求到达时,不直接处理请求,而是将其放入一个队列,然后另一个线程以固定的速率从队列中读取请求并处理,从而达到限流的目的。注意的是这个队列可以有不同的实现方式,比如设置请求的存活时间,或将队列改造成 PriorityQueue,根据请求的优先级排序而不是先进先出。当然队列也有满的时候,如果队列已经满了,那么请求只能被丢弃了。漏桶算法有一个缺陷,在处理突发流量时效率很低。比如双十一抢购、秒杀活动

2.3 令牌桶算法(Token Bucket)

令牌桶算法(Token Bucket)是目前应用最广泛的一种限流算法,它的基本思想由两部分组成:生成令牌 和 消费令牌。

生产令牌:固定容量的令牌桶,按固定的速率(N/s)往桶中放入令牌,桶满时不再放入;
消费令牌:每个请求需要从桶中拿取令牌,当消费速率低于生产速率时,直至桶中令牌满而触发限流,此时请求可以放入缓冲队列或直接拒绝。

令牌桶算法的图示如下:

在这里插入图片描述在上面的图中,我们将请求放在一个缓冲队列中,可以看出这一部分的逻辑和漏桶算法几乎一模一样,只不过在处理请求上,一个是以固定速率处理,一个是从桶中获取令牌后才处理。
仔细思考就会发现,令牌桶算法有一个很关键的问题,就是桶大小的设置,正是这个参数可以让令牌桶算法具备处理突发流量的能力。譬如将桶大小设置为 100,生成令牌的速度设置为每秒 10 个,那么在系统空闲一段时间的之后(桶中令牌一直没有消费,慢慢的会被装满),突然来了 50 个请求,这时系统可以直接按每秒 50 个的速度处理,随着桶中的令牌很快用完,处理速度又会慢慢降下来,和生成令牌速度趋于一致。这是令牌桶算法和漏桶算法最大的区别,漏桶算法无论来了多少请求,只会一直以每秒 10 个的速度进行处理。当然,处理突发流量虽然提高了系统性能,但也给系统带来了一定的压力,如果桶大小设置不合理,突发的大流量可能会直接压垮系统。

总结三种算法特点
类别特点缺点
计数器算法

1.结构简单-计数器;

2.临界问题;

出现临界问题
漏斗算法

1.固定速率处理请求;

2.保护服务;

无法处理突发流量
令牌桶算法

1.固定速率生产令牌;

2.设置容量大小;

3.处理突发流量;

容量设置不合理,可能压垮服务

三、Gateway令牌桶限流

Spring Cloud Gateway官方就提供了RequestRateLimiterGatewayFilterFactory这个类,适用Redis和lua脚本实现了令牌桶的方式。具体实现逻辑在RequestRateLimiterGatewayFilterFactory类中,lua脚本在如下图所示的文件夹中:
在这里插入图片描述接下来我们使用redis作为令牌桶来实现限流:

步骤如下:
引入依赖
创建限流标识
配置限流速率

3.1 添加依赖jar包

SpringBoot和cloud版本

<!-- SpringBoot 依赖配置 -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-dependencies</artifactId>
	<version>2.3.7.RELEASE</version>
	<type>pom</type>
	<scope>import</scope>
</dependency>
<!-- Springcloud 依赖配置 -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-dependencies</artifactId>
	<version>Hoxton.SR9</version>
	<type>pom</type>
	<scope>import</scope>
</dependency>

限流依赖

<!--网关-->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--基于Redis实现限流-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
	<version>2.2.13.RELEASE</version>
</dependency>

3.2 yml添加配置

server:
  # 服务器的HTTP端口,默认为80
  port: 9000

# Spring配置
spring:
  application:
    #微服务名称
    name: microservice-gateway

  cloud:
    gateway:
      discovery: #配置网关发现机制
        locator: #配置网关处理机制
          enabled: false #开启网关自动映射
          lower-case-service-id: false #服务名称大小写转换:true开启,false 关闭
      routes:
        - id: routed2
          uri: lb://CONSUMER80
          predicates:
            - Path=/api/*/*
          filters:
            # 截断一位url请求前缀
            #- StripPrefix=1
            - name: GatewayRequestRateLimiter
              args:
                # 指定限流标识
                key-resolver: '#{@ipKeyResolver}' #SpringEL表达式,从spring容器中找对象并赋值 '#{@beanName}'
                # 生产令牌速度,每秒多少个令牌
                redis-rate-limiter.replenishRate: 1
                # 令牌桶的总容量
                redis-rate-limiter.burstCapacity: 5

# eureka客户端配置
eureka:
  instance:
    #向注册中心注册服务ID
    instance-id: ${spring.cloud.client.ip-address}:${server.port}
    prefer-ip-address: true     #显示IP地址
    # Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
    lease-renewal-interval-in-seconds: 30
    #Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
    lease-expiration-duration-in-seconds: 30
  server:
    # 设置eureka是否启动自我保护
    enable-self-preservation: true
    # 剔除服务的时间间隔毫秒数(单位:毫秒,默认60秒)
    eviction-interval-timer-in-ms: 5000
  client:
    #表示是否向Eureka注册中心注册自己
    register-with-eureka: true
    fetch-registry: true # false表示自己就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
    service-url:
      #defaultZone:  http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
      defaultZone: http://192.168.10.130:7001/eureka/,http://192.168.10.130:7002/eureka/


redis:
  host: 127.0.0.1
  port: 6379
  password: 123456
  database: 0

在上面的配置文件,配置了RequestRateLimiter的限流过滤器,该过滤器需要配置三个参数:

burstCapacity,令牌桶总容量。
replenishRate,令牌桶每秒填充平均速率。
key-resolver,用于限流的键的解析器的 Bean 对象的名字。它使用 SpEL 表达式根据#{@beanName}从 Spring 容器中获取 Bean 对象。

3.3 添加redis配置类

为什么要配置这个,是因为 reactive 使用了 lettuce连接池


import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@Order(-1)
//@AutoConfigureBefore({RedisAutoConfiguration.class, RedisReactiveAutoConfiguration.class})
public class RedisConfig {


    @Value("${redis.host}")
    private String redisHost;

    @Value("${redis.port}")
    private int redisPort;

    @Value("${redis.password}")
    private String redisPassword;


    @Bean
    public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        RedisSerializationContext<Object, Object> serializationContext = RedisSerializationContext
                .newSerializationContext(RedisSerializer.string())
                .value(RedisSerializer.json())
                .build();
        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, serializationContext);
    }



    @Primary
    @Bean
    public ReactiveRedisConnectionFactory lettuceConnectionFactory() {
        RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration(redisHost, redisPort);
        redisConfig.setPassword(RedisPassword.of(redisPassword));
        return new LettuceConnectionFactory(redisConfig);
    }


}

3.4 定义KeyResolver的bean对象

配置中key-resolver: “#{@ipKeyResolver}”,其中ipKeyResolver对应的是下面方法的名称

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;


/**
 * 定义KeyResolver的bean对象
 */

@Slf4j
@Configuration
public class KeyResolverConfig {

    /**
     * 基于url
     */
    @Bean
    public KeyResolver pathKeyResolver() {
        System.out.println("基于url限流");
        return exchange -> Mono.just(
                exchange.getRequest().getPath().toString()
        );
    }

    /**
     * 基于用户限流
     */
    @Bean
    KeyResolver userKeyResolver() {
        System.out.println("基于用户限流");
        //按用户限流
        return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
    }
   
//    @Bean
//    @Primary
//    KeyResolver ipKeyResolver() {
//        System.out.println("基于IP来限流");
//        //按IP来限流
//        return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
//    }

    /**
     * 基于IP来限流
     */
    @Primary
    @Bean
    public KeyResolver ipKeyResolver() {
        return new KeyResolver() {
            @Override
            public Mono<String> resolve(ServerWebExchange exchange) {
                ServerHttpRequest request = exchange.getRequest();
                String remoteAddr = request.getRemoteAddress().getAddress().getHostAddress();
                // 这里根据请求【URI】进行限流
                log.info("这里根据url请求 {}", remoteAddr);
                return Mono.just(remoteAddr);
            }
        };
    }
}

3.5自定义返回信息配置类

因为源码的过滤器RequestRateLimiterGatewayFilterFactory中,会将限流拦截的请求的http status code设置为429,但是具体的内容格式却不是JSON格式,导致我们看到的响应结果如上图所示。

at org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory.lambda$null$0(RequestRateLimiterGatewayFilterFactory.java:120)

源码如下:

在这里插入图片描述这里有往header里加参数,但是提示显示,是 ReadOnlyHttpHeaders.add,只读的headers,是不可以添加操作的,所以抛出了UnsupportedOperationException的异常:

在这里插入图片描述
所以,这个问题基本都是由于源代码的过滤器所导致,这里要解决问题,我们可以自定义一个过滤器替代一下,代码如下:

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

/***
 * 自定义限流处理异常信息
 *
 */
@Slf4j
@Component
public class GatewayRequestRateLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory {

    private final RateLimiter defaultRateLimiter;

    private final KeyResolver defaultKeyResolver;

    public GatewayRequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
        super(defaultRateLimiter, defaultKeyResolver);
        this.defaultRateLimiter = defaultRateLimiter;
        this.defaultKeyResolver = defaultKeyResolver;
        log.info("限流自定义返回加载");
    }

    @Override
    public GatewayFilter apply(Config config) {
        KeyResolver resolver = getOrDefault(config.getKeyResolver(), defaultKeyResolver);
        RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), defaultRateLimiter);
        return (exchange, chain) -> resolver.resolve(exchange).flatMap(key -> {
            String routeId = config.getRouteId();
            if (routeId == null) {
                Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                routeId = route.getId();
            }
            String finalRouteId = routeId;
            return limiter.isAllowed(routeId, key).flatMap(response -> {
                for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
                    exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
                }
                if (response.isAllowed()) {
                    return chain.filter(exchange);
                }
                log.warn("已限流: {}", finalRouteId);
                ServerHttpResponse httpResponse = exchange.getResponse();
                //修改code为500
                httpResponse.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                if (!httpResponse.getHeaders().containsKey("Content-Type")) {
                    httpResponse.getHeaders().add("Content-Type", "application/json");
                }
                Instant end = Instant.now();
                String dateTimeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                //此处无法触发全局异常处理,手动返回
                DataBuffer buffer = httpResponse.bufferFactory().wrap(("{"
                        + "  \"code\": \"429\","
                        + "  \"message\": \"服务器限流\","
                        + "  \"data\": \"Server throttling\","
                        + "  \"time\": " + dateTimeStr + ","
                        + "  \"success\": false"
                        + "}").getBytes(StandardCharsets.UTF_8));
                return httpResponse.writeWith(Mono.just(buffer));
            });
        });
    }

    private <T> T getOrDefault(T configValue, T defaultValue) {
        return (configValue != null) ? configValue : defaultValue;
    }
}

3.6 测试

jmter开启十个线程,每秒钟请求1次,
限制请求后,网关直接返回429状态码

在这里插入图片描述
查看redis
在这里插入图片描述

四、Gateway令牌桶限流源码解析

Spring Cloud Gateway 中定义了关于限流的一个接口 RateLimiter,如下

package org.springframework.cloud.gateway.filter.ratelimit;

public interface RateLimiter<C> extends StatefulConfigurable<C> {
    Mono<RateLimiter.Response> isAllowed(String routeId, String id);
    }

这个接口就一个方法 isAllowed,第一个参数 routeId 表示请求路由的 ID,根据 routeId 可以获取限流相关的配置,第二个参数 id 表示要限流的对象的唯一标识,可以是用户名,也可以是 IP,或者其他的可以从 ServerWebExchange 中得到的信息。我们看下 RequestRateLimiterGatewayFilterFactory 中对 isAllowed 的调用逻辑:

 public GatewayFilter apply(RequestRateLimiterGatewayFilterFactory.Config config) {
 		// 从配置中得到 KeyResolver
        KeyResolver resolver = (KeyResolver)this.getOrDefault(config.keyResolver, this.defaultKeyResolver);
        // 从配置中得到 RateLimiter
        RateLimiter<Object> limiter = (RateLimiter)this.getOrDefault(config.rateLimiter, this.defaultRateLimiter);
        boolean denyEmpty = (Boolean)this.getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
        HttpStatusHolder emptyKeyStatus = HttpStatusHolder.parse((String)this.getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
        return (exchange, chain) -> {
            return resolver.resolve(exchange).defaultIfEmpty("____EMPTY_KEY__").flatMap((key) -> {
    // 通过KeyResolver得到key,作为唯一标识id传入isAllowed()方法
                if ("____EMPTY_KEY__".equals(key)) {
                    if (denyEmpty) {
                        ServerWebExchangeUtils.setResponseStatus(exchange, emptyKeyStatus);
                        return exchange.getResponse().setComplete();
                    } else {
                        return chain.filter(exchange);
                    }
                } else {
            // 获取当前路由ID,作为routeId参数传入isAllowed()方法
                    String routeId = config.getRouteId();
                    if (routeId == null) {
                        Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                        routeId = route.getId();
                    }

                    return limiter.isAllowed(routeId, key).flatMap((response) -> {
                        Iterator var4 = response.getHeaders().entrySet().iterator();

                        while(var4.hasNext()) {
                            Entry<String, String> header = (Entry)var4.next();
                            exchange.getResponse().getHeaders().add((String)header.getKey(), (String)header.getValue());
                        }

                        if (response.isAllowed()) {
                        // 请求允许,直接走到下一个 filter
                            return chain.filter(exchange);
                        } else {
// 请求被限流,返回设置的 HTTP 状态码(默认是 429)                     ServerWebExchangeUtils.setResponseStatus(exchange, config.getStatusCode());
                            return exchange.getResponse().setComplete();
                        }
                    });
                }
            });
        };
    }

从上面的的逻辑可以看出,通过实现 KeyResolver 接口的 resolve 方法就可以自定义要限流的对象了。

public interface KeyResolver {
    Mono<String> resolve(ServerWebExchange exchange);
}

比如下面的 HostAddrKeyResolver 可以根据 IP 来限流:


 public class HostAddrKeyResolver implements KeyResolver {
     @Override
     public Mono<String> resolve(ServerWebExchange exchange) {
         return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
     }
}

我们继续看 Spring Cloud Gateway 的代码发现,RateLimiter 接口只提供了一个实现类 RedisRateLimiter:

在这里插入图片描述
很显然是基于 Redis 实现的限流,虽说通过 Redis 也可以实现单机限流,但是总感觉有些大材小用,而且对于那些没有 Redis 的环境很不友好。所以,我们要实现真正的本地限流。
我们从 Spring Cloud Gateway 的 pull request 中发现了一个新特性 Feature/local-rate-limiter,而且看提交记录,这个新特性很有可能会合并到 3.0.0 版本中。我们不妨来看下这个 local-rate-limiter 的实现:LocalRateLimiter.java,可以看出它是基于 Resilience4有意思的是,这个类 还有一个早期版本,是基于 Bucket4j 实现的:

public Mono<Response> isAllowed(String routeId, String id) {
    Config routeConfig = loadConfiguration(routeId);

    // How many requests per second do you want a user to be allowed to do?
    int replenishRate = routeConfig.getReplenishRate();

    // How many seconds for a token refresh?
    int refreshPeriod = routeConfig.getRefreshPeriod();

   // How many tokens are requested per request?
    int requestedTokens = routeConfig.getRequestedTokens();

    final io.github.resilience4j.ratelimiter.RateLimiter rateLimiter = RateLimiterRegistry
            .ofDefaults()
            .rateLimiter(id, createRateLimiterConfig(refreshPeriod, replenishRate));

   final boolean allowed = rateLimiter.acquirePermission(requestedTokens);
    final Long tokensLeft = (long) rateLimiter.getMetrics().getAvailablePermissions();

    Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
   return Mono.just(response);

}

实现分布式请求频率限流
上面介绍了如何实现单机请求频率限流,接下来再看下分布式请求频率限流。这个就比较简单了,因为上面说了,Spring Cloud Gateway 自带了一个限流实现,就是 RedisRateLimiter,可以用于分布式限流。它的实现原理依然是基于令牌桶算法的,不过实现逻辑是放在一段 lua 脚本中的,我们可以在 src/main/resources/META-INF/scripts 目录下找到该脚本文件 request_rate_limiter.lua:

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end

-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }

这段代码和上面介绍令牌桶算法时用 Java 实现的那段经典代码几乎是一样的。这里使用 lua 脚本,主要是利用了 Redis 的单线程特性,以及执行 lua 脚本的原子性,避免了并发访问时可能出现请求量超出上限的现象。想象目前令牌桶中还剩 1 个令牌,此时有两个请求同时到来,判断令牌是否足够也是同时的,两个请求都认为还剩 1 个令牌,于是两个请求都被允许了。

有两种方式来配置 Spring Cloud Gateway 自带的限流。第一种方式是通过配置文件,比如下面所示的代码,可以对某个 route 进行限流:

spring:
  cloud:
    gateway:
      routes:
      - id: test
        uri: lb://Provider  # 路由定义对应的微服务的转发地址:lb;负载均衡 + 服务名称
        filters:
          - name: RequestRateLimiter
            args:
              key-resolver: '#{@hostAddrKeyResolver}'
              redis-rate-limiter.replenishRate: 1
              redis-rate-limiter.burstCapacity: 3

其中,key-resolver 使用 SpEL 表达式 #{@beanName} 从 Spring 容器中获取 hostAddrKeyResolver 对象,burstCapacity 表示令牌桶的大小,replenishRate 表示每秒往桶中填充多少个令牌,也就是填充速度。

第二种方式是通过下面的代码来配置:

 @Bean
 public RouteLocator myRoutes(RouteLocatorBuilder builder) {
  return builder.routes()
    .route(p -> p
      .path("/get")
      .filters(filter -> filter.requestRateLimiter()
        .rateLimiter(RedisRateLimiter.class, rl -> rl.setBurstCapacity(3).setReplenishRate(1)).and())
      .uri("http://httpbin.org:80"))
   .build();
}

这样就可以对某个 route 进行限流了。但是这里有一点要注意,Spring Cloud Gateway 自带的限流器有一个很大的坑,replenishRate 不支持设置小数,也就是说往桶中填充的 token 的速度最少为每秒 1 个,所以,如果我的限流规则是每分钟 10 个请求(按理说应该每 6 秒填充一次,或每秒填充 1/6 个 token),这种情况 Spring Cloud Gateway 就没法正确的限流。网上也有人提了 issue,support greater than a second resolution for the rate limiter,但还没有得到解决。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/331402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

华为网络设备文件传输FTP配置

R2配置 ftp server enable aaa local-user ftp-client password cipher Huawei123local-user ftp-client privilege level 15local-user ftp-client ftp-directory flash:/local-user ftp-client service-type ftpinterface GigabitEthernet0/0/0ip address 10.0.12.2 255.255.…

【VMware】安装和卸载VMware的Ubuntu

安装视频&#xff1a;&#xff08;全程无废话&#xff09; https://www.bilibili.com/video/BV1W34y1k7ge/?spm_id_from333.337.search-card.all.click&vd_sourcefb8dcae0aee3f1aab700c21099045395

如何使用VNC实现Win系统远程桌面Ubuntu图形化界面【内网穿透】

文章目录 推荐前言1. ubuntu安装VNC2. 设置vnc开机启动3. windows 安装VNC viewer连接工具4. 内网穿透4.1 安装cpolar【支持使用一键脚本命令安装】4.2 创建隧道映射4.3 测试公网远程访问 5. 配置固定TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址5.3 …

k8s-kubectl常用命令

一、基础命令 1.1 get 查询集群所有资源的详细信息&#xff0c;resource包括集群节点、运行的Pod、Deployment、Service等。 1.1.1 查询Pod kubectl get po -o wid 1.1.2 查询所有NameSpace kubectl get namespace 1.1.3 查询NameSpace下Pod kubectl get po --all-namespaces…

第二百七十三回

文章目录 1. 概念介绍2. 方法与信息2.1 获取方法2.2 详细信息 3. 示例代码4. 内容总结 我们在上一章回中介绍了"蓝牙综合示例"相关的内容&#xff0c;本章回中将介绍如何获取设备信息.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在本章回中获…

Power Designer 连接 PostgreSQL 逆向工程生成pd表结构操作步骤以及过程中出现的问题解决

一、使用PowerDesigner16.5 链接pg数据库 1.1、启动PD.选择Create Model…。 1.2、选择Model types / Physical Data Model Physical Diagram&#xff1a;选择pgsql直接【ok】 1.3、选择connect 在工具栏选择Database-Connect… 快捷键&#xff1a;ctrlshiftN.如下图&#xff…

C语言——大头记单词

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 每一发奋努力的背后&#xff0c;必有加…

Flink1.17 基础知识

Flink1.17 基础知识 来源&#xff1a;B站尚硅谷 目录 Flink1.17 基础知识Flink 概述Flink 是什么Flink特点Flink vs SparkStreamingFlink的应用场景Flink分层API Flink快速上手创建项目WordCount代码编写批处理流处理 Flink部署集群角色部署模式会话模式&#xff08;Session …

解密PGSQL数据库引擎:探索数据世界的秘密

目录 1、引言 1.1 什么是PGSQL数据库引擎 1.2 数据库引擎的重要性 1.3 解密PGSQL数据库引擎的意义 2、PGSQL数据库引擎的基础知识 2.1 什么是数据库引擎 2.2 PGSQL数据库引擎的历史和发展 2.3 PGSQL数据库引擎的特点和优势 2.4 PGSQL数据库引擎的架构和组件 3、PGSQL…

数据库(基础理论+MySQL安装和部署)

目录 基础理论 1.1 什么是数据库&#xff1f; 1.2 DBMS数据库管理系统 1.3 数据库与文件系统的区别 1.4 数据库的发展和规划 1.5 常见的数据库 1.5.1 关系型数据库 1.5.2 非关系型数据库 1.6 DBMS支持的数据模型 层次模型 网状模型 关系模型 面向对象模型&#xf…

【每日一题】2171. 拿出最少数目的魔法豆-2024.1.18

题目&#xff1a; 2171. 拿出最少数目的魔法豆 给定一个 正整数 数组 beans &#xff0c;其中每个整数表示一个袋子里装的魔法豆的数目。 请你从每个袋子中 拿出 一些豆子&#xff08;也可以 不拿出&#xff09;&#xff0c;使得剩下的 非空 袋子中&#xff08;即 至少还有一…

【华为 ICT HCIA eNSP 习题汇总】——题目集2

1、交换机某个端口配置信息如下&#xff0c;则此端口的PVID为&#xff08;&#xff09;。 A、100 B、2 C、4 D、1 # interface GigabitEthernet0/0/1 port hybrid tagged vlan 2 to 3 100 port hybrid unatgged vlan 4 6 #考点&#xff1a;VLAN&#xff08;虚拟局域网&#xff…

精通Discord营销:多账号注册与管理,高效打造矩阵

Discord虽然是一个海外小众平台&#xff0c;但在Z世代群体来说却非常受欢迎。通常在游戏行业、年轻化的电商特定品类、软件等业务中&#xff0c;Discord的社群营销可以起到非常卓越的效果。但是&#xff0c;您必须学会管理不同的帐户&#xff0c;以构成矩阵打造社区&#xff0c…

ubuntu开放ssh服务

&#x1f4d1;前言 本文主要是【ubuntu】——ubuntu开放ssh服务的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304;每日一…

PXE——高效批量网络装机

目录 部署PXE远程安装服务 1.PXE概述 2.实现过程 3.实验操作 3.1安装dhcp、vsftpd、tftp-server.x86_64、syslinux服务 3.2修改配置文件——DHCP 3.3修改配置文件——TFTP 3.4kickstart——无人值守安装 3.4.1选择程序 3.4.2修改基础配置 3.4.3修改安装方法 3.4.4…

C++核心编程之通过类和对象的思想对文件进行操作

目录 ​​​​​​​一、文件操作 1. 文件类型分类&#xff1a; 2. 操作文件的三大类 二、文本文件 1.写文件 2.读文件 三、二进制文件 1.写二进制文件 2.读二进制文件 一、文件操作 程序运行时产生的数据都属于临时数据,程序一旦运行结束都会被释放 通过文件可以将…

XXL-Job的搭建接入Springboot项目(详细)

一、XXL-Job介绍 XXL-Job 是一款开源的分布式任务调度平台&#xff0c;由 Xuxueli&#xff08;徐雪里&#xff09;开发。它基于 Java 技术栈&#xff0c;提供了一套简单易用、高可靠性的任务调度解决方案。 XXL-Job 的主要作用是帮助开发者实现定时任务的调度和执行。它可以用…

一、VTK 9.0.0 编译安装步骤 VS2019 CMake3.26.0

零基础开始学习VTK &#xff0c;请跟我进行第一步&#xff0c;配置好开放环境&#xff01; 首先&#xff0c;你时间比较紧急&#xff0c;想直接使用VTK &#xff0c;而无需编译、那么请使用 PCL-1.12.0-AllInOne-msvc2019-win64.exe 它已经帮你编译好VTK 9 了&#xff0c;直…

Defi安全--Orion Protocol攻击事件分析

其它相关文章可见个人主页 1. Orion Protocol攻击事件相关信息 2023年2月2日&#xff0c;在ETH和BSC上的Orion Protocol项目被攻击&#xff0c;这里以ETH上攻击为例&#xff1a; 攻击合约地址&#xff1a;Attacker Contract Address | Etherscan攻击者地址&#xff1a;Orion…

合并K个升序链表(LeetCode 23)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路方法一&#xff1a;顺序合并方法二&#xff1a;分治合并方法三&#xff1a;使用优先队列合并 参考文献 1.问题描述 给你一个链表数组&#xff0c;每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中&#xff…