Rocksdb 通过ingestfile 来支持高效的离线数据导入
文章目錄
- 前言
- 使用方式
- 實(shí)現(xiàn)原理
- 總結(jié)
前言
很多時(shí)候,我們使用數(shù)據(jù)庫(kù)時(shí)會(huì)有離線向數(shù)據(jù)庫(kù)導(dǎo)入數(shù)據(jù)的需求。比如大量用戶在本地的一些離線數(shù)據(jù),想要將這一些數(shù)據(jù)導(dǎo)入到已有的數(shù)據(jù)庫(kù)中;或者說(shuō)NewSQL場(chǎng)景中部分機(jī)器離線,重新上線之后的數(shù)據(jù)增量/全量同步 等場(chǎng)景。這個(gè)時(shí)候 我們并不想要讓這一些數(shù)據(jù)占用過(guò)多的系統(tǒng)資源,更不希望他們對(duì)正常的線上業(yè)務(wù)有影響,所以盡可能高效得完成這一些數(shù)據(jù)的同步就需要深入設(shè)計(jì)一番。
而如果底層引擎使用的是rocksdb,那就非常省事了,只需要組織好你們的數(shù)據(jù)調(diào)用接口就完事了,剩下的導(dǎo)入過(guò)程由引擎完成。 tikv便是通過(guò) rocksdb的這個(gè)功能完成集群異?;謴?fù)之后 region之間的全量增量同步的。回到今天我們要討論的主題,便是rocksdb的這個(gè)數(shù)據(jù)導(dǎo)入過(guò)程是如何盡可能快、盡可能高效得完成的。
使用方式
講解實(shí)現(xiàn)原理之前我們先看看如何使用這個(gè)功能,功能的易用性也很重要,用戶還是希望盡可能得少寫(xiě)代碼來(lái)完成這個(gè)工作。使用上主要是兩部分:創(chuàng)建SST文件 和 導(dǎo)入SST文件。
-
創(chuàng)建sst文件:這一步主要是通過(guò)一個(gè)sst_filter_writer,將需要導(dǎo)入的 k/v 數(shù)據(jù)轉(zhuǎn)換成sst文件
需要注意的是:
- 用戶k/v 數(shù)據(jù)需要按照options.comparator 嚴(yán)格有序,默認(rèn)是按照key的字典序
- 這里的options 建議和db寫(xiě)入的options用一套(壓縮選項(xiàng),sst文件相關(guān)選項(xiàng)等)
Options options;SstFileWriter sst_file_writer(EnvOptions(), options); // 指定形成的sst文件的路徑 std::string file_path = "/home/usr/file1.sst";// open file_path Status s = sst_file_writer.Open(file_path); for (...) {// 寫(xiě)入sst,用戶保證k/v 的順序s = sst_file_writer.Put(key, value);if (!s.ok()) {printf("Error while adding Key: %s, Error: %s\n", key.c_str(),s.ToString().c_str());return 1;} }// 完成寫(xiě)入 s = sst_file_writer.Finish(); -
導(dǎo)入sst文件:這個(gè)步驟就是將創(chuàng)建好的一個(gè)或者多個(gè)sst文件導(dǎo)入到db中,也允許向多個(gè)cf中導(dǎo)入
IngestExternalFileOptions ifo;
// Ingest the 2 passed SST files into the DB
// 導(dǎo)入數(shù)據(jù)
Status s = db_->IngestExternalFile({"/home/usr/file1.sst", "/home/usr/file2.sst"}, ifo);
使用還是比較簡(jiǎn)單的,整體的使用過(guò)程如下:
#include <iostream>
#include <vector>#include <gflags/gflags.h>#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/sst_file_writer.h>#define DATA_SIZE 10
#define VALUE_SIZE 1024using namespace std;// 比較函數(shù)
bool cmp(pair<string, string> str1,pair<string, string> str2) {if(str1.first < str2.first) {return true;} else if (str1.first == str2.first && str1.second < str2.second) {return true;} else {return false;}
}// 隨機(jī)字符串
static string rand_data(long data_range) {char buff[30];unsigned long long num = 1;for (int i = 0;i < 4; ++i) {num *= (unsigned long long )rand();}sprintf(buff, "%llu", num % (unsigned long long)data_range );string data(buff);return data;
}// 構(gòu)造有序數(shù)據(jù)
void construct_data(vector<pair<string,string>> &input) {int i;string key;string value;for (i = 0;i < DATA_SIZE; i++) {if(key == "0") {continue;}key = rand_data(VALUE_SIZE);value = rand_data(VALUE_SIZE);input.push_back(make_pair(key, value));}
}void traverse_data(vector<pair<string,string>> input) {int i;for(auto data : input) {cout << data.first << " " << data.second << endl;}
}// 創(chuàng)建sst文件
int create_sst(string file_path) {vector<pair<string,string>> input;vector<pair<string,string>>::iterator input_itr;rocksdb::Options option;/* open statistics and disable compression */option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), option);rocksdb::Status s = sst_file_writer.Open(file_path);if (!s.ok()) {printf("Error while opening file %s, Error: %s\n", file_path.c_str(),s.ToString().c_str());return 1;}// 需要保證數(shù)據(jù)有序后再寫(xiě)入construct_data(input);sort(input.begin(), input.end(), cmp);traverse_data(input);// Insert rows into the SST file, note that inserted keys must be // strictly increasing (based on options.comparator)for (input_itr = input.begin(); input_itr != input.end();input_itr ++) {rocksdb::Slice key(input_itr->first);rocksdb::Slice value(input_itr->second);s = sst_file_writer.Put(key, value);if (!s.ok()) {printf("Error while adding Key: %s, Error: %s\n",key.ToString().c_str(),s.ToString().c_str());return 1;}}// Close the files = sst_file_writer.Finish();if (!s.ok()) {printf("Error while finishing file %s, Error: %s\n", file_path.c_str(),s.ToString().c_str());return 1;}return 0;
}static rocksdb::DB *db;void create_db() {rocksdb::Options option;/* open statistics and disable compression */option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::Status s = rocksdb::DB::Open( option,"./db", &db);if (!s.ok()) {printf("Open db failed : %s\n", s.ToString().c_str());return;}
}void db_write(int num_keys) {rocksdb::WriteOptions write_option;write_option.sync = true;rocksdb::Slice key;rocksdb::Slice value;rocksdb::Status s;int i;printf("begin write \n");for (i = 0;i < num_keys; i++) {key = rand_data(VALUE_SIZE);value = rand_data(VALUE_SIZE);s = db->Put(write_option, key, value);if (!s.ok()) {printf("Put db failed : %s\n", s.ToString().c_str());return;}}db->Flush(rocksdb::FlushOptions());printf("finish write \n");
}int main() {// 先寫(xiě)入一批數(shù)據(jù)create_db();db_write(100000);// 創(chuàng)建sst文件if (create_sst("./test.sst") == 0) {printf("creates sst success !\n");} else {printf("creates sst failed !\n");}// 導(dǎo)入數(shù)據(jù)rocksdb::IngestExternalFileOptions ifo;// Ingest the 2 passed SST files into the DBprintf("Ingest sst !\n");rocksdb::Status s = db->IngestExternalFile({"test.sst"}, ifo);if (!s.ok()) {printf("Error while adding file test.sst , Error %s\n",s.ToString().c_str());return 1;}return 0;
}
運(yùn)行輸出如下:
begin write
finish write
# consturct data,需按照字典序,如果沒(méi)有按照字典序構(gòu)造的話會(huì)報(bào)錯(cuò)
1008 232
240 880
288 63
410 768
506 56
534 256
640 180
72 248
800 672
944 217
creates sst success !
通過(guò)db日志可以看到我們創(chuàng)建的sst文件test.sst被成功導(dǎo)入到db,形成了./db/000020.sst,且在db目錄中。
╰─$ cat db/LOG |grep ingested
[AddFile] External SST file test.sst was ingested in L0 with path ./db/000020.sst (global_seqno=200012)╰─$ ls db
000017.log 000020.sst IDENTITY LOG LOG.old.1618643738564935 OPTIONS-000008
000019.sst CURRENT LOCK LOG.old.1618123487361092 MANIFEST-000013 OPTIONS-000016
實(shí)現(xiàn)原理
從如何使用這個(gè)功能上我們能夠感覺(jué)到這一些數(shù)據(jù)并不是通過(guò)rocksdb正常的I/O流程寫(xiě)入的。如果使用正常的接口,那我們用戶不需要排序,而是直接通過(guò)db->Put接口將k/v寫(xiě)入,凡事都有但是,但是這樣來(lái)導(dǎo)入離線數(shù)據(jù)在rocksdb內(nèi)部后續(xù)的flush/compaction 都會(huì)消耗大量的系統(tǒng)資源,而這并不是我們想要的高效。所以,rocksdb提供的ingest接口肯定不會(huì)讓這一些要導(dǎo)入的數(shù)據(jù)消耗過(guò)多的資源,接下來(lái)我們一起看看底層的詳細(xì)實(shí)現(xiàn)。
為了更形象得告訴大家在rocksdb作為存儲(chǔ)引擎的場(chǎng)景,如果通過(guò)傳統(tǒng)的put接口導(dǎo)入數(shù)據(jù)會(huì)多出哪一些I/O,如下圖
其中紅色的尖頭 是ingest file 相比于傳統(tǒng)的put接口 少的I/O部分,可以說(shuō)ingest方式導(dǎo)入數(shù)據(jù)極大得節(jié)約了整個(gè)系統(tǒng)資源的開(kāi)銷(包括但不限于I/O , CPU 資源的開(kāi)銷)。
下面主要介紹的是有了sst文件,接下來(lái)如何導(dǎo)入到db中的過(guò)程。關(guān)于通過(guò)sst_file_writer創(chuàng)建具體的sst文件的過(guò)程就不多說(shuō)了,也就是按照sst文件的格式(datablock,index block…footer)等將有序的數(shù)據(jù)一個(gè)個(gè)添加進(jìn)去而已。
主要有如下幾步:
- 為待插入的sst文件創(chuàng)建file link到db目錄,或者直接拷貝進(jìn)去
- 停止寫(xiě)入,需要保證即將導(dǎo)入的sst文件在db中擁有一個(gè)安全合理的seqno,如果持續(xù)寫(xiě)入,那這個(gè)seqno可能不會(huì)全局遞增了。
- 檢查導(dǎo)入的sst文件是否和memtable中的key-range有重疊,有的話需要flush memtable
- 為這個(gè)sst文件 按照其key-range挑選一個(gè)合適的level放進(jìn)去
- 為這個(gè)問(wèn)天添加一個(gè)全局的seqno
- 恢復(fù)db的寫(xiě)入
其中停止寫(xiě)入到恢復(fù)寫(xiě)入這段時(shí)間對(duì)于用戶來(lái)說(shuō)越小越好,所以ingest的性能很重要。
接下來(lái)看看詳細(xì)的源代碼實(shí)現(xiàn):
導(dǎo)入數(shù)據(jù)的函數(shù)入口是DBImpl::IngestExternalFiles
導(dǎo)入的sst文件最后都需要形成一個(gè)db內(nèi)部的sst文件,因?yàn)檫@個(gè)時(shí)候已經(jīng)停止寫(xiě)入了,所以會(huì)從最新的sst文件編號(hào)之后取一個(gè)文件編號(hào),后續(xù)的其他要導(dǎo)入的sst文件會(huì)不斷追加。
Status DBImpl::IngestExternalFiles(const std::vector<IngestExternalFileArg>& args) {...// 構(gòu)造文件編號(hào)到next_file_number中Status status = ReserveFileNumbersBeforeIngestion(static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,pending_output_elem, &next_file_number);if (!status.ok()) {InstrumentedMutexLock l(&mutex_);ReleaseFileNumberFromPendingOutputs(pending_output_elem);return status;}...
}
有了在db內(nèi)部的合法文件編號(hào),我們就可以進(jìn)行文件遷移了,將待導(dǎo)入的sst文件遷移到db內(nèi)部已經(jīng)構(gòu)造好的sst文件編號(hào)之中。
會(huì)為每一個(gè)cf構(gòu)造一個(gè)ingest_job, 將待導(dǎo)入文件拷貝/移動(dòng)到 db內(nèi)部的sst文件中,這個(gè)過(guò)程是在接下來(lái)的Prepare函數(shù)中。
uint64_t start_file_number = next_file_number;for (size_t i = 1; i != num_cfs; ++i) {start_file_number += args[i - 1].external_files.size();auto* cfd =static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);// prepare 函數(shù)exec_results[i].second = ingestion_jobs[i].Prepare(args[i].external_files, start_file_number, super_version);exec_results[i].first = true;CleanupSuperVersion(super_version);}
看看Prepare的函數(shù)實(shí)現(xiàn):
- 拿著輸入的多個(gè)sst文件,如果有多個(gè),則需要檢查這一些文件之間是否有重疊key,有的話就不支持了(rocksdb除了l0,其他層不允許有重疊key)。
- 根據(jù)用戶指定的ingest option: move_files 是否為true,來(lái)將待導(dǎo)入文件move到db中, 如果move失敗了就拷貝文件。
Status ExternalSstFileIngestionJob::Prepare(const std::vector<std::string>& external_files_paths,uint64_t next_file_number, SuperVersion* sv) {// 解析文件信息for (const std::string& file_path : external_files_paths) {IngestedFileInfo file_to_ingest;status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);if (!status.ok()) {return status;}files_to_ingest_.push_back(file_to_ingest);}// 確保導(dǎo)入的多個(gè)sst文件之間沒(méi)有重疊......} else if (num_files > 1) {// Verify that passed files dont have overlapping rangesautovector<const IngestedFileInfo*> sorted_files;for (size_t i = 0; i < num_files; i++) {sorted_files.push_back(&files_to_ingest_[i]);}std::sort(sorted_files.begin(), sorted_files.end(),[&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {return sstableKeyCompare(ucmp, info1->smallest_internal_key,info2->smallest_internal_key) < 0;});// 如果有重疊的話,ingest也無(wú)法支持,因?yàn)樵赿b中大于level0的更高層level內(nèi)部的// sst文件之間是不允許有重疊的,加速更高層的二分查找。for (size_t i = 0; i < num_files - 1; i++) {if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,sorted_files[i + 1]->smallest_internal_key) >= 0) {files_overlap_ = true;break;}}}......// 根據(jù)用戶參數(shù)move文件if (ingestion_options_.move_files) {status = env_->LinkFile(path_outside_db, path_inside_db);...} else { // 否則就拷貝文件f.copy_file = true;}if (f.copy_file) {TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",nullptr);// CopyFile also sync the new file.status = CopyFile(env_, path_outside_db, path_inside_db, 0,db_options_.use_fsync);}...
}
到此,文件就已經(jīng)進(jìn)入到了rocksdb 之中,ingest_job的prepare流程就結(jié)束了。
接下來(lái) 就到了我們前面介紹總步驟的第二步,停止用戶對(duì)當(dāng)前db的寫(xiě)入:
DBImpl::IngestExternalFilesWriteThread::EnterUnbatched
其中WriteThread::EnterUnbatched函數(shù)會(huì)讓當(dāng)前db的寫(xiě)入線程都處于wait狀態(tài)。
接下來(lái)就是檢查當(dāng)前要導(dǎo)入的文件是否和memtable中的key-range有重疊,函數(shù)調(diào)用如下:
DBImpl::IngestExternalFilesExternalSstFileIngestionJob::NeedsFlushColumnFamilyData::RangesOverlapWithMemtables
這個(gè)函數(shù)ColumnFamilyData::RangesOverlapWithMemtables會(huì)拿著從ingest files中構(gòu)造好的key-range和memtable中的 key-range 進(jìn)行對(duì)比,如果有重疊key,則會(huì)將memtable flush置為true
Status ColumnFamilyData::RangesOverlapWithMemtables(const autovector<Range>& ranges, SuperVersion* super_version,bool* overlap) {...Status status;// 拿著ingest files的range中的每一個(gè)key,看是否能夠從memtable中找到for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {auto* vstorage = super_version->current->storage_info();auto* ucmp = vstorage->InternalComparator()->user_comparator();InternalKey range_start(ranges[i].start, kMaxSequenceNumber,kValueTypeForSeek);// 從memtable中找memtable_iter->Seek(range_start.Encode());status = memtable_iter->status();ParsedInternalKey seek_result;if (status.ok()) {if (memtable_iter->Valid() &&!ParseInternalKey(memtable_iter->key(), &seek_result)) {status = Status::Corruption("DB have corrupted keys");}}// 找到了,則置overlap為trueif (status.ok()) {if (memtable_iter->Valid() &&ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {*overlap = true;} else if (range_del_agg.IsRangeOverlapped(ranges[i].start,ranges[i].limit)) {*overlap = true;}}}...
}
在后續(xù)的DBImpl::FlushMemTable函數(shù)中會(huì)flush memtable,不同的cf是分開(kāi)進(jìn)行的
DBImpl::IngestExternalFilesDBImpl::FlushMemTable
接下來(lái)就開(kāi)始了第四步和第五步的處理邏輯,需要為每一個(gè)落到db中的sst文件挑選合適的level以及分配全局seqno,處理邏輯在Run函數(shù)中:
DBImpl::IngestExternalFilesExternalSstFileIngestionJob::Run
主要處理邏輯如下:
一個(gè)一個(gè)ingest file進(jìn)行處理
-
選擇一個(gè)合適的level,將ingest file插入進(jìn)去
如果user配置了allow_ingest_behind=true,即允許導(dǎo)入的數(shù)據(jù)直接插入到最后一層的文件位置,且ingest的時(shí)候配置的ingest option中ingest_behind=true,則會(huì)先嘗試插入到bottomest level,如果最后一層的文件和待插入的文件有重疊,則插入失敗。處理邏輯在CheckLevelForIngestedBehindFile函數(shù)之中。否則逐層遍歷,找到第一個(gè)和這一些key-range有重疊的level即可。函數(shù)
AssignLevelAndSeqnoForIngestedFile -
找到了合適的level的同時(shí)會(huì)記錄一個(gè)
assigned_seqno,是在當(dāng)前last_sequence的基礎(chǔ)上+1得到的。函數(shù)AssignLevelAndSeqnoForIngestedFile之中。 -
為當(dāng)前ingest_file 寫(xiě)入一個(gè)global seq no, 并執(zhí)行fsync/sync。函數(shù)
AssignGlobalSeqnoForIngestedFile之中。 -
最后就是將當(dāng)完成更新的ingest file的元信息更新到
VersionEdit之中。
接下來(lái)就進(jìn)入尾聲了:
- 將更新的
VersionEdit寫(xiě)入到MANIFEST文件之中 - 更新每個(gè)ingest file對(duì)應(yīng)的cf信息,并且調(diào)度compaction/flush, 因?yàn)橹癷ngest file時(shí)找的是有重疊key的一層。
- 恢復(fù)db的寫(xiě)入
// 將`VersionEdit`寫(xiě)入到MANIFEST文件之中status =versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,edit_lists, &mutex_, directories_.GetDbDir());}if (status.ok()) {for (size_t i = 0; i != num_cfs; ++i) {auto* cfd =static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();if (!cfd->IsDropped()) {//更新每個(gè)ingest file對(duì)應(yīng)的cf信息,并且調(diào)度compaction/flush, 因?yàn)橹癷ngest file時(shí)找的是有重疊key的一層InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],*cfd->GetLatestMutableCFOptions());...}}}// 恢復(fù)db的寫(xiě)入,喚醒db的其他所有的writerwrite_thread_.ExitUnbatched(&w);
到此,整個(gè)ingest就算是結(jié)束了。
總結(jié)
通過(guò)ingest的實(shí)現(xiàn),我們能夠看到rocksdb通過(guò)ingest的方式支持離線數(shù)據(jù)導(dǎo)入確實(shí)能夠極大得降低系統(tǒng)資源的開(kāi)銷。不需要一個(gè)key在LSM中被反復(fù)的寫(xiě)入、讀取。
總結(jié)
以上是生活随笔為你收集整理的Rocksdb 通过ingestfile 来支持高效的离线数据导入的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: “非惜年芳绝”下一句是什么
- 下一篇: 关于Titandb Ratelimite