5.4.7 延迟的心跳
5.4.7 延遲的心跳
延遲操作有3個主要的方法:嘗試完成方法(返回布爾值,表示是有可以完成)、超時的回調方法、完成的回調方法。對于“延遲加入”,嘗試完成是判斷消費組成員中是否還有消費者沒有重新發送“加入組請求”,如果全部都發送了“加入組請求”,就認為“延遲加入”可以完成。“延遲加入”完成時的回調方法會發送“加入組響應”。
“延遲心跳”的嘗試完成方法(tryCompleteHeartbeat())判斷條件是:消費者成員是否存活。如果消費者存活,則可以調用完成時的回調方法(onCompleteHeartbeat())。但完成“延遲心跳”的方法實現中,并沒有具體的處理代碼:
 
判斷消費者成員是否存活有下面的3種條件,只要任何一個條件滿足,都認為消費者是存活的。
- 消費者成員的awai.ti.ngJoi.nCallback回調方法不為空。
- 消費者成員的awai.ti.ngSyncCallback回調方法不為空。
- 消費者成員最近的心跳時間加上會話超時時間大于下一次心跳的截止時間。
先來看最后一個條件,因為截止時間是在創建“延遲心跳”時指定,那就來看創建“延遲心跳”時是怎么做的。
再來回顧下協調者調用“完成并調度下一次心跳”方法,創建“延遲心跳”的相關上下文。協調者先通過checkAndCoMplete()方法嘗試完成已有的“延遲心跳”。通常來說,這一步一定能夠完成“延遲的心跳”,否則就沒有必要再創建新的“延遲的心跳”了。
 注意:如果是協調者第一次調用checkAndCo~叭ete()方法,因為延遲緩存中還沒有這個消費者的“延遲心跳”,所以第一次不會真正執行該方法。
在完成了上次的“延遲心跳”后,協調者會計算出下一次的心跳截止時間,并創建新的“延遲心跳”。這一次通過tryCompleteElseWatch()方法嘗試完成剛剛創建的“延遲心跳”,則一定不能完成。因為判斷能夠完成的條件是:最新的心跳時間加上會話超時時間必須大于下一次心跳的截止時間。而剛剛創建的“延遲心跳”對象,在計算這個條件時,“最新條件時間加上會話超時時間等于下一次心跳的截止時間”,因此不滿足完成的條件。相關代碼如下:
 
如圖5-27所示,以協調者處理“加入組請求”和“同步組請求”時,調用“完成和調度下一次心跳”方法(下文簡稱“調度方法”)為例。有3個地方會調用該方法:協調者返回“加入組響應”給每個消費者之后、協調者處理消費者的“同步組請求”時、協調者返回“同步組響應”給每個消費者之后,具體步驟如下。
(1)協調者發送“加入組響應”給某個消費者后,當前時間為0秒。第一次調用調度方法,延遲緩存中沒有“延遲心跳”,先創建“延遲的心跳”,而且它的截止時間為5秒,」之滿足完成的條件,加入延遲緩存。 (2)消費者在l秒時發送了“同步組請求”,當前時間為l秒。協調者處理消費者的“同步組請求”,第二次調用調度方法,延遲緩存中有“延遲的心跳”,嘗試完成它,可以完成。然后創建新的“延遲心跳”,截止時間為6秒。 (3)協調者發送“同步組響應”給某個消費者后,當前時間為3秒。第三次調用調度方法,延遲緩存中有“延遲的心跳”,嘗試完成它,可以完成。然后創建新的“延遲心跳”,截止時間為8秒。如圖5-28所示,我們從延遲緩存的角度看調度方法。調度方法分3步:檢查井完成延遲心跳、創建新的延遲心跳、嘗試完成并監視延遲的心跳。如果緩存中已經存在延遲操作,第一步一定會完成延遲的心跳,并將延遲心跳從緩存中刪除。第三步一定不會完成新創建的延遲心跳,并將剛創建的延遲心跳加入緩存。
如表5-8所示,當執行調度方法時,判斷延遲心跳是杏可以完成。截止時間是上一次延遲心跳的截止時間,而最近心跳是當前時間,所以第三列的條件總是能夠成立(第一次沒有截止時間)。
 
如圖5-29所示,調度方法每次創建新的延遲心跳,都會更新截止時間。只要在下一次心跳截止時之前執行調度方法,都會完成延遲的心跳。但如果沒有在截止時間內再次執行調度方法,延遲緩存中的延遲心跳就會超時,對應的消費者就有可能被協調者從消費組中移除(還有下面分析的其他條件限制)。
注意:協調者創建的每個延遲心跳都和消費者一一對應,延遲心跳的超時時間是消費者設直的會話超時時間。放入延遲緩存中的延遲,心跳被用來表示消費者是否存活。如果消費者在延遲心跳的截止時間之前再次調用了調度方法,舊的延遲,心跳滿足完成的條件,會從緩存中彈出并執行。協調者會創建新的延遲心跳,新延邊,\.;跳的截止時間也會被更新。通過這種方式,消費者只要存活,都對應緩存中的一個延遲心跳。
 
再來看判斷消費者成員是否存活的另外兩個條件:消費者成員的awai.ti.ngJoi.nCallback或awai.ti.ngSyncCallback回調方法不為空,這兩個條件下即使超時了,也被認為是存活的。
消費者成員元數據的回調方法有兩個,先來看awai.ti.ngSyncCallback和延遲心跳示例。如圖5-30所示,假設3個消費者設置的會話超時時間分別是:Cl=lO秒,C2=20秒,C3=40秒。協調者完成“延遲加入”,發送“加入組響應”的時間為10:00:00。對應每個消費者的下次心跳截止時間分別是:Cl=10:00:10,C2=10:00:2日,C3=10:00:40。Cl在10:00:03發送了“同步組請求”,協調者更新Cl的心跳截止時間為10:00:13。那么照理說,Cl在10:00:13后因為一直都沒有機會再調用調度方法,所以“延遲的心跳”就會超時,Cl就會被協調者從消費組中移除。但實際上,協調者處理Cl的“同步組請求”時,設置了awai.ti.ngSyncCallback回調方法。即使“延遲的心跳”超時了,但“回調方法不為空”,消費者成員仍然被認為是存活的,Cl就不會從消費組中移除。
注意:“延遲的心跳”在超時后,還是會調用onExpi.reHeartbeat()方法,只不過對能夠真正執行onMefTlberFai.lure.O方法再加上一層限制條件,防止消費者仍然存活,卻被移除掉。
當主消費者C2直到10:00:20才發送“同步組請求”,而C3還沒有發送“同步組請求”。協調者處理C2的“同步組請求”時,只會完成Cl和C2上一次的延遲心跳,并創建新的延遲心跳。假設協調者返回“同步組響應”給Cl和C2的時間是10:00:25,那么Cl下一次的心跳截止時間為10:00:35,C2下一次的心跳截止時間為10:00:45。下面的步驟是3個消費者延遲心跳的變化情況。
(1)協調者10:00:03處理Cl的“同步組請求”,Cl的下次心跳截止時間為10:00:13。 (2)協調者10:00:20處理C2的“同步組請求”,C2的下次心跳截止時間為10:00:40。 (3)協調者10:00:25返回“同步組響應”給Cl,Cl的下次心跳截止時間為10:00:35。 (4)協調者10:00:25返回“同步組響應”給C2,C2的下次心跳截止時間為10:00:“。 (5)協調者10:00:40處理。的“同步組請求”,C3的下次心跳截止時間為10:01:20。協調者調用“完成并調度下一次心跳”的調度方法時,不管外部事件是哪一種(返回加入組響應、處理同步組請求、返回同步組響應),都會更新同一個延遲的心跳對象。在任何時刻,消費者在延遲緩存中的延遲心跳只有一個。也就是說,協調者在返回“加入組響應”時,也可能會更新返回“同步組響應”創建的延遲心跳。
再來看“判斷消費者成員存活”的另一個條件:awa"i.t"i.ngJo"i.nCallback回調方法不為空。
協調者在處理普通消費者的“同步組請求”時,除了設置awa"i.t"i.ngJo"i.nCallback回調方法,也會調用調度方法更新延遲的心跳。如果延遲心跳超時,但主消費者還沒有發送“同步組請求”,普通消費者仍然被認為是存活的。協調者在處理消費者的“加入組請求”時,也會設置awa"i.t"i.ngJ01.nCallback回調方法,但不會調用調度方法。如果對應的延遲心跳超時,但延遲的加入操作還不能完成,消費者也被認為是存活的。
如圖5-31所示,仍然以前面的示例作為基礎,不過這里假設最開始只有兩個消費者,協調者在10:00:00返回同步組響應給Cl和C2,它們的下次心跳截止時間分別是口=10:00:10和口=10:00:20。下面幾個步驟是協調者處理“加入組請求”、延遲加入、延遲心跳相關的事件順序。
(1)新的消費者C3在10:00:02加入組,Cl和C2必須在心跳截止時間內重新發送“加入組請求”。 (2)Cl在10:00:03重新發送“加入組請求”,延遲加入不能完成,因為C2還沒有發送“加入組請求”。 (3)當時間到10:00:10時(并不是10:00:13),Cl的延遲心跳超時了。但因為協調者處理“加入組請求”時,設置了awa"i.t"i.ngJo"i.nCallback回調方法,所以Cl還是存活的。 (4)C2在10:00:15重新發送了“加入組請求”,延遲加入可以完成。協調者返回響應給3個消費者,并且更新它們的下次心跳截止時間,分別是:C1=10:00:25、(2=10:00:35、ζ3=10:00:550注意:協調者處理“加入紐請求”時,因為沒有調用調度方法,并不會更新下次心跳的截止時間。所以圖中協調者處理Cl,它的心跳截止時間還是上一次的10:00:1日,新加入的消費者C3則沒有延遲心跳。
針對協調者處理“加入組請求”的awa"i.t"i.ngJo"i.nCallback回調方法,再舉個異常的例子。如圖5-32所示,假設消費者2并沒有在下次心跳截止時間(1日:00:2日)之前重新發送“加入組請求”,對應的延遲心跳會判斷到消費者2失敗,從而將其從消費組中移除。另外,延遲的加入在完成時,協調者也不會返回“加入組響應”給消費者2,因為它已經不在消費組中了。
 
總結成員元數據的awai.ti.ngJoi.nCallback和lawai.ti.ngSyncCallback回調方法使用的地方。
- 消費者處理“加入組請求”和“同步組請求”時先保存回調方法,在返回響應時調用回調方法。
- awai.ti.ngJoi.nCallback還用來判斷消費組中的消費者是否重新發送了“加入組請求”。
- 即使消費者的延遲心跳超時了,如果元數據的兩個回調方法不為空,消費者仍然被認為是存活的。
協調者會在消費者加入組的過程中創建延遲的心跳。消費者成功加入消費組(即消費組進入穩定狀態)后,它會發送心跳請求給協調者。協調者處理消費者的心跳請求時,也會調用“完成并調度下一次心跳”方法。
消費者成功加入組后,會在調用。『iJoi.nComplete()回調方法后重置心跳任務,重新開始調度發送心跳的定時任務。這里用“重置”是因為消費組會經常發生再平衡,每次再平衡過后,消費組狀態變為“穩定”,每個消費者都需要重新發送心跳請求給協調者。
協調者處理消費者發送的心跳請求,沒有其他的依賴限制條件。比如,不像“延遲加入”那樣需要等待其他消費者都發送了“加入組請求”,才會返回“加入組響應”;也不需要等待主消費者發送“同步組請求”后,才返回“同步組響應”。協調者處理心跳請求和加入組過程中調用調度方法一樣,也會立即完成延遲緩存中已有的延遲心跳,并創建一個新的延遲心跳并重新放入延遲緩存。最后,心跳的處理沒有產生結果數據,協調者直接返回沒有錯誤碼的“心跳響應”給消費者。
5.5 小結
第4章和本章主要分析了新消費者的客戶端(KafkaConsumer)和服務端的協調者(GroupCoordinator)。消費者客戶端的主要業務邏輯是拉取消息,而為了拉取消息必須分配到分區。同一個消費組的所有消費者通過向協調者發送“加入組請求”,最終獲得分配給向己的分區。服務端的協調者負責消費組的再平衡操作,將集群所有的分區按照分配算法分配給消費組的每個消費者。
新的消費者將“消費組管理協議”和“分區分配策略”進行了分離。協調者仍然負責消費組的管理,包括消費者元數據、消費組元數據、消費組狀態機等數據結構的維護。而分區分配的實現則會在消費組的一個主消費者中完成。由于分區分配交由主消費者客戶端完成,但每個消費者為了獲得分區分配結果,還是只能和協調者聯系,因此主消費者在完成分區分配后,還要將分配結果發送回協調者。采用這種方式,每個消費者都需要發送下面兩種請求給協調者。
- 加入組請求。協調者收集消費組的所有消費者,并選舉一個主消費者執行分區分配工作。
- 同步組請求。主消費者完成分區分配,由協調者將分區的分配結果傳播給每個消費者。
消費者發送“加入組請求”給協調者,是為了讓協調者收集所有的消費者。協調者會把消費者成員列表發送給主消費者,這樣主消費者才可以執行分區分配工作。每個消費者發送給協調者的“加入組請求”,都帶有各自的消費者成員元數據。比如,消費者訂閱的的主題、消費組編號、會話超時時間等。“加入組請求”和“加入組響應”的字段如下:
主消費者收到的“加入組響應”帶有所有的消費者成員,它在執行完分區分配工作后,發送給協調者的“同步組請求”帶有分配給每個消費者的分區結果。協調者在收到主消費者的“同步組請求”后,會先將消費組的分配結果持久化,然后才返回“同步組響應”給每個消費者。每個消費者的“同步組響應”只包含分配給這個消費者的分區列表,分區分配算法保證了不同消費者的分區一定是不同的。“同步組請求”和“同步組響應”的字段如下:
注意:“同步組請求”和“同步紐響應”中并沒有消費者成員字段(MemberId,消費組狀態中的不算)。這是因為消費者成員編號在“加入組請求”和“加入紐響應”中已經存在,所以就不需妥了。
消費者發送“同步組請求”,是在它收到協調者的“加入組響應”后才開始的,“加入組請求”和“同步組請求”鏈式依次調用。協調者處理不同消費者的這兩種請求,用消費組狀態機來維護不同的事件。消費組的狀態主要有下面3個。
- “準備再平衡”。新消費者加入組或者舊消費者離開組消費組都需要執行一次再平衡操作。
- “等待同步”。所有消費者都加入組,協調者返回“加入組響應”給每個消費者前,更改狀態為“等待同步飛它表示協調者等待接收主消費者發送的包含消費組分配結果的“同步組請求”。
- “穩定”。協調者返回帶有分區分配結果的“同步組響應”給每個消費者。
協調者除了管理消費者的負載均衡,并最終分配分區給每個消費者,還會接收每個消費者的心跳請求。協調者通過心跳監控消費者成員是否存活:如果消費者沒有在指定的截止時間內發送心跳,協調者認為消費者失敗,將其從消費組中移除,這樣消費組就需要執行再平衡操作。另外,協調者在處理“加入組請求”和“同步組請求”過程中,為了保證參與加入組的消費者及時響應,也會用心跳來監控消費者成員是否還存活。
總結
以上是生活随笔為你收集整理的5.4.7 延迟的心跳的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: MyBatis第N+1种分页方式,全新的
- 下一篇: 设置电脑休眠
