sync.Mutex sync.Mutex 是 go 最基本的同步原语, 也是最常用的锁之一
基本结构 1 2 3 4 5 type Mutex struct { state int32 sema uint32 }
state: 当前互斥锁的状态sema: 控制锁状态信号量state 一共32位, 最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:
mutexLocked: 第 0 位, 是否上锁mutexWoken: 第 1 位, 是否有协程抢占锁mutexStarving: 第 2 位, 是否处于饥饿模式后续的高 29 位表示阻塞队列中等待的协程数量 加锁/解锁方案 最简单的思路去实现 mutex 互斥锁:
加锁:把锁状态 0 改为 1, 假若已经是 1,则上锁失败,需要等他人解锁 解锁:把 1 置为 0. 针对 goroutine 加锁时发现锁已被抢占的这种情形,此时摆在面前的策略有如下两种:
阻塞/唤醒:将当前 goroutine 阻塞挂起,直到锁被释放后,以回调的方式将阻塞 goroutine 重新唤醒,进行锁争夺; 自旋 + CAS:基于自旋结合 CAS 的方式,反复尝试试图获取锁 方案 优势 劣势 使用场景 阻塞/唤醒 精准分配,不浪费 CPU 时间片 需要挂起协程,进行上下文切换,操作较重 并发竞争激烈的场景 自旋+CAS 无需阻塞协程,短期来看操作较轻 长时间争而不得,会浪费 CPU 时间片 并发竞争强度低的场景
可以看到上面两种方式各有优劣, 基本上其他语言的锁也是基于这两个模式 ,如 java 的 synchronize 也是从偏向锁升级到轻量级锁->重量级锁
类似, Mutex 也有一个锁升级的过程:
首先保持乐观,goroutine 采用自旋 + CAS 的策略争夺锁; 尝试持续受挫达到一定条件后,判定当前过于激烈,则由自旋转为 阻塞/挂起模式. 从文档来看, 这里的升级条件是:
自旋累计达到 4 次仍未取得锁; CPU 单核或仅有单个 P 调度器;(此时自旋,其他 goroutine 根本没机会释放锁,自旋纯属空转); 当前 P 的执行队列中仍有待执行的 G. (避免因自旋影响到 GMP 调度效率) 正常模式和饥饿模式 Mutex 有两种模式:
正常模式: 在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁(新的 goroutine 已经在 cpu 上运行了, 大概率会获取到时间片),为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被饿死。 饥饿模式: 互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式 加锁/解锁源码 首先看加锁过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0 , mutexLocked) { return } m.lockSlow() }func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new &mutexWoken == 0 { throw("sync: inconsistent mutex state" ) } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new ) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1 ) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state" ) } delta := int32 (mutexLocked - 1 <<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
解锁流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new ) } } func (m *Mutex) unlockSlow(new int32 ) { if (new +mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex" ) } if new &mutexStarving == 0 { old : = new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1 <<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new ) { runtime_Semrelease(&m.sema, false , 1 ) return } old = m.state } } else { runtime_Semrelease(&m.sema, true , 1 ) } }
sync.RWMutex 可以把 RWMutex 理解为一把读锁加一把写锁;
写锁具有严格的排他性,当其被占用,其他试图取写锁或者读锁的 goroutine 均阻塞; 读锁具有有限的共享性,当其被占用,试图取写锁的 goroutine 会阻塞,试图取读锁的 goroutine 可与当前 goroutine 共享读锁; 综上,RWMutex 适用于读多写少的场景,最理想化的情况,当所有操作均使用读锁,则可实现去无化;最悲观的情况,倘若所有操作均使用写锁,则 RWMutex 退化为普通的 Mutex.
数据结构 1 2 3 4 5 6 7 type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount atomic.Int32 readerWait atomic.Int32 }
rwmutexMaxReaders:共享读锁的 goroutine 数量上限,值为 2^29;w: 内置的一把普通互斥锁 sync.Mutex;writerSem:关联写锁阻塞队列的信号量;readerSem:关联读锁阻塞队列的信号量;readerCount:正常情况下等于介入读锁流程的 goroutine 数量;当 goroutine 接入写锁流程时,该值为实际介入读锁流程的 goroutine 数量减 rwmutexMaxReaders(显然这个是一个负数) .readerWait:记录在当前 goroutine 获取写锁前,还需要等待多少个 goroutine 释放读锁读锁流程 加锁:
1 2 3 4 5 6 7 8 func (rw *RWMutex) RLock () { if atomic.AddInt32 (&rw.readerCount, 1 ) < 0 { runtime_SemacquireMutex (&rw.readerSem, false, 0 ) } }
这里需要注意的是, 当 readerCount +1 后的值仍然小于0,说明有 goroutine 未释放写锁,因此将当前 goroutine 添加到读锁的阻塞队列中并阻塞挂起(读饥饿策略)
解锁流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1 ); r < 0 { rw.rUnlockSlow(r) } }func (rw *RWMutex) rUnlockSlow(r int32 ) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { fatal("sync: RUnlock of unlocked RWMutex" ) } if rw.readerWait.Add(-1 ) == 0 { runtime_Semrelease(&rw.writerSem, false , 1 ) } }
写锁流程 加锁流程:
1 2 3 4 5 6 7 8 9 10 11 func (rw *RWMutex) Lock () { rw.w .Lock () r := atomic.AddInt32 (&rw.readerCount , -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic.AddInt32 (&rw.readerWait , r) != 0 { runtime_SemacquireMutex (&rw.writerSem , false, 0 ) } }
之前说如果有写锁介入,等待读锁的 readerCount 应该是实际介入读锁流程的 goroutine 数量减 rwmutexMaxReader, 在这里也体现了
另外: r代表着在写锁加锁的那一刻(是一个瞬时值,可能会变),已经获得读锁的reader数量,通过过对readerWait赋值和判断,决定是否需要等待信号量;那么问题来了,既然r是一个瞬时值,如果r已经变了,怎么保证readerWait是准的,例如:
在执行这行代码时,r=5,代表有5个reader获得了读锁,此时readerWait==0: r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
而在执行下面这行之前,有2个reader已经释放读锁,此时readerWait==-2,再执行下面这样代码后: atomic.AddInt32(&rw.readerWait, r) readerWait==3,这样设计的精妙之处就在于
不管readerWait中间如何变化,只要在使用的那一刻他是最终准确的就可以,所以严格意义上讲readerWait记录的是已经持有读锁的reader数量,或者 自有写锁pending那一刻来,被释放的读锁的负数量
解锁流程:
1 2 3 4 5 6 7 8 9 10 11 12 func (rw *RWMutex) Unlock () { r := atomic.AddInt32 (&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { fatal ("sync: Unlock of unlocked RWMutex") } for i := 0 ; i < int (r); i ++ { runtime_Semrelease (&rw.readerSem, false, 0 ) } rw.w .Unlock () }
读写锁获取锁的优先级 基于对读和写操作的优先级,读写锁的设计和实现也分成三类:
Read-preferring: 读优先策略,可实现最大并发性,但如果读操作密集,会导致写锁饥饿。因为只要一个读取线程持有锁,写入线程就无法获取锁。如果有源源不断的读操作,写锁只能等待所有读锁释放后才能获取到(写锁饥饿)。Writer-preferring:写优先的策略,可以保证即便在读密集的场景下,写锁也不会饥饿;只要有一个写锁申请加锁,那么就会阻塞后续的所有读锁加锁行为(已经获取到读锁的reader不受影响,写锁仍然要等待这些读锁释放之后才能加锁)Unspecified(不指定):不区分 reader 和 writer 优先级,中庸之道,读写性能不是最优,但是可以避免饥饿问题RWMutex 采取的就是写锁优先策略