Kafka Partition重分配流程简析
節(jié)日快樂(lè)~
今天是屬于廣大程序員的節(jié)日,祝自己快樂(lè)hhhhhh
隨著業(yè)務(wù)量的急速膨脹和又一年雙11的到來(lái),我們會(huì)對(duì)現(xiàn)有的Kafka集群進(jìn)行擴(kuò)容,以應(yīng)對(duì)更大的流量和業(yè)務(wù)尖峰。當(dāng)然,擴(kuò)容之后的新Kafka Broker默認(rèn)是不會(huì)有任何Topic和Partition的,需要手動(dòng)利用分區(qū)重分配命令kafka-reassign-partitions將現(xiàn)有的Partition/Replica平衡到新的Broker上去。那么Kafka具體是如何執(zhí)行重分配流程的呢?本文就來(lái)簡(jiǎn)單解讀一下。
生成、提交重分配方案
我們知道,使用kafka-reassign-partitions命令分為三步,一是根據(jù)指定的Topic生成JSON格式的重分配方案(--generate),二是將生成的方案提交執(zhí)行(--execute),三是觀察重分配的進(jìn)度(--verify),它們分別對(duì)應(yīng)kafka.admin.ReassignPartitionsCommand類中的generateAssignment()、executeAssignment()和verifyAssignment()方法。
generateAssignment()方法會(huì)調(diào)用AdminUtils#assignReplicasToBrokers()方法生成Replica分配方案。源碼就不再讀了,其原則簡(jiǎn)述如下:
- 將Replica盡量均勻地分配到各個(gè)Broker上去;
- 一個(gè)Partition的所有Replica必須位于不同的Broker上;
- 如果Broker有機(jī)架感知(rack aware)的信息,將Partition的Replica盡量分配到不同的機(jī)架。
executeReassignment()方法調(diào)用了reassignPartitions()方法,其源碼如下。
def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {maybeThrottle(throttle)try {val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }if (validPartitions.isEmpty) falseelse {if (proposedReplicaAssignment.nonEmpty) {val adminClient = adminClientOpt.getOrElse(throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))val alterReplicaDirResult = adminClient.alterReplicaLogDirs(proposedReplicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt))alterReplicaDirResult.values().asScala.foreach { case (replica, future) => {try {future.get()throw new AdminCommandFailedException(s"Partition ${replica.topic()}-${replica.partition()} already exists on broker ${replica.brokerId()}." +s" Reassign replica to another log directory on the same broker is currently not supported.")} catch {case t: ExecutionException =>t.getCause match {case e: ReplicaNotAvailableException => // It is OK if the replica is not availablecase e: Throwable => throw e}}}}}val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)true}} catch {// ......} }在進(jìn)行必要的Partition校驗(yàn)之后,創(chuàng)建ZK持久節(jié)點(diǎn)/admin/reassign_partitions,并將JSON格式的重分配方案寫進(jìn)去。如果該節(jié)點(diǎn)存在,就表示已經(jīng)在進(jìn)行重分配,不能再啟動(dòng)新的重分配流程(相關(guān)的判斷在executeReassignment()方法中)。
監(jiān)聽(tīng)并處理重分配事件
在之前講解Kafka Controller時(shí),筆者提到Controller會(huì)注冊(cè)多個(gè)ZK監(jiān)聽(tīng)器,將監(jiān)聽(tīng)到的事件投遞到內(nèi)部的事件隊(duì)列,并由事件處理線程負(fù)責(zé)處理。監(jiān)聽(tīng)ZK中/admin/reassign_partitions節(jié)點(diǎn)的監(jiān)聽(tīng)器為PartitionReassignmentListener,并產(chǎn)生PartitionReassignment事件,處理邏輯如下。
case object PartitionReassignment extends ControllerEvent {def state = ControllerState.PartitionReassignmentoverride def process(): Unit = {if (!isActive) returnval partitionReassignment = zkUtils.getPartitionsBeingReassigned()val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))partitionsToBeReassigned.foreach { case (partition, context) =>if(topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {error(s"Skipping reassignment of partition $partition since it is currently being deleted")removePartitionFromReassignedPartitions(partition)} else {initiateReassignReplicasForTopicPartition(partition, context)}}} }該方法先取得需要重分配的Partition列表,然后從中剔除掉那些已經(jīng)被標(biāo)記為刪除的Topic所屬的Partition,再調(diào)用initiateReassignReplicasForTopicPartition()方法:
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,reassignedPartitionContext: ReassignedPartitionsContext) {val newReplicas = reassignedPartitionContext.newReplicasval topic = topicAndPartition.topicval partition = topicAndPartition.partitiontry {val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)assignedReplicasOpt match {case Some(assignedReplicas) =>if (assignedReplicas == newReplicas) {throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))} else {info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))// first register ISR change listenerwatchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)// mark topic ineligible for deletion for the partitions being reassignedtopicDeletionManager.markTopicIneligibleForDeletion(Set(topic))onPartitionReassignment(topicAndPartition, reassignedPartitionContext)}case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist".format(topicAndPartition))}} catch {case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)// remove the partition from the admin path to unblock the admin clientremovePartitionFromReassignedPartitions(topicAndPartition)} }該方法的執(zhí)行邏輯如下:
執(zhí)行重分配流程
onPartitionReassignment()方法的代碼如下。
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {val reassignedReplicas = reassignedPartitionContext.newReplicasif (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +"reassigned not yet caught up with the leader")val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSetval newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet//1. Update AR in ZK with OAR + RAR.updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)//2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),newAndOldReplicas.toSeq)//3. replicas in RAR - OAR -> NewReplicastartNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +"reassigned to catch up with the leader")} else {//4. Wait until all replicas in RAR are in sync with the leader.val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet//5. replicas in RAR -> OnlineReplicareassignedReplicas.foreach { replica =>replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,replica)), OnlineReplica)}//6. Set AR to RAR in memory.//7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and// a new AR (using RAR) and same isr to every broker in RARmoveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)//8. replicas in OAR - RAR -> Offline (force those replicas out of isr)//9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)//10. Update AR in ZK with RAR.updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)//11. Update the /admin/reassign_partitions path in ZK to remove this partition.removePartitionFromReassignedPartitions(topicAndPartition)info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))controllerContext.partitionsBeingReassigned.remove(topicAndPartition)//12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every brokersendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))// signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completedtopicDeletionManager.resumeDeletionForTopics(Set(topicAndPartition.topic))} }官方JavaDoc比較詳細(xì),給出了3個(gè)方便解釋流程的定義,列舉如下:
- RAR(Re-assigned replicas):重分配的Replica集合,記為reassignedReplicas;
- OAR(Original assigned replicas):重分配之前的原始Replica集合,通過(guò)controllerContext.partitionReplicaAssignment()方法取得;
- AR(Assigned replicas):當(dāng)前的Replica集合,隨著重分配的進(jìn)行不斷變化。
根據(jù)上文的代碼和注釋,我們可以很容易地梳理出重分配的具體流程:
(0) 檢查RAR是否都已經(jīng)在Partition的ISR集合中(即是否已經(jīng)同步),若否,則計(jì)算RAR與OAR的差集,也就是需要被創(chuàng)建或者重分配的Replica集合;
(1) 計(jì)算RAR和OAR的并集,即所有Replica的集合,并將ZK中的AR更新;
(2) 增加Partition的Leader紀(jì)元值,并向AR中的所有Replica所在的Broker發(fā)送LeaderAndIsrRequest;
(3) 更新RAR與OAR的差集中Replica的狀態(tài)為NewReplica,以觸發(fā)這些Replica的創(chuàng)建或同步;
(4) 計(jì)算OAR和RAR的差集,即重分配過(guò)程中需要被下線的Replica集合;
(5) 等待RAR都已經(jīng)在Partition的ISR集合中,將RAR中Replica的狀態(tài)設(shè)置為OnlineReplica,表示同步完成;
(6) 將遷移現(xiàn)場(chǎng)的AR更新為RAR;
(7) 檢查Partition的Leader是否在RAR中,如果沒(méi)有,則觸發(fā)新的Leader選舉。然后增加Partition的Leader紀(jì)元值,發(fā)送LeaderAndIsrRequest更新Leader的結(jié)果;
(8~9) 將OAR和RAR的差集中的Replica狀態(tài)設(shè)為Offline->NonExistentReplica,這些Replica后續(xù)將被刪除;
(10) 將ZK中的AR集合更新為RAR;
(11) 一個(gè)Partition重分配完成,更新/admin/reassign_partitions節(jié)點(diǎn)中的執(zhí)行計(jì)劃,刪掉完成的Partition;
(12) 發(fā)送UpdateMetadataRequest給所有Broker,刷新元數(shù)據(jù)緩存;
(13) 如果有一個(gè)Topic已經(jīng)重分配完成并且將被刪除,就將它從不可刪除的Topic集合中移除。
The End
最后一個(gè)小問(wèn)題:Partition重分配往往會(huì)涉及大量的數(shù)據(jù)交換,有可能會(huì)影響正常業(yè)務(wù)的運(yùn)行,如何避免呢?ReassignPartitionsCommand也提供了throttle功能用于限流,在代碼和幫助文檔中都可以看到它,就不多講了。當(dāng)然,一旦啟用了throttle,我們一定要定期進(jìn)行verify操作,防止因?yàn)橄蘖鲗?dǎo)致重分配的Replica一直追不上Leader的情況發(fā)生。
民那晚安晚安。
總結(jié)
以上是生活随笔為你收集整理的Kafka Partition重分配流程简析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【题解】CF1550F Jumping
- 下一篇: 洛谷P1550 打井