1、哈希表
哈希表是一个很常见的数据结构,用来存储无序的 key/value
对,给定的 key
可以在 O(1)
时间复杂度内查找、更新或删除对应的 value
。
设计一个好的哈希表,需要着重关注两个关键点:哈希函数、冲突处理。
1.1 哈希函数
理想情况下,哈希函数既要有优异的性能,又要让结果在映射范围内均匀的分布,使得增删改查的时间复杂度为 𝑂(1)
。
不过实际情况是,key
的数量往往会大于映射的范围,这势必产生哈希冲突,以及更差的读写性能。最坏的结果可能会是所有操作的时间复杂度可能会达到 𝑂(𝑛)
,比如结果都映射到一个 bucket
(桶)上,最后变成线性查找了。
因此好的哈希函数对于哈希表来说至关重要。
1.2 冲突处理
当多个不同的 key
通过哈希函数计算得到同一个结果,这就是哈希冲突(或叫哈希碰撞)。解决的哈希冲突的办法,常见的有开放地址法和拉链法。
1.2.1 开放地址法
开放地址法是一种线性探测的方法,当发生哈希冲突时,它会按照某种规律(如线性探测、二次探测或双散列等)在哈希表中寻找下一个可用的空位来存储冲突的元素。
具体来说,当计算出的哈希值对应的槽位已被占用时,可以按照探测序列依次检查相邻的槽位,直到找到一个空槽位为止。
如上图,底层数组有四个元素,key3
经过计算
index := hash("key3") % len(len)
得到 key1
一样的索引,这时发现索引所在的槽位不为空,那么就向右侧依次探测不为空的槽位,直到找到空槽位那么就把 key/value
写进去。
当需要查找某个 key
对应的 value
时,会从索引的位置开始线性探测数组
- 步骤一:若对比索引所在位置的
key
相等,停止查找,取出value
- 步骤二:若对比索引所在位置的
key
不相等,则依次向右探测- 重复步骤一,找到则取出
value
- 若最终没有找到则返回空
- 重复步骤一,找到则取出
删除 key
和查找的方法同理。
最后,来看看开放地址发的优缺点:
- 优点:
- 不需要额外的存储空间来存储链表节点。
- 缺点:
- 容易产生聚集现象,即连续的槽位被占用,导致查找效率降低。
- 当哈希表越来越满时,查找、插入和删除操作的效率会下降。
1.2.2 拉链法
拉链法是在每个哈希表的槽位上链接一个链表。当发生哈希冲突时,将具有相同哈希值的元素添加到对应槽位的链表中。查找、插入和删除操作都在相应的链表中进行。
拉链法一般会使用数组加上链表,是哈希表最常见的实现方法,像 Golang
、Java
、PHP
等编程言都用拉链法实现哈希表,另外 Redis
中字典也是使用拉链法实现哈希表的,详见拙作 Redis 之字典。
同样的,也来看看拉链法的优缺点:
- 优点:
- 解决了聚集问题,提高了查找、插入和删除操作的效率。
- 动态地分配链表空间,适应于元素数量不确定的场景。
- 缺点:
- 需要额外的存储空间来存储链表节点。
- 如果链表过长,查找、插入和删除操作的效率仍然会受到影响。
1.2.3 装载因子
装载因子 = 元素数量 ÷ 桶数量
哈希表中的装载因子是衡量哈希表使用程度的一个重要参数,它表示哈希表中已存储的元素数量与哈希表总槽位数量之间的比例。装载因子越大,哈希的读写性能就越差。
拿开放地址法来比方,底层数组长度为 8
,目前已写入 6
个,那么装载因子为 6/8
也就是 0.75
,快接近 1
(为1
时说明已经满了),这时再写入会导致碰撞的次数变多,哈希表的性能变得越差。
对比拉链法来说,也是一样。因为哈希表读写操作主要耗时在计算哈希
、定位桶
和遍历链表
这三个过程。
对于装载因子越大,哈希的读写性能就越差
,解决的办法是动态的增大哈希表的长度,当装载因子超过某个阈值时增加哈希表的长度,自动扩容。大致步骤是:
- 创建新的哈希表,容量为原来的两倍
- 遍历原有哈希表的元素,并重新计算索引,写入新的哈希表中
- 遍历过程中,对于写操作只在新的哈希表中进行,查询和删除在新旧两个哈希表中分别进行
- 遍历完成后,释放原来的哈希表
2. Golang 的 map
注意: Golang 版本为 1.19.12
2.1 数据结构
先来看看 map
相关的常量
const (
// 一个桶里面最多可以装的键值对的个数,8对。
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits // 1 << 3 ,2的三次方,也就是 8
// 触发扩容操作的最大装载因子的临界值
loadFactor = 6.5
// 为了保持内联,键 和 值 的最大长度都是128字节,如果超过了128个字节,就存储它的指针
maxKeySize = 128
maxValueSize = 128
// 数据偏移应该是 bmap 的整数倍,但是需要正确的对齐。
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
// tophash 的一些值
empty = 0 // 没有键值对
evacuatedEmpty = 1 // 没有键值对,并且桶内的键值被迁移走了。
evacuatedX = 2 // 键值对有效,并且已经迁移了一个表的前半段
evacuatedY = 3 // 键值对有效,并且已经迁移了一个表的后半段
minTopHash = 4 // 最小的 tophash
// 标记
iterator = 1 // 当前桶的迭代子
oldIterator = 2 // 旧桶的迭代子
hashWriting = 4 // 一个goroutine正在写入map
sameSizeGrow = 8 // 当前字典增长到新字典并且保持相同的大小
// 迭代子检查桶ID的哨兵
noCheck = 1<<(8*sys.PtrSize) - 1
)
通过上面字段可以得出,Golang
的 map
的每个桶中可以存 8
个 key/value
对。最大装载因子为 6.5
,这个是官方统计出来的最优值,源码中(src/runtime/map.go
)有介绍这个值是如何得来的,这里就不做过多介绍。若初始化有个 4
个桶,那么在存储 4*6.5
也就是在 26
个键值对后,就需要进行扩容了,不然查询效率就会降低,当然了,map
里元素越多,感觉越明显。
type hmap struct {
count int // map 的长度
flags uint8 // 标识,比如正在写数据
B uint8 // log以2为底,桶个数的对数 (总共能存 6.5 * 2^B 个元素)
noverflow uint16 // 近似溢出桶的个数,当B小于16时是准确值,大于等于16时是大概的值。
hash0 uint32 // 哈希种子
buckets unsafe.Pointer // 有 2^B 个桶的数组. count==0 的时候,这个数组为 nil.
oldbuckets unsafe.Pointer // 旧的桶数组一半的元素
nevacuate uintptr // 扩容增长过程中的计数器
extra *mapextra // 可选字段
}
type mapextra struct {
// 如果 key 和 value 都不包含指针,并且可以被 inline(<=128 字节)
// 使用 extra 来存储 overflow bucket,这样可以避免 GC 扫描整个 map
// 然而 bmap.overflow 也是个指针。这时候我们只能把这些 overflow 的指针
// 都放在 hmap.extra.overflow 和 hmap.extra.oldoverflow 中了
// overflow 包含的是 hmap.buckets 的 overflow 的 bucket
// oldoverflow 包含扩容时的 hmap.oldbuckets 的 overflow 的 bucket
overflow *[]*bmap
oldoverflow *[]*bmap
// 指向空闲的 overflow bucket 的指针
nextOverflow *bmap
}
整个 hmap
结构占 48
个字节内存
fmt.Println(unsafe.Sizeof(hmap{})) // 48
其中
count
,代表map
当前元素的个数,通过len(m)
获取B
, 是log
以2
为底,桶个数的对数,比如 B 为3
,那么此时map
中bucket
的个数为2^3
那就是8
hash0
,哈希种子,可以增加哈希分布的随机性,从而有效地防止哈希冲突攻击buckets
指向具体bmap
数组数组的指针mapextra
主要存储溢出桶的,通过nextOverflow
进行相连
再来看看具体存放数据桶的数据结构
// bucket 本体
type bmap struct {
topbits [8]uint8 // 用于存储每个键的哈希值的高位,用于快速比较和查找
keys [8]keytype // 存储键的数组
values [8]valuetype// 存储值的数组
pad uintptr // 用于内存对齐的填充字段
overflow uintptr // 指向溢出桶的指针,如果有的话
}
前面介绍过,桶的元素个数为 8
,所以 bmap
中 前三个字段数组长度都是 8
topbits
主要定位key
具体在哪个桶overflow
指向溢出桶的指针,用于存储溢出元素,形成一个溢出链表。当一个桶中的元素超过8
个时,多余的元素会存储在溢出桶中。keys/values
这里放一起介绍,和其他哈希表实现不同的是,Golang
中分别把key
和value
单独聚合存储,主要是为了节省内存- 比如
map[int64]int8
,key
占8
个字节,而value
仅占1
个字节,内存对齐的要则要16
个字节,但若是分开存储,8
个 value 才占8
个字节,这样就可以起到节省内存的作用 pad
主要作用为了内存对齐进行填充字段用的,比如上面哈希表中只有两个value
,那么pad
就可以填充6
个字节
- 比如
2.2 初始化
// make(map[k]v, hint)
// 如果编译器认为 map 和第一个 bucket 可以直接创建在栈上,h 和 bucket 可能都是非空
// h != nil,可以直接在 h 内创建 map
// 如果 h.buckets != nil,其指向的 bucket 可以作为第一个 bucket 来使用
func makemap(t *maptype, hint int, h *hmap) *hmap {
// 计算 hint 个元素所需的内存大小,并检查是否会溢出或超过允许的最大分配内存 maxAlloc
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
// 如果发生溢出或内存大小超过限制,则将 hint 设置为 0
if overflow || mem > maxAlloc {
hint = 0
}
// 初始化 hmap
if h == nil {
h = new(hmap)
}
h.hash0 = fastrand()
// 按照提供的元素个数,计算初始桶的数量
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
// 分配初始的 hash table
// 如果 B == 0,buckets 字段会由 mapassign 来 lazily 分配
// 因为如果 hint 很大的话,对这部分内存归零会花比较长时间
if h.B != 0 {
var nextOverflow *bmap
// 初始化 bucket 和 nextOverflow
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
这里先看下 B
的计算逻辑,这里为了更好认清计算逻辑,把相关代码抽取出来,可以直接运行的。
package _0240626
import (
"fmt"
"testing"
)
const (
// Maximum number of key/elem pairs a bucket can hold.
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits
// Maximum average load of a bucket that triggers growth is 6.5.
// Represent as loadFactorNum/loadFactorDen, to allow integer math.
loadFactorNum = 13
loadFactorDen = 2
PtrSize = 4 << (^uintptr(0) >> 63)
)
func TestB(t *testing.T) {
B := uint8(0)
hint := 8
for overLoadFactor(hint, B) {
B++
}
fmt.Println(B, 1<<B)
}
// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
// bucketShift returns 1<<b, optimized for code generation.
func bucketShift(b uint8) uintptr {
// Masking the shift amount allows overflow checks to be elided.
return uintptr(1) << (b & (PtrSize*8 - 1))
}
还是得强调一点 B
是以 2
为底桶个数的对数,比如 B
为 0
,说只需一个桶。另外还需记住每个桶中存放的 key/value
对的个数是 8
。
好,接下来主要看看 overLoadFactor
判断逻辑
count > bucketCnt
,其中bucketCnt
为上面提到的一个桶存8
个元素uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
,这里主要为了计算提供的桶能否存的下给的元素数量- 装载因子为
6.5
,比如B
为0
,那么桶的个数就是1<<0
就是1
个桶,可以最多存2*6.5
,也就是13
个元素 - 少于
13
元素,就不用累加B
,否则就需要累加B
,并重复计算,直到提供的桶可以存的下count
个元素
- 装载因子为
OK
,确定了桶的数量,接下来就可以初始化桶了
// makeBucketArray 为 map buckets 初始化底层数组
// 1<<b 是需要分配的最小数量 buckets
// dirtyalloc 要么是 nil,要么就是之前由 makeBucketArray 使用同样的 t 和 b 参数
// 分配的数组
// 如果 dirtyalloc 是 nil,那么就会分配一个新数组,否则会清空掉原来的 dirtyalloc
// 然后重用这部分内存作为新的底层数组
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
// base = 1 << b 也就是桶的数量
base := bucketShift(b)
nbuckets := base
// 对于比较小的 b 来说,不太可能有 overflow buckets
// 这里省掉一些计算消耗
if b >= 4 {
// Add on the estimated number of overflow buckets
// required to insert the median number of elements
// used with this value of b.
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
if dirtyalloc == nil {
buckets = newarray(t.bucket, int(nbuckets))
} else {
// dirtyalloc 之前就是用上面的 newarray(t.bucket, int(nbuckets))
// 生成的,所以是非空
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.kind&kindNoPointers == 0 {
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size)
}
}
if base != nbuckets {
// 我们预分配了一些 overflow buckets
// 为了让追踪这些 overflow buckets 的成本最低
// 我们这里约定,如果预分配的 overflow bucket 的 overflow 指针是 nil
// 那么通过增加指针可以找到更多可用的溢出桶.
// 我们需要一个安全的非空指针来作为 last overflow bucket,直接用 buckets 就行了
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
这里需着重看下 b
的值,以 4
为界线
- 小于
4
,不创建溢出桶 ,比如3
,那么就初始化1<<3
也就是8
个正常桶(hmap
结构中buckets
指向的bmap
数组) - 大于或等于
4
,创建溢出捅(此时正常桶的个数为1<<4
即16
个)- 溢出桶数量规则,比如
b
为4
,在不考虑内存对齐的情况下,1<<(4-4)
也就是1
- 溢出桶和正常桶在内存上是连续的
- 最后一个溢出桶的溢出指针设置为桶数组的起始地址,可以明确地表示这是链表的末端,从而避免无限循环
- 溢出桶数量规则,比如
仔细阅读 makemap
函数的源码实现,发现在有溢出捅的情况下,在初始化 mapextra
后,仅仅用 nextOverflow
指向第一个空闲的溢出桶,但是 overflow
字段却没有提及。
if h.B != 0 {
var nextOverflow *bmap
// 初始化 bucket 和 nextOverflow
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
这里就稍微向后跳下进度,因为在 mapextra
中 overflow
是在写入 key/value
时生成的。
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
var ovf *bmap
if h.extra != nil && h.extra.nextOverflow != nil {
// We have preallocated overflow buckets available.
// See makeBucketArray for more details.
ovf = h.extra.nextOverflow
if ovf.overflow(t) == nil {
// We're not at the end of the preallocated overflow buckets. Bump the pointer.
h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
} else {
// This is the last preallocated overflow bucket.
// Reset the overflow pointer on this bucket,
// which was set to a non-nil sentinel value.
ovf.setoverflow(t, nil)
h.extra.nextOverflow = nil
}
} else {
ovf = (*bmap)(newobject(t.bucket))
}
h.incrnoverflow()
if t.bucket.ptrdata == 0 {
h.createOverflow()
*h.extra.overflow = append(*h.extra.overflow, ovf)
}
b.setoverflow(t, ovf)
return ovf
}
也就是在 makemap
初始化的时候,若有溢出捅,正常桶和溢出桶是一块分配的,同时nextOverflow
指向第一个溢出捅,同时把最后一个溢出桶的 overflow
指针指向第一个正常桶。接着在写入的时候,会在正常桶都满的情况下,会去往溢出桶里写:
- 若有空闲的溢出捅(
h.extra.nextOverflow
)- 那么获取此空闲的溢出桶
ovf = h.extra.nextOverflow
- 同时还需判断此溢出捅的
overflow
是否为nil
- 为
nil
, 说明还存在空闲的溢出捅,那么h.extra.nextOverflow
指向此空闲的溢出捅 - 不为
nil
,说明是最后一个溢出桶(上面提到过在分配溢出捅的时候会把最后一个溢出桶的oveflow
指向第一个正常桶),同时把当前溢出桶的overflow
和h.extra.nextOverflow
都置为nil
- 为
- 那么获取此空闲的溢出桶
- 若没有空闲的溢出捅
- 那么就生成一个新的桶
ovf = (*bmap)(newobject(t.bucket))
- 那么就生成一个新的桶
- 更新溢出桶的数量(当
B
小于16
时是准确值,大于等于16
时是大概的值) - 若
map
中的key/value
类型都不是指针(map[int64]int8
),那么就会把溢出捅的指针放入extra
的overflow
切片数组中,这么做主要是方便GC
,提升性能- 当对一个长度
100万
个key/value
对的map
中进行GC
时,若按照传统方法,会依次扫描bmap
中的overflow
,这会相当耗时 - 而一旦对这些
overflow
进行管理,那么当GC
扫描时,只需把extra.overflow
置黑即可,这可是相当大的性能提升(有兴趣的读者可以写个map[int64]int8
和map[string]int8
测试用例,先往里写入100万
个key/value
对,手动开启GC
,然后统计下耗时)
- 当对一个长度
- 最后把待写入的桶的
overflow
指针指向ovf
这个溢出捅
ok,接下来看看 hmap
在不同类型下的内存布局吧
2.2.1 没有分配预分配溢出桶
当
B
小于4
,是不会预分配溢出捅
2.2.2 有分配预分配溢出桶
当
B
大于或等于4
,则会预分配溢出捅
2.3 写入
// t 是 map 的类型信息,h 是 map 的实际数据结构,key 是要插入或更新的键。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
// 如果 map 是 nil,则抛出错误。
panic(plainError("assignment to entry in nil map"))
}
if raceenabled {
// 如果启用了竞态检测,获取调用者的程序计数器 (PC)。
callerpc := getcallerpc()
pc := abi.FuncPCABIInternal(mapassign)
// 检测写操作是否会引起竞态。
racewritepc(unsafe.Pointer(h), callerpc, pc)
// 检测对 key 的读操作。
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled {
// MemorySanitizer 检测对 key 的读取。
msanread(key, t.key.size)
}
if asanenabled {
// AddressSanitizer 检测对 key 的读取。
asanread(key, t.key.size)
}
if h.flags&hashWriting != 0 {
// 如果 hashWriting 标志已经被设置,说明有并发写操作,抛出错误。
fatal("concurrent map writes")
}
// 计算键的哈希值。
hash := t.hasher(key, uintptr(h.hash0))
// 在调用 t.hasher 之后设置 hashWriting 标志,
// 因为 t.hasher 可能会 panic,如果发生 panic,我们并没有进行写操作。
h.flags ^= hashWriting
if h.buckets == nil {
// 如果 buckets 是 nil,则初始化一个新的 bucket。
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
again:
// 计算哈希值对应的 bucket 索引。
bucket := hash & bucketMask(h.B)
if h.growing() {
// 如果 map 正在扩容,则执行扩容任务。
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
top := tophash(hash)
var inserti *uint8 // 指向要插入位置的 tophash
var insertk unsafe.Pointer // 指向要插入的 key 位置
var elem unsafe.Pointer // 指向要插入的元素位置
bucketloop:
for {
for i := uintptr(0); i < bucketCnt; i++ {
// 检查每个槽位的 tophash。
if b.tophash[i] != top {
// 如果槽位为空,记录第一个空槽的位置。
if isEmpty(b.tophash[i]) && inserti == nil {
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
// 如果遇到 emptyRest,说明后续的槽位都是空的,退出循环。
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// 计算当前槽位的 key 的地址。
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
// 如果是间接存储,获取实际的 key 地址。
k = *((*unsafe.Pointer)(k))
}
// 比较键是否相等。
if !t.key.equal(key, k) {
continue
}
// 找到已存在的键,更新值。
if t.needkeyupdate() {
// 如果需要更新键,则复制新的键到当前位置。
typedmemmove(t.key, k, key)
}
// 获取元素的地址。
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
goto done
}
// 检查溢出桶。
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// 没有找到键,分配新的插槽并添加条目。
// 如果负载因子过高或溢出桶过多,并且不在扩容中,开始扩容。
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // 扩容会使所有内容失效,因此需要重试。
}
if inserti == nil {
// 当前桶及所有溢出桶已满,分配一个新的溢出桶。
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}
// 在插入位置存储新键和值。
if t.indirectkey() {
// 如果键是间接存储,分配新内存并存储指针。
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectelem() {
// 如果值是间接存储,分配新内存并存储指针。
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
// 复制键到插入位置。
typedmemmove(t.key, insertk, key)
// 设置插入位置的 tophash。
*inserti = top
// 更新元素计数。
h.count++
done:
if h.flags&hashWriting == 0 {
// 确保 hashWriting 标志已设置。
fatal("concurrent map writes")
}
// 清除 hashWriting 标志。
h.flags &^= hashWriting
if t.indirectelem() {
// 如果值是间接存储,返回实际的值指针。
elem = *((*unsafe.Pointer)(elem))
}
return elem
}
阅读完 map
写入的源码后,终于知道 concurrent map writes
这个报错的出处了。因为 map
是非线程安全的,也就是一个 map
是不能同时有写入、读取的。
if h.flags&hashWriting != 0 {
// 如果 hashWriting 标志已经被设置,说明有并发写操作,抛出错误。
fatal("concurrent map writes")
}
同时,也补充了在 makemap
初始化中,B
为 0
也就是 1<<0
即 1
个桶时的生成之处了。
if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
接下来着重介绍下如何定位桶以及对应得 key
的写入位置。
hash := t.hasher(key, uintptr(h.hash0))
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
top := tophash(hash)
- 先计算
key
对应的哈希值,这里会把初始化的hash0
加进去,使其更加分布均匀 - 定位具体的桶,也就是比较低
1<<B - 1
位- 比如
B
为3
,也就是1<<3
即8
个桶 - 那么
hash & (1<<3 -1 )
也就是获取hash
低七位是哪个值,就能定位到哪个桶了
- 比如
- 定位具体的
key
- 先求
hash
的高八位的值 - 再遍历对应
tophash
数组的第几个位置,进而知道key
位于哪个槽了(若槽中有值,直接更新,否则就写入)- 若是
8
个tophash
都没有,说明满了,那么就遍历overflow
- 如果当前桶及所有溢出桶已满,那么就分配一个新的溢出桶
- 若是
- 先求
在写入的时候,还得考虑到是否扩容
// 如果负载因子过高或溢出桶过多,并且不在扩容中,开始扩容。
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // 扩容会使所有内容失效,因此需要重试。
}
- 溢出桶的数量超过桶数量
- 装载因子超过
6.5
具体的扩容过程,留在本篇博文的最后来讲。
2.4 查询
// mapaccess2 用于在 map 中查找键,返回值和是否找到的布尔值。
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
// 如果启用了竞态检测且 map 非空,获取调用者 PC 和当前函数 PC。
if raceenabled && h != nil {
callerpc := getcallerpc()
pc := abi.FuncPCABIInternal(mapaccess2)
// 检测 map 的读操作是否会引起竞态。
racereadpc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
// 如果启用了 MemorySanitizer 且 map 非空,读取键的内存。
if msanenabled && h != nil {
msanread(key, t.key.size)
}
// 如果启用了 AddressSanitizer 且 map 非空,读取键的内存。
if asanenabled && h != nil {
asanread(key, t.key.size)
}
// 如果 map 是 nil 或者元素个数为 0。
if h == nil || h.count == 0 {
// 如果哈希函数可能 panic,则计算哈希以检查。
if t.hashMightPanic() {
t.hasher(key, 0) // 见 issue 23734
}
// 返回零值指针和 false,表示未找到。
return unsafe.Pointer(&zeroVal[0]), false
}
// 检查是否有并发读写。
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}
// 计算键的哈希值。
hash := t.hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
// 找到对应的 bucket。
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
if c := h.oldbuckets; c != nil {
// 如果存在旧的 buckets,检查是否需要缩小掩码。
if !h.sameSizeGrow() {
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
// 如果旧的 bucket 未被迁移,则使用旧的。
if !evacuated(oldb) {
b = oldb
}
}
top := tophash(hash)
bucketloop:
// 遍历 bucket 和溢出桶。
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
// 检查 tophash 是否匹配。
if b.tophash[i] != top {
// 如果遇到 emptyRest,说明后续无有效数据,退出循环。
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// 获取当前键的地址。
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 比较键是否相等。
if t.key.equal(key, k) {
// 获取元素的地址。
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
// 返回元素指针和 true。
return e, true
}
}
}
// 未找到键,返回零值指针和 false。
return unsafe.Pointer(&zeroVal[0]), false
}
发现和写入的逻辑差不多
- 计算
key
的哈希值hash
- 依据
hash
低B
位定位所在桶 - 依据
hash
高8
位定位在桶中的存储位置 - 若当前桶未找到则查找对应的溢出捅
- 若对应位置有数据,则对比此位置上
key
,以确定是否是要查找的数据 - 若
map
处于扩容阶段,那么优先从oldbuckets
查找 - 若未找到键,则返回零值指针和
false
。
等等,查询的时候还有个知识点没介绍到,那就是遍历 map
无序性,也就是说遍历 map
时顺序打印出来的 key/value
是和写入的顺序不一致的,而且每次打印的都不一样,顺序是随机的
func TestRandMap(t *testing.T) {
m := map[int]int{
0: 0,
1: 1,
2: 2,
3: 3,
4: 4,
5: 5,
6: 6,
}
for i := 0; i < len(m); i++ {
for k, v := range m {
fmt.Printf("key=%d,value=%d ", k, v)
}
fmt.Println()
}
}
输出如下
key=5,value=5 key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4
key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6
key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5
那 Golang
是如何实现的呢?
type hiter struct {
key unsafe.Pointer // 当前键的指针,必须在第一个位置。写入 nil 表示迭代结束。
elem unsafe.Pointer // 当前值的指针,必须在第二个位置。
t *maptype // `map` 类型的描述信息。
h *hmap // `map` 的内部结构指针。
buckets unsafe.Pointer // 迭代器初始化时的 bucket 指针。
bptr *bmap // 当前 bucket 的指针。
overflow *[]*bmap // 用于保持 hmap.buckets 的溢出桶存活。
oldoverflow *[]*bmap // 用于保持 hmap.oldbuckets 的溢出桶存活。
startBucket uintptr // 迭代开始时的桶索引。
offset uint8 // 桶内的起始偏移量(随机化)。
wrapped bool // 是否已经从桶数组的末尾绕回到开头。
B uint8 // 当前 `map` 的 bucket 数量的对数(`B` 表示 `2^B` 个桶)。
i uint8 // 当前桶内的偏移量(键值对索引)。
bucket uintptr // 当前正在迭代的桶索引。
checkBucket uintptr // 用于检查的桶索引。
}
每次在遍历 map
时,会维护一个迭代器 hiter
,着重关注下 startBucket
和 offset
这两个字段
- 遍历
map
时从哪个桶开始遍历 - 遍历桶时,从哪个位置开始遍历
接下来在看看如何初始化这个字段
func mapiterinit(t *maptype, h *hmap, it *hiter) {
// 如果启用了数据竞争检测,并且哈希表不为空
if raceenabled && h != nil {
// 获取调用者的程序计数器
callerpc := getcallerpc()
// 记录读取操作
racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiterinit))
}
// 设置迭代器的类型信息
it.t = t
// 如果哈希表为空或计数为零,则直接返回
if h == nil || h.count == 0 {
return
}
// 验证迭代器的大小是否正确
if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 {
throw("hash_iter size incorrect") // 参见 cmd/compile/internal/reflectdata/reflect.go
}
// 设置迭代器的哈希表指针
it.h = h
// 获取当前哈希表的桶状态
it.B = h.B
it.buckets = h.buckets
if t.bucket.ptrdata == 0 {
// 如果桶没有指针数据,则分配当前溢出桶数组并记住当前和旧的指针
// 这确保即使在迭代过程中哈希表扩展和/或增加溢出桶时,
// 也能保留所有相关的溢出桶。
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}
// 决定从哪里开始迭代
var r uintptr
if h.B > 31-bucketCntBits {
// 如果桶的数量大于31减去桶数位数,则使用64位随机数
r = uintptr(fastrand64())
} else {
// 否则使用32位随机数
r = uintptr(fastrand())
}
// 随机选择一个起始桶
it.startBucket = r & bucketMask(h.B)
// 随机选择桶内的起始偏移量
it.offset = uint8(r >> h.B & (bucketCnt - 1))
// 初始化迭代器状态
it.bucket = it.startBucket
// 标记有一个迭代器存在
// 可以与另一个 mapiterinit() 并发运行。
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
// 原子操作设置标志位,表明存在迭代器
atomic.Or8(&h.flags, iterator|oldIterator)
}
// 预先执行一次迭代,初始化迭代器的 key 和 elem 指针
mapiternext(it)
}
重点是这处
// 随机选择一个起始桶
it.startBucket = r & bucketMask(h.B)
// 随机选择桶内的起始偏移量
it.offset = uint8(r >> h.B & (bucketCnt - 1))
也就是每次遍历的时候,起始桶的选择是随机的,同时桶内的起始偏移量也是随机的,这样就实现了无序性。
2.5 删除
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
// 如果启用了数据竞争检测,并且哈希表不为空
if raceenabled && h != nil {
// 获取调用者的程序计数器
callerpc := getcallerpc()
pc := abi.FuncPCABIInternal(mapdelete)
// 记录写操作
racewritepc(unsafe.Pointer(h), callerpc, pc)
// 记录对 key 的读操作
raceReadObjectPC(t.key, key, callerpc, pc)
}
// 如果启用了 MemorySanitizer(MSAN)(MSAN)检测,并且哈希表不为空
if msanenabled && h != nil {
// 记录对 key 的读操作
msanread(key, t.key.size)
}
// 如果启用了地址清毒(ASAN)检测,并且哈希表不为空
if asanenabled && h != nil {
// 记录对 key 的读操作
asanread(key, t.key.size)
}
// 如果哈希表为空或计数为零,则直接返回
if h == nil || h.count == 0 {
if t.hashMightPanic() {
// 计算哈希值,处理可能的 panic
t.hasher(key, 0) // 参见 issue 23734
}
return
}
// 如果哈希表标志位中有 hashWriting,则表示并发写操作,抛出异常
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
// 计算 key 的哈希值
hash := t.hasher(key, uintptr(h.hash0))
// 设置 hashWriting 标志位,在调用 t.hasher 之后设置,因为 t.hasher 可能会 panic
h.flags ^= hashWriting
// 计算 key 对应的桶索引
bucket := hash & bucketMask(h.B)
// 如果正在扩展哈希表,执行扩展操作
if h.growing() {
growWork(t, h, bucket)
}
// 获取对应桶的指针
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
// 计算 tophash 值
top := tophash(hash)
search:
// 遍历桶及其溢出桶
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
// 如果 tophash 不匹配,继续下一个
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break search
}
continue
}
// 获取 key 的指针
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
// 如果 key 不匹配,继续下一个
if !t.key.equal(key, k2) {
continue
}
// 清空 key 的指针
if t.indirectkey() {
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
memclrHasPointers(k, t.key.size)
}
// 清空 elem 的指针
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
memclrHasPointers(e, t.elem.size)
} else {
memclrNoHeapPointers(e, t.elem.size)
}
// 标记为已删除
b.tophash[i] = emptyOne
// 如果桶末尾连续出现 emptyOne 状态,将其改为 emptyRest 状态
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
for {
b.tophash[i] = emptyRest
if i == 0 {
if b == bOrig {
break // 初始桶的开始,迭代结束
}
// 找到前一个桶,从其最后一个条目继续
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
i = bucketCnt - 1
} else {
i--
}
if b.tophash[i] != emptyOne {
break
}
}
notLast:
// 减少哈希表计数
h.count--
// 如果哈希表为空,重置哈希种子
if h.count == 0 {
h.hash0 = fastrand()
}
break search
}
}
// 检查 hashWriting 标志位,确保其被清除
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
// 清除 hashWriting 标志位
h.flags &^= hashWriting
}
删除 key
的操作,在定位桶和 key
的流程上和写入及查询差不多。
找到对应的 key
以后,如果此位置上存储的是指针,那么就把指针置为 nil。如果是值就清空它所在的内存。还要清理 tophash
里面的值最后把 map
的 key
总个数计数器 count
减1 。
若是处在扩容过程中,那么删除操作会在扩容以后在新的 bmap
里面删除。
2.6 扩容
前面在 2.3
写入章节里介绍过,扩容满足以下两个条件的任意一个即可
- 溢出桶的数量超过桶数量
- 装载因子超过
6.5
针对这两种情况,实现的扩容机制是不一样的
2.6.1 等量扩容
若是溢出桶的数量超过桶数量,那么就会创建新桶保存数据,垃圾回收会清理老的溢出桶并释放内存。
什么情况会触发等量扩容(sameSizeGrow
)呢?
如果在写入后又删除了大量的数据,这样始终不满足条件 2
,且在之后的写入过程中,由于之前的桶已满,那么只能新增溢出桶并写入。
2.6.2 增量扩容
触发扩容的时机是在写入的时候
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
也就是当前 hmap
没有在扩容,并且满足上面条件任意一个,接下来看看 hasGrow
逻辑
func hashGrow(t *maptype, h *hmap) {
// 如果哈希表的负载因子已经达到临界值,则增加桶的数量。
// 否则,如果有太多的溢出桶,则保持相同数量的桶,并等量扩展。
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
// 如果哈希表没有超过负载因子,只是有太多的溢出桶,则不增加桶的数量。
bigger = 0
h.flags |= sameSizeGrow // 设置同大小增长标志
}
oldbuckets := h.buckets // 保存当前的桶数组
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 创建新的桶数组
// 设置新的哈希表标志
flags := h.flags &^ (iterator | oldIterator) // 清除迭代器标志
if h.flags&iterator != 0 {
flags |= oldIterator // 如果当前存在迭代器,则设置旧迭代器标志
}
// 提交扩展(与GC原子操作)
h.B += bigger // 增加桶数量的指数
h.flags = flags // 更新标志
h.oldbuckets = oldbuckets // 保存旧的桶数组
h.buckets = newbuckets // 设置新的桶数组
h.nevacuate = 0 // 重置迁移计数
h.noverflow = 0 // 重置溢出桶计数
// 如果有溢出桶,将当前溢出桶提升到旧桶中
if h.extra != nil && h.extra.overflow != nil {
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil") // 如果旧溢出桶不为空,抛出异常
}
h.extra.oldoverflow = h.extra.overflow // 提升当前溢出桶到旧溢出桶
h.extra.overflow = nil // 清空当前溢出桶
}
// 如果有下一个溢出桶,设置到额外信息中
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra) // 如果额外信息为空,则创建一个新的
}
h.extra.nextOverflow = nextOverflow // 设置下一个溢出桶
}
// 实际的数据复制由 growWork() 和 evacuate() 逐步完成
}
函数的开头也交代了何时增量扩容、何时等量扩容
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
// 如果哈希表没有超过负载因子,只是有太多的溢出桶,则不增加桶的数量。
bigger = 0
h.flags |= sameSizeGrow // 设置同大小增长标志
}
增量扩容是指新的正常桶比原来扩大一倍
bigger := uint8(1)
h.B += bigger // 增加桶数量的指数
比如原来 B
为 3
共计 8
个桶,那么增量扩容后正常桶的数量为 1<<(3+1)
即 16
。
同时,h.oldbuckets
保存旧的 h.buckets
桶数组,h.buckets
保存新的桶数组
oldbuckets := h.buckets // 保存当前的桶数组
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 创建新的桶数组
h.oldbuckets = oldbuckets // 保存旧的桶数组
h.buckets = newbuckets // 设置新的桶数组
在此函数的最后,也阐明了实际的数据迁移是由 growWork()
和 evacuate()
逐步完成。
2.6.3 扩容机制
接下来看看 growWork()
。
在前面介绍写入
和删除
的时候,都会检查当前是否在扩容
if h.growing() {
// 如果 map 正在扩容,则执行扩容任务。
growWork(t, h, bucket)
}
// growing reports whether h is growing. The growth may be to the same size or bigger.
func (h *hmap) growing() bool {
return h.oldbuckets != nil
}
也就是通过 h.oldbuckets
是否不为 nil
来判断当前 map
是否在扩容,若 map
正在扩容,则执行扩容。
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 确保我们移动的 oldbucket 对应的是我们马上就要用到的那一个
evacuate(t, h, bucket&h.oldbucketmask())
// 如果还在 growing 状态,再多移动一个 oldbucket
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
这里可以看到每次迁移两个桶,一个是当前的桶,另外一个是 hmap
里指向的 nevacuate
的桶,这是增量迁移。这个和 Redis
的 rehash
惰性迁移一致,只是 Golang
中在写入和删除的时候迁移。这样就既不影响写入、删除,又进行了扩容,也算是一种取舍吧。
真正的迁移是在 evacuate()
函数中
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 获取旧桶的指针
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 计算新桶的数量
newbit := h.noldbuckets()
// 如果旧桶还没有被迁移
if !evacuated(b)) {
// TODO: 如果没有迭代器使用旧桶,可以复用溢出桶而不是使用新桶。
// xy 包含 x 和 y(低位和高位)的迁移目标。
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
if !h.sameSizeGrow() {
// 仅在增长时计算 y 指针。
// 否则 GC 可能会看到错误的指针。
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
// 处理旧桶中的每个溢出桶
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
top := b.tophash[i]
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
if !h.sameSizeGrow() {
// 计算哈希以决定是将此键/元素发送到桶 x 还是桶 y。
hash := t.hasher(k2, uintptr(h.hash0))
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
// 如果 key != key (NaNs),那么哈希可能会完全不同于旧哈希。
// 而且,它不是可重现的。在存在迭代器的情况下,重现性是必须的,
// 因为我们的迁移决策必须与迭代器做出的决策相匹配。
// 幸运的是,我们可以自由地将这些键任意发送。
// 我们让 tophash 的低位驱动迁移决策。
// 我们为下一级重新计算一个新的随机 tophash,
// 以便这些键在多次扩展后均匀分布在所有桶中。
useY = top & 1
top = tophash(hash)
} else {
if hash&newbit != 0 {
useY = 1
}
}
}
if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // 迁移目标
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top // 掩码 dst.i 作为一种优化,以避免边界检查
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2 // 复制指针
} else {
typedmemmove(t.key, dst.k, k) // 复制元素
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
// 这些更新可能会将这些指针越过键或元素数组的末尾。
// 不过我们在桶的末尾有溢出指针来防止指向桶的末尾之外。
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
// 取消链接溢出桶并清除键/元素以帮助 GC。
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// 保留 b.tophash,因为迁移状态在此处维护。
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
// 更新迁移标记
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
这里需着重关注下这个 evacDst
数据结构
// evacDst 结构体表示在调整大小过程中,数据迁移的目的地
// 它包含了当前目标桶、索引以及指向键/元素存储的指针信息
type evacDst struct {
b *bmap // 当前目标桶(指向 bmap 结构的指针)
i int // 键/元素在 b 中的索引(在桶中存储键/元素的位置)
k unsafe.Pointer // 指向当前键存储的指针(指向键内存位置的原始指针)
e unsafe.Pointer // 指向当前元素存储的指针(指向元素内存位置的原始指针)
}
这个 evacDst
的主要作用是 rehash
和数据迁移。
// xy 包含 x 和 y(低位和高位)的迁移目标。
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
if !h.sameSizeGrow() {
// 仅在增长时计算 y 指针。
// 否则 GC 可能会看到错误的指针。
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
这里的 x
、y
分别代表扩容后新桶的前半段与后半段,若是等量扩容的话,那么只有 x
。
因为在等量扩容下,旧桶与新桶之间是一对一的关系,因此这里不过做就做过多介绍。
这里说下增量扩容,需要重新计算 key
的 hash
哈希值,然后确认在低位桶还是在高位桶。
newbit := h.noldbuckets()
hash := t.hasher(k2, uintptr(h.hash0))
if hash&newbit != 0 {
useY = 1
}
这里我把相关代码给摘了出来,先计算 newbit
,也就是水位值。为何这么说呢,看看这个值是如何计算的。
func (h *hmap) noldbuckets() uintptr {
oldB := h.B
if !h.sameSizeGrow() {
oldB--
}
return bucketShift(oldB)
}
// bucketShift returns 1<<b, optimized for code generation.
func bucketShift(b uint8) uintptr {
return uintptr(1) << (b & (goarch.PtrSize*8 - 1))
}
比如增量扩容后 B
为 4
,newbit
就是 1<<(4-1)
也就是 8
,二进制为 1000
。我们知道二进制与 &
操作是相同位的值都为 1
那么结果为 1
,否则为 0
,而 Golang
是采用位运算来获取桶的编号的。
先获取 key
的哈希值,然后拿低 B
位再和 newbit
做与运算,一旦结果为 0
说明桶是小于水位值的,也就是低位桶,否则就是高位桶。
举个示例:旧桶 B
为 2
,也就是 4
个桶,其中在第二个桶中有 2
个 key
的哈希值低 3
位分别为 010
、110
。增量扩容后, B
变成了 3
,所以 010
、110
便分别落入 2
和 6
号桶。不过还得告诉程序扩容后落在地位桶 x
还是高位桶 y
。这里把上面的算法套用一下就知道了,010&100
结果为 0
落入低位桶 x
,110&100
结果为 1
落入高位桶 y
。
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // 迁移目标
useY
的作用就是这里了,目标桶依据 useY
是落入的地位桶还是高位桶。之前介绍过 xy
数组的第一位为低位桶(新桶的前半段),第二位为高位桶(新桶的后半段)。
const(
evacuatedX = 2 // key/elem is valid. Entry has been evacuated to first half of larger table.
evacuatedY = 3 // same as above, but evacuated to second half of larger table.
)
这里就是标记旧桶中 b.tophash[i]
已被迁移,至于迁移到低位桶还是高位桶就依据 evacuatedX + useY
来确定了
- evacuatedX,低位桶,也就是新桶的前半段
- evacuatedY,高位桶,也就是新桶的后半段,当
userY
为1
时,evacuatedX+userY
就是高位桶
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
如果迁移的桶都满了,那么新建个溢出捅,重置索引,并把 dst.k
和 dst.e
指向新的键和值存储位置。
dst.b.tophash[dst.i&(bucketCnt-1)] = top // 掩码 dst.i 作为一种优化,以避免边界检查
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2 // 复制指针
} else {
typedmemmove(t.key, dst.k, k) // 复制元素
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
前面准备工作都做好了,这里就是迁移数据了。
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
这里主要是更新哈希表的迁移进度标记 (nevacuate
),以便逐步完成哈希表的增长 rehash
过程。
前面在介绍 growWork()
函数提到过,扩容时每次迁移两个值,一个是当前操作的值,另一个就是按照迁移进度正常迁移的值。这里就是对比下,是否刚好这两个值是一样的,是的话则更新下进度标记。
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
// 将当前迁移标记推进一个桶
h.nevacuate++
// 实验表明1024是一个过大的值,但作为保障以确保O(1)行为
stop := h.nevacuate + 1024
// 确保停止标记不超过新桶数量
if stop > newbit {
stop = newbit
}
// 遍历并推进迁移标记,直到到达停止标记或遇到尚未迁移的桶
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
// 如果迁移标记到达了新桶数量,表示所有旧桶都已被迁移
if h.nevacuate == newbit { // newbit 表示旧桶的数量
// 迁移完成,释放旧的主桶数组
h.oldbuckets = nil
// 同时可以丢弃旧的溢出桶,如果它们没有被迭代器引用
if h.extra != nil {
h.extra.oldoverflow = nil
}
// 清除 sameSizeGrow 标志,表示增长过程已完成
h.flags &^= sameSizeGrow
}
}
这个函数的作用是增加哈希的 nevacuate
计数器,并在所有的旧桶都被迁移后清空 bmap
的 oldbuckets
和 oldoverflow
。