Kafka解惑之Old Producer(4)——Case Analysis
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-case-analysis/
上接:
在前面三篇文章中詳細(xì)了解了Old Producer的內(nèi)容,本文主要通過(guò)一個(gè)實(shí)際應(yīng)用案例來(lái)加深各位對(duì)Old Producer的理解。
問(wèn)題描述:線上由多臺(tái)Kafka Broker組成的集群(Producer的metadata.broker.list參數(shù)設(shè)置的是所有broker的地址+端口號(hào)的列表),版本號(hào)為0.8.2.x,當(dāng)一臺(tái)Kafka Broker的硬盤(pán)發(fā)生故障導(dǎo)致系統(tǒng)崩潰,由于Kafka的HA的作用線上業(yè)務(wù)無(wú)明顯異常,發(fā)送方和消費(fèi)方的流量也與之前持穩(wěn),但是后面監(jiān)測(cè)到每隔10分鐘左右就有少量的消息發(fā)送的時(shí)延很大,而且有ERROR告警報(bào)出。
2018-01-30 00:53:20 -[ERROR] - [fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failed] - [kafka.utils.Logging$$anonfun$swallowError$1:106] kafka.common.KafkaException: fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failedat kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)at kafka.utils.Utils$.swallow(Utils.scala:172)at kafka.utils.Logging$class.swallowError(Logging.scala:106)at kafka.utils.Utils$.swallowError(Utils.scala:45)at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)at kafka.producer.Producer.send(Producer.scala:77)at kafka.javaapi.producer.Producer.send(Producer.scala:33)通過(guò)上面的異常棧我們可以發(fā)現(xiàn)在獲取元數(shù)據(jù)的時(shí)候(kafka.client.ClientUtils$.fetchTopicMetadata)發(fā)生了異常,其實(shí)如果你對(duì)Kafka的配置參數(shù)足夠了解的話,看到10分鐘這個(gè)數(shù)值就可以聯(lián)想到600*1000的某個(gè)參數(shù),也就是topic.metadata.refresh.interval.ms。在DefaultEventHandler的handle()方法中,在調(diào)用dispatchSerializedData()方法預(yù)處理并發(fā)送消息之前就會(huì)有下面的一個(gè)if判斷語(yǔ)言用來(lái)判斷當(dāng)前是否需要更新元數(shù)據(jù):
while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)if (topicMetadataRefreshInterval >= 0 &&Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))sendPartitionPerTopicCache.clear()topicMetadataToRefresh.clearlastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds}outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)上面代碼中的topicMetadataRefreshInterval指的就是topic.metadata.refresh.interval.ms參數(shù)。如果topic.metadata.refresh.interval.ms這個(gè)參數(shù)設(shè)置為0,那么就意味著每次發(fā)送消息之前都需要拉取并更新元數(shù)據(jù)信息,更新元數(shù)據(jù)信息之后還要更新ProducerPool的內(nèi)容,包括重建SyncProducer,那么這一番操作必然會(huì)有一定的延遲,當(dāng)然這個(gè)延遲沒(méi)有本文中所述問(wèn)題中的延遲那么大。如果topic.metadata.refresh.interval.ms這個(gè)參數(shù)設(shè)置為負(fù)數(shù),那么這個(gè)if條件語(yǔ)句就不能成立,也就是不會(huì)有定時(shí)更新元數(shù)據(jù)的操作,只有在獲取元數(shù)據(jù)信息失敗是才會(huì)請(qǐng)求完整的元數(shù)據(jù)信息。
本文中所述問(wèn)題中采用的topic.metadata.refresh.interval.ms參數(shù)設(shè)置的是默認(rèn)值大小,那么問(wèn)題中還有一個(gè)細(xì)節(jié):只有少量消息發(fā)送超時(shí)。為了進(jìn)一步的一探究竟,我們來(lái)繼續(xù)深究。當(dāng)定時(shí)更新元數(shù)據(jù)信息條件滿足時(shí),就會(huì)調(diào)用brokerPartitionInfo.updateInfo的方法,更進(jìn)取之后,實(shí)際上內(nèi)部調(diào)用的是:ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)來(lái)請(qǐng)求元數(shù)據(jù)信息,并獲得TopicMetadataResponse來(lái)做更新。
TopicMetadataResponse中包含有broker的id、host、port,還有topic、topic中的partition、partition所對(duì)應(yīng)的leader、AR和ISR等等,有興趣的同學(xué)可以進(jìn)一步翻閱kafka.api.TopicMetadataResponse這個(gè)類,主體代碼只有30行左右,只要學(xué)一點(diǎn)Scala的構(gòu)造函數(shù)相關(guān)的只是就能看懂。
我們回過(guò)頭來(lái)再來(lái)進(jìn)一步分析ClientUtils.fetchTopicMetadata這個(gè)方法,詳細(xì)代碼如下:
def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {var fetchMetaDataSucceeded: Boolean = falsevar i: Int = 0val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)var topicMetadataResponse: TopicMetadataResponse = nullvar t: Throwable = null// shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the// same brokerval shuffledBrokers = Random.shuffle(brokers)while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))try {topicMetadataResponse = producer.send(topicMetadataRequest)fetchMetaDataSucceeded = true}catch {case e: Throwable =>warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed".format(correlationId, topics, shuffledBrokers(i).toString), e)t = e} finally {i = i + 1producer.close()}}if(!fetchMetaDataSucceeded) {throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)} else {debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))}topicMetadataResponse }fetchTopicMetadata方法參數(shù)列表中brokers代表metadata.broker.list所配置的地址列表。可以看到方法中首先建立TopicMetadataRequest的請(qǐng)求,然后從brokers中隨機(jī)挑選(做一個(gè)shuffle,然后從列表中的第一個(gè)開(kāi)始取,也就是相當(dāng)于隨機(jī))一個(gè)broker建立SyncProducer并發(fā)送TopicMetadataRequest請(qǐng)求,問(wèn)題的關(guān)鍵就在這個(gè)隨機(jī)挑選一個(gè)broker之上,如果正好隨機(jī)到的是那臺(tái)磁盤(pán)損毀而崩潰的機(jī)器,那么這個(gè)請(qǐng)求必定要等到設(shè)定的超時(shí)時(shí)間之后才能捕獲異常:[ERROR] - [fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failed] ,進(jìn)而再找到下一個(gè)broker重新發(fā)送TopicMetadataRequest請(qǐng)求。
上面提到了超時(shí)時(shí)間,這個(gè)超時(shí)時(shí)間是通過(guò)request.timeout.ms參數(shù)設(shè)定的,默認(rèn)值為10000,也就是10s。具體指的是kafka.network.BlockingChannel中的channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)這段代碼,參數(shù)connectTimeoutMs就是指request.timeout.ms。如果元數(shù)據(jù)的請(qǐng)求時(shí)打到那臺(tái)崩潰的broker上的話,那么元數(shù)據(jù)的請(qǐng)求就要耗時(shí)10s以上,待元數(shù)據(jù)刷新后才能發(fā)送消息。這個(gè)request.timeout.ms參數(shù)才是導(dǎo)致文中開(kāi)頭有少量消息發(fā)送時(shí)延很大的原因。為了進(jìn)一步驗(yàn)證結(jié)論是否正確,筆者將相關(guān)的類SyncProducer、BlockingChannel用Java重寫(xiě)了一遍,并測(cè)試出請(qǐng)求的耗時(shí),當(dāng)訪問(wèn)一個(gè)不存在的ip地址時(shí),從發(fā)送請(qǐng)求到異常報(bào)出的耗時(shí)在10194ms。不過(guò)如果訪問(wèn)一個(gè)存在的ip地址時(shí),但是沒(méi)有kafka服務(wù)的話,從發(fā)送請(qǐng)求到返回的耗時(shí)只有1248ms,基本上減少了一個(gè)數(shù)量級(jí),如果讀者在遇到同樣的問(wèn)題時(shí),不妨上線一臺(tái)(隨意一臺(tái)能建立TCP連接的機(jī)器就好)與崩潰宕機(jī)的那臺(tái)broker一樣的ip地址的機(jī)器,上面無(wú)需運(yùn)行kafka的服務(wù),就能大大的降低發(fā)送消息的時(shí)延。這樣可以流出更多的時(shí)間去定位、修復(fù)、重新上線那臺(tái)崩潰的broker。
當(dāng)然如果對(duì)時(shí)延過(guò)敏,還有一些其它的方法可以參考。比如metadata.broker.list配置的是一個(gè)類似虛IP(VIP)的話,可以在VIP的下游剔除掉這臺(tái)broker,讓VIP過(guò)來(lái)的請(qǐng)求不會(huì)落到崩潰的broker上。或者也可以通過(guò)topic.metadata.refresh.interval.ms參數(shù)來(lái)調(diào)節(jié),比如設(shè)置為負(fù)數(shù)就可以免去定時(shí)刷新元數(shù)據(jù)的煩惱,不過(guò)在元數(shù)據(jù)變動(dòng)的時(shí)候很難有效的感知其變化,通過(guò)定時(shí)重連之類的方法又顯得有點(diǎn)奇葩。當(dāng)然直接上線一臺(tái)服務(wù)器,安裝運(yùn)行kafka服務(wù),且設(shè)置這臺(tái)kafka服務(wù)器的ip為崩潰的那臺(tái)服務(wù)器的ip地址,不過(guò)這番操作比直接拉一臺(tái)空機(jī)器的時(shí)效性要低一些,凡事在于抉擇。
Old Producer拉取元數(shù)據(jù)信息以及發(fā)送消息是在同一個(gè)線程中的,這必然會(huì)引起局部消息的時(shí)延增大。不過(guò)在新版的KafkaProducer中,這些問(wèn)題都已經(jīng)迎刃而解,具體怎么處理,且看后面的文章分析。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-case-analysis/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的Kafka解惑之Old Producer(4)——Case Analysis的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Kafka解惑之Old Producer
- 下一篇: Kafka解析之topic创建(2)