跟Kafka学技术-缓冲池的使用
點擊上方“朱小廝的博客”,選擇“設為星標”
后臺回復”加群“獲取公眾號專屬群聊入口
作者簡介:黃益明,來自滴滴出行kafka團隊,對Kafka有一年多的研究和實踐,負責滴滴內部云平臺的架構設計和Kafka特性研發工作。
大家都知道Kafka是一個高吞吐的消息隊列,是大數據場景首選的消息隊列,這種場景就意味著發送單位時間消息的量會特別的大,那么Kafka如何做到能支持能同時發送大量消息的呢?
答案是Kafka通過批量壓縮和發送做到的。我們知道消息肯定是放在內存中的,大數據場景消息的不斷發送,內存中不斷存在大量的消息,很容易引起GC,頻繁的GC特別是full gc是會造成“stop the world”,也就是其他線程停止工作等待垃圾回收線程執行,繼而進一步影響發送的速度影響吞吐量,那么Kafka是如何做到優化JVM的GC問題的呢?看完本篇文章你會get到。
Kafka的內存池
下面介紹下Kafka客戶端發送的大致過程,如下圖:
Kafka的kafkaProducer對象是線程安全的,每個發送線程在發送消息時候共用一個kafkaProducer對象來調用發送方法,最后發送的數據根據Topic和分區的不同被組裝進某一個RecordBatch中。發送的數據放入RecordBatch后會被發送線程批量取出組裝成ProduceRequest對象發送給Kafka服務端。可以看到發送數據線程和取數據線程都要跟內存中的RecordBatch打交道,RecordBatch是存儲數據的對象,那么RecordBatch是怎么分配的呢?下面我們看下Kafka的緩沖池結構,如下圖所示:
名詞解釋緩沖池:BufferPool(緩沖池)對象,整個KafkaProducer實例中只有一個BufferPool對象。內存池總大小,它是已使用空間和可使用空間的總和,用totalMemory表示(由buffer.memory配置,默認32M)。
可使用的空間:它包含包括兩個部分,綠色部分代表未申請未使用的部分,用availableMemory表示;黃色部分代表已經申請但沒有使用的部分,用一個ByteBuffer雙端隊列(Deque)表示,在BufferPool中這個隊列叫free,隊列中的每個ByteBuffer的大小用poolableSize表示(由batch.size配置,默認16k),因為每次free申請內存都是以poolableSize為單位申請的,申請poolableSize大小的bytebuffer后用RecordBatch來包裝起來。
已使用空間:代表緩沖池中已經裝了數據的部分。
根據以上介紹,我們可以知道,總的BufferPool大小=已使用空間+可使用空間;free的大小=free.size * poolableSize(poolsize就是單位batch的size)。
數據的分配過程 總的來說是判斷需要存儲的數據的大小是否free里有合適的recordBatch裝得下,如果裝得下則用recordBatch來存儲數據,如果free里沒有空間但是availableMemory+free的大小比需要存儲的數據大(也就是說可使用空間比實際需要申請的空間大),說明可使用空間大小足夠,則會用讓free一直釋放byteBuffer空間直到有空間裝得下要存儲的數據位置,如果需要申請的空間比實際可使用空間大,則內存申請會阻塞直到申請到足夠的內存為止。整個申請過程如下圖:
數據的釋放過程 總的來說有2個入口,釋放過程如下圖:
再來看段申請空間代碼:
//判斷需要申請空間大小,如果需要申請空間大小比batchSize小,那么申請大小就是batchsize,如果比batchSize大,那么大小以實際申請大小為準 int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); //這個過程可以參考圖3 ByteBuffer buffer = free.allocate(size, maxTimeToBlock);再來段回收的核心代碼:
public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {//只有標準規格(bytebuffer空間大小和poolableSize大小一致的才放入free)if (size == this.poolableSize && size == buffer.capacity()) {//注意這里的buffer是直接reset了,重新reset后可以重復利用,沒有gc問題buffer.clear();//添加進free循環利用this.free.add(buffer);} else {//規格不是poolableSize大小的那么沒有進行重制,但是會把availableMemory增加,代表整個可用內存空間增加了,這個時候buffer的回收依賴jvm的gcthis.availableMemory += size;}//喚醒排在前面的等待線程Condition moreMem = this.waiters.peekFirst();if (moreMem != null)moreMem.signal();} finally {lock.unlock();} }通過申請和釋放過程流程圖以及釋放空間代碼,我們可以得到一個結論,就是如果用戶申請的數據(發送的消息)大小都是在poolableSize(由batch.size配置,默認16k)以內,并且申請時候free里有空間,那么用戶申請的空間是可以循環利用的空間,可以減少gc,但是其他情況也可能存在直接用堆內存申請空間的情況,存在gc的情況。如何盡量避免呢,如果批量消息里面單個消息都是超過16k,可以考慮調整batchSize大小。
如果沒有使用緩沖池,那么用戶發送的模型是下圖5,由于GC特別是Full GC的存在,如果大量發送,就可能會發生頻繁的垃圾回收,導致的工作線程的停頓,會對整個發送性能,吞吐量延遲等都有影響。
使用緩沖池后,整個使用過程可以縮略為下圖:
總結
Kafka通過使用內存緩沖池的設計,讓整個發送過程中的存儲空間循環利用,有效減少JVM GC造成的影響,從而提高發送性能,提升吞吐量。
想知道更多?掃描下面的二維碼關注我
后臺回復”加群“獲取公眾號專屬群聊入口
【精彩推薦】
一文講透微服務下如何保證事務的一致性
如何理解Linux中的零拷貝技術
干貨!Java字節碼增強探秘
Java Agent初探
IO多路復用是什么意思
當我們在談論內存的時候,我們在談論什么 | 干貨
分布式文件系統設計,該從哪些方面考慮
咱們從頭到尾說一次Java垃圾回收
Netty、Kafka中的零拷貝技術到底有多牛?
JDK14的9大重磅特性
朕已閱?
總結
以上是生活随笔為你收集整理的跟Kafka学技术-缓冲池的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 重新复习一下JDK14的9大重磅特性
- 下一篇: 面试前,我们要复习多少Redis知识点?