Apache Hudi重磅特性解读之全局索引
1. 摘要
Hudi表允許多種類型操作,包括非常常用的upsert,當然為支持upsert,Hudi依賴索引機制來定位記錄在哪些文件中。
當前,Hudi支持分區和非分區的數據集。分區數據集是將一組文件(數據)放在稱為分區的桶中的數據集。一個Hudi數據集可能由N個分區和M個文件組成,這種組織結構也非常方便hive/presto/spark等引擎根據分區字段過濾以返回有限的數據量。而分區的值絕大多數情況下是從數據中得來,這個要求一旦一條記錄映射到分區/桶,那么這個映射應該 a) 被Hudi知道;b) 在Hudi數據集生命周期里保持不變。
在一個非分區數據上Hudi需要知道recordKey -> fileId的映射以便對記錄進行upsert操作,現有解決方案如下:a) 用戶/客戶端通過payload提供正確的分區值;b) 實現GlobalBloomIndex索引來掃描指定路徑下的所有文件。上述兩個場景下,要么需要用戶提供映射信息,要么會導致掃描所有文件的性能開銷。
這個方案擬實現一種新的索引類型,維護(recordKey <-> partition, fileId) 映射或者((recordKey, partitionPath) → fileId)映射,這種映射由Hudi存儲和維護,可以解決上述提到的兩個限制。
2. 背景
數據集類型
Hudi存儲抽象主要有兩部分組成:1) 實際存儲的數據;2) 用于定位記錄位置(fileId)的索引,如果沒有這個信息,Hudi不能處理upserts。我們可以將數據湖中攝取的所有數據集大致分為兩類。
插入/事件數據
插入或事件數據表示新寫入表的數據和之前寫入的數據沒有任何交集,更具體點就是表中每一行數據都是新的一行并且和之前寫入的數據沒有重疊。比如從App中攝取日志到表中,每一行日志都是新的一行,和之前寫入的日志沒有關系,因此新的寫入不需要任何之前寫入的上下文來決定新數據應該寫入到哪里。
更新/變更日志數據
更新/變更日志處理是另外一個挑戰,寫入表的數據可能依賴之前寫入的數據。更具體點就是表中每一行數據不是新行并且可能和之前寫入的行會重疊,在這種場景下,系統需要決定哪一行需要被更新,因此需要找到需要更新哪個fileId。
Hudi提供了3種供用戶使用的方案
數據組織結構為分區結構,每個分區包含N個文件,客戶端維護recordKey<->fileId的映射用于表的更新,在將記錄傳遞至Hudi處理之前需要提供分區信息。HoodieBloomIndex實現會掃描分區下所有文件中的BloomIndex,如果匹配,則繼續在文件中確認,這個過程稱為tag,即將記錄定位到具體的fileId。
數據組織結構為扁平結構,即單個目錄包含了表中所有文件。GlobalHoodieBloomIndex實現會掃描所有文件中的BloomIndex,如果匹配,則繼續在文件中確認,這個過程同上,但與第一個不同點在于如果文件數據非常大,那么進行tag的時間會非常耗時。
針對append-only的數據集,即不需要更新,只需要使用payload中的分區,如當前的timestamp。
無論是何種類型數據集(append或者append + update),tag過程對寫和讀的性能影響都非常大。如果我們能夠提供記錄(record)級別的索引(recordKey -> FileId, partition)而不增加太多延遲的話,這將會讓Hudi性能更快。
因此這個RFC旨在提供記錄(record)級別的索引來加快Hudi的查找過程。
注意:為方便解釋說明,下面我們考慮非分區數據集,因此映射中的鍵為recordKey,值為(PartitionPath, FileId)。
3. 實現方案
3.1 基于Hash的索引
索引條目被hash至不同的bucket(桶)中,每個桶中存放recordKey -> (PartitionPath, FileId)的映射,桶總數量需提前定義好,并且不能更新,但每個桶可加載不止一個FileGroup,后面會詳細介紹FileGroup。1000個桶,每個桶100W個條目,那么可索引10億個條目,所以只有當獨立條目大于10億個時,才需要在一個桶中放多個FileGroup。
每個桶對外暴露兩個API,getRecordLocation(JavaRDD<RecordKeys>) 和insertRecords(JavaPairRDD<RecordKey, Tuple2<PatitionPath, FileId>>)
3.2 存儲
使用HFile(link1, link2) 進行存儲,因為HFile有非常好的隨機讀取性能,這里有關于HFile的基準測試,簡要概括如下,如果HFile包含100W個條目,查詢10W個目標在95%情況下只需要~600ms,如果在實際中可以達到這個性能,那么將會進一步提升Hudi性能。
3.3 索引寫路徑
對于寫路徑,一旦確定所有寫入記錄的HoodieRecordLocation,那么這些記錄就被映射為(RecordKey, <PartitionPath, FileId>)。基于RecordKey進行hash,并映射到桶。桶和RecordKey的映射一旦確定后就不會變化。每個Bucket包含N個HFile,另外,所有寫入單個HFile的記錄需要進行排序,每批新寫入會在對應桶中創建新的HFile,因此每個桶會包含N個HFile。同時為限制HFile的數目,也會對HFile做Compaction。因為Hudi中Record對應的FileId永遠不變,因此索引的值也不會再變化,這個特性也會在讀路徑起到作用。
并行度:寫入時并行度最好等于分區總數,每個批次在一個桶中最多創建一個HFile。
需要注意的是數據寫入和索引寫入過程是綁定的,需要在一個ACID內完成,即要么一起提交,要么一起回滾。
3.3.1 更新
現在Hudi中記錄的位置信息是不可變的,但是不能確保之后一直是不可變的,因此索引應該能處理映射的更新,在這種情況下,多個值將會被返回(例如,如果HFile1為Record1返回FileId1,HFile3為Record1返回FileId5,我們會選取HFile5的值,因此Record1的位置就是FileId5)。對于提交時間戳,我們要么依賴文件名要么依賴提交元數據,而不是值里包含的時間,因為這樣會讓索引的大小爆炸。
3.4 索引讀路徑
對于讀和更新路徑,在讀或寫之前需要知道每條記錄的位置,所以getRecordLocations(JavaRDD<RecordKeys>)方法將會被調用,這些記錄將會被hash到對應的桶,對應的桶將會在HFile中查找記錄。
并行度:如前所述,因為暫時不存在對索引的更新,單條記錄在一個Bucket中只能存在于一個HFile,所以所有的HFile可并行查找,例如如果我們有100個桶,每個桶有10個HFile,那么可以設置并行度為1000。
3.5 索引刪除
可以使用特殊值,如添加一個對應null值的條目,所以在索引查找時,可以繼續使用相同的并發度,但是如果返回多個值時選擇最新的值,例如HFile1為Record1返回FileId1,HFile3為Record1返回null,那么會選取HFile3的值并且知道Record1已經被刪除了。對于提交時間戳,我們要么依賴文件名要么依賴提交元數據,而不是值里包含的時間,因為這樣會讓索引的大小爆炸。
支持刪除會讓Compaction變得相對復雜,由于刪除操作存在,在Compaction寫入新文件時 ,可能需要讀取所有待進行Compaction的HFile的所有內容,以便找到最新的值,這可能不會帶來太多的開銷。另外,Compaction也會忽略被刪除的條目以便節省空間。所以可能無法判定一條記錄是否從來都未被插入,或者在插入后被刪除。
注意:對于刪除的條目,還需要支持重新插入。上面介紹的工作流即可支持而無需任何修改。
3.6 Hashing
作為默認實現,我們可以使用Java原生的Hash算法對RecordKey進行Hash,但是可支持開發者自定義Hash算法。
3.7 HFile scan vs seek
通過benchmark可知,對于包含100W個條目的HFile,隨機seek在30W ~ 40W的查找時表現較好,否則全文件scan(讀取整個HFile到內存進行查找)表現更好。所以在查找時可以利用這個實驗結果。我們可以存儲每個HFile的所有條目,在查找時,如果查找 < 30%條目,可以使用隨機seek,否則進行全表掃描。我們可以引入兩個配置,record.level.index.dynamically.choose.scan.vs.seek 和 record.level.index.do.random.lookups,如果第一個配置設置為true,那么會動態選擇scan和seek,如果設置為false,對于流式應用,會考慮第二個配置。
3.8 未來擴展
通常,一個好的做法是留出30%的Buffer,以避免超出初始存儲桶數。因為在嘗試擴展到超出初始化的存儲桶的初始數量時,會有一些權衡或開銷。第一個實現版本可能不會考慮這個問題,希望由用戶自行處理。
3.8.1 選項1-桶中添加多個FileGroup
在一個Bucket中創建多個FileGroup,一個FileGroup代表多個HFile,多個HFile構成一個Group,這些HFile可以被壓縮成一個基礎HFile,所以一個FileGroup擁有一個基礎HFile文件。
若預先分配1000個桶,每個桶100W個條目。對于壓縮而言,一個FileGroup中的所有HFile將會被壓縮成一個HFile,所以如果不擴展到其他FileGroup,那么同一時間一個HFile文件中可能包含200W個條目,這會導致性能下降,所以當達到100W大小時,應該新建一個FileGroup,這意味著一個桶的權重等于兩個虛擬桶,所以hash和桶個數保持相同,但是索引能夠擴展多個條目。但新的FileGroup被創建時,老的FileGroup將會被密封(sealed),即不再寫入新的條目。新的寫入將寫入新的FileGroup,讀取也不會變化,可以并發查找所有HFile文件。
3.8.2 選項2-多個hash查找和桶組
第一個hash可索引到1 ~ 1000的桶(稱為一個桶組),一旦達到桶組的80%時,需要選取一個新的hash,新的hash可索引到1001 ~ 2000,所以在索引查找時,所有記錄會進行兩次查詢,如果查找存在,那么每個桶組只會返回一個值,新的寫入將進入桶1001 ~ 2000。
4. 實現說明
如上面章節所述,我們需要對給定桶中的所有HFile進行Compaction(壓縮)。為了復用現在代碼中的Compaction邏輯,我們引入了 Inline FileSystem ,可以在給定文件中Inline(內聯)任何類型(Parquet、HFIle等),有了Inline FileSystem,我們可以在任何通用文件中內聯HFile。會為每個內聯的HFile生成一個URL路徑,這個URL路徑可被HFile Reader作為單獨的HFile讀取里面的內容,下面展示文件中內聯HFile的結構。
對于云上對象存儲,如OSS、S3(不支持append),那么一個數據文件中只會內聯一個HFile。
考慮索引方案中的每個桶都是Hudi分區中的一個文件組(包含實際數據)。MOR數據集中的典型分區可能有一個基礎文件和N個小增量文件,假設在這個索引中每個桶都有一個相似的結構。每個桶應該有一個基本文件和N個較小的delta文件,每個文件都有一個內聯HFile。每一批新的攝取要么將新的HFile作為新的數據塊附加到現有的delta文件中,要么創建一個新的增量文件并將新的HFile作為第一個數據塊寫入。每隔一段時間,壓縮將提取基礎HFile和所有delta HFile文件,以創建一個新的基本文件(內聯HFile)作為壓縮版本。
下面是一個例子,說明在壓縮前和壓縮后,索引在單個桶中的結構
在對象存儲上的結構如下
上述結構會帶來很多好處。由于異步壓縮已經進行過非常多的測試,只需做一些小的變更就可以重用Compaction。在本例中,它不是數據文件,而是內聯的HFile文件。使用這種布局,回滾和提交也很容易處理。上面結構得到與Hudi分區相同的文件系統視圖(基礎HFile和增量HFile)。基于上面結構也很容易讀取所有在給定提交時間后的索引,在兩個時間間隔內提交的索引等。
5. 總結
記錄級別全局索引將極大提升Hudi的寫入性能,有望在0.6.0版本釋出。
總結
以上是生活随笔為你收集整理的Apache Hudi重磅特性解读之全局索引的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 干货 | AI人脸识别之人脸搜索
- 下一篇: CloudCompare点云体积计算方法
