kafka 同步提交 异步_极限MQ (5) Kafka 消费者
要想知道如何從 Kafka 讀取消息,需要先了解消費(fèi)者和消費(fèi)者群組的概念。
假設(shè)我們有一個(gè)應(yīng)用程序需要從 Kafka 主題讀取消息井驗(yàn)證這些消息,然后再把它們保存起來。應(yīng)用程序需要創(chuàng)建一個(gè)消費(fèi)者對象,訂閱主題并開始接收消息,然后驗(yàn)證消息井保存結(jié)果。
過了一陣子,生產(chǎn)者往主題寫入消息的速度超過了應(yīng)用程序驗(yàn)證數(shù)據(jù)的速度,這個(gè)時(shí)候該怎么辦?如果只使用單個(gè)消費(fèi)者處理消息,應(yīng)用程序會永遠(yuǎn)跟不上消息生成的速度。顯然,此時(shí)很有必要對消費(fèi)者進(jìn)行橫向伸縮。就像多個(gè)生產(chǎn)者可以向相同的主題寫入消息一樣,我們也可以使用多個(gè)消費(fèi)者從同一個(gè)主題讀取消息,對消息進(jìn)行分流。
Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個(gè)群組里的消費(fèi)者訂閱的是同一個(gè)主題,每個(gè)消費(fèi)者只接收主題的部分分區(qū)的消息。
- 假設(shè)主題 T1 有4 個(gè)分區(qū),我們創(chuàng)建了消費(fèi)者 C1 ,它是群組 G1 里唯一的消費(fèi)者,我們用它訂閱主題 T1 。那么消費(fèi)者 C1 將收到主題 T1 全部 4 個(gè)分區(qū)的消息。
- 如果在群組 G1 里新增一個(gè)消費(fèi)者 C2 ,那么每個(gè)消費(fèi)者將分別從兩個(gè)分區(qū)接收消息。消費(fèi)者 C1 接收分區(qū) 1 和分區(qū) 2 的消息,消費(fèi)者 C2 接收分區(qū) 3 和分區(qū) 4 的消息。
- 如果群組有 4 個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者可以分配到一個(gè)分區(qū)。
- 如果我們往群組里添加更多的消費(fèi)者,甚至超過主題的分區(qū)數(shù)量,那么多出來的消費(fèi)者就會被 閑置,不會接收到任何消息。
往群組里增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式。
Kafka 消費(fèi)者經(jīng)常會做一些高延遲的操作,比如把數(shù)據(jù)寫到數(shù)據(jù)庫或 HDFS ,或者使用數(shù)據(jù)進(jìn)行比較耗時(shí)的計(jì)算。在這些情況下,單個(gè)消費(fèi)者無法跟上數(shù)據(jù)生成的速度,所以可以增加更多的消費(fèi)者,讓它們分擔(dān) 載,每個(gè)消費(fèi)者只處理部分分區(qū)的消息,這就是橫向伸縮的主要手段。
我們有必要為主題創(chuàng)建大量的分區(qū),這樣在負(fù)載增長時(shí)可以加入更多的消費(fèi)者。不過不要讓消費(fèi)者的數(shù)量超過主題分區(qū)的數(shù)量,多余的消費(fèi)者只會被閑置。
除了通過增加消費(fèi)者來橫向伸縮單個(gè)應(yīng)用程序外,還經(jīng)常出現(xiàn)多個(gè)應(yīng)用程序從同主題讀取數(shù)據(jù)的情況。
實(shí)際上, Kafka 設(shè)計(jì)的主要目標(biāo)之一 ,就是要讓 Kafka 主題里的數(shù)據(jù)能夠滿足企業(yè)各種應(yīng)用場景的需求。在這些場景里,每個(gè)應(yīng)用程序可以獲取到所有的消息, 而不只是其中的一部分。那么只要保證每個(gè)應(yīng)用程序有自己的消費(fèi)者群組,就可以讓它們獲取到主題所有的消息。不同于傳統(tǒng)的消息系統(tǒng),橫向伸縮 Kafka 消費(fèi)者和消費(fèi)者群組并不對性能造成負(fù)面影響。
在上面的例子里,如果新增一個(gè)只包含一個(gè)消費(fèi)者的群組 G2 ,那么這個(gè)消費(fèi)者將從主題 T1上接收所有的消息,與群組 G1之間互不影響。群組 G2 可以增加更多的消費(fèi)者,每個(gè)消費(fèi)者可以消費(fèi)若干個(gè)分區(qū),就像群組 G1 那樣。
簡而言之,為每一個(gè)需要獲取一個(gè)或多個(gè)主題全部消息的應(yīng)用程序創(chuàng)建一個(gè)消費(fèi)者群組, 然后往群組里添加消費(fèi)者來伸縮讀取能力和處理能力,群組里的每個(gè)消費(fèi)者只處理部分消息。
然而,分區(qū)并非如此簡單。當(dāng)一個(gè)群組的消費(fèi)者增加和減少時(shí),當(dāng)這個(gè)主題的分區(qū)數(shù)量變化時(shí),有很多問題要處理。
分區(qū)的所有權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者,這樣的行為被稱為再均衡。
再均衡非常重要, 它為群組帶來了高可用性和伸縮性(可以放心地添加或移除梢費(fèi)者)。
不過在正常情況下,我們并不希望發(fā)生這樣的行為。在再均衡期間,消費(fèi)者無法讀取消息,造成整個(gè)群組小段時(shí)間的不可用。
另外,當(dāng)分區(qū)被重新分配給一個(gè)消費(fèi)者時(shí),消費(fèi)者當(dāng)前的讀取狀態(tài)會丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會拖慢程序。
分配分區(qū)是怎樣進(jìn)行的?
當(dāng)消費(fèi)者要加入群組時(shí),它會向群組協(xié)調(diào)器發(fā)送 Join Group 請求。第一個(gè)加入群組的消費(fèi)者將成為“群主”。群主從協(xié)調(diào)器那里獲得群組的成員列表(列表中包含了所有最近發(fā)送過心跳的消費(fèi)者,它們被認(rèn)為是活躍的), 并負(fù)責(zé)給每一個(gè)消費(fèi)者分配分區(qū)。
消費(fèi)者通過向被指派為群組協(xié)調(diào)器的 broker (不同的群組可以有不同的協(xié)調(diào)器)發(fā)送心跳來維持它們和群組的從屬關(guān)系以及它們對分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常的時(shí)間間隔發(fā)送心跳,就被認(rèn)為是活躍的,說明它還在讀取分區(qū)里的消息。消費(fèi)者會在輪詢消息 (為了獲取消息)或提交偏移量時(shí)發(fā)送心跳。如果消費(fèi)者停止發(fā)送心跳的時(shí)間足夠長,會話就會過期,群組協(xié)調(diào)器認(rèn)為它已經(jīng)死亡,就會觸發(fā)一次再均衡。
創(chuàng)建消費(fèi)者對象與創(chuàng)建生產(chǎn)者對象非常相似。
唯一不同的屬性是 group.i.d ,而且甚至不是必填的,不過我們現(xiàn)在姑且認(rèn)為它是必需 。它指定了屬于哪 個(gè)消費(fèi)者群組。創(chuàng)建不屬于任何一個(gè)群組的消費(fèi)者也是可以的,只是這樣不太常見。訂閱主題非常簡單,甚至支持正則。
consumer .subscribe( "test.*" );輪詢
輪訓(xùn)是消費(fèi)者 API 的核心,通過一個(gè)簡單的輪詢向服務(wù)器請求數(shù)據(jù)。一旦消費(fèi)者訂閱 了主題,輪詢就會處理所有的細(xì)節(jié),包括群組協(xié)調(diào)、分區(qū)再均衡、發(fā)送心跳和獲取數(shù)據(jù),使用者只需要使用一組簡單的 API 來處理從分區(qū)返回的數(shù)據(jù)即可。消費(fèi)者代碼的主要部分如下所示:
try{while (true) {ConsumerRecords<String String> records = consumer.poll(100);for(ConsumerRecord<String String> record :records){int updatedCount = 1;if(custCountryMap.countainsValue(record.value()) {updatedCount = custCountryMap.get (record.value() + 1);}custCountryMap.put(record.value(), updatedCount)System.out.println(custCountryMap);} finally {consumer.close(); }- 一個(gè)無線循環(huán)。
- poll 方法非常重要。就像鯊魚停止移動就會死掉一樣,消費(fèi)者必須持續(xù)對 Kafka 進(jìn)行輪詢,否則會被認(rèn)為己經(jīng)死亡 ,其負(fù)責(zé)的分區(qū)會被移交給群組里的其他消費(fèi)者。傳給 poll 方法 參數(shù)是一個(gè)超時(shí)時(shí)間,用于控制 poll 方法的阻塞時(shí)間。如果該參數(shù)被設(shè)為 0, poll 會立即返回。
- poll 返回的是記錄列表。每條記錄都包含了記錄所屬主題的信息、分區(qū)的信息、分區(qū)偏移量 ,以及記錄的鍵值對。一般會遍歷這個(gè)列表 ,逐條處理這些記錄。
- 一般落地的處理結(jié)果就是結(jié)果保存起來或者對已有的記錄進(jìn)行更新,處理過程也隨之結(jié)束。
- 在退出應(yīng)用程序之前會觸發(fā)一次再均衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳井認(rèn)定它已死亡, 因?yàn)槟菢有枰L的時(shí)間,導(dǎo)致整個(gè)群組在一段時(shí)間內(nèi)無法讀取消息。
在第一次調(diào)用新消費(fèi)者的 poll 方法時(shí),它會負(fù)責(zé)查找 GroupCoordinator 然后加入群組,接受分配分區(qū)。 如果發(fā)生了再均衡,整個(gè)過程也會在輪詢期間進(jìn)行 。當(dāng)然 ,心跳也會在輪詢里發(fā)迭出去的。
注意,一個(gè)消費(fèi)者活在一個(gè)獨(dú)立的線程里。
提交和偏移量
Kafka 不會像其它隊(duì)列那樣需要得到消費(fèi)者的確認(rèn),這是 Kafka 的獨(dú)特之處。
我們把更新分區(qū)當(dāng)前位置的操作叫作提交。
那么消費(fèi)者是如何提交偏移量的呢?
消費(fèi)者往一個(gè)叫作 _consumer_offset d 特殊主題發(fā)送消息,消息里包含每個(gè)分區(qū)的偏移量。
如果消費(fèi)者一直處于運(yùn)行狀態(tài),那么偏移量就沒有什么用處。不過,如果發(fā)生崩壞或者有新的消費(fèi)者加入群組,就會觸發(fā)再均衡,完成再均衡之后,每個(gè)消費(fèi)者可能分配到新的分區(qū),而不是之前處理的那個(gè)。
為了能夠繼續(xù)之前的工作,消費(fèi)者需要讀取每個(gè)分區(qū)最后一次提交的偏移量,然后從偏移量指定的地方繼續(xù)處理。
但是,
如果提交的偏移量小于客戶端處理的最后一個(gè)消息的偏移量 ,那么處于兩個(gè)偏移量之間的消息就會被重復(fù)處理。
如果提交的偏移量大于客戶端處理的最后一個(gè)消息的偏移量,那么處于兩個(gè)偏移量之間的消息將會丟失。
處理偏移量的方式對客戶端會有很大的影響。
自動提交
最簡單的提交方式是讓悄費(fèi)者自動提交偏移量。
如果 enable.auto.commit 被設(shè)為 true ,那么每過5s,消費(fèi)者會自動把從 poll 方法接收到的最大偏移量提交上去。自動提交也是在輪詢里進(jìn)行的。
可是,假設(shè)使用默認(rèn)的 5s 提交時(shí)間間隔,在最近一次提交之后的 3s 發(fā)生了再均衡。再均衡之后,消費(fèi)者從最后一次提交的偏移量位置開始讀取消息。這個(gè)時(shí)候偏移量已經(jīng)落后 3s ,所以在這 3s 內(nèi)到達(dá)的消息會被重復(fù)處理。
自動提交不能避免重復(fù)消息。
開發(fā)者同步提交當(dāng)前偏移量
把 enable.auto.commit 設(shè)為 false ,讓應(yīng)用程序決定時(shí)提交偏移量。使用 commit.Sync() 提交偏移量。
commit.Sync() 將會提交由 poll 返回的最新偏移量。
如果發(fā)生了再均衡,從最近一批消息到發(fā)生再均衡之間的所有消息都將被重復(fù)處理。
異步提交
手動提交有一個(gè)不足之處在對提交請求作出回應(yīng)之前,應(yīng)用程序會阻塞,這會限制應(yīng)用程序的吞吐量。
可以通過降低提交頻率來提升吞吐,但如果發(fā)生了再均衡, 會增加重復(fù)消息的數(shù)量。
可以使用異步提交。只管發(fā)送提交請求,無需等待 broker 的響應(yīng)。
commit.Async();
在成功提交或碰到無怯恢復(fù)的錯誤之前, 同步方法會一直重試,但是異步方法不會。
同步和異步組合提交
般情況下,針對偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會有太大問題。
但如果這是發(fā)生在關(guān)閉消費(fèi)者或均衡前的最后一次提交,就要確保能夠提交成功。 在消費(fèi)者關(guān)閉前一般會組合使用兩種方式。
總結(jié)
以上是生活随笔為你收集整理的kafka 同步提交 异步_极限MQ (5) Kafka 消费者的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: powwr shell_Powershe
- 下一篇: css 剪辑图片_CSS中的clip-p