深度理解golangmap

基础知识

map的概念

map 的直观意思是映射,是<key, value> 对组成的抽象结构,且 key 不会重复,所有的操作也都是key/value的。map的底层实现一般有两种:

  • 搜索树(search tree),天然有序,平均/最差的写入/查找复杂度是O(logN)
  • 哈希表(hash table),无序存储,平均的写入/查找复杂度是O(1),最差是O(N)

哈希函数

哈希函数(常被称为散列函数)是可以用于将任意大小的数据映射到固定大小值的函数,常见的包括MD5、SHA系列等。

一个设计优秀的哈希函数应该包含以下特性:

  • 均匀性:一个好的哈希函数应该在其输出范围内尽可能均匀地映射,也就是说,应以大致相同的概率生成输出范围内的每个哈希值。
  • 效率高:哈希效率要高,即使很长的输入参数也能快速计算出哈希值。
  • 可确定性:哈希过程必须是确定性的,这意味着对于给定的输入值,它必须始终生成相同的哈希值。
  • 雪崩效应:微小的输入值变化也会让输出值发生巨大的变化。
  • 不可逆:从哈希函数的输出值不可反向推导出原始的数据。

哈希冲突

哈希函数是将任意大小的数据映射到固定大小值的函数。那么,我们可以预见到,即使哈希函数设计得足够优秀,几乎每个输入值都能映射为不同的哈希值。但是,当输入数据足够大,大到能超过固定大小值的组合能表达的最大数量数,冲突将不可避免!(参见抽屉原理)

image.png

抽屉原理:桌上有十个苹果,要把这十个苹果放到九个抽屉里,无论怎样放,我们会发现至少会有一个抽屉里面放不少于两个苹果。抽屉原理有时也被称为鸽巢原理。

哈希桶

哈希桶(也称为槽,类似于抽屉原理中的一个抽屉)可以先简单理解为一个哈希值,所有的哈希值组成了哈希空间。

装载因子

装载因子是表示哈希表中元素的填满程度。它的计算公式:装载因子=填入哈希表中的元素个数/哈希表的长度。装载因子越大,填入的元素越多,空间利用率就越高,但发生哈希冲突的几率就变大。反之,装载因子越小,填入的元素越少,冲突发生的几率减小,但空间浪费也会变得更多,而且还会提高扩容操作的次数。装载因子也是决定哈希表是否进行扩容的关键指标,在java的HashMap的中,其默认装载因子为0.75;Python的dict默认装载因子为2/3。

解决哈希冲突的办法

链地址法

将hash地址相同的key用链表形式串到一起。在查找时也要去遍历这个链来定位真正的key。而这也是go语言中map的冲突解决办法。

开放寻址法

也称再散列,意思就是如果出现了hash冲突,那么再次进行hash,直到无冲突为止,放入对应存储。

开放寻址法中对性能影响最大的是装载因子,它是数组中元素的数量与数组大小的比值。随着装载因子的增加,线性探测的平均用时就会逐渐增加,这会影响哈希表的读写性能。当装载率超过 70% 之后,哈希表的性能就会急剧下降,而一旦装载率达到 100%,整个哈希表就会完全失效,这时查找和插入任意元素的时间复杂度都是O(n)的,这时需要遍历数组中的全部元素,所以在实现哈希表时一定要关注装载因子的变化。

两种解决方案比较

对于开放寻址法而言,它只有数组一种数据结构就可完成存储,继承了数组的优点,对CPU缓存友好,易于序列化操作。但是它对内存的利用率不如链地址法,且发生冲突时代价更高。当数据量明确、装载因子小,适合采用开放寻址法。

链表节点可以在需要时再创建,不必像开放寻址法那样事先申请好足够内存,因此链地址法对于内存的利用率会比开方寻址法高。链地址法对装载因子的容忍度会更高,并且适合存储大对象、大数据量的哈希表。而且相较于开放寻址法,它更加灵活,支持更多的优化策略,比如可采用红黑树代替链表。但是链地址法需要额外的空间来存储指针。

值得一提的是,在Python中dict在发生哈希冲突时采用的开放寻址法,java的HashMap采用的是链地址法,golang的map采用的是链地址法。

map的基本操作

1、 声明

var m map[T]T

注意:此时的map为nil, 对nil的map可以安全的进行读取/删除,但对nil的map进行写入操作会引发panic

package main
func main() {
    var a map[int64]bool
    delete(a, 10)
    _ = a
    a[10] = false
}


go run main.go
panic: assignment to entry in nil map//具体原因详见源码解析部分
复制代码

2、初始化

var m make(map[T]T, len)
var m map[T]T{...}

3、新增/更新

m[key] = value

4、查询

v := m[key]
v, ok := m[key]

对于第一种方式,未set的ket-value将得到零值;对于第二种方式,未set的ket-value,其ok=false。

5、删除

delete(m, key)

可以对相同的key进行多次的删除操作,而不会报错

与其他 map/hashtable 的对比

1、Java map 底层是hash表+链表(拉链法)
其特色是当链表长度超过阈值(8)会转为红黑树,避免O(n)的遍历
2、C++的map是红黑树实现
其unordered_map是hashtable实现的
3、php5中的数组也是基础的hashtable
在7版本的时候,其优化成内存连续的hashtable
4、redis的hash就是一个普通hashtable,其特色在于遍历时采用了高位迭代的顺序,以避免scan时出现重复元素。但是,缩容的话依旧有可能出现重复元素。

深入源码

Map实现

同python与java一样,Go语言中的map是也基于哈希表实现的,它解决哈希冲突的方式是链地址法,即通过使用数组+链表的数据结构来表达map。

map数据结构

map中的数据被存放于一个数组中的,数组的元素是桶(bucket),每个桶至多包含8个键值对数据。哈希值低位(low-order bits)用于选择桶,哈希值高位(high-order bits)用于在一个独立的桶中区别出键.

  • hmap结构体
// https://github.com/golang/go/blob/release-branch.go1.15/src/runtime/map.go
type hmap struct {
    count     int // 代表哈希表中的元素个数,在插入时会自增,在删除时会自减,调用len(map)时,返回的就是该字段值。
    flags     uint8 // 状态标志,下文常量中会解释四种状态位含义。
    B         uint8  // buckets(桶)的对数log2(哈希表元素数量最大可达到装载因子*2^B)
    noverflow uint16 // 溢出桶的大概数量。
    hash0     uint32 // 哈希种子。

    buckets    unsafe.Pointer // 指向buckets数组的指针,数组大小为2^B,如果元素个数为0,它为nil。
    oldbuckets unsafe.Pointer // 如果发生扩容,oldbuckets是指向老的buckets数组的指针,老的buckets数组大小是新的buckets的1/2。非扩容状态下,它为nil。
    nevacuate  uintptr        // 表示扩容进度,小于此地址的buckets代表已搬迁完成。

    extra *mapextra // 这个字段是为了优化GC扫描而设计的。当key和value均不包含指针,并且都可以inline时使用。extra是指向mapextra类型的指针。
复制代码

image.png

  • mapextra的结构体
// mapextra holds fields that are not present on all maps.
type mapextra struct {
    // 如果 key 和 value 都不包含指针,并且可以被 inline(<=128 字节)
    // 就使用 hmap的extra字段 来存储 overflow buckets,这样可以避免 GC 扫描整个 map
    // 然而 bmap.overflow 也是个指针。这时候我们只能把这些 overflow 的指针
    // 都放在 hmap.extra.overflow 和 hmap.extra.oldoverflow 中了
    // overflow 包含的是 hmap.buckets 的 overflow 的 buckets
    // oldoverflow 包含扩容时的 hmap.oldbuckets 的 overflow 的 bucket
    overflow    *[]*bmap
    oldoverflow *[]*bmap

    // 指向空闲的 overflow bucket 的指针
    nextOverflow *bmap
}
复制代码
  • bmap结构体
// A bucket for a Go map.
type bmap struct {
    // tophash包含此桶中每个键的哈希值最高字节(高8位)信息(也就是前面所述的high-order bits)。
    // 如果tophash[0] < minTopHash,tophash[0]则代表桶的搬迁(evacuation)状态。
    tophash [bucketCnt]uint8
}
复制代码

image.png

image.png

image.png

推断出bmap结构字段的代码和位置如下:

func bmap(t *types.Type) *types.Type {
  // 略...

  field := make([]*types.Field, 0, 5)

	field = append(field, makefield("topbits", arr))

  // 略...
  
	keys := makefield("keys", arr)
	field = append(field, keys)

  // 略...
  
	elems := makefield("elems", arr)
	field = append(field, elems)

  // 略...
  
	if int(elemtype.Align) > Widthptr || int(keytype.Align) > Widthptr {
		field = append(field, makefield("pad", types.Types[TUINTPTR]))
	}

  // 略...
  
	overflow := makefield("overflow", otyp)
	field = append(field, overflow)

  // 略...
}
复制代码
  • 重要常量标志

const (
    // 一个桶中最多能装载的键值对(key-value)的个数为8
    bucketCntBits = 3
    bucketCnt     = 1 << bucketCntBits

  // 触发扩容的装载因子为13/2=6.5
    loadFactorNum = 13
    loadFactorDen = 2

    // 键和值超过128个字节,就会被转换为指针
    maxKeySize  = 128
    maxElemSize = 128

    // 数据偏移量应该是bmap结构体的大小,它需要正确地对齐。
  // 对于amd64p32而言,这意味着:即使指针是32位的,也是64位对齐。
    dataOffset = unsafe.Offsetof(struct {
        b bmap
        v int64
    }{}.v)


  // 每个桶(如果有溢出,则包含它的overflow的链接桶)在搬迁完成状态(evacuated* states)下,要么会包含它所有的键值对,要么一个都不包含(但不包括调用evacuate()方法阶段,该方法调用只会在对map发起write时发生,在该阶段其他goroutine是无法查看该map的)。简单的说,桶里的数据要么一起搬走,要么一个都还未搬。
  // tophash除了放置正常的高8位hash值,还会存储一些特殊状态值(标志该cell的搬迁状态)。正常的tophash值,最小应该是5,以下列出的就是一些特殊状态值。
    emptyRest      = 0 // 表示cell为空,并且比它高索引位的cell或者overflows中的cell都是空的。(初始化bucket时,就是该状态)
    emptyOne       = 1 // 空的cell,cell已经被搬迁到新的bucket
    evacuatedX     = 2 // 键值对已经搬迁完毕,key在新buckets数组的前半部分
    evacuatedY     = 3 // 键值对已经搬迁完毕,key在新buckets数组的后半部分
    evacuatedEmpty = 4 // cell为空,整个bucket已经搬迁完毕
    minTopHash     = 5 // tophash的最小正常值

    // flags
    iterator     = 1 // 可能有迭代器在使用buckets
    oldIterator  = 2 // 可能有迭代器在使用oldbuckets
    hashWriting  = 4 // 有协程正在向map写人key
    sameSizeGrow = 8 // 等量扩容

    // 用于迭代器检查的bucket ID
    noCheck = 1<<(8*sys.PtrSize) - 1
)
复制代码

内存模型

image.png

bmap内存布局

bmap中比较特殊的是kv的存储,bmap将8个key存在一起,将8个value存在一起,是key/key/....value/value...这种存储,相对于将key/value/key/value这种存储设计,将key和value分开存储能使bmap内存布局更加紧密,只需要在最后增加padding对齐即可。

可以计算下map[int64]int8 这种类型的map在上述两种存储设计中每个bmap分别占用的空间大小。
如果将kv存在一起,总共需要的每对kv后需要填充7字节的padding,总共的padding为7*8 = 56字节
但是如果将kv分开存储,8个key是对齐的,8个value也是对齐的,不需要额外的padding填充。

创建map

map初始化有以下两种方式

make(map[k]v)
// 指定初始化map大小为hint
make(map[k]v, hint)
复制代码

对于不指定初始化大小,和初始化值hint<=8(bucketCnt)时,go会调用makemap_small函数(源码位置src/runtime/map.go),并直接从堆上进行分配。

func makemap_small() *hmap {
    h := new(hmap)
    h.hash0 = fastrand()
    return h
}
复制代码

当hint>8时,则调用makemap函数

// 如果编译器认为map和第一个bucket可以直接创建在栈上,h和bucket可能都是非空
// 如果h != nil,那么map可以直接在h中创建
// 如果h.buckets != nil,那么h指向的bucket可以作为map的第一个bucket使用
func makemap(t *maptype, hint int, h *hmap) *hmap {
  // math.MulUintptr返回hint与t.bucket.size的乘积,并判断该乘积是否溢出。
    mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
// maxAlloc的值,根据平台系统的差异而不同,具体计算方式参照src/runtime/malloc.go
    if overflow || mem > maxAlloc {
        hint = 0
    }

// initialize Hmap
    if h == nil {
        h = new(hmap)
    }
  // 通过fastrand得到哈希种子
    h.hash0 = fastrand()

    // 根据输入的元素个数hint,找到能装下这些元素的B值
    B := uint8(0)
    for overLoadFactor(hint, B) {
        B++
    }
    h.B = B

    // 分配初始哈希表
  // 如果B为0,那么buckets字段后续会在mapassign方法中lazily分配
    if h.B != 0 {
        var nextOverflow *bmap
    // makeBucketArray创建一个map的底层保存buckets的数组,它最少会分配h.B^2的大小。
        h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
        if nextOverflow != nil {
    h.extra = new(mapextra)
            h.extra.nextOverflow = nextOverflow
        }
    }

    return h
}
复制代码

分配buckets数组的makeBucketArray函数

// makeBucket为map创建用于保存buckets的数组。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
    base := bucketShift(b)
    nbuckets := base
  // 对于小的b值(小于4),即桶的数量小于16时,使用溢出桶的可能性很小。对于此情况,就避免计算开销。
    if b >= 4 {
    // 当桶的数量大于等于16个时,正常情况下就会额外创建2^(b-4)个溢出桶
        nbuckets += bucketShift(b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }

 // 这里,dirtyalloc分两种情况。如果它为nil,则会分配一个新的底层数组。如果它不为nil,则它指向的是曾经分配过的底层数组,该底层数组是由之前同样的t和b参数通过makeBucketArray分配的,如果数组不为空,需要把该数组之前的数据清空并复用。
    if dirtyalloc == nil {
        buckets = newarray(t.bucket, int(nbuckets))
    } else {
        buckets = dirtyalloc
        size := t.bucket.size * nbuckets
        if t.bucket.ptrdata != 0 {
            memclrHasPointers(buckets, size)
        } else {
            memclrNoHeapPointers(buckets, size)
        }
}

  // 即b大于等于4的情况下,会预分配一些溢出桶。
  // 为了把跟踪这些溢出桶的开销降至最低,使用了以下约定:
  // 如果预分配的溢出桶的overflow指针为nil,那么可以通过指针碰撞(bumping the pointer)获得更多可用桶。
  // (关于指针碰撞:假设内存是绝对规整的,所有用过的内存都放在一边,空闲的内存放在另一边,中间放着一个指针作为分界点的指示器,那所分配内存就仅仅是把那个指针向空闲空间那边挪动一段与对象大小相等的距离,这种分配方式称为“指针碰撞”)
  // 对于最后一个溢出桶,需要一个安全的非nil指针指向它。
    if base != nbuckets {
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}
复制代码

根据上述代码,我们能确定在正常情况下,正常桶和溢出桶在内存中的存储空间是连续的,只是被 hmap 中的不同字段引用而已。

map操作

假定key经过哈希计算后得到64bit位的哈希值。如果B=5,buckets数组的长度,即桶的数量是32(2的5次方)。

例如,现要置一key于map中,该key经过哈希后,得到的哈希值如下:
10010111 000011110110110010001111001010100010010110010001010 01010

前面我们知道,==哈希值低位(low-order bits)用于选择桶,在查找时可以先进行tophash的对比,如果一致再进行key的对比,如果不一致则直接跳过,这样的好处是可以减少一些比较的开销;哈希值高位(high-order bits)用于在一个独立的桶中区别出键==。当B等于5时,那么我们选择的哈希值低位也是5位,即01010,它的十进制值为10,代表10号桶。再用哈希值的高8位,找到此key在桶中的位置。==最开始桶中还没有key,那么新加入的key和value就会被放入第一个key空位和value空位。==

注意:对于高低位的选择,该操作的实质是取余,但是取余开销很大,在实际代码实现中采用的是位操作。以下是tophash的实现代码:

func tophash(hash uintptr) uint8 {
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }
    return top
}
复制代码

当两个不同的key落在了同一个桶中,这时就发生了哈希冲突。go的解决方式是链地址法(为了更好理解,假设非扩容且该key是第一次添加的情况):在桶中按照顺序寻到第一个空位并记录下来,后续在该桶和它的溢出桶中均未发现存在的该key,将key置于第一个空位;否则,去该桶的溢出桶中寻找空位,如果没有溢出桶,则添加溢出桶,并将其置溢出桶的第一个空位(因为是第一次添加,所以不描述已存在该key的情况)。

image.png

上图中的B值为5,所以桶的数量为32。通过哈希函数计算出待插入key的哈希值,低5位哈希00110,对应于6号桶;高8位10010111,十进制为151,由于桶中前6个cell已经有正常哈希值填充了(遍历),所以将151对应的高位哈希值放置于第7位cell,对应将key和value分别置于相应的第七个空位。

如果是查找key,那么我们会根据高位哈希值去桶中的每个cell中找,若在桶中没找到,并且overflow不为nil,那么继续去溢出桶中寻找,直至找到,如果所有的cell都找过了,还未找到,则返回key类型的默认值(例如key是int类型,则返回0)。

读取

读取有两种形式,即返回值有两种形式,但是Go语言没有函数重载的概念,这种动态返回值很显然是go提供的语法糖,在编译时会根据用户使用的返回值的个数,转换为runtime的对应函数。

方式为:

    // 形式一
    v := m[k]
    // 形式二
    v, ok := m[k]
复制代码

源码位置依然位与runtime/map.go中,对应的函数分别是

  • runtime.mapaccess1,对应的是一个返回值的形式
  • runtime.mapaccess2,对应的是两个返回值的形式
  • runtime.mapaccessk,对应的是同时返回kv的形式,在遍历map时会用到

形式一的代码实现,就是上述的mapaccess1方法。此外,在源码中还有个mapaccess2方法,它的函数签名如下。

func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {}
复制代码

与mapaccess1相比,mapaccess2多了一个bool类型的返回值,它代表的是是否在map中找到了对应的key。因为和mapaccess1基本一致,所以详细代码就不再贴出。

同时,源码中还有mapaccessK方法,它的函数签名如下。

func mapaccessK(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, unsafe.Pointer) {}
复制代码

与mapaccess1相比,mapaccessK同时返回了key和value,其代码逻辑也一致。

这3个函数的逻辑基本是一致的,下面我们以runtime.mapccess1为例来分析一下map

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  // 如果开启了竞态检测 -race
    if raceenabled && h != nil {
        callerpc := getcallerpc()
        pc := funcPC(mapaccess1)
        racereadpc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
  // 如果开启了memory sanitizer -msan
    if msanenabled && h != nil {
        msanread(key, t.key.size)
    }
  // 如果map为空或者元素个数为0,返回零值
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            t.hasher(key, 0) // see issue 23734
        }
        return unsafe.Pointer(&zeroVal[0])
    }
  // 注意,这里是按位与操作
  // 当h.flags对应的值为hashWriting(代表有其他goroutine正在往map中写key)时,那么位计算的结果不为0,因此抛出以下错误。
  // 这也表明,go的map是非并发安全的
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
  // 不同类型的key,会使用不同的hash算法,可详见src/runtime/alg.go中typehash函数中的逻辑
    hash := t.hasher(key, uintptr(h.hash0))
    m := bucketMask(h.B)
  // 按位与操作,找到对应的bucket
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
  // 如果oldbuckets不为空,那么证明map发生了扩容
  // 如果有扩容发生,老的buckets中的数据可能还未搬迁至新的buckets里
  // 所以需要先在老的buckets中找
    if c := h.oldbuckets; c != nil {
        if !h.sameSizeGrow() {
            m >>= 1
        }
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
    // 如果在oldbuckets中tophash[0]的值,为evacuatedX、evacuatedY,evacuatedEmpty其中之一
    // 则evacuated()返回为true,代表搬迁完成。
    // 因此,只有当搬迁未完成时,才会从此oldbucket中遍历
        if !evacuated(oldb) {
            b = oldb
        }
    }
  // 取出当前key值的tophash值
    top := tophash(hash)
  // 以下是查找的核心逻辑
  // 双重循环遍历:外层循环是从桶到溢出桶遍历;内层是桶中的cell遍历
  // 跳出循环的条件有三种:第一种是已经找到key值;第二种是当前桶再无溢出桶;
  // 第三种是当前桶中有cell位的tophash值是emptyRest,这个值在前面解释过,它代表此时的桶后面的cell还未利用,所以无需再继续遍历。
bucketloop:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
      // 判断tophash值是否相等
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
      }
      // 因为在bucket中key是用连续的存储空间存储的,因此可以通过bucket地址+数据偏移量(bmap结构体的大小)+ keysize的大小,得到k的地址
      // 同理,value的地址也是相似的计算方法,只是再要加上8个keysize的内存地址
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
      // 判断key是否相等
            if t.key.equal(key, k) {
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                if t.indirectelem() {
                    e = *((*unsafe.Pointer)(e))
                }
                return e
            }
        }
    }
  // 所有的bucket都未找到,则返回零值
    return unsafe.Pointer(&zeroVal[0])
}
复制代码

以下是mapaccess1的查找过程图解

image.png

写入

源码如下

可以证明为什么空map可以读取和删除,而不能写入

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  // 如果h是空指针,赋值会引起panic
  // 例如以下语句
  // var m map[string]int
    // m["k"] = 1
    if h == nil {
        panic(plainError("assignment to entry in nil map"))
    }
  // 如果开启了竞态检测 -race
    if raceenabled {
        callerpc := getcallerpc()
        pc := funcPC(mapassign)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
  // 如果开启了memory sanitizer -msan
    if msanenabled {
        msanread(key, t.key.size)
    }
  // 有其他goroutine正在往map中写key,会抛出以下错误
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }
  // 通过key和哈希种子,算出对应哈希值
    hash := t.hasher(key, uintptr(h.hash0))

  // 将flags的值与hashWriting做按位或运算
  // 因为在当前goroutine可能还未完成key的写入,再次调用t.hasher会发生panic。
    h.flags ^= hashWriting

    if h.buckets == nil {
        h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}

again:
  // bucketMask返回值是2的B次方减1
  // 因此,通过hash值与bucketMask返回值做按位与操作,返回的在buckets数组中的第几号桶
    bucket := hash & bucketMask(h.B)
  // 如果map正在搬迁(即h.oldbuckets != nil)中,则先进行搬迁工作。
    if h.growing() {
        growWork(t, h, bucket)
    }
  // 计算出上面求出的第几号bucket的内存位置
  // post = start + bucketNumber * bucketsize
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    top := tophash(hash)

    var inserti *uint8
    var insertk unsafe.Pointer
    var elem unsafe.Pointer
bucketloop:
    for {
    // 遍历桶中的8个cell
        for i := uintptr(0); i < bucketCnt; i++ {
      // 这里分两种情况,第一种情况是cell位的tophash值和当前tophash值不相等
      // 在 b.tophash[i] != top 的情况下
      // 理论上有可能会是一个空槽位
      // 一般情况下 map 的槽位分布是这样的,e 表示 empty:
      // [h0][h1][h2][h3][h4][e][e][e]
      // 但在执行过 delete 操作时,可能会变成这样:
      // [h0][h1][e][e][h5][e][e][e]
      // 所以如果再插入的话,会尽量往前面的位置插
      // [h0][h1][e][e][h5][e][e][e]
      //          ^
      //          ^
      //       这个位置
      // 所以在循环的时候还要顺便把前面的空位置先记下来
      // 因为有可能在后面会找到相等的key,也可能找不到相等的key
            if b.tophash[i] != top {
        // 如果cell位为空,那么就可以在对应位置进行插入
                if isEmpty(b.tophash[i]) && inserti == nil {
                    inserti = &b.tophash[i]
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                }
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
      // 第二种情况是cell位的tophash值和当前的tophash值相等
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
      // 注意,即使当前cell位的tophash值相等,不一定它对应的key也是相等的,所以还要做一个key值判断
            if !t.key.equal(key, k) {
                continue
            }
            // 如果已经有该key了,就更新它
            if t.needkeyupdate() {
                typedmemmove(t.key, k, key)
            }
      // 这里获取到了要插入key对应的value的内存地址
      // pos = start + dataOffset + 8*keysize + i*elemsize
            elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
      // 如果顺利到这,就直接跳到done的结束逻辑中去
            goto done
        }
    // 如果桶中的8个cell遍历完,还未找到对应的空cell或覆盖cell,那么就进入它的溢出桶中去遍历
        ovf := b.overflow(t)
    // 如果连溢出桶中都没有找到合适的cell,跳出循环。
        if ovf == nil {
            break
        }
        b = ovf
    }

    // 在已有的桶和溢出桶中都未找到合适的cell供key写入,那么有可能会触发以下两种情况
  // 情况一:
  // 判断当前map的装载因子是否达到设定的6.5阈值,或者当前map的溢出桶数量是否过多。如果存在这两种情况之一,则进行扩容操作。
  // hashGrow()实际并未完成扩容,对哈希表数据的搬迁(复制)操作是通过growWork()来完成的。
  // 重新跳入again逻辑,在进行完growWork()操作后,再次遍历新的桶。
    if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }

  // 情况二:
// 在不满足情况一的条件下,会为当前桶再新建溢出桶,并将tophash,key插入到新建溢出桶的对应内存的0号位置
    if inserti == nil {
        // all current buckets are full, allocate a new one.
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        elem = add(insertk, bucketCnt*uintptr(t.keysize))
    }

  // 在插入位置存入新的key和value
    if t.indirectkey() {
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectelem() {
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(elem) = vmem
    }
    typedmemmove(t.key, insertk, key)
    *inserti = top
  // map中的key数量+1
    h.count++

done:
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
    if t.indirectelem() {
        elem = *((*unsafe.Pointer)(elem))
    }
    return elem
}
复制代码

删除

删除逻辑中,比较特殊的就是:对于空位置会置为emptyOne标示位;对于后面都是空位置会置为emptyRest标示位。这个标示位也是用于在查找的时候进行continue和break。

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) { 
   if h.flags&hashWriting != 0 { // 删除key也属于写操作,禁止并发 
      throw("concurrent map writes") 

   } 
  h.flags ^= hashWriting 
  if h.growing() { // 触发扩容 

      growWork(t, h, bucket) 
   } 
   hash := t.hasher(key, uintptr(h.hash0)) 
   bucket := hash & bucketMask(h.B) 
   b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) 
   bOrig := b 
   top := tophash(hash) 
search: 
   for ; b != nil; b = b.overflow(t) { 
      for i := uintptr(0); i < bucketCnt; i++ { 
         if b.tophash[i] != top { 
            if b.tophash[i] == emptyRest { 
               break search 
            } 
            continue 
         } 
         k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) 
         k2 := k 

         if t.indirectkey() { 

            k2 = *((*unsafe.Pointer)(k2)) 
         } 
         if !t.key.equal(key, k2) { 
            continue 
         } 
         // Only clear key if there are pointers in it.
         if t.indirectkey() { 
            *(*unsafe.Pointer)(k) = nil 
         } else if t.key.ptrdata != 0 { 
            memclrHasPointers(k, t.key.size) 
         } 
         e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) 
         if t.indirectelem() { 
            *(*unsafe.Pointer)(e) = nil 
         } else if t.elem.ptrdata != 0 { 
            memclrHasPointers(e, t.elem.size) 
         } else { 
            memclrNoHeapPointers(e, t.elem.size) 
         } 
         b.tophash[i] = emptyOne
         // If the bucket now ends in a bunch of emptyOne states, 
         // change those to emptyRest states.
         // It would be nice to make this a separate function, but
         // for loops are not currently inlineable.
         // 如果发现当前元素的下一个元素的tophash值不是emptyRest,则直接退出
         if i == bucketCnt-1 { 
            if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest { 
               goto notLast 
            } 
         } else { 
            if b.tophash[i+1] != emptyRest { 
               goto notLast 
            } 
         } 

         // 否则,将当前元素的tophash设置为emptyRest,且向前check并设置emptyRest标识
         for { 
            b.tophash[i] = emptyRest 
            if i == 0 { 
               if b == bOrig { 
                  break // beginning of initial bucket, we're done. 
               } 
               // Find previous bucket, continue at its last entry. 
               c := b 
               for b = bOrig; b.overflow(t) != c; b = b.overflow(t) { 
               } 
               i = bucketCnt - 1 
            } else { 
              i-- 
            } 
            if b.tophash[i] != emptyOne { 
               break 
           } 
         }
      notLast: 
         h.count-- 
         break search 
      } 
   } 
   if h.flags&hashWriting == 0 { 
      throw("concurrent map writes") 
   } 
   h.flags &^= hashWriting 
} 

复制代码

扩容

在文中讲解装载因子时,我们提到装载因子是决定哈希表是否进行扩容的关键指标。在go的map扩容中,除了装载因子会决定是否需要扩容,溢出桶的数量也是扩容的另一关键指标。

为了保证访问效率,当map将要添加、修改或删除key时,都会检查是否需要扩容,扩容实际上是以空间换时间的手段。在之前源码mapassign中,其实已经注释map扩容条件,主要是两点:

  • 判断已经达到装载因子的临界点,即元素个数 >= 桶(bucket)总数 * 6.5,这时候说明大部分的桶可能都快满了(即平均每个桶存储的键值对达到6.5个),如果插入新元素,有大概率需要挂在溢出桶(overflow bucket)上。
func overLoadFactor(count int, B uint8) bool {
    return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
复制代码
  • 判断溢出桶是否太多,当桶总数 < 2 ^ 15 时,如果溢出桶总数 >= 桶总数,则认为溢出桶过多。当桶总数 >= 2 ^ 15 时,直接与 2 ^ 15 比较,当溢出桶总数 >= 2 ^ 15 时,即认为溢出桶太多了。
翻倍扩容

负载因子代表的是bmap的使用程度,当map中元素的个数大于8且负载因子大小超过6.5(81.25%),触发翻倍扩容,bucket的数量翻倍。因为每个bmap的容量为8,所以在loadFactor过大时,会导致查找性能急速下降,golang中将loadfactor的阈值设置为13/2=6.5。

等量扩容

根据map的大小会做区分,如果有过多的溢出桶,则触发等量扩容,bucket的数量不变。因为如果溢出桶的数量过多,也会导致map的查找效率下降。所以触发等量扩容但可以减少溢出桶的使用且使bmap的排列更加紧密

等量扩容的触发条件有2个

  • 如果hmap.B<=15,一旦溢出桶的个数超过常规桶,则触发等量扩容
  • 如果hmap.B>15,只要溢出桶的个数超过pow(2,15),即触发等量扩容

在数据写入前会进行扩容条件的判断,如果当前hmap的状态符合上述的翻倍扩容或等量扩容,则触发扩容操作,注意golang map的扩容并不是一个原子过程,是渐进式的,会将扩容操作增量到每一次写操作中去,runtime.hashGrow就是扩容前的准备操作,会将map标记为扩容中状态。

渐进式扩容也是一种分治的思想,意思是golang不会一次完成整个map的迁移,而是会离散到每一次的写入操作,至于golang为什么选择渐进式扩容,好处也是很明显的,渐进式扩容可以降低全量扩容带来的瞬时计算/存储开销,而弊端也会有一些,因为渐进式扩容是要维护中间状态的,所以在使用上也会带来一定的额外成本,这个我们下文会介绍到

bucket搬迁

扩容的时候并没有将旧桶数据转移到新桶。数据转移遵循了写时复制(copy on write)的规则。在真正赋值的时候,会选择是否需要进行数据转移,只会转移当前需要操作的旧桶。

  • hash

因为我们的B是2的正整数次幂,所以翻倍扩容对hash值的影响是只需要向前多取一位即可

image.png

  • biggerSizeGrow(翻倍扩容)

image.png

  • sameSizeGrow(等量扩容)

image.png

源码

//执行桶的迁移,每次迁移桶的数量为1或2

func growWork(t *maptype, h *hmap, bucket uintptr) {

   // make sure we evacuate the oldbucket corresponding

   // to the bucket we're about to use

   // 迁移当前正在访问的旧桶

   evacuate(t, h, bucket&h.oldbucketmask())



   // evacuate one more oldbucket to make progress on growing

   // 为了加快搬迁进度,如果依旧处于扩容中,则再迁移一个桶

   if h.growing() {

      evacuate(t, h, h.nevacuate)

   }

}



// evacDst is an evacuation destination.

type evacDst struct {

    b *bmap          // current destination bucket

    i int            // key/elem index into b

    k unsafe.Pointer // pointer to current key storage

    e unsafe.Pointer // pointer to current elem storage

}



func evacuate(t *maptype, h *hmap, oldbucket uintptr) { 

   b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) 

   newbit := h.noldbuckets()//旧桶的个数

   if !evacuated(b) { 

   // TODO: reuse overflow buckets instead of using new ones, if there

        // is no iterator using the old buckets.  (If !oldIterator.)

        

   //如果是翻倍扩容,一个旧桶会被迁移到两个新桶,分别位于新buckets的前半部分和后半部分。xy[0]指向前半部分,xy[1]指向后半部分

   // xy contains the x and y (low and high) evacuation destinations.

      var xy [2]evacDst 

      x := &xy[0] 

      x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize))) 

      x.k = add(unsafe.Pointer(x.b), dataOffset) 

      x.e = add(x.k, bucketCnt*uintptr(t.keysize)) 



      if !h.sameSizeGrow() {  // 如果是f是翻倍扩容,也需要用到ypart,计算ypart地址

         // y 代表的 bucket 序号增加了 2^oldB

         y := &xy[1] 

         y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize))) 

         y.k = add(unsafe.Pointer(y.b), dataOffset) 

         y.e = add(y.k, bucketCnt*uintptr(t.keysize)) 

      } 



      //以bucket index维度进行迁移,每一次迁移需要迁移对应index的常规桶和全部溢出桶

      for ; b != nil; b = b.overflow(t) { 

         k := add(unsafe.Pointer(b), dataOffset) 

         e := add(k, bucketCnt*uintptr(t.keysize)) 

         // 遍历 bucket 中的所有 cell

         for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) { 

            top := b.tophash[i] 

    

            k2 := k 

            if t.indirectkey() {

               k2 = *((*unsafe.Pointer)(k2)) 

            } 

    

            var useY uint8 //如果是等量扩容则迁移到xpart。如果是翻倍扩容,则会根据高1位来决定迁移到xpart还是ypart

            if !h.sameSizeGrow() {

               hash := t.hasher(k2, uintptr(h.hash0)) 

               if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {

               //这里的if条件只有key是math.NAN会进入,因为math.NAN!=math.NAN,我们暂时先忽略这里的特殊逻辑处理,后续详细介绍一下math.NAN

                        useY = top & 1

                        top = tophash(hash)

               } else { 

             // 取决于新哈希值的 oldB+1 位是 0 还是 1

                  if hash&newbit != 0 { 

                     useY = 1 

                  } 

               } 

            } 

            

            if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {

                    throw("bad evacuatedN")

            }



            b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY 

            dst := &xy[useY]                 // evacuation destination 



       // 如果新桶已满,则需要使用溢出桶

            if dst.i == bucketCnt { 

               dst.b = h.newoverflow(t, dst.b) // 新建一个 bucket 

               dst.i = 0 // xi 从 0 开始计数 

               dst.k = add(unsafe.Pointer(dst.b), dataOffset) 

               dst.e = add(dst.k, bucketCnt*uintptr(t.keysize)) 

            } 

            dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check 

            if t.indirectkey() { // key 是指针 

               *(*unsafe.Pointer)(dst.k) = k2 // copy pointer 

            } else { // 将原 key(是值)复制到新位置 

               typedmemmove(t.key, dst.k, k) // copy elem 

            } 

            if t.indirectelem() { // value 是指针,操作同 key 

               *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e) 

            } else { 

               typedmemmove(t.elem, dst.e, e) 

            } 

 

           // 使用下一个 cell

            dst.i++ 

            dst.k = add(dst.k, uintptr(t.keysize)) 

            dst.e = add(dst.e, uintptr(t.elemsize)) 

         } 

      } 



      // Unlink the overflow buckets & clear key/elem to help GC.

        if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {

            b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))

            // Preserve b.tophash because the evacuation

            // state is maintained there.

            ptr := add(b, dataOffset)

            n := uintptr(t.bucketsize) - dataOffset

            memclrHasPointers(ptr, n)

        }

   } 



  // bucket迁移并不是按序迁移的,map在写入时会优先迁移当前正在写入的bucket,如果当前迁移的bucket是hmap中记录的待迁移的下一个bmap,则更新迁移进度。

   if oldbucket == h.nevacuate { 

      advanceEvacuationMark(h, t, newbit) 

   } 

} 



//记录迁移进度

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {

    h.nevacuate++

    // Experiments suggest that 1024 is overkill by at least an order of magnitude.

    // Put it in there as a safeguard anyway, to ensure O(1) behavior.

    stop := h.nevacuate + 1024

    if stop > newbit {

        stop = newbit

    }

    for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {

        h.nevacuate++

    }

    //全部迁移完成,需要将hmap.flags,hmap.oldbuckets,hmap.extra.oldoverflow清除掉

    if h.nevacuate == newbit { // newbit == # of oldbuckets

        // Growing is all done. Free old main bucket array.

        h.oldbuckets = nil

        // Can discard old overflow buckets as well.

        // If they are still referenced by an iterator,

        // then the iterator holds a pointers to the slice.

        if h.extra != nil {

            h.extra.oldoverflow = nil

        }

        h.flags &^= sameSizeGrow

    }

}
复制代码

遍历

结论:遍历map的结果是无序的

map遍历的过程,是按序遍历bucket,同时按需遍历bucket中和其overflow bucket中的cell。但是map在扩容后,会发生key的搬迁,这造成原来落在一个bucket中的key,搬迁后,有可能会落到其他bucket中了,从这个角度看,遍历map的结果就不可能是按照原来的顺序了(详见下文的map扩容内容)。

但其实,go为了保证遍历map的结果是无序的,做了以下事情:map在遍历时,并不是从固定的0号bucket开始遍历的,每次遍历,都会从一个随机值序号的bucket,再从其中随机的cell开始遍历。然后再按照桶序遍历下去,直到回到起始桶结束。

image.png

上图的例子,是遍历一个处于未扩容状态的map。如果map正处于扩容状态时,需要先判断当前遍历bucket是否已经完成搬迁,如果数据还在老的bucket,那么就去老bucket中拿数据。

注意:在下文中会讲解到增量扩容和等量扩容。当发生了增量扩容时,一个老的bucket数据可能会分裂到两个不同的bucket中去,那么此时,如果需要从老的bucket中遍历数据,例如1号,则不能将老1号bucket中的数据全部取出,仅仅只能取出老 1 号 bucket 中那些在裂变之后,分配到新 1 号 bucket 中的那些 key(这个内容,请读者看完下文map扩容的讲解之后再回头理解)。

遍历会使用runtime.hiter结构体作为迭代器,使用runtime.mapiterinit进行迭代器初始化,使用runtime.mapiternext进行map的迭代。

如果扩容的过程是原子的,那么其实遍历可以非常简单,只需按序遍历bucket及cell即可。但是,由于golang中map的扩容是一个渐进式的操作,所以也会存在map处于扩容中的状态,在这种情况下,部分oldbucket数据尚未迁移到bucket,为了保证所有的key都被遍历到,在遍历时就会需要在oldbuckts和buckets间来回跳转。来看一个遍历的图示:

假设当前有一个map的内存布局如下,当前map正处于扩容中,且已完成了一个bucket的迁移,对应到下图中红色oldbuckets[0]已迁移到buckets[0]和buckets[2]。

总结

  • Go语言的map,底层是哈希表实现的,通过链地址法解决哈希冲突,它依赖的核心数据结构是数组加链表。

  • map中定义了2的B次方个桶,每个桶中能够容纳8个key。根据key的不同哈希值,将其散落到不同的桶中。哈希值的低位(哈希值的后B个bit位)决定桶序号,高位(哈希值的前8个bit位)标识同一个桶中的不同 key。

  • 当向桶中添加了很多 key,造成元素过多,超过了装载因子所设定的程度,或者多次增删操作,造成溢出桶过多,均会触发扩容。

  • 扩容分为增量扩容和等量扩容。增量扩容,会增加桶的个数(增加一倍),把原来一个桶中的 keys 被重新分配到两个桶中。等量扩容,不会更改桶的个数,只是会将桶中的数据变得紧凑。不管是增量扩容还是等量扩容,都需要创建新的桶数组,并不是原地操作的。

  • 扩容过程是渐进性的,主要是防止一次扩容需要搬迁的 key 数量过多,引发性能问题。触发扩容的时机是增加了新元素, 桶搬迁的时机则发生在赋值、删除期间,每次最多搬迁两个 桶。查找、赋值、删除的一个很核心的内容是如何定位到 key 所在的位置,需要重点理解。一旦理解,关于 map 的源码就可以看懂了。

常见的坑点

  • 无法比较的类型无法作为key
  • map的value是无法寻址的,也就是说无法获取map value的地址
  • len(map)返回的是map中元素的个数,不是map的容量
  • new出来的map是无法使用的,因为new只是malloc了8字节大小的内存(new返回指针)

相对的,make 是个语法糖,最终被编译器翻译成runtime.makemap 函数调用,会进行内存开辟和对象初始化操作。

  • 通过fmt打印map时,空map和nil map结果是一样的,都为map[]。所以,这个时候别断定map是空还是nil,而应该通过map == nil来判断
  • 超出容量时会自动扩容,但尽量提供一个合理的初始值
  • map作为函数的出参时不需要以指针形式进行传递,因为map本身就是指针,上文中map初始化一节也可以看到runtime.makemap返回的是*runtime.hmap
  • delete是不会真正地把map的内存释放的,要回收map还是需要设为nil

使用建议

  • 从map设计可以知道,它并不是一个并发安全的数据结构。同时对map进行读写时,程序很容易出错。因此,要想在并发情况下使用map,请加上锁(sync.Mutex或者sync.RwMutex)。其实,Go标准库中已经为我们实现了并发安全的map——sync.Map

  • 遍历map的结果是无序的,在使用中,应该注意到该点。

  • 通过map的结构体可以知道,它其实是通过指针指向底层buckets数组。所以和slice一样,尽管go函数都是值传递,但是,当map作为参数被函数调用时,在函数内部对map的操作同样会影响到外部的map。

  • 另外,有个特殊的key值math.NaN,它每次生成的哈希值是不一样的,这会造成m[math.NaN]是拿不到值的,而且多次对它赋值,会让map中存在多个math.NaN的key。不过这个基本用不到,知道有这个特殊情况就可以了。

参考

1、深度解析golang map
2、由浅到深,入门Go语言Map实现原理
3、全面解析Golang之map设计