kafka实现异步发送_Kafka Producer 异步发送消息居然也会阻塞?
Kafka 一直以來都以高吞吐量的特性而家喻戶曉,就在上周,在一個性能監控項目中,需要使用到 Kafka 傳輸海量消息,在這過程中遇到了一個 Kafka Producer 異步發送消息會被阻塞的問題,導致生產端發送耗時很大。
是的,你沒聽錯,Kafka Producer 異步發送消息也會發生阻塞現象,那究竟是怎么回事呢?
在新版的 Kafka Producer 中,設計了一個消息緩沖池,客戶端發送的消息都會被存儲到緩沖池中,同時 Producer 啟動后還會開啟一個 Sender 線程,不斷地從緩沖池獲取消息并將其發送到 Broker,如下圖所示:
這么看來,Kafka 的所有發送,都可以看作是異步發送了,因此在新版的 Kafka Producer 中廢棄掉異步發送的方法了,僅保留了一個 send 方法,同時返回一個 Futrue 對象,需要同步等待發送結果,就使用 Futrue#get 方法阻塞獲取發送結果。而我在項目中直接調用 send 方法,為何還會發送阻塞呢?
我們在構建 Kafka Producer 時,會有一個自定義緩沖池大小的參數 buffer.memory,默認大小為 32M,因此緩沖池的大小是有限制的,我們不妨想一下,緩沖池內存資源耗盡了會怎么樣?
Kafka 源碼的注釋是非常詳細的,RecordAccumulator 類是 Kafka Producer 緩沖池的核心類,而 RecordAccumulator 類就有那么一段注釋:
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
大概的意思是:
當緩沖池的內存塊用完后,消息追加調用將會被阻塞,直到有空閑的內存塊。
由于性能監控項目每分鐘需要發送幾百萬條消息,只要 Kafka 集群負載很高或者網絡稍有波動,Sender 線程從緩沖池撈取消息的速度趕不上客戶端發送的速度,就會造成客戶端發送被阻塞。
我寫個例子讓大家直觀感受一下被阻塞的現象:
public?static?void?main(String[]?args){
Properties?properties?=?new?Properties();
properties.put(ProducerConfig.ACKS_CONFIG,?"0");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,?"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,?"org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,?"localhost:9092,localhost:9093,localhost:9094");
properties.put(ProducerConfig.LINGER_MS_CONFIG,?1000);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,?1024?*?1024);
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,?5242880);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,?"lz4");
KafkaProducer?producer?=?new?KafkaProducer<>(properties);
String?str?=?"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
List?bytesList?=?new?ArrayList<>();
Random?random?=?new?Random();
for?(int?j?=?0;?j?
int?i1?=?random.nextInt(10);
if?(i1?==?0)?{
i1?=?1;
}
byte[]?bytes?=?new?byte[1024?*?i1];
for?(int?i?=?0;?i?
bytes[i]?=?(byte)?str.charAt(random.nextInt(62));
}
bytesList.add(bytes);
}
while?(true)?{
long?start?=?System.currentTimeMillis();
producer.send(new?ProducerRecord<>("test_topic",?bytesList.get(random.nextInt(1023))));
long?end?=?System.currentTimeMillis()?-?start;
if?(end?>?100)?{
System.out.println("發送耗時:"?+?end);
}
//?Thread.sleep(10);
}
}
以上例子構建了一個 ?Kafka Producer 對象,同時使用死循環不斷地發送消息,這時如果把 Thread.sleep(10);注釋掉,則會出現發送耗時很長的現象:
使用 JProfiler 可以查看到分配內存的地方出現了阻塞:
跟蹤到源碼:
發現在 org.apache.kafka.clients.producer.internals.BufferPool#allocate 方法中,如果判斷緩沖池沒有空閑的內存了,則會阻塞內存分配,直到有空閑內存為止。
如果不注釋 Thread.sleep(10);這段代碼則不會發生阻塞現象,打斷點到阻塞的地方,也不會被 Debug 到,從現象能夠得知,Thread.sleep(10);使得發送消息的頻率變低了,此時 Sender 線程發送的速度超過了客戶端的發送速度,緩沖池一直處于未滿狀態,因此不會產生阻塞現象。
除了以上緩沖池內存滿了會發生阻塞之外,Kafka Produer 其它情況都不會發生阻塞了嗎?非也,其實還有一個地方,也會發生阻塞!
Kafka Producer 通常在第一次發送消息之前,需要獲取該主題的元數據 Metadata,Metadata 內容包括了主題相關分區 Leader 所在節點信息、副本所在節點信息、ISR 列表等,Kafka Producer 獲取 Metadata 后,便會根據 Metadata 內容將消息發送到指定的分區 Leader 上,整個獲取流程大致如下:
如上圖所示,Kafka Producer 在發送消息之前,會檢查主題的 Metadata 是否需要更新,如果需要更新,則會喚醒 Sender 線程并發送 Metatadata 更新請求,此時 Kafka Producer 主線程則會阻塞等待 Metadata 的更新。
如果 Metadata 一直無法更新,則會導致客戶端一直阻塞在那里。
總結
以上是生活随笔為你收集整理的kafka实现异步发送_Kafka Producer 异步发送消息居然也会阻塞?的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: kvmweb管理工具_KVM的web管理
- 下一篇: U盘系统怎么启动主界面 U盘系统的启动界
