开源限流组件分析(一):juju/ratelimit

文章目录

  • 前言
  • 数据结构
  • 对外提供接口
    • 初始化令牌桶
    • 获取令牌
  • 核心方法
    • adjustavailableTokens
    • currentTick
    • take
    • TakeAvailable
    • Wait系列

前言

这篇文章分析下go开源限流组件juju-ratelimit的使用方式和源码实现细节

源码地址:https://github.com/juju/ratelimit 版本:v1.0.2

其提供了一种高效的令牌桶限流实现

令牌桶相比于其他限流算法(如漏桶算法)的一个显著优势,就是在突发流量到来时,可以短时间内提供更多的处理能力,以应对这些额外的请求

直观上来说,令牌桶算法可以实现为:桶用channel实现

  • 在后台每隔一段固定的时间向桶中发放令牌
  • 要获取令牌时,从channel取数据

后台定时填充令牌:

 func Start(bucket chan struct{}, interval time.Duration) {
 	go func() {
 		ticker := time.NewTicker(interval)
 		defer ticker.Stop()
         // 每隔interval时间往channel中塞一个令牌
 		for range ticker.C {
 			select {
                 // 放令牌
 			case bucket <- struct{}{} :
 				// 桶满了,丢弃
 			default:
 				
 			}
 		}
 	}()
 }

业务请求要获取获取令牌时(非阻塞式):

func GetToken(bucket chan struct{}) bool {
	select {
	case <- bucket:
		return true
	default:
		return false
	}
}

但这样有多少容量就要开多少空间,对内存不友好

更好的方式是只用一个int变量availableTokens维护桶中有多少token:

  1. 每次有请求要获取令牌时,先根据当前时间now减去上次获取令牌的时间last,计算因为这段时间流逝,应该给桶中补充多少令牌,并加到availableTokens
  2. 如果availableTokens < 本次请求的令牌数 request,说明令牌不够。否则令牌够,放行请求,并根据本次需求的令牌数在桶中扣减 :availableTokens -= request

数据结构

type Bucket struct {
	clock Clock

	// bucket创建时间,仅初始化一次,用于计算时间相对偏移量
	startTime time.Time

	// 令牌桶最大容量
	capacity int64

	// 每次放入多少个令牌
	quantum int64

	// 每次放入令牌桶的间隔
	fillInterval time.Duration

	mu sync.Mutex

	// 当前桶中有多少token,注意:可能为负值
	availableTokens int64

	// 上一次访问令牌桶的tick
	latestTick int64
}

注意:桶中剩余令牌availableTokens可能为负值,至于为啥在下文分析扣减令牌流程时说明

对外提供接口

初始化令牌桶

其提供了几种根据不同需求初始化令牌桶的方法:

fillInterval时间放1个令牌,桶容量为capacity

func NewBucket(fillInterval time.Duration, capacity int64) *Bucket 

fillInterval时间放入quantum个令牌

func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket 

这两个方法都是简单设置Bucket的字段,比较简单

每秒放入rate个令牌:

func NewBucketWithRate(rate float64, capacity int64) *Bucket 

其构造方法如下:主要是需要根据参数rate推算出tb.fillInterval以及tb.quantum

func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket {
    tb := NewBucketWithQuantumAndClock(1, capacity, 1, clock)

    // 每次放入令牌数quantum从1开始,每轮 * 1.1
    for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {
            // 假设quantum=1, rate=10,即每次放1个,每秒放10个 => 则放令牌的间隔是0.1s
            fillInterval := time.Duration(1e9 * float64(quantum) / rate)
            if fillInterval <= 0 {
                    continue
            }
            tb.fillInterval = fillInterval
            tb.quantum = quantum

            // 误差小于0.01就返回tb
            if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {
                    return tb
            }
    }

}

虽然这里用了for循环寻找最合适的fillInterval和quantum,但只要rate小于10亿,都会在执行第一轮循环后退出

所以可以近似看做quantum=1fillInterval=1e9 / rate

例如:quantum=1,当要求 rate=10,即每次放1个,每秒放10个时,计算出来放令牌的时间间隔时是 0.1s

nextQuantum方法:

// 每次增大1.1倍
func nextQuantum(q int64) int64 {
   q1 := q * 11 / 10
   if q1 == q {
      q1++
   }
   return q1
}

Rate方法:根据quantumfillInterval,计算每秒放入多少个令牌

func (tb *Bucket) Rate() float64 {
    // 举个例子:假设每次放入2个,每500ms放一个 => 每秒就放4个
    return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}

获取令牌

非阻塞获取count个令牌,返回的Duration表示调用者需要等待的时间才能获得令牌

func (tb *Bucket) Take(count int64) time.Duration 

如果在`maxWait`时间内能获取到`count`个令牌就获取,否则不获取

返回获取成功时需要等待的时间,是否获取成功

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) 

非阻塞获取最多`count`个令牌,桶中还有多少获取多少,返回实际获得的令牌数(可能小于count)
func (tb *Bucket) TakeAvailable(count int64) int64

获取count个令牌,并在方法内部等待直到获取到令牌

func (tb *Bucket) Wait(count int64) 

只有当最多阻塞等待maxWait时间,能获取到count个令牌时才等待,并在内部等待。如果不能返回false
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool

这5个方法中,业务上实用的是WaitMaxDuration,毕竟被限流的请求不能无限等下去

核心方法

adjustavailableTokens

将桶中的令牌数调整为tick时间的令牌数,相当于给桶补充token

  • 计算在tick与上一次lastTick之间能够生产多少个令牌

    • 一般是计算从上次请求到本次请求中产生的令牌数
  • 追加令牌到桶中

func (tb *Bucket) adjustavailableTokens(tick int64) {
    lastTick := tb.latestTick
    tb.latestTick = tick
    if tb.availableTokens >= tb.capacity {
            return
    }
    // 补充令牌
    tb.availableTokens += (tick - lastTick) * tb.quantum
    if tb.availableTokens > tb.capacity {
            tb.availableTokens = tb.capacity
    }
    return
}

其参数tick如何计算的?调currentTick方法

currentTick

计算当前时间与开始时间(桶的初始化时间)之间,需要放入多少次令牌

func (tb *Bucket) currentTick(now time.Time) int64 {
    return int64(now.Sub(tb.startTime) / tb.fillInterval)
}

用于计算当前到哪个tick了

take

Take系列的方法,底层会调到take方法:

Take:

func (tb *Bucket) Take(count int64) time.Duration {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    // 可以等待无限大的时间
    d, _ := tb.take(tb.clock.Now(), count, infinityDuration)
    return d
}

TakeMaxDuration:

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    return tb.take(tb.clock.Now(), count, maxWait)
}

差别在于Take会等待无限大的时间,直到拿到token。TakeMaxDuration最多只会等待maxWait时间


take是获取令牌的核心方法,其流程如下:

  1. 计算出当前时刻可用的令牌数,并补充到令牌桶中
  2. 如果当前令牌桶存量够,在桶中扣减令牌
  3. 如果当前令牌桶存量不够,在桶中预扣减令牌,并返回需要等待的时间waitTime
/** 
当前时间是now
要获取count个令牌
最多等待maxWait时间
*/
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
    if count <= 0 {
            return 0, true
    }

    tick := tb.currentTick(now)
    // 给桶补充token
    tb.adjustavailableTokens(tick)
    avail := tb.availableTokens - count
    // 当前桶中的令牌就能满足需求,直接返回
    if avail >= 0 {
            tb.availableTokens = avail
            return 0, true
    }

    // 到这avail是负数,-avail就是还需要产生多少个令牌,
    // 要产生avail个令牌,还需要多少个tick(向上取整)
    // 再加上当前tick,就是能满足需求的tick
    endTick := tick + (-avail+tb.quantum-1)/tb.quantum
    // 需要等到这个时间才有令牌
    endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
    // 需要等待的时间
    waitTime := endTime.Sub(now)

    // 如果需要等待的时间超过了最大等待时间,则不等待,也不扣减,直接返回
    if waitTime > maxWait {
            return 0, false
    }

    // 这里将availableTokens更新为负值
    tb.availableTokens = avail
    // 返回要等待的时间,且已经在桶里预扣减了令牌
    return waitTime, true
}

怎么理解桶中的availableTokens变为负值?表示有请求已经提前预支了令牌,相当于欠账

  • 之后有请求要获取令牌时,需要先等时间流逝,把这些账还了,才能获取成功

    • TakeAvailable方法
  • 当然也可以在之前已欠账的基础上继续欠账,这样要等待更久的时间才能获取令牌成功

    • Take,TakeMaxDuration方法

例如:令牌桶每秒产生1个令牌,假设在**t1时刻**桶中已经没有令牌了
  1. 请求A在t1时刻调Take()获取5个令牌,将availableTokens更新为-5,并返回5s,表示需要等待5s才能让请求放行
  2. 时间过去1s,此时时刻t2 = t1 + 1s
  3. 请求B在t2时刻调Take()获取5个令牌,首先因为时间流逝,将availableTokens更新为-4。再将availableTokens更新为-4 - 5 = -9,也就是继续欠账。返回9s,表示要等待9s才能让请求放行
    在这里插入图片描述

TakeAvailable

非阻塞获取最多count个令牌,桶中还有多少获取多少,返回实际获得的令牌数(可能小于count)

func (tb *Bucket) TakeAvailable(count int64) int64 {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	return tb.takeAvailable(tb.clock.Now(), count)
}

// 返回可用的令牌数(可能小于count),如果没可用的令牌,将返回0,不会阻塞
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
	if count <= 0 {
		return 0
	}
    // 基于当前时间,给桶补充令牌
	tb.adjustavailableTokens(tb.currentTick(now))
	if tb.availableTokens <= 0 {
		return 0
	}

    // 获取max(count, availableTokens)个令牌,也就是有多少就获取多少
	if count > tb.availableTokens {
		count = tb.availableTokens
	}
	tb.availableTokens -= count
	return count
}

Wait系列

Wait系列的两个方法Wait和WaitMaxDuration就是对Take的封装,也就是如果需要等待,在Wait方法内部等待

func (tb *Bucket) Wait(count int64) {
	if d := tb.Take(count); d > 0 {
        // 在内部等待
		tb.clock.Sleep(d)
	}
}

func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {
	d, ok := tb.TakeMaxDuration(count, maxWait)
	if d > 0 {
		tb.clock.Sleep(d)
	}
	return ok
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/893519.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

非常漂亮html公告弹窗代码

非常漂亮html公告弹窗代码 <style>.act-user-modal[data-v-627ce64e] {width: 900px;height: 570px;position: fixed;left: 50%;top: 50%;z-index: 9000;background: url(https://pic1.zhimg.com/80/v2-39b2a0ea3f338776b81d760e67d56027.png)no-repeat 50%;margin: -285…

[已解决] pycharm添加本地conda虚拟环境 + 配置解释器 - pycharm找不到conda可执行文件

目录 问题&#xff1a; 方法&#xff1a; 补充&#xff1a;创建conda虚拟环境 参考文档&#xff1a;pycharm找不到conda可执行文件怎么办&#xff1f;-CSDN 问题&#xff1a; 1.显示&#xff1a;未为项目配置 Python 解释器 2.想在pycharm中使用本地创建的虚拟环境 方法&a…

关于MyBatis-Plus 提供Wrappers.lambdaQuery()的方法

实例&#xff1a; private LambdaQueryWrapper<XXX> buildQueryWrapper(XXXBo bo) { Map<String, Object> params bo.getParams(); LambdaQueryWrapper<XXX> lqw Wrappers.lambdaQuery(); lqw.eq(bo.getOrgId() ! null, XXX::getOrgId, bo.getOrgId()); lq…

搭建Elasticsearch集群

一. 集群的结构 1.单点的问题 单点的Elasticsearch存在哪些可能出现的问题呢? 单台机器存储容量有限,无法实现高存储。容易出现单点故障,无法实现高可用。单服务的并发处理能力有限,无法实现高并发所以,为了应对这些问题,我们需要对Elasticsearch搭建集群。 2.数据分片…

芯课堂 | 使用 SWM341 系列 MCU 环境配置

SWM341系列MCU调试环境配置 SWM341 是华芯微特的其中一款 MCU&#xff0c;341 和 341内置 SDRAM 的 34S 系列&#xff0c;其内核相同。 芯片使用安谋科技“星辰”STAR-MC1 内核,基于 Arm-V8 指令集架构&#xff0c;下载烧录选 M33&#xff0c;对应的工具需要升级; 1、使用 KE…

【踩坑随笔】Tensorflow-GPU训练踩坑

一个无语的坑&#xff0c;4060单卡训练&#xff0c;8G内存本来就不够&#xff0c;还没开始训练就已经爆内存了&#xff0c;但是居然正常跑完了训练&#xff0c;然后一推理发现结果就是一坨。。。往回翻日志才发现原来中间有异常。 首先解决第一个问题&#xff1a;Could not lo…

阴盘奇门月将查法排法以及php的实现的部分代码

vvvvv绿泡泡: lsk9479 月将查法&#xff0c;是根据二十四节气来查询的&#xff0c;查法表如下&#xff1a; ‌雨水至春分‌&#xff1a;月将为亥‌春分至谷雨‌&#xff1a;月将为戌‌谷雨至小满‌&#xff1a;月将为酉‌小满至夏至‌&#xff1a;月将为申‌夏至至大暑‌&am…

基于SpringBoot+Vue大学生创业就业智慧导航服务平台【提供源码+答辩PPT+参考文档+项目部署】

&#x1f4a5; 这两年毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的JavaWeb项目缺少创新和亮点&#xff0c;往往达不到毕业答辩的要求&#xff01; ❗如何解决这类问题&#xff1f; 让我们能够顺利通过毕业&#xff0c;我也一直在不断思考、努力、精进。通过2024年…

SVN——常见问题

基本操作 检出 提交 更新 显示日志 撤销本地修改 撤销已提交内容 恢复到指定版本 添加忽略 修改同一行 修改二进制文件

Science:成功申请适合自己的博士或博士后岗位的技巧

我是娜姐 迪娜学姐 &#xff0c;一个SCI医学期刊编辑&#xff0c;探索用AI工具提效论文写作和发表。 选择在哪里读博士&#xff0c;或进行博士后研究是一个人人生中具有决定性的选择。“选择大于努力”&#xff0c;不同的选择带来的人生体验和结果&#xff0c;可能天上地下。 …

【JavaEE】——四次挥手,TCP状态转换,滑动窗口,流量控制

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 一&#xff1a;断开连接的本质 二&#xff1a;四次挥手 1&#xff1a;FIN 2&#xff1a;过程梳理 …

【STM32 HAL库】MPU6050姿态解算 卡尔曼滤波

【STM32 HAL库】MPU6050姿态解算 卡尔曼滤波 前言MPU6050寄存器代码详解mpu6050.cmpu6050.h 使用说明 前言 本篇文章基于卡尔曼滤波的原理详解与公式推导&#xff0c;来详细的解释下如何使用卡尔曼滤波来解算MPU6050的姿态 参考资料&#xff1a;Github_mpu6050 MPU6050寄存器…

【漏洞复现】SpringBlade menu/list SQL注入漏洞

》》》产品描述《《《 致远互联智能协同是一个信息窗口与工作界面,进行所有信息的分类组合和聚合推送呈现。通过面向角色化、业务化、多终端的多维信息空间设计,为不同组织提供协同门户,打破组织内信息壁垒,构建统一协同沟通的平台。 》》》漏洞描述《《《 致远互联 FE协作办公…

大语言模型被证明没有推理能力,但是它的救星Prolog来了,我准备入坑了

大语言模型&#xff08;LLM&#xff09;&#xff0c;如GPT等&#xff0c;在自然语言生成上已经展示了非凡的能力&#xff0c;但在推理方面&#xff0c;事情就没那么简单了。它们被证明在逻辑推理上存在严重的短板。大家可能都有类似体验——当你需要LLM帮你推导一个复杂的逻辑问…

cleanmymacX破解版下载 cleanmymacx激活码永久免费 mac电脑免费垃圾清理软件推荐

CleanMyMac&#xff0c;它的字面意思为“清理我的Mac”&#xff0c;是一款Mac清理工具&#xff0c;是MacPaw旗下的一款知名专业Mac系统软件工具&#xff0c;。CleanMyMac X不仅获得了苹果公司的认证&#xff0c;还因其丰富的功能和用户友好型设计&#xff0c;荣获“亚洲设计奖”…

动态量化:大模型在端侧CPU快速推理方案

作为一款高性能的推理引擎框架&#xff0c;MNN高度关注Transformer模型在移动端的部署并持续探索优化大模型在端侧的推理方案。本文介绍权重量化的模型在MNN CPU后端的推理方案&#xff1a;动态量化。动态量化指在运行时对浮点型feature map数据进行8bit量化&#xff0c;然后与…

Linux 和Windows创建共享文件夹实现文件共享

直接开整 1.Windows下创建共享文件夹share右击-》属性—》共享-》选择所有人-》点击共享 2.共享创建完成后可以使他的共享网络地址或者Windows ip地址-推荐使用Windows ip地址有时候 不知道什么原因他Linux解析不了网络地址 共享网络地址 —共享文件夹share 右击-》属性—》共…

基于Java实现(PC)大学班级事务管理系统

courseDesign_Java Java 课设 要求 本次设计要求利用 Java 实现 C/S 模式的大学班级内日常事务管理系统&#xff08;PC 版&#xff0c;应用于校内网有线网络访问&#xff0c;暂不开发移动端&#xff09;&#xff0c;不得依赖现有的建模框架&#xff0c;使用 swings 技术完成如…

spring-boot学习(2)

上次学习截止到拦截器 1.构建RESfun服务 PathVariable通过url路径获取url传递过来的信息 2.MyBatisPlus 第三行的mydb要改为自己的数据库名 第四&#xff0c;五行的账号密码改成自己的 MaooerScan告诉项目自己的这个MyBatisPlus是使用在哪里的&#xff0c;包名 实体类的定义…

中级课程RHCE

RHCE 一、复习RHCSA1.1 系统安装1.1.1 安装虚拟机1.1.2 第一个快照1.1.3 第二个快照vi编辑器1.1.4 看网关网卡 1.2 文件管理1.3 目录管理1.4 用户管理1.5 权限管理1.6 存储管理1.6.1 标准分区管理实验&#xff1a;1.6.2 逻辑卷管理实验&#xff1a;1.6.3 交换空间管理实验 …