Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略
目錄
類的關系圖
?
RatioBasedCompactionPolicy
selectCompaction?方法
getCurrentEligibleFiles方法
skipLargeFiles方法
createCompactionRequest方法
filterBulk方法
applyCompactionPolicy方法
removeExcessFiles方法
setIsMajor方法
?
其他相關文章
Hbase Compaction 源碼分析 - CompactionChecker
Hbase Compaction 源碼分析 - RatioBasedCompactionPolicy 策略
Hbase Compaction 源碼分析 - CompactSplitThread 線程池選擇
之前介紹 CompactionChecker 執行時機,這回接著介紹具體的策略
類的關系圖
RatioBasedCompactionPolicy
該類在
org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy?
?
selectCompaction?方法
調用過程如下
我們看一下RatioBasedCompactionPolicy 的 selectCompaction?實現方法,實際是在父類??SortedCompactionPolicy 中
//candidateFiles 候選文件,并且按照seqId從最早到最新的排序//filesCompacting 正在Compcation的文件//mayUseOffPeak 是否為高峰期//forceMajor 是否為MajorCompaction,該值在 CompactionChecker 中會設置為true//返回 符合Compaction的候選列表public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,final List<StoreFile> filesCompacting, final boolean isUserCompaction,final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {// Preliminary compaction subject to filtersArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);// Stuck and not compacting enough (estimate). It is not guaranteed that we will be// able to compact more if stuck and compacting, because ratio policy excludes some// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).int futureFiles = filesCompacting.isEmpty() ? 0 : 1;//如果候選文件大于 文件阻塞個數(hbase.hstore.blockingStoreFiles 值,默認為7), // blockingStoreFiles: 如在任意 HStore 中有超過此數量的 HStoreFiles,則會阻止對此 HRegion 的更新,直到完成壓縮或直到超過為 'hbase.hstore.blockingWaitTime' 指定的值。boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)>= storeConfigInfo.getBlockingFileCount();//刪除正在合并的文件candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +filesCompacting.size() + " compacting, " + candidateSelection.size() +" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");// If we can't have all files, we cannot do major anyway//判斷是否包含全部文件,如果沒有正在合并的文件則為trueboolean isAllFiles = candidateFiles.size() == candidateSelection.size();//如果是全部文件,并且是MajorCompaction,則不進行文件過濾,否則進行文件過濾,過濾掉大于hbase.hstore.compaction.max.size值的文件if (!(forceMajor && isAllFiles)) {//排除大于hbase.hstore.compaction.max.size值的數據,默認Long.MAX_VALUE//hbase.hstore.compaction.max.size 表示文件大小大于該值的store file 一定會被minor compaction排除candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);isAllFiles = candidateFiles.size() == candidateSelection.size();}// Try a major compaction if this is a user-requested major compaction,// or if we do not have too many files to compact and this was requested as a major compaction//isTryingMajor判斷條件有兩種// 1、Major合并為True,且包含所有問文件,且是一個用戶合并// 2、Major合并為True,且包含所有問文件,或者本身就是一個Major合并,同時,必須是candidateSelection的數目小于配置的達到合并條件的最大文件數目boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)|| (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection))&& (candidateSelection.size() < comConf.getMaxFilesToCompact()));// Or, if there are any references among the candidates.//判斷是否包含分裂后的文件boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);//如果不是isTryingMajor且不包含分裂后的文件,則 createCompactionRequest 方法中進行進一步文件過濾CompactionRequest result = createCompactionRequest(candidateSelection,isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);ArrayList<StoreFile> filesToCompact = Lists.newArrayList(result.getFiles());//過濾掉多余最大合并的文件數量removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);result.updateFiles(filesToCompact);isAllFiles = (candidateFiles.size() == filesToCompact.size());result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak);result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);return result;}該方法主要流程:
?
1.傳入參數與返回類型
candidateFiles 壓縮候選文件,并且按照seqId從最早到最新的排序
filesCompacting 正在壓縮的文件
mayUseOffPeak 是否為高峰期
forceMajor 是否為MajorCompaction,該值在 CompactionChecker 中會設置為true
isUserCompaction 是否為用戶壓縮
返回 CompactionRequest ,符合Compaction的候選列表
2.判斷是否阻塞,等待合并的文件數量大于blockingStoreFiles,認為是阻塞
如果候選文件大于 文件阻塞個數(hbase.hstore.blockingStoreFiles 值,默認為7),
hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超過此數量的 HStoreFiles,則會阻止對此 HRegion 的更新,直到完成壓縮或直到超過為 'hbase.hstore.blockingWaitTime' 指定的值。
3.從候選列表中candidateSelection刪除正在Compaction的文件
4.判斷是否包含全部文件,如果沒有正在合并的文件isAllFiles則為true
5.如果是全部文件,并且是MajorCompaction,則不進行文件過濾,否則進行文件過濾;
文件過濾方法:skipLargeFiles,過濾掉大于hbase.hstore.compaction.max.size值的文件,該方法后面介紹
6.判斷isTryingMajor(判斷后續是否為Major使用),判斷條件有兩種,滿足一個即為true:
a.Major(forceMajor)合并為true,且包含所有文件,且是一個用戶合并
b.Major(forceMajor)合并為true,且包含所有問文件,或者本身就是一個Major合并,同時,必須是candidateSelection的數目小于配置的達到合并條件的最大文件數目
7.判斷candidateSelection是否包含分裂后的文件
8.如果不是isTryingMajor且不包含分裂后的文件,則執行 createCompactionRequest 方法中進行進一步文件過濾,createCompactionRequest方法后面介紹
9.執行removeExcessFiles方法,如果大于最大合并的文件數量,則過濾掉多余的數量,否則不處理;執行removeExcessFiles方法下一步介紹
10.在返回的result中設置本次Compcation的類型(Major或者Minor),調用方法 setIsMajor,下面介紹
getCurrentEligibleFiles方法
protected ArrayList<StoreFile> getCurrentEligibleFiles(ArrayList<StoreFile> candidateFiles,final List<StoreFile> filesCompacting) {// candidates = all storefiles not already in compaction queueif (!filesCompacting.isEmpty()) {// exclude all files older than the newest file we're currently// compacting. this allows us to preserve contiguity (HBASE-2856)StoreFile last = filesCompacting.get(filesCompacting.size() - 1);int idx = candidateFiles.indexOf(last);Preconditions.checkArgument(idx != -1);candidateFiles.subList(0, idx + 1).clear();}return candidateFiles;}該方法主要流程:
1.從候選文件列表中刪除正在Compaction的文件
skipLargeFiles方法
/*** @param candidates pre-filtrate* @return filtered subset exclude all files above maxCompactSize* Also save all references. We MUST compact them*/protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,boolean mayUseOffpeak) {int pos = 0;//候選文件大于0 且文件不是分裂后的文件 且文件大小大于配置最大文件大小maxCompactSize時,該文件會被剔除while (pos < candidates.size() && !candidates.get(pos).isReference()&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {++pos;}if (pos > 0) {LOG.debug("Some files are too large. Excluding " + pos+ " files from compaction candidates");candidates.subList(0, pos).clear();}return candidates;}該方法主要流程:
1.判斷候選文件大于0 且文件不是分裂后的文件(如果是split后的文件,是需要進行Compaction,不會剔除) 且文件大小大于配置最大文件大小maxCompactSize時,執行++pos
2.pos大于0,清除大的數據
createCompactionRequest方法
protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {if (!tryingMajor) {//進入這里則為minorCompaction//過濾掉BulkLoad到HBase的文件candidateSelection = filterBulk(candidateSelection);//過濾掉不應該Minor的文件candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);candidateSelection = checkMinFilesCriteria(candidateSelection,comConf.getMinFilesToCompact());}return new CompactionRequest(candidateSelection);}該方法主要流程:
如果不是isTryingMajor且不包含分裂后的文件,則為MinorCompaction 進行進一步文件過濾,否則直接返回
filterBulk方法
/*** @param candidates pre-filtrate* @return filtered subset exclude all bulk load files if configured*/protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {candidates.removeAll(Collections2.filter(candidates, new Predicate<StoreFile>() {@Overridepublic boolean apply(StoreFile input) {//判斷該文件是否需要執行MinorCompactionreturn input.excludeFromMinorCompaction();}}));return candidates;}該方法主要作用:
判斷StoreFIle是否設置excludeFromMinorCompaction,也就是過濾掉BulkLoad到HBase的文件
applyCompactionPolicy方法
protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {if (candidates.isEmpty()) {return candidates;} //前提:,選擇待合并的文件按時間排序,最舊的文件排最前。// we're doing a minor compaction, let's see what files are applicableint start = 0;//hbase.hstore.compaction.ratio 1.2double ratio = comConf.getCompactionRatio();//判斷是否為高峰期,高峰期 ratio 值為5,非高峰期為1.2if (mayUseOffPeak) {//獲取hbase.hstore.compaction.ratio.offpeak值,默認是5ratio = comConf.getCompactionRatioOffPeak();LOG.info("Running an off-peak compaction, selection ratio = " + ratio);}// get store file sizes for incremental compacting selection.//https://blog.csdn.net/bryce123phy/article/details/56003628//獲取待Compaction文件數量final int countOfFiles = candidates.size();long[] fileSizes = new long[countOfFiles];//每個file大小long[] sumSize = new long[countOfFiles];//前幾個file大小總和for (int i = countOfFiles - 1; i >= 0; --i) {StoreFile file = candidates.get(i);fileSizes[i] = file.getReader().length();// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo//getMaxFilesToCompact 獲取最大文件壓縮數,默認為10int tooFar = i + comConf.getMaxFilesToCompact() - 1;sumSize[i] = fileSizes[i]+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);}//getMinFilesToCompact : hbase.hstore.compactionThreshold // 如在任意一個 HStore 中有超過此數量的 HStoreFiles, // 則將運行壓縮以將所有 HStoreFiles 文件作為一個 HStoreFile 重新寫入。 // (每次 memstore 刷新寫入一個 HStoreFile)您可通過指定更大數量延長壓縮, // 但壓縮將運行更長時間。在壓縮期間,更新無法刷新到磁盤。長時間壓縮需要足夠的內存, // 以在壓縮的持續時間內記錄所有更新。如太大,壓縮期間客戶端會超時。//getMinCompactSize 最小合并大小//也就是說,當待合并文件數量大于最小合并數量 并且// 文件大小大于Math.max(comConf.getMinCompactSize(),(long) (sumSize[start + 1] * ratio)值// 該端代碼意思是過濾比較大的文件,默認認為最早的StoreFile文件大小最大(之前合并過) //這里高峰期滿足條件的數量小于等于非高峰期數量while (countOfFiles - start >= comConf.getMinFilesToCompact() &&fileSizes[start] > Math.max(comConf.getMinCompactSize(),(long) (sumSize[start + 1] * ratio))) {++start;}if (start < countOfFiles) {//從 countOfFiles 個候選文件中選取 start 個文件進行CompactionLOG.info("Default compaction algorithm has selected " + (countOfFiles - start)+ " files from " + countOfFiles + " candidates");} else if (mayBeStuck) {//mayBeStuck判斷規則://如果候選文件大于 文件阻塞個數(hbase.hstore.blockingStoreFiles 值,默認為7),//hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超過此數量的 HStoreFiles,則會阻止對此 HRegion 的更新,直到完成壓縮或直到超過為 'hbase.hstore.blockingWaitTime' 指定的值。// We may be stuck. Compact the latest files if we can.int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();if (filesToLeave >= 0) {start = filesToLeave;}}candidates.subList(0, start).clear();return candidates;}該方法主要流程
1.判斷是否為高峰期,并確認ratio的值
2.計算文件大小
3.增加start變量,過濾掉文件較大的文件
4.判斷
??? a.判斷如果不是所有文件都被過濾,則從候選列表清空比較大的文件,我們認為越老的文件(在變量的最前面,所以可以通過++start方式可以過濾大文件),文件占用空間越大
??? b.判斷如果所有文件都被過濾,繼續判斷是否阻塞(如果候選文件大于 文件阻塞個數(hbase.hstore.blockingStoreFiles 值,默認為7)則視為阻塞),如果阻塞則將start調整,保證Compaction壓縮?
removeExcessFiles方法
protected void removeExcessFiles(ArrayList<StoreFile> candidates,boolean isUserCompaction, boolean isMajorCompaction) {//如果待合并的文件大于配置的最大合并文件數量int excess = candidates.size() - comConf.getMaxFilesToCompact();if (excess > 0) {//如果isMajorCompaction為true并且是用戶合并則不過濾if (isMajorCompaction && isUserCompaction) {LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact()+ " files because of a user-requested major compaction");} else {//過濾掉多余最大合并文件數量的文件LOG.debug("Too many admissible files. Excluding " + excess+ " files from compaction candidates");candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();}}}setIsMajor方法
public void setIsMajor(boolean isMajor, boolean isAllFiles) {//如果不是全部文件,并且是major壓縮,拋異常,也就是說如果有正在Compaction的文件,就不能執行MajorCompactionassert isAllFiles || !isMajor;//不是全部文件:則為Minor壓縮//是全部文件:并且是isTryingMajor為true,則為MAJOR,否則則為ALL_FILESthis.isMajor = !isAllFiles ? DisplayCompactionType.MINOR: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);}該方法主要流程:
1.如果不是全部文件,并且是major壓縮,拋異常;也就是說如果有正在Compaction的文件,就不能執行MajorCompaction
2. 不是全部文件:則為Minor壓縮
??? 是全部文件:并且是isTryingMajor為true,則為MAJOR,否則則為ALL_FILES
DisplayCompactionType 枚舉有三個值 MINOR, ALL_FILES, MAJOR
但是判斷是否為Major的條件只有
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" + (request.isAllFiles() ? " (all files)" : "")); public boolean isMajor() { return this.isMajor == DisplayCompactionType.MAJOR; }我們從代碼中可以看出當類型為ALL_FILES或者MINOR都是MinorCompcation
涉及到的配置參數
| hbase.hregion.majorcompaction | 在一個區域中所有 HStoreFiles Major 壓縮之間的時間(以毫秒為單位)。要禁用自動的Major壓縮,請將此值設置為 0。 | 7天 |
| hbase.hregion.majorcompaction.jitter | 抖動外邊界以進行最大化壓縮。在每個 RegionServer 上,hbase.region.majorcompaction 間隔與此最大邊界內的隨機分數相乘。在即將運行下一個最大化壓縮時加入該 + 或 - 乘積。最大化壓縮不應同時發生在各 RegionServer 上。該數越小,壓縮越緊密。 所以 major compact的時間間隔 = [7-7*0.5,7+7.0.5] | 0.5 |
| hbase.server.thread.wakefrequency | 搜索工作時暫停的時間段(以毫秒為單位)。服務線程如 META 掃描儀、日志滾輪、Major Compcation 線程使用的睡眠間隔。 | 10秒 |
| hbase.server.compactchecker.interval.multiplier | hbase后臺線程檢查因子,hbase.server.compactchecker.interval.multiplier* hbase.server.thread.wakefrequency 就是Compaction Major 檢查的周期,比如1000*10秒≈2.77小時 | 1000 |
| hbase.hstore.compaction.ratio | 這個ratio參數的作用是判斷文件大小 > hbase.hstore.compaction.min.size的StoreFile是否也是適合進行minor compaction的,默認值1.2。更大的值將壓縮產生更大的StoreFile,建議取值范圍在1.0~1.4之間。大多數場景下也不建議調整該參數。 | 1.2 |
| hbase.hstore.compaction.ratio.offpeak | 此參數與compaction ratio參數含義相同,是在原有文件選擇策略基礎上增加了一個非高峰期的ratio控制,默認值5.0。這個參數受另外兩個參數 hbase.offpeak.start.hour 與 hbase.offpeak.end.hour 控制,這兩個參數值為[0, 23]的整數,用于定義非高峰期時間段,默認值均為-1表示禁用非高峰期ratio設置。 | 5 |
?
?
?
?
?
?
?
?
總結
以上是生活随笔為你收集整理的Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka 详细配置参数说明
- 下一篇: Docker 入门使用 (二)