由浅入深剖析go channel
channel 是 goroutine 之間通信的一種方式,可以類比成 Unix 中的進程的通信方式管道。
CSP 模型
在講 channel 之前,有必要先提一下 CSP 模型,傳統(tǒng)的并發(fā)模型主要分為 Actor 模型和 CSP 模型,CSP 模型全稱為 communicating sequential processes,CSP 模型由并發(fā)執(zhí)行實體(進程,線程或協(xié)程),和消息通道組成,實體之間通過消息通道發(fā)送消息進行通信。和 Actor 模型不同,CSP 模型關注的是消息發(fā)送的載體,即通道,而不是發(fā)送消息的執(zhí)行實體。關于 CSP 模型的更進一步的介紹,有興趣的同學可以閱讀論文 Communicating Sequential Processes,Go 語言的并發(fā)模型參考了 CSP 理論,其中執(zhí)行實體對應的是 goroutine, 消息通道對應的就是 channel。
channel 介紹
channel提供了一種通信機制,通過它,一個goroutine可以向另一個goroutine發(fā)送消息。channel本身還需關聯(lián)了一個類型,也就是channel可以發(fā)送數(shù)據(jù)的類型。例如:發(fā)送int類型信息的channel寫作chan int。
channel 創(chuàng)建
channel使用內(nèi)置的make函數(shù)創(chuàng)建,下面聲明了一個chan int類型的channel:
ch := make(chan int)c和map類似,make創(chuàng)建了一個底層數(shù)據(jù)結構體的引用,當賦值或參數(shù)傳遞時,只是拷貝了一個channel引用,指向相同的channel對象。和其他引用類型一樣,channel的空值為nil。使用==可以對類型相同的channel進行比較,只是指向相同對象或同為nil時,才返回true。
channel的讀寫操作
ch := make(chan int)// write to channel ch <- x// read from channel x <- ch// another way to read x = <- chchannel一定要初始化后才能進行讀寫操作,否則會永久阻塞。
關閉channel
golang提供了內(nèi)置的close函數(shù)對channel進行關閉操作。
ch := make(chan int) close(ch)有關channel的關閉,你需要注意以下事項:
- 關閉一個為初始化(nil)的channel會產(chǎn)生panic
- 重復關閉同一個channel會產(chǎn)生panic
- 向一個已關閉的channel中發(fā)送消息會產(chǎn)生panic
- 從已關閉的channel讀取消息不會產(chǎn)生panic,且能讀出channel中還未被讀取的消息,若消息均已讀出,則會讀到類型的零值。從一個已關閉的channel中讀取消息永遠不會阻塞,并且會犯一個false的ok-idiom,可以用它來判斷channel是否關閉
- 關閉channel會產(chǎn)生一個廣播機制,所有向channel讀取消息的goroutine都會收到消息
channel的類型
channel分為不帶緩存的channel和帶緩存的channel。
無緩存的channel
從無緩存的channle中讀取消息會阻塞,直到有goroutine向該channel中發(fā)送消息;同理,向無緩存的channel中發(fā)送消息也會阻塞,直到有goroutine從channel中讀取消息。
通過無緩存的channel進行通信時,接收者收到數(shù)據(jù)happens before發(fā)送者goroutine喚醒
有緩存的channel
有緩存的channel的聲明方式為指定make函數(shù)的第二個參數(shù),該參數(shù)為channel緩存的容量
ch := make(chan int, 10)
有緩存的channel類似一個阻塞隊列(采用環(huán)形數(shù)組實現(xiàn))。當緩存未滿時,向channel中發(fā)送消息時不會阻塞,當緩存滿時,發(fā)送操作將被阻塞,直到有其他goroutine從中讀取消息;相應的,當channel中消息不為空時,讀取消息不會出現(xiàn)阻塞,當channel為空時,讀取操作會造成阻塞,直到有goroutine向channel中寫入消息。
ch := make(chan int, 3)// blocked, read from empty buffered channel <- ch ch := make(chan int, 3) ch <- 1 ch <- 2 ch <- 3// blocked, send to full buffered channel ch <- 4通過len函數(shù)可以獲得chan中的元素個數(shù),通過cap函數(shù)可以得到channel的緩存長度。
channel的用法
goroutine通信
看一個effective go中的例子:
c := make(chan int) // Allocate a channel// Start the sort in a goroutine; when it completes, signal on the channel. go func() {list.Sort()c <- 1 // Send a signal; value does not matter. }()doSomethingForAWhile() <-c主goroutine會阻塞,直到執(zhí)行sort的goroutine完成。
range遍歷
channel也可以使用range取值,并且會一直從channel中讀取數(shù)據(jù),直到有goroutine對改channel執(zhí)行close操作,循環(huán)才會結束。
// consumer worker ch := make(chan int, 10) for x := range ch {fmt.Println(x) }等價于
for {x, ok := <- chif !ok {break}fmt.Println(x) }配合select使用
select用法類似于IO多路復用,可以同時監(jiān)聽多個channel的消息狀態(tài),看下面的例子
select {case <- ch1;...case <- ch2;...case ch3 <- 10;...default:... }- select可以同時監(jiān)聽多個channel的寫入或讀取
- 執(zhí)行select時,若只有一個case通過(不阻塞),則執(zhí)行這個case塊
- 若有多個case通過,則隨機挑選一個case執(zhí)行
- 若所有case均阻塞,且定義了default模塊,則執(zhí)行default模塊。若未定義default模塊,則select語句阻塞,直到有case被喚醒。
- 使用break會跳出select塊。
1.設置超時時間
ch := make(chan struct{})// finish task while send msg to ch go doTask(ch)timeout := time.After(5 * time.Second) select {case <- ch:fmt.Println("task finished.")case <- timeout:fmt.Println("task timeout.") }2.quite channel
有一些場景中,一些worker goroutine需要一直循環(huán)處理信息,直到收到quit信號
msgCh := make(chan struct{}) quitCh := make(chan struct{}) for {select {case <- msgCh:doWork()case <- quitCh:finish()return }單向channel
即只可寫入或只可讀的channel,事實上channel只讀或只寫都沒有意義,所謂的單向channel其實只是聲明時用,比如
func foo(ch chan<- int) <-chan int {...}
chan<-int表示一個只可寫入的channel,<-chan int表示一個只可讀取的cahnel。上面這個函數(shù)約定了foo內(nèi)只能從向ch中寫入數(shù)據(jù),返回只一個只能讀取的channel,雖然使用普通的channel也沒有問題,但這樣在方法聲明時約定可以防止channel被濫用,這種預防機制發(fā)生再編譯期間。
channel源碼分析
channel的主要實現(xiàn)在src/runtime/chan.go中,一下源碼均基于go 1.9.2。源碼閱讀時為了更好理解channel特性,幫助正確合理的使用 channel,閱讀代碼的過程可以回憶前面章節(jié)的 channel 特性。
channel類結構
channel相關類定義如下:
// channel類型定義 type hchan struct {// channel中的元素數(shù)量,lenqcount uint // total data in the queue// channel的大小, capdataqsiz uint //size of the circular queue// channel的緩沖區(qū),環(huán)形數(shù)組實現(xiàn)buf unsafe.Pointer // points to an array of dataqsiz elements// 單個元素的大小elemsize uint16// closed 標志位closed uint32// 元素的類型elemtype *_type // element type// send 和receive的索引,用于實現(xiàn)環(huán)形數(shù)組隊列sendx uint //send indexrecvx uint //receive index// recv goroutine 等待隊列recvq waitq // list of recv waiters// send goroutine 等待隊列sendq waitq // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex }// 等待隊列的鏈表實現(xiàn) type waitq struct {first *sudoglast *sudog }// in src/runtime/runtime2.go // 對G的封裝 type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops.g *gselectdone *uint32 // CAS to 1 to win select race (may point to stack)next *sudogprev *sudogelem unsafe.Pointer // data element(may point to stack)// The following fields are never accessed concurrently.// For channels, waitlink is only accessed by g.// For semaphores, all fields (including the ones above)// are only accessed when holding a semaRoot lock.acquiretime int64releasetime int64ticket uint32parent *sudog // semaRoot binary treewaitlink *sudog // g.waiting list or semaRootwaittail *sudog //semaRootc *hchan // channel }可以看到,channel的主要組成有:一個環(huán)形數(shù)組實現(xiàn)的隊列,用于存儲消息元素;兩個鏈表實現(xiàn)的goroutine等待隊列,用于存儲阻塞在recv和send操作上的goroutine;一個互斥鎖,用于各個屬性變動的同步。
channel make實現(xiàn)
func makchan(t *chantype, size int64) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}if size < 0 || int64(uintpr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {panic(plainError("makechan: size out of range"))}var c *hchanif elem.kind&kindNoPointers != 0 || size == 0 {// case 1: channel 不含有指針// case 2: size == 0, 即無緩沖 channel// Allocate memory in one call.// Hchan does not contain pointers interesting for GC in this case:// buf pointers into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.// 在堆上分配連續(xù)的空間用作channelc = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))if size > 0 && elem.size != 0 {c.buf = add(unsafe.Pointer(c), hchanSize)} else {// race detector uses this location for synchronization// Also prevents us from pointing beyond the allocation(see issue 9401).c.buf = unsafe.Pointer(c)}} else {// 有緩沖channel初始化c = new(hchan)// 堆上分配buf內(nèi)存c.buf = newarray(elem, int(size))}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsize=", size, "\n")} return c }make的過程還比較簡單,需要注意一點的是當元素不含指針的時候,會將整個hchan分配成一個連續(xù)的空間。
channel send
// entry point for c <- x from compiled code // go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, getcallerpc(unsafe.Pointer(&c))) }/** generic single channel send/recv* If block is not nil,* then the protocol will not* sleep but return if it could* not complete.** sleep can wake up with g.param == nil* when a channel involved in the sleep has* been closed. it is easiest to loop and re-run* the operation; we'll see that it's now closed.*/func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 前面章節(jié)說到的,當channel未初始化或為nil時,向其中發(fā)送數(shù)據(jù)將會永久阻塞if c == nil {if !block {return false}// gopark 會使當前goroutine休眠,并通過unlockf喚醒,但是此時傳入的unlockf為nil,因此,goroutine會一直休眠gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation.if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}//獲取同步鎖lock(&c.lock)//之前章節(jié)提過,向已經(jīng)關閉的channel發(fā)送消息會產(chǎn)生panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// CASE1: 當有goroutine在recv隊列上等待時,跳過緩存隊列,將消息直接發(fā)給receiver goroutineif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// CASE2: 緩存隊列未滿,則將消息復制到緩存隊列上if c.qcount < c.dataqsiz {//Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcoun++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// CASE3: 緩存隊列已滿,將goroutine加入send隊列// 初始化 sudog// Block on the channel.Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.selectdone = nilmysg.c = cgp.waiting = mysggp.param = nil// 加入隊列c.sendq.enqueue(mysg)// 休眠goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)// 喚醒 goroutine// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif gp.param == nil {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}gp.param = nilif mysg.releasetime >0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)return true }從send代碼中可以看到,之前章節(jié)提到的一些特性都在代碼中有所體現(xiàn)
send有以下幾種情況:
- 有goroutine阻塞在channel recv隊列上,此時緩存隊列為空,直接將消息發(fā)給receiver goroutine,只產(chǎn)生一次復制
- 當channel緩存隊列有剩余空間時,將數(shù)據(jù)放到隊列里,等待接收,接收后總共產(chǎn)生兩次復制
- 當channel緩存隊列已滿時,將當前goroutine加入send隊列并阻塞。
channel receive
//entry points for <- c from compiled code //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true) }//go:nosplit func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return }// chanrecv receives on channel c and writes the received data to ep. // ep may be nil, in which case received data is ignored. // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). // A non-nil ep must point to the heap or the caller's stack. func chanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}// 從nil的channel中接收消息,永久阻塞if c == nil {if !block {return}gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not ready for receiving, we observe that the// channel is not closed. Each of these observations is a single word-sized read// (first c.sendq.first or c.qcount, and second c.closed).// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.//// The order of operations is important here: reversing the operations can lead to// incorrect behavior when racing with a close.if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {return}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)// CASE1: 從已經(jīng)close且為空的channel recv數(shù)據(jù),返回控制if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(unsafe.Pointer(c))}unlock(&c.lock) if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// CASE2: send隊列不為空// CASE2.1: 緩存隊列為空,直接從sender recv元素// CASE2.2: 緩存隊列不為空,此時只有可能是緩存隊列已滿,從隊列頭取出元素,并喚醒sender將元素寫入緩存隊列尾部。由于是環(huán)形隊列,因此,隊列滿時只需要將隊列頭賦值給receiver,同時將sender元素復制到該位置,并移動隊列頭尾索引,不需要移動隊列元素if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender, If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queu// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}// CASE3: 緩存隊列不為空,直接從隊列去元素,移動頭索引if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}// CASE4: 緩存隊列為空,將goroutine加入recv隊列,并阻塞// no sender available: block on this channelgp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed }channel close
func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}lock(&c.lock)// 重復close,產(chǎn)生panicif c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc(unsafe.Pointer(&c))racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))racerelease(unsafe.Pointer(c))}c.closed = 1var glist *g//喚醒所有 receiver// release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, unsafe.Pointer(c))}gp.schedlink.set(glist)glist = gp}// 喚醒所有sender,并產(chǎn)生panic// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, unsafe.Pointer(c))}gp.schedlink.set(glist)glist = gp}unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for glist != nil {gp := glistglist = glist.schedlink.ptr()gp.schedlink = 0goready(gp, 3)} }?
總結
以上是生活随笔為你收集整理的由浅入深剖析go channel的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: struct结构体和char型数组的相互
- 下一篇: 理解Go语言中的方法和接收者