Consumer设计-high/low Level Consumer
1 Producer和Consumer的數(shù)據(jù)推送拉取方式
??Producer Producer通過主動Push的方式將消息發(fā)布到Broker n Consumer Consumer通過Pull從Broker消費數(shù)據(jù)
? Push? 優(yōu)勢:延時低
? ? ? ? ? 劣勢:可能造成Consumer來不及處理消息;網(wǎng)絡(luò)擁塞?
? Pull? ?優(yōu)勢:Consumer按實際處理能力獲取相應(yīng)量的數(shù)據(jù);Broker實現(xiàn)簡單
? ? ? ? ? 劣勢:如果處理不好,實時性相對不足(例如需要大量不斷請求浪費資源,Kafka使用long polling,一次請求無果等待一段時間從而減少請求次數(shù))。
2? High Level Consumer
? ?場景:客戶程序只是希望從Kafka順序讀取并處理數(shù)據(jù),而不太關(guān)心具體的offset。?
? ? ? ? ? ? 也希望提供一些語義,例如同一條消息只被某一個Consumer消費(單播)或被所有Consumer消費(廣播)。
? ? ? Kafka High Level API提供了一個從Kafka消費數(shù)據(jù)的高層抽象,從而屏蔽掉其中的細(xì)節(jié),并提供豐富的語義。
?(1)Consumer Group? ?理解consumer group記住下面這三個特性就好了:consumer group下可以有一個或多個consumer instance,consumer instance可以是一個進(jìn)程,也可以是一個線程;group.id是一個字符串,唯一標(biāo)識一個consumer group;consumer group下訂閱的topic下的每個分區(qū)只能分配給某個group下的一個consumer(當(dāng)然該分區(qū)還可以被分配給其他group);
? ? ? ? High Level Consumer將從某個Partition讀取的最后一條 消息的offset存于Zookeeper中。
? ? ? ? 這個offset基于客戶程序提供給Kafka的名字來保存,這個 名字被稱為Consumer Group。
? ? ? ? Consumer Group是整個Kafka集群全局唯一的,而非針對某個Topic。
? ? ? ? 每個High Level Consumer實例都屬于一個Consumer Group,若不指定則屬于默認(rèn)的Group。
? ? ? ? ? ? ? ? ??
??
? ? ? ? ? ??
? ? ? ? ? 很多傳統(tǒng)的Message Queue都會在消息被消費完后將消息刪除,一方面避免重復(fù)消費,另一方面可以保證Queue的長度比較短,提高效率。kafka會采用兩種,
刪除(過期或過大)和壓縮,壓縮如下。
?
*消息被消費后,并不會被刪除,只是相應(yīng)的offset加一。
? ? ? ? ? ? *對于每條消息,在同一個Consumer Group里只會被一個Consumer消費
? ? ? ? ? ? *不同Consumer Group可消費同一條消息 。
? ? (2)High Level Consumer Rebalance?
? ? ? ?Kafka保證同一Consumer Group中只有一個Consumer會消費某條消息,實際上,Kafka保證的是穩(wěn)定狀態(tài)下每一個Consumer實例只會消費某一個或多個特定Partition的數(shù)據(jù),而某個Partition的數(shù)據(jù)只會被某一個特定的Consumer實例所消費。也就是說Kafka對消息的分配是以Partition為單位分配的,而非以每一條消息作為分配單元。這樣設(shè)計的劣勢是無法保證同一個Consumer Group里的Consumer均勻消費數(shù)據(jù),優(yōu)勢是每個Consumer不用都跟大量的Broker通信,減少通信開銷,同時也降低了分配難度,實現(xiàn)也更簡單。另外,因為同一個Partition里的數(shù)據(jù)是有序的,這種設(shè)計可以保證每個Partition里的數(shù)據(jù)可以被有序消費。
如果某Consumer Group中Consumer(每個Consumer只創(chuàng)建1個MessageStream)數(shù)量少于Partition數(shù)量,則至少有一個Consumer會消費多個Partition的數(shù)據(jù),如果Consumer的數(shù)量與Partition數(shù)量相同,則正好一個Consumer消費一個Partition的數(shù)據(jù)。而如果Consumer的數(shù)量多于Partition的數(shù)量時,會有部分Consumer無法消費該Topic下任何一條消息。
Consumer Rebalance算法
將目標(biāo)Topic下的所有Partirtion排序,存于PT
對某Consumer Group下所有Consumer排序,存于CG,
第i個Consumer記為Ci
? ? ? N=size(PT)/size(CG) ,向上取整
解除Ci對原來分配的Partition的消費權(quán)(i從0開始)
? ?將第 i?N 到(i+1)?N?1個Partition分配給Ci
? ? ? 在這種策略下,每一個Consumer或者Broker的增加或者減少都會觸發(fā)Consumer Rebalance。因為每個Consumer只負(fù)責(zé)調(diào)整自己所消費的Partition,為了保證整個Consumer Group的一致性,當(dāng)一個Consumer觸發(fā)了Rebalance時,該Consumer Group內(nèi)的其它所有其它Consumer也應(yīng)該同時觸發(fā)Rebalance。因此有以下缺點
Herd effect:任何Broker或者Consumer的增減都會觸發(fā)所有的Consumer的Rebalance
Split Brain:每個Consumer分別單獨通過Zookeeper判斷哪些Broker和Consumer 宕機了,那么不同Consumer在同一時刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的,這就會造成不正確的Reblance嘗試。
調(diào)整結(jié)果不可控:所有的Consumer都并不知道其它Consumer的Rebalance是否成功,這可能會導(dǎo)致Kafka工作在一個不正確的狀態(tài)。
? ? ? 0.9以后的版本,提供了coordinator來解決上述缺點。
3? ?coordinator? ? 和Rebalance
? ??新consumer加入組、已有consumer主動離開組或已有consumer崩潰的時候,會觸發(fā)rebalance。每個consumer group都會被分配一個這樣的coordinator用于組管理和位移管理。這個group coordinator比原來承擔(dān)了更多的責(zé)任,比如組成員管理、位移提交保護(hù)機制等。當(dāng)新版本consumer group的第一個consumer啟動的時候,它會去和kafka server確定誰是它們組的coordinator。之后該group內(nèi)的所有成員都會和該coordinator進(jìn)行協(xié)調(diào)通信。這種coordinator設(shè)計不再需要zookeeper了,性能上可以得到很大的提升。
? ? * generation:它表示了rebalance之后的一代成員,主要是用于保護(hù)consumer group,隔離無效offset提交的。比如上一代的consumer成員是無法提交位移到新一屆的consumer group中。有時候報ILLEGAL_GENERATION的錯誤就是代錯誤。每次group進(jìn)行rebalance之后,generation號都會加1,表示group進(jìn)入到了一個新的版本,如下圖所示: Generation 1時group有3個成員,隨后成員2退出組,coordinator觸發(fā)rebalance,consumer group進(jìn)入Generation 2,之后成員4加入,再次觸發(fā)rebalance,group進(jìn)入Generation 3。
? ??
? ? ?* 協(xié)議 :rebalance本質(zhì)上是一組協(xié)議。group與coordinator共同使用它來完成group的rebalance。目前kafka提供了5個協(xié)議來處理與consumer group coordination相關(guān)的問題。? ?
Heartbeat請求:consumer需要定期給coordinator發(fā)送心跳來表明自己還活著
LeaveGroup請求:主動告訴coordinator我要離開consumer group
SyncGroup請求:group leader把分配方案告訴組內(nèi)所有成員
JoinGroup請求:成員請求加入組
DescribeGroup請求:顯示組的所有信息,包括成員信息,協(xié)議名稱,分配方案,訂閱信息等。通常該請求是給管理員使用。
? ?* 狀態(tài):和很多kafka組件一樣,group也做了個狀態(tài)機來表明組狀態(tài)的流轉(zhuǎn)。coordinator根據(jù)這個狀態(tài)機會對consumer group做不同的處理,如下
? ? ?
Dead:組內(nèi)已經(jīng)沒有任何成員的最終狀態(tài),組的元數(shù)據(jù)也已經(jīng)被coordinator移除了。這種狀態(tài)響應(yīng)各種請求都是一個response: UNKNOWN_MEMBER_ID
Empty:組內(nèi)無成員,但是位移信息還沒有過期。這種狀態(tài)只能響應(yīng)JoinGroup請求
PreparingRebalance:組準(zhǔn)備開啟新的rebalance,等待成員加入
AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
Stable:rebalance完成可以開始消費
? ?* 過程 :加入,移除,崩潰幾種圖如下
1 Join, 顧名思義就是加入組。這一步中,所有成員都向coordinator發(fā)送JoinGroup請求,請求入組。一旦所有成員都發(fā)送了JoinGroup請求,coordinator會從中選擇一個consumer擔(dān)任leader的角色,并把組成員信息以及訂閱信息發(fā)給leader——注意leader和coordinator不是一個概念。leader負(fù)責(zé)消費分配方案的制定。
2 Sync,這一步leader開始分配消費方案,即哪個consumer負(fù)責(zé)消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進(jìn)SyncGroup請求中發(fā)給coordinator,非leader也會發(fā)SyncGroup請求,只是內(nèi)容為空。coordinator接收到分配方案之后會把方案塞進(jìn)SyncGroup的response中發(fā)給各個consumer。這樣組內(nèi)的所有成員就都知道自己應(yīng)該消費哪些分區(qū)了。
? ? ? 新增consumer:
? ? ? ?
? ? 移除consumer:
? ?
? ?consumer崩掉:
? ? ?
?
? ? 參考:https://www.cnblogs.com/byrhuangqiang/p/6384986.html,https://www.cnblogs.com/huxi2b/p/6223228.html,http://www.jasongj.com/2015/08/09/KafkaColumn4/
4 low consumer
使用Low Level Consumer (Simple Consumer)的主要原因是,用戶希望比Consumer Group更好的控制數(shù)據(jù)的消費, 如
? ? ?*同一條消息讀多次,方便Replay
? ? ?*只消費某個Topic的部分Partition
? ? ?*管理事務(wù),從而確保每條消息被處理一次(Exactly once)
? ? ?*與High Level Consumer相對,Low Level Consumer要求用戶做大量的額外工作
? ? ?*在應(yīng)用程序中跟蹤處理offset,并決定下一條消費哪條消息
? ? ?*獲知每個Partition的Leader
? ? ?*處理Leader的變化
? ? ?*處理多Consumer的協(xié)作
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/lkdirk/p/8645755.html
總結(jié)
以上是生活随笔為你收集整理的Consumer设计-high/low Level Consumer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【页面传值6种方式】- 【JSP 页面传
- 下一篇: webservice入门程序学习中经验总