Go并发编程中的那些事[译]
- 原文地址:Concurrent programming
- 原文作者:StefanNilsson
- 譯文出自:掘金翻譯計劃
- 本文永久鏈接:github.com/xitu/gold-m…
- 譯者:kobehaha
- 校對者:joyking7?alfred-zhong
本文講的是Go并發編程中的那些事,
bouncing balls- 1. 多線程執行
- 2. Channels
- 3. 同步
- 4. 死鎖
- 5. 數據競爭
- 6. 互斥鎖
- 7. 檢測數據競爭
- 8. Select標識符
- 9. 最基本的并發實例
- 10. 并行計算
這篇文章將會以Go語言舉例介紹并發編程,包括以下內容
- 線程的并發執行(goroutines)
- 基本的同步技術(channel和鎖)
- Go中的基本并發模式
- 死鎖和數據競爭
- 并行計算
開始之前,你需要去了解怎樣寫最基本的 Go 程序。 如果你已經對 C/C++,Java 或者Python比較熟悉,A tour of go將會給你一些幫助。你也可以看一下Go for C++ programmers?或者Go for Java programmers。
1.多線程執行
goroutine?是 go 的一種調度機制。 Go 使用 go 進行聲明,以 goroutine 調度機制開啟一個新的執行線程。它會在新創建的 goroutine 執行程序。在單個程序中,所有goroutines都是共享相同的地址空間。
相比于分配棧空間,goroutine 更加輕量,花銷更小。棧空間初始化很小,需要通過申請和釋放堆空間來擴展內存。Goroutines 內部是被復用在多個操作系統線程上。如果一個goroutine阻塞了一個操作系統線程,比如正在等待輸入,此時,這個線程中的其他 goroutine 為了保證繼續運行,將會遷移到其他線程中,而你不需要去關心這些細節。
下面的程序將會打印?"Hello from main goroutine". 是否打印"Hello from another goroutine",取決于兩個goroutines誰先完成.
func main() {go fmt.Println("Hello from another goroutine")fmt.Println("Hello from main goroutine")// 程序執行到這,所有活著的goroutines都會被殺掉}goroutine1.go
下一段程序?"Hello from main goroutine"?和?"Hello from another goroutine"?可能會以任何順序打印。但有一種可能性是第二個goroutine運行的非常慢,以至于到程序結束之前都不會打印。
func main() {go fmt.Println("Hello from another goroutine")fmt.Println("Hello from main goroutine")time.Sleep(time.Second) // 為其他goroutine完成等1秒鐘 }goroutine2.go
這有一個更實際的例子,我們定義一個使用并發來推遲事件的函數。
// 在指定時間過期后,文本會被打印到標準輸出 // 這無論如何都不會被阻塞 func Publish(text string, delay time.Duration) {go func() {time.Sleep(delay)fmt.Println("BREAKING NEWS:", text)}() // 注意括號。我們必須調用匿名函數 }publish1.go
你可能用下面的方式調用?Publish?函數
func main() {Publish("A goroutine starts a new thread of execution.", 5*time.Second)fmt.Println("Let’s hope the news will published before I leave.")// 等待消息被發布time.Sleep(10 * time.Second)fmt.Println("Ten seconds later: I’m leaving now.") }publish1.go
該程序很有可能按以下順序打印三行,每行輸出會間隔五秒鐘。
$ go run publish1.go Let’s hope the news will published before I leave. BREAKING NEWS: A goroutine starts a new thread of execution. Ten seconds later: I’m leaving now.一般來說,我們不可能讓線程休眠去等待對方。在下一節中, 我們將會介紹 Go 的一種同步機制,?channels?。然后演示如何使用channel來讓一個 goruntine 等待另外的 goruntine。
2. Channels
Sushi conveyor belt壽司輸送帶
channel?是一種 Go 語言結構,它通過傳遞特定元素類型的值來為兩個 goroutines 提供同步執行和交流數據的機制
。?<-?標識符表示了channel的傳輸方向,接收或者發送。如果沒有指定方向。那么 channel 就是雙向的。
Channels 是一種被 make 分配的引用類型
ic := make(chan int) // 不帶緩存的 int channel wc := make(chan *Work, 10) // 帶緩沖工作的 channel通過 channel 發送值,可使用 <- 作為二元運算符。通過 channel 接收值,可使用它作為一元運算符。
ic <- 3 // 向channel中發送3 work := <-wc // 從channel中接收指針到work如果 channel 是無緩沖的,發送者會一直阻塞直到有接收者從中接收值。如果是帶緩沖的,只有當值被拷貝到緩沖區且緩沖區已滿時,發送者才會阻塞直到有接收者從中接收。接收者會一直阻塞直到 channel 中有值可被接收。
關閉
close?的作用是保證不能再向 channel 中發送值。 channel 被關閉后,仍然是可以從中接收值的。接收操作會獲得零值而不會阻塞。多值接收操作會額外返回一個布爾值,表示該值是否被發送的。
ch := make(chan string) go func() {ch <- "Hello!"close(ch) }() fmt.Println(<-ch) // 打印 "Hello!" fmt.Println(<-ch) // 不阻塞的打印空值 "" fmt.Println(<-ch) // 再一次打印 "" v, ok := <-ch // v 的值是 "" , ok 的值是 false伴有 range 分句的 for 語句會連續讀取通過 channel 發送的值,直到 channel 被關閉
func main() {var ch <-chan Sushi = Producer()for s := range ch {fmt.Println("Consumed", s)} }func Producer() <-chan Sushi {ch := make(chan Sushi)go func() {ch <- Sushi("海老握り") // Ebi nigirich <- Sushi("鮪とろ握り") // Toro nigiriclose(ch)}()return ch }sushi.go
3.同步
下一個例子中,Publish?函數返回一個channel,它會把發送的文本當做消息廣播出去。
// 指定時間過期后函數Publish將會打印文本到標準輸出. // 當文本被發布channel將會被關閉. func Publish(text string, delay time.Duration) (wait <-chan struct{}) {ch := make(chan struct{})go func() {time.Sleep(delay)fmt.Println("BREAKING NEWS:", text)close(ch) // broadcast – a closed channel sends a zero value forever}()return ch }publish2.go
注意我們使用一個空結構的 channel :?struct{}。 這表明該 channel 僅僅用于信號,而不是傳遞數據。
你可能會這樣使用該函數
func main() {wait := Publish("Channels let goroutines communicate.", 5*time.Second)fmt.Println("Waiting for the news...")<-waitfmt.Println("The news is out, time to leave.") }publish2.go
程序將按給出的順序打印下列三行信息。在信息發送后,最后一行會立刻出現
$ go run publish2.go Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. The news is out, time to leave.4.死鎖
traffic jam讓我們去介紹?Publish?函數中的一個bug。
func Publish(text string, delay time.Duration) (wait <-chan struct{}) {ch := make(chan struct{})go func() {time.Sleep(delay)fmt.Println("BREAKING NEWS:", text)**//close(ch)**}()return ch }這時由?Publish?函數開啟的 goroutine 打印重要信息然后退出,留下主 goroutine 繼續等待。
func main() {wait := Publish("Channels let goroutines communicate.", 5*time.Second)fmt.Println("Waiting for the news...")**<-wait**fmt.Println("The news is out, time to leave.") }在某些情況下,程序將不會有任何進展,這種情況被稱為死鎖。
deadlock?是線程之間相互等待而都不能繼續執行的一種情況
在運行時,Go 對于運行時死鎖檢測具有良好支持。但在某種情況下goroutine無法取得任何進展,這時Go程序會提供一個詳細的錯誤信息. 下面就是我們崩潰程序的日志:
Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]: main.main().../goroutineStop.go:11 +0xf6goroutine 2 [syscall]: created by runtime.main.../go/src/pkg/runtime/proc.c:225goroutine 4 [timer goroutine (idle)]: created by addtimer.../go/src/pkg/runtime/ztime_linux_amd64.c:73多數情況下下,在 Go 程序中很容易搞清楚是什么導致了死鎖。接著就是如何去修復它了。
5. 數據競爭
死鎖可能聽起來很糟糕, 但是真正給并發編程帶來災難的是數據競爭。它們相當常見,而且難于調試。
一個?數據競爭?發生在當兩個線程并發訪問相同的變量,同時最少有一個訪問是在寫.
數據競爭是沒有規律的。舉個例子,打印數字1,嘗試找出它是如何發生的 — 一個可能的解釋是在代碼之后.
func race() {wait := make(chan struct{})n := 0go func() {**n++** // 一次操作:讀,增長,寫close(wait)}()**n++** // 另一個沖突訪問<-waitfmt.Println(n) // 輸出: 不確定 }datarace.go
兩個goroutines,?g1?和?g2, 在競爭過程中,我們無法知道他們執行的順序.下面只是許多可能的結果性的一種.
- g1?從n變量中讀取值0
- g2?從n變量中讀取值0
- g1?增加它的值從0變為1
- g1?把它的值把1賦值給n
- g2?增加它的值從0到1
- g2?把它的值把1賦值給n
- 這段程序將會打印n的值,它的值為1
"數據競爭” 的稱呼多少有些誤導,不僅僅是他的執行順序無法被設定,而且也無法保證接下來會發生的情況。編譯器和硬件時常會為了更好的性能而調整代碼的順序。如果你仔細觀察一個正在運行的線程,那么你才可能會看到更多細節。
mid action避免數據競爭的唯一方式是同步操作在線程間所有共享的可變數據。存在幾種方式,在Go中,可能最多使用 channel 或者 lock。較底層的操作可使用?sync?and?sync/atomic?包,這里不再討論。
在Go中,處理并發數據訪問的首選方式是使用一個 channel,它將數據從一個goroutine傳遞到另一個goroutine。有一句經典的話:"不要通過共享內存來傳遞數據;而要通過傳遞數據來共享內存"。
func sharingIsCaring() {ch := make(chan int)go func() {n := 0 // 局部變量只能對當前 goroutine 可見n++ch <- n // 數據通過 goroutine 傳遞}()n := <-ch // ...從另外一個 goroutine 中安全接受n++fmt.Println(n) // 輸出: 2 }datarace.go
在這份代碼中 channel 充當了雙重角色。它作為一個同步點,在不同 goroutine 中傳遞數據。發送的 goroutine 將會等待其它的 goroutine 去接收數據,而接收的 goroutine 將會等待其他的 goroutine 去發送數據。
Go內存模型?- 當一個 goroutine 在讀一個變量,另外一個goroutine在寫相同的變量,這個過程實際上是非常復雜的,但是只要你用 channel 在不同goroutines中共享數據,那么這個操作就是安全的。
6. 互斥鎖
lock有時通過直接鎖定來同步數據比使用 channel 更加方便。為此,Go 標準庫提供了互斥鎖sync.Mutex。
要讓這種類型的鎖正確工作,所有對于共享數據的操作(包括讀和寫)必須在一個 goroutine 持有該鎖時進行。這一點至關重要,goroutine 的一次錯誤就足以破壞程序和導致數據競爭。
因此你需要為API去設計一種定制化的數據結構,并且確保所有同步操作都在內部執行。在這個例子中,我們構建了一種安全易用的并發數據結構,AtomicInt,它存儲了單個整型,任何goroutines 都能安全的通過?Add?和?Value?方法訪問數字。
// AtomicInt 是一種持有int類型的支持并發的數據結構。 // 它的初始化值為0. type AtomicInt struct {mu sync.Mutex // 同一時間只能有一個 goroutine 持有鎖。n int }// Add adds n to the AtomicInt as a single atomic operation. // 原子性的將n增加到AtomicInt中 func (a *AtomicInt) Add(n int) {a.mu.Lock() // 等待鎖被釋放然后獲取。a.n += na.mu.Unlock() // 釋放鎖。 }// 返回a的值. func (a *AtomicInt) Value() int {a.mu.Lock()n := a.na.mu.Unlock()return n }func lockItUp() {wait := make(chan struct{})var n AtomicIntgo func() {n.Add(1) // one accessclose(wait)}()n.Add(1) // 另一個并發訪問<-waitfmt.Println(n.Value()) // Output: 2 }datarace.go
7. 檢測數據競爭
競爭有時候難以檢測。當我執行這段存在數據競爭的程序,它打印55555。再試一次,可能會得到不同的結果。?sync.WaitGroup是go標準庫的一部分;它等待一系列 goroutines 執行結束。
func race() {var wg sync.WaitGroupwg.Add(5)for i := 0; i < 5; **i++** {go func() {**fmt.Print(i)** // 局部變量i被6個goroutine共享wg.Done()}()}wg.Wait() // 等待5個goroutine執行結束fmt.Println() }raceClosure.go
對于輸出?55555?較為合理的解釋是執行?i++?操作的 goroutine 在其他 goroutines 打印之前就已經執行了5次。事實上,更新后的?i?對于其他 goroutines 可見是隨機的。
一個非常簡單的解決辦法是通過使用本地變量作為參數的方式去啟動另外的goroutine。
func correct() {var wg sync.WaitGroupwg.Add(5)for i := 0; i < 5; i++ {go func(n int) { // 局部變量。fmt.Print(n)wg.Done()}(i)}wg.Wait()fmt.Println() }raceClosure.go
這段代碼是正確的,他打印了期望的結果,24031。回想一下,在不同 goroutines 中,程序的執行順序是亂序的。
我們仍然可以使用閉包去避免數據競爭。但是我們需要注意在每個 goroutine 中需要有不同的變量。
func alsoCorrect() {var wg sync.WaitGroupwg.Add(5)for i := 0; i < 5; i++ {n := i // 為每個閉包創建單獨的變量go func() {fmt.Print(n)wg.Done()}()}wg.Wait()fmt.Println() }raceClosure.go
7. 自動競爭檢測
總的來說.我們不可能自動的發現所有的數據競爭。但是 Go(從1.1版本開始) 提供了一個強大的數據競爭檢測器?data race detector。
這個工具使用下來非常簡單: 僅僅增加?-race?到?go?命令后。運行上述程序將會自動檢查并且打印出下面的輸出信息。
$ go run -race raceClosure.go Data race: ================== WARNING: DATA RACE Read at 0x00c420074168 by goroutine 6:main.race.func1()../raceClosure.go:22 +0x3fPrevious write at 0x00c420074168 by main goroutine:main.race()../raceClosure.go:20 +0x1bdmain.main()../raceClosure.go:10 +0x2fGoroutine 6 (running) created at:main.race()../raceClosure.go:24 +0x193main.main()../raceClosure.go:10 +0x2f ================== 12355 Correct: 01234 Also correct: 01234 Found 1 data race(s) exit status 66這個工具發現在程序20行存在數據競爭,一個goroutine向某個變量寫值,而22行存在另外一個 goroutine 在不同步的讀取這個變量的值。
注意這個工具只能找到實際執行時發生的數據競爭。
8. Select 語句
在 Go 并發編程中,最后講的一個是?select?語句。它會挑選出一系列通信操作中能夠執行的操作。如果任意的通信操作都可執行,則會隨機挑選一個并執行相關的語句。否則,如果也沒有默認執行語句的話,則會阻塞直到其中的任意一個通信操作能夠執行。
這有一個例子,顯示了如何用 select 去隨機生成數字.
// RandomBits 返回產生隨機位數的channel func RandomBits() <-chan int {ch := make(chan int)go func() {for {select {case ch <- 0: // 沒有相關操作語句case ch <- 1:}}}()return ch }randBits.go
更簡單,這里 select 被用于設置超時。這段代碼只能打印 news 或者 time-out 消息,這取決于兩個接收語句中誰可以執行.
select { case news := <-NewsAgency:fmt.Println(news) case <-time.After(time.Minute):fmt.Println("Time out: no news in one minute.") }time.After是 go 標準庫的一部分;他等待特定時間過去,然后將當前時間發送到返回的 channel.
9. 最基本的并發實例
couples多花點時間仔細理解這個例子。當你完全理解它,你將會徹底的理解 Go 內部的并發工作機制。
程序演示了單個 channel 同時發送和接受多個 goroutines 的數據。它也展示了 select 語句如何從多個通信操作中選擇執行。
func main() {people := []string{"Anna", "Bob", "Cody", "Dave", "Eva"}match := make(chan string, 1) // 給未匹配的元素預留空間wg := new(sync.WaitGroup)for _, name := range people {wg.Add(1)go Seek(name, match, wg)}wg.Wait()select {case name := <-match:fmt.Printf("No one received %s’s message.\n", name)default:// 沒有待處理的發送操作.} }// 尋求發送或接收匹配上名稱名稱的通道,并在完成后通知等待組. func Seek(name string, match chan string, wg *sync.WaitGroup) {select {case peer := <-match:fmt.Printf("%s received a message from %s.\n", name, peer)case match <- name:// 等待其他人接受消息.}wg.Done() }matching.go
實例輸出:
$ go run matching.go Anna received a message from Eva. Cody received a message from Bob. No one received Dave’s message.10. 并行計算
CPUs具有并發特性應用會將一個大的計算劃分為小的計算單元,每個計算單元都會單獨的工作。
多 CPU 上的分布式計算不僅僅是一門科學,更是一門藝術。
-
每個計算單元執行時間大約在100us至1ms之間.如果這些單元太小,那么分配問題和管理子模塊的開銷可能會增大。如果這些單元太大,整個的計算體系可能會被一個小的耗時操作阻塞。很多因素都會影響計算速度,比如調度,程序終端,內存布局(注意工作單元的個數和 CPU 的個數無關)。
-
盡量減少數據共享的量。并發寫入是非常消耗性能的,特別是多個 goroutines 在不同CPU上執行時。共享數據讀操作對性能影響不是很大。
-
數據的合理組織是一種高效的方式。如果數據保存在緩存中,數據的加載和存儲的速度將會大大加快。再次強調,這對寫操作來說是非常重要的。
下面的例子將會顯示如何將多個耗時計算分配到多個可用的 CPU 上。這就是我們想要優化的代碼。
type Vector []float64// Convolve computes w = u * v, where w[k] = Σ u[i]*v[j], i + j = k. // Precondition: len(u) > 0, len(v) > 0. func Convolve(u, v Vector) Vector {n := len(u) + len(v) - 1w := make(Vector, n)for k := 0; k < n; k++ {w[k] = mul(u, v, k)}return w }// mul returns Σ u[i]*v[j], i + j = k. func mul(u, v Vector, k int) float64 {var res float64n := min(k+1, len(u))j := min(k, len(v)-1)for i := k - j; i < n; i, j = i+1, j-1 {res += u[i] * v[j]}return res }這個想法很簡單:識別適合大小的工作單元,然后在單獨的 goroutine 中運行每個工作單元. 這就是?Convolve?的并發版本.
func Convolve(u, v Vector) Vector {n := len(u) + len(v) - 1w := make(Vector, n)// 將w劃分為多個將會計算100us-1ms時間計算的工作單元size := max(1, 1000000/n)var wg sync.WaitGroupfor i, j := 0, size; i < n; i, j = j, j+size {if j > n {j = n}// goroutines只為讀共享內存.wg.Add(1)go func(i, j int) {for k := i; k < j; k++ {w[k] = mul(u, v, k)}wg.Done()}(i, j)}wg.Wait()return w }convolution.go
當定義好計算單元,通常最好將調度留給程序執行和操作系統。然而,在 Go1.*版本中,你需要指定 goroutines 的個數。
func init() {numcpu := runtime.NumCPU()runtime.GOMAXPROCS(numcpu) // 盡量使用所有可用的 CPU }Stefan Nilsson
原文發布時間為:2017年10月22日
本文來自云棲社區合作伙伴掘金,了解相關信息可以關注掘金網站。
總結
以上是生活随笔為你收集整理的Go并发编程中的那些事[译]的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: biostar handbook: 第一
- 下一篇: 搭建一个简易的https