Kafka解惑之Old Producer(2)——Sync Analysis
上接Kafka解惑之Old Producer(1)—— Beginning
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-sync-analysis/
一下子擴展的有點多,我們還是先回到DefaultEventHandler上來,當調用producer.send方法發送消息的時候,緊接著就是調用DefaultEventHandler的handle方法。下面是handle方法的主要內容,雖然行數有點多,但是這是Producer中最最核心的一塊,需要反復研磨,方能一探究竟:
def handle(events: Seq[KeyedMessage[K,V]]) {val serializedData = serialize(events)var outstandingProduceRequests = serializedDatavar remainingRetries = config.messageSendMaxRetries + 1val correlationIdStart = correlationId.get()while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)if (topicMetadataRefreshInterval >= 0 &&Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))sendPartitionPerTopicCache.clear()topicMetadataToRefresh.clearlastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds}outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)if (outstandingProduceRequests.nonEmpty) {Thread.sleep(config.retryBackoffMs)CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))sendPartitionPerTopicCache.clear()remainingRetries -= 1producerStats.resendRate.mark()}} }注意handle方法的參數是個Seq[KeyedMessage]類型的,而不是KeyedMessage。雖然Demo中用的只是單個KeyedMessage,最后調用底層的handle方法都是轉換為Seq類型,你可以把Seq看成是java中的List,在Scala中表示序列,指的是一類具有一定長度的可迭代訪問的對象,其中每個元素均帶有一個從0開始計數的固定索引位置。
這個handle方法中首先是調用serialize(events)方法對消息進行序列化操作,這個容易理解,就是通過serializer.class參數指定的序列化類進行序列化。
其次獲取所發送消息對應的元數據信息,然后將一坨消息(也有可能是一條)轉換為HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]格式,其中key:Int表示broker的id,value是TopicAndPartition與消息集的Map,對應的方法為dispatchSerializedData()。因為客戶端發消息是發到對應的broker上,所以要對每個消息找出對應的leader副本所在的broker的位置,然后將要發送的消息集分類,每個broker對應其各自所要接收的消息。而TopicAndPartition是針對broker上的存儲層的,每個TopicAndPartition對應特定的當前的存儲文件(Segment文件),將消息寫入到存儲文件中。
獲取元數據信息并不是每次發送消息都要向metadata.broker.list所指定地址中的服務索要拉取,而是向緩存中的元數據進行拉取,拉取失敗后才向metadata.broker.list所指定地址中的服務發送元數據更新的請求進行拉取。很多朋友會把metadata.broker.list看成是broker的地址,這個不完全正確,官網解釋:
This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
因為這個地址只提供給客戶端拉取元數據信息之用,而剩下的動作比如發送消息是通過與元數據信息中的broker地址建立連接之后再進行操作,這也就意味著metadata.broker.list可以和broker的真正地址沒有任何交集。你完全可以為metadata.broker.list配置一個“偽裝”接口地址,這個接口配合kafka的傳輸格式并提供相應的元數據信息,這樣方便集中式的配置管理(可以集成到配置中心中)。為了簡化說明,我們姑且可以狹義的認為metadata.broker.list指的就是kafka broker的地址。
緩存中的元數據每隔topic.metadata.refresh.interval.ms才去broker拉取元數據信息,可以參考上面大段源碼中的if語句:
if (topicMetadataRefreshInterval >= 0 &&Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval)topic.metadata.refresh.interval.ms參數的默認值是600*1000ms,也就是10分鐘。如果設置為0,則每次發送消息時都要先向broker拉取元數據信息;如果設置為負數,那么只有在元數據獲取失敗的情況下才會請求元數據信息。由于這個老版的Scala的Producer請求元數據和發送消息是在同一個線程中完成的,所以此處會有延遲的隱患,具體的筆者會在后面的案例分析環節為大家詳細介紹。
接下去所要做的工作就是查看是否需要壓縮,如果客戶端設置了壓縮,則根據compression.type參數配置的壓縮方式對消息進行壓縮處理。0.8.2.x版本支持gzip和snappy的壓縮方式,1.0.0版本還支持lz4的壓縮方式。compression.type參數的默認值值none,即不需要壓縮。
最后根據brokerId分組發送消息。這個分組發送的過程就與ProducerPool有關了,我們前面提到在實例化Producer的時候引入了DefaultEventHandler和ProducerPool。這個ProducerPool保存的是生產者和broker的連接,每個連接對應一個SyncProducer對象。SyncProducer包裝了NIO網絡層的操作,每個SyncProducer都是一個與對應broker的socket連接,是真正發送消息至broker中的執行者。
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerPool(val config: ProducerConfig) extends Logging {private val syncProducers = new HashMap[Int, SyncProducer]當調用最上層的send方法發送消息的時候,下面的執行順序為DefaultEventHandler.handle()->DefaultEventHandler.dispatchSerializedData()->DefaultEventHandler.send()。在底層的DefaultEventHandler.send方法定義為:
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])這個方法就需要根據brokerId從ProducerPool中的HashMap中找到對應SyncProducer,然后在將“messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]”這個消息發送到SyncProducer對應的broker上。如果在獲取緩存中的元數據失敗的時候就需要重新向broker拉取元數據,或者定時(topic.metadata.refresh.interval.ms)向broker端請求元數據的數據,都會有可能更新ProducerPool的信息,對應的方法為ProducerPool.updateProducer():
def updateProducer(topicMetadata: Seq[TopicMetadata]) {val newBrokers = new collection.mutable.HashSet[BrokerEndPoint]topicMetadata.foreach(tmd => {tmd.partitionsMetadata.foreach(pmd => {if(pmd.leader.isDefined) {newBrokers += pmd.leader.get}})})lock synchronized {newBrokers.foreach(b => {if(syncProducers.contains(b.id)){syncProducers(b.id).close()syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))} elsesyncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))})} }會Java的讀者看這段代碼的時候應該能看出來個90%以上,解釋下這段代碼:首先是找到更新的元數據中所有的brorker(更具體的來說是broker的id、主機地址host和端口號port三元組信息);之后在查到原有的ProducerPool中是否有相應的SyncProducer,如果有則關閉之后再重新建立;如果沒有則新建。SyncProducer底層是阻塞式的NIO,所以關閉再建立會有一定程度上的開銷,相關細節如下:
channel = SocketChannel.open() if(readBufferSize > 0)channel.socket.setReceiveBufferSize(readBufferSize) if(writeBufferSize > 0)channel.socket.setSendBufferSize(writeBufferSize) channel.configureBlocking(true) channel.socket.setSoTimeout(readTimeoutMs) channel.socket.setKeepAlive(true) channel.socket.setTcpNoDelay(true) channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)玩過NIO的讀者對這段代碼相比很是熟絡,雖然是scala版的。如果沒有接觸過NIO,那么可以先看看這一篇:攻破JAVA NIO技術壁壘。
說道這里我們用一副結構圖來說明下Old Producer的大致脈絡(注:圖中的所有操作都是在一個線程中執行的):
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-sync-analysis/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Kafka解惑之Old Producer(2)——Sync Analysis的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka解惑之Old Producer
- 下一篇: Kafka解惑之Old Producer