本文基于 Go 1.19
Pool
是一组可以安全在多个 goroutine
间共享的临时对象的集合。
存储在 Pool
中的任何项目都可能在任何时候被移除,因此 Pool
不适合保存那些有状态的对象,如数据库连接、TCP 连接等。
Pool
的目的是缓存已分配但未使用的项以供以后使用,从而减少垃圾收集器的压力。
也就是说,它可以轻松构建高效、线程安全的空闲列表,但是,它并不适用于所有空闲列表。
使用实例
下面以几个实际的例子来说明 Pool
的一些使用场景。
下面是两个非常典型的使用场景,但是在实际使用中,对于那些需要频繁创建和销毁的对象的场景,我们都可以考虑使用 Pool
。
gin 里面 Context 对象使用 Pool 保存
在 gin
的 Engine
结构体中的 ServeHTTP
方法中,可以看到 Context
对象是从 Pool
中获取的。
然后在处理完请求之后,将 Context
对象放回 Pool
中。
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// 从 Pool 中获取 Context
c := engine.pool.Get().(*Context)
// 重置 writermem
c.writermem.reset(w)
// 重置 Request
c.Request = req
// 重置其他字段
c.reset()
engine.handleHTTPRequest(c)
// 将 Context 对象放回 Pool
engine.pool.Put(c)
}
我们可以去看看 gin
中 Context
对象的定义,我们会发现,它其中有很多字段。
设想一下,如果每一个请求都创建一个 Context
对象,那么每一个请求都要对 Context
进行内存分配,
分配了之后,如果请求结束了,这些申请的内存在后续就要被回收掉(当然,不是马上就回收)。
这样一来,如果待回收的 Context
对象很多,那么垃圾回收器就会被压力很大。
同样的做法在
echo
这个框架中也有出现。
fmt 里面的 pp 结构体
在我们使用 fmt
包来打印的时候(比如,调用 fmt.Fprintf
),其实底层是要使用一个名为 pp
的结构体来进行打印的。
如果我们的系统中需要大量地使用 fmt
库来做格式化字符串的操作,如果每次进行格式化操作的时候都 new
一个 pp
对象,
那么也会在某个时刻导致垃圾回收器的压力很大。
所以,fmt
包中使用 pp
的时候,都是从 Pool
中获取的:
// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
// 从 Pool 中获取 pp 对象
p := ppFree.Get().(*pp)
p.panicking = false
p.erroring = false
p.wrapErrs = false
p.fmt.init(&p.buf)
return p
}
上面这个方法是获取 pp
对象的方法。我们在这里没有看到重置 pp
字段的代码,因为这些操作在 pp
的 free
方法中了:
// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
if cap(p.buf) > 64<<10 {
return
}
// 重置 p.buf
p.buf = p.buf[:0]
p.arg = nil
p.value = reflect.Value{}
p.wrappedErr = nil
// 将 pp 放回 Pool 中
ppFree.Put(p)
}
我们不能对
sync.Pool
有任何假设,比如,不要想着 Put 进去的对象带了一个状态,然后 Get 出来的时候就能拿到这个状态。
因为这个对象可能在任何时候被清除掉。
从上面这个例子中,我们可以看到 p.buf = p.buf[:0]
这一行对 buf
进行了重置,
这给我我们的启示是,我们在使用 sync.Pool
的时候,如果我们存取的对象会带有一些不同的字段值,
那么我们可能需要对这些字段进行重置后再使用,这样就可以避免 Get
到的对象带有之前的一些状态,
不过重置这些字段的开销相比分配新的内存以及后续 GC,其实开销可以忽略不计。
Pool 整体模型
Pool
本质上是一个双端队列,这个队列支持以下操作:
pushHead
:将一个对象放到队列的头部,这也是唯一的入队操作。popHead
:从队列的头部取出一个对象。如果取不到则使用New
方法创建一个新的对象。popTail
:从队列的尾部取出一个对象。如果取不到则使用New
方法创建一个新的对象。
可以简单地用下图表示:
在阅读本文过程中,想不清楚的时候,回想一下这个模型,可能会有所帮助。
当然,在实际的实现中,比这个复杂多了,但是这个模型已经足够我们理解 Pool
的工作原理了。
Pool 的双端队列是使用数组还是链表
我们知道,队列的存储通常有两种方式,一种是数组,一种是链表,两者的优缺点如下:
优点 | 缺点 | |
---|---|---|
数组 | 存储空间连续,可以根据下标快速访问 | 大小固定,扩容成本高 |
链表 | 大小不固定,可以很容易增加新的元素 | 访问效率不如数组。需要额外的空间来存储前后元素的指针 |
那么 Pool
里面用的是数组还是链表呢?答案是:数组 + 链表。
Pool 为什么要用数组 + 链表
为了快速访问队列中的元素,使用数组是最好的选择,但是数组的大小是固定的,如果队列中的元素很多,那么数组就会很快被填满。
如果我们还想继续往队列中添加元素,那么就需要对数组进行扩容,这个成本是很高的(因为本来就是需要频繁分配/销毁对象的场景才会使用 Pool
)。
同时,我们知道 Pool
设计的目的就是为了减少频繁内存分配带来的性能问题,如果在使用 Pool
的过程中频繁对其进行扩容,那么就违背了 Pool
设计的初衷了。
为了解决数组扩容的问题,我们可以考虑一下使用链表。在我们 Put
的时候往链表的头部添加一个元素,然后 Get
的时候从链表的尾部取出一个元素(还需要移除)。
但是这样我们就需要一个结构体来表示我们的节点了,那么问题来了,我们又需要频繁地分配/销毁这个结构体,这样就又回到了最开始的问题了(频繁创建/销毁对象)。
所以,只使用链表也不是一个好的选择。
所以,Pool
采用了 数组 + 链表 的方式来实现双端队列,它们的关系如下:
- 数组:存储队列中的元素,数组的大小是固定的。
- 链表:当一个数组存储满了之后,就会新建一个数组,然后通过链表将这两个数组串联起来。
最终,Pool
的双端队列的结构如下:
数组如何实现队列
我们知道,数组的存储空间是固定的一块连续的内存,所以我们可以通过下标来访问数组中的元素。
我们 push
的时候,将 head
下标 +1
,然后 pop
的时候,也要修改对应的下标,
但是这样会导致一个问题是,早晚 head
会超出数组的下标范围,但这个时候数组可能还有很多空间,
因为在我们 push
的时候,可能也同时在 pop
,所以数组中的空间可能还有很多。
所以,就可以在 head
超出数组下标范围的时候,将 head
对数组长度取模,这样就可以循环使用数组了:
不过对于这种情况,使用下面的图更加直观:
Pool
里面是使用poolDequeue
这个结构体来表示上图这个队列的,本身是一个数组,但是是当作环形队列使用的。
poolChain 的最终模型
poolChain
就是上面说的 数组 + 链表 的组合,它的最终模型如下:
说明:
poolChain
本身是一个双向链表,每个节点都是一个poolDequeue
,每个节点都有prev
和next
指针指向前后节点。上图的head
是头节点,tail
是尾节点。poolDequeue
是一个数组,它的大小是固定的,但是它是当作环形队列使用的。tail
初始化的时候长度为 8,具体可参考poolChain
的pushHead
方法,后面会说到。pushHead
的时候,如果head
中的环形队列已经满了,那么就会新建一个poolDequeue
,然后将它插入到head
的前面。(新的head
节点的大小为前一个head
节点的两倍)popTail
、popHead
的时候,如果tail
或者head
中的环形队列(poolDequeue
)已经空了,那么就会将它从链表中移除。
多个 P 的情况下的 poolChain
这里假设 P 跟我们机器的逻辑处理器的数量一致。(这里涉及到了 goroutine 的调度机制,不了解可以先了解一下再回来看。)
我们知道,在 go 中,每一个 goroutine 都会绑定一个 P,这样才可以充分利用多核的优势。
设想一下,如果我们有多个 goroutine 同时存储一个 Pool
,会出现什么情况呢?
会导致很激烈的数据竞争,虽然没有使用 Mutex
这种相对低效的互斥锁来解决竞争问题,使用的是原子操作,但是也会导致性能下降。
所以,在 Pool
的实现中,会为每一个 P 都创建一个 poolChain
,每次存取,先操作本地 P 绑定的 poolChain
,这样就可以减少多个 goroutine 同时操作一个 Pool
的竞争问题了。
所以,最终的 Pool
的模型会长成下面这个样子,每个 G 关联了一个 poolChain
:
注意:这里不是
Pool
实际的样子,只是为了说明Pool
的实现原理。
最终实现中的 Pool
在实际的实现中,其实会跟上一个图有一些差异,可以说复杂很多:
从上图可以看出,其实每个 P 关联的并不是 poolChain
,而是 poolLocal
,poolLocal
里面包含了一个 poolLocalInternal
和一个 pad
,pad
是为了避免伪共享而添加的。而 poolLocalInternal
就是实际上 Pool
中存储数据的一个结构体。poolLocalInternal
中包含了两个字段,private
和 shared
,shared
就是我们上面说的 poolChain
,而 private
是一个 any
类型的字段,在我们调用 Pool
的 Put
方法的时候,会先尝试将数据存储到 private
中,如果 private
中已经有数据了,那么就会将数据存储到 shared
中。同样的,在 Get
的时候,也会先从 private
中获取数据,如果 private
中没有数据,那么就会从 shared
中获取数据。
而相应的,Pool
存取数据会变成:
pushHead
:我们调用Pool
的Put
方法的时候,只会写入到本地 P 关联的那个poolLocal
中。popHead
:这个方法也只能从本地 P 关联的poolLocal
中取数据。popTail
:这个方法会从本地 P 关联的poolLocal
中取数据,如果取不到,那么就会从其他 P 关联的poolLocal
中取数据。
这样就可以减少多个 goroutine 同时操作一个
Pool
的竞争问题了(但是无法避免)。
Pool 模型总结
最后,我们将这一节的内容总结一下,可以得到下面这个图:
说明:
- go 进程内会有多个 P,每个 P 都会关联一个
poolLocal
。 poolLocal
中包含了一个poolLocalInternal
和一个pad
,poolLocalInternal
中包含了两个字段,private
和shared
。private
是一个any
类型的字段,用来存储我们调用Pool
的Put
方法的时候传入的数据,Get
的时候如果private
中有数据,那么就会直接返回private
中的数据。shared
是一个poolChain
,用来存储我们调用Pool
的Put
方法的时候传入的数据,Get
的时候如果当前 P 绑定的poolLocal
内是空的,那么可以从其他P
绑定的shared
的尾部获取。poolChain
是一个双向链表,每个节点都是一个poolDequeue
,每个节点都有prev
和next
指针指向前后节点。上图的head
是头节点,tail
是尾节点。poolDequeue
是一个数组,它的大小是固定的,但是它是当作环形队列使用的。pushHead
的时候,如果poolChain
的节点满了,那么会新建一个节点,其容量为前一个节点的两倍。poolChain
支持三种操作:pushHead
、popHead
、popTail
。pushHead
会将数据存储到当前 P 关联的poolChain
的头部。
Pool 中的结构体
在开始分析源码之前,我们先来看一下 Pool
的 UML 图:
上图中已经包含 Pool
实现的所有关键结构体了,下面我们来分析一下这些结构体的作用。
sync.Pool 结构体:
type Pool struct {
// noCopy 用于防止 Pool 被复制(可以使用 go vet 检测)
noCopy noCopy
// local 的主要作用是,多个 goroutine 同时访问 Pool 时,可以减少竞争,提升性能。
// 实际类型是 [P]poolLocal。长度是 localSize。
local unsafe.Pointer
// []poolLocal 的长度。也就是 local 切片的长度
localSize uintptr
// 存放的是上一轮 GC 时的 local 字段的值。
victim unsafe.Pointer
// victim 数组的长度
victimSize uintptr
// 新建对象的方法。
// Get 的时候如果 Pool 中没有对象可用,会调用这个方法来新建一个对象。
New func() any
}
字段说明:
local
:就是我们上文说到的poolLocal
类型的切片,长度是runtime.GOMAXPROCS(0)
,也就是当前 P 的数量。之所以使用切片类型是因为我们可以在运行的过程中调整 P 的数量,所以它的长度并不是固定的,如果 P 的数量变了,poolLocal
也会跟着改变。localSize
:local
的长度。victim
:上一轮 GC 时的local
字段的值。victimSize
:victim
的长度。New
:新建对象的方法。
victim
的作用是在 GC 的时候,将 local
的值赋值给 victim
,然后将 local
置为 nil
,这样就可以避免在 GC 的时候,local
中的对象被回收掉。
当然,并不是完全不会回收,再经历一次 GC 之后,victim
中的对象就会被回收掉。这样做的好处是,可以避免 GC 的时候清除 Pool 中的所有对象,
这样在 GC 之后如果需要大量地从 Pool
中获取对象也不至于产生瞬时的性能抖动。
victim cache
是计算机科学中的一个术语,victim cache
是位于 cpu cache 和主存之间的又一级 cache,用于存放由于失效而被丢弃(替换)的那些块。
每当失效发生时,在访问主存之前,victim cache 都会被检查,如果命中,就不会访问主存。
Pool 获取对象的流程
最终,当我们调用 Pool
的 Get
方法的时候,会按下图的流程来获取对象:
说明:
- 首先会从当前 P 关联的
poolLocal
中的private
字段中获取对象,如果获取到了,那么直接返回。 - 如果
private
字段中没有对象,那么会从当前 P 关联的poolLocal
中的shared
字段中获取对象,如果获取到了,那么直接返回(这里使用的是popHead
方法)。 - 尝试从其他 P 关联的
poolLocal
中的shared
字段中获取对象,如果获取到了,那么直接返回(这里使用的是popTail
方法)。 - 如果其他 P 关联的
poolLocal
中的shared
字段中也没有对象,那么会从victim
中获取对象,如果获取到了,那么直接返回。 - 如果
victim
中也没有对象,那么会调用New
方法来创建一个新的对象(当然前提是我们创建Pool
对象的时候设置了New
字段)。 - 如果
New
字段也没有设置,那么会返回nil
。
poolLocal 和 poolLocalInternal 结构体:
在 Pool
中,使用了 poolLocal
和 poolLocalInternal
两个结构体来表示实际存储数据的结构体。
当然我们也可以只使用 poolLocalInternal
这个结构体,但是为了避免伪共享,在 Pool
的实现中,
将 poolLocalInternal
放在了 poolLocal
的第一个字段,然后在 poolLocal
中添加了一个 pad
字段,用来填充 poolLocalInternal
到 cache line 的大小。
// 实际存储 Pool 的数据的结构体
type poolLocalInternal struct {
// private 用于存储 Pool 的 Put 方法传入的数据。
// Get 的时候如果 private 不为空,那么直接返回 private 中的数据。
// 只能被当前 P 使用。
private any
// 本地 P 可以pushHead/popHead
// 任何 P 都可以 popTail。
shared poolChain
}
// Pool 中的 local 属性的元素类型。
// Pool 中的 local 是一个元素类型为 poolLocal 的切片,长度为 runtime.GOMAXPROCS(0)。
type poolLocal struct {
poolLocalInternal
// pad 用于填充 poolLocalInternal 到 cache line 的大小。
// 为了避免伪共享,将 poolLocalInternal 放在第一个字段。
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
其实这里比较关键的地方是 pad
字段的作用,我们知道,CPU 会将内存分成多个 cache line,CPU 从内存中读取数据的时候,
并不是一个字节一个字节地读取,而是一次性读取一个 cache line 的数据。但是如果我们的数据结构中的字段不是按照 cache line 的大小来排列的,
比如跨了两个 cache line,那么在读取的时候就会产生伪共享,这样就会降低性能。
伪共享的原因是,数据跨了两个 cache line,那么在读取的时候,就会将两个 cache line 都读取到 CPU 的 cache 中,
这样有可能会导致不同 CPU 在发生数据竞争的时候,会使一些不相关的数据也会失效,从而导致性能下降。
如果对齐到 cache line,那么从内存读取数据的时候,就不会将一些不相关的数据也读取到 CPU 的 cache 中,从而避免了伪共享。
poolChain、poolChainElt 和 poolDequeue 结构体
为什么要把这三个放一起讲呢?因为这三个结构体就是 Pool
中做实际存取数据的结构体,三者作用如下:
poolChain
:poolChain
是一个链表,每个节点都是poolChainElt
。poolChainElt
:每个poolChainElt
中包含了一个poolDequeue
,同时包含了指向前一个节点和后一个节点的指针。poolDequeue
:poolDequeue
是一个双端队列(环形队列,使用数组存储),用来存储Pool
中的数据。
// poolChain 是 poolDequeue 的动态大小版本。
//
// 这是作为 poolDequeues 的双向链表队列实现的,其中每个 dequeue 都是前一个 dequeue 大小的两倍。
// 一旦 dequeue 填满,就会分配一个新的 dequeue,并且 pushHead 只会 push 到最新的 dequeue。
// pop 可以从头部或者尾部进行,一旦 dequeue 空了,它就会从 poolChain 中删除。
//
// poolChain 的实现是一个双向链表,每个 poolChainElt 都是一个 poolDequeue。
//(也就是元素是 poolDequeue 的双向链表)
type poolChain struct {
// head 是要推送到的 poolDequeue。
// 只能由生产者访问,因此不需要同步。
head *poolChainElt
// tail 是从 poolDequeue 中 pop 的节点。
// 这是由消费者访问的,因此读取和写入必须是原子的。
tail *poolChainElt
}
// poolChainElt 是 poolChain 中的元素。
type poolChainElt struct {
poolDequeue
// next 和 prev 链接到此 poolChain 中相邻的 poolChainElts。
//
// next 由生产者原子写入,由消费者原子读取。 它只从 nil 过渡到 non-nil。
// prev 由消费者原子写入,由生产者原子读取。 它只会从 non-nil 过渡到 nil。
next, prev *poolChainElt
}
// poolDequeue 是一个无锁的固定大小的单生产者、多消费者队列。
// 单个生产者既可以从头部 push 也可以从头部 pop,消费者只可以从尾部 pop。
//
// 它有一个附加功能,它会清空不使用的槽,从而避免不必要的对象保留。
type poolDequeue struct {
// headTail 将 32 位头索引和 32 位尾索引打包在一起。
// 两者都是 vals modulo len(vals)-1 的索引。
//
// tail = 队列尾的索引
// head = 下一个要填充的插槽的索引
//
// [tail, head) 范围内的槽位归消费者所有。
// 消费者继续拥有此范围之外的槽,直到它清空槽,此时所有权传递给生产者。
//
// 头索引存储在最高有效位中,以便我们可以原子地对它做 add 操作,同时溢出是无害的。
headTail uint64
// vals 是存储在此 dequeue 中的 interface{} 值的环形缓冲区。它的大小必须是 2 的幂。
// (队列存储的元素,接口类型)
//
// 如果插槽为空,则 vals[i].typ 为 nil,否则为非 nil。
// 一个插槽仍在使用中,直到 *both* 尾部索引超出它并且 typ 已设置为 nil。(both?)
// 这由消费者原子地设置为 nil,并由生产者原子地读取。
vals []eface
}
poolChain
的结构大概如下:
poolDequeue
的结构大概如下:
在 poolDequeue
中,headTail
是一个 uint64
类型,它的高 32 位是 head
,低 32 位是 tail
。
这样一来,这样就可以使用原子操作来保证 head
和 tail
的协程安全了。
在 poolDequeue
中,vals
是一个切片类型,元素类型是 eface
,eface
是一个空接口类型,内存布局跟 interface{}
一样,
因此可以看作是一个 interface{}
类型。如果这里看不明白可以看看《go interface 设计与实现》。
那为什么不直接使用 interface{}
类型呢?因为如果用了 interface{}
类型,
那么 poolDequeue
就无法区分保存的 val
是 nil
还是一个空的槽。
那有什么解决办法呢?在 push
和 pop
的时候使用互斥锁可以解决这个问题(因为目前的实现是使用原子操作的,所以这才需要判断保存的 val
是 nil
还是空槽),
但是这样就会导致性能下降。
Pool 源码剖析
在源码剖析的开始部分,不会深入去讲底层的存取细节,我们将其当作一个抽象的队列来看待即可,这样可能会更加便于理解。不过在讲完
Pool
的实现之后,最后还是会展开讲述这个复杂的 “队列” 的那些实现细节。
接下来,我们来看看 Pool
的源码实现。
Pool
提供的接口非常简单,只有 Put
、Get
两个方法,还有一个 New
字段,用来指定 Pool
中的元素是如何创建的:
New
属性:New
是一个函数,用来创建Pool
中的元素。Put
方法:Put
方法用来向Pool
中放入一个元素。Get
方法:Get
方法用来从Pool
中获取一个元素。
Get
Get
方法的实现如下:
Get
从 Pool
中选择一个任意项,将其从 Pool
中移除,并将其返回给调用者。
Get
可能会选择忽略池并将其视为空的。
调用方不应假定传递给 Put
的值与 Get
返回的值之间存在任何关系。
(Put
和 Get
之间可能会发生 GC
,然后 Pool
里面的元素可能会被 GC
回收掉)
如果 Get
否则返回 nil
并且 p.New
不为 nil
,则 Get
返回调用 p.New
的结果。
// Get 从 Pool 中获取一个对象
func (p *Pool) Get() any {
// ...
// pin 将当前的 goroutine 和 P 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 P 的 ID。
l, pid := p.pin()
// 先看 private 是否为 nil,如果不为 nil,就直接返回 private,并将 private 置为 nil。
x := l.private
l.private = nil
if x == nil {
// 尝试从本地 shared 的头部取。
x, _ = l.shared.popHead()
if x == nil { // 如果本地 shared 的头部取不到,就从其他 P 的 shared 的尾部取。
x = p.getSlow(pid)
}
}
// 将当前的 goroutine 和 P 解绑,允许被抢占。
runtime_procUnpin()
// ...
// 如果 x 为 nil 并且 p.New 不为 nil,则返回 p.New() 的结果。
// 没有就 New 一个。
if x == nil && p.New != nil {
x = p.New()
}
return x
}
Pool
Get
的流程可以总结如下:
- 将当前的
goroutine
和P
绑定,禁止被抢占,返回当前P
的本地缓存(poolLocal
)和P
的ID
。 - 从本地
private
取,如果取不到,就从本地shared
的头部取,如果取不到,就从其他P
的shared
的尾部取。获取到则返回 - 如果从其他的
P
的shared
的尾部也获取不到,则从victim
获取。获取到则返回 - 将当前的
goroutine
和P
解绑,允许被抢占。 - 如果
p.New
不为nil
,则返回p.New
的结果。
再贴一下上面那个图(当然,下图包含了下面的 getSlow
的流程,并不只是 Get
):
在 Pool
中一个很关键的操作是 pin
,它的作用是将当前的 goroutine
和 P
绑定,禁止被抢占。
这样就可以保证在 Get
和 Put
的时候,都可以获取到当前 P
的本地缓存(poolLocal
),
否则,有可能在 Get
和 Put
的时候,P
会被抢占,导致获取到的 poolLocal
不一致,这样 poolLocal
就会失去意义,
不得不再次陷入跟其他 goroutine
竞争的状态,又不得不考虑在如何在不同 goroutine
之间进行同步了。
而绑定了 P
后,在 Get
和 Put
的时候,就可以使用原子操作来代替其他更大粒度的锁了,
但是我们也不必太担心,因为绑定 P
的时间窗口其实很小。
getSlow 源码剖析
在 Get
中,如果从 private
和 shared
中都取不到,就会调用 getSlow
方法。它的作用是:
- 尝试从其他
P
的shared
的尾部取。 - 尝试从
victim
获取。
getSlow
的实现如下:
// 从其他 P 的 shared 的尾部取。
func (p *Pool) getSlow(pid int) any {
// 获取 local 的大小和 local。
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从其他 P 的 shared 的尾部取。
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size)) // pid+i+1 的用途从下一个 P 开始取。
if x, _ := l.shared.popTail(); x != nil { // 尝试从每一个 P 的 shared 的尾部取,获取到则返回。
return x
}
}
// 尝试从 victim cache 取。
// 我们在尝试从所有主缓存中偷取之后执行此操作,
// 因为我们希望 victim cache 中的对象尽可能地老化。
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size { // 如果 pid 大于 size,会发生越界,直接返回 nil。这意味着 gomaxprocs 相比上一次 poolCleanup 的时候变大了。
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil { // victim 实际上也是一个 poolLocal 数组,每个 poolLocal 都有一个 private 字段,这个字段就是 victim cache。
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 将 victim cache 标记为空,以便将来的 Get 不会再考虑它。
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
pin 源码剖析
pin
方法的实现如下:
pin
将当前 goroutine
固定到 P
上,禁用抢占并返回 poolLocal
池中对应的 poolLocal
。
调用方必须在完成取值后调用 runtime_procUnpin()
来取消抢占。
// 将当前的 goroutine 和 P 绑定,禁止被抢。
func (p *Pool) pin() (*poolLocal, int) {
// procPin 函数的目的是为了当前 G 绑定到 P 上。
pid := runtime_procPin() // 返回当前 P 的 id。
// 在 pinSlow 中,我们会存储 local,然后再存储 localSize,
// 这里我们以相反的顺序读取。 由于我们禁用了抢占,
// 因此 GC 不能在两者之间发生。
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s { // pid < s,说明当前 P 已经初始化过了。
return indexLocal(l, pid), pid // 返回当前 P 的 poolLocal。
}
return p.pinSlow() // 如果当前 P 没有初始化过,那么就调用 pinSlow()。
}
func (p *Pool) pinSlow() (*poolLocal, int) {
// 在互斥锁下重试。
// 在固定时无法锁定互斥锁。
runtime_procUnpin() // 解除当前 P 的绑定。
allPoolsMu.Lock() // 加全局锁。
defer allPoolsMu.Unlock() // 解锁。
pid := runtime_procPin() // 重新绑定当前 P。
// 在固定时不会调用 poolCleanup。(无法被抢占,GC 不会发生)
s := p.localSize
l := p.local
if uintptr(pid) < s { // 这其实是一个 double-checking,如果在加锁期间,其他 goroutine 已经初始化过了,就直接返回。
return indexLocal(l, pid), pid
}
// p.local == nil 说明 pool 还没有初始化过。
if p.local == nil { // 如果当前 P 没有初始化过,那么就将当前 P 添加到 allPools 中。
allPools = append(allPools, p)
}
// 当 local 数组为空,或者和当前的 runtime.GOMAXPROCS 不一致时,
// 将触发重新创建 local 数组,以和 P 的个数保持一致。
// 如果在 GC 之间更改了 GOMAXPROCS,我们将重新分配数组并丢弃旧数组。
size := runtime.GOMAXPROCS(0) // 获取当前 GOMAXPROCS(也就是 P 的个数)
local := make([]poolLocal, size) // 创建一个 poolLocal 数组
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release // 将当前 P 的 poolLocal 添加到 p.local 中
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release // 将当前 P 的 poolLocal 添加到 p.localSize 中
return &local[pid], pid // 返回当前 P 关联的 poolLocal,以及当前 P 的 id。
}
在 pinSlow
中比较关键的操作是,它会初始化当前 P
关联的 poolLocal
,并将其添加到 allPools
中。
关于 allPools
的作用,可以看下一小节。
pinSlow
的流程:
- 解除当前
P
的绑定。 - 加全局
Pool
的锁。 - 重新绑定当前
P
。 - 如果当前
P
的id
小于localSize
,那么就返回当前P
的poolLocal
。(典型的double-checking
) - 如果
local
还没初始化,那么将当前P
的poolLocal
添加到allPools
中。 - 初始化
local
。最后返回当前P
的poolLocal
。
对于 local
的初始化,我们可以参考一下下图(我们需要知道的是,切片的底层结构体的第一个字段是一个数组):
&local[0]
是 []poolLocal
的首地址,unsafe.Pointer(&local[0])
就是 poolLocal
数组的首地址。
indexLocal 源码剖析
我们在上面的代码中还可以看到一个 indexLocal
函数,它的作用是返回 poolLocal
数组中的第 i
个元素。
// l 指向了 poolLocal 数组的首地址,i 是数组的索引。
// 返回了数组中第 i 个元素,其类型是 poolLocal。
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
我们之前在看 Pool
结构体的时候,看到过 local
字段,它的类型是 unsafe.Pointer
,也就是一个指针。
然后我们再结合一下 indexLocal
的实现,就可以知道 local
字段指向的是一个 poolLocal
数组了。
其中 l
是 poolLocal
数组的首地址,i
是数组的索引,unsafe.Sizeof(poolLocal{})
是 poolLocal
的大小。
这一小节没看懂可以参考一下我写的另外一篇文章 《深入理解 go unsafe》。
allPools 的作用
allPools
的目的是在 GC
的时候,遍历所有的 Pool
对象,将其中的 victim
替换为 local
,然后将 local
设置为 nil
。
这样后续的 Get
操作在 local
获取不到的时候,可以从 victim
中获取。一定程度上减少了 GC
后的性能抖动。
在 Pool
中,还定义了下面几个全局变量:
var (
// 保护 allPools 和 oldPools 的互斥锁。
allPoolsMu Mutex
// allPools 是所有非空 primary cache 的 pool 的集合。
// 该集合由 allPoolsMu 和 pinning 保护,或者 STW 保护。
allPools []*Pool
// oldPools 是所有可能非空 victim cache 的 pool 的集合。
oldPools []*Pool
)
如果我们第一次看这些变量,可能会有点懵,不知道它们的作用是什么。
我们可以再结合一下 poolCleanup
函数的实现,就可以知道它们的作用了。
func poolCleanup() {
// 在垃圾收集开始时,这个函数会被调用。
// 它不能分配并且可能不应该调用任何运行时函数。
// 从 allPools 中删除 victim 缓存。
for _, p := range oldPools {
p.victim = nil // 作用是让 GC 可以回收 victim 缓存中的对象。
p.victimSize = 0
}
// 将 primary 缓存移动到 victim 缓存。
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 具有非空主缓存的池现在具有非空 victim 缓存,并且没有池具有 primary 缓存。
oldPools, allPools = allPools, nil
}
poolCleanup
这个函数会在 GC
开始的时候被调用,它的作用是将 local
移动到 victim
中。
同时,将 victim
置为 nil
,这样 GC
就可以回收 victim
中的对象了,也就是说要经历两轮 GC
local
才会真正地被回收。
也就意味着,GC
的时候,local
其实并没有被回收,而是被移动到了 victim
中。
poolCleanup
可以用下图表示,实际上就是使用 local
和 localSize
覆盖 victim
和 victimSize
:
Put
Put
的实现比较简单,就是将对象放到 local
中,不需要 Get
那种操作其他 P
绑定的 poolLocal
的情况。
// Put 将 x 添加到 Pool 中。
func (p *Pool) Put(x any) {
if x == nil {
return
}
// ...
// 获取 poolLocal
l, _ := p.pin() // 将当前 goroutine 与 P 绑定。获取当前 P 关联的 poolLocal,以及当前 P 的 id。
if l.private == nil { // 优先放入 private
l.private = x
} else { // 如果 private 已经有值了,就放入 shared
l.shared.pushHead(x) // 这部分其他 P 也是可以访问的。
}
runtime_procUnpin() // 解除当前 P 的绑定。
// ...
}
Pool
Put
的流程:
- 如果
Put
的值是nil
,则直接返回。 - 将当前的
goroutine
和P
绑定,禁止被抢占,返回当前P
的本地缓存(poolLocal
)和P
的ID
。 - 如果本地
private
为空,则将x
放入本地private
。 - 如果本地
private
不为空,则将x
放入本地shared
的头部。 - 将当前的
goroutine
和P
解绑,允许被抢占。
这里面的 pin
和 runtime_procUnpin
,我们在前文已经介绍过了,这里就不再赘述了。
New
这里说的 New
是 sync.Pool
中的 New
字段,在我们尝试了所有方法都获取不到对象的时候,
会判断 Pool
的 New
属性是否为 nil
,如果不为 nil
,则会调用 New
方法,创建一个新的对象。
poolChain 和 poolDequeue 源码剖析
poolChain
是一个双向链表,它的每一个节点的元素是 poolDequeue
。
在上文中,对于 Get
和 Put
的细节,还没有具体展开,因为不了解这些细节也不影响我们理解 Pool
的整体流程。
现在是时候来看看 poolChain
和 poolDequeue
这两个结构体的实现了,会结合起来一起看。
poolChain
和 poolDequeue
里面都提供了以下三个方法:
pushHead
:将对象放到队列的头部。popHead
:从队列的头部取出一个对象。popTail
:从队列的尾部取出一个对象。
不一样的是,poolChain
里面的方法会处理链表节点的创建和销毁,而 poolDequeue
里面的方法才是实际从队列存取对象的方法。
pushHead
poolChain
的 pushHead
方法的实现如下:
// 添加一个元素到队列头部
func (c *poolChain) pushHead(val any) {
// 链表头
d := c.head
if d == nil { // 链表为空
// 初始化链表。
// 新建 poolChainElt,然后 c 的 head 和 tail 都指向这个新建的元素。
const initSize = 8 // 初始化大小为 8
// 新建一个节点,类型为 poolChainElt
d = new(poolChainElt)
d.vals = make([]eface, initSize)
// 将 c 的 head 和 tail 都指向这个新建的元素
c.head = d
// 使用原子操作保存 c.tail,因为其他 goroutine 也可能会修改 c.tail。
storePoolChainElt(&c.tail, d)
}
// poolQueue 还没满的时候可以成功 push,pushHead 会返回 true。
// poolQueue 满的时候 pushHead 返回 false。
if d.pushHead(val) {
return
}
// 当前 dequeue 已满。分配一个两倍大小的新 dequeue。
newSize := len(d.vals) * 2
if newSize >= dequeueLimit { // 限制单个 dequeue 的最大大小
newSize = dequeueLimit
}
// 新建 poolChainElt,然后 c 的 head 指向这个新建的元素。
// 同时,d 的 next 指向这个新建的元素。
d2 := &poolChainElt{prev: d} // 因为是加到队列头,所以 prev 指向 d
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
poolChain
的 pushHead
方法的流程:
- 如果链表为空,那么就初始化链表。
- 如果链表不为空,那么就尝试
pushHead
。 - 如果
pushHead
失败,那么就分配一个两倍大小的新dequeue
。 - 然后把新
dequeue
放到链表头部。(push
的时候已经锁定了goroutine
在P
上,所以这一步是没有并发问题的)
在 poolChain
的 pushHead
方法中,唯一需要特别注意的是 storePoolChainElt(&c.tail, d)
这一行代码。
这里使用了 storePoolChainElt
方法,而不是直接使用 c.tail = d
。
这是因为 c.tail
是会和其他 goroutine
存在竞争的(其他 goroutine
获取对象的时候可能会修改 tail
),因此不能直接赋值,而是使用了原子操作。
poolChain
的 pushHead
方法的流程图如下:
队列头来说,
prev
实际上指向的是head
的下一个元素,但是又不能叫next
,因为next
被用来表示tail
的下一个元素,所以就叫了prev
。我们需要知道prev
、next
本质上都是指向了下一个元素,就看你是从队列头还是队列尾来查找。
在 poolChain
中,其实实际存储对象的时候并不是在 poolChain
,而是在 poolChain
的每一个节点中的 poolDequeue
中。
所以我们再来看看 poolDequeue
中的 pushHead
实现:
// pushHead 在队列的头部添加 val。
// 如果队列已满,则返回 false。
// 它只能由单个生产者调用。(也就是当前 goroutine 绑定的 P)
//
// 注意:head 指向的是下一个要插入的元素的位置,所以插入的时候,先将 head 指向的位置设置为 val,然后 head++。
func (d *poolDequeue) pushHead(val any) bool {
// 读取 head 和 tail 的值。
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // 队列满了
// 不能直接 tail == head,因为初始化的时候,head 和 tail 都是 0
return false
}
// 取模意味着,当 head 超过 len(d.vals) 时,会从头开始。也就是一个环。
slot := &d.vals[head&uint32(len(d.vals)-1)] // 获取 head 对应的槽位。
// push 只有当前协程能 push,所以不需要加锁。
// 但是 popTail 可能会在另一个协程中执行,所以需要判断当前的槽位是否被 popTail 释放了。
// 因为 popTail 的操作是先 cas 修改 headTail,然后再获取 slot 的值,最后才将 slot 置 0 的。
// 如果修改了 headTail 之后还没有来得及将 slot 置 0,那么这里就会判断出槽位还没有被释放。
typ := atomic.LoadPointer(&slot.typ) // 获取槽位的类型
if typ != nil { // 槽位不为空
// 队列依然是满的
return false
}
// 如果 typ 已经是 nil,那么这里后续的操作是安全的。
if val == nil { // put 进来的值是 nil,使用 dequeueNil 代替
val = dequeueNil(nil)
}
// 将 val 赋值给槽位
*(*any)(unsafe.Pointer(slot)) = val
// 增加 head。为什么是 1<<dequeueBits 呢?
// 因为 head 是高 32 位,所以要左移 32 位
// 本质上是:head = head + 1
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
poolDequeue
的 pushHead
方法的流程:
pushHead
的处理流程:
- 判断队列是否已满,如果已满则返回
false
。 - 获取下一个
push
的位置,先判断这个位置是否可用,如果不可用则返回false
。(可能和popTail
冲突,如果popTail
没来得及将其中的值取出来,那么这个槽就还不能使用) - 如果可用,则将值放入这个位置,然后将
head
指针加1
。
在 poolDequeue
中,我们看到有一行代码比较奇怪:atomic.LoadUint64(&d.headTail)
,这是为了可以原子操作存取 head
和 tail
两个值。这样就可以避免锁的使用了。
popHead
poolChain
的 popHead
方法的实现如下:
// popHead 会从链表头部 pop 出一个元素。
// 返回值:
// 1. any:pop 出的元素。
// 2. bool:是否成功 pop 出元素。
func (c *poolChain) popHead() (any, bool) {
d := c.head
// d == nil 的情况:
// 1. 链表为空。
// 2. 链表只有一个元素,且这个元素已经 pop 完了。(被其他协程 pop 了)
for d != nil {
// 这是因为,在我们拿到 d 之后,还没来得及 pop 的话,其他协程可能已经 pop 了。
// 所以需要 for 循环。典型的无锁编程。
//
// poolQueue 还没空的时候可以成功 pop,popHead 会返回 true。
if val, ok := d.popHead(); ok {
return val, ok
}
// 之前的 dequeue 中可能仍有未消费的元素,因此尝试后退。
d = loadPoolChainElt(&d.prev) // 获取下一个 poolChainElt
}
return nil, false
}
prev 虽然从命名上看是前一个,但是实际上是下一个节点。从 head 开始遍历的话,prev 就是下一个节点,从 tail 开始遍历的话,next 就是下一个节点。
poolChain
的 popHead
的处理流程:
- 如果链表为空,那么就返回
false
。 - 如果链表不为空,那么就尝试
popHead
。 - 如果
popHead
失败,那么就尝试从链表下一个 dequeuepopHead
。(循环直到最后一个poolChainElt
)
poolChain
的 popHead
方法的流程图如下:
在 poolChain
中,实际上调用的是 poolDequeue
的 popHead
方法:
// popHead 删除并返回队列头部的元素。
// 如果队列为空,则返回 false。
// 它只能由单个生产者调用。(也就是当前 goroutine 绑定的 P)
func (d *poolDequeue) popHead() (any, bool) {
// slot 用来保存从队列头部取出的值
var slot *eface
for { // 获取不到槽会继续循环,直到获取到槽或者发现队列为空为止。
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head { // 队列为空
return nil, false
}
// 先将 head 减 1,然后再获取槽位。
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 成功获取到槽
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
// 成功获取到 slot,将 slot 的值取出来
// head - 1 了,说明这个槽是可以被安全使用的,所以不需要加锁。
// 因为 popTail 不会影响到 head,所以不会影响到这里。
// 另外,pushHead 也没有影响,因为在实际使用中,只有一个协程会 pushHead。
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nils
}
*slot = eface{} // 将 slot 置 0
return val, true
}
poolDequeue
的 popHead
的处理流程:
- 判断队列是否为空,如果为空则返回
false
。 - 尝试将
head
指针减1
,如果失败则进行下一轮尝试(自旋,for + 原子操作是无锁编程中很常见的写法)。 - 将
head
指针对应的槽位的值取出来,然后将槽位置为nil
。
popTail
poolChain
的 popTail
方法的实现如下:
// 从队列尾取出一个元素
func (c *poolChain) popTail() (any, bool) {
// 获取链表尾部的 poolChainElt
// 如果链表为空,返回 nil,false
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// 在我们 popTail 之前获取 next 指针很重要。
// 一般来说,d 可能暂时为空,但如果 next 在 pop 之前为非 nil,
// 并且 pop 失败,则 d 永久为空,这是唯一可以安全地将 d 从链中删除的条件。
//
// 解析:next 非 nil,但是 pop 失败:d 肯定是空的了,这个时候我们可以安全地将 d 从链表中删除。
d2 := loadPoolChainElt(&d.next)
// poolQueue 还没空的时候可以成功 pop,popTail 会返回 true。
if val, ok := d.popTail(); ok {
return val, ok
}
// 队列已经空了
if d2 == nil {
// d 是唯一的 dequeue。它现在是空的,但以后可能会有新的元素 push 进来。
return nil, false
}
// 链表的尾部已经被消费完了,所以转到下一个 dequeue。
// 尝试从链表中删除它,这样下一次 pop 就不必再次查看空的 dequeue。
// (本质:移除空的 tail 元素)
//
// 开始处理 d2,d2 是 d 的下一个 dequeue。(开始尝试从 d2 中 pop)
// cas:c.tail 由 d 变为 d2。
// 如果 cas 成功,说明 d2 是最新的 tail,d 可以被移除。
// d2 的 prev 指针设置为 nil,这样 gc 可以回收 d。(d2 没有下一个元素了)
//
// c.tail(d) 指向下一个 poolChainElt
// 同时下一个 poolChainElt 的 prev 指针(d)设置为 nil。
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 我们赢得了竞争。清除 prev 指针,以便垃圾收集器可以收集空的 dequeue,
// 以便 popHead 的时候不做多余的查找操作。
storePoolChainElt(&d2.prev, nil)
}
// d 指向 d2,继续处理下一个 dequeue
d = d2
}
}
poolChain
中 popTail
的处理流程:
- 如果链表为空,那么就返回
false
。 - 如果链表不为空,那么就尝试
popTail
。 - 如果
popTail
失败,那么就尝试从链表上一个 dequeuepopTail
。(循环直到第一个poolChainElt
)
poolChain
的 popTail
方法的流程图如下:
在 popTail
的时候,如果发现 poolChainElt
已经为空了,那么就会从链表中移除它:
poolDequeue
的 popTail
方法的实现如下:
// popHead 和 popTail 都有取值,然后将槽置空的过程,但是它们的实现是不一样的。
// 在 popHead 中,是直接将槽的值设置为 eface{},而在 popTail 中,
// 先将 val 设置为 nil,然后将 typ 通过原子操作设置为 nil。
// 这样在 pushHead 的时候就可以安全操作了,只要先使用原子操作判断 typ 是否为 nil 就可以了。
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
//
// popTail 删除并返回队列尾部的元素。
// 如果队列为空,则返回 false。
// 它可以被任意数量的消费者调用。(如何保证并发安全?原子操作)
func (d *poolDequeue) popTail() (any, bool) {
// slot 用来保存从队列尾取出来的值
var slot *eface
// 获取队列尾部的值
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
// 队列为空,直接返回
return nil, false
}
// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
//
// 确认 head 和 tail(对于我们上面的推测性检查)并增加 tail。
// 如果成功,那么我们就拥有 tail 的插槽。
ptrs2 := d.pack(head, tail+1) // 新的 headTail
// 如果返回 false,说明从 Load 到 CompareAndSwap 期间,有其他 goroutine 修改了 headTail。
// 则需要重新 Load,再次尝试(再次执行 for 循环)。
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// 成功获取到 slot,将 slot 的值取出来
// 问题来了:
// 在这里 cas 成功的时候,这个 slot 实际上可能是还没有释放的,在这个时候,pushHead 其实不能写入到这个 slot 中。
// 因此,我们可以在 pushHead 的代码中看到,会先判断 slot.typ 是否为 nil,如果不为 nil,说明 slot 还没有被释放,那么就直接 return 了。
// 这种情况发生在 poolDequeue 满了的时候。
// We now own slot.
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) { // 这是什么情况?非空,但是值等于 dequeueNil(nil) ?
val = nil
}
// 取出值之后,将 slot 置 0。
// 在 poolDequeue 中,值是允许为 nil 的,但是 pool 的 Put 中判断值为 nil 的时候就直接 return 了。
// 告诉 pushHead 我们已经操作完这个插槽。
// 将插槽归零也很重要,这样我们就不会留下可能使该对象比必要时间更长的引用。
//
// 我们首先写入 val,然后通过原子写入 typ 来发布我们已完成此插槽。
slot.val = nil
atomic.StorePointer(&slot.typ, nil) // 为什么要用原子操作?因为 pushHead 也会读取这个值,所以需要保证读取的是最新的值。
// 此时 pushHead 可以操作这个槽了。
return val, true
}
poolDequeue
的 popTail
的处理流程:
- 判断队列是否为空,如果为空则返回
false
。 - 尝试将
tail
指针加1
,如果失败则进行下一轮尝试(自旋)。 - 将
tail
指针对应的槽位的值取出来,然后将槽位置为nil
。
sync.Pool 设计要点
在 sync.Pool
中,我们可以看到它有许许多多的编程技巧,为了实现一个高性能的 Pool
要做的东西是非常复杂的,
但是对于我们而言,我们只会用到它暴露出来的两个非常简单的接口 Put
、Get
,这其实也算是 Go 语言的一种设计哲学吧,
把复杂留给自己,把简单留给用户。但是,我们还是要知道它的实现原理,这样才能更好的使用它。
接下来,我们就再来总结一下 sync.Pool
高性能的一些设计要点:
noCopy
字段,防止sync.Pool
被复制。sync.Pool
是不能被复制的,否则会导致一些隐晦的错误。local
字段,用于存储与P
关联的一个poolLocal
对象,在多个goroutine
同时操作的时候,可以减少不同goroutine
之间的竞争,只有在本地的poolLocal
中没有找到对象的时候,才会去其他goroutine
关联的队列中去取。poolDequeue
中pushHead
跟popTail
之间会存在head
、tail
指针上的一些竞争,这些竞争问题是通过原子操作来解决的(相比互斥锁效率更高)。使用了原子操作可能就会有失败的时候,这个时候,再次重试就可以了。poolDequeue
中的head
/tail
指针使用一个字段来保存,然后通过原子操作保证head
/tail
的一致性。poolChain
使用链表的方式解决容量问题,并且新增一个元素到链表的时候,容量为上一个元素(poolChain
链表头)的两倍(非常常见的扩容策略)。双向链表,可以接受别的P
的popTail
操作,减少竞争的同时可以充分利用多核。pin
保证P
不会被抢占。如果一个goroutine
在执行Put
或者Get
期间被挂起,有可能下次恢复时,绑定的就不是上次的P
了。那整个过程就会完全乱掉,因为获取到的poolLocal
不是之前那个了。使用pin
可以解决这种并发问题。- 自旋操作,因为原子操作失败的时候可能存在竞争,这个时候再尝试一下就可能成功了。(
for
+ 原子操作是无锁编程中很常见的一种编程模式,在sync.Map
中也有很多类似操作) pad
内存对齐,可以避免伪共享。poolDequeue
中存储数据的结构是一个环形队列,是连续的内存,可以充分利用 CPU 的cache
。在访问poolDequeue
某一项时,其附近的数据项都有可能加载到统一cache line
中,有利于提升性能。同时它是预先分配内存的,因此其中的数据项可不断复用。
总结
最后,总结一下本文内容:
sync.Pool
是一个非常有用的工具,它可以帮助我们减少内存的分配和回收(通过复用对象),提升程序的性能。但是,我们要注意它的使用场景,它适合那些没有状态的对象,同时,我们不能对那些从Pool
中Get
出来的对象做任何假设。- 我们在
Put
或者Get
之前,可能需要对我们操作的对象重置一下,防止对后续的操作造成影响。 Pool
中的对象存储是使用队列的方式,这个队列的实现是一个链表(poolChain
),链表的每一个节点都是一个环形队列(poolDequeue
)。这个队列支持以下三种操作:pushHead
:将一个对象放到队列的头部。popHead
:将队列的头部的对象取出来。popTail
:将队列的尾部的对象取出来。
sync.Pool
的实现中,使用了很多编程技巧,比如noCopy
、pin
、pad
、原子操作等等,这些技巧都是为了实现一个高性能的Pool
而做的一些优化,我们可以学习一下,具体参考上一节。sync.Pool
中,Put
和Get
操作的时候会先将goroutine
与P
绑定,然后再去操作P
关联的poolLocal
,这样可以减少竞争,提升性能。因为每一个P
都有一个关联的poolLocal
,所以多个goroutine
操作的时候,可以充分利用多核。在操作完成后,再解除绑定。- 考虑到
GC
直接清除Pool
中的对象会在GC
后可能会产生性能抖动,所以在GC
的时候,其实并不会马上清除Pool
中的对象,而是将这些对象放到victim
字段中,在Get
的过程中,如果所有的poolLocal
中获取不到对象,则会从victim
中去找。但是再进行GC
的时候,旧的victim
会被清除。也就是Pool
中对象的淘汰会经历两次GC
。