Go实战 | 一文带你搞懂从单队列到优先级队列的实现
大家好,我是漁夫子,今天跟大家聊聊在我們項(xiàng)目中的優(yōu)先級(jí)隊(duì)列的實(shí)現(xiàn)。
? 優(yōu)先級(jí)隊(duì)列概述
隊(duì)列,是數(shù)據(jù)結(jié)構(gòu)中實(shí)現(xiàn)先進(jìn)先出策略的一種數(shù)據(jù)結(jié)構(gòu)。而優(yōu)先隊(duì)列則是帶有優(yōu)先級(jí)的隊(duì)列,即先按優(yōu)先級(jí)分類,然后相同優(yōu)先級(jí)的再 進(jìn)行排隊(duì)。優(yōu)先級(jí)高的隊(duì)列中的元素會(huì)優(yōu)先被消費(fèi)。如下圖所示:
在Go中,可以定義一個(gè)切片,切片的每個(gè)元素代表一種優(yōu)先級(jí)隊(duì)列,切片的索引順序代表優(yōu)先級(jí)順序,后面代碼實(shí)現(xiàn)部分我們會(huì)詳細(xì)講解。
? 為什么需要優(yōu)先級(jí)隊(duì)列
先來(lái)看現(xiàn)實(shí)生活中的例子。銀行的辦事窗口,有普通窗口和vip窗口,vip窗口因?yàn)榕抨?duì)人數(shù)少,等待的時(shí)間就短,比普通窗口就會(huì)優(yōu)先處理。同樣,在登機(jī)口,就有貴賓通道和普通,同樣貴賓通道優(yōu)先登機(jī)。
在互聯(lián)網(wǎng)中,當(dāng)然就是請(qǐng)求和響應(yīng)。使用優(yōu)先級(jí)隊(duì)列的作用是將請(qǐng)求按特定的屬性劃分出優(yōu)先級(jí),然后按優(yōu)先級(jí)的高低進(jìn)行優(yōu)先處理。在研發(fā)服務(wù)的時(shí)候這里有個(gè)隱含的約束條件就是服務(wù)器資源(CPU、內(nèi)存、帶寬等)是有限的。如果服務(wù)器資源是無(wú)限的,那么也就不需要隊(duì)列進(jìn)行排隊(duì)了,來(lái)一個(gè)請(qǐng)求就立即處理一個(gè)請(qǐng)求就好了。所以,為了在最大限度的利用服務(wù)器資源的前提下,將更重要的任務(wù)(優(yōu)先級(jí)高的請(qǐng)求)優(yōu)先處理,以更好的服務(wù)用戶。
對(duì)于請(qǐng)求優(yōu)先級(jí)的劃分可以根據(jù)業(yè)務(wù)的特點(diǎn)根據(jù)價(jià)值高的優(yōu)先原則來(lái)進(jìn)行劃分即可。例如可以根據(jù)是否是否是會(huì)員、是否是VIP會(huì)員等屬性進(jìn)行劃分優(yōu)先級(jí)。也可以根據(jù)是否是付費(fèi)用戶進(jìn)行劃分。在博客的業(yè)務(wù)中,也可以根據(jù)是否是大V的屬性進(jìn)行優(yōu)先級(jí)劃分。在互聯(lián)網(wǎng)廣告業(yè)務(wù)中,可以根據(jù)廣告位資源價(jià)值高低來(lái)劃分優(yōu)先級(jí)。
? 優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)
01 三個(gè)角色
在完整的優(yōu)先級(jí)隊(duì)列中有三個(gè)重要的角色,分別是優(yōu)先級(jí)隊(duì)列、工作單元Job、消費(fèi)者worker。
- 優(yōu)先級(jí)隊(duì)列:按優(yōu)先級(jí)劃分的隊(duì)列,用來(lái)暫存對(duì)應(yīng)優(yōu)先級(jí)的工作單元Job,相同優(yōu)先級(jí)的工作單元會(huì)在同一個(gè)隊(duì)列里。 
- 工作單元Job:隊(duì)列里的元素。我們把每一次業(yè)務(wù)處理都封裝成一個(gè)工作單元,該工作單元會(huì)進(jìn)入對(duì)應(yīng)的優(yōu)先級(jí)隊(duì)列進(jìn)行排隊(duì),然后等待消費(fèi)者worker來(lái)消費(fèi)執(zhí)行。 
- 消費(fèi)者worker:監(jiān)聽(tīng)noticeChan,當(dāng)監(jiān)聽(tīng)到noticeChan有消息時(shí),說(shuō)明隊(duì)列中有工作單元需要被處理,優(yōu)先從高優(yōu)先級(jí)隊(duì)列中獲取元素進(jìn)行消費(fèi)。 
02 隊(duì)列-消費(fèi)者模式
根據(jù)隊(duì)列個(gè)數(shù)和消費(fèi)者個(gè)數(shù),我們可以將隊(duì)列-消費(fèi)者模式分為單隊(duì)列-單消費(fèi)者模式、多隊(duì)列(優(yōu)先級(jí)隊(duì)列)- 單消費(fèi)者模式、多隊(duì)列(優(yōu)先級(jí)隊(duì)列)- 多消費(fèi)者模式。
我們先從最簡(jiǎn)單的單隊(duì)列-單消費(fèi)者模式實(shí)現(xiàn),然后一步步演化成多隊(duì)列(優(yōu)先級(jí)隊(duì)列)-多消費(fèi)者模式。
03 單隊(duì)列-單消費(fèi)者模式實(shí)現(xiàn)
3.1 隊(duì)列的實(shí)現(xiàn)
我們先來(lái)看下隊(duì)列的實(shí)現(xiàn)。這里我們用Golang中的List數(shù)據(jù)結(jié)果來(lái)實(shí)現(xiàn),List數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,包含了將元素放到鏈表尾部、將頭部元素彈出的操作,符合隊(duì)列先進(jìn)先出的特性。
好,我們看下具體的隊(duì)列的數(shù)據(jù)結(jié)構(gòu):
type?JobQueue?struct?{mu?sync.Mutex?//隊(duì)列的操作需要并發(fā)安全jobList?*list.List?//List是golang庫(kù)的雙向隊(duì)列實(shí)現(xiàn),每個(gè)元素都是一個(gè)jobnoticeChan?chan?struct{}?//入隊(duì)一個(gè)job就往該channel中放入一個(gè)消息,以供消費(fèi)者消費(fèi) }- 入隊(duì)操作 
到這里有同學(xué)就會(huì)問(wèn)了,為什么不直接將job推送到Channel中,然后讓消費(fèi)者依次消費(fèi)不就行了么?是的,單隊(duì)列這樣是可以的,因?yàn)槲覀冏罱K目標(biāo)是為了實(shí)現(xiàn)優(yōu)先級(jí)的多隊(duì)列,所以這里即使是單隊(duì)列,我們也使用List數(shù)據(jù)結(jié)構(gòu),以便后續(xù)的演變。
還有一點(diǎn),大家注意到了,這里入隊(duì)操作時(shí)有一個(gè) 這樣的操作:
queue.noticeChan?<-?struct{}{}消費(fèi)者監(jiān)聽(tīng)的實(shí)際上不是隊(duì)列本身,而是通道noticeChan。當(dāng)有一個(gè)元素入隊(duì)時(shí),就往noticeChan通道中輸入一條消息,這里是一個(gè)空結(jié)構(gòu)體,主要作用就是通知消費(fèi)者worker,隊(duì)列里有要處理的元素了,可以從隊(duì)列中獲取了。這個(gè)在后面演化成多隊(duì)列以及多消費(fèi)者模式時(shí)會(huì)很有用。
- 出隊(duì)操作 
根據(jù)隊(duì)列的先進(jìn)先出原則,是要獲取隊(duì)列的最先進(jìn)入的元素。Golang中List結(jié)構(gòu)體的Front()函數(shù)是獲取鏈表的第一個(gè)元素,然后通過(guò)Remove函數(shù)將該元素從鏈表中移出,即得到了隊(duì)列中的第一個(gè)元素。這里的Job結(jié)構(gòu)體先不用關(guān)心,我們后面實(shí)現(xiàn)工作單元Job時(shí),會(huì)詳細(xì)講解。
/***?彈出隊(duì)列的第一個(gè)元素*/ func?(queue?*JobQueue)?PopJob()?Job?{queue.mu.Lock()defer?queue.mu.Unlock()/***?說(shuō)明在隊(duì)列中沒(méi)有元素了*/if?queue.jobList.Len()?==?0?{return?nil}elements?:=?queue.jobList.Front()?//獲取隊(duì)列的第一個(gè)元素return?queue.jobList.Remove(elements).(Job)?//將元素從隊(duì)列中移除并返回 }- 等待通知操作 
上面我們提到,消費(fèi)者監(jiān)聽(tīng)的是noticeChan通道。當(dāng)有元素入隊(duì)時(shí),會(huì)往noticeChan中輸入一條消息,以便通知消費(fèi)者進(jìn)行消費(fèi)。如果隊(duì)列中沒(méi)有要消費(fèi)的元素,那么消費(fèi)者就會(huì)阻塞在該通道上。
func?(queue?*JobQueue)?WaitJob()?<-chan?struct{}?{return?queue.noticeChan }3.2 工作單元Job的實(shí)現(xiàn)
一個(gè)工作單元就是一個(gè)要執(zhí)行的任務(wù)。在系統(tǒng)中往往需要執(zhí)行不同的任務(wù),就是需要有不同類型的工作單元,但這些工作單元都有一組共同的執(zhí)行流程。我們看下工作單元的類圖。
我們看下類圖中的幾個(gè)角色:
- Job接口:定義了所有Job要實(shí)現(xiàn)的方法。 
- BaseJob類(結(jié)構(gòu)體):定義了具體Job的基類。因?yàn)榫唧wJob類中的有共同的屬性和方法。所以抽象出一個(gè)基類,避免重復(fù)實(shí)現(xiàn)。但該基類對(duì)Execute方法沒(méi)有實(shí)現(xiàn),因?yàn)椴煌墓ぷ鲉卧芯唧w的執(zhí)行邏輯。 
- SquareJob和AreaJob類(結(jié)構(gòu)體):是我們要具體實(shí)現(xiàn)的業(yè)務(wù)工作Job。主要是實(shí)現(xiàn)Execute的具體執(zhí)行邏輯。根據(jù)業(yè)務(wù)的需要定義自己的工作Job和對(duì)應(yīng)的Execute方法即可。 
接下來(lái),我們以計(jì)算一個(gè)int類型數(shù)字的平方的SquareJob為例來(lái)看下具體的實(shí)現(xiàn)。
- BaseJob結(jié)構(gòu)體 
首先看下該結(jié)構(gòu)體的定義
type?BaseJob?struct?{Err?errorDoneChan?chan?struct{}?//當(dāng)作業(yè)完成時(shí),或者作業(yè)被取消時(shí),通知調(diào)用者Ctx?context.ContextcancelFunc?context.CancelFunc }在該結(jié)構(gòu)體中,我們主要關(guān)注DoneChan字段就行,該字段是當(dāng)具體的Job的Execute執(zhí)行完成后,來(lái)通知調(diào)用者的。
再來(lái)看Done函數(shù),該函數(shù)就是在Execute函數(shù)完成后,要關(guān)閉DoneChan通道,以解除Job的阻塞而繼續(xù)執(zhí)行其他邏輯。
/***?作業(yè)執(zhí)行完畢,關(guān)閉DoneChan,所有監(jiān)聽(tīng)DoneChan的接收者都能收到關(guān)閉的信號(hào)*/ func?(job?*BaseJob)?Done()?{close(job.DoneChan) }再來(lái)看WaitDone函數(shù),該函數(shù)是當(dāng)Job執(zhí)行后,要等待Job執(zhí)行完成,在未完成之前,DoneChan里沒(méi)有消息,通過(guò)該函數(shù)就能將job阻塞,直到Execute中調(diào)用了Done(),以便解除阻塞。
/***?等待job執(zhí)行完成*/ func?(job?*BaseJob)?WaitDone()??{select?{case?<-job.DoneChan:return} }- SquareJob結(jié)構(gòu)體 
從結(jié)構(gòu)體的定義中可知,SquareJob嵌套了BaseJob,所以該結(jié)構(gòu)體擁有BaseJob的所有字段和方法。在該結(jié)構(gòu)體主要實(shí)現(xiàn)了Execute的邏輯:對(duì)x求平方。
func?(s?*SquareJob)?Execute()?error?{result?:=?s.x?*?s.xfmt.Println("the?result?is?",?result)return?nil }3.3 消費(fèi)者Worker的實(shí)現(xiàn)
Worker主要功能是通過(guò)監(jiān)聽(tīng)隊(duì)列里的noticeChan是否有需要處理的元素,如果有元素的話從隊(duì)列里獲取到要處理的元素job,然后執(zhí)行job的Execute方法。
我們將該結(jié)構(gòu)體定位為WorkerManager,因?yàn)樵诤竺嫖覀冎v解多Worker模式時(shí),會(huì)需要一個(gè)Worker的管理者,因此定義成了WorkerManager。
type?WorkerManager?struct?{queue?*JobQueuecloseChan?chan?struct{} }StartWorker函數(shù),只有一個(gè)for循環(huán),不斷的從隊(duì)列中獲取Job。獲取到Job后,進(jìn)行消費(fèi)Job,即ConsumeJob。
func?(m?*WorkerManager)?StartWork()?error?{fmt.Println("Start?to?Work")for?{select?{case?<-m.closeChan:return?nilcase?<-m.queue.noticeChan:job?:=?m.queue.PopJob()m.ConsumeJob(job)}}return?nil }func?(m?*WorkerManager)?ConsumeJob(job?Job)?{defer?func()?{job.Done()}()job.Execute() }到這里,單隊(duì)列-單消費(fèi)者模式中各角色的實(shí)現(xiàn)就講解完了。我們通過(guò)main函數(shù)將其關(guān)聯(lián)起來(lái)。
04 多隊(duì)列-單消費(fèi)者模式
有了單隊(duì)列-單消費(fèi)者的基礎(chǔ),我們?nèi)绾螌?shí)現(xiàn)多隊(duì)列-單消費(fèi)者模式。也就是優(yōu)先級(jí)隊(duì)列。
優(yōu)先級(jí)的隊(duì)列,實(shí)質(zhì)上就是根據(jù)工作單元Job的優(yōu)先級(jí)屬性,將其放到對(duì)應(yīng)的優(yōu)先級(jí)隊(duì)列中,以便worker可以根據(jù)優(yōu)先級(jí)進(jìn)行消費(fèi)。我們要在Job結(jié)構(gòu)體中增加一個(gè)Priority屬性。因?yàn)樵搶傩允撬蠮ob都共有的,因此定義在BaseJob上更合適。
type?BaseJob?struct?{Err?errorDoneChan?chan?struct{}?//當(dāng)作業(yè)完成時(shí),或者作業(yè)被取消時(shí),通知調(diào)用者Ctx?context.ContextcancelFunc?context.CancelFuncpriority?int?//工作單元的優(yōu)先級(jí) }我們?cè)賮?lái)看看多隊(duì)列如何實(shí)現(xiàn)。實(shí)際上就是用一個(gè)切片來(lái)存儲(chǔ)各個(gè)隊(duì)列,切片的每個(gè)元素存儲(chǔ)一個(gè)JobQueue隊(duì)列元素即可。
var?queues?=?make([]*JobQueue,?10,?100)那各優(yōu)先級(jí)的隊(duì)列在切片中是如何存儲(chǔ)的呢?切片索引順序只代表優(yōu)先級(jí)的高于低,不代表具體是哪個(gè)優(yōu)先級(jí)。
什么意思呢?假設(shè)我們現(xiàn)在對(duì)目前的工作單元定義了1、4、7三個(gè)優(yōu)先級(jí)。這3個(gè)優(yōu)先級(jí)在切片中是按優(yōu)先級(jí)從小到到依次存儲(chǔ)在queues切片中的,如下圖:
那為什么不讓切片的索引就代表優(yōu)先級(jí),讓優(yōu)先級(jí)為1的隊(duì)列存儲(chǔ)在索引1處,優(yōu)先級(jí)4的隊(duì)列存儲(chǔ)在索引4處,優(yōu)先級(jí)7的隊(duì)列存儲(chǔ)在索引7處呢?如果這樣存儲(chǔ)的話,就會(huì)變成如下這樣:
由此可見(jiàn),這樣的存儲(chǔ)會(huì)造成空間的浪費(fèi)。所以,我們是將隊(duì)列按優(yōu)先級(jí)高低依次存放到了切片中。
那既然這樣,當(dāng)一個(gè)優(yōu)先級(jí)的job來(lái)了之后,我該怎么知道該優(yōu)先級(jí)的隊(duì)列是存儲(chǔ)在哪個(gè)索引中呢?我們用一個(gè)map來(lái)映射優(yōu)先級(jí)和切片索引之間的關(guān)系。這樣當(dāng)一個(gè)工作單元Job入隊(duì)的時(shí)候,以優(yōu)先級(jí)為key,就可以查找到對(duì)應(yīng)優(yōu)先級(jí)的隊(duì)列存儲(chǔ)在切片的哪個(gè)位置了。如下圖所示:
代碼定義:
var?priorityIdx?map[int][int]?//該map的key是優(yōu)先級(jí),value代表的是queues切片的索引好了,我們重新定義一下隊(duì)列的結(jié)構(gòu)體:
type?PriorityQueue?struct?{mu?sync.MutexnoticeChan?chan?struct{}queues?[]*JobQueuepriorityIdx?map[int]int }//原來(lái)的JobQueue會(huì)變成如下這樣: type?JobQueue?struct?{priority?int?//代表該隊(duì)列是哪種優(yōu)先級(jí)的隊(duì)列jobList?*list.List?//List是golang庫(kù)的雙向隊(duì)列實(shí)現(xiàn),每個(gè)元素都是一個(gè)job }這里我們注意到有以下幾個(gè)變化:
- JobQueue里多了一個(gè)Priority屬性,代表該隊(duì)列是哪個(gè)優(yōu)先級(jí)別。 
- noticeChan屬性從JobQueue中移動(dòng)到了PriorityQueue中。因?yàn)楝F(xiàn)在有多個(gè)隊(duì)列,只要任意一個(gè)隊(duì)列里有元素就需要通知消費(fèi)者worker進(jìn)行消費(fèi),因此消費(fèi)者worker監(jiān)聽(tīng)的是PriorityQueue中是否有元素,而在監(jiān)聽(tīng)階段不關(guān)心具體哪個(gè)優(yōu)先級(jí)隊(duì)列中有元素。 
好了,數(shù)據(jù)結(jié)構(gòu)定義完了,我們看看將工作單元Job推入隊(duì)列和從隊(duì)列中彈出Job又有什么變化。
- 優(yōu)先級(jí)隊(duì)列的入隊(duì)操作 
優(yōu)先級(jí)隊(duì)列的入隊(duì)操作,就需要根據(jù)入隊(duì)Job的優(yōu)先級(jí)屬性放到對(duì)應(yīng)的優(yōu)先級(jí)隊(duì)列中,入隊(duì)流程圖如下:
當(dāng)一個(gè)Job加入隊(duì)列的時(shí)候,有兩種場(chǎng)景,一種是該優(yōu)先級(jí)的隊(duì)列已經(jīng)存在,則直接Push到隊(duì)尾即可。一種是該優(yōu)先級(jí)的隊(duì)列還不存在,則需要先創(chuàng)建該優(yōu)先級(jí)的隊(duì)列,然后再將該工作單元Push到隊(duì)尾。如下是兩種場(chǎng)景。
隊(duì)列已經(jīng)存在的場(chǎng)景
這種場(chǎng)景會(huì)比較簡(jiǎn)單。假設(shè)我們要插入優(yōu)先級(jí)為7的工作單元,首先從映射表中查找7是否存在,發(fā)現(xiàn)對(duì)應(yīng)關(guān)系是2,則直接找到切片中索引2的元素,即優(yōu)先級(jí)為7的隊(duì)列,將job加入即可。如下圖。
隊(duì)列不存在的場(chǎng)景
這種場(chǎng)景稍微復(fù)雜些,在映射表中找不到要插入優(yōu)先級(jí)的隊(duì)列的話,則需要在切片中插入一個(gè)優(yōu)先級(jí)隊(duì)列,而為了優(yōu)先級(jí)隊(duì)列在切片中也保持有序(保持有序就可以知道隊(duì)列的優(yōu)先級(jí)的高低了),則需要移動(dòng)相關(guān)的元素。我們以插入優(yōu)先級(jí)為6的工作單元為例來(lái)講解。
1、首先,我們的隊(duì)列有一個(gè)初始化的狀態(tài),存儲(chǔ)了優(yōu)先級(jí)1、4、7的隊(duì)列。如下圖。
2、當(dāng)插入優(yōu)先級(jí)為6的工作單元時(shí),發(fā)現(xiàn)在映射表中沒(méi)有優(yōu)先級(jí)6的映射關(guān)系,說(shuō)明在切片中還沒(méi)有優(yōu)先級(jí)為6的隊(duì)列的元素。所以需要在切片中依次查找到優(yōu)先級(jí)6應(yīng)該插入的位置在4和7之間,也就是需要存儲(chǔ)在切片2的位置。
3、將原來(lái)索引2位置的優(yōu)先級(jí)為7的隊(duì)列往后移動(dòng)到3,同時(shí)更新映射表中的對(duì)應(yīng)關(guān)系。
4、將優(yōu)先級(jí)為6的工作單元插入到索引2的隊(duì)列中,同時(shí)更新映射表中的優(yōu)先級(jí)和索引的關(guān)系。
我們看下代碼實(shí)現(xiàn):
func?(priorityQueue?*PriorityQueue)?Push(job?Job)?{priorityQueue.mu.Lock()defer?priorityQueue.mu.Unlock()//先根據(jù)job的優(yōu)先級(jí)找要入隊(duì)的隊(duì)列var?idx?intvar?ok?bool//從優(yōu)先級(jí)-切片索引的map中查找該優(yōu)先級(jí)的隊(duì)列是否存在if?idx,?ok?=?priorityQueue.priorityIdx[job.Priority()];?!ok?{//如果不存在該優(yōu)先級(jí)的隊(duì)列,則需要初始化一個(gè)隊(duì)列,并返回該隊(duì)列在切片中的索引位置idx?=?priorityQueue.addPriorityQueue(job.Priority)}//根據(jù)獲取到的切片索引idx,找到具體的隊(duì)列queue?:=?priority.queues[idx]//將job推送到隊(duì)列的隊(duì)尾queue.JobList.PushBack(job)//隊(duì)列job個(gè)數(shù)+1priorityQueue.Size++//如果隊(duì)列job個(gè)數(shù)超過(guò)隊(duì)列的最大容量,則從優(yōu)先級(jí)最低的隊(duì)列中移除工作單元if?priorityQueue.size?>?priorityQueue.capacity?{priorityQueue.RemoveLeastPriorityJob()}else?{//通知新進(jìn)來(lái)一個(gè)jobpriorityQueue.noticeChan?<-?struct{}{}} }代碼中大部分也都做了注釋,不難理解。這里我們來(lái)看下addPriorityQueue的具體實(shí)現(xiàn):
func?(priorityQueue?*PriorityQueue)?addPriorityQueue(priority?int)?int?{n?:=?len(priorityQueue.queues)//通過(guò)二分查找找到priority應(yīng)插入的切片索引pos?:=?sort.Search(n,?func(i?int)?bool?{return?priority?<?priorityQueue.priority})//更新映射表中優(yōu)先級(jí)和切片索引的對(duì)應(yīng)關(guān)系for?i?:=?pos;?i?<?n;?i++?{priorityQueue.priorityIdx[priorityQueue.queues[i].priority]?=?i?+?1}tail?:=?make([]*jobQueue,?n-pos)copy(tail,?priorityQueue.queues[pos:])//初始化一個(gè)新的優(yōu)先級(jí)隊(duì)列,并將該元素放到切片的pos位置中priorityQueue.queues?=?append(priorityQueue.queues[0:pos],?newJobQueue(priority))//將高于priority優(yōu)先級(jí)的元素也拼接到切片后面priorityQueue.queues?=?append(priorityQueue.queues,?tail...)?return?pos }最后,我們?cè)賮?lái)看一個(gè)實(shí)際的調(diào)用例子:
func?main()?{//初始化一個(gè)隊(duì)列queue?:=?&PriorityQueue{noticeChan:?make(chan?struct{},?cap),capacity:?cap,priorityIdx:?make(map[int]int),size:?0,}//初始化一個(gè)消費(fèi)workerworkerManger?:=?NewWorkerManager(queue)//?worker開(kāi)始監(jiān)聽(tīng)隊(duì)列g(shù)o?workerManger.StartWork()//?構(gòu)造SquareJobjob?:=?&SquareJob{BaseJob:?&BaseJob{DoneChan:?make(chan?struct{},?1),},x:?5,priority:?10,}//壓入隊(duì)列尾部queue.PushJob(job)//等待job執(zhí)行完成job.WaitDone()print("The?End") }05 多隊(duì)列-多消費(fèi)者模式
我們?cè)诙嚓?duì)列-單消費(fèi)者的基礎(chǔ)上,再來(lái)看看多消費(fèi)者模式。也就是增加worker的數(shù)量,提高Job的處理速度。
我們?cè)賮?lái)看下worker的定義:
type?WorkerManager?struct?{queue?*PriorityQueuecloseChans?[]chan?struct{} }這里需要注意,closeChans變成了切片數(shù)組。因?yàn)槲覀兠繂?dòng)一個(gè)worker,就需要有一個(gè)關(guān)閉通道。然后看StartWorker函數(shù)的實(shí)現(xiàn):
func?(m?*WorkerManager)?StartWork(n?int)?error?{fmt.Println("Start?to?Work")for?i?:=?0;?i?<?n;?i++?{m.createWorker();}return?nil }func?(m?*WorkerManager)?createWorker()?{closeChan?:=?make(chan?struct{})//每個(gè)協(xié)程,就是一個(gè)workergo?func(closeChan?chan?struct{})?{var?job?Jobfor?{select?{case?<-m.closeChan:return?nilcase?<-m.queue.noticeChan:job?:=?m.queue.PopJob()m.ConsumeJob(job)}??}}(closeChan)m.closeChanMu.Lock()defer?m.closeChanMu.Unlock()m.closeChans?=?append(m.closeChans,?closeChan)return?nil }func?(m?*WorkerManager)?ConsumeJob(job?Job)?{defer?func()?{job.Done()}()job.Execute() }這里需要注意的是,所有的worker都需要監(jiān)聽(tīng)隊(duì)列的noticeChan通道。測(cè)試的例子就留給讀者自己了。另外如下圖的單隊(duì)列-多消費(fèi)者模式是多隊(duì)列-多消費(fèi)者模式的一個(gè)特例,這里就不再進(jìn)行實(shí)現(xiàn)了。
? 總結(jié)
優(yōu)先級(jí)隊(duì)列的實(shí)現(xiàn)主要利用了切片來(lái)存儲(chǔ)多個(gè)隊(duì)列,并將隊(duì)列的優(yōu)先級(jí)依次存儲(chǔ)在切片索引中,并將具體的優(yōu)先級(jí)和切片索引存儲(chǔ)在映射表中,以便快速的定位一個(gè)具體優(yōu)先級(jí)隊(duì)列的存儲(chǔ)位置。本文中一些細(xì)節(jié)的并發(fā)加鎖操作做了忽略,大家在實(shí)際應(yīng)用中根據(jù)需要進(jìn)行完善即可。
想要了解更多有關(guān) Go 語(yǔ)言的資訊動(dòng)態(tài),還可通過(guò)掃描下方二維碼,進(jìn)去一起探討交流哦~
總結(jié)
以上是生活随笔為你收集整理的Go实战 | 一文带你搞懂从单队列到优先级队列的实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
 
                            
                        - 上一篇: Lua——迭代器的使用、pairs 和
- 下一篇: mysql jena rdf_RDF和J
