线程互斥与同步 在c#中用mutex类实现线程的互斥_Golang 并发编程与同步原语
當提到并發(fā)編程、多線程編程時,我們往往都離不開『鎖』這一概念,Go 語言作為一個原生支持用戶態(tài)進程 Goroutine 的語言,也一定會為開發(fā)者提供這一功能,鎖的主要作用就是保證多個線程或者 Goroutine 在訪問同一片內存時不會出現(xiàn)混亂的問題,鎖其實是一種并發(fā)編程中的同步原語(Synchronization Primitives)。
在這一節(jié)中我們就會介紹 Go 語言中常見的同步原語 Mutex、RWMutex、WaitGroup、Once 和 Cond 以及擴展原語 ErrGroup、Semaphore和 SingleFlight 的實現(xiàn)原理,同時也會涉及互斥鎖、信號量等并發(fā)編程中的常見概念。
基本原語
Go 語言在 sync 包中提供了用于同步的一些基本原語,包括常見的互斥鎖 Mutex 與讀寫互斥鎖 RWMutex 以及 Once、WaitGroup。
這些基本原語的主要作用是提供較為基礎的同步功能,我們應該使用 Channel 和通信來實現(xiàn)更加高級的同步機制,我們在這一節(jié)中并不會介紹標準庫中全部的原語,而是會介紹其中比較常見的 Mutex、RWMutex、Once、WaitGroup 和 Cond,我們并不會涉及剩下兩個用于存取數(shù)據(jù)的結構體 Map 和 Pool。
Mutex
Go 語言中的互斥鎖在 sync 包中,它由兩個字段 state 和 sema 組成,state 表示當前互斥鎖的狀態(tài),而 sema 真正用于控制鎖狀態(tài)的信號量,這兩個加起來只占 8 個字節(jié)空間的結構體就表示了 Go 語言中的互斥鎖。
type Mutex struct {state int32sema uint32 }狀態(tài)
互斥鎖的狀態(tài)是用 int32 來表示的,但是鎖的狀態(tài)并不是互斥的,它的最低三位分別表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置都用來表示當前有多少個 Goroutine 等待互斥鎖被釋放:
互斥鎖在被創(chuàng)建出來時,所有的狀態(tài)位的默認值都是 0,當互斥鎖被鎖定時 mutexLocked 就會被置成 1、當互斥鎖被在正常模式下被喚醒時 mutexWoken 就會被被置成 1、mutexStarving 用于表示當前的互斥鎖進入了狀態(tài),最后的幾位是在當前互斥鎖上等待的 Goroutine 個數(shù)。
饑餓模式
在了解具體的加鎖和解鎖過程之前,我們需要先簡單了解一下 Mutex 在使用過程中可能會進入的饑餓模式,饑餓模式是在 Go 語言 1.9 版本引入的特性,它的主要功能就是保證互斥鎖的獲取的『公平性』(Fairness)。
互斥鎖可以同時處于兩種不同的模式,也就是正常模式和饑餓模式,在正常模式下,所有鎖的等待者都會按照先進先出的順序獲取鎖,但是如果一個剛剛被喚起的 Goroutine 遇到了新的 Goroutine 進程也調用了 Lock 方法時,大概率會獲取不到鎖,為了減少這種情況的出現(xiàn),防止 Goroutine 被『餓死』,一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當前互斥鎖切換饑餓模式。
在饑餓模式中,互斥鎖會被直接交給等待隊列最前面的 Goroutine,新的 Goroutine 在這時不能獲取鎖、也不會進入自旋的狀態(tài),它們只會在隊列的末尾等待,如果一個 Goroutine 獲得了互斥鎖并且它是隊列中最末尾的協(xié)程或者它等待的時間少于 1ms,那么當前的互斥鎖就會被切換回正常模式。
相比于饑餓模式,正常模式下的互斥鎖能夠提供更好地性能,饑餓模式的主要作用就是避免一些 Goroutine 由于陷入等待無法獲取鎖而造成較高的尾延時,這也是對 Mutex 的一個優(yōu)化。
加鎖
互斥鎖 Mutex 的加鎖是靠 Lock 方法完成的,最新的 Go 語言源代碼中已經將 Lock 方法進行了簡化,方法的主干只保留了最常見、簡單并且快速的情況;當鎖的狀態(tài)是 0 時直接將 mutexLocked 位置成 1:
func (m *Mutex) Lock() {if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {return}m.lockSlow() }但是當 Lock 方法被調用時 Mutex 的狀態(tài)不是 0 時就會進入 lockSlow 方法嘗試通過自旋或者其他的方法等待鎖的釋放并獲取互斥鎖,該方法的主體是一個非常大 for 循環(huán),我們會將該方法分成幾個部分介紹獲取鎖的過程:
func (m *Mutex) lockSlow() {var waitStartTime int64starving := falseawoke := falseiter := 0old := m.statefor {if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}runtime_doSpin()iter++old = m.statecontinue}在這段方法的第一部分會判斷當前方法能否進入自旋來等待鎖的釋放,自旋(Spinnig)其實是在多線程同步的過程中使用的一種機制,當前的進程在進入自旋的過程中會一直保持 CPU 的占用,持續(xù)檢查某個條件是否為真,在多核的 CPU 上,自旋的優(yōu)點是避免了 Goroutine 的切換,所以如果使用恰當會對性能帶來非常大的增益。
在 Go 語言的 Mutex 互斥鎖中,只有在普通模式下才可能進入自旋,除了模式的限制之外,runtime_canSpin 方法中會判斷當前方法是否可以進入自旋,進入自旋的條件非常苛刻:
一旦當前 Goroutine 能夠進入自旋就會調用 runtime_doSpin,它最終調用匯編語言編寫的方法 procyield 并執(zhí)行指定次數(shù)的 PAUSE 指令,PAUSE 指令什么都不會做,但是會消耗 CPU 時間,每次自旋都會調用 30 次 PAUSE,下面是該方法在 386 架構的機器上的實現(xiàn):
TEXT runtime·procyield(SB),NOSPLIT,$0-0MOVL cycles+0(FP), AX again:PAUSESUBL $1, AXJNZ againRET處理了自旋相關的特殊邏輯之后,互斥鎖接下來就根據(jù)上下文計算當前互斥鎖最新的狀態(tài)了,幾個不同的條件分別會更新 state 中存儲的不同信息 mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
new := oldif old&mutexStarving == 0 {new |= mutexLocked}if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}if starving && old&mutexLocked != 0 {new |= mutexStarving}if awoke {new &^= mutexWoken}計算了新的互斥鎖狀態(tài)之后,我們就會使用 atomic 包提供的 CAS 函數(shù)修改互斥鎖的狀態(tài),如果當前的互斥鎖已經處于饑餓和鎖定的狀態(tài),就會跳過當前步驟,調用 runtime_SemacquireMutex 方法:
if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}runtime_SemacquireMutex(&m.sema, queueLifo, 1)starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.stateif old&mutexStarving != 0 {delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {old = m.state}} }runtime_SemacquireMutex 方法的主要作用就是通過 Mutex 的使用互斥鎖中的信號量保證資源不會被兩個 Goroutine 獲取,從這里我們就能看出 Mutex 其實就是對更底層的信號量進行封裝,對外提供更加易用的 API,runtime_SemacquireMutex 會在方法中不斷調用 goparkunlock 將當前 Goroutine 陷入休眠等待信號量可以被獲取。
一旦當前 Goroutine 可以獲取信號量,就證明互斥鎖已經被解鎖,該方法就會立刻返回,Lock 方法的剩余代碼也會繼續(xù)執(zhí)行下去了,當前互斥鎖處于饑餓模式時,如果該 Goroutine 是隊列中最后的一個 Goroutine 或者等待鎖的時間小于 starvationThresholdNs(1ms),當前 Goroutine 就會直接獲得互斥鎖并且從饑餓模式中退出并獲得鎖。
解鎖
互斥鎖的解鎖過程相比之下就非常簡單,Unlock 方法會直接使用 atomic 包提供的 AddInt32,如果返回的新狀態(tài)不等于 0 就會進入 unlockSlow 方法:
func (m *Mutex) Unlock() {new := atomic.AddInt32(&m.state, -mutexLocked)if new != 0 {m.unlockSlow(new)} }unlockSlow 方法首先會對鎖的狀態(tài)進行校驗,如果當前互斥鎖已經被解鎖過了就會直接拋出異常 sync: unlock of unlocked mutex 中止當前程序,在正常情況下會根據(jù)當前互斥鎖的狀態(tài)是正常模式還是饑餓模式進入不同的分支:
func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")}if new&mutexStarving == 0 {old := newfor {if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)return}old = m.state}} else {runtime_Semrelease(&m.sema, true, 1)} }如果當前互斥鎖的狀態(tài)是饑餓模式就會直接調用 runtime_Semrelease 方法直接將當前鎖交給下一個正在嘗試獲取鎖的等待者,等待者會在被喚醒之后設置 mutexLocked 狀態(tài),由于此時仍然處于 mutexStarving,所以新的 Goroutine 也無法獲得鎖。
在正常模式下,如果當前互斥鎖不存在等待者或者最低三位表示的狀態(tài)都為 0,那么當前方法就不需要喚醒其他 Goroutine 可以直接返回,當有 Goroutine 正在處于等待狀態(tài)時,還是會通過 runtime_Semrelease 喚醒對應的 Goroutine 并移交鎖的所有權。
小結
通過對互斥鎖 Mutex 加鎖和解鎖過程的分析,我們能夠得出以下的一些結論,它們能夠幫助我們更好地理解互斥鎖的工作原理,互斥鎖的加鎖的過程比較復雜,涉及自旋、信號量以及 Goroutine 調度等概念:
- 如果互斥鎖處于初始化狀態(tài),就會直接通過置位 mutexLocked 加鎖;
- 如果互斥鎖處于 mutexLocked 并且在普通模式下工作,就會進入自旋,執(zhí)行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放;
- 如果當前 Goroutine 等待鎖的時間超過了 1ms,互斥鎖就會被切換到饑餓模式;
- 互斥鎖在正常情況下會通過 runtime_SemacquireMutex 方法將調用 Lock 的 Goroutine 切換至休眠狀態(tài),等待持有信號量的 Goroutine 喚醒當前協(xié)程;
- 如果當前 Goroutine 是互斥鎖上的最后一個等待的協(xié)程或者等待的時間小于 1ms,當前 Goroutine 會將互斥鎖切換回正常模式;
互斥鎖的解鎖過程相對來說就比較簡單,雖然對于普通模式和饑餓模式的處理有一些不同,但是由于代碼行數(shù)不多,所以邏輯清晰,也非常容易理解:
- 如果互斥鎖已經被解鎖,那么調用 Unlock 會直接拋出異常;
- 如果互斥鎖處于饑餓模式,會直接將鎖的所有權交給隊列中的下一個等待者,等待者會負責設置 mutexLocked 標志位;
- 如果互斥鎖處于普通模式,并且沒有 Goroutine 等待鎖的釋放或者已經有被喚醒的 Goroutine 獲得了鎖就會直接返回,在其他情況下回通過 runtime_Semrelease 喚醒對應的 Goroutine;
RWMutex
讀寫互斥鎖也是 Go 語言 sync 包為我們提供的接口之一,一個常見的服務對資源的讀寫比例會非常高,如果大多數(shù)的請求都是讀請求,它們之間不會相互影響,那么我們?yōu)槭裁床荒軐Y源讀和寫操作分離呢?這也就是 RWMutex 讀寫互斥鎖解決的問題,不限制對資源的并發(fā)讀,但是讀寫、寫寫操作無法并行執(zhí)行。
| | 讀 | 寫 | | :-: | :-: | :-: | | 讀 | Y | N | | 寫 | N | N |
讀寫互斥鎖在 Go 語言中的實現(xiàn)是 RWMutex,其中不僅包含一個互斥鎖,還持有兩個信號量,分別用于寫等待讀和讀等待寫:
type RWMutex struct {w MutexwriterSem uint32readerSem uint32readerCount int32readerWait int32 }readerCount 存儲了當前正在執(zhí)行的讀操作的數(shù)量,最后的 readerWait 表示當寫操作被阻塞時等待的讀操作個數(shù)。
讀鎖
讀鎖的加鎖非常簡單,我們通過 atomic.AddInt32 方法為 readerCount 加一,如果該方法返回了負數(shù)說明當前有 Goroutine 獲得了寫鎖,當前 Goroutine 就會調用 runtime_SemacquireMutex 陷入休眠等待喚醒:
func (rw *RWMutex) RLock() {if atomic.AddInt32(&rw.readerCount, 1) < 0 {runtime_SemacquireMutex(&rw.readerSem, false, 0)} }如果沒有寫操作獲取當前互斥鎖,當前方法就會在 readerCount 加一后返回;當 Goroutine 想要釋放讀鎖時會調用 RUnlock 方法:
func (rw *RWMutex) RUnlock() {if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {rw.rUnlockSlow(r)} }該方法會在減少正在讀資源的 readerCount,當前方法如果遇到了返回值小于零的情況,說明有一個正在進行的寫操作,在這時就應該通過 rUnlockSlow 方法減少當前寫操作等待的讀操作數(shù) readerWait 并在所有讀操作都被釋放之后觸發(fā)寫操作的信號量 writerSem:
func (rw *RWMutex) rUnlockSlow(r int32) {if r+1 == 0 || r+1 == -rwmutexMaxReaders {throw("sync: RUnlock of unlocked RWMutex")}if atomic.AddInt32(&rw.readerWait, -1) == 0 {runtime_Semrelease(&rw.writerSem, false, 1)} }writerSem 在被觸發(fā)之后,嘗試獲取讀寫鎖的進程就會被喚醒并獲得鎖。
讀寫鎖
當資源的使用者想要獲取讀寫鎖時,就需要通過 Lock 方法了,在 Lock 方法中首先調用了讀寫互斥鎖持有的 Mutex 的 Lock 方法保證其他獲取讀寫鎖的 Goroutine 進入等待狀態(tài),隨后的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 其實是為了阻塞后續(xù)的讀操作:
func (rw *RWMutex) Lock() {rw.w.Lock()r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReadersif r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)} }如果當前仍然有其他 Goroutine 持有互斥鎖的讀鎖,該 Goroutine 就會調用 runtime_SemacquireMutex 進入休眠狀態(tài),等待讀鎖釋放時觸發(fā) writerSem 信號量將當前協(xié)程喚醒。
對資源的讀寫操作完成之后就會將通過 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) 變回正數(shù)并通過 for 循環(huán)觸發(fā)所有由于獲取讀鎖而陷入等待的 Goroutine:
func (rw *RWMutex) Unlock() {r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)if r >= rwmutexMaxReaders {throw("sync: Unlock of unlocked RWMutex")}for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)}rw.w.Unlock() }在方法的最后,RWMutex 會釋放持有的互斥鎖讓其他的協(xié)程能夠重新獲取讀寫鎖。
小結
相比狀態(tài)復雜的互斥鎖 Mutex 來說,讀寫互斥鎖 RWMutex 雖然提供的功能非常復雜,但是由于站在了 Mutex 的『肩膀』上,所以整體的實現(xiàn)上會簡單很多。
RWMutex 在 Mutex 之上提供了額外的讀寫分離功能,能夠在讀請求遠遠多于寫請求時提供性能上的提升,我們也可以在場景合適時選擇讀寫互斥鎖。
WaitGroup
WaitGroup 是 Go 語言 sync 包中比較常見的同步機制,它可以用于等待一系列的 Goroutine 的返回,一個比較常見的使用場景是批量執(zhí)行 RPC 或者調用外部服務:
requests := []*Request{...}wg := &sync.WaitGroup{} wg.Add(len(requests))for _, request := range requests {go func(r *Request) {defer wg.Done()// res, err := service.call(r)}(request) }wg.Wait()通過 WaitGroup 我們可以在多個 Goroutine 之間非常輕松地同步信息,原本順序執(zhí)行的代碼也可以在多個 Goroutine 中并發(fā)執(zhí)行,加快了程序處理的速度,在上述代碼中只有在所有的 Goroutine 都執(zhí)行完畢之后 Wait 方法才會返回,程序可以繼續(xù)執(zhí)行其他的邏輯。
總而言之,它的作用就像它的名字一樣,,通過 Done 來傳遞任務完成的信號,比較常用于等待一組 Goroutine 中并發(fā)執(zhí)行的任務全部結束。
結構體
WaitGroup 結構體中的成員變量非常簡單,其中的 noCopy 的主要作用就是保證 WaitGroup 不會被開發(fā)者通過再賦值的方式進行拷貝,進而導致一些詭異的行為:
type WaitGroup struct {noCopy noCopystate1 [3]uint32 }copylock 包就是一個用于檢查類似錯誤的分析器,它的原理就是在 編譯期間 檢查被拷貝的變量中是否包含 noCopy 或者 sync 關鍵字,如果包含當前關鍵字就會報出以下的錯誤:
package mainimport ("fmt""sync" )func main() {wg := sync.Mutex{}yawg := wgfmt.Println(wg, yawg) }$ go run proc.go ./prog.go:10:10: assignment copies lock value to yawg: sync.Mutex ./prog.go:11:14: call of fmt.Println copies lock value: sync.Mutex ./prog.go:11:18: call of fmt.Println copies lock value: sync.Mutex這段代碼會在賦值和調用 fmt.Println 時發(fā)生值拷貝導致分析器報錯,你可以通過訪問 鏈接 嘗試運行這段代碼。
除了 noCopy 之外,WaitGroup 結構體中還包含一個總共占用 12 字節(jié)大小的數(shù)組,這個數(shù)組中會存儲當前結構體持有的狀態(tài)和信號量,在 64 位與 32 位的機器上表現(xiàn)也非常不同。
WaitGroup 提供了私有方法 state 能夠幫助我們從 state1 字段中取出它的狀態(tài)和信號量。
操作
WaitGroup 對外暴露的接口只有三個 Add、Wait 和 Done,其中 Done 方法只是調用了 wg.Add(-1) 本身并沒有什么特殊的邏輯,我們來了解一下剩余的兩個方法:
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()state := atomic.AddUint64(statep, uint64(delta)<<32)v := int32(state >> 32)w := uint32(state)if v < 0 {panic("sync: negative WaitGroup counter")}if v > 0 || w == 0 {return}*statep = 0for ; w != 0; w-- {runtime_Semrelease(semap, false, 0)} }Add 方法的主要作用就是更新 WaitGroup 中持有的計數(shù)器 counter,64 位狀態(tài)的高 32 位,雖然 Add 方法傳入的參數(shù)可以為負數(shù),但是一個 WaitGroup 的計數(shù)器只能是非負數(shù),當調用 Add 方法導致計數(shù)器歸零并且還有等待的 Goroutine 時,就會通過 runtime_Semrelease 喚醒處于等待狀態(tài)的所有 Goroutine。
另一個 WaitGroup 的方法 Wait 就會在當前計數(shù)器中保存的數(shù)據(jù)大于 0 時修改等待 Goroutine 的個數(shù) waiter 并調用 runtime_Semacquire 陷入睡眠狀態(tài)。
func (wg *WaitGroup) Wait() {statep, semap := wg.state()for {state := atomic.LoadUint64(statep)v := int32(state >> 32)if v == 0 {return}if atomic.CompareAndSwapUint64(statep, state, state+1) {runtime_Semacquire(semap)if +statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}return}} }陷入睡眠的 Goroutine 就會等待 Add 方法在計數(shù)器為 0 時喚醒。
小結
通過對 WaitGroup 的分析和研究,我們能夠得出以下的一些結論:
- Add 不能在和 Wait 方法在 Goroutine 中并發(fā)調用,一旦出現(xiàn)就會造成程序崩潰;
- WaitGroup 必須在 Wait 方法返回之后才能被重新使用;
- Done 只是對 Add 方法的簡單封裝,我們可以向 Add 方法傳入任意負數(shù)(需要保證計數(shù)器非負)快速將計數(shù)器歸零以喚醒其他等待的 Goroutine;
- 可以同時有多個 Goroutine 等待當前 WaitGroup 計數(shù)器的歸零,這些 Goroutine 也會被『同時』喚醒;
Once
Go 語言在標準庫的 sync 同步包中還提供了 Once 語義,它的主要功能其實也很好理解,保證在 Go 程序運行期間 Once 對應的某段代碼只會執(zhí)行一次。
在如下所示的代碼中,Do 方法中傳入的函數(shù)只會被執(zhí)行一次,也就是我們在運行如下所示的代碼時只會看見一次 only once 的輸出結果:
func main() {o := &sync.Once{}for i := 0; i < 10; i++ {o.Do(func() {fmt.Println("only once")})} }$ go run main.go only once作為 sync 包中的結構體,Once 有著非常簡單的數(shù)據(jù)結構,每一個 Once 結構體中都只包含一個用于標識代碼塊是否被執(zhí)行過的 done 以及一個互斥鎖 Mutex:
type Once struct {done uint32m Mutex }Once 結構體對外唯一暴露的方法就是 Do,該方法會接受一個入?yún)榭盏暮瘮?shù),如果使用 atomic.LoadUint32 檢查到已經執(zhí)行過函數(shù)了,就會直接返回,否則就會進入 doSlow 運行傳入的函數(shù):
func (o *Once) Do(f func()) {if atomic.LoadUint32(&o.done) == 0 {o.doSlow(f)} }func (o *Once) doSlow(f func()) {o.m.Lock()defer o.m.Unlock()if o.done == 0 {defer atomic.StoreUint32(&o.done, 1)f()} }doSlow 的實現(xiàn)也非常簡單,我們先為當前的 Goroutine 獲取互斥鎖,然后通過 defer 關鍵字將 done 成員變量設置成 1 并運行傳入的函數(shù),無論當前函數(shù)是正常運行還是拋出 panic,當前方法都會將 done 設置成 1 保證函數(shù)不會執(zhí)行第二次。
小結
作為用于保證函數(shù)執(zhí)行次數(shù)的 Once 結構體,它使用互斥鎖和 atomic 提供的方法實現(xiàn)了某個函數(shù)在程序運行期間只能執(zhí)行一次的語義,在使用的過程中我們也需要注意以下的內容:
- Do 方法中傳入的函數(shù)只會被執(zhí)行一次,哪怕函數(shù)中發(fā)生了 panic;
- 兩次調用 Do 方法傳入不同的函數(shù)時只會執(zhí)行第一次調用的函數(shù);
Cond
Go 語言在標準庫中提供的 Cond 其實是一個條件變量,通過 Cond 我們可以讓一系列的 Goroutine 都在觸發(fā)某個事件或者條件時才被喚醒,每一個 Cond 結構體都包含一個互斥鎖 L,我們先來看一下 Cond 是如何使用的:
func main() {c := sync.NewCond(&sync.Mutex{})for i := 0; i < 10; i++ {go listen(c)}go broadcast(c)ch := make(chan os.Signal, 1)signal.Notify(ch, os.Interrupt)<-ch }func broadcast(c *sync.Cond) {c.L.Lock()c.Broadcast()c.L.Unlock() }func listen(c *sync.Cond) {c.L.Lock()c.Wait()fmt.Println("listen")c.L.Unlock() }$ go run main.go listen listen ... listen在上述代碼中我們同時運行了 11 個 Goroutine,其中的 10 個 Goroutine 會通過 Wait 等待期望的信號或者事件,而剩下的一個 Goroutine 會調用 Broadcast 方法通知所有陷入等待的 Goroutine,當調用 Boardcast 方法之后,就會打印出 10 次 "listen" 并結束調用。
結構體
Cond 的結構體中包含 noCopy 和 copyChecker 兩個字段,前者用于保證 Cond 不會再編譯期間拷貝,后者保證在運行期間發(fā)生拷貝會直接 panic,持有的另一個鎖 L 其實是一個接口 Locker,任意實現(xiàn) Lock 和 Unlock 方法的結構體都可以作為 NewCond 方法的參數(shù):
type Cond struct {noCopy noCopyL Lockernotify notifyListchecker copyChecker }結構體中最后的變量 notifyList 其實也就是為了實現(xiàn) Cond 同步機制,該結構體其實就是一個 Goroutine 的鏈表:
type notifyList struct {wait uint32notify uint32lock mutexhead *sudogtail *sudog }在這個結構體中,head 和 tail 分別指向的就是整個鏈表的頭和尾,而 wait 和 notify 分別表示當前正在等待的 Goroutine 和已經通知到的 Goroutine,我們通過這兩個變量就能確認當前待通知和已通知的 Goroutine。
操作
Cond 對外暴露的 Wait 方法會將當前 Goroutine 陷入休眠狀態(tài),它會先調用 runtime_notifyListAdd 將等待計數(shù)器 +1,然后解鎖并調用 runtime_notifyListWait 等待其他 Goroutine 的喚醒:
func (c *Cond) Wait() {c.checker.check()t := runtime_notifyListAdd(&c.notify)c.L.Unlock()runtime_notifyListWait(&c.notify, t)c.L.Lock() }func notifyListAdd(l *notifyList) uint32 {return atomic.Xadd(&l.wait, 1) - 1 }notifyListWait 方法的主要作用就是獲取當前的 Goroutine 并將它追加到 notifyList 鏈表的最末端:
func notifyListWait(l *notifyList, t uint32) {lock(&l.lock)if less(t, l.notify) {unlock(&l.lock)return}s := acquireSudog()s.g = getg()s.ticket = tif l.tail == nil {l.head = s} else {l.tail.next = s}l.tail = sgoparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)releaseSudog(s) }除了將當前 Goroutine 追加到鏈表的末端之外,我們還會調用 goparkunlock 陷入休眠狀態(tài),該函數(shù)也是在 Go 語言切換 Goroutine 時經常會使用的方法,它會直接讓出當前處理器的使用權并等待調度器的喚醒。
Cond 對外提供的 Signal 和 Broadcast 方法就是用來喚醒調用 Wait 陷入休眠的 Goroutine,從兩個方法的名字來看,前者會喚醒隊列最前面的 Goroutine,后者會喚醒隊列中全部的 Goroutine:
func (c *Cond) Signal() {c.checker.check()runtime_notifyListNotifyOne(&c.notify) }func (c *Cond) Broadcast() {c.checker.check()runtime_notifyListNotifyAll(&c.notify) }notifyListNotifyAll 方法會從鏈表中取出全部的 Goroutine 并為它們依次調用 readyWithTime,該方法會通過 goready 將目標的 Goroutine 喚醒:
func notifyListNotifyAll(l *notifyList) {s := l.headl.head = nill.tail = nilatomic.Store(&l.notify, atomic.Load(&l.wait))for s != nil {next := s.nexts.next = nilreadyWithTime(s, 4)s = next} }雖然它會依次喚醒全部的 Goroutine,但是這里喚醒的順序其實也是按照加入隊列的先后順序,先加入的會先被 goready 喚醒,后加入的 Goroutine 可能就需要等待調度器的調度。
而 notifyListNotifyOne 函數(shù)就只會從 sudog 構成的鏈表中滿足 sudog.ticket == l.notify 的 Goroutine 并通過 readyWithTime 喚醒:
func notifyListNotifyOne(l *notifyList) {t := l.notifyatomic.Store(&l.notify, t+1)for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {if s.ticket == t {n := s.nextif p != nil {p.next = n} else {l.head = n}if n == nil {l.tail = p}s.next = nilreadyWithTime(s, 4)return}} }在一般情況下我們都會選擇在不滿足特定條件時調用 Wait 陷入休眠,當某些 Goroutine 檢測到當前滿足了喚醒的條件,就可以選擇使用 Signal 通知一個或者 Broadcast 通知全部的 Goroutine 當前條件已經滿足,可以繼續(xù)完成工作了。
小結
與 Mutex 相比,Cond 還是一個不被所有人都清楚和理解的同步機制,它提供了類似隊列的 FIFO 的等待機制,同時也提供了 Signal 和 Broadcast 兩種不同的喚醒方法,相比于使用 for {} 忙碌等待,使用 Cond 能夠在遇到長時間條件無法滿足時將當前處理器讓出的功能,如果我們合理使用還是能夠在一些情況下提升性能,在使用的過程中我們需要注意:
- Wait 方法在調用之前一定要使用 L.Lock 持有該資源,否則會發(fā)生 panic 導致程序崩潰;
- Signal 方法喚醒的 Goroutine 都是隊列最前面、等待最久的 Goroutine;
- Broadcast 雖然是廣播通知全部等待的 Goroutine,但是真正被喚醒時也是按照一定順序的;
擴展原語
除了這些標準庫中提供的同步原語之外,Go 語言還在子倉庫 x/sync 中提供了額外的四種同步原語,ErrGroup、Semaphore、SingleFlight 和 SyncMap,其中的 SyncMap 其實就是 sync 包中的 sync.Map,它在 1.9 版本的 Go 語言中被引入了 x/sync 包,隨著 API 的成熟和穩(wěn)定最后被移到了標準庫 sync 包中。
我們在這一節(jié)中就會介紹 Go 語言目前在擴展包中提供的三種原語,也就是 ErrGroup、Semaphore 和 SingleFlight。
ErrGroup
子倉庫 x/sync 中的包 errgroup 其實就為我們在一組 Goroutine 中提供了同步、錯誤傳播以及上下文取消的功能,我們可以使用如下所示的方式并行獲取網頁的數(shù)據(jù):
var g errgroup.Group var urls = []string{"http://www.golang.org/","http://www.google.com/","http://www.somestupidname.com/", } for i := range urls {url := urls[i]g.Go(func() error {resp, err := http.Get(url)if err == nil {resp.Body.Close()}return err}) } if err := g.Wait(); err == nil {fmt.Println("Successfully fetched all URLs.") }Go 方法能夠創(chuàng)建一個 Goroutine 并在其中執(zhí)行傳入的函數(shù),而 Wait 方法會等待 Go 方法創(chuàng)建的 Goroutine 全部返回后返回第一個非空的錯誤,如果所有的 Goroutine 都沒有返回錯誤,該函數(shù)就會返回 nil。
結構體
errgroup 包中的 Group 結構體同時由三個比較重要的部分組成:
這些字段共同組成了 Group 結構體并為我們提供同步、錯誤傳播以及上下文取消等功能。
操作
errgroup 對外唯一暴露的構造器就是 WithContext 方法,我們只能從一個 Context 中創(chuàng)建一個新的 Group 變量,WithCancel 返回的取消函數(shù)也僅會在 Group 結構體內部使用:
func WithContext(ctx context.Context) (*Group, context.Context) {ctx, cancel := context.WithCancel(ctx)return &Group{cancel: cancel}, ctx }創(chuàng)建新的并行子任務需要使用 Go 方法,這個方法內部會對 WaitGroup 加一并創(chuàng)建一個新的 Goroutine,在 Goroutine 內部運行子任務并在返回錯誤時及時調用 cancel 并對 err 賦值,只有最早返回的錯誤才會被上游感知到,后續(xù)的錯誤都會被舍棄:
func (g *Group) Go(f func() error) {g.wg.Add(1)go func() {defer g.wg.Done()if err := f(); err != nil {g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel()}})}}() }func (g *Group) Wait() error {g.wg.Wait()if g.cancel != nil {g.cancel()}return g.err }Wait 方法其實就只是調用了 WaitGroup 的同步方法,在子任務全部完成時取消 Context 并返回可能出現(xiàn)的錯誤。
小結
errgroup 包中的 Group 同步原語的實現(xiàn)原理還是非常簡單的,它沒有涉及非常底層和運行時包中的 API,只是對基本同步語義進行了簡單的封裝提供了更加復雜的功能,在使用時我們也需要注意以下的幾個問題:
- 出現(xiàn)錯誤或者等待結束后都會調用 Context 的 cancel 方法取消上下文;
- 只有第一個出現(xiàn)的錯誤才會被返回,剩余的錯誤都會被直接拋棄;
Semaphore
信號量是在并發(fā)編程中比較常見的一種同步機制,它會保證持有的計數(shù)器在 0 到初始化的權重之間,每次獲取資源時都會將信號量中的計數(shù)器減去對應的數(shù)值,在釋放時重新加回來,當遇到計數(shù)器大于信號量大小時就會進入休眠等待其他進程釋放信號,我們常常會在控制訪問資源的進程數(shù)量時用到。
Golang 的擴展包中就提供了帶權重的信號量,我們可以按照不同的權重對資源的訪問進行管理,這個包對外也只提供了四個方法:
- NewWeighted 用于創(chuàng)建新的信號量;
- Acquire 獲取了指定權重的資源,如果當前沒有『空閑資源』,就會陷入休眠等待;
- TryAcquire 也用于獲取指定權重的資源,但是如果當前沒有『空閑資源』,就會直接返回 false;
- Release 用于釋放指定權重的資源;
結構體
NewWeighted 方法的主要作用創(chuàng)建一個新的權重信號量,傳入信號量最大的權重就會返回一個新的 Weighted 結構體指針:
func NewWeighted(n int64) *Weighted {w := &Weighted{size: n}return w }type Weighted struct {size int64cur int64mu sync.Mutexwaiters list.List }Weighted 結構體中包含一個 waiters 列表其中存儲著等待獲取資源的『用戶』,除此之外它還包含當前信號量的上限以及一個計數(shù)器 cur,這個計數(shù)器的范圍就是 [0, size]:
信號量中的計數(shù)器會隨著用戶對資源的訪問和釋放進行改變,引入的權重概念能夠幫助我們更好地對資源的訪問粒度進行控制,盡可能滿足所有常見的用例。
獲取
在上面我們已經提到過 Acquire 方法就是用于獲取指定權重資源的方法,這個方法總共由三個不同的情況組成:
另一個用于獲取信號量的方法 TryAcquire 相比之下就非常簡單,它只會判斷當前信號量是否有充足的資源獲取,如果有充足的資源就會直接立刻返回 true 否則就會返回 false:
func (s *Weighted) TryAcquire(n int64) bool {s.mu.Lock()success := s.size-s.cur >= n && s.waiters.Len() == 0if success {s.cur += n}s.mu.Unlock()return success }與 Acquire 相比,TryAcquire 由于不會等待資源的釋放所以可能更適用于一些延時敏感、用戶需要立刻感知結果的場景。
釋放
最后要介紹的 Release 方法其實也非常簡單,當我們對信號量進行釋放時,Release 方法會從頭到尾遍歷 waiters 列表中全部的等待者,如果釋放資源后的信號量有充足的剩余資源就會通過 Channel 喚起指定的 Goroutine:
func (s *Weighted) Release(n int64) {s.mu.Lock()s.cur -= nfor {next := s.waiters.Front()if next == nil {break}w := next.Value.(waiter)if s.size-s.cur < w.n {break}s.cur += w.ns.waiters.Remove(next)close(w.ready)}s.mu.Unlock() }當然也可能會出現(xiàn)剩余資源無法喚起 Goroutine 的情況,在這時當前方法就會釋放鎖后直接返回,通過對這段代碼的分析我們也能發(fā)現(xiàn),如果一個信號量需要的占用的資源非常多,他可能會長時間無法獲取鎖,這可能也是 Acquire 方法引入另一個參數(shù) Context 的原因,為信號量的獲取設置一個超時時間。
小結
帶權重的信號量確實有著更多的應用場景,這也是 Go 語言對外提供的唯一一種信號量實現(xiàn),在使用的過程中我們需要注意以下的幾個問題:
- Acquire 和 TryAcquire 方法都可以用于獲取資源,前者用于同步獲取會等待鎖的釋放,后者會在無法獲取鎖時直接返回;
- Release 方法會按照 FIFO 的順序喚醒可以被喚醒的 Goroutine;
- 如果一個 Goroutine 獲取了較多地資源,由于 Release 的釋放策略可能會等待比較長的時間;
SingleFlight
singleflight 是 Go 語言擴展包中提供了另一種同步原語,這其實也是作者最喜歡的一種同步擴展機制,它能夠在一個服務中抑制對下游的多次重復請求,一個比較常見的使用場景是 — 我們在使用 Redis 對數(shù)據(jù)庫中的一些熱門數(shù)據(jù)進行了緩存并設置了超時時間,緩存超時的一瞬間可能有非常多的并行請求發(fā)現(xiàn)了 Redis 中已經不包含任何緩存所以大量的流量會打到數(shù)據(jù)庫上影響服務的延時和質量。
但是 singleflight 就能有效地解決這個問題,它的主要作用就是對于同一個 Key 最終只會進行一次函數(shù)調用,在這個上下文中就是只會進行一次數(shù)據(jù)庫查詢,查詢的結果會寫回 Redis 并同步給所有請求對應 Key 的用戶:
這其實就減少了對下游的瞬時流量,在獲取下游資源非常耗時,例如:訪問緩存、數(shù)據(jù)庫等場景下就非常適合使用 singleflight 對服務進行優(yōu)化,在上述的這個例子中我們就可以在想 Redis 和數(shù)據(jù)庫中獲取數(shù)據(jù)時都使用 singleflight 提供的這一功能減少下游的壓力;它的使用其實也非常簡單,我們可以直接使用 singleflight.Group{} 創(chuàng)建一個新的 Group 結構體,然后通過調用 Do 方法就能對相同的請求進行抑制:
type service struct {requestGroup singleflight.Group }func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {rows, err := // select * from tablesif err != nil {return nil, err}return rows, nil})if err != nil {return nil, err}return Response{rows: rows,}, nil }上述代碼使用請求的哈希作為抑制相同請求的鍵,我們也可以選擇一些比較關鍵或者重要的字段作為 Do 方法的第一個參數(shù)避免對下游的瞬時大量請求。
結構體
Group 結構體本身由一個互斥鎖 Mutex 和一個從 Key 到 call 結構體指針的映射表組成,每一個 call 結構體都保存了當前這次調用對應的信息:
type Group struct {mu sync.Mutexm map[string]*call }type call struct {wg sync.WaitGroupval interface{}err errordups intchans []chan<- Result }call 結構體中的 val 和 err 字段都是在執(zhí)行傳入的函數(shù)時只會被賦值一次,它們也只會在 WaitGroup 等待結束都被讀取,而 dups 和 chans 字段分別用于存儲當前 singleflight 抑制的請求數(shù)量以及在結果返回時將信息傳遞給調用方。
操作
singleflight 包提供了兩個用于抑制相同請求的方法,其中一個是同步等待的方法 Do,另一個是返回 Channel 的 DoChan,這兩個方法在功能上沒有太多的區(qū)別,只是在接口的表現(xiàn)上稍有不同。
每次 Do 方法的調用時都會獲取互斥鎖并嘗試對 Group 持有的映射表進行懶加載,隨后判斷是否已經存在 key 對應的函數(shù)調用:
因為 val 和 err 兩個字段都只會在 doCall 方法中被賦值,所以當 doCall 方法和 WaitGroup.Wait 方法返回時,這兩個值就會返回給 Do 函數(shù)的調用者。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {c.val, c.err = fn()c.wg.Done()g.mu.Lock()delete(g.m, key)for _, ch := range c.chans {ch <- Result{c.val, c.err, c.dups > 0}}g.mu.Unlock() }doCall 中會運行傳入的函數(shù) fn,該函數(shù)的返回值就會賦值給 c.val 和 c.err,函數(shù)執(zhí)行結束后就會調用 WaitGroup.Done 方法通知所有被抑制的請求,當前函數(shù)已經執(zhí)行完成,可以從 call 結構體中取出返回值并返回了;在這之后,doCall 方法會獲取持有的互斥鎖并通過管道將信息同步給使用 DoChan 方法的調用方。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {ch := make(chan Result, 1)g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++c.chans = append(c.chans, ch)g.mu.Unlock()return ch}c := &call{chans: []chan<- Result{ch}}c.wg.Add(1)g.m[key] = cg.mu.Unlock()go g.doCall(c, key, fn)return ch }DoChan 方法和 Do的區(qū)別就是,它使用 Goroutine 異步執(zhí)行 doCall 并向 call 持有的 chans 切片中追加 chan Result 變量,這也是它能夠提供異步傳值的原因。
小結
singleflight 包提供的 Group 接口確實非常好用,當我們需要這種抑制對下游的相同請求時就可以通過這個方法來增加吞吐量和服務質量,在使用的過程中我們也需要注意以下的幾個問題:
- Do 和 DoChan 一個用于同步阻塞調用傳入的函數(shù),一個用于異步調用傳入的參數(shù)并通過 Channel 接受函數(shù)的返回值;
- Forget 方法可以通知 singleflight 在持有的映射表中刪除某個鍵,接下來對該鍵的調用就會直接執(zhí)行方法而不是等待前面的函數(shù)返回;
- 一旦調用的函數(shù)返回了錯誤,所有在等待的 Goroutine 也都會接收到同樣的錯誤;
總結
我們在這一節(jié)中介紹了 Go 語言標準庫中提供的基本原語以及擴展包中的擴展原語,這些并發(fā)編程的原語能夠幫助我們更好地利用 Go 語言的特性構建高吞吐量、低延時的服務,并解決由于并發(fā)帶來的錯誤,到這里我們再重新回顧一下這一節(jié)介紹的內容:
- Mutex 互斥鎖
- 如果互斥鎖處于初始化狀態(tài),就會直接通過置位 mutexLocked 加鎖;
- 如果互斥鎖處于 mutexLocked 并且在普通模式下工作,就會進入自旋,執(zhí)行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放;
- 如果當前 Goroutine 等待鎖的時間超過了 1ms,互斥鎖就會被切換到饑餓模式;
- 互斥鎖在正常情況下會通過 runtime_SemacquireMutex 方法將調用 Lock 的 Goroutine 切換至休眠狀態(tài),等待持有信號量的 Goroutine 喚醒當前協(xié)程;
- 如果當前 Goroutine 是互斥鎖上的最后一個等待的協(xié)程或者等待的時間小于 1ms,當前 Goroutine 會將互斥鎖切換回正常模式;
- 如果互斥鎖已經被解鎖,那么調用 Unlock 會直接拋出異常;
- 如果互斥鎖處于饑餓模式,會直接將鎖的所有權交給隊列中的下一個等待者,等待者會負責設置 mutexLocked 標志位;
- 如果互斥鎖處于普通模式,并且沒有 Goroutine 等待鎖的釋放或者已經有被喚醒的 Goroutine 獲得了鎖就會直接返回,在其他情況下回通過 runtime_Semrelease 喚醒對應的 Goroutine;
- RWMutex 讀寫互斥鎖
- readerSem — 讀寫鎖釋放時通知由于獲取讀鎖等待的 Goroutine;
- writerSem — 讀鎖釋放時通知由于獲取讀寫鎖等待的 Goroutine;
- w 互斥鎖 — 保證寫操作之間的互斥;
- readerCount — 統(tǒng)計當前進行讀操作的協(xié)程數(shù),觸發(fā)寫鎖時會將其減少 rwmutexMaxReaders 阻塞后續(xù)的讀操作;
- readerWait — 當前讀寫鎖等待的進行讀操作的協(xié)程數(shù),在觸發(fā) Lock 之后的每次 RUnlock 都會將其減一,當它歸零時該 Goroutine 就會獲得讀寫鎖;
- 當讀寫鎖被釋放 Unlock 時首先會通知所有的讀操作,然后才會釋放持有的互斥鎖,這樣能夠保證讀操作不會被連續(xù)的寫操作『餓死』;
- WaitGroup 等待一組 Goroutine 結束
- Add 不能在和 Wait 方法在 Goroutine 中并發(fā)調用,一旦出現(xiàn)就會造成程序崩潰;
- WaitGroup 必須在 Wait 方法返回之后才能被重新使用;
- Done 只是對 Add 方法的簡單封裝,我們可以向 Add 方法傳入任意負數(shù)(需要保證計數(shù)器非負)快速將計數(shù)器歸零以喚醒其他等待的 Goroutine;
- 可以同時有多個 Goroutine 等待當前 WaitGroup 計數(shù)器的歸零,這些 Goroutine 也會被『同時』喚醒;
- Once 程序運行期間僅執(zhí)行一次
- Do 方法中傳入的函數(shù)只會被執(zhí)行一次,哪怕函數(shù)中發(fā)生了 panic;
- 兩次調用 Do 方法傳入不同的函數(shù)時只會執(zhí)行第一次調用的函數(shù);
- Cond 發(fā)生指定事件時喚醒
- Wait 方法在調用之前一定要使用 L.Lock 持有該資源,否則會發(fā)生 panic 導致程序崩潰;
- Signal 方法喚醒的 Goroutine 都是隊列最前面、等待最久的 Goroutine;
- Broadcast 雖然是廣播通知全部等待的 Goroutine,但是真正被喚醒時也是按照一定順序的;
- ErrGroup 為一組 Goroutine 提供同步、錯誤傳播以及上下文取消的功能
- 出現(xiàn)錯誤或者等待結束后都會調用 Context 的 cancel 方法取消上下文;
- 只有第一個出現(xiàn)的錯誤才會被返回,剩余的錯誤都會被直接拋棄;
- Semaphore 帶權重的信號量
- Acquire 和 TryAcquire 方法都可以用于獲取資源,前者用于同步獲取會等待鎖的釋放,后者會在無法獲取鎖時直接返回;
- Release 方法會按照 FIFO 的順序喚醒可以被喚醒的 Goroutine;
- 如果一個 Goroutine 獲取了較多地資源,由于 Release 的釋放策略可能會等待比較長的時間;
- SingleFlight 用于抑制對下游的重復請求
- Do 和 DoChan 一個用于同步阻塞調用傳入的函數(shù),一個用于異步調用傳入的參數(shù)并通過 Channel 接受函數(shù)的返回值;
- Forget 方法可以通知 singleflight 在持有的映射表中刪除某個鍵,接下來對該鍵的調用就會直接執(zhí)行方法而不是等待前面的函數(shù)返回;
- 一旦調用的函數(shù)返回了錯誤,所有在等待的 Goroutine 也都會接收到同樣的錯誤;
這些同步原語的實現(xiàn)不僅要考慮 API 接口的易用、解決并發(fā)編程中可能遇到的線程競爭問題,還需要對尾延時進行優(yōu)化避免某些 Goroutine 無法獲取鎖或者資源而被餓死,對同步原語的學習也能夠增強我們隊并發(fā)編程的理解和認識,也是了解并發(fā)編程無法跨越的一個步驟。
Reference
- sync: make Mutex more fair
- runtime: fall back to fair locks after repeated sleep-acquire failures #13086
- x/sync · Golang
- The Go Memory Model
- The X-Files: Exploring the Golang Standard Library Sub-Repositories
- Go: Avoid duplicate requests with sync/singleflight
總結
以上是生活随笔為你收集整理的线程互斥与同步 在c#中用mutex类实现线程的互斥_Golang 并发编程与同步原语的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: visual studio code P
- 下一篇: 4G内存手机到底能用多少?内存管理机制揭