go sync.WaitGroup源码分析
生活随笔
收集整理的這篇文章主要介紹了
go sync.WaitGroup源码分析
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
?
go版本 :1.10.3
原理實現(xiàn):信號量
信號量是Unix系統(tǒng)提供的一種保護共享資源的機制,用于防止多個線程同時訪問某個資源。
可簡單理解為信號量為一個數(shù)值:
- 當信號量>0時,表示資源可用,獲取信號量時系統(tǒng)自動將信號量減1;
- 當信號量==0時,表示資源暫不可用,獲取信號量時,當前線程會進入睡眠,當信號量為正時被喚醒
?
WaitGroup的定義
type WaitGroup struct {noCopy noCopy // noCopy用來標記不可復制,只能用指針傳遞,保證全局唯一.其實即使復制了,編譯,運行都沒問題,只有用go vet檢測時才會顯示出錯誤// 只需要64位,即8個字節(jié),其中高32位是counter值,低32位值是waiter值// 不直接使用uint64,是因為uint64的原子操作需要64位系統(tǒng),而32位系統(tǒng)下,可能會出現(xiàn)崩潰// 所以這里用byte數(shù)組來實現(xiàn),32位系統(tǒng)下4字節(jié)對齊,64位系統(tǒng)下8字節(jié)對齊,所以申請12個字節(jié),其中必定有8個字節(jié)是符合8字節(jié)對齊的,下面的state()函數(shù)中有進行判斷state1 [12]bytesema uint32 // 信號量 } // 得到counter值(uint64的高32位),waiter值(uint64的低32位) func (wg *WaitGroup) state() *uint64 {// 根據(jù)state1的起始地址分析,若是8字節(jié)對齊的,則直接用前8個字節(jié)作為*uint64類型// 若不是,說明是4字節(jié)對齊,則后移4個字節(jié)后,這樣必為8字節(jié)對齊,然后取后面8個字節(jié)作為*uint64類型if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {return (*uint64)(unsafe.Pointer(&wg.state1))} else {return (*uint64)(unsafe.Pointer(&wg.state1[4]))} }增加或減少counter的值
func (wg *WaitGroup) Add(delta int) {//當前的 counter值 waiter值statep := wg.state()// 把delta值加到counter上state := atomic.AddUint64(statep, uint64(delta)<<32)v := int32(state >> 32) // counter值w := uint32(state) // waiter值// counter為負,則panicif v < 0 {panic("sync: negative WaitGroup counter")}// waiter值不為0,累加后的counter值和delta相等,說明Add()和Wait()同時調(diào)用了,panic,因為正確的做法是先Add()后Wait()if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 正常Add()后的情況// 1. counter > 0,說明還不需要釋放信號量,返回// 2. waiter = 0,說明沒有等待的goruntine,也不需要釋放信號量,返回if v > 0 || w == 0 {return}// 下面是counter==0,且w>0的情況// 現(xiàn)在若原state和新state不等,有以下兩種可能// 1. add和wait同時調(diào)用// 2. counter已經(jīng)為0,但waiter還為正值,這種情況永遠不可能觸發(fā)信號量了// 都是出錯了if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 把counter,waiter都置為0,因為已經(jīng)觸發(fā)信號,通知所有等待的goroutine即可,此時不可以再Add()或者Wait()了*statep = 0// 原子地遞增信號量,并通知等待的goroutinefor ; w != 0; w-- {runtime_Semrelease(&wg.sema, false)} } // Done函數(shù)即簡單將counter值減1 func (wg *WaitGroup) Done() {wg.Add(-1) }增加waiter值
func (wg *WaitGroup) Wait() {//當前的 counter值 waiter值statep := wg.state()// 一直等待,直到無需等待或信號量觸發(fā)時,才返回for {state := atomic.LoadUint64(statep)v := int32(state >> 32) // counter值w := uint32(state) // waiter值// 若counter值為0,說明所有g(shù)oroutine都退出了,無需等待,直接返回即可if v == 0 {return}// 原子地增加waiter的值,CAS方法,外面有for循環(huán)會一直嘗試,保證多個goroutine同時調(diào)用Wait()也能正確累加waiterif atomic.CompareAndSwapUint64(statep, state, state+1) {// 一直等待信號量sema,直到>0,信號量觸發(fā),然后以原子的方式遞減它runtime_Semacquire(&wg.sema)// 看上面的Add()函數(shù),觸發(fā)信號量前會先將counter和waiter置0,所以此時必定為0// 若不為0,說明WaitGroup此時又被執(zhí)行Add()或者Wait()操作了,應(yīng)panicif *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}// 可以返回了return}} }提示
- Add()操作必須早于Wait(), 否則會panic
- Add()設(shè)置的值必須與實際等待的goroutine個數(shù)一致,否則會panic
- WaitGroup只可保持一份,不可拷貝給其他變量,否則會造成意想不到的BUG
總結(jié)
以上是生活随笔為你收集整理的go sync.WaitGroup源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: go uintptr unsafe Po
- 下一篇: go errgroup 递归搜索目录中的