ad09只在一定范围内查找相似对象_kafka日志段中的二分查找
二分查找
Kafka 中直接接觸索引或索引文件的場景可能不是很多。索引是一個很神秘的組件,Kafka 官方文檔也沒有怎么提過它。索引這個組件的源碼還有一個亮點,那就是它應用了耳熟能詳的二分查找算法來快速定位索引項。而且社區還針對 Kafka 自身的特點對其進行了改良。
1. 索引類圖及源文件組織架構
在 Kafka 源碼中,跟索引相關的源碼文件有 5 個,它們都位于 core 包的 /src/main/scala/kafka/log 路徑下。
- AbstractIndex.scala:它定義了最頂層的抽象類,這個類封裝了所有索引類型的公共操作。
- LazyIndex.scala:它定義了 AbstractIndex 上的一個包裝類,實現索引項延遲加載。這個類主要是為了提高性能。
- OffsetIndex.scala:定義位移索引,保存“< 位移值,文件磁盤物理位置 >”對。
- TimeIndex.scala:定義時間戳索引,保存“< 時間戳,位移值 >”對。
- TransactionIndex.scala:定義事務索引,為已中止事務(Aborted Transcation)保存重要的元數據信息。只有啟用 Kafka 事務后,這個索引才有可能出現。
這些類的繼承關系如下圖所示:
其中,OffsetIndex、TimeIndex 和 TransactionIndex 都繼承了 AbstractIndex 類,而上層的 LazyIndex 僅僅是包裝了一個 AbstractIndex 的實現類,用于延遲加載。
2. AbstractIndex 代碼結構
abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable { ...... }AbstractIndex 定義了 4 個屬性字段。由于是一個抽象基類,它的所有子類自動地繼承了這 4 個字段。也就是說,Kafka 所有類型的索引對象都定義了這些屬性。
AbstractIndex 是抽象的索引對象類。可以說,它是承載索引項的容器,而每個繼承它的子類負責定義具體的索引項結構。
比如,OffsetIndex 的索引項是 < 位移值,物理磁盤位置 > 對,TimeIndex 的索引項是 < 時間戳,位移值 > 對。基于這樣的設計理念,AbstractIndex 類中定義了一個抽象方法 entrySize 來表示不同索引項的大小,如下所示:
protected def entrySize: Int子類實現該方法時需要給定自己索引項的大小,對于 OffsetIndex 而言,該值就是 8;對于 TimeIndex 而言,該值是 12。
// OffsetIndex override def entrySize = 8 // TimeIndex override def entrySize = 128和12具體什么含義呢?
在 OffsetIndex 中,位移值用 4 個字節來表示,物理磁盤位置也用 4 個字節來表示,所以總共是 8 個字節。位移值不是長整型,應該是 8 個字節才對。上面提到 AbstractIndex 已經保存了 baseOffset 了,這里的位移值,實際上是相對于 baseOffset 的相對位移值,即真實位移值減去 baseOffset 的值,使用相對位移值能夠有效地節省磁盤空間。而 Broker 端參數 log.segment.bytes 是整型,這說明,Kafka 中每個日志段文件的大小不會超過 2^32,即 4GB,這就說明同一個日志段文件上的位移值減去 baseOffset 的差值一定在整數范圍內。因此,源碼只需要 4 個字節保存就行了。
同理,TimeIndex 中的時間戳類型是長整型,占用 8 個字節,位移依然使用相對位移值,占用 4 個字節,因此總共需要 12 個字節。
Kafka 中的索引底層的實現原理是 Java 中的 MappedByteBuffer。使用內存映射文件的主要優勢在于,它有很高的 I/O 性能,特別是對于索引這樣的小文件來說,由于文件內存被直接映射到一段虛擬內存上,訪問內存映射文件的速度要快于普通的讀寫文件速度。
在 AbstractIndex 中,這個 MappedByteBuffer 就是名為 mmap 的變量。看下源碼:
@volatile protected var mmap: MappedByteBuffer = { // 第1步:創建索引文件 val newlyCreated = file.createNewFile() // 第2步:以writable指定的方式(讀寫方式或只讀方式)打開索引文件 val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r") try { if(newlyCreated) { if(maxIndexSize < entrySize) // 預設的索引文件大小不能太小,如果連一個索引項都保存不了,直接拋出異常 throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize) // 第3步:設置索引文件長度,roundDownToExactMultiple計算的是不超過maxIndexSize的最大整數倍entrySize // 比如maxIndexSize=1234567,entrySize=8,那么調整后的文件長度為1234560 raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize)) } // 第4步:更新索引長度字段_length _length = raf.length() // 第5步:創建MappedByteBuffer對象 val idx = { if (writable) raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length) else raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length) } /* set the position in the index for the next entry */ // 第6步:如果是新創建的索引文件,將MappedByteBuffer對象的當前位置置成0 // 如果索引文件已存在,將MappedByteBuffer對象的當前位置設置成最后一個索引項所在的位置 if(newlyCreated) idx.position(0) else idx.position(roundDownToExactMultiple(idx.limit(), entrySize)) // 第7步:返回創建的MappedByteBuffer對象 idx } finally { CoreUtils.swallow(raf.close(), AbstractIndex) // 關閉打開索引文件句柄 } }這些代碼最主要的作用就是創建 mmap 對象,AbstractIndex 其他大部分的操作都是和 mmap 相關。
比如:
// 如果我們要計算索引對象中當前有多少個索引項,只需要執行下列計算:protected var _entries: Int = mmap.position() / entrySize // 如果我們要計算索引文件最多能容納多少個索引項,只要定義下面的變量就行了:private[this] var _maxEntries: Int = mmap.limit() / entrySize // 再進一步,有了這兩個變量,我們就能夠很容易地編寫一個方法,來判斷當前索引文件是否已經寫滿:def isFull: Boolean = _entries >= _maxEntries3. 寫入索引項
下面這段代碼是 OffsetIndex 的 append 方法,用于向索引文件中寫入新索引項。
def append(offset: Long, position: Int): Unit = { inLock(lock) { // 第1步:判斷索引文件未寫滿 require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") // 第2步:必須滿足以下條件之一才允許寫入索引項: // 條件1:當前索引文件為空 // 條件2:要寫入的位移大于當前所有已寫入的索引項的位移——Kafka規定索引項中的位移值必須是單調增加的 if (_entries == 0 || offset > _lastOffset) { trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}") mmap.putInt(relativeOffset(offset)) // 第3步A:向mmap中寫入相對位移值 mmap.putInt(position) // 第3步B:向mmap中寫入物理位置信息 // 第4步:更新其他元數據統計信息,如當前索引項計數器_entries和當前索引項最新位移值_lastOffset _entries += 1 _lastOffset = offset // 第5步:執行校驗。寫入的索引項格式必須符合要求,即索引項個數*單個索引項占用字節數匹配當前文件物理大小,否則說明文件已損壞 require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".") } else { // 如果第2步中兩個條件都不滿足,不能執行寫入索引項操作,拋出異常 throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" + s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.") } } }4. 查找索引項
索引項的寫入邏輯并不復雜,難點在于如何查找索引項。AbstractIndex 定義了抽象方法 parseEntry 用于查找給定的索引項,如下所示:
protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry這里的 “n” 表示要查找給定 ByteBuffer 中保存的第 n 個索引項, IndexEntry 是源碼定義的一個接口,里面有兩個方法:indexKey 和 indexValue,分別返回不同類型索引的對。
OffsetIndex 實現 parseEntry 的邏輯如下:
override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = { OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) }OffsetPosition 是 IndexEntry 的實現類,Key 就是之前說的位移值,而 Value 就是物理磁盤位置值。所以,這里你能看到代碼調用了 relativeOffset(buffer, n) + baseOffset 計算出絕對位移值,之后調用 physical(buffer, n) 計算物理磁盤位置,最后將它們封裝到一起作為一個獨立的索引項返回。
有了 parseEntry 方法,我們就能夠根據給定的 n 來查找索引項了。但是,這里還有個問題需要解決,那就是,我們如何確定要找的索引項在第 n 個槽中呢?也就是如何從一組已排序的數中快速定位符合條件的那個數,二分查找登場。
5. 二分查找算法
到目前為止,從已排序數組中尋找某個數字最快速的算法就是二分查找了,它能做到 O(lgN) 的時間復雜度。Kafka 的索引組件就應用了二分查找算法。
原版的實現代碼:
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { // 第1步:如果當前索引為空,直接返回對 if(_entries == 0) return (-1, -1) // 第2步:要查找的位移值不能小于當前最小位移值 if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) return (-1, 0) // binary search for the entry // 第3步:執行二分查找算法 var lo = 0 var hi = _entries - 1 while(lo < hi) { val mid = ceil(hi/2.0 + lo/2.0).toInt val found = parseEntry(idx, mid) val compareResult = compareIndexEntry(found, target, searchEntity) if(compareResult > 0) hi = mid - 1 else if(compareResult < 0) lo = mid else return (mid, mid) } (lo, if (lo == _entries - 1) -1 else lo + 1)這段代碼的核心是,第 3 步的二分查找算法。常刷算法題的朋友,再熟悉不過了。
6. 改進版二分查找算法
大多數操作系統使用頁緩存來實現內存映射,而目前幾乎所有的操作系統都使用 LRU(Least Recently Used)或類似于 LRU 的機制來管理頁緩存。Kafka 寫入索引文件的方式是在文件末尾追加寫入,而幾乎所有的索引查詢都集中在索引的尾部。這么來看的話,LRU 機制是非常適合 Kafka 的索引訪問場景的。
但,這里有個問題是,當 Kafka 在查詢索引的時候,原版的二分查找算法并沒有考慮到緩存的問題,因此很可能會導致一些不必要的缺頁中斷(Page Fault)。此時,Kafka 線程會被阻塞,等待對應的索引項從物理磁盤中讀出并放入到頁緩存中。
下面舉個例子來說明一下這個情況。假設 Kafka 的某個索引占用了操作系統頁緩存 13 個頁(Page),如果待查找的位移值位于最后一個頁上,也就是 Page 12,那么標準的二分查找算法會依次讀取頁號 0、6、9、11 和 12,具體的流程不過多敘述。
接下來是重點:
通常來說,一個頁上保存了成百上千的索引項數據。隨著索引文件不斷被寫入,Page 12 不斷地被填充新的索引項。如果此時索引查詢方都來自 ISR 副本或 Lag 很小的消費者,那么這些查詢大多集中在對 Page 12 的查詢,因此,Page 0、6、9、11、12 一定經常性地被源碼訪問。也就是說,這些頁一定保存在頁緩存上。
后面當新的索引項填滿了 Page 12,頁緩存就會申請一個新的 Page 來保存索引項,即 Page 13。現在,最新索引項保存在 Page 13 中。如果要查找最新索引項,原版二分查找算法將會依次訪問 Page 0、7、10、12 和 13。此時,問題來了:Page 7 和 10 已經很久沒有被訪問過了,它們大概率不在頁緩存中,因此,一旦索引開始征用 Page 13,就會發生 Page Fault,等待那些冷頁數據從磁盤中加載到頁緩存。根據資料查詢,這種加載過程可能長達 1 秒。顯然,這是一個普遍的問題,即每當索引文件占用 Page 數發生變化時,就會強行變更二分查找的搜索路徑,從而出現不在頁緩存的冷數據必須要加載到頁緩存的情形,而這種加載過程是非常耗時的。
基于這個問題,社區提出了改進版的二分查找策略,也就是緩存友好的搜索算法。總體的思路是,代碼將所有索引項分成兩個部分:熱區(Warm Area)和冷區(Cold Area),然后分別在這兩個區域內執行二分查找算法,如下圖所示:
同樣是查詢最熱的那部分數據,一旦索引占用了更多的 Page,要遍歷的 Page 組合就會發生變化。這是導致性能下降的主要原因。這個改進版算法的最大好處在于,查詢最熱那部分數據所遍歷的 Page 永遠是固定的,因此大概率在頁緩存中,從而避免無意義的 Page Fault。
看到這個設計時,我真的感覺到算法的精妙以及commiter的NB。
看下實際的代碼:
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { // 第1步:如果索引為空,直接返回對 if(_entries == 0) return (-1, -1) // 封裝原版的二分查找算法 def binarySearch(begin: Int, end: Int) : (Int, Int) = { // binary search for the entry var lo = begin var hi = end while(lo < hi) { val mid = (lo + hi + 1) >>> 1 val found = parseEntry(idx, mid) val compareResult = compareIndexEntry(found, target, searchEntity) if(compareResult > 0) hi = mid - 1 else if(compareResult < 0) lo = mid else return (mid, mid) } (lo, if (lo == _entries - 1) -1 else lo + 1) } // 第3步:確認熱區首個索引項位于哪個槽。_warmEntries就是所謂的分割線,目前固定為8192字節處 // 如果是OffsetIndex,_warmEntries = 8192 / 8 = 1024,即第1024個槽 // 如果是TimeIndex,_warmEntries = 8192 / 12 = 682,即第682個槽 val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries) // 第4步:判斷target位移值在熱區還是冷區 if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) { return binarySearch(firstHotEntry, _entries - 1) // 如果在熱區,搜索熱區 } // 第5步:確保target位移值不能小于當前最小位移值 if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) return (-1, 0) // 第6步:如果在冷區,搜索冷區 binarySearch(0, firstHotEntry)最后來張兩個算法的總結:
7. 空間與時間的互換
到二分查找還沒完,日志段有個參數 indexIntervalBytes, 可以理解為插了多少條消息之后再建一個索引,由此看出kafka的索引其實是稀疏索引,這樣可以避免索引文件占用過多的內存,從而可以在內存中保存更多的索引。對應Broker端參數就是 log.index.interval.bytes 值,默認4kb。
實際的通過索引查找消息的過程是通過offset找到索引所在的文件,然后通過二分法找到離目標最近的索引,再順序遍歷消息文件找到目標文件。復雜度為 O(log2n)+O(m), n是索引文件里索引的個數,m為稀疏程度。
這就是時間和空間的互換,數據結構和算法的平衡。
總結
以上是生活随笔為你收集整理的ad09只在一定范围内查找相似对象_kafka日志段中的二分查找的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 万能点位图软件_【像素图】复古提花毛衣+
- 下一篇: adb命令 android 串口_ADB