Rocksdb Iterator实现:从DBIter 到 TwoLevelIter 的漫长链路
文章目錄
- 1. 迭代器簡單介紹
- 2. 迭代器用戶態相關接口
- 3. 迭代器內部架構
- 4. 迭代器的入口實現
- 4.1 DBIter
- 4.2 MergingIterator
- 4.3 Memtable系列Iterator
- 4.4 LevelIterator 和 TwoLevelIterator
ps:本文的基礎迭代器設計 以及 相關代碼 是基于rocksdb 6.4.6版本進行描述的
1. 迭代器簡單介紹
使用Rocksdb 進行Scan的過程中 都會用到Rocksdb 的Iterator,當然在使用的過程中大家會發現格外的順手,就像我們的STL標準庫為每一個容器構造的迭代器Iterator一樣,能夠通過指針的地址自增去訪問容器中的數據。
同樣,rocksdb的迭代器也可以很方便的去訪問db內部的數據。
rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
for (it -> Seek(start); it -> Valid() && it -> key().ToString() < end;it -> Next() ) {......std::cout << it -> key().ToString() << ": " << it -> Value().ToString() << std::endl;
}assert(it->status.ok()); // check iterator status for any errors found during scan
以上幾行簡單的代碼,即能夠實現一個[start, end)區間的數據遍歷。
2. 迭代器用戶態相關接口
rocksdb迭代器為用戶提供了大量的便捷操作和接口訪問方式
NewIterator創建一個迭代器,需要傳入讀配置項Seek查找一個keySeekToFirst迭代器移動到db的第一個key位置,一般用于順序遍歷整個db的所有keySeekToLast迭代器移動到db的最后一個key位置, 一般用于反向遍歷整個db的所有keySeekForPrev移動到當前key的上一個位置,一般用于遍歷(limit, start]之間的keyNext迭代器移動到下一個keyPrev迭代器移動到上一個key
3. 迭代器內部架構
對用戶態表現的簡單接口,在底層實現過程中是有代價的。
為什么簡單呢?
因為rocksdb的基礎組件包括Memtable, Immutable memtable, 大量的sstables, 迭代器需要在內存/磁盤 數據 之間進行移動,然而只需要使用統一的簡單接口,不需要關注迭代器在查找內存數據還是磁盤數據,簡單易用,C++的封裝特性展現得淋漓盡致。
而底層實現的代價就是需要將用戶接口到內部接口 整個鏈路串起來,并且這個過程中的每一個查找細節都要仔細雕琢,否則Scan的性能將會是LSM 最為明顯的痛點從而丟失大量有SQL需求的用戶(SQL中會有大量的范圍查找)。
一個大佬想要修改之前迭代器反回狀態有歧義的問題,然后提了一個PR https://github.com/facebook/rocksdb/pull/3810,由siying大佬親自review,整個PR 對迭代器的修改過程所涉及的復雜程度讓siying都震驚了。
如下圖為rocksdb迭代器的架構圖,其中包括迭代器之間的級聯關系 以及 流程圖形態的函數調用:
這里畫的是主要的幾個迭代器,還是能夠很明顯得看出來整個迭代器內部的復雜程度。
圖中箭頭指向的迭代器表示被包含,比如MergingIterator被DBIter包含,ArenaWrapperDBIter 屬于分配內存的迭代器,所以使用虛線框起來。
剩下的一些線段上的函數調用,則是從某一個迭代器生成其他迭代器的函數邏輯。其中主體迭代器是MergingIterator,rocksdb內部一般IternalIterator 都是屬于MergingIterator。
不同迭代器之間的關系可以這樣做一個使用者層面的簡單描述:
如上圖,用戶使用DBIter 查找三個用戶key,iter -> Seek(key1)
這個操作在內部會交給InternalIterator類型的MergingIterator,MergingIterator會拿到已經解析好的internal_key: user_key=“key1”, seqno=10, Type=put。這樣的InternalKey,后續更加底層的迭代器會拿著Internal_key和自己所查找的區域進行key的匹配,從而取到底層的value數據。
接下來將描述一下這幾個迭代器在代碼中是如何創建的?
4. 迭代器的入口實現
4.1 DBIter
我們通過db->NewIterator入口進入創建迭代器的邏輯,具體創建之前會拿到當前db最新的或者用戶指定的一個snapshot (落到底層internal_key的話也就是上文中提到的seqno),保證后續的讀取都只讀取小于等于當前snapshot的目標key。
// rocksdb的快照讀,讀取小于等于snapshot的key
auto snapshot = read_options.snapshot != nullptr? read_options.snapshot->GetSequenceNumber(): versions_->LastSequence();
// 創建迭代器入口,這里會返回一個DBIter
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
接下來通過DBImpl::NewIteratorImpl --> NewArenaWrappedDbIterator 來創建一個 ArenaWrappedDBIter,即一個用來進行空間分配的迭代器,后續InternalIterator相關的迭代器都需要通過arena優先分配迭代器所需空間。
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,ColumnFamilyData* cfd,SequenceNumber snapshot,ReadCallback* read_callback,bool allow_blob,bool allow_refresh) {......// 構造一個arena迭代器,負責后續的 internal迭代器的空間分配// 內部會先創建一個Arena迭代器,再創建DBIter迭代器ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,sv->mutable_cf_options.max_sequential_skip_in_iterations,sv->version_number, read_callback, this, cfd, allow_blob,((read_options.snapshot != nullptr) ? false : allow_refresh));// 構造internal 迭代器,包括一系列 MergingIterator: MemtableIter, LevelIter, TwoLevelIterInternalIterator* internal_iter =NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),db_iter->GetRangeDelAggregator(), snapshot);// 綁定db_iter和internal_iterdb_iter->SetIterUnderDBIter(internal_iter);...
}ArenaWrappedDBIter* NewArenaWrappedDbIterator(Env* env, const ReadOptions& read_options,const ImmutableCFOptions& cf_options,const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,uint64_t max_sequential_skip_in_iterations, uint64_t version_number,ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,bool allow_blob, bool allow_refresh) {// 創建一個Arena迭代器ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();// 為db迭代器分配空間,并創建db迭代器iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,max_sequential_skip_in_iterations, version_number, read_callback,db_impl, cfd, allow_blob, allow_refresh);if (db_impl != nullptr && cfd != nullptr && allow_refresh) {iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,allow_blob);}
4.2 MergingIterator
如上圖,MergingIterator 是通過NewIternalIterator 創建的,創建的過程中主要是維護一個MergeIteratorBuilder
具體代碼就是NewIteratorImpl函數中,調用的NewInternalIterator函數,同樣這個函數中也會先通過Arena分配好迭代器需要的空間。
在NewInternalIterator 會先創建一個MergeIteratorBuilder,并依次創建后續的 memtable, rangetombstone,immutable memtable, LevelIterator, TwoLevelIterator等一系列迭代器。
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,ColumnFamilyData* cfd,SuperVersion* super_version,Arena* arena,RangeDelAggregator* range_del_agg,SequenceNumber sequence) {InternalIterator* internal_iter;assert(arena != nullptr);assert(range_del_agg != nullptr);// Need to create internal iterator from the arena.// 創建一個MergingIterMergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena,!read_options.total_order_seek &&super_version->mutable_cf_options.prefix_extractor != nullptr);.....
MergingIterator 底層是通過最小堆 數據結構來維護的,可以通過MergeIteratorBuilder構造過程來看到:
MergingIterator(const InternalKeyComparator* comparator,InternalIterator** children, int n, bool is_arena_mode,bool prefix_seek_mode): is_arena_mode_(is_arena_mode),comparator_(comparator),current_(nullptr),direction_(kForward),minHeap_(comparator_),prefix_seek_mode_(prefix_seek_mode),pinned_iters_mgr_(nullptr) {children_.resize(n);// 將傳入的元素添加到rocksdb自實現的autovector之中for (int i = 0; i < n; i++) {children_[i].Set(children[i]);}// 構建最小堆// 堆頂元素是所有堆元素中的最小值for (auto& child : children_) {if (child.Valid()) {assert(child.status().ok());minHeap_.push(&child);} else {considerStatus(child.status());}}// 取堆頂的元素,表示當前迭代器所指向的keycurrent_ = CurrentForward();
}
這里只是初始化一個空的MerginIterator,里面并沒有具體的key,后續在像range查找或者compaction 這樣的過程中用到iterator的時候才會進行具體key元素的添加。
4.3 Memtable系列Iterator
回到 NewInternalIterator 函數,已經構造好了一個MergingIterator的merge_iter_builder,后續的所有迭代器都會被添加到這個builder之中,也就是數據的存儲形態都會按照MergingIterator 的最小堆來進行存儲。
我們知道rocksdb 的memtable是一種有序內存數據結構實現的(skiplist),memtable也有幾種不同類型的:
- active memtable 是接受寫請求,允許插入key-value數據的一個結構
- immutable memtable 是接受讀請求的,且只讀。主要用在flush過程,當active memtable被寫滿(達到write_buffer_size的限制)會切換為immutable memtable
- rangeTombstone memtbale 是存儲rangetombstone數據的memtable,當上層用戶通過
DeleteRange接口下發一個范圍刪除的請求,會將tombstone信息放在這個memtable之中。
也就是我們在實際通過Iterator進行查找遍歷的時候 這是三個memtable肯定是需要進行遍歷的,也就是這三種memtable都需要各自維護一個iterator, 代碼如下:
// 創建Memtable Iter,并添加到Merge_iter_builder之中
merge_iter_builder.AddIterator(super_version->mem->NewIterator(read_options, arena));
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
Status s;
if (!read_options.ignore_range_deletions) {// 創建range tombstone 迭代器 in memrange_del_iter.reset(super_version->mem->NewRangeTombstoneIterator(read_options, sequence));range_del_agg->AddTombstones(std::move(range_del_iter));
}
// Collect all needed child iterators for immutable memtables
if (s.ok()) {// 創建imm 迭代器super_version->imm->AddIterators(read_options, &merge_iter_builder);if (!read_options.ignore_range_deletions) {// 創建range tombstone 迭代器 in imms = super_version->imm->AddRangeTombstoneIterators(read_options, arena,range_del_agg);}
}
在memtable系列迭代器的底層移動是通過GetIterator函數訪問 用戶傳入的memtable工廠對應的數據結構的迭代器:
GetIterator過程中會根據用戶傳入的lookahead(預讀數據的大小) 來創建對應的SkipListRep 的迭代器,如果上層調用的next或者prev,到更加底層的數據結構中就是sikplist的next和prev了。
MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {if (lookahead_ > 0) {void *mem =arena ? arena->AllocateAligned(sizeof(SkipListRep::LookaheadIterator)): operator new(sizeof(SkipListRep::LookaheadIterator));return new (mem) SkipListRep::LookaheadIterator(*this);} else {void *mem =arena ? arena->AllocateAligned(sizeof(SkipListRep::Iterator)): operator new(sizeof(SkipListRep::Iterator));return new (mem) SkipListRep::Iterator(&skip_list_);}
}
4.4 LevelIterator 和 TwoLevelIterator
創建完memtable系列的迭代器 就需要創建一系列sst上移動的迭代器。
如上圖,可以看到rocksdb中on sst系列的迭代器主要維護了兩種,第一種是Level1-Level N 的迭代器,第二種是Level0迭代器。
這里通過層來劃分的原因主要是L0的sst文件之間會有重疊key,即sst之間不是有序的,所以查找的過程中對于L0,其所有的SST文件都需要被遍歷到。
L1–LN 層的SST文件之間都是嚴格有序的,所以這一些層的迭代器只需要一種。
還是在NewInternalIterator函數中,創建完memtable系列的迭代器之后會通過current->AddIterators 中的AddIteratorsForLevel 函數,創建兩種不同層的迭代器。
void Version::AddIteratorsForLevel(const ReadOptions& read_options,const EnvOptions& soptions,MergeIteratorBuilder* merge_iter_builder,int level,RangeDelAggregator* range_del_agg) {...// 為level0 創建其迭代器if (level == 0) {// Merge all level zero files together since they may overlapfor (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {const auto& file = storage_info_.LevelFilesBrief(0).files[i];merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(read_options, soptions, cfd_->internal_comparator(),*file.file_metadata, range_del_agg,mutable_cf_options_.prefix_extractor.get(), nullptr,cfd_->internal_stats()->GetFileReadHist(0),TableReaderCaller::kUserIterator, arena,/*skip_filters=*/false, /*level=*/0,/*smallest_compaction_key=*/nullptr,/*largest_compaction_key=*/nullptr));}...}else if(storage_info_.LevelFilesBrief(level).num_files > 0) {// 創建大于level0 層的迭代器auto* mem = arena->AllocateAligned(sizeof(LevelIterator));merge_iter_builder->AddIterator(new (mem) LevelIterator(cfd_->table_cache(), read_options, soptions,cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),cfd_->internal_stats()->GetFileReadHist(level),TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,range_del_agg, /*largest_compaction_key=*/nullptr)); }
}
大于L0 的迭代器就是LevelIterator, 創建好之后在大于L0層的迭代器檢索過程中會通過LevelIterator::Prev或者相關的其他接口進行查找。
當然其中有一個file_iter_數據成員 是實際的sst文件的iter,這個數據成員是在seek過程進行初始化的,將其綁定到具體的sst文件進行查找。
關于Level0 中sst文件的迭代器是通過TableCache::NewIterator函數中的table_reader->NewIterator創建的。之后會進入到我們默認配置的BlockBased::NewIterator 函數,當然如果這里不是使用sst,而是使用PlainTable或者CuckooTable這樣的數據格式,那就是這一些table的迭代器了。
InternalIterator* BlockBasedTable::NewIterator(...) {...if (arena == nullptr) {return new BlockBasedTableIterator<DataBlockIter>(this, read_options, rep_->internal_comparator,NewIndexIterator(read_options,need_upper_bound_check &&rep_->index_type == BlockBasedTableOptions::kHashSearch,/*input_iter=*/nullptr, /*get_context=*/nullptr, &lookup_context),!skip_filters && !read_options.total_order_seek &&prefix_extractor != nullptr,need_upper_bound_check, prefix_extractor, BlockType::kData, caller,compaction_readahead_size);} ...
}
其中NewIndexIterator函數是我們要關注的,用來創建blockbased table的iterator,這個函數內部會使用index_reader->NewIterator函數來具體創建,創建的類型默認是BinarySearchIndexReader,我們這里使用比較典型有趣的設計ParttitionIndexReader,當然其底層也是BinarySearch的。
我們這里可以看PartitionIdexReader的NewIterator實現。
InternalIteratorBase<IndexValue>* NewIterator(const ReadOptions& read_options, bool /* disable_prefix_seek */,IndexBlockIter* iter, GetContext* get_context,BlockCacheLookupContext* lookup_context) override {const bool no_io = (read_options.read_tier == kBlockCacheTier);...if (!partition_map_.empty()) {// We don't return pinned data from index blocks, so no need// to set `block_contents_pinned`.// Two level iteratorit = NewTwoLevelIterator(new BlockBasedTable::PartitionedIndexIteratorState(table(),&partition_map_),index_block.GetValue()->NewIndexIterator(internal_comparator(), internal_comparator()->user_comparator(),nullptr, kNullStats, true, index_has_first_key(),index_key_includes_seq(), index_value_is_full()));...
}
這里TwoLevelIterator 就是在L0中,一個SST文件維護兩個迭代器,一個迭代器用來構造IndexBlock的所以遍歷,另一個迭代器用來實際的訪問value數據即datablock。
NewTwoLevelIterator函數將構造好的NewIndexIterator作為參數傳入之后并作為first_level_iter迭代器。
關于second_level_iter 迭代器是通過其seek 函數進行設置的:
void TwoLevelIndexIterator::Seek(const Slice& target) {first_level_iter_.Seek(target);InitDataBlock();if (second_level_iter_.iter() != nullptr) {second_level_iter_.Seek(target);}SkipEmptyDataBlocksForward();
}
其中InitDataBlock 中進行second_level_iter的創建
void TwoLevelIndexIterator::InitDataBlock() {if (!first_level_iter_.Valid()) {SetSecondLevelIterator(nullptr);} else {// index block中存放的是每隔restart bytes 的data block的起始地址BlockHandle handle = first_level_iter_.value().handle;if (second_level_iter_.iter() != nullptr &&!second_level_iter_.status().IsIncomplete() &&handle.offset() == data_block_handle_.offset()) {// second_level_iter is already constructed with this iterator, so// no need to change anything} else {// 通過從first_level_iter中取到的data 的起始位置作為hanle, 創建second_level_iterInternalIteratorBase<IndexValue>* iter =state_->NewSecondaryIterator(handle);data_block_handle_ = handle;SetSecondLevelIterator(iter);}}
}
到此,整個迭代器的創建過程基本說完了,在使用迭代器進行Seek/Next/Prev…etc 等操作的時候同樣也是由dbiter開始,各個迭代器進行各自維護的組件中進行移動,最終將結果拿到MergingIterator的最小堆做完排序返回。
更具體的細節設計 后續會逐漸補充,畢竟迭代器組件事關引擎Scan性能,一點也不能馬虎。
復雜的調用鏈中核心是簡單的C++封裝和動態綁定的特性,將內存到磁盤的數據結構穿起來,讓整個Scan在不同組件之間并行去完成。
總結
以上是生活随笔為你收集整理的Rocksdb Iterator实现:从DBIter 到 TwoLevelIter 的漫长链路的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 赠人玫瑰下一句是什么啊?
- 下一篇: Go 分布式学习利器(20)-- Go并