kafka rebalance: some partitions have no owner
When reposting, please credit the original source: http://www.cnblogs.com/dongxiao-yang/p/6234673.html
Recently a business team reported that after their Kafka consumers went live, some partitions of a topic never had an owner registered. In the monitoring screenshot, partitions 5 and 7 could not be registered by any consumer, and even after restarting the client programs and going through another rebalance, the same two partitions were still not being consumed.
Because the business team had just gone through a large data-center migration, the first suspicion was network connectivity. However, checking each consumer host in turn showed no network problems. Even after increasing or decreasing the number of consumers, or running only a single consumer client, some partitions were still left without an owner once the rebalance finished; moreover, as the number of consumers changed, the set of ownerless partitions changed with it, and the client programs logged no errors at all during the whole rebalance.
At this point the only thing left was to comb through the client logs. With only two clients running, the following entries showed up:
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], Consumer xxx rebalancing the following partitions: ArrayBuffer(5, 7, 3, 8, 1, 4, 6, 2, 0, 9) for topic onlineAdDemographicPredict with consumers: List(aaa-0, yyy-0, xxx-0)
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 2
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 0
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 9
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 0 for topic onlineAdDemographicPredict
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 9 for topic onlineAdDemographicPredict
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 2 for topic onlineAdDemographicPredict
The ArrayBuffer contains all 10 partitions, so the client is reading the full partition list correctly. The problem is the "with consumers: List(...)" part: at this point the business team had only started two clients, xxx and yyy, yet the consumer saw three client ids. Based on that it computed that it should claim exactly three partitions, 2, 0 and 9, and the partitions assigned to the extra, nonexistent consumer were left unclaimed.
The Kafka source code corresponding to these log lines is as follows:
class RangeAssignor() extends PartitionAssignor with Logging {
  def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- consumerThreadIdSet) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         * Range-partition the sorted partitions to consumers for better locality.
         * The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }
    partitionOwnershipDecision
  }
}
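To see how this plays out in the incident above, here is a minimal standalone sketch (not Kafka code) that plugs the values from the rebalance log into the same arithmetic: 10 partitions in the order ArrayBuffer(5, 7, 3, 8, 1, 4, 6, 2, 0, 9) and the consumer list List(aaa-0, yyy-0, xxx-0).

object RangeAssignDemo extends App {
  // Values copied from the rebalance log above.
  val curPartitions = Vector(5, 7, 3, 8, 1, 4, 6, 2, 0, 9)
  val curConsumers  = List("aaa-0", "yyy-0", "xxx-0")
  val me            = "xxx-0"

  val nPartsPerConsumer       = curPartitions.size / curConsumers.size  // 10 / 3 = 3
  val nConsumersWithExtraPart = curPartitions.size % curConsumers.size  // 10 % 3 = 1
  val myConsumerPosition      = curConsumers.indexOf(me)                // 2

  // Same formulas as RangeAssignor.assign():
  val startPart = nPartsPerConsumer * myConsumerPosition +
    myConsumerPosition.min(nConsumersWithExtraPart)                     // 3 * 2 + 1 = 7
  val nParts = nPartsPerConsumer +
    (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)    // 3 + 0 = 3

  // Indices 7, 8, 9 of curPartitions -> partitions 2, 0, 9, matching the log.
  val claimed = (startPart until startPart + nParts).map(curPartitions)
  println(s"$me claims $claimed")   // xxx-0 claims Vector(2, 0, 9)

  // The phantom consumer "aaa-0" sits at position 0 and gets indices 0..3
  // (partitions 5, 7, 3, 8), which no live client ever claims.
}

Running the same computation for yyy-0 gives partitions 1, 4 and 6, so the only partitions left without an owner are the ones that fall into aaa-0's range.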
object PartitionAssignor {
  def createInstance(assignmentStrategy: String) = assignmentStrategy match {
    case "roundrobin" => new RoundRobinAssignor()
    case _ => new RangeAssignor()
  }
}

class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
  val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
    val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
    myTopicCount.getConsumerThreadIdsPerTopic
  }

  val partitionsForTopic: collection.Map[String, Seq[Int]] =
    ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)

  val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
    ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)

  val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
}
class ZKGroupDirs(val group: String) {
  def consumerDir = ConsumersPath
  def consumerGroupDir = consumerDir + "/" + group
  def consumerRegistryDir = consumerGroupDir + "/ids"
  def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
  def consumerGroupOwnersDir = consumerGroupDir + "/owners"
}

def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = {
  val dirs = new ZKGroupDirs(group)
  val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
  val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
  for (consumer <- consumers) {
    val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)
    for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
      for (consumerThreadId <- consumerThreadIdSet)
        consumersPerTopicMap.get(topic) match {
          case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
          case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
        }
    }
  }
  for ((topic, consumerList) <- consumersPerTopicMap)
    consumersPerTopicMap.put(topic, consumerList.sortWith((s, t) => s < t))
  consumersPerTopicMap
}
def constructTopicCount(group: String, consumerId: String, zkUtils: ZkUtils, excludeInternalTopics: Boolean): TopicCount = {
  val dirs = new ZKGroupDirs(group)
  val topicCountString = zkUtils.readData(dirs.consumerRegistryDir + "/" + consumerId)._1
  var subscriptionPattern: String = null
  var topMap: Map[String, Int] = null
  try {
    Json.parseFull(topicCountString) match {
      case Some(m) =>
        val consumerRegistrationMap = m.asInstanceOf[Map[String, Any]]
        consumerRegistrationMap.get("pattern") match {
          case Some(pattern) => subscriptionPattern = pattern.asInstanceOf[String]
          case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
        }
        consumerRegistrationMap.get("subscription") match {
          case Some(sub) => topMap = sub.asInstanceOf[Map[String, Int]]
          case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
        }
      case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
    }
  } catch {
    case e: Throwable =>
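For context, the topicCountString that constructTopicCount parses is the JSON each consumer writes into its ephemeral node under .../ids when it registers. Below is a minimal sketch of parsing such a payload, reusing the kafka.utils.Json helper from the source above; the sample payload is illustrative only (the code above requires just the "pattern" and "subscription" fields, the other fields are from memory of the 0.8.x format).

import kafka.utils.Json

object ParseRegistrationDemo extends App {
  // Hypothetical registration payload; the real data lives at
  // /kafkachroot/consumers/{groupid}/ids/{consumerid}.
  val topicCountString =
    """{"version":1,"subscription":{"onlineAdDemographicPredict":1},"pattern":"static","timestamp":"1482997976000"}"""

  Json.parseFull(topicCountString) match {
    case Some(m) =>
      val consumerRegistrationMap = m.asInstanceOf[Map[String, Any]]
      // "subscription" maps each topic to its number of consumer threads;
      // getConsumerThreadIdsPerTopic later expands it into thread ids like "xxx-0".
      println("pattern      = " + consumerRegistrationMap("pattern"))
      println("subscription = " + consumerRegistrationMap("subscription"))
    case None =>
      println("unparseable registration: " + topicCountString)
  }
}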
Tracing through the code above, you can see that a consumer determines the group's membership by listing every consumerid that exists under the ZooKeeper path /kafkachroot/consumers/{groupid}/ids, then reading each consumerid's topic subscription, and finally building a [topic, List[ConsumerThreadId]] map.
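That membership can also be checked directly against ZooKeeper. Below is a minimal sketch of such a check, assuming the org.I0Itec.zkclient.ZkClient library that the old consumer itself uses, plus a hypothetical connect string and group id; it lists every registered consumerid, prints its registration JSON, and prints the ZooKeeper session id (ephemeralOwner) that keeps each ephemeral node alive.

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.zookeeper.data.Stat
import scala.collection.JavaConverters._

object ListGroupMembers extends App {
  // Kafka stores plain UTF-8 strings in ZooKeeper, so use a matching serializer
  // instead of ZkClient's default Java object serialization.
  val stringSerializer = new ZkSerializer {
    def serialize(data: Object): Array[Byte] = data.toString.getBytes("UTF-8")
    def deserialize(bytes: Array[Byte]): Object =
      if (bytes == null) null else new String(bytes, "UTF-8")
  }

  // Hypothetical connect string (including the chroot) and group id; substitute your own.
  val zkClient = new ZkClient("zk1:2181/kafkachroot", 30000, 30000, stringSerializer)
  val idsPath  = "/consumers/mygroup/ids"

  // Every child here is an ephemeral node created by a live (or half-dead) consumer.
  for (id <- zkClient.getChildren(idsPath).asScala) {
    val stat = new Stat()
    val registration: String = zkClient.readData(idsPath + "/" + id, stat)
    // ephemeralOwner is the session that created the node; the node only
    // disappears when that session expires or closes.
    println(f"$id%-40s session=0x${stat.getEphemeralOwner}%x  $registration")
  }
  zkClient.close()
}

If the listing contains a consumerid that no running deployment accounts for, that stale registration is exactly what will skew the range assignment shown earlier.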
So I went to ZooKeeper to look at the node structure, and sure enough, under the problematic group's /ids path there was an ephemeral node for aaa. After notifying the application team, it turned out that a very old program had once consumed this topic with the same group id. That program had been unmaintained for a long time and was sitting in a half-dead state, so its ZooKeeper session stayed alive and the ephemeral node never expired. As a result, every later consumer using the same group on the same topic perceived one extra member during rebalance, and the partitions assigned to that phantom member could never be consumed.
Appendix:
1. The Consumer Rebalance algorithm
2. This article is based on Kafka 0.8.2-beta; newer releases have not been examined and may behave differently.