本地缓存是一个项目中很常见的组件。在很多人的眼中就是一个简单的key-value的map存储即可实现,但实际上,设计一个本地缓存需要考虑的问题远比你想象的多,比如说,本地缓存是将数据存储在内存,若数据量激增突破了内存限制怎么办?如何高效地管理本地缓存,以最低的时间复杂度添加和删除缓存都是值得思考的问题
本文以第一视角带你走进go-zero框架,从源码的角度解析本地缓存的设计与实现
1. 数据结构
我们先来看本地缓存的数据结构
// 本地缓存对象
type LocalCache struct {
name string
lock sync.Mutex
data map[string]any // 字典,用于索引数据
expire time.Duration // 缓存过期时间
lruCache LRU // LRU缓存,用于限制内存大小
barrier singleflight.Group // 单飞模式,防止重复请求
unstableExpiry Unstable // 引入过期时间的抖动,避免缓存同时过期
timingWheel *TimingWheel // 时间轮,用于管理缓存的过期
stats *cacheStat // 用于统计缓存命中率
}
一大堆东西眼花缭乱,不过不用担心,后面我们会逐个解析。现在你只需要关注name、lock、data、expire这几个成员。
顾名思义,name就是对应缓存的名称;data是存储缓存的map;expire是缓存过期时间;lock是缓存使用中用到的锁,很好理解,当高并发的情况下,必然涉及到竞争,加个锁来保证安全。
好了,现在你可以很轻易的实现本地缓存的增删改查,无非就是操作map,操作时上个锁罢了。不过,这只是个模型的雏形,接下来才是重点。
2. 内存溢出问题
首先,我们来考虑第一个问题:如果设置了大量的缓存数据导致内存溢出该怎么办?
比较直观的一个想法就是 给缓存设置固定的大小,当数据量超过缓存容量时就淘汰掉某个历史数据。
没错,这就是缓存淘汰机制,常见的策略有很多,本文使用的是LRU。
2.1. LRU
所谓LRU指的是淘汰最近最少使用的数据
什么意思呢,假设缓存容量为3,而此时你依次插入了1、2、3三条数据,接下来要插入4了,那么将把1淘汰掉换成4,因为1是最久未使用的数据
再比如,还是依次插入了1、2、3三条数据,这时查询了1,那么1就变成了最近最多使用的数据了,接下来再要淘汰数据就会淘汰2了
2.2. LRU实现
关于LRU的实现原理其实很简单,一个双向链表和一个map再加一个容量参数就可解决
- 双向链表:管理数据的先后顺序,链表头的数据表示最近最常使用的数据,当插入和查询数据时都将相应的数据插入或移动到链表头;而删除数据时从链表尾删除
- map:用于存储数据,维护映射关系,保证数据查询时可以以O(1)的时间复杂度查到
- 容量参数:限制LRU的最大容量
2.3. 框架实现
接下来我们看框架源码
上述数据结构中有一个LRU类型的LruCache,这就是其接口了
package localcache
import "container/list"
// LRU缓存
type LRU interface {
Add(key string)
Remove(key string)
}
// 占位用,表示不起用LRU模型
type emptyLRU struct{}
func (e emptyLRU) Add(key string) {}
func (e emptyLRU) Remove(key string) {}
// 关键LRU缓存
type KeyLRU struct {
limit int // 最大容量
evicts *list.List // 双向链表
elements map[string]*list.Element // key-value映射
onEvict func(key string) // 回调函数
}
func NewKeyLRU(limit int, onEvict func(key string)) *KeyLRU {
return &KeyLRU{
limit: limit,
evicts: list.New(),
elements: make(map[string]*list.Element),
onEvict: onEvict,
}
}
func (k *KeyLRU) Add(key string) {
// 1. 若key已存在,则将其移动到链表头部
if elem, ok := k.elements[key]; ok {
k.evicts.MoveToFront(elem)
return
}
// 2. 若key不存在,则将其添加到链表头部,若加入后超过最大容量,则将链表尾的元素删除
elem := k.evicts.PushFront(key)
k.elements[key] = elem
if k.evicts.Len() > k.limit {
k.removeOldest()
}
}
func (k *KeyLRU) removeOldest() {
elem := k.evicts.Back()
if elem != nil {
k.removeElement(elem)
}
}
func (k *KeyLRU) removeElement(elem *list.Element) {
k.evicts.Remove(elem)
key := elem.Value.(string)
delete(k.elements, key)
k.onEvict(key)
}
func (k *KeyLRU) Remove(key string) {
if elem, ok := k.elements[key]; ok {
k.removeElement(elem)
}
}
首先LRU的接口中定义了Add和Remove两个方法
为啥没有Get呢?因为LRU我们这里用他就是用来管理缓存溢出问题的,并不是用来保存和查询数据的,所以无需Get
接下来我们定义了两个接口实现的结构体:
- emptyLRU:空LRU,表示不起用LRU模型
- KeyLRU:关键LRU,这个才是真正核心的LRU模型
好,我们重点看一下KeyLRU的实现,包含了4个参数,除了我们上述提及的双向链表、map和容量参数外,还有一个回调函数
这个回调函数是干嘛的呢?
实际上,在LRU淘汰数据时,我们整个本地缓存模型还需要执行其他操作(这些后面会讲到),由于这些操作涉及到了其他模块,所以干脆搞一个回调函数,让本地缓存模型把这些操作写在回调函数里统一管理。
2.3.1. 构造函数
没啥可说的,几个构建操作
2.3.2. 插入操作
插入的逻辑如下:
- 判断key是否已存在,若已存在则直接将该数据挪到双向链表的头部
- 若不存在,则先将key从头部插进去,map也将之保存,然后看此时的双向链表数据量是否已超过容量参数,如果超过了,就将链表尾的数据删了,map中也把这条数据删了,然后调用回调函数执行其他模块的操作
2.3.3. 删除操作
了解了插入操作,我们再看删除操作就简单了
如果map中存在要删除的key,那么就按上面的删除尾结点同样的操作把这条数据删了就ok
OK,现在你已经有了LRU来保证你的内存不会溢出,只要在执行本地缓存插入和删除操作时对LRU进行相应的操作就可以了!
3. 缓存过期问题
每一条缓存数据都可能存在一个过期时间,当过期时间到达时我们需要将该数据删除,但是这并不好实现。
假设我们在map中插入一条数据后为其起一个定时器,等到过期时间一到就删除他。这么做看似合理,但如果数据量很大,且同一时间大量的缓存未过期就意味着需要大量的协程去起定时器。这样既会造成较大的内存压力也不方便管理。
于是我们很自然地想到是否可以构建一个高效的定时任务管理模型来统一处理缓存过期问题
这个模型需要满足以下几个条件:
- 以较低的时间复杂度进行定时任务的插入和删除
- 支持插入大规模的任务
- 高并发场景下避免频繁的锁竞争
- 内存占用不宜过高
- 易于实现和扩展
能够满足上述条件的常见模型叫做“时间轮”
3.1. 时间轮模型
我们从图出发,由图可见,时间轮就是一个随时间转动的大转盘,还有一个固定位置的指针。而转盘上有着一个一个的槽位(图中0、1、2…),槽位之间有着固定的时间间隔,定时任务就分布在这些槽上。当转盘转到了哪个槽,那么这个槽上的任务就开始执行。
精彩的来了,每个槽上都是一个双向链表,同一槽位的任务就拴在这个链表上,当转盘转到这个槽位时,就从前向后遍历这个链表,依次执行可执行的任务。
这个时候您可能要问了
为什么时间轮一定要设计成一个环形?
很简单,因为节约空间,当设计成环形时意味着槽位的数量是固定的,那么双向链表的数量也就是固定的。
但是这样又带来了一个新的问题:
如果槽位的数量是固定的,那么它能表示的时间范围不就固定了?
比方说,槽位之间的间隔是1s,一共有5个槽位,那时间轮不就只能表示0-5s内的任务了吗?
这个问题也很好解决,只需要引入“圈”的概念就可以了。还是上面的例子,一圈就是5s,那么对于6s后执行的任务它的槽位就是1圈1s的位置,3s的任务就是0圈3s。
由此我们只需要按照 过期时间%(槽位时间间隔*槽位数) 就可以得到需要插入的槽位位置。
当然,这个公式并不准确,因为它是按0槽位作为起点的,但实际我们应该以当前指针指向的槽位(代表当前时间)为起点,在此基础上加上你的过期时间,于是公式变成了:
插入的槽位 = (当前槽位 + 过期时间/槽位时间间隔)%槽位数
圈数 = (过期时间/槽位时间间隔 - 1)/槽位数
值得一提的是,对于双向链表中的每一条任务我们都需要维护其圈数,如果转盘转过来了,但发现你的圈数大于0,说明你至少还要再等待一轮才能执行。
时间轮为啥高效?
最后我们再来看看时间轮的时间复杂度和空间复杂度问题
- 首先时间轮的槽位固定,决定了它并不怎么占用空间,您可能会说槽位上不是有双向链表吗,怎么就不占用空间了。但细细想来,当任务逐渐被执行,双向链表一直在流动着插入删除,但槽位却是固定占用一定的空间,即便某个槽位上没有数据,也占着一定的空间。所以这才是问题的关键。
- 再来看时间复杂度问题。
- 由于槽位固定,本质上就是个定长数组,所以每个槽位的执行就是根据当前指针找到对应槽位而已,O(1)时间复杂度。
- 而执行就更简单了,遍历双向链表,从前到后执行。执行完立马删了,O(1)时间复杂度
- 再看插入和删除。根据我们上面提到的插入的槽位和圈数的计算公式,可能轻松找到待插入的槽位索引,然后插到链表尾就完事了,O(1)时间复杂度。而删除也很容易,我们只需要给待删除的任务打一下标记,当要执行的时候发现存在这个标记就直接把它删了,也是O(1)时间复杂度
查找问题
可能您也发现了,无论是插入还是删除任务都离不开查找任务是否存在的操作,但如果按照时间轮现有模型去查找岂不是要遍历每个槽位和双向链表?
于是我们很容易地想到在时间轮之外我们还需要维护一个map去方便查找,不过这个map一定是并发安全的。
3.2. 框架实现
了解了时间轮的设计思想,接下来看框架的具体实现
对应上述数据结构中的TimingWheel
package localcache
import (
"container/list"
"errors"
"fmt"
"time"
)
var (
ErrArgument = errors.New("incorrect task argument")
ErrClosed = errors.New("TimingWheel is closed already")
)
// 时间轮,用于管理和调度本地缓存过期任务
type TimingWheel struct {
interval time.Duration // 每个slot的时间间隔
ticker Ticker // 定时器,用于驱动时间轮的移动
slots []*list.List // 时间轮的槽,每个槽位存储一个任务链表
timers *SafeMap // 线程安全的映射,用于跟踪和管理所有定时任务
tickedPos int // 当前时间轮指针的位置
numSlots int // 时间轮的总槽数
execute Execute // 执行任务函数
// 接收不同类型任务操作的通道
setChannel chan timingEntry
moveChannel chan baseEntry
removeChannel chan any
stopChannel chan PlaceholderType
}
// 定时任务基本信息
type baseEntry struct {
delay time.Duration
key any
}
// 表示一个定时任务
type timingEntry struct {
baseEntry
value any
circle int // 剩余圈数
diff int
removed bool // 是否被移除
}
// 表示任务在时间轮中的位置和状态
type positionEntry struct {
pos int // 槽位置
item *timingEntry // 任务
}
type timingTask struct {
key any
value any
}
// 定义一个执行任务的方法
type Execute func(key, value any)
func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {
if interval <= 0 || numSlots <= 0 || execute == nil {
return nil, fmt.Errorf("interval: %v, slots: %v, execute: %p",
interval, numSlots, execute)
}
return NewTimingWheelWithTicker(interval, numSlots, execute, NewTicker(interval))
}
func NewTimingWheelWithTicker(interval time.Duration, numSlots int, execute Execute, ticker Ticker) (*TimingWheel, error) {
tw := &TimingWheel{
interval: interval,
ticker: ticker,
slots: make([]*list.List, numSlots),
timers: NewSafeMap(),
execute: execute,
numSlots: numSlots,
setChannel: make(chan timingEntry),
moveChannel: make(chan baseEntry),
removeChannel: make(chan any),
stopChannel: make(chan PlaceholderType),
}
tw.initSlots()
go tw.run()
return tw, nil
}
func (tw *TimingWheel) initSlots() {
for i := 0; i < tw.numSlots; i++ {
tw.slots[i] = list.New()
}
}
// 借助select机制实际执行时间轮各任务
func (tw *TimingWheel) run() {
for {
select {
case <-tw.ticker.Chan():
tw.onTicker()
case task := <-tw.setChannel:
tw.setTask(&task)
case key := <-tw.removeChannel:
tw.removeTask(key)
case task := <-tw.moveChannel:
tw.moveTask(task)
case <-tw.stopChannel:
tw.ticker.Stop()
return
}
}
}
func (tw *TimingWheel) SetTimer(key, value any, delay time.Duration) error {
if delay < 0 || key == nil {
return ErrArgument
}
select {
case tw.setChannel <- timingEntry{
baseEntry: baseEntry{
delay: delay,
key: key,
},
value: value,
}:
return nil
case <-tw.stopChannel:
return ErrClosed
}
}
// 移动任务到指定delay的位置
func (tw *TimingWheel) MoveTimer(key any, delay time.Duration) error {
if delay <= 0 || key == nil {
return ErrArgument
}
select {
case tw.moveChannel <- baseEntry{
delay: delay,
key: key,
}:
return nil
case <-tw.stopChannel:
return ErrClosed
}
}
func (tw *TimingWheel) RemoveTimer(key any) error {
if key == nil {
return ErrArgument
}
select {
case tw.removeChannel <- key:
return nil
case <-tw.stopChannel:
return ErrClosed
}
}
// ================================== 实际执行任务方法 ==================================
// 执行时间轮任务
func (tw *TimingWheel) onTicker() {
// 找到执行槽位,挂在其任务链表上执行
tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
l := tw.slots[tw.tickedPos]
tw.scanAndRunTasks(l)
}
func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
var tasks []timingTask
for e := l.Front(); e != nil; {
task := e.Value.(*timingEntry)
if task.removed {
next := e.Next()
l.Remove(e)
e = next
continue
} else if task.circle > 0 {
task.circle--
e = e.Next()
continue
} else if task.diff > 0 {
next := e.Next()
l.Remove(e)
pos := (tw.tickedPos + task.diff) % tw.numSlots
tw.slots[pos].PushBack(task)
tw.setTimerPosition(pos, task)
task.diff = 0
e = next
continue
}
tasks = append(tasks, timingTask{
key: task.key,
value: task.value,
})
next := e.Next()
l.Remove(e)
tw.timers.Del(task.key)
e = next
}
tw.runTasks(tasks)
}
func (tw *TimingWheel) runTasks(tasks []timingTask) {
if len(tasks) == 0 {
return
}
go func() {
for i := range tasks {
RunSafe(func() {
tw.execute(tasks[i].key, tasks[i].value)
})
}
}()
}
func (tw *TimingWheel) setTask(task *timingEntry) {
if task.delay < tw.interval {
task.delay = tw.interval
}
if val, ok := tw.timers.Get(task.key); ok {
// 任务已存在,更新任务的值并移动到指定delay的位置
entry := val.(*positionEntry)
entry.item.value = task.value
tw.moveTask(task.baseEntry)
} else {
pos, circle := tw.getPositionAndCircle(task.delay)
task.circle = circle
tw.slots[pos].PushBack(task)
tw.setTimerPosition(pos, task)
}
}
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int) {
steps := int(d / tw.interval)
pos = (tw.tickedPos + steps) % tw.numSlots
circle = (steps - 1) / tw.numSlots
return
}
func (tw *TimingWheel) setTimerPosition(pos int, task *timingEntry) {
if val, ok := tw.timers.Get(task.key); ok {
timer := val.(*positionEntry)
timer.item = task
timer.pos = pos
} else {
tw.timers.Set(task.key, &positionEntry{
pos: pos,
item: task,
})
}
}
func (tw *TimingWheel) moveTask(task baseEntry) {
val, ok := tw.timers.Get(task.key)
if !ok {
return
}
timer := val.(*positionEntry)
if task.delay < tw.interval {
GoSafe(func() {
tw.execute(timer.item.key, timer.item.value)
})
return
}
pos, circle := tw.getPositionAndCircle(task.delay)
if pos >= timer.pos {
timer.item.circle = circle
timer.item.diff = pos - timer.pos
} else if circle > 0 {
circle--
timer.item.circle = circle
timer.item.diff = tw.numSlots + pos - timer.pos
} else {
timer.item.removed = true
newItem := &timingEntry{
baseEntry: task,
value: timer.item.value,
}
tw.slots[pos].PushBack(newItem)
tw.setTimerPosition(pos, newItem)
}
}
func (tw *TimingWheel) removeTask(key any) {
val, ok := tw.timers.Get(key)
if !ok {
return
}
timer := val.(*positionEntry)
timer.item.removed = true
tw.timers.Del(key)
}
我们一点点来剖析,先看数据结构,包含了:
- interval:slot时间间隔
- ticker:定时器,驱动时间轮转动
- slots:槽位,每个槽位是一个双向链表
- timers:并发安全的map
- tickedPos:时间轮指针
- numSlots:总槽位数
- execute:任务执行函数
有了上述原理的基础,这里的大部分参数应该都好理解。需要额外说明的是为什么有execute这个东西。
其实和LRU中的回调函数一样,都是把具体的操作放在本地缓存模型中统一管理。
接下来注意到setChannel、moveChannel、removeChannel、
stopChannel,他们分别表示设置、移动、删除、停止操作的通道。为什么有这几个东西呢?这就涉及到时间轮的事件处理机制。
不理解没关系,我们看下代码
3.2.1. 事件处理机制
从构造函数入手,我们发现最后通过起了一个协程执行run()
func NewTimingWheelWithTicker(interval time.Duration, numSlots int, execute Execute, ticker Ticker) (*TimingWheel, error) {
......
go tw.run()
......
}
// 借助select机制实际执行时间轮各任务
func (tw *TimingWheel) run() {
for {
select {
case <-tw.ticker.Chan():
tw.onTicker()
case task := <-tw.setChannel:
tw.setTask(&task)
case key := <-tw.removeChannel:
tw.removeTask(key)
case task := <-tw.moveChannel:
tw.moveTask(task)
case <-tw.stopChannel:
tw.ticker.Stop()
return
}
}
}
可以看到,run方法在监听各个操作通道,当通道有数据被取出时就调用相应的执行函数去执行。同时也启动定时器转动时间轮。
为什么这么设计呢?
- 这就涉及到时间轮事件驱动的理念,通过channel可以将插入、删除这些操作都看做一个个事件,逐一处理。
- 所有核心逻辑都整合在一个run方法中去执行,减少了代码的耦合性
- 这种方式天然就具备了线程安全,如果没有它,那么每个操作势必要上锁解锁
- 再往深了想,这种方式进一步扩展,你可以为channel添加缓冲,可以很方便的扩展更多的事件
接下来我们从run方法出发,看各个事件是咋处理的
3.2.2. 时间轮转动
首先,我们先说构造函数里的定时器
return NewTimingWheelWithTicker(interval, numSlots, execute, NewTicker(interval))
明白了吧,就是起了一个时间间隔为槽位时间间隔的Ticker
我们再看核心的onTicker方法
// 执行时间轮任务
func (tw *TimingWheel) onTicker() {
// 找到执行槽位,挂在其任务链表上执行
tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
l := tw.slots[tw.tickedPos]
tw.scanAndRunTasks(l)
}
很好理解,每次Ticker触发的时候,时间轮转动一个槽位,考虑到是个环,所以本次触发的槽位就是:
执行槽位 = (上次的槽位 + 1)%总槽位数
找到执行槽位后,再看scanAndRunTasks方法
func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
var tasks []timingTask
for e := l.Front(); e != nil; {
task := e.Value.(*timingEntry)
if task.removed {
next := e.Next()
l.Remove(e)
e = next
continue
} else if task.circle > 0 {
task.circle--
e = e.Next()
continue
} else if task.diff > 0 {
next := e.Next()
l.Remove(e)
pos := (tw.tickedPos + task.diff) % tw.numSlots
tw.slots[pos].PushBack(task)
tw.setTimerPosition(pos, task)
task.diff = 0
e = next
continue
}
tasks = append(tasks, timingTask{
key: task.key,
value: task.value,
})
next := e.Next()
l.Remove(e)
tw.timers.Del(task.key)
e = next
}
tw.runTasks(tasks)
}
可以看到,本质就是从前向后遍历双向链表
- 先看是不是被标记删除了,如果是直接干掉它
- 再看圈数是不是大于1,如果是说明还没到它,跳过
- 再看是否需要延迟执行,如果是就把他扔到该去的槽里(这个后面还会讲到)
- 最后把需要执行的任务统一放到runTasks中执行execute函数
那么execute函数到底是个啥?
好的,我提前满足你的好奇心,我们来看本地缓存构造函数
func NewLocalCache(expire time.Duration, opts ...CacheOption) (*LocalCache, error) {
......
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v any) {
// 缓存过期,直接删除
key, ok := k.(string)
if !ok {
return
}
cache.Del(key)
})
if err != nil {
return nil, err
}
cache.timingWheel = timingWheel
return cache, nil
}
明白了吧,其实就是直接把缓存删掉,当然这里面会存在很多操作,我们后面再说
3.2.3. 插入任务
func (tw *TimingWheel) setTask(task *timingEntry) {
if task.delay < tw.interval {
task.delay = tw.interval
}
if val, ok := tw.timers.Get(task.key); ok {
// 任务已存在,更新任务的值并移动到指定delay的位置
entry := val.(*positionEntry)
entry.item.value = task.value
tw.moveTask(task.baseEntry)
} else {
pos, circle := tw.getPositionAndCircle(task.delay)
task.circle = circle
tw.slots[pos].PushBack(task)
tw.setTimerPosition(pos, task)
}
}
逻辑就是先看任务是否存在,存在就更新其过期时间,也就是移到其他槽位去,不存在就找到其槽位尾插进去。
getPositionAndCircle就是根据上文提到的计算公式计算待插入的槽位和圈数
3.2.4. 移动任务
func (tw *TimingWheel) moveTask(task baseEntry) {
val, ok := tw.timers.Get(task.key)
if !ok {
return
}
timer := val.(*positionEntry)
if task.delay < tw.interval {
GoSafe(func() {
tw.execute(timer.item.key, timer.item.value)
})
return
}
pos, circle := tw.getPositionAndCircle(task.delay)
if pos >= timer.pos {
timer.item.circle = circle
timer.item.diff = pos - timer.pos
} else if circle > 0 {
circle--
timer.item.circle = circle
timer.item.diff = tw.numSlots + pos - timer.pos
} else {
timer.item.removed = true
newItem := &timingEntry{
baseEntry: task,
value: timer.item.value,
}
tw.slots[pos].PushBack(newItem)
tw.setTimerPosition(pos, newItem)
}
}
大致的逻辑是:
- 先查到任务
- 如果任务的过期时间比槽位时间间隔还短,那就没必要再移动了,直接执行就完事了
- 计算需要移动到的槽位和圈数,将需要移动的diff记录下来,在时间轮转动的时候移走(呼应前文)
3.2.5. 删除任务
func (tw *TimingWheel) removeTask(key any) {
val, ok := tw.timers.Get(key)
if !ok {
return
}
timer := val.(*positionEntry)
timer.item.removed = true
tw.timers.Del(key)
}
这里就是打个标记
3.2.6. 并发安全的map
最后我们来说说模型中并发安全的map,也就是SafeMap
这是框架自己定义的,我就不细说了,有兴趣的自己去看源码。
简单来说就是设计了两个map,一新一旧,记录各自删了多少数据。如果一个删超标了,就用另一个覆盖它,然后自己重置(清空)
终于说完时间轮了!
4. 缓存雪崩问题
所谓缓存雪崩是指缓存的数据在某个时刻大面积过期,从而导致系统资源的集中消耗或性能瓶颈。
那么框架式怎么解决的呢?
引入了随机偏移量抖动。简单来说就是在输入的过期时间基础上按一定的偏移量随机偏移,避免大量缓存的过期时间一致,导致集中过期。
4.1. 实现
对应数据结构中的unstableExpiry,我们来看其结构Unstable
package localcache
import (
"math/rand"
"sync"
"time"
)
// 过期时间抖动
type Unstable struct {
deviation float64 // 抖动阈值
r *rand.Rand
lock *sync.Mutex
}
func NewUnstable(deviation float64) Unstable {
if deviation < 0 {
deviation = 0
}
if deviation > 1 {
deviation = 1
}
return Unstable{
deviation: deviation,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
}
}
// 抖动时间
// 生成一个在[1-u.deviation,1+u.deviation]之间的随机因子
func (u Unstable) AroundDuration(base time.Duration) time.Duration {
u.lock.Lock()
val := time.Duration((1 + u.deviation - 2*u.deviation*u.r.Float64()) * float64(base))
u.lock.Unlock()
return val
}
// 抖动整数
// 生成一个在[1-u.deviation,1+u.deviation]之间的随机因子
func (u Unstable) AroundInt(base int64) int64 {
u.lock.Lock()
val := int64((1 + u.deviation - 2*u.deviation*u.r.Float64()) * float64(base))
u.lock.Unlock()
return val
}
逻辑还是比较清晰的,就是经过计算使最终的过期时间保持在
[(1-deviation)*原过期时间, (1+deviation)*原过期时间]
5. 缓存击穿问题
所谓缓存击穿是指在某一时间有大量请求打过来,而恰巧此时一些热点数据被集中清除,那么在缓存+DB的架构中请求就全部穿过了缓存落到DB上,造成压力骤增。
因此我们希望当缓存失效时只有一个请求去加载数据,其他请求等待
Go语言中singleflight(单飞模式)可以完美满足我们的需求
singleflight 是 golang.org/x/sync/singleflight 包提供的一个功能模块,其内部使用了一个映射(map)来跟踪正在进行的请求,当一个新的请求到来时,
singleflight会检查是否已经有相同键的请求正在进行:
- 如果存在:新的请求会等待已在进行中的请求完成,并共享其结果
- 如果不存在:singleflight会执行该请求,并将结果缓存起来供后续相同键的请求使用
其使用也非常简单,下面是一个示例:
package main
import (
"fmt"
"sync"
"time"
"golang.org/x/sync/singleflight"
)
// 模拟从数据库加载数据的函数
func loadDataFromDB(key string) (string, error) {
// 模拟延迟
time.Sleep(2 * time.Second)
return fmt.Sprintf("data for %s", key), nil
}
func main() {
var wg sync.WaitGroup
sfGroup := singleflight.Group{}
// 模拟多个并发请求同一个key
key := "hot_key"
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 使用 Do 方法确保只有一个请求会执行 loadDataFromDB
v, err, shared := sfGroup.Do(key, func() (interface{}, error) {
return loadDataFromDB(key)
})
if err != nil {
fmt.Printf("Goroutine %d: error loading data: %v\n", id, err)
return
}
fmt.Printf("Goroutine %d: got data: %s (shared: %v)\n", id, v, shared)
}(i)
}
wg.Wait()
}
6. 缓存命中率统计
这个其实很好处理,每次查询缓存的时候做一个计数。缓存命中了就给命中数+1,未命中就给未命中数+1,每隔一段时间就用 命中数/(命中数+未命中数) 计算出命中率,打印到日志即可。
来看代码实现
package localcache
import (
"sync/atomic"
"time"
"e.coding.net/xverse-git/public/go_common/logger"
)
const statInterval = time.Minute
// 缓存命中率统计模块
type cacheStat struct {
name string // 缓存名称,标识统计信息
hit uint64 // 缓存命中次数
miss uint64 // 缓存未命中次数
sizeCallback func() int // 回调函数,用于动态获取缓存的大小
}
func newCacheStat(name string, sizeCallback func() int) *cacheStat {
st := &cacheStat{
name: name,
sizeCallback: sizeCallback,
}
go st.statLoop()
return st
}
// 开启定时任务进行统计
func (cs *cacheStat) statLoop() {
ticker := time.NewTicker(statInterval)
defer ticker.Stop()
for range ticker.C {
hit := atomic.SwapUint64(&cs.hit, 0)
miss := atomic.SwapUint64(&cs.miss, 0)
total := hit + miss
if total == 0 {
continue
}
percent := 100 * float32(hit) / float32(total)
logger.Infof("cache(%s) - qpm: %d, hit_ratio: %.1f%%, elements: %d, hit: %d, miss: %d",
cs.name, total, percent, cs.sizeCallback(), hit, miss)
}
}
func (cs *cacheStat) IncrementHit() {
atomic.AddUint64(&cs.hit, 1)
}
func (cs *cacheStat) IncrementMiss() {
atomic.AddUint64(&cs.miss, 1)
}
很直观,也很好理解,就不去赘述了。额外提一嘴的是命中数和未命中数的+1操作由于是并发情况下的变更,因此是原子操作,所以通过 atomic.AddUint64 的方式。
7. 本地缓存各操作
本地缓存的各个设计要点和模块终于讲完了,现在我们可以回过头来看其各操作的实现了。
7.1. 构造函数
type CacheOption func(*LocalCache)
func NewLocalCache(expire time.Duration, opts ...CacheOption) (*LocalCache, error) {
cache := &LocalCache{
data: make(map[string]any),
expire: expire,
lruCache: emptyLRU{},
barrier: singleflight.Group{},
unstableExpiry: NewUnstable(expiryDeviation),
}
for _, opt := range opts {
opt(cache)
}
if len(cache.name) == 0 {
cache.name = defaultCacheName
}
cache.stats = newCacheStat(cache.name, cache.size)
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v any) {
// 缓存过期,直接删除
key, ok := k.(string)
if !ok {
return
}
cache.Del(key)
})
if err != nil {
return nil, err
}
cache.timingWheel = timingWheel
return cache, nil
}
func WithName(name string) CacheOption {
return func(cache *LocalCache) {
cache.name = name
}
}
// 设置最大容量
func WithLimit(limit int) CacheOption {
return func(cache *LocalCache) {
if limit > 0 {
cache.lruCache = NewKeyLRU(limit, cache.onEvict)
}
}
}
func (lc *LocalCache) onEvict(key string) {
delete(lc.data, key)
lc.timingWheel.RemoveTimer(key)
}
这里主要是对各模块的一些初始化操作,包括定义时间轮的回调函数等。值得一提的是这里使用了函数式编程,使用opts将参数注入构造函数中。
这其实是一种很常见的默认值注入方法。当方法的部分参数不一定注入实参的时候,由于Go不像Python那样拥有默认值机制,所以往往采用这种函数式编程的方式注入。
在使用时代码如下:
cache, err := NewLocalCache(time.Second*2, WithName("any"))
当限制缓存数量,即设置WithLimit时才使用KeyLRU去控制。LRU的回调函数是在删除缓存时触发,对应onEvict方法,内容也很简单,删除map对应的key和时间轮删除
7.2. 查询缓存
func (lc *LocalCache) Get(key string) (any, bool) {
value, ok := lc.doGet(key)
if ok {
lc.stats.IncrementHit()
} else {
lc.stats.IncrementMiss()
}
return value, ok
}
func (lc *LocalCache) doGet(key string) (any, bool) {
lc.lock.Lock()
defer lc.lock.Unlock()
value, ok := lc.data[key]
if ok {
lc.lruCache.Add(key)
}
return value, ok
}
这里没什么好说的,包含了命中率的计数和LRU的添加(移动到链表头结点)
7.3. 添加缓存
func (lc *LocalCache) Set(key string, value any) {
lc.SetWithExpire(key, value, lc.expire)
}
//nolint:errcheck // 不做检查
func (lc *LocalCache) SetWithExpire(key string, value any, expire time.Duration) {
lc.lock.Lock()
_, ok := lc.data[key]
lc.data[key] = value
lc.lruCache.Add(key)
lc.lock.Unlock()
expiry := lc.unstableExpiry.AroundDuration(expire) // 过期时间抖动处理
if ok {
lc.timingWheel.MoveTimer(key, expiry)
} else {
lc.timingWheel.SetTimer(key, value, expiry)
}
}
当添加缓存时先进行LRU的添加,数据的保存。然后对过期时间进行抖动处理,然后将任务添加进时间轮,由时间轮转到到过期时间的槽位时调用回调函数删除该缓存。
7.4. 删除缓存
func (lc *LocalCache) Del(key string) {
lc.lock.Lock()
delete(lc.data, key)
lc.lruCache.Remove(key)
lc.lock.Unlock()
lc.timingWheel.RemoveTimer(key)
}
这里涉及map的删除、LRU的删除和时间轮的删除。时间轮执行时调用的回调函数里就是这个方法。
7.5. 获取缓存
func (lc *LocalCache) Take(key string, fetch func() (any, error)) (any, error) {
if val, ok := lc.doGet(key); ok {
lc.stats.IncrementHit()
return val, nil
}
var fresh bool
val, err, _ := lc.barrier.Do(key, func() (any, error) {
if val, ok := lc.doGet(key); ok {
return val, nil
}
v, e := fetch()
if e != nil {
return nil, e
}
fresh = true
lc.Set(key, v)
return v, nil
})
if err != nil {
return nil, err
}
if fresh {
lc.stats.IncrementMiss()
return val, nil
}
lc.stats.IncrementHit()
return val, nil
}
这个方法相对特殊,它允许当缓存不存在时根据用户注入的fetch函数的逻辑去获取并设置缓存。
当然,这里的设置是需要借助singleflight逻辑的。同样,也涉及命中率的计数。
示例类似这样:
cache.Take("first", func() (any, error) {
time.Sleep(time.Millisecond * 100)
return "first element", nil
})
OK,至此本地缓存全部讲完。完结,撒花!