1 定义
1.1 基本概念
互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。
以上是维基百科对mutex的定义,golang的sync.mutex就是一种互斥锁的实现。
1.2 golang sync.mutex
效率优先 兼顾公平
golang中的mutex是一把公平的锁,在源码里可以看到如下的解释:
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
复制代码
而这段解释是整个mutex的核心(划重点,面试必考😁),整个mutex的大体实现思路就是这样。翻译一下,大致如下
// 互斥锁的公平性。
//
// 互斥锁有两种运行模式:正常模式和饥饿模式。
// 在正常模式下,请求锁的goroutine会按 FIFO(先入先出)的顺序排队,依次被唤醒,但被唤醒的goroutine并不能直接获取锁,而是要与新请求锁的goroutines去争夺锁的所有权。
// 但是这其实是不公平的,因为新请求锁的goroutines有一个优势:他们正在cpu上运行且数量可能比较多,所以新唤醒的goroutine在这种情况下很难获取锁。在这种情况下,如果这个goroutine获取失败,会直接插入到队列的头部。
// 如果一个等待的goroutine超过1ms时仍未获取到锁,会将这把锁转换为饥饿模式。
//
// 在饥饿模式下,互斥锁的所有权会直接从解锁的goroutine转移到队首的goroutine。
// 并且新到达的goroutines不会尝试获取锁,会直接插入到队列的尾部。
//
// 如果一个等待的goroutine获得了锁的所有权,并且满足以下两个条件之一:
// (1) 它是队列中的最后一个goroutine
// (2) 它等待的时间少于 1 毫秒(hard code在代码里)
// 它会将互斥锁切回正常模式。
//
// 普通模式具有更好的性能,因为即使有很多阻塞的等待锁的goroutine,一个goroutine也可以尝试请求多次锁。
// 而饥饿模式则可以避免尾部延迟这种bad case。
复制代码
2 内存模型
实例中的代码为golang 1.16.3,传送门
2.1 数据结构
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
复制代码
sync.mutex的数据结构非常简单,只有两个字段,
- state字段就是这把mutex的状态,二进制低3位对应锁的状态,将state右移3位代表mutex的数量。
- sema(信号量)用来唤醒goroutine。
以上的两个注释非常重要
- 零值保证,sync.mutex符合golang里的0值语义,即sync.mutex的0值就是一把开启状态的互斥锁。
- mutex 数据结构禁止拷贝,拷贝mutex的结果是未定义的。
关键常量
const (
mutexLocked = 1 << iota // mutex是一个加锁状态,对应0b001
mutexWoken //是否有goroutine被唤醒or新来的goroutine,在尝试获取锁, 对应0b010
mutexStarving //当前锁处于饥饿状态, 对应0b10
mutexWaiterShift = iota //waiter的数量移位,通过mutex.state>> mutexWaiterShift可以得到waiter的数量。
starvationThresholdNs = 1e6//(这里就是上文中提到的转换成饥饿模式的时间限制,在源码里写死为1e6 ns,也就是1ms)
)
复制代码
可以看到源码中用到了iota去初始化各个常量(ps:这里吐槽一下,iota这玩意出现在语言的设计里挺离谱的,尤其是在const里各种iota搭配起来用,对读代码的人,确实是一项挑战)
2.2 接口
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
复制代码
sync.Mutex就实现了上述的这个接口(sync.Locker),值得一提的是后续会讲到的sync.RWMutex也实现了这个接口。
同时sync.Mutex又有自己的包内私有(小写开头)的函数,在下文会详细讲到。
func (m *Mutex) lockSlow() {}
func (m *Mutex) unlockSlow(new int32) {}
复制代码
而mutex还有一些runtime的调用,这些函数的具体实现在runtime里,分别是如下几个函数。
//确定当前goroutine是否可以自旋(因为自旋是非常消耗cpu的,所以对自旋操作也有一定的次数限制)
func runtime_canSpin(i int) bool
//执行自旋操作
func runtime_doSpin()
//获取当前毫秒时间戳
func runtime_nanotime() int64
//信号量的实现,对应信号量的P原语操作,s的值代表信号量,包含一个fifo等待队列,如果lifo为true,则放入队首。skipframes只有在开启tracing时有效。
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
//信号量的实现,对应信号量的V原语操作,如果handoff为true,则会将锁直接转移给队首goroutine
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
//上述自旋锁、信号量、以及后文中会提到的原子操作,目前只需要知道函数的作用即可,后边我们还会有单独的章节去介绍
复制代码
3 源码细节剖析
3.1 lock
先抢再排
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// fast path:这里使用了atomic的case操作,如果state的值为0(当前锁处于开启状态且无等待者),则可以直接加锁成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m)) // load happens-before保证
}
return
}
// Slow path:需要执行各种操作,而mutex加锁的核心也在这里
m.lockSlow()
}
复制代码
func (m *Mutex) lockSlow() {
//当前goroutine的等待时间
var waitStartTime int64
//当前goroutine是否饥饿
starving := false
//当前goroutine是否已被唤醒
awoke := false
//当前goroutine的自旋次数
iter := 0
//当前锁的状态
old := m.state
for {
// 如果锁处于饥饿状态下,就不需要自旋了,锁可以直接转移到队首的goroutine
// 这里的if的意思是,如果锁的状态是锁定 &&
// 处于非饥饿模式 &&
// 当前goroutine可以进行自旋操作。
// 则当前goroutine进行自旋操作
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
//如果
// 当前goroutine的状态为非唤醒 &&
// mutex的状态为非唤醒 &&
// waiter数不等于0
//则将自己的状态设置为唤醒,且将锁的状态也设置为唤醒,这样在解锁时就不需要去唤醒那些阻塞的goroutine
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 三种情况会走到这里:
// 1. mutex处于非锁定状态
// 2. mutex处于饥饿模式
// 3. 当前goroutine已经不能自旋(iter有次数限制)
new := old
//开始设置期望状态,new在这里为期望设置的新状态(取锁),最后会通过cas操作来保证只有一个goroutine可以操作state
//如果锁不是饥饿状态,则将期望的状态改为加锁,因为如果是饥饿状态,不应该尝试加锁,需要直接进入队列
if old&mutexStarving == 0 {
new |= mutexLocked
}
//如果锁的状态已经是锁定状态或者锁的状态是饥饿模式,则当前goroutine需要进入排队,waiter数量+1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 当前goroutine已经处于饥饿状态,并且mutex是锁定状态,则期望将mutex转变为饥饿状态。
// 但是如果锁是空闲态,也就不需要切换成饥饿模式了。Unlock 操作期望饥饿状态的mutex有waiter,但在这个case中不是这样的。
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//如果goroutine的状态为唤醒,则需要将该状态重置,因为当前goroutine的状态一定会转变为sleep或已获取锁。
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken //按位置0
}
//开始cas,如果cas失败则需要重新开始循环
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
//如果wait的时间不为0则代表当前goroutine已经被唤醒过了,需要将当前goroutine放入等待队列的队首
queueLifo := waitStartTime != 0
//设置初始时间戳
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
//在这里goroutine进入sleep,使用信号量来获取锁
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//接下来sleep完了,当前goroutine被唤醒
//如果当前goroutine的等待时间超过了1ms,则将当前goroutine标记为饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
//如果当前锁处于饥饿模式,则锁的状态是解锁态,当前goroutine被唤醒的方式为直接从上一个gorountine转移mutex,否则将当前goroutine状态标记为唤醒态,去与其他未进入队列的goroutine去竞争锁
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
//当前routine已获取锁,将waiter数-1,状态更新为锁定
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// mutexLocked + (-1<<mutexWaiterShift)
//如果当前goroutine的等待时间小于1ms(非饥饿态)或者当前goroutine是队列中最后一个goroutine,则需要将锁的模式切换为常规模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
// mutexLocked - mutexStarving + (-1<<mutexWaiterShift)
}
// 加锁、减计数、如果不饥饿则减饥饿
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0 //自旋计数清0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
复制代码
3.2 unlock
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 和加锁时基本同理,在无waiter且锁的模式为正常模式,会尝试直接解锁。
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
复制代码
//new标识锁的新状态,原状态减掉1
func (m *Mutex) unlockSlow(new int32) {
//尝试去释放非锁定状态的mutex会导致进程直接崩溃。
//这里对status先原子减lock,再用加法去判也是为了避免并发UnLock的问题。
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
//如果mutex处于正常模式
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果没有waiter在等待mutex
// 或者mutex已被其他goroutine获取,
// 或者有goroutine已处于唤醒状态(在尝试获取锁)
// 或者锁处于饥饿模式(mutex的所有权会直接转移到下一个waiter,而当前goroutine是不在这个队列中的,因为释放锁时,锁的模式为正常模式)
//在上述四种情况下,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
//走到这一步,说明锁的状态是空闲态,且没有锁被唤醒,且锁处于正常模式
// 那么期望的状态就是waiter个数-1,且设置锁的状态为唤醒
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//唤醒队首的waiter去尝试获取锁
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
//如果mutex处于饥饿模式,则将mutex的所有权直接转移到等待队列的队首
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
复制代码
4 常见坑点
- sync.mutex不可重入
- 例如如果连续调用两次lock,会触发死锁,goroutine直接panic。
package main
import "sync"
func main() {
m := sync.Mutex{}
m.Lock()
m.Lock()
}
//fatal error: all goroutines are asleep - deadlock!
复制代码
- 如果A获取A+B,B获取B+A,会触发死锁
var lockA = &sync.Mutex{}
var lockB = &sync.Mutex{}
func TestA() {
lockA.Lock()
defer lockA.Unlock()
time.Sleep(1 * time.Second)
lockB.Lock()
defer lockB.Unlock()
}
func TestB() {
lockB.Lock()
defer lockB.Unlock()
time.Sleep(1 * time.Second)
lockA.Lock()
defer lockA.Unlock()
}
func main() {
go TestA()
TestB()
}
//fatal error: all goroutines are asleep - deadlock!
复制代码
- 死锁不是触发panic的充分条件
package main
import (
"sync"
"time"
)
var lockA = &sync.Mutex{}
var lockB = &sync.Mutex{}
func TestA() {
lockA.Lock()
defer lockA.Unlock()
time.Sleep(1 * time.Second)
lockB.Lock()
defer lockB.Unlock()
}
func TestB() {
lockB.Lock()
defer lockB.Unlock()
time.Sleep(1 * time.Second)
lockA.Lock()
defer lockA.Unlock()
}
func main() {
go TestA()
go TestB()
time.Sleep(3 * time.Second)
}
//正常return
复制代码
- sync.mutex,尝试去unlock一把空闲的mutex,会导致panic。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := sync.Mutex{}
m.UnLock()
}
// fatal error: sync: unlock of unlocked mutex
复制代码
- sync.mutex不与goroutine绑定,可由a goroutine获取锁,b goroutine释放锁。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := sync.Mutex{}
m.Lock()
fmt.Println("a lock")
go func() {
m.Unlock()
fmt.Println("b unlock")
}()
time.Sleep(time.Second)
}
//a lock
//b unlock
复制代码
- 不要复制sync.Mutex,mutex做函数参数时,传参时使用指针。
近期评论