📚golangsync.mutex详解

1 定义

1.1 基本概念

互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。

以上是维基百科对mutex的定义,golang的sync.mutex就是一种互斥锁的实现。

1.2 golang sync.mutex

效率优先 兼顾公平

golang中的mutex是一把公平的锁,在源码里可以看到如下的解释:

// Mutex fairness.

//

// Mutex can be in 2 modes of operations: normal and starvation.

// In normal mode waiters are queued in FIFO order, but a woken up waiter

// does not own the mutex and competes with new arriving goroutines over

// the ownership. New arriving goroutines have an advantage -- they are

// already running on CPU and there can be lots of them, so a woken up

// waiter has good chances of losing. In such case it is queued at front

// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,

// it switches mutex to the starvation mode.

//

// In starvation mode ownership of the mutex is directly handed off from

// the unlocking goroutine to the waiter at the front of the queue.

// New arriving goroutines don't try to acquire the mutex even if it appears

// to be unlocked, and don't try to spin. Instead they queue themselves at

// the tail of the wait queue.

//

// If a waiter receives ownership of the mutex and sees that either

// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,

// it switches mutex back to normal operation mode.

//

// Normal mode has considerably better performance as a goroutine can acquire

// a mutex several times in a row even if there are blocked waiters.

// Starvation mode is important to prevent pathological cases of tail latency.
复制代码

而这段解释是整个mutex的核心(划重点,面试必考😁),整个mutex的大体实现思路就是这样。翻译一下,大致如下

// 互斥锁的公平性。

//

// 互斥锁有两种运行模式:正常模式和饥饿模式。

// 在正常模式下,请求锁的goroutine会按 FIFO(先入先出)的顺序排队,依次被唤醒,但被唤醒的goroutine并不能直接获取锁,而是要与新请求锁的goroutines去争夺锁的所有权。

// 但是这其实是不公平的,因为新请求锁的goroutines有一个优势:他们正在cpu上运行且数量可能比较多,所以新唤醒的goroutine在这种情况下很难获取锁。在这种情况下,如果这个goroutine获取失败,会直接插入到队列的头部。

// 如果一个等待的goroutine超过1ms时仍未获取到锁,会将这把锁转换为饥饿模式。

//

// 在饥饿模式下,互斥锁的所有权会直接从解锁的goroutine转移到队首的goroutine。

// 并且新到达的goroutines不会尝试获取锁,会直接插入到队列的尾部。

//

// 如果一个等待的goroutine获得了锁的所有权,并且满足以下两个条件之一:

// (1) 它是队列中的最后一个goroutine

// (2) 它等待的时间少于 1 毫秒(hard code在代码里)

// 它会将互斥锁切回正常模式。

//

// 普通模式具有更好的性能,因为即使有很多阻塞的等待锁的goroutine,一个goroutine也可以尝试请求多次锁。

// 而饥饿模式则可以避免尾部延迟这种bad case。 
复制代码

2 内存模型

实例中的代码为golang 1.16.3,传送门

2.1 数据结构

// A Mutex is a mutual exclusion lock.

// The zero value for a Mutex is an unlocked mutex.

//

// A Mutex must not be copied after first use.

type Mutex struct {

    state int32

    sema  uint32

}
复制代码

sync.mutex的数据结构非常简单,只有两个字段,

  1. state字段就是这把mutex的状态,二进制低3位对应锁的状态,将state右移3位代表mutex的数量。
  2. sema(信号量)用来唤醒goroutine。

以上的两个注释非常重要

  1. 零值保证,sync.mutex符合golang里的0值语义,即sync.mutex的0值就是一把开启状态的互斥锁。
  2. mutex 数据结构禁止拷贝,拷贝mutex的结果是未定义的。

关键常量

const (

    mutexLocked = 1 << iota // mutex是一个加锁状态,对应0b001

    mutexWoken //是否有goroutine被唤醒or新来的goroutine,在尝试获取锁,  对应0b010

    mutexStarving //当前锁处于饥饿状态,             对应0b10

    mutexWaiterShift = iota //waiter的数量移位,通过mutex.state>> mutexWaiterShift可以得到waiter的数量。

    

    starvationThresholdNs = 1e6//(这里就是上文中提到的转换成饥饿模式的时间限制,在源码里写死为1e6 ns,也就是1ms)

)
复制代码

可以看到源码中用到了iota去初始化各个常量(ps:这里吐槽一下,iota这玩意出现在语言的设计里挺离谱的,尤其是在const里各种iota搭配起来用,对读代码的人,确实是一项挑战)
mutex.png

2.2 接口

// A Locker represents an object that can be locked and unlocked.

type Locker interface {

    Lock()

    Unlock()

}
复制代码

sync.Mutex就实现了上述的这个接口(sync.Locker),值得一提的是后续会讲到的sync.RWMutex也实现了这个接口。

同时sync.Mutex又有自己的包内私有(小写开头)的函数,在下文会详细讲到。

func (m *Mutex) lockSlow() {}



func (m *Mutex) unlockSlow(new int32) {}
复制代码

而mutex还有一些runtime的调用,这些函数的具体实现在runtime里,分别是如下几个函数。

//确定当前goroutine是否可以自旋(因为自旋是非常消耗cpu的,所以对自旋操作也有一定的次数限制)

func runtime_canSpin(i int) bool



//执行自旋操作

func runtime_doSpin()



//获取当前毫秒时间戳

func runtime_nanotime() int64



//信号量的实现,对应信号量的P原语操作,s的值代表信号量,包含一个fifo等待队列,如果lifo为true,则放入队首。skipframes只有在开启tracing时有效。

func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)



//信号量的实现,对应信号量的V原语操作,如果handoff为true,则会将锁直接转移给队首goroutine

func runtime_Semrelease(s *uint32, handoff bool, skipframes int)



//上述自旋锁、信号量、以及后文中会提到的原子操作,目前只需要知道函数的作用即可,后边我们还会有单独的章节去介绍
复制代码

3 源码细节剖析

3.1 lock

先抢再排

// Lock locks m.

// If the lock is already in use, the calling goroutine

// blocks until the mutex is available.

func (m *Mutex) Lock() {

   // fast path:这里使用了atomic的case操作,如果state的值为0(当前锁处于开启状态且无等待者),则可以直接加锁成功

   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {

      if race.Enabled {

         race.Acquire(unsafe.Pointer(m)) // load happens-before保证

      }

      return

   }

   // Slow path:需要执行各种操作,而mutex加锁的核心也在这里

   m.lockSlow()

}
复制代码
func (m *Mutex) lockSlow() {

   //当前goroutine的等待时间

   var waitStartTime int64

   //当前goroutine是否饥饿

   starving := false

   //当前goroutine是否已被唤醒

   awoke := false

   //当前goroutine的自旋次数 

   iter := 0

   //当前锁的状态

   old := m.state

   for {

      // 如果锁处于饥饿状态下,就不需要自旋了,锁可以直接转移到队首的goroutine

      // 这里的if的意思是,如果锁的状态是锁定 && 

      // 处于非饥饿模式 && 

      // 当前goroutine可以进行自旋操作。

      // 则当前goroutine进行自旋操作

      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {

         //如果

         //  当前goroutine的状态为非唤醒 &&

         //  mutex的状态为非唤醒 && 

         //  waiter数不等于0 

         //则将自己的状态设置为唤醒,且将锁的状态也设置为唤醒,这样在解锁时就不需要去唤醒那些阻塞的goroutine

         if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&

            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {

            awoke = true

         }

         runtime_doSpin()

         iter++

         old = m.state

         continue

      }

      // 三种情况会走到这里:

      // 1. mutex处于非锁定状态

      // 2. mutex处于饥饿模式

      // 3. 当前goroutine已经不能自旋(iter有次数限制)

      new := old

      //开始设置期望状态,new在这里为期望设置的新状态(取锁),最后会通过cas操作来保证只有一个goroutine可以操作state

      

      //如果锁不是饥饿状态,则将期望的状态改为加锁,因为如果是饥饿状态,不应该尝试加锁,需要直接进入队列

      if old&mutexStarving == 0 {

         new |= mutexLocked

 }

      //如果锁的状态已经是锁定状态或者锁的状态是饥饿模式,则当前goroutine需要进入排队,waiter数量+1

      if old&(mutexLocked|mutexStarving) != 0 {

         new += 1 << mutexWaiterShift

 }

      // 当前goroutine已经处于饥饿状态,并且mutex是锁定状态,则期望将mutex转变为饥饿状态。

      // 但是如果锁是空闲态,也就不需要切换成饥饿模式了。Unlock 操作期望饥饿状态的mutex有waiter,但在这个case中不是这样的。

      // The current goroutine switches mutex to starvation mode.

     // But if the mutex is currently unlocked, don't do the switch.

     // Unlock expects that starving mutex has waiters, which will not be true in this case.

      if starving && old&mutexLocked != 0 {

         new |= mutexStarving

 }

      

      //如果goroutine的状态为唤醒,则需要将该状态重置,因为当前goroutine的状态一定会转变为sleep或已获取锁。

      if awoke {

         // The goroutine has been woken from sleep,

         // so we need to reset the flag in either case.

         if new&mutexWoken == 0 {

            throw("sync: inconsistent mutex state")

         }

         new &^= mutexWoken //按位置0

 }

      //开始cas,如果cas失败则需要重新开始循环

      if atomic.CompareAndSwapInt32(&m.state, old, new) {

         if old&(mutexLocked|mutexStarving) == 0 {

            break // locked the mutex with CAS

         }

         //如果wait的时间不为0则代表当前goroutine已经被唤醒过了,需要将当前goroutine放入等待队列的队首

         queueLifo := waitStartTime != 0

         

         //设置初始时间戳

         if waitStartTime == 0 {

            waitStartTime = runtime_nanotime()

         }

         //在这里goroutine进入sleep,使用信号量来获取锁

         runtime_SemacquireMutex(&m.sema, queueLifo, 1)

         

         //接下来sleep完了,当前goroutine被唤醒

         

         //如果当前goroutine的等待时间超过了1ms,则将当前goroutine标记为饥饿状态

         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

 old = m.state

         //如果当前锁处于饥饿模式,则锁的状态是解锁态,当前goroutine被唤醒的方式为直接从上一个gorountine转移mutex,否则将当前goroutine状态标记为唤醒态,去与其他未进入队列的goroutine去竞争锁

         if old&mutexStarving != 0 {

            // If this goroutine was woken and mutex is in starvation mode,

            // ownership was handed off to us but mutex is in somewhat

            // inconsistent state: mutexLocked is not set and we are still

            // accounted as waiter. Fix that.

            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {

               throw("sync: inconsistent mutex state")

            }

            //当前routine已获取锁,将waiter数-1,状态更新为锁定

            delta := int32(mutexLocked - 1<<mutexWaiterShift)

            // mutexLocked + (-1<<mutexWaiterShift)

            //如果当前goroutine的等待时间小于1ms(非饥饿态)或者当前goroutine是队列中最后一个goroutine,则需要将锁的模式切换为常规模式

            if !starving || old>>mutexWaiterShift == 1 {

               delta -= mutexStarving

               // mutexLocked - mutexStarving + (-1<<mutexWaiterShift)

 }

            // 加锁、减计数、如果不饥饿则减饥饿

            atomic.AddInt32(&m.state, delta)

            break

         }

         awoke = true

         iter = 0 //自旋计数清0

      } else {

         old = m.state

      }

   }



   if race.Enabled {

      race.Acquire(unsafe.Pointer(m))

   }

}
复制代码

3.2 unlock

// A locked Mutex is not associated with a particular goroutine.

// It is allowed for one goroutine to lock a Mutex and then

// arrange for another goroutine to unlock it.

func (m *Mutex) Unlock() {

   if race.Enabled {

      _ = m.state

      race.Release(unsafe.Pointer(m))

   }



   // 和加锁时基本同理,在无waiter且锁的模式为正常模式,会尝试直接解锁。

   new := atomic.AddInt32(&m.state, -mutexLocked)

   if new != 0 {

      // Outlined slow path to allow inlining the fast path.

      // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.

      m.unlockSlow(new)

   }

}
复制代码
//new标识锁的新状态,原状态减掉1

func (m *Mutex) unlockSlow(new int32) {

   //尝试去释放非锁定状态的mutex会导致进程直接崩溃。

   //这里对status先原子减lock,再用加法去判也是为了避免并发UnLock的问题。

   if (new+mutexLocked)&mutexLocked == 0 {

      throw("sync: unlock of unlocked mutex")

   }

   //如果mutex处于正常模式

   if new&mutexStarving == 0 {

      old := new

      for {

         // If there are no waiters or a goroutine has already

         // been woken or grabbed the lock, no need to wake anyone.

         // In starvation mode ownership is directly handed off from unlocking

         // goroutine to the next waiter. We are not part of this chain,

         // since we did not observe mutexStarving when we unlocked the mutex above.

         // So get off the way.

         // 如果没有waiter在等待mutex

         // 或者mutex已被其他goroutine获取,

         // 或者有goroutine已处于唤醒状态(在尝试获取锁)

         // 或者锁处于饥饿模式(mutex的所有权会直接转移到下一个waiter,而当前goroutine是不在这个队列中的,因为释放锁时,锁的模式为正常模式)

         //在上述四种情况下,直接返回

         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {

            return

         }

         // Grab the right to wake someone.

         //走到这一步,说明锁的状态是空闲态,且没有锁被唤醒,且锁处于正常模式

         // 那么期望的状态就是waiter个数-1,且设置锁的状态为唤醒

         new = (old - 1<<mutexWaiterShift) | mutexWoken

 if atomic.CompareAndSwapInt32(&m.state, old, new) {

            //唤醒队首的waiter去尝试获取锁

            runtime_Semrelease(&m.sema, false, 1)

            return

         }

         old = m.state

      }

   } else {

   //如果mutex处于饥饿模式,则将mutex的所有权直接转移到等待队列的队首

   

      // Starving mode: handoff mutex ownership to the next waiter, and yield

      // our time slice so that the next waiter can start to run immediately.

      // Note: mutexLocked is not set, the waiter will set it after wakeup.

      // But mutex is still considered locked if mutexStarving is set,

      // so new coming goroutines won't acquire it.

      runtime_Semrelease(&m.sema, true, 1)

   }

}
复制代码

4 常见坑点

  1. sync.mutex不可重入
  • 例如如果连续调用两次lock,会触发死锁,goroutine直接panic。
package main



import "sync"



func main() {

   m := sync.Mutex{}

   m.Lock()

   m.Lock()

}



//fatal error: all goroutines are asleep - deadlock!
复制代码
  • 如果A获取A+B,B获取B+A,会触发死锁
var lockA = &sync.Mutex{}

var lockB = &sync.Mutex{}



func TestA() {

    lockA.Lock()

    defer lockA.Unlock()

    time.Sleep(1 * time.Second)

    lockB.Lock()

    defer lockB.Unlock()

}



func TestB() {

    lockB.Lock()

    defer lockB.Unlock()

    time.Sleep(1 * time.Second)

    lockA.Lock()

    defer lockA.Unlock()

}



func main() {

    go TestA()

    TestB()

}



//fatal error: all goroutines are asleep - deadlock!
复制代码
  • 死锁不是触发panic的充分条件
package main



import (

    "sync"

    "time"

)



var lockA = &sync.Mutex{}

var lockB = &sync.Mutex{}



func TestA() {

    lockA.Lock()

    defer lockA.Unlock()

    time.Sleep(1 * time.Second)

    lockB.Lock()

    defer lockB.Unlock()

}



func TestB() {

    lockB.Lock()

    defer lockB.Unlock()

    time.Sleep(1 * time.Second)

    lockA.Lock()

    defer lockA.Unlock()

}



func main() {

    go TestA()

    go TestB()

    time.Sleep(3 * time.Second)

}



//正常return
复制代码
  1. sync.mutex,尝试去unlock一把空闲的mutex,会导致panic。
package main



import (

   "fmt"

   "sync"

   "time"

)



func main() {

   m := sync.Mutex{}

   m.UnLock()

}

// fatal error: sync: unlock of unlocked mutex
复制代码
  1. sync.mutex不与goroutine绑定,可由a goroutine获取锁,b goroutine释放锁。
package main



import (

   "fmt"

   "sync"

   "time"

)



func main() {

   m := sync.Mutex{}

   m.Lock()

   fmt.Println("a lock")

   go func() {

      m.Unlock()

      fmt.Println("b unlock")

   }()

   time.Sleep(time.Second)

}

//a lock

//b unlock
复制代码
  1. 不要复制sync.Mutex,mutex做函数参数时,传参时使用指针。

5 参考文档