HBase源码分析之HRegion上compact流程分析(三)
? ? ? ? 在《HBase源碼分析之HRegion上compact流程分析(二)》一文中,我們沒有講解真正執(zhí)行合并的CompactionContext的compact()方法。現(xiàn)在我們來分析下它的具體實現(xiàn)。
? ? ? ? 首先,CompactionContext表示合并的上下文信息,它只是一個抽象類,其compact()并沒有實現(xiàn),代碼如下:
/*** Runs the compaction based on current selection. select/forceSelect must have been called.* @return The new file paths resulting from compaction.*/public abstract List<Path> compact() throws IOException;? ? ? ? 那么,我們來找下它的實現(xiàn)類。它一共有兩種實現(xiàn)類:DefaultCompactionContext和StripeCompaction,今天我們以DefaultCompactionContext為例來講解。? ? ? ? 首先看下DefaultCompactionContext中compact()方法的實現(xiàn):
@Overridepublic List<Path> compact() throws IOException {return compactor.compact(request);}? ? ? ? 這個compactor可以根據(jù)參數(shù)hbase.hstore.defaultengine.compactor.class配置,但是默認實現(xiàn)為DefaultCompactor。那么,接下來,我們看下它的實現(xiàn): /*** Do a minor/major compaction on an explicit set of storefiles from a Store.* 在一個Store中明確的storefiles集合中執(zhí)行一個minor或者major合并*/public List<Path> compact(final CompactionRequest request) throws IOException {// 從請求中獲取文件詳情fd,fd是FileDetails類型FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());// 構(gòu)造合并過程追蹤器CompactionProgressthis.progress = new CompactionProgress(fd.maxKeyCount);// Find the smallest read point across all the Scanners.// 找到scanners中的最小的可讀點,實際上就是找到最小能夠讀取數(shù)據(jù)的點long smallestReadPoint = getSmallestReadPoint();List<StoreFileScanner> scanners;Collection<StoreFile> readersToClose;// 根據(jù)參數(shù)hbase.regionserver.compaction.private.readers確定是否使用私有readersif (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {// clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,// HFileFiles, and their readers// 克隆所有的StoreFiles,以便我們將在StoreFiles、HFileFiles以及它們的readers等一個獨立的副本上執(zhí)行合并// 根據(jù)請求中待合并文件的數(shù)目創(chuàng)建一個StoreFile列表:readersToClosereadersToClose = new ArrayList<StoreFile>(request.getFiles().size());// 將待合并文件復制一份加入readersToClose列表for (StoreFile f : request.getFiles()) {readersToClose.add(new StoreFile(f));}// 根據(jù)readersToClose列表,即待合并文件的副本創(chuàng)建文件瀏覽器FileScannersscanners = createFileScanners(readersToClose, smallestReadPoint);} else {// 創(chuàng)建空的列表readersToClosereadersToClose = Collections.emptyList();// 根據(jù)實際請求中的待合并文件列表創(chuàng)建文件瀏覽器FileScannersscanners = createFileScanners(request.getFiles(), smallestReadPoint);}StoreFile.Writer writer = null;List<Path> newFiles = new ArrayList<Path>();boolean cleanSeqId = false;IOException e = null;try {InternalScanner scanner = null;try {/* Include deletes, unless we are doing a compaction of all files */// 確定scan類型scanType:// 如果compact請求是MAJOR或ALL_FILES合并,則scanType為COMPACT_DROP_DELETES;// 如果compact請求是MINOR合并,則scanType為COMPACT_RETAIN_DELETES。ScanType scanType =request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;// 如果有協(xié)處理器,調(diào)用協(xié)處理器的preCreateCoprocScanner()方法scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);if (scanner == null) {// 如果協(xié)處理器中未創(chuàng)建scanner,調(diào)用createScanner()方法創(chuàng)建一個scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);}// 如果有協(xié)處理器,調(diào)用協(xié)處理器的preCompact()方法scanner = postCreateCoprocScanner(request, scanType, scanner);if (scanner == null) {// NULL scanner returned from coprocessor hooks means skip normal processing.return newFiles;}// Create the writer even if no kv(Empty store file is also ok),// because we need record the max seq id for the store file, see HBASE-6059// 確定最小讀取點smallestReadPointif(fd.minSeqIdToKeep > 0) {smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);cleanSeqId = true;}// When all MVCC readpoints are 0, don't write them.// See HBASE-8166, HBASE-12600, and HBASE-13389.// 調(diào)用HStore的createWriterInTmp()方法,獲取writerwriter = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);// 調(diào)用performCompaction()方法,執(zhí)行合并boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);// 如果沒有完成合并if (!finished) {// 關(guān)閉writerwriter.close();// 刪除writer中的臨時文件store.getFileSystem().delete(writer.getPath(), false);writer = null;// 拋出異常throw new InterruptedIOException( "Aborting compaction of store " + store +" in region " + store.getRegionInfo().getRegionNameAsString() +" because it was interrupted.");}} finally {// 關(guān)閉scannerif (scanner != null) {scanner.close();}}} catch (IOException ioe) {e = ioe;// Throw the exceptionthrow ioe;}finally {try {if (writer != null) {if (e != null) {// 無異常的話,關(guān)閉writerwriter.close();} else {// 存在異常的話,寫入元數(shù)據(jù),關(guān)閉writer,并將寫入地址加入newFileswriter.appendMetadata(fd.maxSeqId, request.isAllFiles());writer.close();newFiles.add(writer.getPath());}}} finally {// 依次關(guān)閉readersToClose中StoreFile的Readerfor (StoreFile f : readersToClose) {try {f.closeReader(true);} catch (IOException ioe) {LOG.warn("Exception closing " + f, ioe);}}}}// 返回newFilesreturn newFiles;}? ? ? ? 總結(jié)下DefaultCompactor的compact()方法的處理流程,大體有如下幾點:? ? ? ? 1、通過父類Compactor的getFileDetails()方法從請求中獲取文件詳情fd,fd是FileDetails類型,這個FileDetails類型的文件詳情中主要包含如下信息:
? ? ? ? ? ? ? (1)合并之后總的keyvalue數(shù)目:maxKeyCount;
? ? ? ? ? ? ? (2)如果是major合并,最早的Put時間戳earliestPutTs;
? ? ? ? ? ? ? (3)合并時文件中最大的序列號maxSeqId;
? ? ? ? ? ? ? (4)相關(guān)文件中最新的MemStore數(shù)據(jù)讀取點maxMVCCReadpoint;
? ? ? ? ? ? ? (5)最大的tag長度maxTagsLength;
? ? ? ? ? ? ? (6)在major合并期間需要保持的最小序列號minSeqIdToKeep。
? ? ? ? 2、構(gòu)造合并過程追蹤器CompactionProgress,用于追蹤合并過程;
? ? ? ? 3、通過父類Compactor的getSmallestReadPoint()方法找到所有scanners中的最小的可讀點,實際上就是找到最小能夠讀取數(shù)據(jù)的點smallestReadPoint;
? ? ? ? 4、根據(jù)參數(shù)hbase.regionserver.compaction.private.readers確定是否使用私有readers,默認為false不使用:
? ? ? ? ? ? ? 4.1、如果需要使用,即參數(shù)配置為true的話,克隆所有的StoreFiles,以便我們將在StoreFiles、HFileFiles以及它們的readers等一個獨立的副本上執(zhí)行合并;
? ? ? ? ? ? ? ? ? ? ? ?4.1.1、根據(jù)請求中待合并文件的數(shù)目創(chuàng)建一個StoreFile列表:readersToClose;
? ? ? ? ? ? ? ? ? ? ? ?4.1.2、將請求中待合并文件逐一復制加入readersToClose列表;
? ? ? ? ? ? ? ? ? ? ? ?4.1.3、根據(jù)readersToClose列表,即待合并文件的副本創(chuàng)建文件瀏覽器FileScanners;
? ? ? ? ? ? ? 4.2、如果不需要使用,即參數(shù)配置為false的話,使用請求中實際發(fā)送的文件列表;
? ? ? ? ? ? ? ? ? ? ? ?4.2.1、創(chuàng)建空的列表readersToClose;
? ? ? ? ? ? ? ? ? ? ? ?4.2.2、根據(jù)實際請求中的待合并文件列表創(chuàng)建文件瀏覽器FileScanners;
? ? ? ? ?5、根據(jù)compact請求類型確定scan類型scanType:
? ? ? ? ? ? ? ?如果compact請求是MAJOR或ALL_FILES合并,則scanType為COMPACT_DROP_DELETES;
? ? ? ? ? ? ? ?如果compact請求是MINOR合并,則scanType為COMPACT_RETAIN_DELETES。
? ? ? ? ?6、如果有協(xié)處理器,調(diào)用協(xié)處理器的preCreateCoprocScanner()方法,獲得scanner,如果協(xié)處理器中未創(chuàng)建scanner,調(diào)用createScanner()方法創(chuàng)建一個;
? ? ? ? ?7、如果有協(xié)處理器,調(diào)用協(xié)處理器的preCompact()方法;
? ? ? ? ?8、根據(jù)之前獲取的smallestReadPoint和文件詳情fd中的minSeqIdToKeep確定最小讀取點smallestReadPoint,并置狀態(tài)位cleanSeqId;
? ? ? ? ?9、調(diào)用HStore的createWriterInTmp()方法,獲取writer;
? ? ? ? 10、調(diào)用父類Compactor的performCompaction()方法,利用scanner、writer、smallestReadPoint、cleanSeqId執(zhí)行合并:
? ? ? ? ? ? ? ? 實際上就是利用scanner讀取舊文件數(shù)據(jù),利用writer寫入新文件數(shù)據(jù)。
? ? ? ? 11、如果沒有完成合并:關(guān)閉writer、刪除writer中的臨時文件并拋出異常;
? ? ? ? 12、關(guān)閉scanner;
? ? ? ? 13、無異常的話,關(guān)閉writer;存在異常的話,寫入元數(shù)據(jù),關(guān)閉writer,并將寫入地址加入newFiles;
? ? ? ? 14、依次關(guān)閉readersToClose中StoreFile的Reader;
? ? ? ? 15、返回newFiles。
? ? ? ? 大體流程就是如此。針對其中的某些細節(jié),我們逐一進行分析。
? ? ? ? 首先說下這個文件詳情FileDetails,它是通過getFileDetails()方法獲取的。文件詳情FileDetails類定義如下:
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */protected static class FileDetails {/** Maximum key count after compaction (for blooms) */// 合并之后總的keyvalue數(shù)目public long maxKeyCount = 0;/** Earliest put timestamp if major compaction */// 如果是major合并,最早的Put時間戳earliestPutTspublic long earliestPutTs = HConstants.LATEST_TIMESTAMP;/** The last key in the files we're compacting. */// 合并時文件中最大的序列號public long maxSeqId = 0;/** Latest memstore read point found in any of the involved files */// 相關(guān)文件中最新的MemStore數(shù)據(jù)讀取點maxMVCCReadpointpublic long maxMVCCReadpoint = 0;/** Max tags length**/// 最大的tag長度maxTagsLengthpublic int maxTagsLength = 0;/** Min SeqId to keep during a major compaction **/// 在major合并期間需要保持的最小序列號minSeqIdToKeeppublic long minSeqIdToKeep = 0;}? ? ? ? 而它的獲取方法如下:
/*** Extracts some details about the files to compact that are commonly needed by compactors.* 提取文件合并的一些細節(jié)* @param filesToCompact Files.* @param allFiles Whether all files are included for compaction* @return The result.*/protected FileDetails getFileDetails(Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {// 構(gòu)造一個FileDetails實例fdFileDetails fd = new FileDetails();// 計算保持MVCC的最新HFile時間戳:當前時間-24小時 * keepSeqIdPeriod// keepSeqIdPeriod為一個參數(shù),即被指定的在major合并期間MVCC值可以保持多少天long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod); // 遍歷需要合并的文件for (StoreFile file : filesToCompact) {// 如果allFiles為true,即所有文件都需要檢測,且文件的修改時間小于上述保持MVCC的最新HFile時間戳if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {// when isAllFiles is true, all files are compacted so we can calculate the smallest // MVCC value to keep// 如果文件細節(jié)中需要保持的最小序列號小于文件MemStore的時間戳if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {// 將文件MemStore的時間戳賦值給fd的需要保持的最小序列號minSeqIdToKeepfd.minSeqIdToKeep = file.getMaxMemstoreTS();}}// 獲取文件的最大序列號IDlong seqNum = file.getMaxSequenceId();// 賦值給文件細節(jié)fd中的maxSeqId,記錄待合并文件的最大序列號IDfd.maxSeqId = Math.max(fd.maxSeqId, seqNum);// 獲取ReaderStoreFile.Reader r = file.getReader();if (r == null) {LOG.warn("Null reader for " + file.getPath());continue;}// NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized// blooms can cause progress to be miscalculated or if the user switches bloom// type (e.g. from ROW to ROWCOL)// 獲取文件中的keyvalue數(shù)量,實際上就是列的數(shù)量,// HBase底層對每個列都是按照keyvalue格式存儲的,key包含rowkey+column family+quality+tm等,value即列值long keyCount = r.getEntries();// 累加keyvalue數(shù)目maxKeyCountfd.maxKeyCount += keyCount;// calculate the latest MVCC readpoint in any of the involved store files// 計算所有相關(guān)存儲文件的最新mvcc讀取點maxMVCCReadpoint// 先加載文件信息fileInfoMap<byte[], byte[]> fileInfo = r.loadFileInfo();byte tmp[] = null;// Get and set the real MVCCReadpoint for bulk loaded files, which is the// SeqId number.// 如果是Bulk導入的,maxMVCCReadpoint為fd的maxMVCCReadpoint和文件SequenceID中較大者if (r.isBulkLoaded()) {fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());}else {// 否則,讀取文件信息中最大的memstore時間戳MAX_MEMSTORE_TS_KEYtmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);if (tmp != null) {// maxMVCCReadpoint就是fd的maxMVCCReadpoint和文件信息中最大的memstore時間戳MAX_MEMSTORE_TS_KEY中較大者fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));}}// 更新最大標簽長度maxTagsLengthtmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);if (tmp != null) {fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));}// If required, calculate the earliest put timestamp of all involved storefiles.// This is used to remove family delete marker during compaction.long earliestPutTs = 0;// 獲取最早的Put時間戳earliestPutTsif (allFiles) {tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);if (tmp == null) {// There's a file with no information, must be an old one// assume we have very old putsfd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;} else {earliestPutTs = Bytes.toLong(tmp);fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);}}if (LOG.isDebugEnabled()) {LOG.debug("Compacting " + file +", keycount=" + keyCount +", bloomtype=" + r.getBloomFilterType().toString() +", size=" + StringUtils.humanReadableInt(r.length()) +", encoding=" + r.getHFileReader().getDataBlockEncoding() +", seqNum=" + seqNum +(allFiles ? ", earliestPutTs=" + earliestPutTs: ""));}}// 返回合并細節(jié)fdreturn fd;}? ? ? ? 接下來再看下找到scanners中的最小的可讀點,實際上就是找到最小能夠讀取數(shù)據(jù)的點,它是通過父類Compactor的getSmallestReadPoint()方法實現(xiàn)的,代碼如下: protected long getSmallestReadPoint() {// 獲取的是HStore中的SmallestReadPointreturn store.getSmallestReadPoint();}? ? ? ? 可以看出,父類的該方法實際上還是通過HStore中的getSmallestReadPoint()方法實現(xiàn)的,如下: @Overridepublic long getSmallestReadPoint() {// 獲取的是Region中的SmallestReadPoint,因為HBase是行級事務(wù),SmallestReadPoint應(yīng)該也是行級的return this.region.getSmallestReadPoint();}? ? ? ? 而HStore實際上最終獲取的是Region中的SmallestReadPoint,這也從側(cè)面反映了那個我們熟知的問題:因為HBase是行級事務(wù),SmallestReadPoint應(yīng)該也是行級的。而具體的SmallestReadPoint該如何獲取,我們在以后的多版本控制協(xié)議MVCC中再細講。? ? ? ? 接下來,我們再看下如何創(chuàng)建文件瀏覽器FileScanners,它是通過父類Compactor的createFileScanners()方法來構(gòu)造的,代碼如下:
/*** Creates file scanners for compaction.* @param filesToCompact Files.* @return Scanners.*/protected List<StoreFileScanner> createFileScanners(final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,smallestReadPoint);}? ? ? ? 它是一個專門為合并創(chuàng)建scanner的方法,這個scanner區(qū)別于客戶端的scanner,我們繼續(xù)看StoreFileScanner的getScannersForStoreFiles()方法,如下: /*** Return an array of scanners corresponding to the given set of store files,* And set the ScanQueryMatcher for each store file scanner for further* optimization*/public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(files.size());// 遍歷StoreFile文件filesfor (StoreFile file : files) {// 獲取每個文件的ReaderStoreFile.Reader r = file.createReader();// 根據(jù)Reader獲取StoreFileScanner類型的scanner,這個scanner專門用于讀取StoreFileStoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,isCompaction, readPt);scanner.setScanQueryMatcher(matcher);// 加入scanner列表scannersscanners.add(scanner);}// 返回scanner列表return scanners;}? ? ? ? 很簡單,不再贅述,讀者可以自己閱讀源碼。? ? ? ? 繼續(xù),我們再看下如果獲取一個內(nèi)部InternalScanner類型的scanner,它是通過createScanner()來獲取的,代碼如下:
/*** 創(chuàng)建一個scanner* * @param store store* @param scanners Store file scanners.* @param scanType Scan type.* @param smallestReadPoint Smallest MVCC read point.* @param earliestPutTs Earliest put across all files.* @return A compaction scanner.*/protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {// 構(gòu)造一個Scan實例scanScan scan = new Scan();// 設(shè)置最大版本號,即列簇被設(shè)置的最大版本號(是不是從這里就能看出,compact時會做數(shù)據(jù)清理工作呢,O(∩_∩)O)scan.setMaxVersions(store.getFamily().getMaxVersions());// 返回一個StoreScanner實例return new StoreScanner(store, store.getScanInfo(), scan, scanners,scanType, smallestReadPoint, earliestPutTs);}? ? ? ? 這里的scanner,實際上是StoreScanner類型的實例,它是針對Store的內(nèi)部Scanner,而且,這里有一個重點,創(chuàng)建scan時會設(shè)置最大版本號,即列簇被設(shè)置的最大版本號,那么我們是不是從這里就能看出,compact時會做數(shù)據(jù)清理工作呢,答案當然是肯定的。所以HBase在數(shù)據(jù)修改時,并不是簡單的刪除,而是增加一個版本,而過期數(shù)據(jù)則會在compact過程中,通過scanner設(shè)置最大版本號的方式來過濾掉,這種處理方式是很高效的,它體現(xiàn)了HBase低延遲的特點。? ? ? ? 有了讀數(shù)據(jù)的scanner,我們接著來看下寫數(shù)據(jù)的writer。畢竟數(shù)據(jù)得有讀有寫,才能將舊文件合并成新文件,而writer是通過HStore的createWriterInTmp()方法來創(chuàng)建的,如下:
/** @param maxKeyCount* @param compression Compression algorithm to use* @param isCompaction whether we are creating a new file in a compaction* @param includesMVCCReadPoint - whether to include MVCC or not* @param includesTag - includesTag or not* @return Writer for a new StoreFile in the tmp dir.*/@Overridepublic StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)throws IOException {final CacheConfig writerCacheConf;// 是否為合并if (isCompaction) {// 如果是合并,不在writerCacheConf上緩存數(shù)據(jù)// Don't cache data on write on compactions.writerCacheConf = new CacheConfig(cacheConf);writerCacheConf.setCacheDataOnWrite(false);} else {writerCacheConf = cacheConf;}InetSocketAddress[] favoredNodes = null;// 獲取有利節(jié)點if (region.getRegionServerServices() != null) {favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());}// 創(chuàng)建HFile上下文HFileContextHFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,cryptoContext);// 創(chuàng)建StoreFile的StoreFile,需要使用上述信息,比如文件系統(tǒng)、文件路徑、合并器、最大keyvalue數(shù)目、有利節(jié)點等StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,this.getFileSystem())// 文件系統(tǒng).withFilePath(fs.createTempName())// 文件路徑.withComparator(comparator)// 合并器.withBloomType(family.getBloomFilterType()).withMaxKeyCount(maxKeyCount)// 最大keyvalue數(shù)目.withFavoredNodes(favoredNodes)// 有利節(jié)點.withFileContext(hFileContext)// HFile上下文信息.build();return w;}? ? ? ? 這個writer本質(zhì)上是StoreFile的Writer,它是針對存儲文件的寫入者,其中包含很多關(guān)鍵信息,比如文件系統(tǒng)、文件路徑、合并器、最大keyvalue數(shù)目、有利節(jié)點、HFile上下文信息等。? ? ? ? 有了scanner,可以讀數(shù)據(jù)了,又有了writer,也可以寫數(shù)據(jù)了,那么我們就可以開始合并了:由舊文件讀取數(shù)據(jù),往新文件寫入數(shù)據(jù)。我們看下Compactor的performCompaction()方法,代碼如下:
/*** Performs the compaction.* 執(zhí)行合并* * @param scanner Where to read from.* @param writer Where to write to.* @param smallestReadPoint Smallest read point.* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint* @return Whether compaction ended; false if it was interrupted for some reason.*/protected boolean performCompaction(InternalScanner scanner,CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {// 已寫字節(jié)數(shù)long bytesWritten = 0;// 處于寫過程的字節(jié)數(shù)long bytesWrittenProgress = 0;// Since scanner.next() can return 'false' but still be delivering data,// we have to use a do/while loop.// Cell列表List<Cell> cells = new ArrayList<Cell>();// 周期性檢測的閾值:合并已被處理的數(shù)據(jù)量大小,取參數(shù)hbase.hstore.close.check.interval,默認為10Mlong closeCheckInterval = HStore.getCloseCheckInterval();long lastMillis = 0;if (LOG.isDebugEnabled()) {lastMillis = EnvironmentEdgeManager.currentTime();}long now = 0;// 進入一個do...while循環(huán),一直循環(huán)的條件是hasMore為true,即scanner中還有數(shù)據(jù)boolean hasMore;do {// scanner中是否還存在數(shù)據(jù),取出到cells中hasMore = scanner.next(cells, compactionKVMax);if (LOG.isDebugEnabled()) {now = EnvironmentEdgeManager.currentTime();}// output to writer:// 遍歷cells,寫入writerfor (Cell c : cells) {if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {CellUtil.setSequenceId(c, 0);}// 寫入writerwriter.append(c);// keyvalue大小int len = KeyValueUtil.length(c);// 計數(shù)器累加:kv累計數(shù)目和累計大小++progress.currentCompactedKVs;progress.totalCompactedSize += len;if (LOG.isDebugEnabled()) {bytesWrittenProgress += len;}// check periodically to see if a system stop is requested// 周期性檢測是否一個系統(tǒng)停止被請求if (closeCheckInterval > 0) {// 累加已寫字節(jié)數(shù)bytesWrittenbytesWritten += len;// 如果已寫字節(jié)數(shù)bytesWritten大于closeCheckIntervalif (bytesWritten > closeCheckInterval) {// 重置已寫字節(jié)數(shù)bytesWritten bytesWritten = 0;// 判斷HStore是否可寫,不可寫的話,說明一個system stop請求已發(fā)起,則通過progress取消合并if (!store.areWritesEnabled()) {progress.cancel();return false;}}}}// Log the progress of long running compactions every minute if// logging at DEBUG levelif (LOG.isDebugEnabled()) {if ((now - lastMillis) >= 60 * 1000) {LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",(bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0)));lastMillis = now;bytesWrittenProgress = 0;}}// 情況cell列表cells.clear();} while (hasMore);// 合并過程progress標記已完成progress.complete();return true;}? ? ? ? 這個合并執(zhí)行的過程還是比較簡單的,它通過一個do...while循環(huán),不斷的從scanner中讀取數(shù)據(jù),放入cell列表,然后遍歷cells,將Cell依次寫入writer,并累加kv數(shù)目和大小,直到scanner中數(shù)據(jù)被處理完。如此,舊文件數(shù)據(jù)不斷的被讀取出來,然后將其不斷的寫入新文件,最好通過合并過程progress標記合并已完成。大致就是這個流程。? ? ? ? 這里有個需要特別說明的地方,在數(shù)據(jù)合并過程中,還需要周期性的檢測是否有外部發(fā)起系統(tǒng)關(guān)系的請求,如果是的話,則需要取消合并。這個周期性不是針對時間的,而是針對一個已合并數(shù)據(jù)量的閾值closeCheckInterval,這個closeCheckInterval取自參數(shù)hbase.hstore.close.check.interval,默認為10M。在合并過程中,被合并數(shù)據(jù)大小bytesWritten不斷的被累加,直到超過閾值closeCheckInterval,清空,并且根據(jù)HStore的可寫狀態(tài)來判斷是否有外部發(fā)起系統(tǒng)停止的請求,如果有的話,通過progress取消合并,否則繼續(xù)進入下一個累加至閾值再進行判斷的周期。
? ? ? ? 接下來,根據(jù)上述合并的結(jié)果finished,來判斷后續(xù)處理步驟:如果沒有完成合并:關(guān)閉writer、刪除writer中的臨時文件并拋出異常。
? ? ? ? 最好,如果存在異常e,寫入元數(shù)據(jù),關(guān)閉writer,并將寫入地址加入newFiles;如果不存在異常e,則關(guān)閉writer,返回合并后的文件列表newFiles。不管結(jié)果如何,最終依次關(guān)閉readersToClose中StoreFile的Reader。
? ? ? ? 至此,整個HRegion中精確到HStore上的compact流程就分析完畢了。限于篇幅的原因,可能部分細節(jié)簡單掠過或者沒有提及,留待以后再慢慢分析吧!
? ? ? ??
總結(jié)
以上是生活随笔為你收集整理的HBase源码分析之HRegion上compact流程分析(三)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: maven The method mus
- 下一篇: cogs 896. 圈奶牛