使用的go版本为 go1.21.2
首先我们写一个简单的Pool的使用代码
package main
import "sync"
var bytePool = sync.Pool{
New: func() interface{} {
b := make([]byte, 1024)
return &b
},
}
func main() {
for j := 0; j < 10; j++ {
obj := bytePool.Get().(*[]byte) // 获取一个[]byte
_ = obj
bytePool.Put(obj) // 用完再给放回去
}
}
pool对象池的作用
- 减少内存分配: 通过池,可以减少对内存的频繁分配和释放,提高程序的内存利用率。
- 避免垃圾回收压力: 对象池中的对象在被使用后不会立即被释放,而是放回到池中等待复用。这有助于减轻垃圾回收的压力,因为对象可以在多次使用后才被真正释放。
- 提高性能: 复用对象可以避免不必要的对象创建和销毁开销,从而提高程序的性能。
从demo上看好像没啥卵用,我们来进行一些压力测试
package main
import (
"sync"
"testing"
)
var bytePool = sync.Pool{
New: func() interface{} {
b := make([]byte, 1024)
return &b
},
}
func BenchmarkByteMake(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 10000; j++ {
obj := make([]byte, 1024)
_ = obj
}
}
}
func BenchmarkBytePool(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 10000; j++ {
obj := bytePool.Get().(*[]byte) // 获取一个1024长度的[]byte
_ = obj
bytePool.Put(obj) // 用完再给放回去
}
}
}
看一下压测效果
可以看到执行效率高了好多倍
项目中没实际用到过,不过我们可以翻一下开源项目中是怎么用的
redis-v9
Pool结构体
比较复杂有点套娃的意思
//代码位于 GOROOT/src/sync/pool.go L:49
type Pool struct {
//防止Pool被复制, 君子协议,编译可以通过,某些编辑器会报waring
//静态检测 go vet会出错
//有兴趣可以看一下这里 https://github.com/golang/go/issues/8005#issuecomment-190753527
noCopy noCopy
local unsafe.Pointer // 本地池,对应类型[P]poolLocal P指的是 GMP中的P.ID字段
localSize uintptr // 本地池大小
victim unsafe.Pointer // 上一个周期的本地池
victimSize uintptr // 上一个周期的本地池大小
New func() any // 创建对象的方法,这个需要我们自己实现
}
type poolLocal struct { //本地池
poolLocalInternal
// 用128取模,确保结构体占据整数个缓存行,从而防止伪共享.
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
private interface{} // 本地P的私有字段
shared poolChain // 双端链表, 任何P都可以进行popTail
}
//代码位于 GOROOT/src/sync/poolqueue.go L:194
type poolChain struct { // 双向队列
//头部
head *poolChainElt
//尾部
tail *poolChainElt
}
type poolChainElt struct { //环状队列
poolDequeue
// next 由生产者原子性地写入,并由消费者原子性地读取, 从非nil转换为nil
// prev 由消费者原子性地写入,并由生产者原子性地读取, 从非nil转换为nil
next, prev *poolChainElt
}
//代码位于 GOROOT/src/sync/poolqueue.go L:19
type poolDequeue struct {
//一个字段两个含义,高32位为头,低32位为尾部
headTail uint64
//环形缓存
//vals[i].typ 为nil 说明该槽位为空
vals []eface
}
type eface struct { //类型与值
typ, val unsafe.Pointer
}
Get函数
//代码位于 GOROOT/src/sync/pool.go L:127
func (p *Pool) Get() any {
if race.Enabled { // 使用竞态检查
race.Disable() //竞态检查 禁用
}
l, pid := p.pin() //获取当前P的ID 与 poolLocal 详细见下方
x := l.private //看看私有属性是否存在对象,如果存在就可以直接返回
l.private = nil
if x == nil { //
//优先从链表的头部获取,
x, _ = l.shared.popHead()
if x == nil {// 慢读取路径
x = p.getSlow(pid)
}
}
runtime_procUnpin() //取消 P 的禁止抢占
if race.Enabled { // 使用竞态检查
race.Enable() //竞态检查 启用
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil { //调度new方法重新生成一个对象
x = p.New()
}
return x
}
pin函数
//代码位于 GOROOT/src/sync/pool.go L:127
func (p *Pool) pin() (*poolLocal, int) {
//获取P的id
pid := runtime_procPin()
// 原子操作获取本地池大小
// 本地池
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s { //如果当前P.id 没有在local中越界,直接去获取值
return indexLocal(l, pid), pid
}
return p.pinSlow() //慢获取
}
func (p *Pool) pinSlow() (*poolLocal, int) {
//取消P的禁止抢占
runtime_procUnpin()
allPoolsMu.Lock() //加锁
defer allPoolsMu.Unlock()
pid := runtime_procPin() //获取P的id
//获取本地池的大小与本地池
s := p.localSize
l := p.local
if uintptr(pid) < s { //如果当前P.id 没有在local中越界,直接去获取值
return indexLocal(l, pid), pid
}
if p.local == nil { //如果local为空,将他加入到allPools中
allPools = append(allPools, p)
}
// GOMAXPROCS在GC之间发送了变化,重新分配p.load与p.localSize
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
getSlow函数
//代码位于 GOROOT/src/sync/pool.go L:156
func (p *Pool) getSlow(pid int) any {
// 原子获取本地池大小
// 本地池
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从别的P poolLocal尾部获取local
// 这个循环的方式有点东西(pid+i+1)%int(size),优先从非pid的下标获取,最后一次是pid
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 原子获取上一周期本地池大小
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size { //如果pid大于size 说明让回收掉了
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {//看看私有属性是否存在对象,如果存在就可以直接返回
l.private = nil
return x
}
// 尝试从别的P poolLocal尾部获取local
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
//将victimSize设置为0
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
Put函数
//代码位于 GOROOT/src/sync/pool.go L:95
func (p *Pool) Put(x any) {
if x == nil { //如果写入的x为nil之间返回
return
}
if race.Enabled { //使用竞态检查
if fastrandn(4) == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable() // 竞态检查 禁用
}
l, _ := p.pin() // 获取PoolLocal
if l.private == nil { // 如果私有属性没有赋值
l.private = x
} else { //将x写入头
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled { //使用竞态检查
race.Enable() //竞态检查 启用
}
}
pushHead函数解读
//代码位于 GOROOT/src/sync/poolqueue.go L:228
func (c *poolChain) pushHead(val any) {
d := c.head
if d == nil { //如果head为空,将head初始化为8长度的eface数组
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d) //将新创建的节点,当做尾节点
}
if d.pushHead(val) { //对象入队
return
}
// 走到这里说明满了。可扩容为2倍
newSize := len(d.vals) * 2
// 扩容大小 (1 << 32) / 4 超出将这个设置为(1 << 32) / 4
if newSize >= dequeueLimit {
newSize = dequeueLimit
}
//新建poolChainElt将prev指向d
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2 //将新创建的节点,当做头节点
storePoolChainElt(&d.next, d2) // 将老的节点指向,新节点
d2.pushHead(val) //对象入队
}
延迟处理下标小技巧
package main
import (
"fmt"
)
func main() {
pid := 1
size := 20
for i := 0; i < int(size); i++ {
if i == pid {
continue
}
fmt.Println(i)
}
// 优化版本 pid会在最后一个打印处理
for i := 0; i < size; i++ {
index := (pid + i + 1) % size
// 前面处理完以后直接return
fmt.Println(index)
}
}
总结
我们从上面的源码分析了解Pool的数据结构、Get、Put这些基本操作原理,在项目中我们可以使用比特位来减少内存的占用,从源码分析我们得知Go官方设计不允许进行Pool复制(君子协议), 还学到了一个延迟处理下标的小技巧。