1. time/rate限速器使用
- 令牌桶限流算法
- rate.NewLimiter(limit,burst)产生一个新的限速器
- limit表示每秒产生token数、burst表示最多存token数
- Allow判断当前是否可以取到token
- Wait阻塞等待直到取到token
- Reverse返回等待时间(预估的等待时间),再去取token
package main
import (
"context"
"golang.org/x/time/rate"
"log"
"testing"
"time"
)
func Test_RateLimiter(t *testing.T) {
l := rate.NewLimiter(1, 5)
log.Println(l.Limit(), l.Burst())
for i := 0; i < 10; i++ {
//阻塞等待直到,取到一个token
log.Println("before Wait")
c, _ := context.WithTimeout(context.Background(), time.Second*2)
if err := l.Wait(c); err != nil {
log.Println("limiter wait err:" + err.Error())
}
log.Println("after Wait")
//返回需要等待多久才有新的token,这样就可以等待指定时间执行任务
r := l.Reserve()
log.Println("reserve Delay:", r.Delay())
//判断当前是否可以取到token
a := l.Allow()
log.Println("Allow:", a)
log.Println("======================")
}
}
2. time/rate源码原理
- 计算上次请求和当前请求时间差
- 计算时间差内生成的token数+旧token数
- 如果token为负,则计算等待时间
- token为正,则请求后token-1
type Limit float64
type Limiter struct {
limit Limit//每秒产生的token数
burst int//桶的总大小
mu sync.Mutex//锁
tokens float64//token总数
last time.Time//上一次更新token的时间
lastEvent time.Time//最后一次限速的时间
}
Allow、Reverse、Wait三个方法底层调用的都是func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation
// reserveN 是 AllowN、ReserveN 和 WaitN 的辅助方法。
// maxFutureReserve 指定了允许的最大预订等待时间。
// reserveN 返回 Reservation(而不是 *Reservation),以避免在 AllowN 和 WaitN 中进行分配。
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
// 加锁,保护临界区
lim.mu.Lock()
// 如果每秒产生的token数为无限,则无需预订直接返回
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true, // 预订成功
lim: lim, // 当前限流器
tokens: n, // 预订的令牌数
timeToAct: now, // 立即生效
}
}
// 更新当前时间、上次时间和现在可用令牌数
now, last, tokens := lim.advance(now)
// 计算请求n个tokens后的剩余令牌数
tokens -= float64(n)
// 计算等待时长
var waitDuration time.Duration
if tokens < 0 {
// 如果令牌不够,需要等待的时间
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 判断预订是否成功,请求的n是否小于等于桶的容量,且等待时间是否小于用户给的最大实践
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// 准备预订结果
r := Reservation{
ok: ok, // 预订是否成功
lim: lim, // 当前限流器
limit: lim.limit, // 当前限流器的限制
}
if ok {
r.tokens = n // 成功预订的令牌数
r.timeToAct = now.Add(waitDuration) // 生效时间
}
// 更新限流器状态
if ok {
lim.last = now // 更新上次预订时间
lim.tokens = tokens // 更新剩余令牌数
lim.lastEvent = r.timeToAct // 更新上次事件时间
} else {
lim.last = last // 未成功则恢复上次时间
}
// 解锁
lim.mu.Unlock()
return r // 返回预订结果
}
// advance 计算并返回基于时间推移的 lim 的更新状态。
// lim 自身的状态不会被改变。
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
// 如果 now 比 last 还早,则使用 now 作为 last
last = now
}
// 避免 last 非常久远时导致 delta 溢出。
// 计算多久后这个桶会自动填满
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
// 如果实际时间间隔超过最大允许间隔,调整为最大间隔,避免由于非常大的 elapsed 造成溢出或不合理的计算。
elapsed = maxElapsed
}
// 计算由于时间推移增加的令牌数
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
// 如果计算得到的令牌数超过了 burst,则限制为 burst
tokens = burst
}
// 返回更新后的时间 now, 上次时间 last 以及新的令牌数 tokens
return now, last, tokens
}
// tokensFromDuration 是一个单位转换函数,
// 用于将时间段转换为在该时间段内以每秒 limit 个令牌的速率
// 可积累的令牌数。
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
// 自行分离整数部分和小数部分,以尽量减少舍入误差。
// 参考 golang.org/issues/34861。
sec := float64(d / time.Second) * float64(limit) // 计算整秒内的令牌数
nsec := float64(d % time.Second) * float64(limit) // 计算剩余纳秒内的令牌数
return sec + nsec / 1e9 // 返回整秒和纳秒对应的令牌数之和
}
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation
- AllowN(now time.Time, n int) bool
lim.reserveN(now, n, 0).ok
- now表示现在
- n表示请求n个token
- 0表示等待时间
- ReserveN
lim.reserveN(now, n, InfDuration)
- now表示现在
- n表示请求n个token
- InfDuration表示无限等待
- WaitN
// WaitN 阻塞直到 lim 允许 n 个事件发生。
// 如果 n 超过了 Limiter 的 burst 大小,Context 被取消,
// 或者预期的等待时间超过了 Context 的截止时间,它会返回一个错误。
// 如果速率限制是无限的(Inf),则忽略 burst 限制。
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
// 加锁以安全地获取限流器的 burst 和 limit 值
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
// 如果 n 超过了 burst 且 limit 不是 Inf,则返回错误
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
// 检查 Context 是否已取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// 查看ctx是否设定了deadline,确定最大等待时间
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
// 计算距离截止时间的剩余时间
waitLimit = deadline.Sub(now)
}
// 进行预订
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
// 如果预订失败且等待时间超过 Context 截止时间,返回错误
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// 计算需要等待的时间
delay := r.DelayFrom(now)
if delay == 0 {
// 如果不需要等待,直接返回
return nil
}
// 启动定时器进行等待
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// 拿到了令牌
return nil
case <-ctx.Done():
// 在等待时 Context 被取消,取消预订,允许其他事件提前进行
r.Cancel()
return ctx.Err()
}
}
3. 小结
令牌桶算法广泛应用于控制 API 请求速率、限制资源访问频率、管理任务调度等场景。通过合理设置 limit 和 burst,可以有效平衡系统负载和服务质量。该算法并不会实时去维护令牌桶中的token的数量,而是通过last和lastEvent来巧妙的计算出该段时间内容桶内令牌的状态,同时通过锁来维护了对于令牌桶的访问一致性问题。