Go本地缓存设计与实现

本地缓存是一个项目中很常见的组件。在很多人的眼中就是一个简单的key-value的map存储即可实现,但实际上,设计一个本地缓存需要考虑的问题远比你想象的多,比如说,本地缓存是将数据存储在内存,若数据量激增突破了内存限制怎么办?如何高效地管理本地缓存,以最低的时间复杂度添加和删除缓存都是值得思考的问题

本文以第一视角带你走进go-zero框架,从源码的角度解析本地缓存的设计与实现

1. 数据结构

我们先来看本地缓存的数据结构

// 本地缓存对象
type LocalCache struct {
	name           string
	lock           sync.Mutex
	data           map[string]any     // 字典,用于索引数据
	expire         time.Duration      // 缓存过期时间
	lruCache       LRU                // LRU缓存,用于限制内存大小
	barrier        singleflight.Group // 单飞模式,防止重复请求
	unstableExpiry Unstable           // 引入过期时间的抖动,避免缓存同时过期
	timingWheel    *TimingWheel       // 时间轮,用于管理缓存的过期
	stats          *cacheStat         // 用于统计缓存命中率
}

一大堆东西眼花缭乱,不过不用担心,后面我们会逐个解析。现在你只需要关注name、lock、data、expire这几个成员。
顾名思义,name就是对应缓存的名称;data是存储缓存的map;expire是缓存过期时间;lock是缓存使用中用到的锁,很好理解,当高并发的情况下,必然涉及到竞争,加个锁来保证安全。
好了,现在你可以很轻易的实现本地缓存的增删改查,无非就是操作map,操作时上个锁罢了。不过,这只是个模型的雏形,接下来才是重点。

2. 内存溢出问题

首先,我们来考虑第一个问题:如果设置了大量的缓存数据导致内存溢出该怎么办?
比较直观的一个想法就是 给缓存设置固定的大小,当数据量超过缓存容量时就淘汰掉某个历史数据。
没错,这就是缓存淘汰机制,常见的策略有很多,本文使用的是LRU。

2.1. LRU

所谓LRU指的是淘汰最近最少使用的数据
什么意思呢,假设缓存容量为3,而此时你依次插入了1、2、3三条数据,接下来要插入4了,那么将把1淘汰掉换成4,因为1是最久未使用的数据
再比如,还是依次插入了1、2、3三条数据,这时查询了1,那么1就变成了最近最多使用的数据了,接下来再要淘汰数据就会淘汰2了

2.2. LRU实现

关于LRU的实现原理其实很简单,一个双向链表和一个map再加一个容量参数就可解决

  • 双向链表:管理数据的先后顺序,链表头的数据表示最近最常使用的数据,当插入和查询数据时都将相应的数据插入或移动到链表头;而删除数据时从链表尾删除
  • map:用于存储数据,维护映射关系,保证数据查询时可以以O(1)的时间复杂度查到
  • 容量参数:限制LRU的最大容量

2.3. 框架实现

接下来我们看框架源码
上述数据结构中有一个LRU类型的LruCache,这就是其接口了

package localcache

import "container/list"

// LRU缓存
type LRU interface {
	Add(key string)
	Remove(key string)
}

// 占位用,表示不起用LRU模型
type emptyLRU struct{}

func (e emptyLRU) Add(key string) {}

func (e emptyLRU) Remove(key string) {}

// 关键LRU缓存
type KeyLRU struct {
	limit    int                      // 最大容量
	evicts   *list.List               // 双向链表
	elements map[string]*list.Element // key-value映射
	onEvict  func(key string)         // 回调函数
}

func NewKeyLRU(limit int, onEvict func(key string)) *KeyLRU {
	return &KeyLRU{
		limit:    limit,
		evicts:   list.New(),
		elements: make(map[string]*list.Element),
		onEvict:  onEvict,
	}
}

func (k *KeyLRU) Add(key string) {
	// 1. 若key已存在,则将其移动到链表头部
	if elem, ok := k.elements[key]; ok {
		k.evicts.MoveToFront(elem)
		return
	}

	// 2. 若key不存在,则将其添加到链表头部,若加入后超过最大容量,则将链表尾的元素删除
	elem := k.evicts.PushFront(key)
	k.elements[key] = elem

	if k.evicts.Len() > k.limit {
		k.removeOldest()
	}
}

func (k *KeyLRU) removeOldest() {
	elem := k.evicts.Back()
	if elem != nil {
		k.removeElement(elem)
	}
}

func (k *KeyLRU) removeElement(elem *list.Element) {
	k.evicts.Remove(elem)
	key := elem.Value.(string)
	delete(k.elements, key)
	k.onEvict(key)
}

func (k *KeyLRU) Remove(key string) {
	if elem, ok := k.elements[key]; ok {
		k.removeElement(elem)
	}
}

首先LRU的接口中定义了Add和Remove两个方法
为啥没有Get呢?因为LRU我们这里用他就是用来管理缓存溢出问题的,并不是用来保存和查询数据的,所以无需Get
接下来我们定义了两个接口实现的结构体:

  • emptyLRU:空LRU,表示不起用LRU模型
  • KeyLRU:关键LRU,这个才是真正核心的LRU模型

好,我们重点看一下KeyLRU的实现,包含了4个参数,除了我们上述提及的双向链表、map和容量参数外,还有一个回调函数
这个回调函数是干嘛的呢?
实际上,在LRU淘汰数据时,我们整个本地缓存模型还需要执行其他操作(这些后面会讲到),由于这些操作涉及到了其他模块,所以干脆搞一个回调函数,让本地缓存模型把这些操作写在回调函数里统一管理。

2.3.1. 构造函数

没啥可说的,几个构建操作

2.3.2. 插入操作

插入的逻辑如下:

  1. 判断key是否已存在,若已存在则直接将该数据挪到双向链表的头部
  2. 若不存在,则先将key从头部插进去,map也将之保存,然后看此时的双向链表数据量是否已超过容量参数,如果超过了,就将链表尾的数据删了,map中也把这条数据删了,然后调用回调函数执行其他模块的操作

2.3.3. 删除操作

了解了插入操作,我们再看删除操作就简单了
如果map中存在要删除的key,那么就按上面的删除尾结点同样的操作把这条数据删了就ok

OK,现在你已经有了LRU来保证你的内存不会溢出,只要在执行本地缓存插入和删除操作时对LRU进行相应的操作就可以了!

3. 缓存过期问题

每一条缓存数据都可能存在一个过期时间,当过期时间到达时我们需要将该数据删除,但是这并不好实现。
假设我们在map中插入一条数据后为其起一个定时器,等到过期时间一到就删除他。这么做看似合理,但如果数据量很大,且同一时间大量的缓存未过期就意味着需要大量的协程去起定时器。这样既会造成较大的内存压力也不方便管理。
于是我们很自然地想到是否可以构建一个高效的定时任务管理模型来统一处理缓存过期问题

这个模型需要满足以下几个条件:

  1. 以较低的时间复杂度进行定时任务的插入和删除
  2. 支持插入大规模的任务
  3. 高并发场景下避免频繁的锁竞争
  4. 内存占用不宜过高
  5. 易于实现和扩展

能够满足上述条件的常见模型叫做“时间轮”

3.1. 时间轮模型

Alt
我们从图出发,由图可见,时间轮就是一个随时间转动的大转盘,还有一个固定位置的指针。而转盘上有着一个一个的槽位(图中0、1、2…),槽位之间有着固定的时间间隔,定时任务就分布在这些槽上。当转盘转到了哪个槽,那么这个槽上的任务就开始执行。
精彩的来了,每个槽上都是一个双向链表,同一槽位的任务就拴在这个链表上,当转盘转到这个槽位时,就从前向后遍历这个链表,依次执行可执行的任务。

这个时候您可能要问了
为什么时间轮一定要设计成一个环形?
很简单,因为节约空间,当设计成环形时意味着槽位的数量是固定的,那么双向链表的数量也就是固定的。

但是这样又带来了一个新的问题:
如果槽位的数量是固定的,那么它能表示的时间范围不就固定了?
比方说,槽位之间的间隔是1s,一共有5个槽位,那时间轮不就只能表示0-5s内的任务了吗?
这个问题也很好解决,只需要引入“圈”的概念就可以了。还是上面的例子,一圈就是5s,那么对于6s后执行的任务它的槽位就是1圈1s的位置,3s的任务就是0圈3s。
由此我们只需要按照 过期时间%(槽位时间间隔*槽位数) 就可以得到需要插入的槽位位置。
当然,这个公式并不准确,因为它是按0槽位作为起点的,但实际我们应该以当前指针指向的槽位(代表当前时间)为起点,在此基础上加上你的过期时间,于是公式变成了:

插入的槽位 = (当前槽位 + 过期时间/槽位时间间隔)%槽位数
圈数 = (过期时间/槽位时间间隔 - 1)/槽位数

值得一提的是,对于双向链表中的每一条任务我们都需要维护其圈数,如果转盘转过来了,但发现你的圈数大于0,说明你至少还要再等待一轮才能执行。

时间轮为啥高效?
最后我们再来看看时间轮的时间复杂度和空间复杂度问题

  • 首先时间轮的槽位固定,决定了它并不怎么占用空间,您可能会说槽位上不是有双向链表吗,怎么就不占用空间了。但细细想来,当任务逐渐被执行,双向链表一直在流动着插入删除,但槽位却是固定占用一定的空间,即便某个槽位上没有数据,也占着一定的空间。所以这才是问题的关键。
  • 再来看时间复杂度问题。
    • 由于槽位固定,本质上就是个定长数组,所以每个槽位的执行就是根据当前指针找到对应槽位而已,O(1)时间复杂度。
    • 而执行就更简单了,遍历双向链表,从前到后执行。执行完立马删了,O(1)时间复杂度
    • 再看插入和删除。根据我们上面提到的插入的槽位和圈数的计算公式,可能轻松找到待插入的槽位索引,然后插到链表尾就完事了,O(1)时间复杂度。而删除也很容易,我们只需要给待删除的任务打一下标记,当要执行的时候发现存在这个标记就直接把它删了,也是O(1)时间复杂度

查找问题
可能您也发现了,无论是插入还是删除任务都离不开查找任务是否存在的操作,但如果按照时间轮现有模型去查找岂不是要遍历每个槽位和双向链表?
于是我们很容易地想到在时间轮之外我们还需要维护一个map去方便查找,不过这个map一定是并发安全的

3.2. 框架实现

了解了时间轮的设计思想,接下来看框架的具体实现
对应上述数据结构中的TimingWheel

package localcache

import (
	"container/list"
	"errors"
	"fmt"
	"time"
)

var (
	ErrArgument = errors.New("incorrect task argument")
	ErrClosed   = errors.New("TimingWheel is closed already")
)

// 时间轮,用于管理和调度本地缓存过期任务
type TimingWheel struct {
	interval  time.Duration // 每个slot的时间间隔
	ticker    Ticker        // 定时器,用于驱动时间轮的移动
	slots     []*list.List  // 时间轮的槽,每个槽位存储一个任务链表
	timers    *SafeMap      // 线程安全的映射,用于跟踪和管理所有定时任务
	tickedPos int           // 当前时间轮指针的位置
	numSlots  int           // 时间轮的总槽数
	execute   Execute       // 执行任务函数

	// 接收不同类型任务操作的通道
	setChannel    chan timingEntry
	moveChannel   chan baseEntry
	removeChannel chan any
	stopChannel   chan PlaceholderType
}

// 定时任务基本信息
type baseEntry struct {
	delay time.Duration
	key   any
}

// 表示一个定时任务
type timingEntry struct {
	baseEntry
	value   any
	circle  int // 剩余圈数
	diff    int
	removed bool // 是否被移除
}

// 表示任务在时间轮中的位置和状态
type positionEntry struct {
	pos  int          // 槽位置
	item *timingEntry // 任务
}

type timingTask struct {
	key   any
	value any
}

// 定义一个执行任务的方法
type Execute func(key, value any)

func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {
	if interval <= 0 || numSlots <= 0 || execute == nil {
		return nil, fmt.Errorf("interval: %v, slots: %v, execute: %p",
			interval, numSlots, execute)
	}

	return NewTimingWheelWithTicker(interval, numSlots, execute, NewTicker(interval))
}

func NewTimingWheelWithTicker(interval time.Duration, numSlots int, execute Execute, ticker Ticker) (*TimingWheel, error) {
	tw := &TimingWheel{
		interval:      interval,
		ticker:        ticker,
		slots:         make([]*list.List, numSlots),
		timers:        NewSafeMap(),
		execute:       execute,
		numSlots:      numSlots,
		setChannel:    make(chan timingEntry),
		moveChannel:   make(chan baseEntry),
		removeChannel: make(chan any),
		stopChannel:   make(chan PlaceholderType),
	}

	tw.initSlots()
	go tw.run()

	return tw, nil
}

func (tw *TimingWheel) initSlots() {
	for i := 0; i < tw.numSlots; i++ {
		tw.slots[i] = list.New()
	}
}

// 借助select机制实际执行时间轮各任务
func (tw *TimingWheel) run() {
	for {
		select {
		case <-tw.ticker.Chan():
			tw.onTicker()
		case task := <-tw.setChannel:
			tw.setTask(&task)
		case key := <-tw.removeChannel:
			tw.removeTask(key)
		case task := <-tw.moveChannel:
			tw.moveTask(task)
		case <-tw.stopChannel:
			tw.ticker.Stop()
			return
		}
	}
}

func (tw *TimingWheel) SetTimer(key, value any, delay time.Duration) error {
	if delay < 0 || key == nil {
		return ErrArgument
	}

	select {
	case tw.setChannel <- timingEntry{
		baseEntry: baseEntry{
			delay: delay,
			key:   key,
		},
		value: value,
	}:
		return nil
	case <-tw.stopChannel:
		return ErrClosed
	}
}

// 移动任务到指定delay的位置
func (tw *TimingWheel) MoveTimer(key any, delay time.Duration) error {
	if delay <= 0 || key == nil {
		return ErrArgument
	}

	select {
	case tw.moveChannel <- baseEntry{
		delay: delay,
		key:   key,
	}:
		return nil
	case <-tw.stopChannel:
		return ErrClosed
	}
}

func (tw *TimingWheel) RemoveTimer(key any) error {
	if key == nil {
		return ErrArgument
	}

	select {
	case tw.removeChannel <- key:
		return nil
	case <-tw.stopChannel:
		return ErrClosed
	}
}

// ================================== 实际执行任务方法 ==================================

// 执行时间轮任务
func (tw *TimingWheel) onTicker() {
	// 找到执行槽位,挂在其任务链表上执行
	tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
	l := tw.slots[tw.tickedPos]
	tw.scanAndRunTasks(l)
}

func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
	var tasks []timingTask

	for e := l.Front(); e != nil; {
		task := e.Value.(*timingEntry)
		if task.removed {
			next := e.Next()
			l.Remove(e)
			e = next
			continue
		} else if task.circle > 0 {
			task.circle--
			e = e.Next()
			continue
		} else if task.diff > 0 {
			next := e.Next()
			l.Remove(e)
			pos := (tw.tickedPos + task.diff) % tw.numSlots
			tw.slots[pos].PushBack(task)
			tw.setTimerPosition(pos, task)
			task.diff = 0
			e = next
			continue
		}

		tasks = append(tasks, timingTask{
			key:   task.key,
			value: task.value,
		})
		next := e.Next()
		l.Remove(e)
		tw.timers.Del(task.key)
		e = next
	}

	tw.runTasks(tasks)
}

func (tw *TimingWheel) runTasks(tasks []timingTask) {
	if len(tasks) == 0 {
		return
	}

	go func() {
		for i := range tasks {
			RunSafe(func() {
				tw.execute(tasks[i].key, tasks[i].value)
			})
		}
	}()
}

func (tw *TimingWheel) setTask(task *timingEntry) {
	if task.delay < tw.interval {
		task.delay = tw.interval
	}

	if val, ok := tw.timers.Get(task.key); ok {
		// 任务已存在,更新任务的值并移动到指定delay的位置
		entry := val.(*positionEntry)
		entry.item.value = task.value
		tw.moveTask(task.baseEntry)
	} else {
		pos, circle := tw.getPositionAndCircle(task.delay)
		task.circle = circle
		tw.slots[pos].PushBack(task)
		tw.setTimerPosition(pos, task)
	}
}

func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int) {
	steps := int(d / tw.interval)
	pos = (tw.tickedPos + steps) % tw.numSlots
	circle = (steps - 1) / tw.numSlots

	return
}

func (tw *TimingWheel) setTimerPosition(pos int, task *timingEntry) {
	if val, ok := tw.timers.Get(task.key); ok {
		timer := val.(*positionEntry)
		timer.item = task
		timer.pos = pos
	} else {
		tw.timers.Set(task.key, &positionEntry{
			pos:  pos,
			item: task,
		})
	}
}

func (tw *TimingWheel) moveTask(task baseEntry) {
	val, ok := tw.timers.Get(task.key)
	if !ok {
		return
	}

	timer := val.(*positionEntry)
	if task.delay < tw.interval {
		GoSafe(func() {
			tw.execute(timer.item.key, timer.item.value)
		})
		return
	}

	pos, circle := tw.getPositionAndCircle(task.delay)
	if pos >= timer.pos {
		timer.item.circle = circle
		timer.item.diff = pos - timer.pos
	} else if circle > 0 {
		circle--
		timer.item.circle = circle
		timer.item.diff = tw.numSlots + pos - timer.pos
	} else {
		timer.item.removed = true
		newItem := &timingEntry{
			baseEntry: task,
			value:     timer.item.value,
		}
		tw.slots[pos].PushBack(newItem)
		tw.setTimerPosition(pos, newItem)
	}
}

func (tw *TimingWheel) removeTask(key any) {
	val, ok := tw.timers.Get(key)
	if !ok {
		return
	}

	timer := val.(*positionEntry)
	timer.item.removed = true
	tw.timers.Del(key)
}

我们一点点来剖析,先看数据结构,包含了:

  • interval:slot时间间隔
  • ticker:定时器,驱动时间轮转动
  • slots:槽位,每个槽位是一个双向链表
  • timers:并发安全的map
  • tickedPos:时间轮指针
  • numSlots:总槽位数
  • execute:任务执行函数

有了上述原理的基础,这里的大部分参数应该都好理解。需要额外说明的是为什么有execute这个东西。
其实和LRU中的回调函数一样,都是把具体的操作放在本地缓存模型中统一管理。

接下来注意到setChannel、moveChannel、removeChannel、
stopChannel,他们分别表示设置、移动、删除、停止操作的通道。为什么有这几个东西呢?这就涉及到时间轮的事件处理机制

不理解没关系,我们看下代码

3.2.1. 事件处理机制

从构造函数入手,我们发现最后通过起了一个协程执行run()

func NewTimingWheelWithTicker(interval time.Duration, numSlots int, execute Execute, ticker Ticker) (*TimingWheel, error) {
	......
	go tw.run()

	......
}

// 借助select机制实际执行时间轮各任务
func (tw *TimingWheel) run() {
	for {
		select {
		case <-tw.ticker.Chan():
			tw.onTicker()
		case task := <-tw.setChannel:
			tw.setTask(&task)
		case key := <-tw.removeChannel:
			tw.removeTask(key)
		case task := <-tw.moveChannel:
			tw.moveTask(task)
		case <-tw.stopChannel:
			tw.ticker.Stop()
			return
		}
	}
}

可以看到,run方法在监听各个操作通道,当通道有数据被取出时就调用相应的执行函数去执行。同时也启动定时器转动时间轮。
为什么这么设计呢?

  • 这就涉及到时间轮事件驱动的理念,通过channel可以将插入、删除这些操作都看做一个个事件,逐一处理。
  • 所有核心逻辑都整合在一个run方法中去执行,减少了代码的耦合性
  • 这种方式天然就具备了线程安全,如果没有它,那么每个操作势必要上锁解锁
  • 再往深了想,这种方式进一步扩展,你可以为channel添加缓冲,可以很方便的扩展更多的事件

接下来我们从run方法出发,看各个事件是咋处理的

3.2.2. 时间轮转动

首先,我们先说构造函数里的定时器

return NewTimingWheelWithTicker(interval, numSlots, execute, NewTicker(interval))

明白了吧,就是起了一个时间间隔为槽位时间间隔的Ticker

我们再看核心的onTicker方法

// 执行时间轮任务
func (tw *TimingWheel) onTicker() {
	// 找到执行槽位,挂在其任务链表上执行
	tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
	l := tw.slots[tw.tickedPos]
	tw.scanAndRunTasks(l)
}

很好理解,每次Ticker触发的时候,时间轮转动一个槽位,考虑到是个环,所以本次触发的槽位就是:

执行槽位 = (上次的槽位 + 1)%总槽位数

找到执行槽位后,再看scanAndRunTasks方法

func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
	var tasks []timingTask

	for e := l.Front(); e != nil; {
		task := e.Value.(*timingEntry)
		if task.removed {
			next := e.Next()
			l.Remove(e)
			e = next
			continue
		} else if task.circle > 0 {
			task.circle--
			e = e.Next()
			continue
		} else if task.diff > 0 {
			next := e.Next()
			l.Remove(e)
			pos := (tw.tickedPos + task.diff) % tw.numSlots
			tw.slots[pos].PushBack(task)
			tw.setTimerPosition(pos, task)
			task.diff = 0
			e = next
			continue
		}

		tasks = append(tasks, timingTask{
			key:   task.key,
			value: task.value,
		})
		next := e.Next()
		l.Remove(e)
		tw.timers.Del(task.key)
		e = next
	}

	tw.runTasks(tasks)
}

可以看到,本质就是从前向后遍历双向链表

  • 先看是不是被标记删除了,如果是直接干掉它
  • 再看圈数是不是大于1,如果是说明还没到它,跳过
  • 再看是否需要延迟执行,如果是就把他扔到该去的槽里(这个后面还会讲到)
  • 最后把需要执行的任务统一放到runTasks中执行execute函数

那么execute函数到底是个啥?
好的,我提前满足你的好奇心,我们来看本地缓存构造函数

func NewLocalCache(expire time.Duration, opts ...CacheOption) (*LocalCache, error) {
	......

	timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v any) {
		// 缓存过期,直接删除
		key, ok := k.(string)
		if !ok {
			return
		}

		cache.Del(key)
	})
	if err != nil {
		return nil, err
	}
	cache.timingWheel = timingWheel

	return cache, nil
}

明白了吧,其实就是直接把缓存删掉,当然这里面会存在很多操作,我们后面再说

3.2.3. 插入任务

func (tw *TimingWheel) setTask(task *timingEntry) {
	if task.delay < tw.interval {
		task.delay = tw.interval
	}

	if val, ok := tw.timers.Get(task.key); ok {
		// 任务已存在,更新任务的值并移动到指定delay的位置
		entry := val.(*positionEntry)
		entry.item.value = task.value
		tw.moveTask(task.baseEntry)
	} else {
		pos, circle := tw.getPositionAndCircle(task.delay)
		task.circle = circle
		tw.slots[pos].PushBack(task)
		tw.setTimerPosition(pos, task)
	}
}

逻辑就是先看任务是否存在,存在就更新其过期时间,也就是移到其他槽位去,不存在就找到其槽位尾插进去。
getPositionAndCircle就是根据上文提到的计算公式计算待插入的槽位和圈数

3.2.4. 移动任务

func (tw *TimingWheel) moveTask(task baseEntry) {
	val, ok := tw.timers.Get(task.key)
	if !ok {
		return
	}

	timer := val.(*positionEntry)
	if task.delay < tw.interval {
		GoSafe(func() {
			tw.execute(timer.item.key, timer.item.value)
		})
		return
	}

	pos, circle := tw.getPositionAndCircle(task.delay)
	if pos >= timer.pos {
		timer.item.circle = circle
		timer.item.diff = pos - timer.pos
	} else if circle > 0 {
		circle--
		timer.item.circle = circle
		timer.item.diff = tw.numSlots + pos - timer.pos
	} else {
		timer.item.removed = true
		newItem := &timingEntry{
			baseEntry: task,
			value:     timer.item.value,
		}
		tw.slots[pos].PushBack(newItem)
		tw.setTimerPosition(pos, newItem)
	}
}

大致的逻辑是:

  • 先查到任务
  • 如果任务的过期时间比槽位时间间隔还短,那就没必要再移动了,直接执行就完事了
  • 计算需要移动到的槽位和圈数,将需要移动的diff记录下来,在时间轮转动的时候移走(呼应前文)

3.2.5. 删除任务

func (tw *TimingWheel) removeTask(key any) {
	val, ok := tw.timers.Get(key)
	if !ok {
		return
	}

	timer := val.(*positionEntry)
	timer.item.removed = true
	tw.timers.Del(key)
}

这里就是打个标记

3.2.6. 并发安全的map

最后我们来说说模型中并发安全的map,也就是SafeMap
这是框架自己定义的,我就不细说了,有兴趣的自己去看源码。
简单来说就是设计了两个map,一新一旧,记录各自删了多少数据。如果一个删超标了,就用另一个覆盖它,然后自己重置(清空)

终于说完时间轮了!

4. 缓存雪崩问题

所谓缓存雪崩是指缓存的数据在某个时刻大面积过期,从而导致系统资源的集中消耗或性能瓶颈。
那么框架式怎么解决的呢?
引入了随机偏移量抖动。简单来说就是在输入的过期时间基础上按一定的偏移量随机偏移,避免大量缓存的过期时间一致,导致集中过期。

4.1. 实现

对应数据结构中的unstableExpiry,我们来看其结构Unstable

package localcache

import (
	"math/rand"
	"sync"
	"time"
)

// 过期时间抖动
type Unstable struct {
	deviation float64 // 抖动阈值
	r         *rand.Rand
	lock      *sync.Mutex
}

func NewUnstable(deviation float64) Unstable {
	if deviation < 0 {
		deviation = 0
	}
	if deviation > 1 {
		deviation = 1
	}
	return Unstable{
		deviation: deviation,
		r:         rand.New(rand.NewSource(time.Now().UnixNano())),
		lock:      new(sync.Mutex),
	}
}

// 抖动时间
// 生成一个在[1-u.deviation,1+u.deviation]之间的随机因子
func (u Unstable) AroundDuration(base time.Duration) time.Duration {
	u.lock.Lock()
	val := time.Duration((1 + u.deviation - 2*u.deviation*u.r.Float64()) * float64(base))
	u.lock.Unlock()
	return val
}

// 抖动整数
// 生成一个在[1-u.deviation,1+u.deviation]之间的随机因子
func (u Unstable) AroundInt(base int64) int64 {
	u.lock.Lock()
	val := int64((1 + u.deviation - 2*u.deviation*u.r.Float64()) * float64(base))
	u.lock.Unlock()
	return val
}

逻辑还是比较清晰的,就是经过计算使最终的过期时间保持在
[(1-deviation)*原过期时间, (1+deviation)*原过期时间]

5. 缓存击穿问题

所谓缓存击穿是指在某一时间有大量请求打过来,而恰巧此时一些热点数据被集中清除,那么在缓存+DB的架构中请求就全部穿过了缓存落到DB上,造成压力骤增。

因此我们希望当缓存失效时只有一个请求去加载数据,其他请求等待
Go语言中singleflight(单飞模式)可以完美满足我们的需求
singleflight 是 golang.org/x/sync/singleflight 包提供的一个功能模块,其内部使用了一个映射(map)来跟踪正在进行的请求,当一个新的请求到来时,
singleflight会检查是否已经有相同键的请求正在进行:

  • 如果存在:新的请求会等待已在进行中的请求完成,并共享其结果
  • 如果不存在:singleflight会执行该请求,并将结果缓存起来供后续相同键的请求使用

其使用也非常简单,下面是一个示例:

package main

import (
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/singleflight"
)

// 模拟从数据库加载数据的函数
func loadDataFromDB(key string) (string, error) {
    // 模拟延迟
    time.Sleep(2 * time.Second)
    return fmt.Sprintf("data for %s", key), nil
}

func main() {
    var wg sync.WaitGroup
    sfGroup := singleflight.Group{}

    // 模拟多个并发请求同一个key
    key := "hot_key"
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            // 使用 Do 方法确保只有一个请求会执行 loadDataFromDB
            v, err, shared := sfGroup.Do(key, func() (interface{}, error) {
                return loadDataFromDB(key)
            })
            if err != nil {
                fmt.Printf("Goroutine %d: error loading data: %v\n", id, err)
                return
            }
            fmt.Printf("Goroutine %d: got data: %s (shared: %v)\n", id, v, shared)
        }(i)
    }

    wg.Wait()
}

6. 缓存命中率统计

这个其实很好处理,每次查询缓存的时候做一个计数。缓存命中了就给命中数+1,未命中就给未命中数+1,每隔一段时间就用 命中数/(命中数+未命中数) 计算出命中率,打印到日志即可。
来看代码实现

package localcache

import (
	"sync/atomic"
	"time"

	"e.coding.net/xverse-git/public/go_common/logger"
)

const statInterval = time.Minute

// 缓存命中率统计模块
type cacheStat struct {
	name         string     // 缓存名称,标识统计信息
	hit          uint64     // 缓存命中次数
	miss         uint64     // 缓存未命中次数
	sizeCallback func() int // 回调函数,用于动态获取缓存的大小
}

func newCacheStat(name string, sizeCallback func() int) *cacheStat {
	st := &cacheStat{
		name:         name,
		sizeCallback: sizeCallback,
	}
	go st.statLoop()
	return st
}

// 开启定时任务进行统计
func (cs *cacheStat) statLoop() {
	ticker := time.NewTicker(statInterval)
	defer ticker.Stop()

	for range ticker.C {
		hit := atomic.SwapUint64(&cs.hit, 0)
		miss := atomic.SwapUint64(&cs.miss, 0)
		total := hit + miss
		if total == 0 {
			continue
		}
		percent := 100 * float32(hit) / float32(total)
		logger.Infof("cache(%s) - qpm: %d, hit_ratio: %.1f%%, elements: %d, hit: %d, miss: %d",
			cs.name, total, percent, cs.sizeCallback(), hit, miss)
	}
}

func (cs *cacheStat) IncrementHit() {
	atomic.AddUint64(&cs.hit, 1)
}

func (cs *cacheStat) IncrementMiss() {
	atomic.AddUint64(&cs.miss, 1)
}

很直观,也很好理解,就不去赘述了。额外提一嘴的是命中数和未命中数的+1操作由于是并发情况下的变更,因此是原子操作,所以通过 atomic.AddUint64 的方式。

7. 本地缓存各操作

本地缓存的各个设计要点和模块终于讲完了,现在我们可以回过头来看其各操作的实现了。

7.1. 构造函数

type CacheOption func(*LocalCache)

func NewLocalCache(expire time.Duration, opts ...CacheOption) (*LocalCache, error) {
	cache := &LocalCache{
		data:           make(map[string]any),
		expire:         expire,
		lruCache:       emptyLRU{},
		barrier:        singleflight.Group{},
		unstableExpiry: NewUnstable(expiryDeviation),
	}

	for _, opt := range opts {
		opt(cache)
	}

	if len(cache.name) == 0 {
		cache.name = defaultCacheName
	}
	cache.stats = newCacheStat(cache.name, cache.size)

	timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v any) {
		// 缓存过期,直接删除
		key, ok := k.(string)
		if !ok {
			return
		}

		cache.Del(key)
	})
	if err != nil {
		return nil, err
	}
	cache.timingWheel = timingWheel

	return cache, nil
}

func WithName(name string) CacheOption {
	return func(cache *LocalCache) {
		cache.name = name
	}
}

// 设置最大容量
func WithLimit(limit int) CacheOption {
	return func(cache *LocalCache) {
		if limit > 0 {
			cache.lruCache = NewKeyLRU(limit, cache.onEvict)
		}
	}
}

func (lc *LocalCache) onEvict(key string) {
	delete(lc.data, key)
	lc.timingWheel.RemoveTimer(key)
}

这里主要是对各模块的一些初始化操作,包括定义时间轮的回调函数等。值得一提的是这里使用了函数式编程,使用opts将参数注入构造函数中。
这其实是一种很常见的默认值注入方法。当方法的部分参数不一定注入实参的时候,由于Go不像Python那样拥有默认值机制,所以往往采用这种函数式编程的方式注入。
在使用时代码如下:

cache, err := NewLocalCache(time.Second*2, WithName("any"))

当限制缓存数量,即设置WithLimit时才使用KeyLRU去控制。LRU的回调函数是在删除缓存时触发,对应onEvict方法,内容也很简单,删除map对应的key和时间轮删除

7.2. 查询缓存

func (lc *LocalCache) Get(key string) (any, bool) {
	value, ok := lc.doGet(key)
	if ok {
		lc.stats.IncrementHit()
	} else {
		lc.stats.IncrementMiss()
	}
	return value, ok
}

func (lc *LocalCache) doGet(key string) (any, bool) {
	lc.lock.Lock()
	defer lc.lock.Unlock()

	value, ok := lc.data[key]
	if ok {
		lc.lruCache.Add(key)
	}
	return value, ok
}

这里没什么好说的,包含了命中率的计数和LRU的添加(移动到链表头结点)

7.3. 添加缓存

func (lc *LocalCache) Set(key string, value any) {
	lc.SetWithExpire(key, value, lc.expire)
}

//nolint:errcheck  // 不做检查
func (lc *LocalCache) SetWithExpire(key string, value any, expire time.Duration) {
	lc.lock.Lock()
	_, ok := lc.data[key]
	lc.data[key] = value
	lc.lruCache.Add(key)
	lc.lock.Unlock()

	expiry := lc.unstableExpiry.AroundDuration(expire) // 过期时间抖动处理
	if ok {
		lc.timingWheel.MoveTimer(key, expiry)
	} else {
		lc.timingWheel.SetTimer(key, value, expiry)
	}
}

当添加缓存时先进行LRU的添加,数据的保存。然后对过期时间进行抖动处理,然后将任务添加进时间轮,由时间轮转到到过期时间的槽位时调用回调函数删除该缓存。

7.4. 删除缓存

func (lc *LocalCache) Del(key string) {
	lc.lock.Lock()
	delete(lc.data, key)
	lc.lruCache.Remove(key)
	lc.lock.Unlock()
	lc.timingWheel.RemoveTimer(key)
}

这里涉及map的删除、LRU的删除和时间轮的删除。时间轮执行时调用的回调函数里就是这个方法。

7.5. 获取缓存

func (lc *LocalCache) Take(key string, fetch func() (any, error)) (any, error) {
	if val, ok := lc.doGet(key); ok {
		lc.stats.IncrementHit()
		return val, nil
	}

	var fresh bool
	val, err, _ := lc.barrier.Do(key, func() (any, error) {
		if val, ok := lc.doGet(key); ok {
			return val, nil
		}

		v, e := fetch()
		if e != nil {
			return nil, e
		}

		fresh = true
		lc.Set(key, v)
		return v, nil
	})
	if err != nil {
		return nil, err
	}

	if fresh {
		lc.stats.IncrementMiss()
		return val, nil
	}

	lc.stats.IncrementHit()
	return val, nil
}

这个方法相对特殊,它允许当缓存不存在时根据用户注入的fetch函数的逻辑去获取并设置缓存。
当然,这里的设置是需要借助singleflight逻辑的。同样,也涉及命中率的计数。
示例类似这样:

cache.Take("first", func() (any, error) {
	time.Sleep(time.Millisecond * 100)
	return "first element", nil
})

OK,至此本地缓存全部讲完。完结,撒花!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/984530.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

通义万相2.1开源版本地化部署攻略,生成视频再填利器

2025 年 2 月 25 日晚上 11&#xff1a;00 通义万相 2.1 开源发布&#xff0c;前两周太忙没空搞它&#xff0c;这个周末&#xff0c;也来本地化部署一个&#xff0c;体验生成效果如何&#xff0c;总的来说&#xff0c;它在国内文生视频、图生视频的行列处于领先位置&#xff0c…

Jetson Xavier NX安装CUDA加速的OpenCV

我们使用SDKManager刷机完成后&#xff0c;使用jtop查看&#xff0c;发现OpenCV 是不带CUDA加速的&#xff0c;因此&#xff0c;我们需要安装CUDA加速的OpenCV&#xff0c;这样后续在使用的时候速度会快很多。 首先我们先卸载默认OpenCV sudo apt purge libopencv* -y sudo …

基于PaddleNLP使用DeepSeek-R1搭建智能体

基于PaddleNLP使用DeepSeek-R1搭建智能体 最近在学习DeepSeek&#xff0c;找到了PaddleNLP星河社区大模型&#xff0c;跟着敲写了一遍。内容来源&#xff1a;DeepSeek实战训练营&#xff1a;从云端模型部署到应用开发 - 飞桨AI Studio星河社区-人工智能学习与实训社区 本项目基…

给大家推荐8个好玩有趣的网站

1、Home Apothecary 家庭药房 https://apothecary.tips/zh Home Apothecary&#xff08;家庭药房&#xff09;结合传统中医智慧与现代科学验证&#xff0c;提供涵盖睡眠改善、免疫力提升、肠胃调理、活力增强等健康需求的天然养生饮品配方。精选安神助眠、四季调养、舒缓压力…

使用Beanshell前置处理器对Jmeter的请求body进行加密

这里我们用HmacSHA256来进行加密举例&#xff1a; 步骤&#xff1a; 1.先获取请求参数并对请求参数进行处理&#xff08;处理成String类型&#xff09; //处理请求参数的两种方法&#xff1a; //方法一&#xff1a; //获取请求 Arguments args sampler.getArguments(); //转…

利用paddleocr解决图片旋转问题

由于之前使用easyocr识别图片的时候发现旋转的图片或者倒置的图片效果很差&#xff0c;来利用 cv2.minAreaRect()获取旋转角度&#xff0c;只能解决0-90&#xff0c;对于倒置的图片不能很好解决&#xff0c;因此使用paddleocr中方向分类检测&#xff08;只能返回0&#xff0c;1…

数据结构(蓝桥杯常考点)

数据结构 前言&#xff1a;这个是针对于蓝桥杯竞赛常考的数据结构内容&#xff0c;基础算法比如高精度这些会在下期给大家总结 数据结构 竞赛中&#xff0c;时间复杂度不能超过10的7次方&#xff08;1秒&#xff09;到10的8次方&#xff08;2秒&#xff09; 空间限制&#x…

Python 入

Python 入侵交换机 随着网络安全威胁不断增加&#xff0c;对于网络设备的安全防护变得愈发重要。而交换机作为网络中重要的设备之一&#xff0c;也需要加强安全保护。本文将介绍如何利用Python来入侵交换机&#xff0c;并对其进行漏洞扫描和安全检测。 1. Python 入侵交换机原…

自然语言处理:最大期望值算法

介绍 大家好&#xff0c;博主又来给大家分享知识了&#xff0c;今天给大家分享的内容是自然语言处理中的最大期望值算法。那么什么是最大期望值算法呢&#xff1f; 最大期望值算法&#xff0c;英文简称为EM算法&#xff0c;它的核心思想非常巧妙。它把求解模型参数的过程分成…

RAG 常见分块策略全解析:从原理到代码实践(2025 深度版)

大家好,我是大 F,深耕AI算法十余年,互联网大厂技术岗。 知行合一,不写水文,喜欢可关注,分享AI算法干货、技术心得。 更多文章可关注《大模型理论和实战》、《DeepSeek技术解析和实战》,一起探索技术的无限可能! 引言 在检索增强生成(RAG)系统中,分块策略是决定系统…

【软件逆向】QQ 连连看小游戏去广告与一键消除实现

目录 一、背景介绍 二、去广告实现 2.1 分析广告加载流程 2.2 逆向分析广告加载逻辑 2.3 去广告方案 三、一键消除外挂实现 3.1 分析游戏逻辑 3.2 编写外挂插件 3.3 注入外挂&#xff1a; 四、一键消除效果展示 五、额外扩展 一、背景介绍 QQ 连连看是一款经典的休闲…

小白学Agent技术[5](Agent框架)

文章目录 Agent框架Single Agent框架BabyAGIAutoGPTHuggingGPTHuggingGPT工作原理说明GPT-EngineerAppAgentOS-Copilot Multi-Agent框架斯坦福虚拟小镇TaskWeaverMetaGPT微软UFOAgentScope现状 常见Agent项目比较概述技术规格和能力实际应用案例开发体验比较ChatChain模式 Agen…

AI写论文提示词指令大全,快速写论文

目录 一、十大学术写作提示词1、研究主题2、研究问题3、论文架构4、学术论证5、文献关键要素6、专业文本可读性转换7、学术语言规范化8、提高语言准确性9、多维度、深层论证10、优化文本结构 二、快速写论文提示词1、确认研究选题2、整理相关资料3、快速完成论文大纲4、整合文献…

电子电气架构 ---常见车规MCU安全启动方案

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 简单,单纯,喜欢独处,独来独往,不易合同频过着接地气的生活,除了生存温饱问题之外,没有什么过多的欲望,表面看起来很高冷,内心热情,如果你身…

HCIP第二讲作业

一、连接拓扑图 二、配置要求 1.学校内部的HTTP客户端可以正常通过域名www.baidu.com访问到百度网络中的HTTP服务器 2.学校网络内部网段基于192.168.1.0/24划分&#xff0c;PC1可以正常访问3.3.3.0/24网段&#xff0c;但是PC2不允许 3.学校内部路由使用静态路由&#xff0c;R1…

Linux第六讲:进程控制

Linux第六讲&#xff1a;进程控制 1.进程创建1.1回顾fork1.2写时拷贝 2.进程终止2.1exit与_exit 3.进程等待3.1进程等待的方法&#xff08;wait和waitpid&#xff09; 4.进程程序替换4.1自定义shell的编写4.1.1输出命令行提示符4.1.2获取用户输入的命令4.1.3命令行分析4.1.4指令…

BI 工具响应慢?可能是 OLAP 层拖了后腿

在数据驱动决策的时代&#xff0c;BI 已成为企业洞察业务、辅助决策的必备工具。然而&#xff0c;随着数据量激增和分析需求复杂化&#xff0c;BI 系统“卡”、“响应慢”的问题日益突出&#xff0c;严重影响分析效率和用户体验。 本文将深入 BI 性能问题的根源&#xff0c;并…

PPT内视频播放无法播放的原因及解决办法

PPT内视频无法播放&#xff0c;通常是视频编解码的问题。目前我遇到的常见的视频编码格式有H.264&#xff0c;H.265&#xff0c;VP9&#xff0c;AV1这4种。H.264编解码的视频&#xff0c;Windows原生系统可以直接播放&#xff0c;其他的视频编码格式需要安装对应的视频编解码插…

【AIGC系列】6:HunyuanVideo视频生成模型部署和代码分析

AIGC系列博文&#xff1a; 【AIGC系列】1&#xff1a;自编码器&#xff08;AutoEncoder, AE&#xff09; 【AIGC系列】2&#xff1a;DALLE 2模型介绍&#xff08;内含扩散模型介绍&#xff09; 【AIGC系列】3&#xff1a;Stable Diffusion模型原理介绍 【AIGC系列】4&#xff1…

Navigation的进阶知识与拦截器配置

Navigation的进阶知识与拦截器配置 写的不是很详细&#xff0c;后续有时间会补充&#xff0c;建议参考官方文档食用 1.如何配置路由信息 1.1 创建工程结构 src/main/ets ├── pages │ └── navigation │ ├── views │ │ ├── Mine.ets //…