锲而不舍 —— M 是怎样找工作的?(八)
在 schedule 函數中,我們簡單提過找一個 runnable goroutine 的過程,這一講我們來詳細分析源碼。
工作線程 M 費盡心機也要找到一個可運行的 goroutine,這是它的工作和職責,不達目的,絕不罷體,這種鍥而不舍的精神值得每個人學習。
共經歷三個過程:先從本地隊列找,定期會從全局隊列找,最后實在沒辦法,就去別的 P 偷。如下圖所示:
先看第一個:從 P 本地隊列找。源碼如下:
// 從本地可運行隊列里找到一個 g // 如果 inheritTime 為真,gp 應該繼承這個時間片, // 否則,新開啟一個時間片 func runqget(_p_ *p) (gp *g, inheritTime bool) { // If there's a runnext, it's the next G to run. // 如果 runnext 不為空,則 runnext 是下一個待運行的 G for { next := _p_.runnext if next == 0 { // 為空,則直接跳出循環 break } // 再次比較 next 是否沒有變化 if _p_.runnext.cas(next, 0) { // 如果沒有變化,則返回 next 所指向的 g。且需要繼承時間片 return next.ptr(), true } } for { // 獲取隊列頭 h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers // 獲取隊列尾 t := _p_.runqtail if t == h { // 頭和尾相等,說明本地隊列為空,找不到 g return nil, false } // 獲取隊列頭的 g gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() // 原子操作,防止這中間被其他線程因為偷工作而修改 if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } } }整個源碼結構比較簡單,主要是兩個 for 循環。
第一個 for 循環嘗試返回 P 的 runnext 成員,因為 runnext 具有最高的運行優先級,因此要首先嘗試獲取 runnext。當發現 runnext 為空時,直接跳出循環,進入第二個。否則,用原子操作獲取 runnext,并將其值修改為 0,也就是空。這里用到原子操作的原因是防止在這個過程中,有其他線程過來“偷工作”,導致并發修改 runnext 成員。
第二個 for 循環則是在嘗試獲取 runnext 成員失敗后,嘗試從本地隊列中返回隊列頭的 goroutine。同樣,先用原子操作獲取隊列頭,使用原子操作的原因同樣是防止其他線程“偷工作”時并發對隊列頭的并發寫操作。之后,直接獲取隊列尾,因為不擔心其他線程同時更改,所以直接獲取。注意,“偷工作”時只會修改隊列頭。
比較隊列頭和隊列尾,如果兩者相等,說明 P 本地隊列沒有可運行的 goroutine,直接返回空。否則,算出隊列頭指向的 goroutine,再用一個 CAS 原子操作來嘗試修改隊列頭,使用原子操作的原因同上。
從本地隊列獲取可運行 goroutine 的過程比較簡單,我們再來看從全局隊列獲取 goroutine 的過程。在 schedule 函數中調用 globrunqget 的代碼:
// 為了公平,每調用 schedule 函數 61 次就要從全局可運行 goroutine 隊列中獲取 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) // 從全局隊列最大獲取 1 個 gorutine gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) }這說明,并不是每次調度都會從全局隊列獲取可運行的 goroutine。實際情況是調度器每調度 61 次并且全局隊列有可運行 goroutine 的情況下才會調用 globrunqget 函數嘗試從全局獲取可運行 goroutine。畢竟,從全局獲取需要上鎖,這個開銷可就大了,能不做就不做。
我們來詳細看下 globrunqget 的源碼:
// 嘗試從全局隊列里獲取可運行的 goroutine 隊列 func globrunqget(_p_ *p, max int32) *g { // 如果隊列大小為 0 if sched.runqsize == 0 { return nil } // 根據 p 的數量平分全局運行隊列中的 goroutines n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize // 如果 gomaxprocs 為 1 } // 修正"偷"的數量 if max > 0 && n > max { n = max } // 最多只能"偷"本地工作隊列一半的數量 if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } // 更新全局可運行隊列長度 sched.runqsize -= n // 如果都要被"偷"走,修改隊列尾 if sched.runqsize == 0 { sched.runqtail = 0 } // 獲取隊列頭指向的 goroutine gp := sched.runqhead.ptr() // 移動隊列頭 sched.runqhead = gp.schedlink n-- for ; n > 0; n-- { // 獲取當前隊列頭 gp1 := sched.runqhead.ptr() // 移動隊列頭 sched.runqhead = gp1.schedlink // 嘗試將 gp1 放入 P 本地,使全局隊列得到更多的執行機會 runqput(_p_, gp1, false) } // 返回最開始獲取到的隊列頭所指向的 goroutine return gp }代碼比較簡單。首先根據全局隊列的可運行 goroutine 長度和 P 的總數,來計算一個數值,表示每個 P 可平均分到的 goroutine 數量。
然后根據函數參數中的 max 以及 P 本地隊列的長度來決定把多少全局隊列中的 goroutine 轉移到 P 本地。
最后,for 循環挨個把全局隊列中 n-1 個 goroutine 轉移到本地,并且返回最開始獲取到的隊列頭所指向的 goroutine,畢竟它最需要得到運行的機會。
把全局隊列中的可運行 goroutine 轉移到本地隊列,給了全局隊列中可運行 goroutine 運行的機會,不然全局隊列中的 goroutine 一直得不到運行。
最后,我們繼續看第三個過程,從其他 P “偷工作”:
// 從本地運行隊列和全局運行隊列都沒有找到需要運行的 goroutine, // 調用 findrunnable 函數從其它工作線程的運行隊列中偷取,如果偷不到,則當前工作線程進入睡眠 // 直到獲取到 runnable goroutine 之后 findrunnable 函數才會返回。 if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available }這是整個找工作過程最復雜的部分:
、/ // 從其他地方找 goroutine 來執行 func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() // …………………… // local runq // 從本地隊列獲取 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // global runq // 從全局隊列獲取 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // …………………… // Steal work from other P's. // 如果其他的 P 都處于空閑狀態,那肯定沒有其他工作要做 procs := uint32(gomaxprocs) if atomic.Load(&sched.npidle) == procs-1 { goto stop } // 如果有很多工作線程在找工作,那我就停下休息。避免消耗太多 CPU if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } if !_g_.m.spinning { // 設置自旋狀態為 true _g_.m.spinning = true // 自旋狀態數加 1 atomic.Xadd(&sched.nmspinning, 1) } // 從其它 p 的本地運行隊列盜取 goroutine for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { // …………………… stealRunNextG := i > 2 // first look for ready queues with more than 1 g if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { return gp, false } } } stop: // …………………… // return P and block lock(&sched.lock) if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } // 當前工作線程解除與 p 之間的綁定,準備去休眠 if releasep() != _p_ { throw("findrunnable: wrong p") } // 把 p 放入空閑隊列 pidleput(_p_) unlock(&sched.lock) wasSpinning := _g_.m.spinning if _g_.m.spinning { // m 即將睡眠,不再處于自旋 _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // check all runqueues once again // 休眠之前再檢查一下所有的 p,看一下是否有工作要做 for i := 0; i < int(gomaxprocs); i++ { _p_ := allp[i] if _p_ != nil && !runqempty(_p_) { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } break } } // …………………… // 休眠 stopm() goto top }這部分也是最能說明 M 找工作的鍥而不舍精神:盡力去各個運行隊列中尋找 goroutine,如果實在找不到則進入睡眠狀態,等待有工作時,被其他 M 喚醒。
先獲取當前指向的 g,也就是 g0,然后拿到其綁定的 p,即 _p_。
首先再次嘗試從 _p_ 本地隊列獲取 goroutine,如果沒有獲取到,則嘗試從全局隊列獲取。如果還沒有獲取到就會嘗試去“偷”了,這也是沒有辦法的事。
不過,在偷之前,先看大的局勢。如果其他所有的 P 都處于空閑狀態,就說明其他 P 肯定沒有工作可做,就沒必要再去偷了,畢竟“地主家也沒有余糧了”,跳到 stop 部分。接著再看下當前正在“偷工作”的線程數量“太多了”,就沒必要扎堆了,這么多人,競爭肯定大,工作肯定不好找,也不好偷。
在真正的“偷”工作之前,把自己的自旋狀態設置為 true,全局自旋數量加 1。
終于到了“偷工作”的部分了,好緊張!整個過程由兩層 for 循環組成,外層控制嘗試偷的次數,內層控制“偷”的順序,并真正的去“偷”。實際上,內層會遍歷所有的 P,因此,整體看來,會嘗試 4 次掃遍所有的 P,并去“偷工作”,是不是非常有毅力!
第二層的循環并不是每次都按一個固定的順序去遍歷所有的 P,這樣不太科學,而是使用了一些方法,“隨機”地遍歷。具體是使用了下面這個變量:
var stealOrder randomOrder type randomOrder struct { count uint32 coprimes []uint32 }初始化的時候會給 count 賦一個值,例如 8,根據 count 計算出 coprimes,里面的元素是小于 count 的值,且和 8 互質,算出來是:[1, 3, 5, 7]。
第二層循環,開始隨機給一個值,例如 2,則第一個訪問的 P 就是 P2;從 coprimes 里取出索引為 2 的值為 5,那么,第二個訪問的 P 索引就是 2+5=7;依此類推,第三個就是 7+5=12,和 count 做一個取余操作,即 12%8=4……
在最后一次遍歷所有的 P 的過程中,連人家的 runnext 也要嘗試偷過來,畢竟前三次的失敗經驗證明,工作太不好“偷”了,民不聊生啊,只能做得絕一點了, stealRunNextG 控制是否要打 runnext 的主意:
stealRunNextG := i > 2確定好準備偷的對象 allp[enum.position() 之后,調用 runqsteal(_p_,allp[enum.position()],stealRunNextG) 函數執行。
// 從 p2 偷走一半的工作放到 _p_ 的本地 func runqsteal(_p_, p2 *p, stealRunNextG bool) *g { // 隊尾 t := _p_.runqtail // 從 p2 偷取工作,放到 _p_.runq 的隊尾 n := runqgrab(p2, &_p_.runq, t, stealRunNextG) if n == 0 { return nil } n-- // 找到最后一個 g,準備返回 gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr() if n == 0 { // 說明只偷了一個 g return gp } // 隊列頭 h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers // 判斷是否偷太多了 if t-h+n >= uint32(len(_p_.runq)) { throw("runqsteal: runq overflow") } // 更新隊尾,將偷來的工作加入隊列 atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption return gp }調用 runqgrab 從 p2 偷走它一半的工作放到 _p_ 本地:
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)runqgrab 函數將從 p2 偷來的工作放到以 t 為地址的數組里,數組就是 _p_.runq。我們知道, t 是 _p_.runq 的隊尾,因此這行代碼表達的真正意思是將從 p2 偷來的工作,神不知,鬼不覺地放到 _p_.runq 的隊尾,之后,再悄悄改一下 `_p_.runqtail 就把這些偷來的工作據為己有了。
接著往下看,返回的 n 表示偷到的工作數量。先將 n 自減 1,目的是把第 n 個工作(也就是 g)直接返回,如果這時候 n 變成 0 了,說明就只偷到了一個 g,那就直接返回。否則,將隊尾往后移動 n,把偷來的工作合法化,簡直完美!
我們接著往下看 runqgrab 函數的實現:
// 從 _p_ 批量獲取可運行 goroutine,放到 batch 數組里 // batch 是一個環,起始于 batchHead // 返回偷的數量,返回的 goroutine 可被任何 P 執行 func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { for { // 隊列頭 h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers // 隊列尾 t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer // g 的數量 n := t - h // 取一半 n = n - n/2 if n == 0 { if stealRunNextG { // 連 runnext 都要偷,沒有人性 // Try to steal from _p_.runnext. if next := _p_.runnext; next != 0 { // 這里是為了防止 _p_ 執行當前 g,并且馬上就要阻塞,所以會馬上執行 runnext, // 這個時候偷就沒必要了,因為讓 g 在 P 之間"游走"不太劃算, // 就不偷了,給他們一個機會。 // channel 一次同步的的接收發送需要 50ns 左右,因此 3us 差不多給了他們 50 次機會了,做得還是不錯的 if GOOS != "windows" { usleep(3) } else { osyield() } if !_p_.runnext.cas(next, 0) { continue } // 真的偷走了 next batch[batchHead%uint32(len(batch))] = next // 返回偷的數量,只有 1 個 return 1 } } // 沒偷到 return 0 } // 如果 n 這時變得太大了,重新來一遍了,不能偷的太多,做得太過分了 if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t continue } // 將 g 放置到 bacth 中 for i := uint32(0); i < n; i++ { g := _p_.runq[(h+i)%uint32(len(_p_.runq))] batch[(batchHead+i)%uint32(len(batch))] = g } // 工作被偷走了,更新一下隊列頭指針 if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume return n } } }外層直接就是一個無限循環,先用原子操作取出 p 的隊列頭和隊列尾,算出一半的 g 的數量,如果 n == 0,說明地主家也沒有余糧,這時看 stealRunNextG 的值。如果為假,說明不偷 runnext,那就直接返回 0,啥也沒偷到;如果為真,則要嘗試偷一下 runnext。
先判斷 runnext 不為空,那就真的準備偷了。不過在這之前,要先休眠 3 us。這是為了防止 p 正在執行當前的 g,馬上就要阻塞(可能是向一個非緩沖的 channel 發送數據,沒有接收者),之后會馬上執行 runnext。這個時候偷就沒必要了,因為 runnext 馬上就要執行了,偷走它還不是要去執行,那何必要偷呢?大家的愿望就是提高效率,這樣讓 g 在 P 之間"游走"不太劃算,索性先不偷了,給他們一個機會。channel一次同步的的接收或發送需要 50ns 左右,因此休眠 3us 差不多給了他們 50 次機會了,做得還是挺厚道的。
繼續看,再次判斷 n 是否小于等于 p.runq 長度的一半,因為這個時候很可能 p 也被其他線程偷了,它的 p.runq 就沒那么多工作了,這個時候就不能偷這么多了,要重新再走一次循環。
最后一個 for 循環,將 p.runq 里的 g 放到 batch 數組里。使用原子操作更新 p 的隊列頭指針,往后移動 n 個位置,這些都是被偷走的,傷心!
回到 findrunnable 函數,經過上述三個層面的“偷竊”過程,我們仍然沒有找到工作,真慘!于是就走到了 stop 這個代碼塊。
先上鎖,因為要將 P 放到全局空閑 P 鏈表里去。在這之前還不死心,再瞧一下全局隊列里是否有工作,如果有,再去嘗試偷全局。
如果沒有,就先解除當前工作線程和當前 P 的綁定關系:
// 解除 p 與 m 的關聯 func releasep() *p { _g_ := getg() // …………………… _p_ := _g_.m.p.ptr() // …………………… // 清空一些字段 _g_.m.p = 0 _g_.m.mcache = nil _p_.m = 0 _p_.status = _Pidle return _p_ }主要的工作就是將 p 的 m 字段清空,并將 p 的狀態修改為 _Pidle。
這之后,將其放入全局空閑 P 列表:
// 將 p 放到 _Pidle 列表里 //go:nowritebarrierrec func pidleput(_p_ *p) { if !runqempty(_p_) { throw("pidleput: P has non-empty run queue") } _p_.link = sched.pidle sched.pidle.set(_p_) // 增加全局空閑 P 的數量 atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic }構造鏈表的過程其實比較簡單,先將 p.link 指向原來的 sched.pidle 所指向的 p,也就是原空閑鏈表的最后一個 P,最后,再更新 sched.pidle,使其指向當前 p,這樣,新的鏈表就構造完成。
接下來就要真正地準備休眠了,但是仍然不死心!還要再查看一次所有的 P 是否有工作,如果發現任何一個 P 有工作的話(判斷 P 的本地隊列不空),就先從全局空閑 P 鏈表里先拿到一個 P:
// 試圖從 _Pidle 列表里獲取 p //go:nowritebarrierrec func pidleget() *p { _p_ := sched.pidle.ptr() if _p_ != nil { sched.pidle = _p_.link atomic.Xadd(&sched.npidle, -1) // TODO: fast atomic } return _p_ }比較簡單,獲取鏈表最后一個,再更新 sched.pidle,使其指向前一個 P。調用 acquirep(_p_) 綁定獲取到的 p 和 m,主要的動作就是設置 p 的 m 字段,更改 p 的工作狀態為 _Prunning,并且設置 m 的 p 字段。做完這些之后,再次進入 top 代碼段,再走一遍之前找工作的過程。
// 休眠,停止執行工作,直到有新的工作需要做為止 func stopm() { // 當前 goroutine,g0 _g_ := getg() // …………………… retry: lock(&sched.lock) // 將 m 放到全局空閑鏈表里去 mput(_g_.m) unlock(&sched.lock) // 進入睡眠狀態 notesleep(&_g_.m.park) // 這里被其他工作線程喚醒 noteclear(&_g_.m.park) // …………………… acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 }先將 m 放入全局空閑鏈表里,注意涉及到全局變量的修改,要上鎖。接著,調用 notesleep(&_g_.m.park) 使得當前工作線程進入休眠狀態。其他工作線程在檢測到“當前有很多工作要做”,會調用 noteclear(&_g_.m.park) 將其喚醒。注意,這兩個函數傳入的參數都是一樣的:&_g_.m.park,它的類型是:
type note struct { key uintptr }很簡單,只有一個 key 字段。
note 的底層實現機制跟操作系統相關,不同系統使用不同的機制,比如 linux 下使用的 futex 系統調用,而 mac 下則是使用的 pthreadcondt 條件變量,note 對這些底層機制做了一個抽象和封裝。
這種封裝給擴展性帶來了很大的好處,比如當睡眠和喚醒功能需要支持新平臺時,只需要在 note 層增加對特定平臺的支持即可,不需要修改上層的任何代碼。
上面這一段來自阿波張的系列教程。我們接著來看下 notesleep 的實現:
// runtime/lock_futex.go func notesleep(n *note) { // g0 gp := getg() if gp != gp.m.g0 { throw("notesleep not on g0") } // -1 表示無限期休眠 ns := int64(-1) // …………………… // 這里之所以需要用一個循環,是因為 futexsleep 有可能意外從睡眠中返回, // 所以 futexsleep 函數返回后還需要檢查 note.key 是否還是 0, // 如果是 0 則表示并不是其它工作線程喚醒了我們, // 只是 futexsleep 意外返回了,需要再次調用 futexsleep 進入睡眠 for atomic.Load(key32(&n.key)) == 0 { // 表示 m 被阻塞 gp.m.blocked = true futexsleep(key32(&n.key), 0, ns) // …………………… // 被喚醒,更新標志 gp.m.blocked = false } }繼續往下追:
// runtime/os_linux.go func futexsleep(addr *uint32, val uint32, ns int64) { var ts timespec if ns < 0 { futex(unsafe.Pointer(addr), _FUTEX_WAIT, val, nil, nil, 0) return } // …………………… }當 *addr 和 val 相等的時候,休眠。futex 由匯編語言實現:
TEXT runtime·futex(SB),NOSPLIT,$0 // 為系統調用準備參數 MOVQ addr+0(FP), DI MOVL op+8(FP), SI MOVL val+12(FP), DX MOVQ ts+16(FP), R10 MOVQ addr2+24(FP), R8 MOVL val3+32(FP), R9 // 系統調用編號 MOVL $202, AX // 執行 futex 系統調用進入休眠,被喚醒后接著執行下一條 MOVL 指令 SYSCALL // 保存系統調用的返回值 MOVL AX, ret+40(FP) RET這樣,找不到工作的 m 就休眠了。當其他線程發現有工作要做時,就會先找到空閑的 m,再通過 m.park 字段來喚醒本線程。喚醒之后,回到 findrunnable 函數,繼續尋找 goroutine,找到后返回 schedule 函數,然后就會去運行找到的 goroutine。
這就是 m 找工作的整個過程,歷盡千辛萬苦,終于修成正果。
參考資料
【阿波張 Goroutine 調度策略】https://mp.weixin.qq.com/s/2objs5JrlnKnwFbF4a2z2g總結
以上是生活随笔為你收集整理的锲而不舍 —— M 是怎样找工作的?(八)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 忠于职守 —— sysmon 线程到底做
- 下一篇: 深度解密Go语言之pprof