apache kafka源码分析-Producer分析---转载
原文地址:http://www.aboutyun.com/thread-9938-1-1.html
問題導讀
1.Kafka提供了Producer類作為java producer的api,此類有幾種發送方式?
2.總結調用producer.send方法包含哪些流程?
3.Producer難以理解的在什么地方?
producer的發送方式剖析
Kafka提供了Producer類作為java producer的api,該類有sync和async兩種發送方式。
sync架構圖
async架構圖
調用流程如下:
代碼流程如下:
Producer:當new Producer(new ProducerConfig()),其底層實現,實際會產生兩個核心類的實例:Producer、DefaultEventHandler。在創建的同時,會默認new一個ProducerPool,即我們每new一個java的Producer類,就會有創建Producer、EventHandler和ProducerPool,ProducerPool為連接不同kafka broker的池,初始連接個數有broker.list參數決定。
調用producer.send方法流程:
當應用程序調用producer.send方法時,其內部其實調的是eventhandler.handle(message)方法,eventHandler會首先序列化該消息,
eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()
調用邏輯解釋:當客戶端應用程序調用producer發送消息messages時(既可以發送單條消息,也可以發送List多條消息),調用eventhandler.serialize首先序列化所有消息,序列化操作用戶可以自定義實現Encoder接口,下一步調用partitionAndCollate根據topics的messages進行分組操作,messages分配給dataPerBroker(多個不同的Broker的Map),根據不同Broker調用不同的SyncProducer.send批量發送消息數據,SyncProducer包裝了nio網絡操作信息。
Producer的sync與async發送消息處理,大家看以上架構圖一目了然。
partitionAndCollate方法詳細作用:獲取所有partitions的leader所在leaderBrokerId(就是在該partiionid的leader分布在哪個broker上),
創建一個HashMap>>>,把messages按照brokerId分組組裝數據,然后為SyncProducer分別發送消息作準備工作。
名稱解釋:partKey:分區關鍵字,當客戶端應用程序實現Partitioner接口時,傳入參數key為分區關鍵字,根據key和numPartitions,返回分區(partitions)索引。記住partitions分區索引是從0開始的。
Producer平滑擴容機制
如果開發過producer客戶端代碼,會知道metadata.broker.list參數,它的含義是kafak broker的ip和port列表,producer初始化時,就連接這幾個broker,這時大家會有疑問,producer支持kafka cluster新增broker節點?它又沒有監聽zk broker節點或從zk中獲取broker信息,答案是肯定的,producer可以支持平滑擴容broker,他是通過定時與現有的metadata.broker.list通信,獲取新增broker信息,然后把新建的SyncProducer放入ProducerPool中。等待后續應用程序調用。
BrokerPartitionInfo的updateInfo方法代碼如下:
def updateInfo(topics: Set[String], correlationId: Int) {var topicsMetadata: Seq[TopicMetadata] = Nil//根據topics列表,meta.broker.list,其他配置參數,correlationId表示請求次數,一個計數器參數而已//創建一個topicMetadataRequest,并隨機的選取傳入的broker信息中任何一個去取metadata,直到取到為止val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)topicsMetadata = topicMetadataResponse.topicsMetadata// throw partition specific exceptiontopicsMetadata.foreach(tmd =>{trace("Metadata for topic %s is %s".format(tmd.topic, tmd))if(tmd.errorCode == ErrorMapping.NoError) {topicPartitionInfo.put(tmd.topic, tmd)} elsewarn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))tmd.partitionsMetadata.foreach(pmd =>{if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,ErrorMapping.exceptionFor(pmd.errorCode).getClass))} // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata })})producerPool.updateProducer(topicsMetadata)}ClientUtils.fetchTopicMetadata方法代碼:
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {var fetchMetaDataSucceeded: Boolean = falsevar i: Int = 0val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)var topicMetadataResponse: TopicMetadataResponse = nullvar t: Throwable = nullval shuffledBrokers = Random.shuffle(brokers) //生成隨機數while(i ProducerPool的updateProducer def updateProducer(topicMetadata: Seq[TopicMetadata]) {val newBrokers = new collection.mutable.HashSet[Broker]topicMetadata.foreach(tmd => {tmd.partitionsMetadata.foreach(pmd => {if(pmd.leader.isDefined)newBrokers+=(pmd.leader.get)})})lock synchronized {newBrokers.foreach(b => {if(syncProducers.contains(b.id)){syncProducers(b.id).close()syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))} elsesyncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))})}}當我們啟動kafka broker后,并且大量producer和consumer時,經常會報如下異常信息。
筆者也是經常很長時間看源碼分析,才明白了為什么ProducerConfig配置信息里面并不要求使用者提供完整的kafka集群的broker信息,而是任選一個或幾個即可。因為他會通過您選擇的broker和topics信息而獲取最新的所有的broker信息。
值得了解的是用于發送TopicMetadataRequest的SyncProducer雖然是用ProducerPool.createSyncProducer方法建出來的,但用完并不還回ProducerPool,而是直接Close.
重難點理解:
刷新metadata并不僅在第一次初始化時做。為了能適應kafka broker運行中因為各種原因掛掉、paritition改變等變化,
eventHandler會定期的再去刷新一次該metadata,刷新的間隔用參數topic.metadata.refresh.interval.ms定義,默認值是10分鐘。
這里有三點需要強調:
客戶端調用send, 才會新建SyncProducer,只有調用send才會去定期刷新metadata在每次取metadata時,kafka會新建一個SyncProducer去取metadata,邏輯處理完后再close。根據當前SyncProducer(一個Broker的連接)取得的最新的完整的metadata,刷新ProducerPool中到broker的連接.每10分鐘的刷新會直接重新把到每個broker的socket連接重建,意味著在這之后的第一個請求會有幾百毫秒的延遲。如果不想要該延遲,把topic.metadata.refresh.interval.ms值改為-1,這樣只有在發送失敗時,才會重新刷新。Kafka的集群中如果某個partition所在的broker掛了,可以檢查錯誤后重啟重新加入集群,手動做rebalance,producer的連接會再次斷掉,直到rebalance完成,那么刷新后取到的連接著中就會有這個新加入的broker。
說明:每個SyncProducer實例化對象會建立一個socket連接
特別注意:
在ClientUtils.fetchTopicMetadata調用完成后,回到BrokerPartitionInfo.updateInfo繼續執行,在其末尾,pool會根據上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)
在ProducerPool中,SyncProducer的數目是由該topic的partition數目控制的,即每一個SyncProducer對應一個broker,內部封了一個到該broker的socket連接。每次刷新時,會把已存在SyncProducer給close掉,即關閉socket連接,然后新建SyncProducer,即新建socket連接,去覆蓋老的。
如果不存在,則直接創建新的。
?
轉載于:https://www.cnblogs.com/davidwang456/p/4182001.html
總結
以上是生活随笔為你收集整理的apache kafka源码分析-Producer分析---转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于keepalived对redis做高
- 下一篇: 应用Druid监控SQL语句的执行情况-