Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)
文章很長,建議收藏起來,慢慢讀! Java 高并發 發燒友社群:瘋狂創客圈 奉上以下珍貴的學習資源:
-
免費贈送 經典圖書:《Java高并發核心編程(卷1)》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
-
免費贈送 經典圖書:《Java高并發核心編程(卷2)》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
-
免費贈送 經典圖書:《Netty Zookeeper Redis 高并發實戰》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
-
免費贈送 經典圖書:《SpringCloud Nginx高并發核心編程》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
-
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取
推薦:入大廠 、做架構、大力提升Java 內功 的 精彩博文
| 1:Redis 分布式鎖 (圖解-秒懂-史上最全) | 2:Zookeeper 分布式鎖 (圖解-秒懂-史上最全) |
| 3: Redis與MySQL雙寫一致性如何保證? (面試必備) | 4: 面試必備:秒殺超賣 解決方案 (史上最全) |
| 5:面試必備之:Reactor模式 | 6: 10分鐘看懂, Java NIO 底層原理 |
| 7:TCP/IP(圖解+秒懂+史上最全) | 8:Feign原理 (圖解) |
| 9:DNS圖解(秒懂 + 史上最全 + 高薪必備) | 10:CDN圖解(秒懂 + 史上最全 + 高薪必備) |
| 11: 分布式事務( 圖解 + 史上最全 + 吐血推薦 ) | 12:seata AT模式實戰(圖解+秒懂+史上最全) |
| 13:seata 源碼解讀(圖解+秒懂+史上最全) | 14:seata TCC模式實戰(圖解+秒懂+史上最全) |
| 1: JVM面試題(史上最強、持續更新、吐血推薦) | 2:Java基礎面試題(史上最全、持續更新、吐血推薦 |
| 3:架構設計面試題 (史上最全、持續更新、吐血推薦) | 4:設計模式面試題 (史上最全、持續更新、吐血推薦) |
| 17、分布式事務面試題 (史上最全、持續更新、吐血推薦) | 一致性協議 (史上最全) |
| 29、多線程面試題(史上最全) | 30、HR面經,過五關斬六將后,小心陰溝翻船! |
| 9.網絡協議面試題(史上最全、持續更新、吐血推薦) | 更多專題, 請參見【 瘋狂創客圈 高并發 總目錄 】 |
| nacos 實戰(史上最全) | sentinel (史上最全+入門教程) |
| SpringCloud gateway (史上最全) | 更多專題, 請參見【 瘋狂創客圈 高并發 總目錄 】 |
Kafka源碼分析10:副本狀態機ReplicaStateMachine
副本狀態機ReplicaStateMachine管理著 Kafka 集群中所有副本和分區的狀態轉換,是非常核心的一個類。
接下來,帶大家圖解一下此核心類。
ReplicaStateMachine的功能
副本狀態機ReplicaStateMachine的功能:
用于管理副本狀態的轉換。
副本狀態機相關的類
ReplicaStateMachine:
副本狀態機抽象類,定義了一些常用方法(如 startup、 shutdown 等),以及狀態機最重要的處理邏輯方法 handleStateChanges。
ReplicaState:
副本狀態類,其7個子類,定義了 7 種副本狀態。
ReplicaState
副本狀態的接口定義。
ReplicaState 接口定義了每種狀態的序號,以及合法的前 置狀態。
// ReplicaState接口 sealed trait ReplicaState {def state: Byte //每種狀態的序號def validPreviousStates: Set[ReplicaState] //合法的前置狀態 }ReplicaState的 7 種副本狀態實現類
源碼中的 ReplicaState 定義了 7 種副本狀態,如下圖:
狀態的有效前置集合
每一種狀態,都定義了有效前置集合,以OnlineReplica狀態為例。
// OnlineReplica狀態 case object OnlineReplica extends ReplicaState {val state: Byte = 2val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible) }OnlineReplica 的 validPreviousStates 集合里面包含 NewReplica、OnlineReplica、OfflineReplica 和 ReplicaDeletionIneligible。
這說明,Kafka 只允許副本從剛剛這 4 種狀態變更到 OnlineReplica 狀態。如果從 ReplicaDeletionStarted 狀態跳轉到 OnlineReplica 狀態,就是非法的狀態轉換。
ReplicaStateMachine 副本狀態機
ReplicaStateMachine 副本狀態機 是 Kafka Broker 端源碼中控制副本狀態流轉的實現類。
每個 Broker 都會創建這些實例,并不代表每個 Broker 都會啟動副本狀態機。
每個 Broker 啟動時都會創建 ReplicaStateMachine 實例,但是,只有在 Controller 所在的 Broker 上,副本狀態機才會被啟動。
副本狀態機一旦被啟動,就意味著它要行使它最重要的職責了:管理副本狀態的轉換。
圖解:副本狀態的轉換的設計與實現
三種基礎的狀態轉換
當副本對象首次被創建出來后,它會被置于 NewReplica 狀態。
經過初始化之后,當副本對象能夠對外提供服務之后,狀態機會將其調整為 OnlineReplica,并一直以該狀態持
續工作。
如果副本所在的 Broker 關閉或者是因為其他原因不能正常工作了,副本需要從 OnlineReplica 變更為 OfflineReplica,表明副本已處于離線狀態。
四種與副本刪除相關的狀態轉換
一旦開啟了如刪除主題這樣的操作,狀態機會將副本狀態跳轉到 ReplicaDeletionStarted,以表明副本刪除已然開啟。
倘若刪除成功,則置為 ReplicaDeletionSuccessful;
倘若不滿足刪除條件(如所在 Broker 處于下線狀態),那就設置成 ReplicaDeletionIneligible,以便后面重試。
當副本對象被刪除后,其狀態會變更為 NonExistentReplica,副本狀態機將移除該副本數據。
handleStateChange 狀態轉換入口方法
handleStateChange 方法的作用是處理狀態的變更,是對外提供狀態轉換操作的入口方法。
該方法接收三個參數:
-
replicas 是一組副本對象,每個副本對象都封裝了它們各自所屬的主題、分區以及副本所在的 Broker ID 數據;
-
targetState 是這組副本對象要轉換成的目標狀態。
-
callbacks 為狀態轉換之后的回調。
其方法如下:
def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState,callbacks: Callbacks = new Callbacks()): Unit = {if (replicas.nonEmpty) {try {// raise error if the previous batch is not empty//為了提高KafkaController Leader和集群其他broker的通信效率,實現批量發送請求的功能// 檢查上一批請求KafkaController請求,如果沒有發送完成,就報錯controllerBrokerRequestBatch.newBatch()// 將所有副本對象按照Broker進行分組,依次執行狀態轉換操作replicas.groupBy(_.replica).map { case (replicaId, replicas) =>val partitions = replicas.map(_.topicPartition)doHandleStateChanges(replicaId, partitions, targetState, callbacks)}// 發送對應的Controller請求給BrokercontrollerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)} catch {case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)}}}代碼邏輯總體上分為兩步:
-
第 1 步是調用 doHandleStateChanges 方法執行真正的副本狀態轉換;
-
第 2 步是給集群中的相應 Broker 批量發送請求。
在執行第 1 步的時候,它會將 replicas 按照 Broker ID 進行分組。
舉個例子,如果我們使用 < 主題名,分區號,副本 Broker ID> 表示副本對象,假設 replicas 為集合:
< 主題名,分區號,副本 Broker ID> <test, 0, 0><test, 0, 1> <test, 1, 0><test, 1, 1>那么,在調用 doHandleStateChanges 方法前,代碼會將 replicas 按照 Broker ID 進行分組,即變成:
Map(0 -> Set(<test, 0, 0>, <test, 1, 0>))Map(1 -> Set(<test, 0, 1>, <test, 1, 1>))待這些都做完之后,代碼開始調用 doHandleStateChanges 方法,執行狀態轉換操作。
doHandleStateChanges 執行狀態轉換操作
這個方法看著很長,其實就是3步。
核心代碼如下:
private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicPartition], targetState: ReplicaState,callbacks: Callbacks): Unit = {//嘗試獲取副本在 Controller 端元數據緩存中的當前狀態val replicas = partitions.map(partition => PartitionAndReplica(partition, replicaId))//如果沒有保存某個副本對象的狀態,代碼會將其初始化為 NonExistentReplica 狀態。replicas.foreach(replica => replicaState.getOrElseUpdate(replica, NonExistentReplica))//將副本對象集合劃分成兩部分:能夠合法轉換的副本對象集合,以及執行非法狀態轉換的副本對象集合。val (validReplicas, invalidReplicas) = replicas.partition(replica => isValidTransition(replica, targetState))//為執行非法狀態轉換的副本對象集合中的每個副本對象記錄一條錯誤日志invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))//根據要轉換成的目標狀態 ,進入到不同的代碼分支,處理能夠執行合法轉換的副本對象集合的副本的狀態轉換targetState match {case NewReplica =>...case OnlineReplica =>...//七大分支}第 1 步,嘗試獲取副本在 Controller 端元數據緩存中的當前狀態。
如果沒有保存某個副本對象的狀態,代碼會將其初始化為 NonExistentReplica 狀態。
第 2 步,將副本對象集合劃分成兩部分:
-
能夠合法轉換的副本對象集合
-
不能合法轉換的副本對象集合。
代碼根據不同 ReplicaState 中定義的合法前置狀態集合以及傳入的目標狀態(targetState),將副本對象集合劃分成兩部分:能夠合法轉換的副本對象集合,不能合法轉換的副本對象集合。
doHandleStateChanges 方法會為不能合法轉換的副本對象集合中的每個副本對象記錄一條錯誤日志。
第 3 步,根據要轉換成的目標狀態 ,進入到不同的代碼分支,處理能夠執行合法轉換的副本對象集合的副本的狀態轉換。
由于 Kafka 為副本定義了 7 類狀態,因此,這里的代碼分支總共有 7 路。
我挑選幾路最常見的狀態轉換路徑詳細說明下,包括副本被創建時被轉換到 NewReplica 狀態,副本正常工作時被轉換到 OnlineReplica 狀態,副本停止服務后被轉換到 OfflineReplica 狀態。至于剩下的記錄代碼,你可以在課后自行學習下,它們的轉換操作原理大致是相同的。
第 1 路:轉換到 NewReplica 狀態
首先看第 1 路,即狀態轉換的目標狀態是 NewReplica 的場景。流程圖如下:
核心代碼如下:
case NewReplica => // 遍歷所有能夠執行轉換的副本對象 validReplicas.foreach { replica =>// 獲取該副本對象的分區對象,即<主題名,分區號>數據val partition = replica.topicPartition// 嘗試從元數據緩存中獲取該分區當前信息// 包括Leader是誰、ISR都有哪些副本等數據controllerContext.partitionLeadershipInfo.get(partition) match {// 如果成功拿到分區數據信息case Some(leaderIsrAndControllerEpoch) =>// 如果該副本是Leader副本if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")// 記錄錯誤日志。Leader副本不能被設置成NewReplica狀態logFailedStateChange(replica, replicaState(replica), OfflineReplica, exception)} else {// 否則,給該副本所在的Broker發送LeaderAndIsrRequest// 向它同步該分區的數據, 之后給集群當前所有Broker發送// UpdateMetadataRequest通知它們該分區數據發生變更controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),replica.topicPartition,leaderIsrAndControllerEpoch,controllerContext.partitionReplicaAssignment(replica.topicPartition),isNew = true)logSuccessfulTransition(replicaId, partition, replicaState(replica), NewReplica)// 更新緩存中該副本對象的當前狀態為NewReplicareplicaState.put(replica, NewReplica)}case None =>logSuccessfulTransition(replicaId, partition, replicaState(replica), NewReplica)// 更新緩存中該副本對象的當前狀態為NewReplicareplicaState.put(replica, NewReplica)} }這一路主要做的事情是,嘗試從元數據緩存中,獲取這些副本對象的分區信息數據,包括分區的 Leader 副本在哪個 Broker 上、ISR 中都有哪些副本,等等。
如果找不到對應的分區數據,就直接把副本狀態更新為 NewReplica。否則,代碼就需要給該副本所在的 Broker 發送請求,讓它知道該分區的信息。同時,代碼還要給集群所有運行中的 Broker 發送請求,讓它們感知到新副本的加入。
第 2 路:轉換到 OnlineReplica 狀態
再看第2路,即狀態轉換的目標狀態是 OnlineReplica的場景。OnlineReplica是副本對象正常工作時所處的狀態。流程圖如下:
核心代碼如下:
case OnlineReplica =>validReplicas.foreach { replica =>// 獲取副本所在分區val partition = replica.topicPartition// 獲取副本當前狀態replicaState(replica) match {// 如果當前狀態是NewReplicacase NewReplica =>// 從元數據緩存中拿到分區副本列表val assignment = controllerContext.partitionReplicaAssignment(partition)// 如果副本列表不包含當前副本if (!assignment.contains(replicaId)) {// 將該副本加入到副本列表中,并更新元數據緩存中該分區的副本列表controllerContext.partitionReplicaAssignment.put(partition, assignment :+ replicaId)}// 如果當前狀態是其他狀態case _ =>// 嘗試獲取該分區當前信息數據controllerContext.partitionLeadershipInfo.get(partition) match {// 如果存在分區信息// 向該副本對象所在Broker發送請求,令其同步該分區數據case Some(leaderIsrAndControllerEpoch) =>controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),replica.topicPartition,leaderIsrAndControllerEpoch,controllerContext.partitionReplicaAssignment(partition), isNew = false)case None =>}}logSuccessfulTransition(replicaId, partition, replicaState(replica), OnlineReplica)// 將該副本對象設置成OnlineReplica狀態replicaState.put(replica, OnlineReplica)}代碼依然會對副本對象進行遍歷,并依次執行下面的幾個步驟。
第 1 步,獲取元數據中該副本所屬的分區對象,以及該副本的當前狀態。
第 2 步,查看當前狀態是否是 NewReplica。
-
如果是,則獲取分區的副本列表,并判斷該副本是否在當前的副本列表中,假如不在,就記錄錯誤日志,并更新元數據中的副本列表;
-
如果狀態不是 NewReplica,就說明,這是一個已存在的副本對象,那么,源碼會獲取對應分區的詳細數據,然后向該副本對象所在的 Broker 發送 LeaderAndIsrRequest 請求,令其同步獲知,并保存該分區數據。
第 3 步,將該副本對象狀態變更為 OnlineReplica。至此,該副本處于正常工作狀態。
第 3 路:轉換到 OfflineReplica 狀態
再看第3路,即狀態轉換的目標狀態是OfflineReplica的場景。OfflineReplica表示副本服務下線時所處的狀態。
如果副本所在的 Broker 關閉或者是因為其他原因不能正常工作了,副本需要從 OnlineReplica 變更為 OfflineReplica,表明副本已處于離線狀態。
第 3 路流程圖如下:
第 3 路流程的核心代碼如下:
case OfflineReplica => validReplicas.foreach { replica =>// 向副本所在Brokers發送StopReplicaRequest請求,停止對應副本controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition,deletePartition = false, (_, _) => ()) } // 過濾出:有Leader信息的副本集合 val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)) // 從分區中移除該副本對象并更新ZooKeeper節點 val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition)) // 遍歷每個更新過的分區信息 updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>// 如果分區對應主題并未被刪除if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {// 獲取該分區除給定副本以外的其他副本所在的Brokerval recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)// 向這些Broker發送請求更新該分區更新過的分區LeaderAndIsr數據controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,partition,leaderIsrAndControllerEpoch,controllerContext.partitionReplicaAssignment(partition), isNew = false)}val replica = PartitionAndReplica(partition, replicaId)logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica)// 設置該分區給定副本的狀態為OfflineReplicareplicaState.put(replica, OfflineReplica) }首先,代碼會給所有符合狀態轉換的副本所在的 Broker,發送 StopReplicaRequest 請求,顯式地告訴這些 Broker 停掉其上的對應副本。
broker接收到 StopReplica 請求之后,通過Kafka 的副本管理器組件(ReplicaManager)負責處理這個邏輯。也就是說,StopReplicaRequest 被發送出去之后,這些 Broker 上對應的副本就停止工作了。
其次,代碼根據分區是否保存了 Leader 信息,將副本集合劃分成兩個子集:有 Leader 副本集合和無 Leader 副本集合。有 Leader 信息副本集合并不僅僅包含 Leader,還有 ISR 和 controllerEpoch 等數據。
最后,迭代有 Leader信息副本集合,向這些副本所在的 Broker 發送 LeaderAndIsrRequest 請求,去更新停止副本操作之后的分區信息,再把這些分區狀態設置為 OfflineReplica。
總之,把副本狀態變更為 OfflineReplica 的主要邏輯,包含兩個核心工作:
- 停止Broker 上對應副本
- 更新遠端 Broker 元數據。
參考文獻:
https://www.cnblogs.com/boanxin/p/13696136.html
https://www.cnblogs.com/start-from-zero/p/13430611.html
https://blog.csdn.net/lidazhou/article/details/95909496
https://www.jianshu.com/p/5bef1f9f74cd
http://www.louisvv.com/archives/2348.html
http://www.machengyu.net/tech/2019/09/22/kafka-version.html
總結
以上是生活随笔為你收集整理的Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 三极管简介及其使用注意事项
- 下一篇: marvell万兆交换机内核编译总结