Go 分布式学习利器(20)-- Go并发编程之多路选择和超时控制,channel的关闭和广播
Select 多路選擇
基本使用語法如下:
select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息t.Logf("result %s \n",ret)
case ret := <-retCh2:t.Logf("result %s \n", rest)
default :t.Error("return empty")
關于channel部分其實是阻塞的,也就是select實際執行的過程中會阻塞在對應的channel部分,直到某一個case對應的channel有有效的數據才會執行該case下的邏輯。
實際程序執行的過程中 如果出現兩個channel同時有有效數據,那兩個case內部的執行順序是無法嚴格保證的,只能由程序員自己來控制。
Select 超時控制
同樣如上代碼,我們要控制channel獲取數據的時間,防止channel中等待有效數據時間過長,所以可以增加一些超時控制:
select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息t.Logf("result %s \n",ret)// 等待1s 返回一個channel的有效數據,且第一個case還未得到有效數據,則輸出超時
case <- time.After(time.Second * 1): t.Error("time out")
所以 select 可以用于保證多個協程之間的高可用,防止slow response的出現。
以上兩種select多路選擇用法 的 測試代碼如下:
package select_testimport ("fmt""testing""time"
)
func service() string {time.Sleep(time.Millisecond * 400)return "Service1 is Done"
}func AsyncService(i int) chan string {rech := make(chan string,1) // 聲明一個channelvar ret stringgo func() {if i == 1 {ret = service()} else {ret = service1()}fmt.Println("resturned result")rech <- ret // 向 channel 中放數據fmt.Println("service exited")}()return rech // 返回channel
}// 測試超時機制來避免等待channel時間過長
// ret 這個channel需要等待AsyncService 函數中的routine中的
// service函數返回結果,才會將數據填充到ret 中
// service需要執行400ms,所以這里會出現超時的情況
func TestSelectTimeout(t *testing.T) {select {case ret := <-AsyncService(1):t.Logf("result is %s", ret)case <- time.After(time.Millisecond * 100):t.Error("time out")}
}func service1() string {time.Sleep(time.Millisecond * 500)return "Service2 is Done"
}// 測試多個channel返回數據,挑選其中一個先準備好的channel來執行
func TestSelect(t *testing.T) {select {case ret1 := <- AsyncService(1):t.Logf("result is %s",ret1)case ret2 := <- AsyncService(2):t.Logf("result is %s", ret2)case <- time.After(time.Millisecond * 600):t.Log("Time out")}
}
channel 的關閉和廣播
channel 可以說是Go語言中 協程之間通信的一種機制,支持帶buffer和不帶buffer兩種模式,非常方便得實現不同協程之間的通信過程,但是在具體的通信過程中也會暴露一些問題,如下生產者,消費者代碼:
package close_channelimport ("fmt""sync""testing"
)// 數據生產者
func dataProducer(ch chan int, wg *sync.WaitGroup) {go func() {for i := 0; i < 10; i ++ {ch <- i}wg.Done()}()
}// 數據消費者
func dataComsumer(ch chan int, wg *sync.WaitGroup) {go func() {// 沒有辦法準確知道channel中什么時候沒有數據,這里保持和生產者相同的填充數據的次數for i := 0;i < 10; i++ { data := <-chfmt.Printf("consumer data %d\n",data)}wg.Done()}()
}func TestProducer(t *testing.T) {var wg sync.WaitGroup // wait groupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Wait() // 阻塞,直到waitgroup執行完畢,wg的值變為0輸出如下:
=== RUN   TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
--- PASS: TestProducer (0.00s)
上面代碼中消費者協程在channel buffer 內部沒有數據的時候只能夠被動阻塞等待,直到channel中數據有效。這個實現導致生產者消費者之間的代碼耦合度比較高,且當程序中存在多個producer和多個receiver的時候,receivers并不一定能夠確切得知道什么時候producer才不會生產數據。
還是如上代碼,我們如果啟動多個消費者就能夠很明顯得看到問題,如下測試代碼:
func TestProducer(t *testing.T) {var wg sync.WaitGroupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Wait()
}
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
consumer data 0
consumer data 0
consumer data 4
consumer data 0
consumer data 0
consumer data 0
consumer data 0
consumer data 0
執行的過程中可以發現消費者消費了0,因為這個時候生產者已經不再生產數據了,再去消費的話會取到channel默認的值即0,且channel沒有關閉,消費者還在等待有效的數據,還會一直阻塞程序運行。
所以channel 也提供了主動關閉的機制,即當生產者不再發送數據的時候可以主動關閉channel,而消費者再次使用channel的時候只需要確認一下channel的狀態即可。如果channel為不可用,即可返回。
關于 關閉的channel 需要注意如下幾點:
- 向關閉的channel 發送數據會導致panic異常
- v,ok <- ch; 接受channel的值和狀態,如果ok為true,則表示channel可以接受數據;如果ok 為false,表示channel已經關閉,無法接受數據
- 所有channel的接受者在channel關閉的時候都會從阻塞等待中返回上述OK值為false。這個廣播機制可以被用作向多個訂閱者發送信號。
修改測試代碼如下:
package close_channelimport ("fmt""sync""testing"
)func dataProducer(ch chan int, wg *sync.WaitGroup) {go func() {for i := 0; i < 10; i ++ {ch <- i}close(ch)//ch <- 11 // 向關閉后的channel發送數據會報panic錯誤wg.Done()}()
}func dataComsumer(ch chan int, wg *sync.WaitGroup) {go func() {for {if data,ok := <-ch; ok {// 接受關閉后channel的廣播,保證channel的輸出結果是有效的fmt.Printf("consumer data %d\n",data)}else {break}}wg.Done()}()
}func TestProducer(t *testing.T) {var wg sync.WaitGroupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg) // 啟動多個消費者wg.Add(1)dataComsumer(ch, &wg)wg.Wait()
}
輸出如下:
=== RUN   TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 9
consumer data 8
--- PASS: TestProducer (0.00s)
可能部分數據的輸出順序和單個消費者的數據輸出順序有差異,因為消費者也是各自的獨立協程,所以在獲取數據并輸出的順序會有差異。
總結
以上是生活随笔為你收集整理的Go 分布式学习利器(20)-- Go并发编程之多路选择和超时控制,channel的关闭和广播的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Rocksdb Iterator实现:从
- 下一篇: 阴阳师秘卷书童在哪里?
