Go Concurrency Patterns: Pipelines and cancellation
原文地址: https://blog.golang.org/pipelines
簡介
Go 語言提供的并發原語使得可以很方便的構建數據流 pipeline,使用這樣的 pipeline 可以高效的利用 I/O 和多 cpu 的優勢. 這篇文章我們將展示如何構建并使用 pipeline.
什么是 pipeline ?
在 go 語言中沒有正式的定義什么是 pipeline. 它只是眾多并發程序類型中的一種. 非正式的說,pipeline 是一系列通過 channel 聯系起來的 stage. 每個 stage 包含多個執行相同功能的 goroutine. 在每個 stage 中, goroutine 執行以下操作:
- 從輸入 channel 中讀取數據
- 處理數據,產生新的數據
- 將數據發送到輸出 channel
除了第一個和最后一個 stage,每個 stage 可以擁有任意數量的 輸入channel 和 輸出channel。 第一個和最后一個 stage 只能有一個輸入channel一個輸出channel. 第一個 stage 也被稱為 Source 或 Producer, 最后一個 stage 被稱為 Sink 或 Consumer
接下來,我們通過一個簡單的示例來說明.
平方數
假設我們的 pipeline 有三個 stage.
第一個 stage 是 gen, 用來將與一組數字轉化為一個 channel.
func gen(nums ...int) <-chan int {out := make(chan int)go func() {for _, n := range nums {out <- n}close(out)}()return out }第二個 stage 是 sq, 從 輸入channel 中接收數字,計算數字的平方數,并將數字寫入輸出channel中.
func sq(in <-chan int) <-chan int {out := make(chan int)go func() {for n := range in {out <- n * n}close(out)}()return out }main 函數中建立該 pipeline,并運行最后最后一個 stage. 最后一個 stage 從第二個 stage 中接收平方數,并將接收到的數據打印出來.
func main() {// Set up the pipeline.c := gen(2, 3)out := sq(c)// Consume the output.fmt.Println(<-out) // 4fmt.Println(<-out) // 9 }因為 gen 的輸入channel 和輸出 channel具有相同的輸入和輸出類型,因此我們可以重復的使用他們任意次.
我們可以將 main 方法重寫為如下形式:
func main() {// Set up the pipeline and consume the output.for n := range sq(sq(gen(2, 3))) {fmt.Println(n) // 16 then 81} }扇入,扇出
多個函數可以從一個channel中讀取數據,直到這個channel關閉,這叫做 扇出(fan-out). 通過這種方式,我們可以將一些列任務分派給多個 woker,這些 worker 可以在多個 CPU 上執行或者進行 I/O 操作.
一個函數可以從多個輸入 channel 中讀取并處理數據,直到所有的 channel 被關閉. 并將輸出寫入到同一個輸出channel 上,處理完數據后關閉輸出 channel. 這叫做 扇入(fan-in).
舉個例子,我們可以運行兩個 sq 方法,這兩個方法均從同一個輸入 channel 上讀取數據. 這里我們再引入另外一個方法 merge, 該方法用于將兩個 sq 的輸出整合到通過一個輸出channel中.
func main() {in := gen(2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(in)c2 := sq(in)// Consume the merged output from c1 and c2.for n := range merge(c1, c2) {fmt.Println(n) // 4 then 9, or 9 then 4} } func merge(cs ...<-chan int) <-chan int {var wg sync.WaitGroupout := make(chan int)// Start an output goroutine for each input channel in cs. output// copies values from c to out until c is closed, then calls wg.Done.output := func(c <-chan int) {for n := range c {out <- n}wg.Done()}wg.Add(len(cs))for _, c := range cs {go output(c)}// Start a goroutine to close out once all the output goroutines are// done. This must start after the wg.Add call.go func() {wg.Wait()close(out)}()return out }盡快停止
截至目前,我們將所有的 pipeline 函數設計為如下模式:
- 當前 stage 應該關閉 輸出channel,當我們處理完了所有的輸入數據,并且所有的輸出數據已經發送到了 輸出channel 之后.
- 當前 stage 應該持續接收數據直到 輸入channel 被關閉.
這樣設計使得我們可以再接收stage 中使用 range 循環來處理所有的數據,當所有數據被處理并發送到輸出channel之后,我們的循環為自動退出.
但是在真實情況下,我們往往不會接收從輸入channel中接收所有的數據. 有時,我們僅僅需要讀取輸入數據的一個子集便可以繼續往下進行了. 更通常的情況下,stage 提前退出,因為上流 stage 發生了錯誤. 在這種情況下,我們不應該等待所有的數據到來,并且我們希望上流 stage 直接退出而不是繼續產生哪些我們已經不在需要的數據.
在我們的例子中,如果當前 stage 無法正確的處理所有的 輸入數據,那么上流嘗試繼續發送數據到 stage 會被永久的阻塞住.
// Consume the first value from the output.out := merge(c1, c2)fmt.Println(<-out) // 4 or 9return// Since we didn't receive the second value from out,// one of the output goroutines is hung attempting to send it.這會導致資源泄露. goroutine 會消耗內存和運行時資源, goroutine 堆棧中的對該 channel 的引用會阻止垃圾回收器回收該 channel 所占的資源,直到它自己退出.
我們需要我們 pipeline 中的上流 stage 總是能自動退出即使下流 stage 無法接收該stage 所產生的所有數據. 一種方案是給輸出channel設置 buffer. buffer 中可以保存指定數量的數據,只要buffer沒有滿,往這樣的channel 中發送數據的操作總是能立馬返回.
c := make(chan int, 2) // buffer size 2 c <- 1 // succeeds immediately c <- 2 // succeeds immediately c <- 3 // blocks until another goroutine does <-c and receives 1如果我們在創建一個輸出channel的時候,便直到需要發送多少數據,那么使用 buffer 會簡化我們的代碼.
func gen(nums ...int) <-chan int {// 這里,對于每個輸入數字,我們均會產生一個輸出,// 因此我們便可以將輸出 channel 的buffer 大小設置為輸入 nums 的大小// 這樣我們往 out channel 中發送數據的操作永遠不會阻塞當前方法out := make(chan int, len(nums))for _, n := range nums {out <- n}close(out)return out }另外一種方案是,下流 stage 通知上流stage,它已經停止接收數據了.
取消接收
當我們在 main 方法中決定不再從 out channel 中接收數據,直接退出的時候,我們必須通知上流 stage,我們已經不再從該 channel 中接受數據了. 我們可以通過一個 done channel 來實現.
func main() {in := gen(2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(in)c2 := sq(in)// 因為當前 stage 有兩個上流 channel,因此我們將 done 的 buffer 大小初始化為 2done := make(chan struct{}, 2)out := merge(done, c1, c2)fmt.Println(<-out) // 4 or 9// Tell the remaining senders we're leaving.done <- struct{}{}done <- struct{}{} }上流 stage 需要做如下修改:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {var wg sync.WaitGroupout := make(chan int)// Start an output goroutine for each input channel in cs. output// copies values from c to out until c is closed or it receives a value// from done, then output calls wg.Done.output := func(c <-chan int) {for n := range c {// 這里使用 select 語句代替原先的單純發送數據的操作// 以便當下流 stage 停止接收,往 done channel 上發送停止接收的信號select {case out <- n:// 當我們在 main 方法中往 done channel 發送數據后,我們便會在這里接收到該數據// 我們便可以結束當前 stage 了case <-done: }}wg.Done()}// ... the rest is unchanged ... }這種方法存在一個問題,那就是對于每個下流 stage,都得知道上流 stage 的數量,這樣我們才能確定 done channel 的大小. 這看起來并不是一個優雅的解決方案.
我們需要一種解決方案,這個解決方案不需要知道上流和下流的 stage 數量.
在 go 中,我們可以通過關閉 channel 來實現. 因為試圖從一個已經關閉的 channel 上接收數據總是會直接返回,返回值是一個對應數據類型的 zero 值.
這意味著,我們只需要在 main 函數中關閉 done channel,然后所有嘗試從 done 中接收信號的上流stage 都會收到一個零值,這樣他們便可以直接退出了.
修改 main 函數,使用這種方案. 我們需要給每個上流 stage 增加一個done channel 參數,這樣,當 在main 中,我們關閉 done 之后,所有上流 stage 都能收到信號,并退出. 上流stage 的實現類似與 merge 的實現,略.
func main() {// Set up a done channel that's shared by the whole pipeline,// and close that channel when this pipeline exits, as a signal// for all the goroutines we started to exit.done := make(chan struct{}) // 注意,這里 done 不要 bufferdefer close(done) // 使用 defer,在 main 函數退出時,該 channel 會被關閉in := gen(done, 2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(done, in)c2 := sq(done, in)// Consume the first value from output.out := merge(done, c1, c2)fmt.Println(<-out) // 4 or 9// done will be closed by the deferred call. }計算文件 MD5 checksum
接下來,我們看一個更加真實的例子.
MD5 經常被用來計算文件的 checksum. md5sum 命令可以輸出一組文件的 checksum.
% md5sum *.go d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go在這個例子中,我們來實現 md5sum 命令. 不同的是我們的md5sum 命令接收一個目錄,輸出這個目錄下所有文件的 checksum,按照路徑排序.
func main() {// Calculate the MD5 sum of all files under the specified directory,// then print the results sorted by path name.m, err := MD5All(os.Args[1])if err != nil {fmt.Println(err)return}var paths []stringfor path := range m {paths = append(paths, path)}sort.Strings(paths)for _, path := range paths {fmt.Printf("%x %s\n", m[path], path)} }MD5All 的實現如下
// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) {m := make(map[string][md5.Size]byte)err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}data, err := ioutil.ReadFile(path)if err != nil {return err}m[path] = md5.Sum(data)return nil})if err != nil {return nil, err}return m, nil }并行化計算 MD5 checksum
在這節中,我們將 MD5All 拆分為兩個有兩個 stage 的 pipeline. 第一個stage sumFiles 遍歷文件目錄,計算文件 checksum,并將結果發送到輸出 channel 中, 計算結果的類型為 result.
type result struct {path stringsum [md5.Size]byteerr error } func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {// For each regular file, start a goroutine that sums the file and sends// the result on c. Send the result of the walk on errc.c := make(chan result)errc := make(chan error, 1)// 主線程開啟一個 goroutine, 在goroutine 中遍歷文件,并計算checksum,將結果輸出到 c channel,如果發生錯誤,將錯誤信息發送到 errc channelgo func() {var wg sync.WaitGrouperr := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}wg.Add(1)// 為每個文件使用一個單獨的 goroutine 來計算文件 checksumgo func() {data, err := ioutil.ReadFile(path)// 嘗試往 channel c 中發送計算結果,如果發送操作被阻塞且 done 已經被關閉// select 語句便會進入 done 對應的 case,程序得以繼續往下進行select {case c <- result{path, md5.Sum(data), err}:case <-done:}wg.Done()}()// Abort the walk if done is closed.select {case <-done:return errors.New("walk canceled")default:return nil}})// Walk has returned, so all calls to wg.Add are done. Start a// goroutine to close c once all the sends are done.// 等待所有計算文件 checksum 的 goroutine 退出go func() { wg.Wait()close(c) // 結束時,關閉 channel c}()// No select needed here, since errc is buffered.errc <- err}()return c, errc }MD5All 用來接收 checksum 或者 sumfiles 中發生的錯誤.
func MD5All(root string) (map[string][md5.Size]byte, error) {// MD5All closes the done channel when it returns; it may do so before// receiving all the values from c and errc.done := make(chan struct{})defer close(done)c, errc := sumFiles(done, root)m := make(map[string][md5.Size]byte)// 從 c 上讀取數據,無論 sumFiles 是否正常結束,// range c 都確保我們不會阻塞在這個 for 循環處for r := range c {if r.err != nil {return nil, r.err}m[r.path] = r.sum}// 檢查是否發生錯誤if err := <-errc; err != nil {return nil, err}return m, nil }限制并行數量
在上一節中,我們給每個文件創建一個 goroutine 用來計算文件的 MD5 checksum. 這里有一個問題,如果某個目錄下有很多文件,那么我們便需要創建大量個 goroutine,這可能會超出實際的物理內存大小.
我們可以通過限制并行處理的文件數量來解決這個問題. 這里,我們通過創建指定數量的 goroutine 來讀取文件. 此時,我們的 pipeline 就需要有三個stage 了: 遍歷文件目錄,讀取數據并計算 MD5 checksum, 收集計算結果.
第一個 stage walkFiles 讀取文件并將結果寫入輸出 channel 中
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {paths := make(chan string)errc := make(chan error, 1)go func() {// Close the paths channel after Walk returns.defer close(paths)// No select needed for this send, since errc is buffered.errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}select {case paths <- path:case <-done:return errors.New("walk canceled")}return nil})}()return paths, errc }第二個 stage 啟用指定數量個 goroutine 執行 digester 方法. 這個 goroutine 從 paths channel 中讀取文件路徑并計算 MD5 checksum,將結果輸出到 channel c 上
// 注意,這里我們不關閉 channel c,因為我們有多個 goroutine 往 c 中發送數據 func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {for path := range paths {data, err := ioutil.ReadFile(path)select {case c <- result{path, md5.Sum(data), err}:case <-done:return}} } // Start a fixed number of goroutines to read and digest files.c := make(chan result)var wg sync.WaitGroupconst numDigesters = 20wg.Add(numDigesters)for i := 0; i < numDigesters; i++ {go func() {digester(done, paths, c)wg.Done()}()}go func() {wg.Wait()close(c)}()最后一個 stage 從 channel c 上接收計算結果或者錯誤信息.
m := make(map[string][md5.Size]byte)for r := range c {if r.err != nil {return nil, r.err}m[r.path] = r.sum}// Check whether the Walk failed.if err := <-errc; err != nil {return nil, err}return m, nilEND!!!
總結
以上是生活随笔為你收集整理的Go Concurrency Patterns: Pipelines and cancellation的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C#灰度图转伪彩色图
- 下一篇: 外设驱动库开发笔记4:AD9833函数发