开天辟地 —— Go scheduler 初始化(二)
上一講我們說完了 GPM 結構體,這一講,我們來研究 Go sheduler 結構體,以及整個調度器的初始化過程。
Go scheduler 在源碼中的結構體為 schedt,保存調度器的狀態信息、全局的可運行 G 隊列等。源碼如下:
// 保存調度器的信息 type schedt struct { // accessed atomically. keep at top to ensure alignment on 32-bit systems. // 需以原子訪問訪問。 // 保持在 struct 頂部,以使其在 32 位系統上可以對齊 goidgen uint64 lastpoll uint64 lock mutex // 由空閑的工作線程組成的鏈表 midle muintptr // idle m's waiting for work // 空閑的工作線程數量 nmidle int32 // number of idle m's waiting for work // 空閑的且被 lock 的 m 計數 nmidlelocked int32 // number of locked m's waiting for work // 已經創建的工作線程數量 mcount int32 // number of m's that have been created // 表示最多所能創建的工作線程數量 maxmcount int32 // maximum number of m's allowed (or die) // goroutine 的數量,自動更新 ngsys uint32 // number of system goroutines; updated atomically // 由空閑的 p 結構體對象組成的鏈表 pidle puintptr // idle p's // 空閑的 p 結構體對象的數量 npidle uint32 nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go. // Global runnable queue. // 全局可運行的 G隊列 runqhead guintptr // 隊列頭 runqtail guintptr // 隊列尾 runqsize int32 // 元素數量 // Global cache of dead G's. // dead G 的全局緩存 // 已退出的 goroutine 對象,緩存下來 // 避免每次創建 goroutine 時都重新分配內存 gflock mutex gfreeStack *g gfreeNoStack *g // 空閑 g 的數量 ngfree int32 // Central cache of sudog structs. // sudog 結構的集中緩存 sudoglock mutex sudogcache *sudog // Central pool of available defer structs of different sizes. // 不同大小的可用的 defer struct 的集中緩存池 deferlock mutex deferpool [5]*_defer gcwaiting uint32 // gc is waiting to run stopwait int32 stopnote note sysmonwait uint32 sysmonnote note // safepointFn should be called on each P at the next GC // safepoint if p.runSafePointFn is set. safePointFn func(*p) safePointWait int32 safePointNote note profilehz int32 // cpu profiling rate // 上次修改 gomaxprocs 的納秒時間 procresizetime int64 // nanotime() of last change to gomaxprocs totaltime int64 // ∫gomaxprocs dt up to procresizetime }在程序運行過程中, schedt 對象只有一份實體,它維護了調度器的所有信息。
在 proc.go 和 runtime2.go 文件中,有一些很重要全局的變量,我們先列出來:
// 所有 g 的長度 allglen uintptr // 保存所有的 g allgs []*g // 保存所有的 m allm *m // 保存所有的 p,_MaxGomaxprocs = 1024 allp [_MaxGomaxprocs + 1]*p // p 的最大值,默認等于 ncpu gomaxprocs int32 // 程序啟動時,會調用 osinit 函數獲得此值 ncpu int32 // 調度器結構體對象,記錄了調度器的工作狀態 sched schedt // 代表進程的主線程 m0 m // m0 的 g0,即 m0.g0 = &g0 g0 g在程序初始化時,這些全局變量都會被初始化為零值:指針被初始化為 nil 指針,切片被初始化為 nil 切片,int 被初始化為 0,結構體的所有成員變量按其類型被初始化為對應的零值。
因此程序剛啟動時 allgs,allm 和allp 都不包含任何 g,m 和 p。
不僅是 Go 程序,系統加載可執行文件大概都會經過這幾個階段:
從磁盤上讀取可執行文件,加載到內存
創建進程和主線程
為主線程分配棧空間
把由用戶在命令行輸入的參數拷貝到主線程的棧
把主線程放入操作系統的運行隊列等待被調度
上面這段描述,來自公眾號“ go語言核心編程技術”的調度系列教程。
我們從一個 HelloWorld 的例子來回顧一下 Go 程序初始化的過程:
package main import "fmt" func main() { fmt.Println("hello world") }在項目根目錄下執行:
go build -gcflags "-N -l" -o hello src/main.go-gcflags"-N -l" 是為了關閉編譯器優化和函數內聯,防止后面在設置斷點的時候找不到相對應的代碼位置。
得到了可執行文件 hello,執行:
[qcrao@qcrao hello-world]$ gdb hello進入 gdb 調試模式,執行 info files,得到可執行文件的文件頭,列出了各種段:
同時,我們也得到了入口地址:0x450e20。
(gdb) b *0x450e20 Breakpoint 1 at 0x450e20: file /usr/local/go/src/runtime/rt0_linux_amd64.s, line 8.這就是 Go 程序的入口地址,我是在 linux 上運行的,所以入口文件為 src/runtime/rt0_linux_amd64.s,runtime 目錄下有各種不同名稱的程序入口文件,支持各種操作系統和架構,代碼為:
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8 LEAQ 8(SP), SI // argv MOVQ 0(SP), DI // argc MOVQ $main(SB), AX JMP AX主要是把 argc,argv 從內存拉到了寄存器。這里 LEAQ 是計算內存地址,然后把內存地址本身放進寄存器里,也就是把 argv 的地址放到了 SI 寄存器中。最后跳轉到:
TEXT main(SB),NOSPLIT,$-8 MOVQ $runtime·rt0_go(SB), AX JMP AX繼續跳轉到 runtime·rt0_go(SB),完成 go 啟動時所有的初始化工作。位于 /usr/local/go/src/runtime/asm_amd64.s,代碼:
TEXT runtime·rt0_go(SB),NOSPLIT,$0 // copy arguments forward on an even stack MOVQ DI, AX // argc MOVQ SI, BX // argv SUBQ $(4*8+7), SP // 2args 2auto // 調整棧頂寄存器使其按 16 字節對齊 ANDQ $~15, SP // argc 放在 SP+16 字節處 MOVQ AX, 16(SP) // argv 放在 SP+24 字節處 MOVQ BX, 24(SP) // create istack out of the given (operating system) stack. // _cgo_init may update stackguard. // 給 g0 分配棧空間 // 把 g0 的地址存入 DI MOVQ $runtime·g0(SB), DI // BX = SP - 64*1024 + 104 LEAQ (-64*1024+104)(SP), BX // g0.stackguard0 = SP - 64*1024 + 104 MOVQ BX, g_stackguard0(DI) // g0.stackguard1 = SP - 64*1024 + 104 MOVQ BX, g_stackguard1(DI) // g0.stack.lo = SP - 64*1024 + 104 MOVQ BX, (g_stack+stack_lo)(DI) // g0.stack.hi = SP MOVQ SP, (g_stack+stack_hi)(DI) // …………………… // 省略了很多檢測 CPU 信息的代碼 // …………………… // 初始化 m 的 tls // DI = &m0.tls,取 m0 的 tls 成員的地址到 DI 寄存器 LEAQ runtime·m0+m_tls(SB), DI // 調用 settls 設置線程本地存儲,settls 函數的參數在 DI 寄存器中 // 之后,可通過 fs 段寄存器找到 m.tls CALL runtime·settls(SB) // store through it, to make sure it works // 獲取 fs 段基址并放入 BX 寄存器,其實就是 m0.tls[1] 的地址,get_tls 的代碼由編譯器生成 get_tls(BX) MOVQ $0x123, g(BX) MOVQ runtime·m0+m_tls(SB), AX CMPQ AX, $0x123 JEQ 2(PC) MOVL AX, 0 // abort ok: // set the per-goroutine and per-mach "registers" // 獲取 fs 段基址到 BX 寄存器 get_tls(BX) // 將 g0 的地址存儲到 CX,CX = &g0 LEAQ runtime·g0(SB), CX // 把 g0 的地址保存在線程本地存儲里面,也就是 m0.tls[0]=&g0 MOVQ CX, g(BX) // 將 m0 的地址存儲到 AX,AX = &m0 LEAQ runtime·m0(SB), AX // save m->g0 = g0 // m0.g0 = &g0 MOVQ CX, m_g0(AX) // save m0 to g0->m // g0.m = &m0 MOVQ AX, g_m(CX) CLD // convention is D is always left cleared CALL runtime·check(SB) MOVL 16(SP), AX // copy argc MOVL AX, 0(SP) MOVQ 24(SP), AX // copy argv MOVQ AX, 8(SP) CALL runtime·args(SB) // 初始化系統核心數 CALL runtime·osinit(SB) // 調度器初始化 CALL runtime·schedinit(SB) // create a new goroutine to start program MOVQ $runtime·mainPC(SB), AX // entry // newproc 的第二個參數入棧,也就是新的 goroutine 需要執行的函數 // AX = &funcval{runtime·main}, PUSHQ AX // newproc 的第一個參數入棧,該參數表示 runtime.main 函數需要的參數大小, // 因為 runtime.main 沒有參數,所以這里是 0 PUSHQ $0 // arg size // 創建 main goroutine CALL runtime·newproc(SB) POPQ AX POPQ AX // start this M // 主線程進入調度循環,運行剛剛創建的 goroutine CALL runtime·mstart(SB) // 永遠不會返回,萬一返回了,crash 掉 MOVL $0xf1, 0xf1 // crash RET這段代碼完成之后,整個 Go 程序就可以跑起來了,是非常核心的代碼。這一講其實只講到了第 80 行,也就是調度器初始化函數:
CALL runtime·schedinit(SB)schedinit 函數返回后,調度器的相關參數都已經初始化好了,猶如盤古開天辟地,萬事萬物各就其位。接下來詳細解釋上面的匯編代碼。
調整 SP
第一段代碼,將 SP 調整到了一個地址是 16 的倍數的位置:
SUBQ $(4*8+7), SP // 2args 2auto // 調整棧頂寄存器使其按 16 個字節對齊 ANDQ $~15, SP先是將 SP 減掉 39,也就是向下移動了 39 個 Byte,再進行與運算。
15 的二進制低四位是全 1:1111,其他位都是 0;取反后,變成了 0000,高位則是全 1。這樣,與 SP 進行了與運算后,低 4 位變成了全 0,高位則不變。因此 SP 繼續向下移動,并且這回是在一個地址值為 16 的倍數的地方,16 字節對齊的地方。
為什么要這么做?畫一張圖就明白了。不過先得說明一點,前面 _rt0_amd64_linux函數里講過,DI 里存的是 argc 的值,8 個字節,而 SI 里則存的是 argv 的地址,8 個字節。
上面兩張圖中,左側用箭頭標注了 16 字節對齊的位置。第一步表示向下移動 39 B,第二步表示與 ~15 相與。
存在兩種情況,這也是第一步將 SP 下移的時候,多移了 7 個 Byte 的原因。第一張圖里,與 ~15 相與的時候,SP 值減少了 1,第二張圖則減少了 9。最后都是移位到了 16 字節對齊的位置。
兩張圖的共同點是 SP 與 argc 中間多出了 16 個字節的空位。這個后面應該會用到,我們接著探索。
至于為什么進行 16 個字節對齊,就比較好理解了:因為 CPU 有一組 SSE 指令,這些指令中出現的內存地址必須是 16 的倍數。
初始化 g0 棧
接著往后看,開始初始化 g0 的棧了。g0 棧的作用就是為運行 runtime 代碼提供一個“環境”。
// 把 g0 的地址存入 DI MOVQ $runtime·g0(SB), DI // BX = SP - 64*1024 + 104 LEAQ (-64*1024+104)(SP), BX // g0.stackguard0 = SP - 64*1024 + 104 MOVQ BX, g_stackguard0(DI) // g0.stackguard1 = SP - 64*1024 + 104 MOVQ BX, g_stackguard1(DI) // g0.stack.lo = SP - 64*1024 + 104 MOVQ BX, (g_stack+stack_lo)(DI) // g0.stack.hi = SP MOVQ SP, (g_stack+stack_hi)(DI)代碼 L2 把 g0 的地址存入 DI 寄存器;L4 將 SP 下移 (64K-104)B,并將地址存入 BX 寄存器;L6 將 BX 里存儲的地址賦給 g0.stackguard0;L8,L10,L12 分別 將 BX 里存儲的地址賦給 g0.stackguard1, g0.stack.lo, g0.stack.hi。
這部分完成之后,g0 棧空間如下圖:
主線程綁定 m0
接著往下看,中間我們省略了很多檢查 CPU 相關的代碼,直接看主線程綁定 m0 的部分:
// 初始化 m 的 tls // DI = &m0.tls,取 m0 的 tls 成員的地址到 DI 寄存器 LEAQ runtime·m0+m_tls(SB), DI // 調用 settls 設置線程本地存儲,settls 函數的參數在 DI 寄存器中 // 之后,可通過 fs 段寄存器找到 m.tls CALL runtime·settls(SB) // store through it, to make sure it works // 獲取 fs 段基地址并放入 BX 寄存器,其實就是 m0.tls[1] 的地址,get_tls 的代碼由編譯器生成 get_tls(BX) MOVQ $0x123, g(BX) MOVQ runtime·m0+m_tls(SB), AX CMPQ AX, $0x123 JEQ 2(PC) MOVL AX, 0 // abort因為 m0 是全局變量,而 m0 又要綁定到工作線程才能執行。我們又知道,runtime 會啟動多個工作線程,每個線程都會綁定一個 m0。而且,代碼里還得保持一致,都是用 m0 來表示。這就要用到線程本地存儲的知識了,也就是常說的 TLS(Thread Local Storage)。簡單來說,TLS 就是線程本地的私有的全局變量。
一般而言,全局變量對進程中的多個線程同時可見。進程中的全局變量與函數內定義的靜態(static)變量,是各個線程都可以訪問的共享變量。一個線程修改了,其他線程就會“看見”。要想搞出一個線程私有的變量,就需要用到 TLS 技術。
如果需要在一個線程內部的各個函數調用都能訪問、但其它線程不能訪問的變量(被稱為 static memory local to a thread,線程局部靜態變量),就需要新的機制來實現。這就是 TLS。
繼續來看源碼,L3 將 m0.tls 地址存儲到 DI 寄存器,再調用 settls 完成 tls 的設置,tls 是 m 結構體中的一個數組。
// thread-local storage (for x86 extern register) tls [6]uintptr設置 tls 的函數 runtime·settls(SB) 位于源碼 src/runtime/sys_linux_amd64.s處,主要內容就是通過一個系統調用將 fs 段基址設置成 m.tls[1] 的地址,而 fs 段基址又可以通過 CPU 里的寄存器 fs 來獲取。
而每個線程都有自己的一組 CPU 寄存器值,操作系統在把線程調離 CPU 時會幫我們把所有寄存器中的值保存在內存中,調度線程來運行時又會從內存中把這些寄存器的值恢復到 CPU。
這樣,工作線程代碼就可以通過 fs 寄存器來找到 m.tls。
關于 settls 這個函數的解析可以去看阿波張的教程第 12 篇,寫得很詳細。
設置完 tls 之后,又來了一段驗證上面 settls 是否能正常工作。如果不能,會直接 crash。
get_tls(BX) MOVQ $0x123, g(BX) MOVQ runtime·m0+m_tls(SB), AX CMPQ AX, $0x123 JEQ 2(PC) MOVL AX, 0 // abort第一行代碼,獲取 tls, get_tls(BX) 的代碼由編譯器生成,源碼中并沒有看到,可以理解為將 m.tls 的地址存入 BX 寄存器。
L2 將一個數 0x123 放入 m.tls[0] 處,L3 則將 m.tls[0] 處的數據取出來放到 AX 寄存器,L4 則比較兩者是否相等。如果相等,則跳過 L6 行的代碼,否則執行 L6,程序 crash。
繼續看代碼:
// set the per-goroutine and per-mach "registers" // 獲取 fs 段基址到 BX 寄存器 get_tls(BX) // 將 g0 的地址存儲到 CX,CX = &g0 LEAQ runtime·g0(SB), CX // 把 g0 的地址保存在線程本地存儲里面,也就是 m0.tls[0]=&g0 MOVQ CX, g(BX) // 將 m0 的地址存儲到 AX,AX = &m0 LEAQ runtime·m0(SB), AX // save m->g0 = g0 // m0.g0 = &g0 MOVQ CX, m_g0(AX) // save m0 to g0->m // g0.m = &m0 MOVQ AX, g_m(CX)L3 將 m.tls 地址存入 BX;L5 將 g0 的地址存入 CX;L7 將 CX,也就是 g0 的地址存入 m.tls[0];L9 將 m0 的地址存入 AX;L13 將 g0 的地址存入 m0.g0;L16 將 m0 存入 g0.m。也就是:
tls[0] = g0 m0.g0 = &g0 g0.m = &m0代碼中寄存器前面的符號看著比較奇怪,其實它們最后會被鏈接器轉化為偏移量。
看曹大 golangnotes 用 gobufsp(BX) 這個例子講的:
這種寫法在標準 plan9 匯編中只是個 symbol,沒有任何偏移量的意思,但這里卻用名字來代替了其偏移量,這是怎么回事呢?
實際上這是 runtime 的特權,是需要鏈接器配合完成的,再來看看 gobuf 在 runtime 中的 struct 定義開頭部分的注釋:
// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
對于我們而言,這種寫法讀起來比較容易。
這一段執行完之后,就把 m0,g0,m.tls[0] 串聯起來了。通過 m.tls[0] 可以找到 g0,通過 g0 可以找到 m0(通過 g 結構體的 m 字段)。并且,通過 m 的字段 g0,m0 也可以找到 g0。于是,主線程和 m0,g0 就關聯起來了。
從這里還可以看到,保存在主線程本地存儲中的值是 g0 的地址,也就是說工作線程的私有全局變量其實是一個指向 g 的指針而不是指向 m 的指針。
目前這個指針指向g0,表示代碼正運行在 g0 棧。
于是,前面的圖又增加了新的玩伴 m0:
初始化 m0
MOVL 16(SP), AX // copy argc MOVL AX, 0(SP) MOVQ 24(SP), AX // copy argv MOVQ AX, 8(SP) CALL runtime·args(SB) // 初始化系統核心數 CALL runtime·osinit(SB) // 調度器初始化 CALL runtime·schedinit(SB)L1-L2 將 16(SP) 處的內容移動到 0(SP),也就是棧頂,通過前面的圖,16(SP) 處的內容為 argc;L3-L4 將 argv 存入 8(SP),接下來調用 runtime·args 函數,處理命令行參數。
接著,連續調用了兩個 runtime 函數。osinit 函數初始化系統核心數,將全局變量 ncpu 初始化的核心數,schedinit 則是本文的核心:調度器的初始化。
下面,我們來重點看 schedinit 函數:
// src/runtime/proc.go // The bootstrap sequence is: // // call osinit // call schedinit // make & queue new G // call runtime·mstart // // The new G calls runtime·main. func schedinit() { // getg 由編譯器實現 // get_tls(CX) // MOVQ g(CX), BX; BX存器里面現在放的是當前g結構體對象的地址 _g_ := getg() if raceenabled { _g_.racectx, raceprocctx0 = raceinit() } // 最多啟動 10000 個工作線程 sched.maxmcount = 10000 tracebackinit() moduledataverify() // 初始化棧空間復用管理鏈表 stackinit() mallocinit() // 初始化 m0 mcommoninit(_g_.m) alginit() // maps must not be used before this call modulesinit() // provides activeModules typelinksinit() // uses maps, activeModules itabsinit() // uses activeModules msigsave(_g_.m) initSigmask = _g_.m.sigmask goargs() goenvs() parsedebugvars() gcinit() sched.lastpoll = uint64(nanotime()) // 初始化 P 的個數 // 系統中有多少核,就創建和初始化多少個 p 結構體對象 procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } if procs > _MaxGomaxprocs { procs = _MaxGomaxprocs } // 初始化所有的 P,正常情況下不會返回有本地任務的 P if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } // …………………… }這個函數開頭的注釋很貼心地把 Go 程序初始化的過程又說了一遍:
call osinit。初始化系統核心數。
call schedinit。初始化調度器。
make & queue new G。創建新的 goroutine。
call runtime·mstart。調用 mstart,啟動調度。
The new G calls runtime·main。在新的 goroutine 上運行 runtime.main 函數。
函數首先調用 getg() 函數獲取當前正在運行的 g, getg() 在 src/runtime/stubs.go 中聲明,真正的代碼由編譯器生成。
// getg returns the pointer to the current g. // The compiler rewrites calls to this function into instructions // that fetch the g directly (from TLS or from the dedicated register). func getg() *g注釋里也說了,getg 返回當前正在運行的 goroutine 的指針,它會從 tls 里取出 tls[0],也就是當前運行的 goroutine 的地址。編譯器插入類似下面的代碼:
get_tls(CX) MOVQ g(CX), BX; // BX存器里面現在放的是當前g結構體對象的地址繼續往下看:
sched.maxmcount = 10000設置最多只能創建 10000 個工作線程。
然后,調用了一堆 init 函數,初始化各種配置,現在不去深究。只關心本小節的重點,m0 的初始化:
// 初始化 m func mcommoninit(mp *m) { // 初始化過程中_g_ = g0 _g_ := getg() // g0 stack won't make sense for user (and is not necessary unwindable). if _g_ != _g_.m.g0 { callers(1, mp.createstack[:]) } // random 初始化 mp.fastrand = 0x49f6428a + uint32(mp.id) + uint32(cputicks()) if mp.fastrand == 0 { mp.fastrand = 0x49f6428a } lock(&sched.lock) // 設置 m 的 id mp.id = sched.mcount sched.mcount++ // 檢查已創建系統線程是否超過了數量限制(10000) checkmcount() // ………………省略了初始化 gsignal // Add to allm so garbage collector doesn't free g->m // when it is just in a register or thread-local storage. mp.alllink = allm atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) unlock(&sched.lock) // ……………… }因為 sched 是一個全局變量,多個線程同時操作 sched 會有并發問題,因此先要加鎖,操作結束之后再解鎖。
mp.id = sched.mcount sched.mcount++ checkmcount()可以看到,m0 的 id 是 0,并且之后創建的 m 的 id 是遞增的。checkmcount() 函數檢查已創建系統線程是否超過了數量限制(10000)。
mp.alllink = allm將 m 掛到全局變量 allm 上,allm 是一個指向 m 的的指針。
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))這一行將 allm 變成 m 的地址,這樣變成了一個循環鏈表。之后再新建 m 的時候,新 m 的 alllink 就會指向本次的 m,最后 allm 又會指向新創建的 m。
上圖中,1 將 m0 掛在 allm 上。之后,若新創建 m,則 m1 會和 m0 相連。
完成這些操作后,大功告成!解鎖。
初始化 allp
跳過一些其他的初始化代碼,繼續往后看:
procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } if procs > _MaxGomaxprocs { procs = _MaxGomaxprocs }這里就是設置 procs,它決定創建 P 的數量。ncpu 這里已經被賦上了系統的核心數,因此代碼里不設置 GOMAXPROCS 也是沒問題的。這里還限制了 procs 的最大值,為 1024。
來看最后一個核心的函數:
// src/runtime/proc.go func procresize(nprocs int32) *p { old := gomaxprocs if old < 0 || old > _MaxGomaxprocs || nprocs <= 0 || nprocs > _MaxGomaxprocs { throw("procresize: invalid arg") } // …………………… // update statistics // 更新數據 now := nanotime() if sched.procresizetime != 0 { sched.totaltime += int64(old) * (now - sched.procresizetime) } sched.procresizetime = now // 初始化所有的 P for i := int32(0); i < nprocs; i++ { pp := allp[i] if pp == nil { // 申請新對象 pp = new(p) pp.id = i // pp 的初始狀態為 stop pp.status = _Pgcstop pp.sudogcache = pp.sudogbuf[:0] for i := range pp.deferpool { pp.deferpool[i] = pp.deferpoolbuf[i][:0] } // 將 pp 存放到 allp 處 atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } // …………………… } // 釋放多余的 P。由于減少了舊的 procs 的數量,因此需要釋放 // …………………… // 獲取當前正在運行的 g 指針,初始化時 _g_ = g0 _g_ := getg() if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // continue to use the current P // 繼續使用當前 P _g_.m.p.ptr().status = _Prunning } else { // 初始化時執行這個分支 // …………………… _g_.m.p = 0 _g_.m.mcache = nil // 取出第 0 號 p p := allp[0] p.m = 0 p.status = _Pidle // 將 p0 和 m0 關聯起來 acquirep(p) if trace.enabled { traceGoStart() } } var runnablePs *p // 下面這個 for 循環把所有空閑的 p 放入空閑鏈表 for i := nprocs - 1; i >= 0; i-- { p := allp[i] // allp[0] 跟 m0 關聯了,不會進行之后的“放入空閑鏈表” if _g_.m.p.ptr() == p { continue } // 狀態轉為 idle p.status = _Pidle // p 的 LRQ 里沒有 G if runqempty(p) { // 放入全局空閑鏈表 pidleput(p) } else { p.m.set(mget()) p.link.set(runnablePs) runnablePs = p } } stealOrder.reset(uint32(nprocs)) var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) // 返回有本地任務的 P 鏈表 return runnablePs }代碼比較長,這個函數不僅是初始化的時候會執行到,在中途改變 procs 的值的時候,仍然會調用它。所有存在很多一般不用關心的代碼,因為一般不會在中途重新設置 procs 的值。我把初始化無關的代碼刪掉了,這樣會更清晰一些。
函數先是從堆上創建了 nproc 個 P,并且把 P 的狀態設置為 _Pgcstop,現在全局變量 allp 里就維護了所有的 P。
接著,調用函數 acquirep 將 p0 和 m0 關聯起來。我們來詳細看一下:
func acquirep(_p_ *p) { // Do the part that isn't allowed to have write barriers. acquirep1(_p_) // have p; write barriers now allowed _g_ := getg() _g_.m.mcache = _p_.mcache // …………………… }先調用 acquirep1 函數真正地進行關聯,之后,將 p0 的 mcache 資源賦給 m0。再來看 acquirep1:
func acquirep1(_p_ *p) { _g_ := getg() // …………………… _g_.m.p.set(_p_) _p_.m.set(_g_.m) _p_.status = _Prunning }可以看到就是一些字段相互設置,執行完成后:
g0.m.p = p0 p0.m = m0并且,p0 的狀態變成了 _Prunning。
接下來是一個循環,它將除了 p0 的所有非空閑的 P,放入 P 鏈表 runnablePs,并返回給 procresize 函數的調用者,并由調用者來“調度”這些 P。
函數 runqempty 用來判斷一個 P 是否是空閑,依據是 P 的本地 run queue 隊列里有沒有 runnable 的 G,如果沒有,那 P 就是空閑的。
// src/runtime/proc.go // 如果 _p_ 的本地隊列里沒有待運行的 G,則返回 true func runqempty(_p_ *p) bool { // 這里涉及到一些數據競爭,并不是簡單地判斷 runqhead == runqtail 并且 runqnext == nil 就可以 // for { head := atomic.Load(&_p_.runqhead) tail := atomic.Load(&_p_.runqtail) runnext := atomic.Loaduintptr((*uintptr)(unsafe.Pointer(&_p_.runnext))) if tail == atomic.Load(&_p_.runqtail) { return head == tail && runnext == 0 } } }并不是簡單地判斷 head == tail 并且 runnext == nil 為真,就可以說明 runq 是空的。因為涉及到一些數據競爭,例如在比較 head == tail 時為真,但此時 runnext 上其實有一個 G,之后再去比較 runnext == nil 的時候,這個 G 又通過 runqput跑到了 runq 里去了或者通過 runqget 拿走了,runnext 也為真,于是函數就判斷這個 P 是空閑的,這就會形成誤判。
因此 runqempty 函數先是通過原子操作取出了 head,tail,runnext,然后再次確認 tail 沒有發生變化,最后再比較 head == tail 以及 runnext == nil,保證了在觀察三者都是在“同時”觀察到的,因此,返回的結果就是正確的。
說明一下,runnext 上有時會綁定一個 G,這個 G 是被當前 G 喚醒的,相比其他 G 有更高的執行優先級,因此把它單獨拿出來。
函數的最后,初始化了一個“隨機分配器”:
stealOrder.reset(uint32(nprocs))將來有些 m 去偷工作的時候,會遍歷所有的 P,這時為了偷地隨機一些,就會用到 stealOrder 來返回一個隨機選擇的 P,后面的文章會再講。
這樣,整個 procresize 函數就講完了,這也意味著,調度器的初始化工作已經完成了。
還是引用阿波張公號文章里的總結,寫得太好了,很簡潔,很難再優化了:
使用 make([]p, nprocs) 初始化全局變量 allp,即 allp = make([]p, nprocs)
循環創建并初始化 nprocs 個 p 結構體對象并依次保存在 allp 切片之中
把 m0 和 allp[0] 綁定在一起,即 m0.p = allp[0],allp[0].m = m0
把除了 allp[0] 之外的所有 p 放入到全局變量 sched 的 pidle 空閑隊列之中
說明一下,最后一步,代碼里是將所有空閑的 P 放入到調度器的全局空閑隊列;對于非空閑的 P(本地隊列里有 G 待執行),則是生成一個 P 鏈表,返回給 procresize 函數的調用者。
最后我們將 allp 和 allm 都添加到圖上:
參考資料
【阿波張 goroutine 調度器初始化】https://mp.weixin.qq.com/s/W9D4Sl-6jYfcpczzdPfByQ
總結
以上是生活随笔為你收集整理的开天辟地 —— Go scheduler 初始化(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深度解密Go语言之scheduler
- 下一篇: 三足鼎立 —— GPM 到底是什么?(一