Kafka controller重设计
本文主要參考社區0.11版本Controller的重設計方案,試圖給大家梳理一下Kafka controller這個組件在設計上的一些重要思考。眾所周知,Kafka中有個關鍵組件叫controller,負責管理和協調Kafka集群。網上關于controller的源碼分析也有很多,本文就不再大段地列出代碼重復做這件事情了。實際上,對于controller的代碼我一直覺得寫的非常混亂,各種調用關系十分復雜,想要完整地理解它的工作原理確實不易。好在我們就是普通的使用者,大致了解controller的工作原理即可。下面我就帶各位簡要了解一下當前Kafka controller的原理架構以及社區為什么要在大改controller的設計。
Controller是做什么的
“負責管理和協調Kafka集群”的說法實在沒有什么營養,上點干貨吧——具體來說Controller目前主要提供多達10種的Kafka服務功能的實現,它們分別是:
- UpdateMetadataRequest:更新元數據請求。topic分區狀態經常會發生變更(比如leader重新選舉了或副本集合變化了等)。由于當前clients只能與分區的leader broker進行交互,那么一旦發生變更,controller會將最新的元數據廣播給所有存活的broker。具體方式就是給所有broker發送UpdateMetadataRequest請求
- CreateTopics: 創建topic請求。當前不管是通過API方式、腳本方式抑或是CreateTopics請求方式來創建topic,做法幾乎都是在Zookeeper的/brokers/topics下創建znode來觸發創建邏輯,而controller會監聽該path下的變更來執行真正的“創建topic”邏輯
- DeleteTopics:刪除topic請求。和CreateTopics類似,也是通過創建Zookeeper下的/admin/delete_topics/<topic>節點來觸發刪除topic,controller執行真正的邏輯
- 分區重分配:即kafka-reassign-partitions腳本做的事情。同樣是與Zookeeper結合使用,腳本寫入/admin/reassign_partitions節點來觸發,controller負責按照方案分配分區
- Preferred leader分配:preferred leader選舉當前有兩種觸發方式:1. 自動觸發(auto.leader.rebalance.enable = true);2.?kafka-preferred-replica-election腳本觸發。兩者“玩法”相同,向Zookeeper的/admin/preferred_replica_election寫數據,controller提取數據執行preferred leader分配
- 分區擴展:即增加topic分區數。標準做法也是通過kafka-reassign-partitions腳本完成,不過用戶可直接往Zookeeper中寫數據來實現,比如直接把新增分區的副本集合寫入到/brokers/topics/<topic>下,然后controller會為你自動地選出leader并增加分區
- 集群擴展:新增broker時Zookeeper中/brokers/ids下會新增znode,controller自動完成服務發現的工作
- broker崩潰:同樣地,controller通過Zookeeper可實時偵測broker狀態。一旦有broker掛掉了,controller可立即感知并為受影響分區選舉新的leader
- ControlledShutdown:broker除了崩潰,還能“優雅”地退出。broker一旦自行終止,controller會接收到一個ControlledShudownRequest請求,然后controller會妥善處理該請求并執行各種收尾工作
- Controller leader選舉:controller必然要提供自己的leader選舉以防這個全局唯一的組件崩潰宕機導致服務中斷。這個功能也是通過Zookeeper的幫助實現的
Controller當前設計
當前controller啟動時會為集群中所有broker創建一個各自的連接。這么說吧,假設你的集群中有100臺broker,那么controller啟動時會創建100個Socket連接(也包括與它自己的連接!)。當前新版本的Kafka統一使用了NetworkClient類來建模底層的網絡連接(有興趣研究源碼的可以去看下這個類,它主要依賴于Java NIO的Selector)。Controller會為每個連接都創建一個對應的請求發送線程,專門負責給對應的broker發送請求。也就是說,如果還是那100臺broker,那么controller啟動時還會創建100個RequestSendThread線程。當前的設計中Controller只能給broker發送三類請求,它們是:
- UpdateMetadataRequest:更新元數據
- LeaderAndIsrRequest:創建分區、副本以及完成必要的leader和/或follower角色的工作
- StopReplicaRequest:停止副本請求,還可能刪除分區副本
Controller通常都是發送請求給broker的,只有上面談到的controller 10大功能中的ControlledShutdownRequest請求是例外:這個請求是待關閉的broker通過RPC發送給controller的,即它的方向是反的。另外這個請求還有一個特別之處就是其他所有功能或是請求都是通過Zookeeper間接與controller交互的,只有它是直接與controller進行交互的。
Controller組成
構成controller的組件太多了,多到我已經不想用文字表達了,直接上圖吧:
其中比較重要的組件包括:
- ControllerContext:可以說是controller的緩存。當前controller為人詬病的原因之一就是用了大量的同步機制來保護這個東西。ControllerContext的構成如下圖所示:
緩存內容十分豐富,這也是controller可以協調管理整個cluster的基礎。
- TopicDeletionManager:負責刪除topic的組件
- ****Selector:controller提供的各種功能的leader選舉器
- ****Listener:controller注冊的各種Zookeeper監聽器。想要讓controller無所不能,必然要注冊各種"觸角" 才能實時感知各種變化
Controller當前問題
?不謙虛地說,我混跡社區也有些日子了。在里面碰到過很多關于controller的bug。社區對于這些bug有個很共性的特點,那就是沒有什么人愿意(敢去)改這部分代碼,因為它實在是太復雜了。具體的問題包括:
1. 需要在多線程間共享狀態
編寫正確的多線程程序一直是Java開發者的痛點。在Controller的實現類KafkaController中創建了很多線程,比如之前提到的RequestSendThread線程,另外ZkClient也會創建單獨的線程來處理zookeeper回調,這還不算TopicDeletionManager創建的線程和其他IO線程等。幾乎所有這些線程都需要訪問ControllerContext(RequestSendThread只操作它們專屬的請求隊列,不會訪問ControllerContext),因此必要的多線程同步機制是一定需要的。當前是使用controllerLock鎖來實現的,因此可以說沒有并行度可言。
2. 代碼組織混亂
看過源代碼的人相信對這一點深有體會。KafkaController、PartitionStateMachine和ReplicaStateMachine每個都是500+行的大類且彼此混調的現象明顯,比如KafkaController的stopOldReplicasOfReassignedPartition方法調用ReplicaStateMachine的handleStateChanges方法,而后者又會調用KafkaController的remoteReplicaFromIsr方法。類似的情況還發生在KafkaController和ControllerChannelManager之間。
3. 管理類請求與數據類請求未分離
當前broker對入站請求類型不做任何優先級處理,不論是PRODUCE請求、FETCH請求還是Controller類的請求。這就可能造成一個問題:即clients發送的數據類請求積壓導致controller推遲了管理類請求的處理。設想這樣的場景,假設controller向broker廣播了leader發生變更。于是新leader開始接收clients端請求,而同時老leader所在的broker由于出現了數據類請求的積壓使得它一直忙于處理這些請求而無法處理controller發來的LeaderAndIsrRequest請求,因此這是就會出現“雙主”的情況——也就是所謂的腦裂。此時倘若client發送的一個PRODUCE請求未指定acks=-1,那么因為日志水位截斷的緣故這個請求包含的消息就可能“丟失”了。現在社區中關于controller丟失數據的bug大多是因為這個原因造成的。
4. Controller同步寫Zookeeper且是一個分區一個分區地寫
當前controller操作Zookeeper是通過ZkClient來完成的。ZkClient目前是同步寫入Zookeeper,而同步通常意味著性能不高。更為嚴重的是,controller是一個分區一個分區進行寫入的,對于分區數很多的集群來說,這無疑是個巨大的性能瓶頸。如果用戶仔細查看源代碼,可以發現PartitionStateMachine的electLeaderForPartition就是一個分區一個分區地選舉的。
5. Controller按照一個分區一個分區的發送請求
Controller當前發送請求都是按照分區級別發送的,即一個分區一個分區地發送。沒有任何batch或并行可言,效率很低。
6. Controller給broker的請求無版本號信息
這里的版本號類似于new consumer的generation,總之是要有一種機制告訴controller broker的版本信息。因為有些情況下broker會處理本已過期或失效的請求導致broker狀態不一致。舉個例子,如果一個broker正常關閉過程中“宕機”了,那么重啟之后這個broker就有可能處理之前controller發送過來的StopReplicaRequest,導致某些副本被置成offline從而無法使用。而這肯定不是我們希望看到的結果,對吧?
7. ZkClient阻礙狀態管理
Contoller目前是使用了ZkClient這個開源工具,它可以自動重建會話并使用特有的線程順序處理所有的Zookeeper監聽消息。因為是順序處理,它就有可能無法及時響應最新的狀態變更導致Kafka集群狀態的不一致。
Controller改進方案
1. 單線程事件模型
和new consumer類似,controller摒棄多線程的模型,采用單線程的事件隊列模型。這樣簡化了設計同時也避免了復雜的同步機制。各位在最新的trunk分支上已然可以看到這種變化:增加了ControllerEventManager類以及對應的ControllerEventThread線程類專門負責處理ControllerEvent。目前總共有9種controller event,它們分別是:
- Idle
- ControllerChange
- BrokerChange
- TopicChange
- TopicDeletion
- PartitionReassignment
- AutoLeaderBalance
- ManualLeaderBalance
- ControlledShutdown
- IsrChange
我們基本上可以從名字就能判斷出它們分別代表了什么事件。
2. 使用Zookeeper的async API
將所有同步操作Zookeeper的地方都改成異步調用+回調的方式。實際上Apache Zookeeper客戶端執行請求的方式有三種:同步、異步和batch。通常以batch性能最好,但Kafka社區目前還是傾向于用async替換sync。畢竟實現起來相對簡單同時性能上也能得到不少提升。
3. 重構狀態管理
可能摒棄之前狀態機的方式,采用和GroupCoordinator類似的方式,讓controller保存所有的狀態并且負責狀態的流轉以及狀態流轉過程中的邏輯。當然,具體的實現還要再結合0.11最終代碼才能確定。
4. 對請求排定優先級
對管理類請求和數據類請求區分優先級。比如使用優先級隊列替換現有的BlockingQueue——社區應該已經實現了這個功能,開發了一個叫PrioritizationAwareBlockingQueue的類來做這件事情,后續大家可以看下這個類的源代碼
5. 為controller發送的請求匹配broker版本信息
為broker設定版本號(generation id)。如果controller發送過來的請求中包含的generation與broker自己的generation不匹配, 那么broker會拒絕該請求。
6. 拋棄ZkClient,使用原生Zookeeper client
ZkClient是同步順序處理ZK事件的,而原生Zookeeper client支持async方式。另外使用原生API還能夠在接收到狀態變更通知時便馬上開始處理,而ZkClient的特定線程則必須要在隊列中順序處理到這條變更消息時才能處理。
結語
以上就是關于Kafka controller的一些討論,包括了它當前的組件構成、設計問題以及對應的改進方案。有很多地方可能理解的還不是透徹,期待著在Kafka 0.11正式版本中可以看到全新的controller組件。
總結
以上是生活随笔為你收集整理的Kafka controller重设计的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: day2编写购物商城(1)
- 下一篇: 【码云周刊第 24 期】超实用 Ando