Go 分布式学习利器(19)-- Go并发编程 之 CSP(communicating sequential processes) 机制
文章目錄
- 前言
- CSP 特點
- CSP代碼 演示
- 1. 正常流程的代碼
- 2. CSP 未設置buffer 代碼
- 3. 設置指定大小的channel buffer
- 總結
前言
CSP 這個名詞大家會比較陌生,但是說到future 熟悉C++ / JAVA 線程模型的伙伴可能就會很熟悉了, 通過future機制能夠實現兩個線程之間的數據交互,當然通過通用的mutex 和 condition_variable 也能夠實現,但這一些機制的引入無形中會大大增加代碼的復雜度。
舉例一段C++代碼:
#include <iostream>
#include <future>
#include <thread>int fib(int n)
{if (n < 3) return 1;else return fib(n-1) + fib(n-2);
}int main()
{std::future<int> f1 = std::async(std::launch::async, [](){return fib(20);});std::future<int> f2 = std::async(std::launch::async, [](){return fib(25);});std::cout << "waiting...\n";//主線程在該處阻塞,直到獲取到f1,f2兩個線程到合法的結果f1.wait();f2.wait();std::cout << "f1: " << f1.get() << '\n';std::cout << "f2: " << f2.get() << '\n';
}
可以看到通過future能夠達到延遲獲取線程執行結果,主線程和兩個子線程之間通信的過程就像建立了一個管道,子線程在一端塞入數據,主線程在一端取數據,管道內沒有數據,主線程可以選擇等待,也可以選擇返回。
相關c++ 的future機制介紹可以參考C++ 多線程編程:future異步訪問類。
同樣Go語言中也會有這樣 支持協程之間便捷自由得數據交互的機制,也就是CSP
CSP 特點
- CSP支持通過channel 進行通信,即channel 支持維護一定大小的buffer(有大小限制)用作消息的存儲,兩個通信協程只需要向channel中放入 或取出即可,無需等待
- CSP 支持 無指定buffer大小的channel通信,即一個協程等待結果,另一個協程負責發送,只有結果被取出來 才能向channel中放入。
- 異步返回,調用協程執行 者無需等待,即可返回
基本使用語法可以參考代碼演示中的第2段和第3段代碼。
CSP代碼 演示
1. 正常流程的代碼
package groutineimport ("fmt""testing""time"
)func service() string {time.Sleep(time.Millisecond * 50)return "Service is Done"
}func otherTask() {fmt.Println("do something else ")time.Sleep(time.Millisecond * 100)fmt.Println("Task is done")
}// 順序執行
func TestService(t *testing.T) {fmt.Println(service())otherTask()
}
這里 直接就是函數的調用,執行也會按照函數的調用順序來執行
=== RUN TestService
Service is Done
do something else
Task is done
--- PASS: TestService (0.16s)
2. CSP 未設置buffer 代碼
func service() string {time.Sleep(time.Millisecond * 50)return "Service is Done"
}func otherTask() {fmt.Println("do something else ")time.Sleep(time.Millisecond * 100)fmt.Println("Task is done")
}func AsyncService() chan string {rech := make(chan string) // 聲明一個channelgo func() {ret := service()fmt.Println("resturned result")rech <- ret // 向 channel 中放數據fmt.Println("service exited")}()return rech // 返回channel
}func TestAsyncService(t *testing.T) {ret := AsyncService()otherTask()// 從channel中取數據,且會阻塞到ret內部有有效數據才返回fmt.Println(<-ret) // 假如后續還有代碼執行,則channel不會阻塞,// 立即執行goroutine后續的代碼,直到channle內部獲取到了有效的數據才會打印// time.Sleep(time.Second * 1)
}
執行如下:
=== RUN TestAsyncService
do something else # otherTask 函數
resturned result # AsyncService 函數
Task is done # otherTask 函數
Service is Done # Service函數, 從channel中取數據,# 此時service函數還在睡眠,并未返回結果,# 這里會阻塞,直到取到有效的Service 給channel返回結果
service exited # AsyncService 函數
--- PASS: TestAsyncService (1.11s)
假如測試 代碼變更為:
func TestAsyncService(t *testing.T) {ret := AsyncService()otherTask()// 從channel中取數據,且會阻塞到ret內部有有效數據才返回fmt.Println(<-ret) // 假如后續還有代碼執行,則channel不會阻塞,// 立即執行goroutine后續的代碼,直到channle內部獲取到了有效的數據才會打印time.Sleep(time.Second * 1)
}
后兩行的輸出則會發生變化:
=== RUN TestAsyncService
do something else # otherTask 函數
resturned result # AsyncService 函數
Task is done # otherTask 函數
service exited # AsyncService 函數# go routine執行完畢,不會阻塞在channel的有效數據獲取處
Service is Done # Service函數 后續執行完畢返回給channel,才會打印
--- PASS: TestAsyncService (1.11s)
3. 設置指定大小的channel buffer
func AsyncService() chan string {rech := make(chan string,1 ) // 聲明一個channel,并指定其buffer大小為1go func() {ret := service()fmt.Println("resturned result")rech <- ret // 向 channel 中放數據fmt.Println("service exited")}()return rech // 返回channel
}func TestAsyncService(t *testing.T) {ret := AsyncService()otherTask()fmt.Println(<-ret) // 從channel中取數據,即使是最后一段邏輯,也不會阻塞go routine的執行
}
最后的輸出如下:
=== RUN TestAsyncService
do something else
resturned result
service exited
Task is done
Service is Done
--- PASS: TestAsyncService (0.10s)
可以看到有了buffer的channel 不會阻塞go routine的執行,后續有了有效數據,直接從channel中取就可以了。
總結
Go 的CSP并發機制比較燒腦,協程之間的數據通信 可以異步獲取結果而不需要等待正常邏輯的完成。
建議大家將以上 代碼本地多多調試一番,對于CSP機制和應用場景的理解會更加深刻一些。
總結
以上是生活随笔為你收集整理的Go 分布式学习利器(19)-- Go并发编程 之 CSP(communicating sequential processes) 机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 玻璃水加在的是汽车哪个部位 水箱吗还是?
- 下一篇: 赠人玫瑰下一句是什么啊?