Notes on Basic Use of the RocksDB MergeOperator
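This post is simply a record of how to use MergeOperator.
RocksDB provides MergeOperator as a replacement for the read-modify-write pattern in update scenarios, where a single user-level update has to call RocksDB's Get and then Put to complete.
That pattern introduces extra read and write amplification, which is a poor trade for update-heavy workloads such as SQL support. Merge was introduced to address this: the user implements their own merge operation and passes it in through the options, and from then on an update only needs a single Merge call. The real update for the key is applied later, either during background compaction or when the user calls Get or uses an iterator. Merge has a cost of its own, though: if the kMergeType entries are not compacted in time, reads become heavier, because a read has to merge all the still-unmerged operands before it can return.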
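To make the deferred merge concrete, here is a minimal sketch in plain C++. It does not use RocksDB: `ToyMergeStore`, `AppendWithComma`, `Compact` and `PendingOperands` are hypothetical names made up for illustration, and the real storage layout is far more involved. It only shows the idea described above: a write records an operand without reading anything, and the operands are folded together on read or on compaction.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy store (not RocksDB). Merge() only records an operand;
// operands are folded into a value when the key is read or compacted.
class ToyMergeStore {
 public:
  using MergeFn = std::function<std::string(const std::string* existing,
                                            const std::string& operand)>;

  explicit ToyMergeStore(MergeFn fn) : merge_(std::move(fn)) {}

  // Cheap write path: append the operand, no read required.
  void Merge(const std::string& key, const std::string& operand) {
    pending_[key].push_back(operand);
  }

  // Read path: fold every pending operand on top of the base value.
  // Returns false if the key has neither a base value nor operands.
  bool Get(const std::string& key, std::string* out) const {
    assert(out != nullptr);
    auto base_it = base_.find(key);
    auto pend_it = pending_.find(key);
    bool has_value = base_it != base_.end();
    bool has_pending = pend_it != pending_.end() && !pend_it->second.empty();
    if (!has_value && !has_pending) return false;

    std::string value = has_value ? base_it->second : std::string();
    if (has_pending) {
      for (const std::string& operand : pend_it->second) {
        value = merge_(has_value ? &value : nullptr, operand);
        has_value = true;
      }
    }
    *out = value;
    return true;
  }

  // Stand-in for compaction: fold the operands once and store the result,
  // so later reads no longer pay for the merge.
  void Compact(const std::string& key) {
    std::string folded;
    if (Get(key, &folded)) {
      base_[key] = folded;
      pending_.erase(key);
    }
  }

  // Number of operands a read of this key would still have to fold.
  std::size_t PendingOperands(const std::string& key) const {
    auto it = pending_.find(key);
    return it == pending_.end() ? 0 : it->second.size();
  }

 private:
  MergeFn merge_;
  std::map<std::string, std::string> base_;
  std::map<std::string, std::vector<std::string>> pending_;
};

// Example merge function: join operands with a comma.
inline std::string AppendWithComma(const std::string* existing,
                                   const std::string& operand) {
  return existing ? *existing + "," + operand : operand;
}
```

With `ToyMergeStore store(AppendWithComma)`, two `Merge("0", ...)` calls leave two pending operands, and every `Get` has to fold them until `Compact("0")` runs. That is the read-side cost mentioned above when compaction falls behind.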
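The MergeOperator virtual base class has a lot of functions and distinguishes between full merge and partial merge. For many users, however, the operation is just a counter increment or a string append, with nothing more complicated involved. RocksDB therefore provides a simpler, more general-purpose virtual base class, AssociativeMergeOperator, which hides the full merge and partial merge machinery. A class that inherits from it mainly needs to implement two functions, Merge and Name.
The code below uses two merge operators that RocksDB already ships: StringAppendOperator and UInt64AddOperator. A Merge is itself just a key/value write, except that the key's type is kMergeType. The value is physically stored as well.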
StringAppendOperator
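A simple test of StringAppend follows. We call Merge twice on the same key, which amounts to writing two kMergeType entries into the db. We then call Flush once, which produces an SST file (and performs the merge). A subsequent Get shows that the key's value has been merged according to StringAppend's behavior.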
#include <cstdlib>
#include <iostream>
#include <string>

#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>
// From the RocksDB source tree (utilities/); it may not be installed
// alongside the public headers, so the source root must be on the include path.
#include "utilities/merge_operators.h"

using namespace rocksdb;
using namespace std;

rocksdb::DB* db;
rocksdb::Options option;

void OpenDB() {
  option.create_if_missing = true;
  option.compression = rocksdb::CompressionType::kNoCompression;

  rocksdb::BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  option.table_factory.reset(NewBlockBasedTableFactory(table_options));

  // By default, the merged operands are joined with a comma.
  option.merge_operator = MergeOperators::CreateStringAppendOperator();

  // Assumes ./db starts out empty; re-running against an existing db
  // appends to the value that is already stored.
  auto s = rocksdb::DB::Open(option, "./db", &db);
  if (!s.ok()) {
    cout << "open failed : " << s.ToString() << endl;
    exit(-1);
  }
  cout << "Finish open !" << endl;
}

void DoWrite() {
  string key = std::to_string(0);
  std::string value;
  rocksdb::Status s;

  // Two merges on the same key write two kMergeType entries.
  s = db->Merge(rocksdb::WriteOptions(), key, "2");
  if (s.ok()) s = db->Merge(rocksdb::WriteOptions(), key, "3");
  if (s.ok()) s = db->Flush(rocksdb::FlushOptions());
  if (!s.ok()) {
    cout << "Merge value failed: " << s.ToString() << endl;
    exit(-1);
  }
  cout << "Finish merge !" << endl;

  s = db->Get(rocksdb::ReadOptions(), key, &value);
  if (!s.ok()) {
    cout << "Get after only merge is failed " << s.ToString() << endl;
    exit(-1);
  }
  cout << "Get merge value len: " << value.size() << " data: " << value << endl;
}

int main() {
  OpenDB();
  DoWrite();
  delete db;  // close the database
  return 0;
}
The output is:
Finish open !
Finish merge !
Get merge value len: 3 data: 2,3
As you can see, the value returned by Get has already been merged.
UInt64AddOperator
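This is a merge example that implements an incrementing counter.
Note that if your own MergeOperator encodes and decodes values internally, the caller has to write operands in that same encoding and decode the values it reads back the same way.
RocksDB's example implementation decodes the value passed in by the user when it receives it: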
// A 'model' merge operator with uint64 addition semantics
// Implemented as an AssociativeMergeOperator for simplicity and example.
class UInt64AddOperator : public AssociativeMergeOperator {
 public:
  bool Merge(const Slice& /*key*/, const Slice* existing_value,
             const Slice& value, std::string* new_value,
             Logger* logger) const override {
    uint64_t orig_value = 0;
    if (existing_value) {
      // Decode the existing value. This is why callers of Merge must write
      // their operands in Fixed64 encoding.
      orig_value = DecodeInteger(*existing_value, logger);
    }
    uint64_t operand = DecodeInteger(value, logger);

    assert(new_value);
    new_value->clear();
    PutFixed64(new_value, orig_value + operand);

    return true;  // Return true always since corruption will be treated as 0
  }

  const char* Name() const override { return "UInt64AddOperator"; }

 private:
  // Takes the string and decodes it into a uint64_t
  // On error, prints a message and returns 0
  uint64_t DecodeInteger(const Slice& value, Logger* logger) const {
    uint64_t result = 0;

    if (value.size() == sizeof(uint64_t)) {
      result = DecodeFixed64(value.data());
    } else if (logger != nullptr) {
      // If value is corrupted, treat it as 0
      ROCKS_LOG_ERROR(logger,
                      "uint64 value corruption, size: %" ROCKSDB_PRIszt
                      " > %" ROCKSDB_PRIszt,
                      value.size(), sizeof(uint64_t));
    }

    return result;
  }
};
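The encoding requirement is easy to get wrong, so here is a small sketch of the same arithmetic in plain C++, without RocksDB. The names `EncodeU64`, `DecodeU64OrZero` and `AddOperand` are hypothetical stand-ins made up for this illustration, and `std::string` takes the place of `Slice`. It assumes the 8-byte little-endian layout used by the Fixed64 helpers, and mirrors the rule in the operator above that a value of any other size counts as 0.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Encode v as 8 little-endian bytes (assumed Fixed64 layout).
inline std::string EncodeU64(uint64_t v) {
  std::string out(sizeof(uint64_t), '\0');
  for (int i = 0; i < 8; ++i) {
    out[i] = static_cast<char>((v >> (8 * i)) & 0xff);
  }
  return out;
}

// Decode 8 little-endian bytes. Any other size is treated as 0,
// matching DecodeInteger in the operator shown above.
inline uint64_t DecodeU64OrZero(const std::string& s) {
  if (s.size() != sizeof(uint64_t)) return 0;
  uint64_t v = 0;
  for (int i = 0; i < 8; ++i) {
    v |= static_cast<uint64_t>(static_cast<unsigned char>(s[i])) << (8 * i);
  }
  return v;
}

// Same addition as Merge() above: decode both sides, add, re-encode.
inline std::string AddOperand(const std::string* existing,
                              const std::string& operand) {
  uint64_t orig = existing ? DecodeU64OrZero(*existing) : 0;
  return EncodeU64(orig + DecodeU64OrZero(operand));
}
```

Merging `EncodeU64(2)` twice yields an 8-byte value that decodes to 4. Merging the one-byte text `"2"` instead adds nothing, because its size is not 8 and it is decoded as 0. This is why the caller has to encode operands before calling Merge.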
Example code:
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>

#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>
// From the RocksDB source tree (utilities/); it may not be installed
// alongside the public headers, so the source root must be on the include path.
#include "utilities/merge_operators.h"

using namespace rocksdb;
using namespace std;

rocksdb::DB* db;
rocksdb::Options option;

static bool LittleEndian() {
  int i = 1;
  return *reinterpret_cast<char*>(&i) == 1;
}

inline uint32_t DecodeFixed32(const char* ptr) {
  if (LittleEndian()) {
    // Load the raw bytes
    uint32_t result;
    memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain load
    return result;
  } else {
    return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0]))) |
            (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8) |
            (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16) |
            (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));
  }
}

inline uint64_t DecodeFixed64(const char* ptr) {
  if (LittleEndian()) {
    // Load the raw bytes
    uint64_t result;
    memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain load
    return result;
  } else {
    uint64_t lo = DecodeFixed32(ptr);
    uint64_t hi = DecodeFixed32(ptr + 4);
    return (hi << 32) | lo;
  }
}

inline void EncodeFixed64(char* buf, uint64_t value) {
  if (LittleEndian()) {
    memcpy(buf, &value, sizeof(value));
  } else {
    buf[0] = value & 0xff;
    buf[1] = (value >> 8) & 0xff;
    buf[2] = (value >> 16) & 0xff;
    buf[3] = (value >> 24) & 0xff;
    buf[4] = (value >> 32) & 0xff;
    buf[5] = (value >> 40) & 0xff;
    buf[6] = (value >> 48) & 0xff;
    buf[7] = (value >> 56) & 0xff;
  }
}

void OpenDB() {
  option.create_if_missing = true;
  option.compression = rocksdb::CompressionType::kNoCompression;

  rocksdb::BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  option.table_factory.reset(NewBlockBasedTableFactory(table_options));

  option.merge_operator = MergeOperators::CreateUInt64AddOperator();

  // Assumes ./db starts out empty (not the directory left by the
  // StringAppend example, whose values are not Fixed64-encoded).
  auto s = rocksdb::DB::Open(option, "./db", &db);
  if (!s.ok()) {
    cout << "open failed : " << s.ToString() << endl;
    exit(-1);
  }
  cout << "Finish open !" << endl;
}

void DoWrite() {
  string key = std::to_string(0);
  std::string value;
  char buf[8];
  rocksdb::Status s;

  // The underlying UInt64AddOperator decodes and encodes Fixed64 values,
  // so the operand has to be encoded the same way.
  EncodeFixed64(buf, 2);

  // Merging 2 into the same key twice makes the final Get return 4.
  s = db->Merge(rocksdb::WriteOptions(), key, std::string(buf, 8));
  if (s.ok()) s = db->Merge(rocksdb::WriteOptions(), key, std::string(buf, 8));
  if (s.ok()) s = db->Flush(rocksdb::FlushOptions());
  if (!s.ok()) {
    cout << "Merge value failed: " << s.ToString() << endl;
    exit(-1);
  }
  cout << "Finish merge !" << endl;

  s = db->Get(rocksdb::ReadOptions(), key, &value);
  if (!s.ok()) {
    cout << "Get after only merge is failed " << s.ToString() << endl;
    exit(-1);
  }
  cout << "Get merge value " << value.size() << " "
       << DecodeFixed64(value.data()) << endl;
}

int main() {
  OpenDB();
  DoWrite();
  delete db;  // close the database
  return 0;
}
The output is:
Finish open !
Finish merge !
Get merge value 8 4