使用pthread和线程池实现B+树的并行块加载bulkload过程
數據庫課程設計
實驗環境
- 架構:Intel x86_64 (虛擬機)
 - 操作系統:Ubuntu 20.04
 - 匯編器:gas (GNU Assembler) in AT&T mode
 - 編譯器:gcc
 
項目地址
項目地址
課程設計要求
- 理解B+樹bulkloading操作過程的代碼。
 - 掌握多核并行程序的設計。
 - 并行化實現 
- 實現底層葉子節點的并行構建。
 - 實現索引節點(非葉子節點)的并行構建。
 
 - 實驗結果 
- 比較不同數據量的并行加速效果,將加速比繪制成折線圖。
 
 
項目運行
進入Tree_project文件夾,在終端輸入make,然后:
- 輸入./run為串行加載程序;
 - 輸入./run parallel為并行加載程序;
 - 輸入./run print為串行加載程序,并打印B+樹;
 - 輸入./run parallel print為并行加載程序,并打印B+樹。
 
課程設計報告
B+樹bulkloading過程的理解
1. B+樹bulkloading過程的準備階段:
在main函數中首先執行程序,所以先看main函數,首先聲明節點大小B_為512,存儲指向節點的指針個數n_pts_為1000000,然后將讀取數據文件和B+樹存儲數據文件的文件名復制到data_file和tree_file字符串數組中,并將文件名打印出來,申請一個名為table的Result類型數組,數組大小為n_pts_,讀入數據文件data_file的文件流到文件指針fp中,然后讀入每行數據文件data_file中的數據到string類型變量line中,每一行i的兩個數據以逗號分隔,第一個數據讀到table數組相應下標i的key_里,代表關鍵字,第二個數據讀到table數組相應下標的id_里,代表指向節點的指針,關閉文件流,開始計時,首先創建一個B+樹trees_,并對其進行初始化,然后對其進行bulkloading過程,打印這個過程的時間,程序結束。
2. B+樹bulkloading過程使用的函數:
int BTree::bulkload(int n, const Result *table)B+樹bolkloading會調用一個名為int bulkload(int n, const Result *table)的函數進行加載,這個函數在b_tree.cc中定義,參數n是要加載的條目entries數,參數table是一個哈希表,存儲了這n個條目的相關信息,大小也為n,每個條目與每個哈希表的下標一一對應,哈希表的關鍵字是key_,值是一個指向子結點的指針id_,通過這個哈希表,可以快速找到要加載的條目對應的指針。
2.1 首先聲明變量:
BIndexNode *index_child = NULL; BIndexNode *index_prev_nd = NULL; BIndexNode *index_act_nd = NULL; BLeafNode *leaf_child = NULL; BLeafNode *leaf_prev_nd = NULL; BLeafNode *leaf_act_nd = NULL;int id = -1; int block = -1; float key = MINREAL;bool first_node = true; int start_block = 0; int end_block = 0;函數一開始時,首先聲明了幾個變量:
- 三個BIndexNode *類的索引結點指針: 
- index_child指向索引結點的子結點,這個子結點也是索引結點
 - index_prev_nd指向前一個條目插入滿的索引結點
 - index_act_nd指向當前正在進行插入條目的索引結點
 
 - 聲明了三個BLeafNode *葉子結點指針 
- leaf_child指向第1層索引結點(葉子結點層數為0)的子結點,這個子結點是葉子結點
 - leaf_prev_nd指向上一個插入滿的葉子結點
 - leaf_act_nd指向當前正在進行插入的葉子結點。
 
 - 聲明整型變量id和浮點型變量key存儲遍歷哈希表table時每個id_和key_的值
 - 聲明布爾類型變量first_node判斷當前結點是否是當前層的第一個結點
 - 聲明整型變量start_block和end_block存儲B+樹每一層的第一個結點和最后一個結點的位置,葉子結點的block從1開始,依次加一往后
 - 聲明整型變量block存儲從每一層的第一個結點位置start_block到最后一個結點的位置end_block進行遍歷時每一個結點的位置
 
2.2 接著先從底層葉子節點構建,從左往右按順序構建一個雙向鏈表:
for (int i = 0; i < n; ++i) {id = table[i].id_;key = table[i].key_;if (!leaf_act_nd) {leaf_act_nd = new BLeafNode();leaf_act_nd->init(0, this);if (first_node) {first_node = false; // init <start_block>start_block = leaf_act_nd->get_block();}else { // label siblingleaf_act_nd->set_left_sibling(leaf_prev_nd->get_block());leaf_prev_nd->set_right_sibling(leaf_act_nd->get_block());delete leaf_prev_nd; leaf_prev_nd = NULL;}end_block = leaf_act_nd->get_block();} leaf_act_nd->add_new_child(id, key); // add new entryif (leaf_act_nd->isFull()) {// change next node to store entriesleaf_prev_nd = leaf_act_nd;leaf_act_nd = NULL;} } if (leaf_prev_nd != NULL) {delete leaf_prev_nd; leaf_prev_nd = NULL; } if (leaf_act_nd != NULL) {delete leaf_act_nd; leaf_act_nd = NULL; }- 先從底層葉子節點構建,從左往右按順序構建一個雙向鏈表,遍歷哈希表table: 
- 在每次遍歷中,將哈希表中每一項的id_和key_值記錄在整型變量id和key中,代表要加載每一項的指向子節點的指針和關鍵字。判斷此時指向當前正在進行插入的葉子結點leaf_act_nd是否為空: 
- 如果為空,那么要創建一個新的葉子結點leaf_act_nd,并使用它的初始化方法init(0, this)初始化這個葉子結點,設置這個結點所處層數為0,所在B+樹為當前調用這個操作的B+樹: 
- 在葉子結點的初始化方法void init(int level, BTree *btree)中,首先根據傳入參數設置所在層數和B+樹,設置結點中保存的條目數num_entries_和關鍵字數num_keys_為0,左兄弟和右兄弟所處在的block位置為-1,設置臟塊dirty_為true,表示要寫回文件。
 - 然后調用btree_->file_->get_blocklength()獲取block塊的長度到b_length,調用語句get_key_size(b_length)計算block塊的長度b_length除以葉子結點大小LEAF_NODE_SIZE并向下取整賦給最大可以存儲的關鍵字數capacity_keys_,經過計算后得到關鍵字的大小到key_size中。
 - 接著為結點的關鍵字浮點型指針key_申請一個大小為關鍵字最大容量capacity_keys_的浮點型數組并將其中元素都初始化為MINREAL,獲取結點的頭部大小header_size和條目大小entry_size,根據這兩個值以及block塊的長度b_length和關鍵字的大小key_size計算出這個結點可以包含的條目的最大容量capacity_,小于100則報錯退出,否則為結點的對象id整型指針id_申請一個大小為capacity_的整型數組并將其中元素都初始化為-1。
 - 最后為字符型指針blk申請一個大小為block塊的長度b_length的字符型數組,使用語句btree_->file_->append_block(blk)添加一個新塊block到文件末尾,并設置結點所處塊的序號。 
- 在int append_block(Block block)方法中,首先使用語句fseek(fp_, 0, SEEK_END)使BlockFile的文件指針fp_指向存儲文件的末尾。
 - 然后使用語句put_bytes(block, block_length_)將這個block中的內容寫入文件指針fp_指向的存儲文件,并且文件中block的數目num_blocks_加一。
 - 接著使用語句fseek(fp_, SIZEINT, SEEK_SET)使BlockFile的文件指針fp_指向存儲文件的開頭后4個字節文件頭部的位置,使用語句fwrite_number(num_blocks_)將更新后的block的數目num_blocks_寫入文件中。
 - 最后使用語句fseek(fp_, -block_length_, SEEK_END)使文件指針fp_指向的存儲文件加入的新塊,設置文件指針指向的塊的序號act_block_為num_blocks_并返回act_block_ - 1。
 
 
 - 接著判斷這個葉子結點是否是它所在這一層的第一個結點first_node: 
- 如果是,則標記起始點的位置start_block為這一個結點的位置,并把first_node置為false,使之后的葉子結點都不是第一個葉子結點
 - 如果不是,那么標記當前插入的葉子結點的左兄弟的block位置為上一個插入滿的葉子結點的block位置,上一個插入滿的葉子結點的右兄弟的block位置為當前插入的葉子結點的block位置,形成一個雙向鏈表。刪除指向上一個插入滿的葉子結點的指針leaf_prev_nd,此時調用析構函數將結點的信息寫入文件相應的block位置中。
 
 - 標記最后一個結點的位置end_block為當前正在進行插入的葉子結點的位置
 
 - 如果為空,那么要創建一個新的葉子結點leaf_act_nd,并使用它的初始化方法init(0, this)初始化這個葉子結點,設置這個結點所處層數為0,所在B+樹為當前調用這個操作的B+樹: 
 - 當前插入的葉子結點leaf_act_nd不為空后,使用語句leaf_act_nd->add_new_child(id, key)插入指針為id,關鍵字為key的條目到當前正在執行插入操作的結點leaf_act_nd中,插入完成后,如果當前插入的葉子結點已經滿了,那標記當前插入的葉子結點為上一個插入的結點,當前插入的葉子結點為空,以便之后可以創建新的結點插入新的條目。遍歷完成,哈希表table中所有的條目都被加到了葉子結點中。
 - 刪除指向上一個插入滿的葉子結點的指針leaf_prev_nd和當前正在進行插入的葉子結點leaf_act_nd,此時調用析構函數將結點的信息寫入文件相應的block位置中。
 
 - 在每次遍歷中,將哈希表中每一項的id_和key_值記錄在整型變量id和key中,代表要加載每一項的指向子節點的指針和關鍵字。判斷此時指向當前正在進行插入的葉子結點leaf_act_nd是否為空: 
 
2.3 然后從下往上,一層層構建索引節點,每一層也是從左往右構建索引節點:
int current_level = 1; // current level (leaf level is 0) int last_start_block = start_block; // build b-tree level by level int last_end_block = end_block; // build b-tree level by levelwhile (last_end_block > last_start_block) {first_node = true;for (int i = last_start_block; i <= last_end_block; ++i) {block = i; // get <block>if (current_level == 1) {leaf_child = new BLeafNode();leaf_child->init_restore(this, block);key = leaf_child->get_key_of_node();delete leaf_child; leaf_child = NULL;}else {index_child = new BIndexNode();index_child->init_restore(this, block);key = index_child->get_key_of_node();delete index_child; index_child = NULL;}if (!index_act_nd) {index_act_nd = new BIndexNode();index_act_nd->init(current_level, this);if (first_node) {first_node = false;start_block = index_act_nd->get_block();}else {index_act_nd->set_left_sibling(index_prev_nd->get_block());index_prev_nd->set_right_sibling(index_act_nd->get_block());delete index_prev_nd; index_prev_nd = NULL;}end_block = index_act_nd->get_block();} index_act_nd->add_new_child(key, block); // add new entryif (index_act_nd->isFull()) {index_prev_nd = index_act_nd;index_act_nd = NULL;}}if (index_prev_nd != NULL) {// release the spacedelete index_prev_nd; index_prev_nd = NULL;}if (index_act_nd != NULL) {delete index_act_nd; index_act_nd = NULL;}last_start_block = start_block;// update infolast_end_block = end_block; // build b-tree of higher level++current_level; } root_ = last_start_block; // update the <root>首先進行聲明:
- 以葉子所在層為第0層,從下往上,聲明當前結點所處樹的層current_level為1
 - 之前層開始的位置last_start_block為上面第一個葉子結點的位置start_block
 - 之前層結束的位置last_end_block為上面最后一個葉子結點的位置end_block
 
然后按照每一層從左往右、從下往上構建索引結點,直到根結點:
- 然后當之前層結束的位置last_end_block大于之前層開始的位置last_start_block時,說明還沒有插入到根結點(last_start_block == last_end_block),進入一個while循環,每次循環,從下往上構建每一層的索引結點: 
- 首先使此層第一個點first_node為true,從上一個開始的位置last_start_block開始,到上一個結束的位置last_end_block結束,進入一個for循環,使block為當前位置 
- 如果當前所處層current_level為1,說明正在構建第1層的索引結點,其每個兒子結點都是葉子結點,那么創建一個新的葉子結點leaf_child,這個葉子結點是構建的索引結點的兒子結點,調用這個葉子結點的init_restore(this, block)方法加載處于block位置的之前存儲的葉子結點的信息,并獲取這個結點的關鍵字并賦值給key。刪除葉子結點leaf_child,此時調用析構函數將結點的信息寫入文件相應的block位置中。
 - 如果當前所處層current_level大于1,說明正在構建的索引結點,其每個兒子結點也全都是索引結點,那么創建一個新的索引結點index_child,這個索引結點是構建的索引結點的兒子結點,調用這個結點的init_restore(this, block)方法加載這個處于block位置的之前存儲的結點的信息,并獲取這個結點的關鍵字并賦值給key。刪除索引結點index_child,此時調用析構函數將結點的信息寫入文件相應的block位置中。
 - 判斷此時正在進行插入操作的索引結點index_act_nd是否為空: 
- 如果為空,那么要創建一個新的索引結點index_act_nd,并使用它的初始化方法init(current_level, this)初始化這個索引結點,設置這個結點所處的層數和所在B+樹,接著判斷這個索引結點是否是它所在這一層的第一個結點first_node: 
- 如果是,則標記此層起始點的位置start_block為這一個結點的位置,并把first_node置為false,使之后的索引結點都不是第一個索引結點
 - 如果不是,那么標記當前正在插入的索引結點index_act_nd的左兄弟為上一個插入滿的索引結點index_prev_nd,上一個插入滿的索引結點index_prev_nd的右兄弟為當前正在插入的索引結點index_act_nd,形成一個雙向鏈表。標記最后一個結點的位置end_block為正在進行插入的索引結點的位置
 
 
 - 如果為空,那么要創建一個新的索引結點index_act_nd,并使用它的初始化方法init(current_level, this)初始化這個索引結點,設置這個結點所處的層數和所在B+樹,接著判斷這個索引結點是否是它所在這一層的第一個結點first_node: 
 - 當前正在插入的索引結點index_act_nd不為空后,使用語句index_act_nd->add_new_child(key, block)把關鍵字為key,位置為block的兒子結點插入進這個索引結點中,插入完成后,如果當前插入的索引結點已經滿了,那標記當前插入的索引結點為上一個插入的結點,當前插入的索引結點為空,以便之后可以創建新的結點插入新的條目。遍歷完成,這一層的索引結點就構建完成。
 
 - 當前層插入完成之后,更新之前層開始的位置last_start_block為插入完成后的當前層的開始位置start_block,之前層結束的位置last_end_block為當前層的位置end_block,層數加一,代表往上一層。
 
 - 首先使此層第一個點first_node為true,從上一個開始的位置last_start_block開始,到上一個結束的位置last_end_block結束,進入一個for循環,使block為當前位置 
 - 全部插入完成之后,更新根結點root_為上一個開始的位置last_start_block,刪除所用到的結點指針,此時調用析構函數將結點的信息寫入文件相應的block位置中,bulkloading加載過程完成。
 
算法并行的設計思路
從上面串行bulkloading過程可以知道,首先從最底層葉子結點開始,從左往右初始化葉子結點并構建雙向鏈表,然后從下往上,逐層從左往右初始化索引結點并構建雙向鏈表,直到根結點,程序結束。
那么并行設計算法,就可以通過對每一層結點的位置和相應存儲信息進行計算,得到每一個結點需要添加的條目或兒子結點的信息,從而每一層的每個結點都可以并行地添加數據,從而提高程序運行效率。
葉子結點所添加的是條目,索引結點添加的是關鍵字和兒子結點的位置,分別需要使用不同的添加函數和初始化函數,由于葉子結點在最底層,所以可以將程序分為兩個部分,先開始向葉子結點并行添加數據,葉子結點全部構建完成之后,再每層向上并行向索引結點添加數據。
對每一層進行并行添加數據,從下往上,下面一層構建完成后,再向上一層的結點并行添加數據,直到根結點,程序結束。
算法流程圖
關鍵代碼描述
1. 初始化結點
每個結點都需要根據所處層調用init方法進行初始化,在初始化方法中,需要完成確定結點的block位置(使用init方法)、設置結點的信息數據和添加條目或兒子結點的任務,對于葉子結點和索引結點,初始化方法有所不同,可以分成兩個函數進行實現。
初始化葉子結點:
// 初始化葉子結點所使用的參數結構體 struct Init_Leaf_Node_Params {BLeafNode *first_leaf_node; // 葉子結點層的第一個結點,如果這個初始化的葉子結點不是第一個結點,這個參數為NULLBTree *btree; // 葉子結點所在B+樹int n; // 總的條目個數const Result *table; // 存儲條目信息的哈希表int leaf_node_capacity_entries; // 葉子結點可以存儲的最大條目數int leaf_node_capacity_keys; // 葉子結點可以存儲的最大關鍵字數int leaf_node_num; // 葉子結點的總數 };// 初始化葉子結點 static void init_the_leaf_node(void *arg) {// 獲取參數Init_Leaf_Node_Params *init_params = (Init_Leaf_Node_Params *)arg;BLeafNode *first_leaf_node = init_params->first_leaf_node;BTree *btree = init_params->btree;int n = init_params->n;const Result *table = init_params->table;int leaf_node_capacity_entries = init_params->leaf_node_capacity_entries;int leaf_node_capacity_keys = init_params->leaf_node_capacity_keys;int leaf_node_num = init_params->leaf_node_num;int i, ret;// 聲明初始化的葉子結點BLeafNode *leaf_node;// 如果作為參數傳遞進來的first_leaf_node為空if (first_leaf_node == NULL) {// 說明初始化的這個結點不是該層的第一個結點,調用init方法進行初始化leaf_node = new BLeafNode();// 調用init方法時,會改變block的位置序號,每次調用位置block加一,所以要加鎖leaf_node->init(0, btree);}// first_leaf_node不為空說明是第一個結點,直接獲取結點else {leaf_node = first_leaf_node;}// 獲取結點位置int block = leaf_node->get_block();// 向葉子結點添加條目在哈希表中的起始位置int start_pos = (block - 1) * leaf_node_capacity_entries;// 向葉子結點添加條目在哈希表中的終點位置int end_pos = start_pos + leaf_node_capacity_entries;// 如果終點位置大于條目個數,那么終點位置直接為條目的末尾nif (end_pos > n) {end_pos = n;}// 向這個葉子結點添加的總條目數int num_entries = end_pos - start_pos;// 要完成的任務數加上要添加的條目數task_num += num_entries;// 向這個葉子結點添加的關鍵字數int num_keys;if ((num_entries * SIZEINT) % LEAF_NODE_SIZE == 0) {num_keys = (num_entries * SIZEINT) / LEAF_NODE_SIZE;} else {num_keys = (num_entries * SIZEINT) / LEAF_NODE_SIZE + 1;}assert(num_keys <= leaf_node_capacity_keys);// 并行添加條目的參數數組Add_Leaf_Node_Child_Params params[num_entries];i = 0;while (i < num_entries) {// 在數組中賦值并傳遞參數,可保證參數地址固定,避免并行時參數出現錯誤params[i].leaf_node = leaf_node;params[i].begin_index = i + start_pos;// 確定添加進結點的末尾條目的位置if (i + THREAD_ADD_CHILD_NUM - 1 >= num_entries) {params[i].end_index = start_pos + num_entries - 1;}else {params[i].end_index = i + start_pos + THREAD_ADD_CHILD_NUM - 1;}params[i].table = table;params[i].start_pos = start_pos;// 將添加條目的線程加入任務隊列等待執行addTask(&pools, add_leaf_node_child, (void *)¶ms[i]);i += THREAD_ADD_CHILD_NUM;}// 當完成的任務數小于要完成的任務數,等待完成while (task_num_finish < task_num) {sleep(0);}// 設置葉子結點含有的條目數、關鍵字數、是臟塊刪除指針要寫回文件leaf_node->set_num_entries(num_entries);leaf_node->set_num_keys(num_keys);leaf_node->set_dirty(true);// 如果結點不是這層第一個結點,設置它的左兄弟是前一個結點if (block > 1) {leaf_node->set_left_sibling(block - 1);}// 如果結點不是此層最后一個結點,設置它的右兄弟是后一個結點if (block < leaf_node_num) {leaf_node->set_right_sibling(block + 1);}// 刪除結點指針,使其信息寫回文件if (leaf_node != NULL) {delete leaf_node;leaf_node = NULL;} }在初始化葉子結點的方法中,需要確定這個葉子結點添加的條目在哈希表中的起始位置,這樣就不用一個一個使用add_new_child方法進行添加條目,而是可以并行化,將條目添加進這個葉子結點中,一個線程也可以一次添加多個條目,不一定一次只有添加一個條目,一次添加的最大條目數由宏定義THREAD_ADD_CHILD_NUM控制。
添加進這個葉子結點的條目在哈希表中的起始位置為這個葉子結點的位置block減一(葉子結點層第一個葉子結點位置block為1),乘以每個葉子結點可以添加的最大條目數leaf_node_capacity_entries,終點位置為開始位置加上每個葉子結點可以添加的最大條目數leaf_node_capacity_entries,如果終點位置大于條目個數,那么終點位置直接為條目的末尾n,代碼如下:
// 向葉子結點添加條目在哈希表中的起始位置 int start_pos = (block - 1) * leaf_node_capacity_entries; // 向葉子結點添加條目在哈希表中的終點位置 int end_pos = start_pos + leaf_node_capacity_entries; // 如果終點位置大于條目個數,那么終點位置直接為條目的末尾n if (end_pos > n) {end_pos = n; }// 向這個葉子結點添加的總條目數 int num_entries = end_pos - start_pos;i = 0; while (i < num_entries) {// 在數組中賦值并傳遞參數,可保證參數地址固定,避免并行時參數出現錯誤params[i].leaf_node = leaf_node;params[i].begin_index = i + start_pos;// 確定添加進結點的末尾條目的位置if (i + THREAD_ADD_CHILD_NUM - 1 >= num_entries) {params[i].end_index = start_pos + num_entries - 1;}else {params[i].end_index = i + start_pos + THREAD_ADD_CHILD_NUM - 1;}params[i].table = table;params[i].start_pos = start_pos;// 將添加條目的線程加入任務隊列等待執行addTask(&pools, add_leaf_node_child, (void *)¶ms[i]);i += THREAD_ADD_CHILD_NUM; }在主函數中,使用for循環調用init_the_leaf_node方法,就可以實現從左到右對葉子結點進行初始化。
初始化索引結點:
// 初始化索引結點所使用的參數結構體 struct Init_Index_Node_Params {BIndexNode *first_index_node; // 這個索引結點所在層的第一個結點,如果這個初始化的索引結點不是該層第一個結點,這個參數為NULLBTree *btree; // 葉子結點所在B+樹int last_start_block; // 索引結點下一層結點的開頭結點的block位置int last_end_block; // 索引結點下一層結點的結尾結點的block位置int current_level; // 索引結點所在層數int index_node_capacity_entries; // 索引結點可以存儲的最大兒子結點數int index_node_num_cur_level; // 索引結點所處層的索引結點數 };// 初始化索引結點 static void init_the_index_node(void *arg) {// 獲取參數Init_Index_Node_Params *init_params = (Init_Index_Node_Params *)arg;BIndexNode *first_index_node = init_params->first_index_node;BTree *btree = init_params->btree;int last_start_block = init_params->last_start_block;int last_end_block = init_params->last_end_block;int current_level = init_params->current_level;int index_node_capacity_entries = init_params->index_node_capacity_entries;int index_node_num_cur_level = init_params->index_node_num_cur_level;int i, ret;// 聲明初始化的索引結點BIndexNode *index_node;// 如果作為參數傳遞進來的first_index_node為空if (first_index_node == NULL) {// 說明初始化的這個結點不是該層的第一個結點,調用init方法進行初始化index_node = new BIndexNode();index_node->init(current_level, btree);}// first_index_node不為空說明是第一個結點,直接獲取結點else {index_node = first_index_node;}// 獲取結點位置int block = index_node->get_block();// 索引結點相對這一層的第一個結點的位置int block_index_cur_level = block - last_end_block - 1;// 向索引結點添加的兒子結點的block的起始位置int start_pos = last_start_block + index_node_capacity_entries * block_index_cur_level;// 向索引結點添加的兒子結點的block的終點位置int end_pos = start_pos + index_node_capacity_entries - 1;// 如果終點位置大于下一層結點個數,那么終點位置直接為下一層末尾結點位置last_end_blockif (end_pos > last_end_block) {end_pos = last_end_block;}// 向索引結點添加的兒子結點的總數int num_entries = end_pos - start_pos + 1;// 要完成的任務數加上要添加的兒子結點數task_num += num_entries;// 添加兒子結點的參數數組Add_Index_Node_Child_Params params[num_entries];i = 0;while (i < num_entries) {// 在數組中賦值并傳遞參數,可保證參數地址固定,避免并行時參數出現錯誤params[i].index_node = index_node;params[i].begin_child_block = i + start_pos;// 確定添加進結點的末尾兒子結點的位置if (i + THREAD_ADD_CHILD_NUM - 1 >= num_entries) {params[i].end_child_block = start_pos + num_entries - 1;}else {params[i].end_child_block = i + start_pos + THREAD_ADD_CHILD_NUM - 1;}params[i].start_pos = start_pos;params[i].cur_level = current_level;params[i].btree = btree;// 將添加兒子結點的線程加入任務隊列等待執行addTask(&pools, add_index_node_child, (void *)¶ms[i]);i += THREAD_ADD_CHILD_NUM;}// 當完成的任務數小于要完成的任務數,等待完成while (task_num_finish < task_num) {sleep(0);}// 設置所以結點含有的兒子結點數、是臟塊刪除指針要寫回文件index_node->set_num_entries(num_entries);index_node->set_dirty(true);// 如果結點不是這層第一個結點,設置它的左兄弟是前一個結點if (block_index_cur_level > 0) {index_node->set_left_sibling(block - 1);}// 如果結點不是此層最后一個結點,設置它的右兄弟是后一個結點if (block_index_cur_level < index_node_num_cur_level - 1) {index_node->set_right_sibling(block + 1);}// 刪除結點指針,使其信息寫回文件if (index_node != NULL) {delete index_node;index_node = NULL;} }在初始化索引結點的方法中,需要確定這個索引結點添加的兒子結點的block位置,這樣就不用一個一個使用add_new_child方法進行添加兒子結點,而是可以并行化,將關鍵字和兒子結點添加進這個索引結點中,一個線程也可以一次添加多個關鍵字和兒子結點,不一定一次只添加一個關鍵字和兒子結點,一次添加的兒子結點數由宏定義THREAD_ADD_CHILD_NUM控制。
添加進這個索引結點的全部兒子結點的起始block位置為這個索引結點在其所處層的相對位置(所處層第一個索引結點下標為0)block_index_cur_level,乘以每個索引結點可以添加的最大兒子結點數index_node_capacity_entries,再加上上一層結點的起始位置last_start_block,終點位置為開始位置加上每個索引子結點可以添加的最大兒子結點數index_node_capacity_entries減一,如果終點位置大于上一層結點的末尾位置last_end_block,那么終點位置直接為上一層的末尾位置last_end_block,代碼如下:
// 向索引結點添加的兒子結點的block的起始位置 int start_pos = last_start_block + index_node_capacity_entries * block_index_cur_level; // 向索引結點添加的兒子結點的block的終點位置 int end_pos = start_pos + index_node_capacity_entries - 1; // 如果終點位置大于下一層結點個數,那么終點位置直接為下一層末尾結點位置last_end_block if (end_pos > last_end_block) {end_pos = last_end_block; }// 向索引結點添加的兒子結點的總數 int num_entries = end_pos - start_pos + 1;i = 0; while (i < num_entries) {// 在數組中賦值并傳遞參數,可保證參數地址固定,避免并行時參數出現錯誤params[i].index_node = index_node;params[i].begin_child_block = i + start_pos;// 確定添加進結點的末尾兒子結點的位置if (i + THREAD_ADD_CHILD_NUM - 1 >= num_entries) {params[i].end_child_block = start_pos + num_entries - 1;}else {params[i].end_child_block = i + start_pos + THREAD_ADD_CHILD_NUM - 1;}params[i].start_pos = start_pos;params[i].cur_level = current_level;params[i].btree = btree;// 將添加兒子結點的線程加入任務隊列等待執行addTask(&pools, add_index_node_child, (void *)¶ms[i]);i += THREAD_ADD_CHILD_NUM; }在主函數中,使用for循環調用init_the_index_node方法,就可以實現從左到右對索引結點進行初始化。
2. 添加條目或兒子結點
向葉子結點添加條目:
// 并行向葉子結點添加條目所使用的參數結構體 struct Add_Leaf_Node_Child_Params {BLeafNode *leaf_node; // 所要添加條目的葉子結點int begin_index; // 每次添加進葉子結點的id數組的起始條目在哈希表中的下標位置int end_index; // 每次添加進葉子結點的id數組的末尾條目在哈希表中的下標位置const Result *table; // 存儲條目信息的哈希表int start_pos; // 添加進葉子結點的id數組的第一個條目在哈希表中的下標位置 };// 向葉子結點添加條目所使用的線程函數 static void *add_leaf_node_child(void *arg) {// 獲取參數Add_Leaf_Node_Child_Params *params = (Add_Leaf_Node_Child_Params *)arg;BLeafNode *leaf_node = params->leaf_node;int begin_index = params->begin_index;int end_index = params->end_index;const Result *table = params->table;int start_pos = params->start_pos;// 一次添加end_index - begin_index + 1個條目for (int i = begin_index; i <= end_index; ++i) {// 向葉子結點的id_數組的相應下標位置添加條目leaf_node->set_id(table[i].id_, i - start_pos);// 如果滿足一定的條件,向葉子結點的key_數組的相應下標位置添加關鍵字if (((i - start_pos) * SIZEINT) % LEAF_NODE_SIZE == 0) {leaf_node->set_key(table[i].key_, ((i - start_pos) * SIZEINT) / LEAF_NODE_SIZE);}}sleep(0);// 添加完成的任務數pthread_mutex_lock(&task_num_finish_mutex);task_num_finish += end_index - begin_index + 1;pthread_mutex_unlock(&task_num_finish_mutex); }向索引結點添加條目:
// 并行向索引結點添加關鍵字和兒子結點的位置所使用的參數結構體 struct Add_Index_Node_Child_Params {BIndexNode *index_node; // 所要添加兒子結點的索引結點int begin_child_block; // 每次添加的起始兒子結點的block位置int end_child_block; // 每次添加的末尾兒子結點的block位置int start_pos; // 添加的兒子結點的block的起始位置int cur_level; // 索引結點層數BTree *btree; // 索引結點所在的B+樹 };// 向索引結點添加關鍵字和兒子結點的位置所使用的線程函數 static void *add_index_node_child(void *arg) {// 獲取參數Add_Index_Node_Child_Params *params = (Add_Index_Node_Child_Params *)arg;BIndexNode *index_node = params->index_node;int begin_child_block = params->begin_child_block;int end_child_block = params->end_child_block;int start_pos = params->start_pos;int cur_level = params->cur_level;BTree *btree = params->btree;float key;if (cur_level == 1) {// 一次添加end_child_block - begin_child_block + 1個兒子結點for (int i = begin_child_block; i <= end_child_block; ++i) {BLeafNode *leaf_child = new BLeafNode();pthread_mutex_lock(&init_restore_mutex);leaf_child->init_restore(btree, i);pthread_mutex_unlock(&init_restore_mutex);// 獲取這個結點的關鍵字key = leaf_child->get_key_of_node();if (leaf_child != NULL) {delete leaf_child;leaf_child = NULL;}// 向索引結點添加關鍵字和兒子結點index_node->set_key(key, i - start_pos);index_node->set_son(i, i - start_pos);}} else {// 一次添加end_child_block - begin_child_block + 1個兒子結點for (int i = begin_child_block; i <= end_child_block; ++i) {BIndexNode *index_child = new BIndexNode();pthread_mutex_lock(&init_restore_mutex);index_child->init_restore(btree, i);pthread_mutex_unlock(&init_restore_mutex);// 獲取這個結點的關鍵字key = index_child->get_key_of_node();if (index_child != NULL) {delete index_child;index_child = NULL;}// 向索引結點添加關鍵字和兒子結點index_node->set_key(key, i - start_pos);index_node->set_son(i, i - start_pos);}}sleep(0);// 添加完成的任務數pthread_mutex_lock(&task_num_finish_mutex);task_num_finish += end_child_block - begin_child_block + 1;;pthread_mutex_unlock(&task_num_finish_mutex); }3. 構建葉子結點層
// ----------------------------------------------------------------------------- // 加載葉子結點// ------------------------------------------------------------------------- // build leaf node from <_hashtable> (level = 0) // ------------------------------------------------------------------------- int start_block = 0; // position of first node int end_block = 0; // position of last node// 創建第一個葉子結點 BLeafNode *first_leaf_node = new BLeafNode(); // 進行初始化 first_leaf_node->init(0, this); // 獲取第一個葉子結點的位置作為開頭位置 start_block = first_leaf_node->get_block();// 通過創建的葉子結點,獲取每個葉子結點可以添加的最大條目數和最大關鍵字數 int leaf_node_capacity_entries = first_leaf_node->get_capacity(); int leaf_node_capacity_keys = first_leaf_node->get_capacity_keys();// 通過最大條目數,計算要創建的葉子結點數 int leaf_node_num; if (n % leaf_node_capacity_entries == 0) {leaf_node_num = n / leaf_node_capacity_entries; } else {leaf_node_num = n / leaf_node_capacity_entries + 1; }// 初始化葉子結點的參數數組 Init_Leaf_Node_Params init_leaf_node_params[leaf_node_num]; pthread_t leaf_node_ptid[leaf_node_num]; for (i = 0; i < leaf_node_num; i++) {// 第一個初始化的葉子結點,將之前創建的第一個葉子結點直接作為參數傳遞進去if (i == 0) {init_leaf_node_params[i].first_leaf_node = first_leaf_node;}// 其它結點傳遞NULLelse {init_leaf_node_params[i].first_leaf_node = NULL;}// 設置相應參數init_leaf_node_params[i].btree = this;init_leaf_node_params[i].table = table;init_leaf_node_params[i].n = n;init_leaf_node_params[i].leaf_node_capacity_entries = leaf_node_capacity_entries;init_leaf_node_params[i].leaf_node_capacity_keys = leaf_node_capacity_keys;init_leaf_node_params[i].leaf_node_num = leaf_node_num;// 對每個結點進行初始化init_the_leaf_node((void *)&init_leaf_node_params[i]); }// 葉子結點層的末尾位置為開始位置加上葉子結點個數減一 end_block = start_block + leaf_node_num - 1;- 首先創建第一個葉子結點,并使用它的初始化方法init(0,this)進行初始化,然后使用get_block()方法獲取第一個葉子結點的位置,并將它作為開頭位置start_block。
 - 然后,通過第一個葉子結點獲取每個葉子結點可以添加的最大條目數leaf_node_capacity_entries和最大關鍵字數leaf_node_capacity_keys。并且根據最大條目數,計算要創建的葉子結點數: 
- 如果并行 bulkload 過程bulkload_parallel輸入的條目數n能夠被葉子結點最大條目數整除,說明leaf_node_num個葉子結點可以剛好存儲n個條目,所需要創建的葉子結點數即為leaf_node_num。
 - 否則需要增加一個葉子結點存儲多余的條目,即葉子結點數leaf_node_num = n / leaf_node_capacity_entries + 1。
 
 - 確定葉子結點數后,遍歷創建初始化葉子結點的參數數組init_leaf_node_params 
- 首先判斷當前結點是否為第一個初始化的結點 
- 如果當前葉子結點是第一個初始化的葉子結點,將之前創建的第一個葉子結點first_leaf_node作為參數傳遞給init_leaf_node_params[0]
 - 其他結點均為NULL
 
 - 接著對參數數組中的成員變量btree、table、n、leaf_node_capacity_entries、leaf_node_capacity_keys、leaf_node_num進行賦值
 - 最后,將賦值后的參數數組元素init_leaf_node_params[i]傳遞給函數init_the_leaf_node,用于葉子結點的初始化。
 
 - 首先判斷當前結點是否為第一個初始化的結點 
 - 葉子結點層的末尾位置為開始位置加上葉子結點個數減一,即end_block = start_block + leaf_node_num - 1;
 
4. 構建索引結點部分
// ----------------------------------------------------------------------------- // 加載索引結點// ------------------------------------------------------------------------- // stop condition: lastEndBlock == lastStartBlock (only one node, as root) // ------------------------------------------------------------------------- int current_level = 1; // current level (leaf level is 0) int last_start_block = start_block; // build b-tree level by level int last_end_block = end_block; // build b-tree level by level// 當之前層末尾位置大于之前層開頭位置時,說明還沒有插入到根結點(last_start_block == last_end_block) while (last_end_block > last_start_block) {// 開始構建每一層的索引結點,首先創建每一層的第一個索引結點BIndexNode *first_index_node = new BIndexNode();// 進行初始化first_index_node->init(current_level, this);// 獲取每一層第一個索引結點的位置作為該層開頭位置int block = first_index_node->get_block();start_block = block;// 通過創建的索引結點,獲取每個索引結點可以添加的最大兒子結點數和最大關鍵字數int index_node_capacity_entries = first_index_node->get_capacity();// 計算下一層的結點數量int node_num_last_level = last_end_block - last_start_block + 1;// 根據下一層的結點數量和最大兒子結點數計算這一層的結點數量int index_node_num_cur_level;if (node_num_last_level % index_node_capacity_entries == 0) {index_node_num_cur_level = node_num_last_level / index_node_capacity_entries;}else {index_node_num_cur_level = node_num_last_level / index_node_capacity_entries + 1;} // 初始化索引結點的參數數組Init_Index_Node_Params init_index_node_params[index_node_num_cur_level];pthread_t index_node_ptid[index_node_num_cur_level];for (i = 0; i < index_node_num_cur_level; i++) {// 此層第一個初始化的索引結點,將之前創建的此層第一個索引結點直接作為參數傳遞進去if (i == 0) {init_index_node_params[i].first_index_node = first_index_node;}// 其它結點傳遞NULLelse {init_index_node_params[i].first_index_node = NULL;}// 設置相應參數init_index_node_params[i].btree = this;init_index_node_params[i].last_start_block = last_start_block;init_index_node_params[i].last_end_block = last_end_block;init_index_node_params[i].current_level = current_level;init_index_node_params[i].index_node_capacity_entries = index_node_capacity_entries;init_index_node_params[i].index_node_num_cur_level = index_node_num_cur_level;// 對每個結點進行初始化init_the_index_node((void *)&init_index_node_params[i]);}// 此層索引結點的末尾位置為開始位置加上此層索引結點個數減一end_block = start_block + index_node_num_cur_level - 1;last_start_block = start_block;// update infolast_end_block = end_block; // build b-tree of higher level++current_level; }// 根結點的位置即上一層的開頭位置 root_ = last_start_block; // update the <root>- 當之前層結束的位置last_end_block大于之前層開始的位置last_start_block時,說明還沒有插入到根結點(last_start_block == last_end_block),進入一個while循環,每次循環,從下往上構建每一層的索引結點: 
- 首先創建每一層的第一個索引結點first_index_node,并使用init(current_level,this)進行初始化。使用first_index_node獲取位置,并將第一個索引結點的位置作為當前層的開頭位置start_block。
 - 另外,通過get_capacity()方法,從創建的索引結點中,獲取可以添加的最大兒子結點數和最大關鍵字數,由于索引結點的兒子結點的指針數和關鍵字數相等,所以使用index_node_capacity_entries代替兩者
 - 然后,計算下一層的節點數量node_num_last_level,并根據下一層節點數量和最大兒子結點數計算當前層的結點數量index_node_num_cur_level
 
 - 確定結點數后,遍歷創建初始化索引結點的參數數組init_index_node_params: 
- 首先判斷當前結點是否為第一個初始化的結點 
- 如果當前索引結點是并行時第一個初始化的結點,將之前創建的第一個索引結點first_index_node作為參數傳遞給init_index_node_params[0]
 - 其他結點均為NULL
 
 - 接著對參數數組中的成員變量btree、last_start_block、last_end_block、current_level、index_node_capacity_entries、index_node_num_cur_level進行賦值
 - 最后,將賦值后的參數數組元素init_index_node_params[i]傳遞給函數init_the_index_node,用于索引結點的初始化。
 
 - 首先判斷當前結點是否為第一個初始化的結點 
 - 此層索引結點的末尾位置為開始位置加上此索引結點個數減一,即end_block = start_block + index_node_num_cur_level - 1;
 - 根結點的位置即上一層的開頭位置
 
5. 線程池
因為每個條目或兒子結點的添加都需要創建一個線程,如果每次都pthread_create要造成十分大的開銷,所以可以創建一個線程池,減少了在創建和退出線程時的消耗。
變量聲明:
#define THREADS_NUM 1000 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 50000 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一#define THREAD_ADD_CHILD_NUM 5 // 每個線程在添加條目或兒子結點時一次添加的最多個數// 任務完成數變量的鎖 pthread_mutex_t task_num_finish_mutex = PTHREAD_MUTEX_INITIALIZER; // 調用結點的init_restore方法時使用的鎖 pthread_mutex_t init_restore_mutex = PTHREAD_MUTEX_INITIALIZER;Threadpools pools; // 線程池 long long int task_num_finish = 0; // 完成的任務數 long long int task_num = 0; // 要完成的任務數數據結構:
// 線程池中每個線程執行的任務的結構體 typedef struct {void *(*function)(void *); // 執行函數void *arg; // 參數 } Task;// 任務循環隊列的數據結構 typedef struct {Task tasks[TASK_QUEUE_MAX_SIZE]; // 任務隊列數組int front; // 隊首下標int rear; // 隊尾下標 } TaskQueue;// 線程池數據結構 typedef struct {pthread_t threads[THREADS_NUM]; // 線程數組TaskQueue taskQueue; // 任務隊列bool isEnd; // 程序是否結束,要銷毀線程池sem_t sem_mutex; // 互斥信號量 } Threadpools;任務執行函數:
// 線程池中每個線程執行的任務 static void *executeTask(void *arg) {// 向每個線程傳入的參數是線程池Threadpools *pools = (Threadpools *)arg;while (1) {// 等待互斥信號量大于0,防止臨界沖突,然后將互斥鎖信號量減一,繼續執行sem_wait(&pools->sem_mutex);// 當任務隊列為空時while (pools->taskQueue.front == pools->taskQueue.rear) {// 如果已經沒有剩余任務要處理,那么退出線程if (pools->isEnd) {sem_post(&pools->sem_mutex);pthread_exit(NULL);}// 否則等待任務隊列中有任務后再取任務進行執行sleep(0); }// 獲取任務隊列隊首的任務Task task;int front = pools->taskQueue.front;task.function = pools->taskQueue.tasks[front].function;task.arg = pools->taskQueue.tasks[front].arg;// 循環隊列隊首下標加一pools->taskQueue.front = (front + 1) % TASK_QUEUE_MAX_SIZE;// 將互斥鎖信號量加一,允許其它線程執行sem_post(&pools->sem_mutex);// 執行任務(*(task.function))(task.arg);} }- 由類型轉換獲得線程池參數pools,循環從任務隊列中取出任務執行 
- 首先使用sem_wait(&pools->sem_mutex)函數進行互斥等待,防止臨界沖突
 - 當任務隊列為空,即pools->taskQueue.front == pools->taskQueue.rear隊尾和隊首相等時 
- 如果已經沒有剩余任務處理,則使用sem_post(&pools->sem_mutex)釋放信號量,并退出當前線程
 - 否則等待任務隊列有任務后再取出任務進行執行
 
 - 任務隊列中不為空時,將直接獲取隊首的任務,具體操作為將任務參數function、arg賦值給task。此時任務隊列減少一個任務,循環隊列隊首下標加一。
 - 取完任務后使用sem_post通知其他線程,開始執行任務(*(task.function))(task.arg);
 
 
初始化線程池:
// 初始化線程池 void initThreadpools(Threadpools *pools) {int ret;// 任務隊列的隊首和隊尾的坐標都為0pools->taskQueue.front = 0;pools->taskQueue.rear = 0;pools->isEnd = false;// 初始化互斥信號量為0ret = sem_init(&pools->sem_mutex, 0, 1);if (ret == -1) {perror("sem_init-mutex");exit(1);}// 創建線程池中的線程for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_create(&pools->threads[i], NULL, executeTask, (void *)pools);if(ret != 0) {fprintf(stderr, "pthread_create error: %s\n", strerror(ret));exit(1);}} }- 初始化線程池pools,將線程池任務隊列taskQueue的隊首隊尾下標置為 0,并將線程池銷毀標志置為false,同時,使用sem_init函數初始化互斥信號量sem_mutex。
 - 通過循環,使用pthread_create創建THREADS_NUM個線程在線程池中待命
 
向任務隊列中添加任務:
// 向任務隊列中添加任務 void addTask(Threadpools *pools, void *(*function)(void *arg), void *arg) {// 當任務隊列為滿時,等待有任務被取出任務隊列不為滿再加入隊列while ((pools->taskQueue.rear + TASK_QUEUE_MAX_SIZE + 1 - pools->taskQueue.front) % TASK_QUEUE_MAX_SIZE == 0) {sleep(0);}// 向任務隊列的隊尾加入任務Task task;task.function = function;task.arg = arg;int rear = pools->taskQueue.rear;pools->taskQueue.tasks[rear] = task;// 任務隊列隊尾下標加一pools->taskQueue.rear = (rear + 1) % (TASK_QUEUE_MAX_SIZE); }- 首先根據任務隊列是否為滿進行等待,有任務被取出任務隊列,任務隊列不為滿時,再將新的任務加入任務隊列
 - 任務隊列增加任務的方式是在任務隊列隊尾插入新任務,增加新任務task后,任務隊列隊尾下標加一pools->taskQueue.rear = (rear + 1) % (TASK_QUEUE_MAX_SIZE)
 
6. 主函數
在主函數中,設計了程序使輸入不同命令程序執行不同的結果:
if (nargs == 1) {if (trees_->bulkload(n_pts_, table)) return 1; } else if (nargs > 1) {if (!strcmp(args[1], "print")) {if (trees_->bulkload(n_pts_, table)) return 1;print_tree(trees_);}if (!strcmp(args[1], "parallel")) {if (trees_->bulkload_parallel(n_pts_, table)) return 1;if (nargs > 2 && !strcmp(args[2], "print")) {print_tree(trees_);}} }輸入./run執行串行加載;
 輸入./run parallel執行并行加載;
 輸入./run print執行串行加載并打印B+樹;
 輸入./run parallel print執行并行加載并打印B+樹。
打印B+樹使用的是遞歸算法:
void recursive_print(BTree *btree, int level, int block, string space) {if (level == 0) {BLeafNode *leaf_node = new BLeafNode();leaf_node->init_restore(btree, block);int key_index = 0;int num_entries = leaf_node->get_num_entries();printf("%sblock: %d (leaf node)\n", space.c_str(), block);for (int i = 0; i < num_entries; ++i) {if ((i * SIZEINT) % LEAF_NODE_SIZE == 0) {printf("%s ", space.c_str());int key = leaf_node->get_key(key_index);printf("key:%d\n", key);key_index++;}printf("%s ", space.c_str());printf("id:%d\n", leaf_node->get_entry_id(i));}delete leaf_node;leaf_node = NULL;}else {BIndexNode *index_node = new BIndexNode();index_node->init_restore(btree, block);int num_entries = index_node->get_num_entries();printf("%sblock: %d (index node)\n", space.c_str(), block);for (int i = 0; i < num_entries; ++i) {printf("%s ", space.c_str());int key = index_node->get_key(i);printf("key:%d son_block:%d\n", key, index_node->get_son(i));recursive_print(btree, level - 1, index_node->get_son(i), space + " ");}delete index_node;index_node = NULL;} }void print_tree(BTree *btree) {int root_block = btree->root_;BNode *root_ptr;if (root_block == 1) {root_ptr = new BLeafNode();}else {root_ptr = new BIndexNode();}root_ptr->init_restore(btree, root_block);int level = root_ptr->get_level();printf("root\n");recursive_print(btree, level, root_block, "");delete root_ptr;root_ptr = NULL; }實驗結果與實驗分析
驗證輸出正確
設置點的個數都為1000000,在終端分別輸入./run print > test1.txt和./run parallel print > test2.txt,將串行加載和并行加載打印出的樹輸出到test1.txt文件和test2.txt文件中,然后使用vscode的文件比較功能進行比較:
可以看到,除了開頭因為是并行程序多打印出的線程數量和每個線程處理的最大任務數,和結尾運行時間不同,中間打印出的樹的數據完全相同,測試多次仍然是同樣的結果,說明并行輸出正確。
串行加載與并行加載結果分析
改變點的個數,對串行加載和并行加載的時間進行比較,其中并行加載的線程池中的線程個數為1000,每個線程可以添加的條目或兒子結點的最大數目為
 40,比較如下:
點的個數為100:
點的個數為1000:
點的個數為10000:
點的個數為100000:
點的個數為1000000:
| 100 | 0.00237 | 0.086792 | 
| 1000 | 0.000623 | 0.113164 | 
| 10000 | 0.003443 | 0.160166 | 
| 100000 | 0.018719 | 0.893055 | 
| 1000000 | 0.147339 | 7.018940 | 
可以看到,雖然理論上講,并行的效率比串行要高,但是因為并行創建線程的消耗太大,所以在實驗中,并行的效率并沒有特別的高,雖然代碼是按照并行設計的。
以下數據均在 one thread add child num = 40 時測得
性能調優、創新優化
性能調優
理論上,可以通過增大線程池中的線程數目,使更多的任務可以被線程同時執行,或者調整一個線程可以執行的任務的最大個數,來調優性能。
固定結點數為1000000、線程池中的線程數為1000,調整一個線程可以執行的任務的最大個數:
一個線程可以執行的任務的最大個數為10時:
一個線程可以執行的任務的最大個數為20時:
一個線程可以執行的任務的最大個數為30時:
一個線程可以執行的任務的最大個數為40時:
一個線程可以執行的任務的最大個數為50時:
一個線程可以執行的任務的最大個數為60時:
一個線程可以執行的任務的最大個數為80時:
| 10 | 7.710704 | 7.691538 | 
| 20 | 7.593454 | 7.326895 | 
| 30 | 7.230909 | 7.036579 | 
| 40 | 6.861761 | 6.745689 | 
| 50 | 6.917748 | 6.871345 | 
| 60 | 7.124508 | 7.094536 | 
| 80 | 7.316152 | 7.213579 | 
固定結點數為1000000、一個線程可以執行的任務的最大個數為40,調整線程池中的線程數:
線程池中的線程數為10時:
線程池中的線程數為50時:
線程池中的線程數為100時:
線程池中的線程數為200時:
線程池中的線程數為500時:
線程池中的線程數為1000時:
| 10 | 7.387442 | 7.231549 | 
| 50 | 6.737798 | 6.845321 | 
| 100 | 5.607697 | 5.745186 | 
| 200 | 6.725368 | 6.712356 | 
| 500 | 6.880218 | 6.865419 | 
| 1000 | 7.010874 | 7.010768 | 
可以看到,對于點的個數為1000000時,性能最優時的搭配為線程池中的線程數為100,一個線程可以執行的任務的最大個數為40,這樣性能最優。
創新優化
對于B+數的并行加載,由于需要對條目或兒子結點的添加創建線程,當條目數為初始給定的1000000時,光是向葉子結點添加條目就要創建1000000個線程,對程序來說是極大的開銷,所以想到,將單純的創建線程改為使用線程池,如果使用線程池,就可以避免很大部分的創建線程的花費,從而提升效率。線程池的代碼已在前面提到。
另外如果一個線程一次只添加一個條目或葉子結點,就會顯得有些浪費,所以在優化時,設計了一個線程一次可以添加多個任務,由宏定義THREAD_ADD_CHILD_NUM進行控制,這樣一來,測試發現在一定程度上也提高了程序的效率。
總結
以上是生活随笔為你收集整理的使用pthread和线程池实现B+树的并行块加载bulkload过程的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 使用Lex工具进行tiny+语言的词法分
 - 下一篇: 使用C++对TINY+语言进行词法分析、