跟着Rocskdb 学 存储引擎:读写链路的代码极致优化
文章目錄
- 1. 讀鏈路
- 1.1 FileIndexer
- 1.1.1 LevelDB sst查找實現
- 1.1.2 Rocksdb FileIndexer實現
- 1.2 PinnableSlice 減少內存拷貝
- 1.3 Cache
- 1.3.1 LRU Cache
- 1.3.2 Clock Cache
- 1.4 ThreadLocalPtr 線程私有存儲
- 1.4.1 version系統
- 1.4.2 C++ thread_local vs ThreadLocalPtr
- 1.4.3 ThreadLocalPtr 設計
- 1.4.4 ThreadLocalPtr 在 version 系統中的應用
- 2. 寫鏈路
- 2.1 JoinBatchGroup
- 3. 其他
- 3.1 用最低的代價獲取 critical path 的 Statistics
跟著LevelDB 學 C++,跟著Rocksdb 學引擎。看完這一篇,你會對這一句話有更加深刻的理解。
LevelDB 能夠教給你關于C++語言本身很多的知識,如何寫好C++代碼,如何讓你掌握面向對象的核心編程思想。而Rocksdb 則會交給你足夠多的系統知識,優秀的存儲引擎需要對上層應用以及底層的操作系統有非常深入的了解,這樣我們才能夠通過引擎提供足夠的可靠性/易用性的同時 將操作系統 以及 底層硬件性能發揮到極致。
下面提到的關于Rocksdb 的鏈路優化可能之前的博文中已經描述過了, 會做一個概覽描述。
1. 讀鏈路
1.1 FileIndexer
對于LSM-tree 的Get 鏈路 會執行如下的一些搜索過程:
memtable::Get --> immutable::Get --> table->Get
在前面的部分,如果拿到了key-value 那就直接返回,拿不到,就每一步執行。對于 table->Get的邏輯,其實就是從 sst中讀,讀到的結果會load到block_cache,讀的過程則是 按照LEVEL 進行。L0因為是tierd compaction 策略,LSM-tree 允許其有重疊key,則需要在每一個文件中進行查找;如果沒找到,則對大于L0的文件,按層查找。
我們要討論的FileIndexer的主要功能是Rocksdb 對于這個查找過程的優化,介紹Rocksdb的查找過程之前,我們肯定是先需要了解一下LevelDB的查找邏輯。
1.1.1 LevelDB sst查找實現
對于LSM-tree來說,生成的每一個sst文件都會在內存中維護一個FileMeta,存放在這個sst對應的version之中。
FileMeta數據結構如下:
struct FileMetaData {FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}int refs;int allowed_seeks; // Seeks allowed until compactionuint64_t number;uint64_t file_size; // File size in bytesInternalKey smallest; // Smallest internal key served by tableInternalKey largest; // Largest internal key served by table
};
因為key在單個sst文件中是有序的, 所以FileMeta中只需要保存關鍵的smallest和largest即可,這兩個變量也是我們文件查找過程的關鍵。
因為 L0的每一個文件都要查找,那就需要看當前key 是否在每一個sst文件的range中。
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {FileMetaData* f = files_[0][i];if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&ucmp->Compare(user_key, f->largest.user_key()) <= 0) {// user key在當前 file中,則添加當前file 到數組中tmp.push_back(f);}
}
對大于L0的文件,LevelDB這里做的是整層進行二分查找,找到文件之后返回文件的 id 即可。
int FindFile(const InternalKeyComparator& icmp,const std::vector<FileMetaData*>& files, const Slice& key) {uint32_t left = 0;uint32_t right = files.size();while (left < right) {uint32_t mid = (left + right) / 2;const FileMetaData* f = files[mid];if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {// Key at "mid.largest" is < "target". Therefore all// files at or before "mid" are uninteresting.left = mid + 1;} else {// Key at "mid.largest" is >= "target". Therefore all files// after "mid" are uninteresting.right = mid;}}return right;
}
二分本身的比較邏輯是O(logn)的時間復雜度,對于LSM-tree來說,實際大數據量的情況下大多數的文件是存儲在最底層。加入L1=256M,到第五層的時候是2T,每一個文件64M來算,到第五層會有接近4w個文件,那隨機查找的時候,每一個Get大多情況下都會落到最后一層,也就意味著從L0–>L5 至少20次的比較,對cpu來說可謂是一個非常大的負擔了。
rocksdb 這里利用文件有序這個特性來嘗試減少 整層二分比較的次數。
1.1.2 Rocksdb FileIndexer實現
L0 每一個文件的查找還是避免不了,如果user_key在L0某一個文件的range內又不在這個文件中,這里rocksdb 會基于當前文件 通過 FileIndexer 構造其接下來要查找的下一層文件的文件range。
我們詳細看看FileIndexer 具體會做一些什么:
拿著LN 文件的smallest_key 和 largest_key,構造其 IndexUnit,也就是四個變量:
smallest_lb,smallest_key的左邊界最有可能包含F1 range內 的 文件編號smallest_rb,smallest_key的右邊界 最有可能包含F1 range內 的 文件編號largest_lb,largest_key的左邊界 同上largest_rb, largest_key的右邊界 同上
因為大于L0的文件會整層都不會有key-range 重疊,我們的user_key 假如落在了F1上,那就可以根據上面的四個變量來劃分接下來在LN+1的文件的查找區間。
比如:
1. user_key < smallest_key
smallest_lb=1 查找的file_range [0,1],如果F1的文件編號就是0,那smallest_lb 也就是0了。
2. user_key > smallest_key && user_key < largest_key
smallest_rb=2;largest_lb=4 查找的file_range [2,4],也就是在smallest_rb和largest_lb之間
3. user_key > largest_key
largest_rb=5, 查找的file_range 會在[6,7]之間
大多數的查找場景中,當我們user_key鎖定LN的一個file_range的時候,這個user_key大概率是在LN+1的[smallest_rb,largest_lb] 的range之間。
可以看到,我們有了每一個文件的FileIndexer,這樣就能夠在查找的過程中極大得減少比較的次數。這種FileIndexer的查找過程 key之間的比較次數完全是可控的l,對于Ln的一個文件,在Ln+1 中key_range的放大系數為10,也就是平均只需要 log10 = 3 次比較。相比于LevelDB的整層二分查找,尤其是在最后一層的十幾次比較,可以說是極大得減少了文件查找所需要的CPU。
當然,這里需要注意的是FileIndexer 的構造也需要消耗一定的CPU。
構造部分的邏輯會每一次有新的sst文件生成的時候 以及DB重啟 的時候觸發:
-
DB重啟,
Recover的時候調用Version::PrepareApply來進行構造 -
Compaction/Flush 完成之后 :
// Flush 完成之后 TryInstallMemtableFlushResultsLogAndApplyProcessManifestWritesPrepareApplyGenerateFileIndexer // Compaction完成之后 BackGroundCompactionLogAndApplyProcessManifestWritesPrepareApplyGenerateFileIndexer
構造Indexer的過程肯定會拖慢整個Flush或者Compaction的效率,但是他們是后臺線程,且觸發的次數相比于我們實際workload下的read-heavy來說實在是微不足道,那通過這一點構造時候的CPU消耗來極大得減少我們read-heavy時的CPU消耗,還是非常值的。
1.2 PinnableSlice 減少內存拷貝
先來看一下LevelDB的Get接口:
Status Get(const ReadOptions& options, const Slice& key,std::string* value) override;
這個std::string的value變量在后續從sst中讀到數據之后會復制拷貝到這個value變量中。
struct Saver {SaverState state;const Comparator* ucmp;Slice user_key;std::string* value; // 后續要返回給用戶的變量
};
} // namespace
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {Saver* s = reinterpret_cast<Saver*>(arg);ParsedInternalKey parsed_key;if (!ParseInternalKey(ikey, &parsed_key)) {s->state = kCorrupt;} else {if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;if (s->state == kFound) {// 內存拷貝s->value->assign(v.data(), v.size());}}}
}
這個時候就會發生內存拷貝。
上圖就是Rocksdb 在早期版本的實現,也就是leveldb 的std::string 透傳到底層,通過內存拷貝拿到數據之后再返回給用戶。
內存拷貝涉及到數據從舊的地址中搬移到新的地址,那這個過程會申請新的內存,釋放舊的內存。尤其是在我們讀heavy 且實際數據量遠超內存的場景下,頻繁申請內存/釋放內存(內核缺頁-》回收頁面–》分配新頁面) 以及 數據的拷貝對 內存以及CPU都是不小的負擔,這個痛點也是所有做底層軟件需要盡可能避免的問題。(當然,前提是其他的數據結構/存儲上的 優化已經足夠細致了,瓶頸可能會在這里)
那現在版本的Rocksdb 為了追求極致性能,肯定會對這一部分進行優化的。
直接看下圖:
Rocksdb 這里優化的部分就是在第三步的時候 從sst中拿到BlockContents之后 不會直接將其中的value填給用戶傳下來的變量,而是將拿到的這個value的地址給這個變量。
這里我們會從邏輯中發現一個潛在的問題,實際填充給用戶的是讀上來的value的地址。邏輯如下:
BlockBasedTable::GetBlockBasedTable::NewDataBlockIterator // 構造DataBlockIterGetContext::SaveValue // 將DataBlockIter 的value內容填充給用戶
但是BlockBasedTable通過DataBlockIter 拿到的value結果是要填充給BlockCache的。那填充完成之后當前的DataBlockIter是需要被釋放的,釋放則意味著用戶拿到的地址空間就不存在了。
有沒有辦法讓這個DataBlockIter 的釋放能夠延遲一段時間,到上層Get 用戶已經取到了這個value 之后再釋放呢。
來看一下rocksdb 使用的獲取實際value的類 PinnableSlice 以及 能夠延遲DataBlockIter 生命周期的類Cleanable。
對于PinnableSlice來說繼承Slice,也就能夠通過data_來緩存實際數據。它提供了一系列基于Cleanable類的成員函數實現的函數。所以,我們主要看一下Cleanable這個類是如何讓PinnableSlice 拿到的地址延遲釋放呢。
class PinnableSlice : public Slice, public Cleanable {......
}
Cleanable 類的成員如下:
其中的私有成員主要是維護了一個Cleanup類,這個類用來保存執行具體邏輯的部分,包括函數指針以及兩個參數,同時還維護了一個單鏈表。
那我們具體看看Cleanable類的成員函數 之間是如何和Cleanup一起延長一個類的生命周期的。
void Cleanable::RegisterCleanup(CleanupFunction func, void* arg1, void* arg2) {assert(func != nullptr);Cleanup* c;if (cleanup_.function == nullptr) {c = &cleanup_;} else {c = new Cleanup;c->next = cleanup_.next;cleanup_.next = c;}c->function = func;c->arg1 = arg1;c->arg2 = arg2;
}
可以看到在RegisterCleanup中,將需要register的函數指針和參數 創建一個新的Cleanup對象,添加到當前Cleanable類的cleanup_之前。
大體過程如下圖:
有兩個Cleanable對象,如果C1 想要注冊一個cleanup對象,傳入函數指針和參數,那就會構造一個新的cleanup對象,使用鏈表頭插法插入到C1維護的cleanup對象鏈表中。
在Cleanable對象析構的時候可以調用它的析構函數,執行DoCleanup邏輯
Cleanable::~Cleanable() { DoCleanup(); }
執行如下函數的邏輯:
inline void DoCleanup() {if (cleanup_.function != nullptr) {(*cleanup_.function)(cleanup_.arg1, cleanup_.arg2);for (Cleanup* c = cleanup_.next; c != nullptr;) {(*c->function)(c->arg1, c->arg2);Cleanup* next = c->next;delete c;c = next;}}
}
按照順序遍歷鏈表,執行cleanup 對象中的函數,從頭執行到尾巴。
如果將 Cleanable對象 C1 通過DelegateCleanupsTo委托給 C2,我們看看會發生什么:
大體的過程就是將C1對象中的 cleanup 對象逐個取出來,頭插法插入到C2 維護的cleanup對象的鏈表中。
這樣,C1 委托給C2 之后,C2在最后析構的時候就按照順序執行所有的C1 的資源釋放邏輯,是不是非常巧妙。
void Cleanable::DelegateCleanupsTo(Cleanable* other) {assert(other != nullptr);if (cleanup_.function == nullptr) {return;}// 先將當前的cleanup 成員 轉交給otherCleanup* c = &cleanup_;other->RegisterCleanup(c->function, c->arg1, c->arg2);c = c->next;// 如果cleanup鏈表還有成員,則逐個遍歷,挨個轉交while (c != nullptr) {Cleanup* next = c->next;other->RegisterCleanup(c);c = next;}// 最后清理當前對象cleanup_.function = nullptr;cleanup_.next = nullptr;
}
我們可以看一個簡單的Cleanable 類的使用方式,如下測試代碼:
void Multiplier(void* arg1, void* arg2) {int* res = reinterpret_cast<int*>(arg1);int* num = reinterpret_cast<int*>(arg2);*res *= *num;
}int main() {int n2 = 2;int res = 1;{Cleanable c2;{Cleanable c1;c1.RegisterCleanup(Multiplier, &res, &n2); c1.DelegateCleanupsTo(&c2);}// ~Cleanable c1 ,res = 1std::cout << "Delete c1, res is : " << res << std::endl;}// ~Cleanable c2 , res = 2std::cout << "Delete c2, res is : " << res << std::endl;
}// output
Delete c1, res is : 1
Delete c2, res is : 2
當執行C2的析構的時候 才會執行Multiplier 中的邏輯,因為 C1 將注冊的Multiplier 轉交給了C2,如果沒有轉交,則第一個輸出的內容就是2了。
通過Cleanable 對象的特性,我們回到解決內存拷貝問題中的PinnableSlice中,就可以讓DataBlockIter 和 PinnbaleSlice 都繼承 Cleanable類,這樣在將DataBlockIter 的地址交給PinnableSlice 的時候就可以將其釋放邏輯也轉交給PinnableSlice,用戶層拿到了想要的value之后 析構PinnableSlice 的時候再釋放DataBlockIter就好了。
這里的地址賦值 以及 轉交邏輯如下:
BlockBasedTable::Get GetContext::SaveValue //構造好的DataBlockIter 的 value 地址進行傳入進去PinSlice
最終的轉交鏈路如下:
BlockIter和PinnableSlice 都是繼承了Cleannable 類,他們在初始化的時候都會注冊一個自己釋放空間的邏輯,在通過迭代器讀取到了數據之后會將BlockIter 委托給 PinnableSlice對象,委托的過程就是讓PinnableSlice的data_指針指向BlockData 的地址。
1.3 Cache
1.3.1 LRU Cache
Rocksdb 工業級LRU Cache實現
1.3.2 Clock Cache
LRU Cache 在并發場景并不是很友好,所以 Rocksdb 推出了clock cache,在高并發下更加有優勢。
先看一下Clock Cache 的大體架構圖:
Clock Cache同樣是分shard,外部做了一層hash,基本組件如下:
- Concurrent HashMap,用來緩存key以及value-handle。這里為了對并發友好一些,hashmap使用的是 TBB的 concurrent_hash_map
- Circle List,這個數據結構主要是為 實現clock 算法用的。Rocksdb這里使用的是Deque + head 索引實現環形鏈表的功能。即通過head進行deque的遍歷, 遍歷的過程中遍歷到deque 的末尾的時候讓head重新指向deque的頭部。
- Recycle bin,一個保存從Deque 中淘汰節點的數組。用于后續有新的節點加入的時候先從recycle bin中直接取可以插入的位置即可,不需要重新申請存儲空間。
Clock 算法的淘汰原理就是針對存儲在環形鏈表中的每一個節點維護兩個屬性: Access 和 Modify,當clock cache滿了之后需要進行節點的淘汰, 這個時候會循環遍歷環形鏈表,發現每個節點的兩個屬性滿足以下要求時就進行淘汰。
- A = 0 ,M = 0;即沒有被訪問過,也沒有被修改過,則優先淘汰。
- A = 0,M = 1; 沒有被訪問過, 被修改過,則第一次遍歷會將這個屬性置為0,第二次遍歷遇到這種情況就可以直接被淘汰。
- A = 1, M = 0; 被訪問過,沒有被修改過,則第一次遍歷同樣會將訪問過的屬性置0, 第二次遍歷再進行清理。
- A = 1, M = 1; 被放過,也被修改過。第一次遍歷不會被淘汰。
淘汰的時候會優先選擇淘汰第一種情況,對第二種情況進行置位以便下次淘汰,對第三和第四種情況進行訪問計數自減,知道訪問計為0的時候才會淘汰。
而 rocksdb 實現的ClockCache 的算法大體接近,只是多了一些屬性(incache, usage, reference_count )只有當reference_count=1且 incache=1 時才會進行淘汰,否則淘汰的時候會將這一些屬性的計數自減。
Clock Cache 相比于LRUCache 的優勢主要有以下兩種:
- ClockCache 的節點淘汰并不會直接從 環形鏈表中刪除釋放節點,而是將淘汰的節點放在Recycle bin數組中,不像LRU cache的淘汰需要操作雙向鏈表的刪除以及節點釋放。
- ClockCache 在Erase的時候,可以使用tbb-hashtable的 并發erase優勢。
但是ClockCache 在insert的時候也會加鎖,所以 cache insert部分的優勢相比于Lru基本沒有,lookup的時候是無鎖的(不涉及數據結構的變更,不需要加鎖),所以整體上來看 ClockCache在lookup以及erase 的比例比較高的時候性能是比LRU cache有優勢的,但是insert比例比較高的時候基本就沒有什么優勢了。
使用cache_bench 工具進行測試可以比較明顯得看到這兩種Cache在不通workload下的性能差異:
-
Insert 80%, lookup 10%, erase 10%, threads 32, shards 32;
LRU : 305w/s, Clock: 203w/s
-
Insert 10%, lookup 10%, erase 80%, threads 32, shards 32;
LRU: 697w/s, Clock: 1173w/s
-
Insert 10%, lookup 80%, erase 10%, threads 32, shards 32
LRU: 681w/s, Clock: 1137w/s
當然,實際應用過程中對cache的操作是一個復雜過程,與workload強相關,比如Put比例多且cache 大小遠小于實際的磁盤數據量時,則cache的 insert 比例都會比較高(compaction導致的cache失效回填),而且這種情況下的Get 的key_range也是完全隨機的話lookup和insert比例都會比較高。而對于熱點場景,大多數是lookup的情況可能Clockcache會更有優勢一些。
1.4 ThreadLocalPtr 線程私有存儲
線程私有存儲是Rocksdb 為version 系統推出的一種訪問模型,能夠最大程度得實現無鎖訪問,從而加速整個引擎的讀/寫/compaction的效率。了解線程私有存儲之前我們需要先了解一下Rocksdb 的version系統,從而能夠整體看一下Rocksdb 的key/value 的讀寫操作模型。
1.4.1 version系統
整個version系統的架構及關系圖如下:
當我們Get或者Seek的時候,針對都會從當前cf先拿到一個SuperVersion,通過這個sv 來訪問對應的mem/imm/version。訪問version的時候會直接拿當前的current 取其versionStorageInfo,current會指向雙向鏈表維護的最新的version,再去拿到對應的sst文件元信息去索引對應的文件(FileMeta)。
像我們的Rocksdb::GetImpl接口,不論是去讀memtable 或者 immutable 或者 sst的都會先拿到sv,再去具體讀對應的value:
Status DBImpl::GetImpl(const ReadOptions& read_options,ColumnFamilyHandle* column_family, const Slice& key,PinnableSlice* pinnable_val, bool* value_found,ReadCallback* callback, bool* is_blob_index) {...// 獲取當前的superversionSuperVersion* sv = GetAndRefSuperVersion(cfd);...// 讀memtableif (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,&max_covering_tombstone_seq, read_options, callback,is_blob_index)) {...}// 讀immutable sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,&max_covering_tombstone_seq, read_options, callback,is_blob_index)) {...}// 讀sstsv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,&max_covering_tombstone_seq, value_found, nullptr, nullptr,callback, is_blob_index);...
}
如果從三個組件的任何一個能夠讀到,則會返回結果,并釋放當前的sv。
那接下來我們思考一下,針對Rocksdb的訪問一定是并發的,比如并發Get ,而memtable/sst 都是時刻在變動的(不斷得Put 產生的flush和compaction),那我們的一次Get想要保證讀到最新的數據,一定會拿到最新的version,如果沒有太多的考慮, 針對version的訪問需要鎖的保護,防止讀到更新的中間狀態。在這種情況下,一旦有鎖的參與,我們的性能將會巨差無比。
Rocksdb 為了提升高并發場景下針對LSM-tree 各個組件的訪問性能,減少鎖的參與,引入了線程私有存儲。
1.4.2 C++ thread_local vs ThreadLocalPtr
線程私有存儲 就是讓每一個線程都訪問自己的存儲內容,當然C++本身也提供了線程私有存儲的關鍵字(thread_local),當然rocksdb 并沒有選用,而是實現了一個更為復雜的ThreadLocalPtr。
先來看一下Rocksdb 沒有選用這個關鍵字的原因,如下thread_local的代碼:
#include <iostream>
#include <thread>
#include <unistd.h>class A {public:A() {}~A() {}void test(const std::string &name) {thread_local int count = 0;++count;std::cout << name << ": " << count << std::endl;}
};void func(const std::string &name) {A a1;a1.test(name);a1.test(name);A a2;a2.test(name);a2.test(name);
}int main(int argc, char* argv[]) {std::thread t1(func, "t1");t1.join();std::thread t2(func, "t2");t2.join();return 0;
}
我們聲明了類 A的成員函數test的訪問時線程私有存儲方式,然后在func 中生成了兩個對象, 按照我們正常的執行邏輯,兩個對象的test 函數的執行是各自輸出各自的,也就是都輸出1;兩個線程之間的執行互不影響。
而最終的輸出結果:
t1: 1
t1: 2
t1: 3
t1: 4
t2: 1
t2: 2
t2: 3
t2: 4
很明顯的一個問題,thread_local聲明了count之后,在當前線程的所有A 對象會共享同一個變量,我們想要一個線程內部的同一個類不通對象之間不共享,則無法生效。
其次,thread_local 無法區分不同的線程,上面的輸出其實是通過外部標識了不同的線程,對于執行函數func來說,它完全無法區分當前執行的是哪一個線程。
因為這一些語言(thread_local)/編譯器(__thread)本身的限制,Rocksdb 想要針對重要的version系統實現無鎖化,就自己設計了ThreadLocalPtr。
1.4.3 ThreadLocalPtr 設計
設計的初衷主要是線程私有存儲的靈活性:
- 在同一個線程內部區分不同對象(可能會有多個sv實例)。
- 在同一個進程內部區分不同的線程(可能不同線程同時讀寫一個sv)。
- 希望在一個線程內部能夠清空或者重置所有線程的tls
- 希望運行時再確定tls 的類型,而
thread_local和__thread都無法做到。
Rocksdb的 ThreadLocalPtr 使用方式如下,非常簡單(當然, 接口不止如下幾個):
// Define a variable
ThreadLocalPtr tls;// Write data
// thread1
tls.Reset(reinterpret_cast<int*>(1))
// tls.Swap(reinterpret_cast<int*>(1)) // 和上面的效果是一樣的
// thread2
tls.Reset(reinterpret_cast<int*>(2))// Get data
// thread1
tls.Get() == reinterpret_cast<int*>(1)
// thread2
tls.Get() == reinterpret_cast<int*>(2)// set all thread’s tls to null
tls.Scrape(&ptrs, nullptr);
整個ThreadLocalPtr的 類以及其相關類如下(不是標準的UML類圖),大體能夠看到其中各個成員之間的關系。
可以看到ThreadLocalPtr 除了對外的成員函數接口之外主要是一個 id_ 以及 StaticMeta::Instance()私有成員。
而StaticMeta 類中的主要成員 除了實現線程私有存儲的posix 類型pthread_key_t之外 還有一個__thread的ThreadData類型的私有成員tls_。
可以發現,我們實際的私有存儲數據是存放在ThreadData 之中的Entry之中,對于私有存儲數據的管理是通過ThreadData維護的雙向鏈表進行的。而且每一個ThreadData對象都會有一個共享的ThreadLocalPtr::StaticMeta對象。
而最外部的ThreadLocalPtr的id_則能夠標識訪問當前私有存儲的線程。
接下來我們看一下創建一個 ThreadLocalPtr 對象,不同類之間的關系形態:
-
創建第一個
ThreadLocalPtr對象local_1通過
Reset接口,將創建好的ThreadData采用鏈表的尾插法,插入到StaticMeta維護的 ThreaData 類型的head_之后,形成一個雙向循環鏈表,同時將Reset傳入的void*類型的數據存儲到Entry數組之中;標識下一個ThreadLocalPtr 對象的id是1。 -
創建第二個
ThreadLocalPtr對象local_2創建的第二個私有存儲變量
local_2同樣會創建一個ThreadData對象,采用雙向鏈表的尾插法插入到StaticMeta維護的 雙向循環鏈表中,并標識下一個ThreadLocalPtr實例的id是2。這樣,我們有了一個全局的
StaticMeta對象(對StaticMeta 對象的創建是單例模式),通過它的雙向循環鏈表head_能夠訪問到所有的創建的私有存儲對象。每一個雙向鏈表的節點ThreadData都保存了所有ThreadLocalPtr對象,訪問具體的哪一個線程創建的ThreadLocalPtr對象則通過ThreadLocalPtr的id_標識。
具體的訪問以及創建ThreadLocalPtr 對象的代碼如下:
-
訪問
ThreadLocalPtr對象的代碼Get:void* ThreadLocalPtr::StaticMeta::Get(uint32_t id) const {// 拿到全局StaticMeta 實例 的tls auto* tls = GetThreadLocal();if (UNLIKELY(id >= tls->entries.size())) {return nullptr;}// 訪問當前 線程的ThreadLocalPtr 的存儲內容return tls->entries[id].ptr.load(std::memory_order_acquire); } -
創建
ThreadLocalPtr對象的代碼Reset(存儲的時候是松散內存序,可能不會立即同步到內存,僅僅同步到cpu-cache就返回了) /Swap(嚴格內存序,會從cpu-cache同步到內存)void ThreadLocalPtr::StaticMeta::Reset(uint32_t id, void* ptr) {// 拿到全局 StaticMeta 實例的tlsauto* tls = GetThreadLocal();if (UNLIKELY(id >= tls->entries.size())) {// Need mutex to protect entries access within ReclaimIdMutexLock l(Mutex());tls->entries.resize(id + 1);}// 松散內存序存儲tls->entries[id].ptr.store(ptr, std::memory_order_release); }void* ThreadLocalPtr::StaticMeta::Swap(uint32_t id, void* ptr) {auto* tls = GetThreadLocal();if (UNLIKELY(id >= tls->entries.size())) {// Need mutex to protect entries access within ReclaimIdMutexLock l(Mutex());tls->entries.resize(id + 1);}// 嚴格內存序列內存 CASreturn tls->entries[id].ptr.exchange(ptr, std::memory_order_acquire); }
1.4.4 ThreadLocalPtr 在 version 系統中的應用
回到我們最初version系統中的SuperVersion,讀取操作主要是依賴superversion。Rocksdb 這里針對SuperVersion 的操作主要提供了如下兩個函數:
InstallSuperVersion // 后臺線程flush/compaction 完成之后會更新 全局的superversion 以及 清理所有的 tls superversion
GetReferencedSuperVersion // 獲取 local tls 的sv,如果過期了,則會重新從全局的sv中獲取一個新的sv進行訪問;否則直接拿到local 的 sv進行訪問。
通過如下圖能夠非常清晰得看到 tls(thread local storage) 在 version系統的重要作用。
我們的writer 和 reader 可以看作是用戶調用的Put 和 Get 操作,每一個操作代表一個并發線程。
Writer 操作會引入flush 以及 compaction操作,BackGroundCompaction --> InstallSuperVersionAndScheduleWork --> InstallSuperVerion 的時候會先更新全局的super_version_,同時將所有的tls sv 通過ResetThreadLocalSuperVersions設置為 obsolute。
代碼如下:
void ColumnFamilyData::ResetThreadLocalSuperVersions() {autovector<void*> sv_ptrs;// 對tls 的所有 ThreadLocalPtr 對象進行通知標記(讀請求拿到的tls也能夠感知到)local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);for (auto ptr : sv_ptrs) {assert(ptr);if (ptr == SuperVersion::kSVInUse) {continue;}auto sv = static_cast<SuperVersion*>(ptr);bool was_last_ref __attribute__((__unused__));was_last_ref = sv->Unref();// sv couldn't have been the last reference because// ResetThreadLocalSuperVersions() is called before// unref'ing super_version_.assert(!was_last_ref);}
}// Scrape 的操作就是通過 StaticMeta 單例對象來遍歷循環鏈表中的 tls存儲,逐個標記
void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs,void* const replacement) {MutexLock l(Mutex());for (ThreadData* t = head_.next; t != &head_; t = t->next) {if (id < t->entries.size()) {void* ptr =t->entries[id].ptr.exchange(replacement, std::memory_order_acquire);if (ptr != nullptr) {ptrs->push_back(ptr);}}}
}
這個時候,Writer 通過Scrape 標記了當前db的所有的tls 存儲為過期,完成標記之前 可能有一個reader 通過 GetReferencedSuperVersion 創建了一個tls 對象,創建完成之后被Scrape 標記了過期,那這個時候reader 需要通過 db 級別的鎖來獲取全局最新的super_version_,獲取之后再更新本地的local_sv。
需要注意的是獲取全局 super_version_ 的時候需要保證獲取期間不會有其他的writer 更新這個 super_version_。
代碼如下:
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex) {// The SuperVersion is cached in thread local storage to avoid acquiring// mutex when SuperVersion does not change since the last use. When a new// SuperVersion is installed, the compaction or flush thread cleans up// cached SuperVersion in all existing thread local storage. To avoid// acquiring mutex for this operation, we use atomic Swap() on the thread// local pointer to guarantee exclusive access. If the thread local pointer// is being used while a new SuperVersion is installed, the cached// SuperVersion can become stale. In that case, the background thread would// have swapped in kSVObsolete. We re-check the value at when returning// SuperVersion back to thread local, with an atomic compare and swap.// The superversion will need to be released if detected to be stale.// 標記當前創建的 tls 是 kSVInUse,void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);// Invariant:// (1) Scrape (always) installs kSVObsolete in ThreadLocal storage// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage// should only keep kSVInUse before ReturnThreadLocalSuperVersion call// (if no Scrape happens).assert(ptr != SuperVersion::kSVInUse);SuperVersion* sv = static_cast<SuperVersion*>(ptr);// 檢測標記之后 可能因為writer 的 install 導致的Obsolete,則需要重新獲取全局的svif (sv == SuperVersion::kSVObsolete ||sv->version_number != super_version_number_.load()) {RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);SuperVersion* sv_to_delete = nullptr;if (sv && sv->Unref()) {RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);db_mutex->Lock();// NOTE: underlying resources held by superversion (sst files) might// not be released until the next background job.// 因為當前拿到的sv 已經過期了, 需要清理掉它占用的資源sv->Cleanup();sv_to_delete = sv;} else {db_mutex->Lock();}// 加鎖獲取全局的 super_version_sv = super_version_->Ref();db_mutex->Unlock();delete sv_to_delete;}// 當然,如果拿到的tls sv 并沒有被標記為過期,則直接返回訪問即可。assert(sv != nullptr);return sv;
}
writer 更新 全局super_version_ 并 標記所有的tls sv失效,reader 拿到 tls sv 做一些 檢測 發現失效則通過db鎖進行同步。
這個過程對version 系統的訪問能夠極大得減少多線程下對version系統訪問的同步開銷,提升了rocksdb 的讀寫性能。
2. 寫鏈路
2.1 JoinBatchGroup
從JoinBatchGroup 代碼細節 來看 Rocskdb 對寫入邏輯的性能優化
3. 其他
3.1 用最低的代價獲取 critical path 的 Statistics
Rocksdb 擁有非常完善的運維體系,包括一百多個tickers 和 幾十個histograms。這一些指標都是嵌入到整個rocksdb workload的critical path之中。比如DB_GET 表示 Get請求的延時,也就是每一次調用Get都會記錄一次延時,并且后續通過rocksdb.db.get.micros 取的時候計算直方圖信息。
那如何保證在打開statistics 之后 這么多關鍵路徑的指標不會影響引擎本身的性能,且指標的存放和讀取不會耗費過多的cpu呢?
引擎指標對于上層應用來說是必不可少的,那如何保證指標這里的性能就成為的關鍵了,接下來我們學習一下Rocksdb 這里是怎么做的。
-
指標的收集
以histogram 類型的指標為例,
DB_GET。在GetImpl入口處 通過StopWatch sw(env_, stats_, DB_GET);進行收集。
即StopWatch類的構造函數是打一個時間點,析構函數會做一個時間差通過reportTimeToHistogram記錄到statistics中。~StopWatch() {if (elapsed_) {if (overwrite_) {*elapsed_ = env_->NowMicros() - start_time_;} else {*elapsed_ += env_->NowMicros() - start_time_;}}if (elapsed_ && delay_enabled_) {*elapsed_ -= total_delay_;}if (stats_enabled_) {statistics_->reportTimeToHistogram(hist_type_, (elapsed_ != nullptr)? *elapsed_: (env_->NowMicros() - start_time_));} } -
指標的存儲
存儲histogram 數據的調用棧如下:reportTimeToHistogramStatistics::recordInHistogramStatisticsImpl::recordInHistogram在
recordInHistogram函數中,實現邏輯如下:void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::recordInHistogram(uint32_t histogramType, uint64_t value) {assert(histogramType < HISTOGRAM_MAX);if (get_stats_level() <= StatsLevel::kExceptHistogramOrTimers) {return;}per_core_stats_.Access()->histograms_[histogramType].Add(value);if (stats_ && histogramType < HISTOGRAM_MAX) {stats_->recordInHistogram(histogramType, value);} }可以發現真正的存儲是存放在了
per_core_stats_.Access()->histograms_[histogramType].Add(value),也就是一個StatisticsData類型的CoreLocalArray中。StatisticsData的定義中重載了new和delete操作符,保證分配和釋放都是cacheline對齊的。
為什么要有一個CoreLocalArray呢?
先來看看它的作用,通過Access函數 我們進入到了AccessElementAndIndex中
template <typename T>
std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {// 獲取當前的cpu-idint cpuid = port::PhysicalCoreID();size_t core_idx;if (UNLIKELY(cpuid < 0)) {// cpu id unavailable, just pick randomly// 如果cpu id不可用,直接隨便選擇一個cpu idcore_idx = Random::GetTLSInstance()->Uniform(1 << size_shift_);} else {// 否則選擇當前的cpu idcore_idx = static_cast<size_t>(cpuid & ((1 << size_shift_) - 1));}// 取當前cpu-id 緩存的 StatisticsData 類型的地址返回。return {AccessAtCore(core_idx), core_idx};
}
最后使用當前cpu id 的 對象地址 Add 記錄下來的指標數據。
大體上能夠發現,我們的指標數據是緩存在執行當前Get操作的cpu 上。
后續拿該指標的時候通過getHistogramImplLocked
std::unique_ptr<HistogramImpl>
StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getHistogramImplLocked(uint32_t histogramType) const {assert(histogramType < HISTOGRAM_MAX);std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());// 訪問當前cpu的所有core,取每一個core 之前存儲的該指標的數據。for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {res_hist->Merge(per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);}return res_hist;
}
我們大概知道了CoreLocalArray在指標集中的用法, 那rocksdb 為什么要用這樣的方式保存指標呢?
在關鍵路徑上的額外處理邏輯每一步都可能影響性能,我們將指標隨便保存在內存中,后續的用戶線程 以及 引擎內部的stats打印線程 讀取則可能涉及跨numa訪問,同樣會造成額外的cpu開銷。
如果我們將指標按照core-id 保存在各自core 的本地存儲中(cpu-cache/距離cpu近的內存中),而且保證了StatisticsData的分配和釋放都是cache-line對齊,那么在訪問過程中從本地core存儲按照cache-line 讀取能夠極大得減少跨numa 以及 cache-line 補齊 的開銷,最大程度得保證了指標訪問的性能 ,減少對系統CPU資源的占用。
總結
以上是生活随笔為你收集整理的跟着Rocskdb 学 存储引擎:读写链路的代码极致优化的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 求一个qq网名女生
- 下一篇: 临字开头成语有哪些啊?