TimingWheel 时间轮详解
在kafka中,有許多請求并不是立即返回,而且處理完一些異步操作或者等待某些條件達(dá)成后才返回,這些請求一般都會帶有timeout參數(shù),表示如果timeout時(shí)間后服務(wù)端還不滿足返回的條件,就判定此次請求為超時(shí),這時(shí)候kafka同樣要返回超時(shí)的響應(yīng)給客戶端,這樣客戶端才知道此次請求超時(shí)了。比如ack=-1的producer請求,就需要等待所有的isr備份完成了才可以返回給客戶端,或者到達(dá)timeout時(shí)間了返回超時(shí)響應(yīng)給客戶端。
上面的場景,可以用延遲任務(wù)來實(shí)現(xiàn)。也就是定義一個任務(wù),在timeout時(shí)間后執(zhí)行,執(zhí)行的內(nèi)容一般就是先檢查返回條件是否滿足,滿足的話就返回客戶端需要的響應(yīng),如果還是不滿足,就發(fā)送超時(shí)響應(yīng)給客戶端。
對于延遲操作,java自帶的實(shí)現(xiàn)有Timer和ScheduledThreadPoolExecutor。這兩個的底層數(shù)據(jù)結(jié)構(gòu)都是基于一個延遲隊(duì)列,在準(zhǔn)備執(zhí)行一個延遲任務(wù)時(shí),將其插入到延遲隊(duì)列中。這些延遲隊(duì)列其實(shí)就是一個用最小堆實(shí)現(xiàn)的優(yōu)先級隊(duì)列,因此,插入一個任務(wù)的時(shí)間復(fù)雜度是O(logN),取出一個任務(wù)執(zhí)行后調(diào)整堆的時(shí)間也是O(logN)。
如果要執(zhí)行的延遲任務(wù)不多,O(logN)的速度已經(jīng)夠快了。但是對于kafka這樣一個高吞吐量的系統(tǒng)來說,O(logN)的速度還不夠,為了追求更快的速度,kafka的設(shè)計(jì)者使用了Timing Wheel的數(shù)據(jù)結(jié)構(gòu),讓任務(wù)的插入時(shí)間復(fù)雜度達(dá)到了O(1)。
Timing Wheel
image.png
上面是時(shí)間輪的一個結(jié)構(gòu)圖,該時(shí)間輪有8個槽,當(dāng)前時(shí)間指向0號槽。
我們再看一下Kafka里面TimingWheel的數(shù)據(jù)結(jié)構(gòu)
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {private[this] val interval = tickMs * wheelSizeprivate[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs }tickMs:表示一個槽所代表的時(shí)間范圍,kafka的默認(rèn)值的1ms
wheelSize:表示該時(shí)間輪有多少個槽,kafka的默認(rèn)值是20
startMs:表示該時(shí)間輪的開始時(shí)間
taskCounter:表示該時(shí)間輪的任務(wù)總數(shù)
queue:是一個TimerTaskList的延遲隊(duì)列。每個槽都有它一個對應(yīng)的TimerTaskList,TimerTaskList是一個雙向鏈表,有一個expireTime的值,這些TimerTaskList都被加到這個延遲隊(duì)列中,expireTime最小的槽會排在隊(duì)列的最前面。
interval:時(shí)間輪所能表示的時(shí)間跨度,也就是tickMs*wheelSize
buckets:表示TimerTaskList的數(shù)組,即各個槽。
currentTime:表示當(dāng)前時(shí)間,也就是時(shí)間輪指針指向的時(shí)間
運(yùn)行原理
當(dāng)新增一個延遲任務(wù)時(shí),通過buckets[expiration / tickMs % wheelSize]先計(jì)算出它應(yīng)該屬于哪個槽。比如延遲任務(wù)的delayMs=2ms,當(dāng)前時(shí)間currentTime是0ms,則expiration=delayMs+startMs=2ms,通過前面的公式算出它應(yīng)該落于2號槽。并把任務(wù)封裝成TimerTaskEntry然后加入到TimerTaskList鏈表中。
之后,kafka會啟動一個線程,去推動時(shí)間輪的指針轉(zhuǎn)動。其實(shí)現(xiàn)原理其實(shí)就是通過queue.poll()取出放在最前面的槽的TimerTaskList。由于queue是一個延遲隊(duì)列,如果隊(duì)列中的expireTime沒有到達(dá),該操作會阻塞住,直到expireTime到達(dá)。如果通過queue.poll()取到了TimerTaskList,說明該槽里面的任務(wù)時(shí)間都已經(jīng)到達(dá)。這時(shí)候就可以遍歷該TimerTaskList中的任務(wù),然后執(zhí)行對應(yīng)的操作了。
針對上面的例子,就2號槽有任務(wù),所以當(dāng)取出2號槽的TimerTaskList后,會先將currentTime = timeMs - (timeMs % tickMs),其中timeMs也就是該TimerTaskList的expireTime,也就是2Ms。所以,這時(shí)currentTime=2ms,也就是時(shí)間輪指針指向2Ms。
時(shí)間溢出處理
在kafka的默認(rèn)實(shí)現(xiàn)中,tickMs=1Ms,wheelSize=20,這就表示該時(shí)間輪所能表示的延遲時(shí)間范圍是0~20Ms,那如果延遲時(shí)間超過20Ms要如何處理呢?Kafka對時(shí)間輪做了一層改進(jìn),使時(shí)間輪變成層級的時(shí)間輪。
一開始,第一層的時(shí)間輪所能表示時(shí)間范圍是0~20Ms之間,假設(shè)現(xiàn)在出現(xiàn)一個任務(wù)的延遲時(shí)間是200Ms,那么kafka會再創(chuàng)建一層時(shí)間輪,我們稱之為第二層時(shí)間輪。
第二層時(shí)間輪的創(chuàng)建代碼如下
overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue )也就是第二層時(shí)間輪每一個槽所能表示的時(shí)間是第一層時(shí)間輪所能表示的時(shí)間范圍,也就是20Ms。槽的數(shù)量還是一樣,其他的屬性也是繼承自第一層時(shí)間輪。這時(shí)第二層時(shí)間輪所能表示的時(shí)間范圍就是0~400Ms了。
之后通過buckets[expiration / tickMs % wheelSize]算出延遲時(shí)間為200Ms的任務(wù)應(yīng)該位于第二層時(shí)間輪的10號槽位。
同理,如果第二層時(shí)間輪的時(shí)間范圍還容納不了新的延遲任務(wù),就會創(chuàng)建第三層、第四層...
值得注意的是,只有當(dāng)前時(shí)間輪無法容納目標(biāo)延遲任務(wù)所能表示的時(shí)間時(shí),才需要創(chuàng)建更高一級的時(shí)間輪,或者說把該任務(wù)加到更高一級的時(shí)間輪中(如果該時(shí)間輪已創(chuàng)建)。
一些細(xì)節(jié)
源碼解析
添加新的延遲任務(wù)
//SystemTimer.scala private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelledif (!timerTaskEntry.cancelled)taskExecutor.submit(timerTaskEntry.timerTask)}}往時(shí)間輪添加新的任務(wù)
//TimingWheel def add(timerTaskEntry: TimerTaskEntry): Boolean = {//獲取任務(wù)的延遲時(shí)間val expiration = timerTaskEntry.expirationMs//先判斷任務(wù)是否已經(jīng)完成if (timerTaskEntry.cancelled) {false//如果任務(wù)已經(jīng)到期} else if (expiration < currentTime + tickMs) {false//判斷當(dāng)前時(shí)間輪所能表示的時(shí)間范圍是否可以容納該任務(wù)} else if (expiration < currentTime + interval) {// 根據(jù)任務(wù)的延遲時(shí)間算出應(yīng)該位于哪個槽val virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt)bucket.add(timerTaskEntry)// 設(shè)置TimerTaskList的expireTimeif (bucket.setExpiration(virtualId * tickMs)) {//把TimerTaskList加入到延遲隊(duì)列queue.offer(bucket)}true} else {//如果時(shí)間超出當(dāng)前所能表示的最大范圍,則創(chuàng)建新的時(shí)間輪,并把任務(wù)添加到那個時(shí)間輪上面if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}}private[this] def addOverflowWheel(): Unit = {synchronized {if (overflowWheel == null) {overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue)}}}從上面的代碼可以看出,對于當(dāng)前時(shí)間輪是否可以容納目標(biāo)任務(wù),是通過expiration < currentTime + interval來計(jì)算的,也就是根據(jù)時(shí)間輪的指針往后推interval時(shí)間就是時(shí)間輪所能表示的時(shí)間范圍。
時(shí)間輪指針的推進(jìn)
//SystemTimer.scala def advanceClock(timeoutMs: Long): Boolean = {//從延遲隊(duì)列中取出最近的一個槽,如果槽的expireTime沒到,此操作會阻塞timeoutMsvar bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {writeLock.lock()try {while (bucket != null) {//推進(jìn)時(shí)間輪的指針timingWheel.advanceClock(bucket.getExpiration())//把TimerTaskList的任務(wù)都取出來重新add一遍,add的時(shí)候會檢查任務(wù)是否已經(jīng)到期bucket.flush(reinsert)bucket = delayQueue.poll()}} finally {writeLock.unlock()}true} else {false}} //TimingWheel def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {//推進(jìn)時(shí)間輪的指針currentTime = timeMs - (timeMs % tickMs)// 推進(jìn)上層時(shí)間輪的指針if (overflowWheel != null) overflowWheel.advanceClock(currentTime)}}總結(jié)
相比于常用的DelayQueue的時(shí)間復(fù)雜度O(logN),TimingWheel的數(shù)據(jù)結(jié)構(gòu)在插入任務(wù)時(shí)只要O(1),獲取到達(dá)任務(wù)的時(shí)間復(fù)雜度也遠(yuǎn)低于O(logN)。另外,kafka的TimingWheel在插入任務(wù)之前還會先檢查任務(wù)是否完成,對于那些在任務(wù)超時(shí)直接就完成指定操作的場景,TimingWheel的表現(xiàn)更加優(yōu)秀。
作者:瘋狂的哈丘
鏈接:https://www.jianshu.com/p/0f0fec47a0ad
來源:簡書
簡書著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請聯(lián)系作者獲得授權(quán)并注明出處。
總結(jié)
以上是生活随笔為你收集整理的TimingWheel 时间轮详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java:ThreadPoolExecu
- 下一篇: 时间轮算法解析(Netty Hashed