C++强化之路之线程池开发整体框架(二)
一.線程池開發(fā)框架
我所開發(fā)的線程池由以下幾部分組成:?
1.工作中的線程。也就是線程池中的線程,主要是執(zhí)行分發(fā)來的task。?
2.管理線程池的監(jiān)督線程。這個(gè)線程的創(chuàng)建獨(dú)立于線程池的創(chuàng)建,按照既定的管理方法進(jìn)行管理線程池中的所有線程,主要任務(wù)是監(jiān)聽任務(wù)的到來,喚醒線程池中的空閑線程,分發(fā)任務(wù);如果任務(wù)增多,動(dòng)態(tài)的創(chuàng)建一批線程加入原來的線程池中,進(jìn)行工作;適當(dāng)?shù)匿N毀線程,減少系統(tǒng)開銷。
這個(gè)線程池的開發(fā)涉及了以下幾個(gè)數(shù)據(jù)結(jié)構(gòu)、設(shè)計(jì)模式和軟件結(jié)構(gòu):?
1.任務(wù)隊(duì)列。整個(gè)框架有兩個(gè)任務(wù)隊(duì)列,1)等待任務(wù)隊(duì)列(以下簡(jiǎn)稱wait_queue)。2)正在執(zhí)行中任務(wù)隊(duì)列(以下簡(jiǎn)稱doing_queue)。隊(duì)列采用先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)。當(dāng)一個(gè)任務(wù)來到時(shí),會(huì)先被push到wait_queue,監(jiān)督線程會(huì)一直監(jiān)督wait_queue中的元素,一旦有任務(wù),便會(huì)pop wait_queue中的元素,再push到doing_queue中。?
2.單例設(shè)計(jì)模式。線程池的類被設(shè)計(jì)成單例模式,防止一個(gè)程序中多次創(chuàng)建線程池對(duì)象,出現(xiàn)紊亂現(xiàn)象,用戶只能調(diào)用靜態(tài)方法初始化得到線程池的對(duì)象。?
3.回調(diào)函數(shù)。回調(diào)函數(shù)的設(shè)計(jì)主要是為了能夠把任務(wù)接口(也就是需要線程去執(zhí)行的任務(wù),通常是一個(gè)寫好的函數(shù)方法)提前初始化注冊(cè),然后延遲調(diào)用。
下圖是所用類的的大概結(jié)構(gòu)圖:?
程序整體結(jié)構(gòu)如下:?
二.線程池開發(fā)具體實(shí)現(xiàn)
1.思路分析。?
線程池顧名思義就是同時(shí)有數(shù)個(gè)線程處于待執(zhí)行狀態(tài),編碼上的初始化的實(shí)現(xiàn)無非就是循環(huán)創(chuàng)建指定數(shù)量的線程,然后等待任務(wù)的到來喚醒空閑線程。以下是ThreadPoll的類:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
下面是cpp的實(shí)現(xiàn):
//線程池初始化的實(shí)現(xiàn) API_RETURN_TYPE_T ThreadPool::ThreadPoolInit(int num_thr) {printf("num = %d.\n",num_thr); if(num_thr == 0){return API_SUCCESS;}//設(shè)置創(chuàng)建線程的屬性為DETACHED,線程被釋放后,資源會(huì)被回收。pthread_attr_t attr;pthread_attr_init (&attr);pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);int i = 0;if(F_improve_ThrdPoll == 1)//備用線程池創(chuàng)建的標(biāo)志位,初始化線程不會(huì)走這邊{_thread_bak = new pthread_t[num_thr];for(;i < num_thr;i++){if(RET_OK != pthread_create(&(_thread_bak[i]), &attr, &thread_func, this)){return API_FAIL;}}return API_SUCCESS;}//create thread pool.for(;i < num_thr;i++){if(RET_OK != pthread_create(&(_thread[i]), &attr, &thread_func, this)){return API_FAIL;}}pthread_attr_destroy (&attr);return API_SUCCESS; } //創(chuàng)建的所有線程都會(huì)跑到這個(gè)線程函數(shù),最終指向run void *thread_func(void *arg) {ThreadPool *thread = (ThreadPool*)arg;thread->run(); } //線程池核心內(nèi)容 API_RETURN_TYPE_T ThreadPool::run() {//printf("this is run thread.\n");void *arg;while(1)//線程池內(nèi)部一直在循環(huán){printf ("thread 0x%x begin\n", pthread_self ()); this->mutex->lock();//上鎖if((CANCEL_SIGNAL == 0) && (task_doing_queue.length() < _num_threads || F_improve_ThrdPoll == 1) )//以上條件第一個(gè)是備用線程釋放標(biāo)志,第二個(gè)是任務(wù)執(zhí)行隊(duì)列數(shù)量為0,第三個(gè)是備用線程創(chuàng)建標(biāo)志(或的關(guān)系,為了滿足新增線程進(jìn)入wait狀態(tài)),第一次這些條件都會(huì)滿足{printf ("thread 0x%x is waiting\n", pthread_self ()); this->task_cond->wait(mutex);//每次創(chuàng)建的新線程都會(huì)阻塞到這里,執(zhí)行完任務(wù)的線程也會(huì)阻塞在這里,等待喚醒的signal,雖然是阻塞在這里,但是互斥鎖已經(jīng)是unlock狀態(tài)了,這是linux的機(jī)制。}usleep(200000);this->mutex->unlock();//解鎖pthread_testcancel();//設(shè)置取消線程點(diǎn)pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);//(1)頭尾保護(hù)下面這段code,保證在執(zhí)行任務(wù)的時(shí)候屏蔽外部的線程取消信號(hào)if(callback != NULL){callback(arg); //執(zhí)行回調(diào)函數(shù),此時(shí)的回調(diào)函數(shù)應(yīng)該指向當(dāng)前任務(wù)執(zhí)行函數(shù)的地址callback = NULL;}task_doing_queue.popFront();//執(zhí)行完任務(wù),任務(wù)執(zhí)行隊(duì)列出隊(duì)列元素一個(gè)pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);//同(1)printf("wait len =%d.\n",task_wait_queue.length());printf("thread 0x%x done.length() = %d.\n",pthread_self (),task_doing_queue.length());}return API_SUCCESS;}//管理線程的初始化 API_RETURN_TYPE_T ThreadPool::ManagerThreadInit() {//create manager threadpool thread.if(RET_OK != pthread_create(&taskqueue_thread, NULL, &thread_task_queue, this)){ return API_FAIL;}return API_SUCCESS; } //管理線程的執(zhí)行函數(shù) void *thread_task_queue(void *arg) { ThreadPool *thread = (ThreadPool*)arg;thread->managerThread();} //管理線程的核心內(nèi)容 API_RETURN_TYPE_T ThreadPool::managerThread() {while(1){usleep(400000);printf("managerThread!.\n");this->mutex->lock();//上鎖TASK_QUEUE_T task1; //初始化兩個(gè)隊(duì)列元素對(duì)象TASK_QUEUE_T task2;task1.sTask = TASK_DOING;if(task_wait_queue.length() != 0){//printf("len =%d.\n",task_doing_queue.length());if(task_doing_queue.length() < _num_threads)//只要任務(wù)執(zhí)行隊(duì)列的數(shù)目小于線程池中線程的總數(shù),就會(huì)執(zhí)行{task2 = task_wait_queue.popFront();//pop任務(wù)等待隊(duì)列的元素,并得到這個(gè)元素的對(duì)象callback = task2.cTaskFunc;//獲得任務(wù)的執(zhí)行函數(shù)地址task_doing_queue.pushBack(task1);//將任務(wù)push到任務(wù)執(zhí)行隊(duì)列task_cond->signal();//發(fā)送信號(hào),喚醒一個(gè)空閑線程printf("signal cond.\n");}}//當(dāng)人任務(wù)隊(duì)列的等待任務(wù)數(shù)量大于線程池線程總數(shù)時(shí)會(huì)執(zhí)行if(task_wait_queue.length() >= _num_threads && F_improve_ThrdPoll == 0){//通過簡(jiǎn)單的機(jī)制計(jì)算當(dāng)前是否需要另外新增線程到線程池AutoComputeOptimumThreadNum(task_wait_queue.length(),u4sequence);F_improve_ThrdPoll = 1;ThreadPoolInit(u4sequence);//如果需要新增線程,u4sequence不為0. sleep(2);//緩沖線程創(chuàng)建}if(F_improve_ThrdPoll == 1 ){//檢測(cè)到備用線程池的創(chuàng)建while(task_wait_queue.length() == 0 && task_doing_queue.length() == 0){//也就是當(dāng)前任務(wù)等待隊(duì)列和任務(wù)執(zhí)行隊(duì)列都沒有任務(wù)時(shí)printf("no task!.\n");usleep(500000);wait_time++;//計(jì)時(shí)等待一段時(shí)間if(wait_time == NO_TASK_TIMEOUT){this->mutex->unlock();ReleaseSubThreadPool();//釋放備用線程池printf("release!.\n");F_improve_ThrdPoll = 2;wait_time = 0;break;}}wait_time = 0;}if(F_improve_ThrdPoll != 2)this->mutex->unlock();}return API_SUCCESS;}//自動(dòng)計(jì)算是否需要?jiǎng)?chuàng)建新的線程池的簡(jiǎn)單機(jī)制,后續(xù)會(huì)結(jié)合讀取當(dāng)前CPU的使用率進(jìn)一步優(yōu)化此機(jī)制 API_RETURN_TYPE_T ThreadPool::AutoComputeOptimumThreadNum(int wait_que_num,int &_u4sequence) {if(wait_que_num >= 4*_num_threads){_u4sequence = _num_threads;}else if(wait_que_num >= 2*_num_threads){_u4sequence = _num_threads/2;}else{_u4sequence = 0;}return API_SUCCESS; }//釋放備用線程池,待優(yōu)化API_RETURN_TYPE_T ThreadPool::ReleaseSubThreadPool() {this->mutex->lock();CANCEL_SIGNAL = 1;this->mutex->unlock();task_cond->broadcast();for(int i = 0;i < _num_threads;i++){if(RET_OK != pthread_cancel(_thread_bak[i])){return API_FAIL;}}this->mutex->lock();printf("4444.\n");CANCEL_SIGNAL = 0;this->mutex->unlock();return API_SUCCESS; } //摧毀線程池,待優(yōu)化 API_RETURN_TYPE_T ThreadPool::DestroyThreadPool() {//first ,destroy manager thread.if(RET_OK != pthread_cancel(taskqueue_thread)){return API_FAIL;}return API_SUCCESS; }API_RETURN_TYPE_T ThreadPool::ThreadJoin() {for(int i = 0;i < _num_threads;i++){pthread_join(_thread[i],NULL);}pthread_join(taskqueue_thread,NULL);return API_SUCCESS;} //用戶調(diào)用此函數(shù)接口喚醒 API_RETURN_TYPE_T ThreadPool::wakeupThread(TaskFuncCallback p_func) {printf("wakeupThread in .\n");this->mutex->lock();TASK_QUEUE_T task;task.cTaskFunc = p_func;//將函數(shù)執(zhí)行地址賦值到隊(duì)列元素中task.sTask = TASK_WAIT;if(task_wait_queue.length() < MAX_TASK_NUM ){ this->task_wait_queue.pushBack(task); //push任務(wù)到等待任務(wù)隊(duì)列中}else{//線程池?cái)?shù)量過多,此機(jī)制后續(xù)會(huì)優(yōu)化printf("Current Thread Buffer is full!Please wait a moment!\n");this->mutex->unlock();return API_FAIL;}this->mutex->unlock();return API_SUCCESS;}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
下面新加的關(guān)于LVQueue的實(shí)現(xiàn):
#ifndef QUEUE_H_INCLUDED #define QUEUE_H_INCLUDEDtemplate < typename T > class LVQueue {friend struct Iterator;struct Item {T value;Item * next;Item * prev;Item(T & v) : value(v), next(NULL), prev(NULL) {}};Item * head;Item * tail;int count;Item * remove(Item * p) {if (!p)return NULL;if (!p->prev)head = p->next;elsep->prev->next = p->next;if (!p->next)tail = p->prev;elsep->next->prev = p->prev;p->next = NULL;p->prev = NULL;count--;if (count == 0) {head = tail = NULL;}return p;}void moveToHead(Item * item) {Item * p = remove(item);if (head) {head->prev = p;p->next = head;head = p;} else {head = tail = p;}count++;} public:struct Iterator {private:LVQueue * queue;Item * currentItem;public:Iterator(const Iterator & v) {queue = v.queue;currentItem = v.currentItem;}Iterator(LVQueue * _queue) : queue(_queue), currentItem(NULL) {}T get() { return currentItem ? currentItem->value : T(); }void set(T value) { if (currentItem) currentItem->value = value; }bool next() {if (!currentItem) {// first timecurrentItem = queue->head;} else {// continuecurrentItem = currentItem->next;}return currentItem != NULL;}T remove() {if (!currentItem)return T();Item * next = currentItem->next;Item * p = queue->remove(currentItem);currentItem = next;T res = p->value;delete p;return res;}void moveToHead() {if (currentItem)queue->moveToHead(currentItem);}};public:Iterator iterator() { return Iterator(this); }LVQueue() : head(NULL), tail(NULL), count(0) {}~LVQueue() { clear(); } // T & operator [] (int index) { // Item * p = head; // for (int i = 0; i < index; i++) { // if (!p) // return // } // }int length() { return count; }void pushBack(T item) {Item * p = new Item(item);if (tail) {tail->next = p;p->prev = tail;tail = p;} else {head = tail = p;}count++;}void pushFront(T item) {Item * p = new Item(item);if (head) {head->prev = p;p->next = head;head = p;} else {head = tail = p;}count++;}T popFront() {if (!head)return T();Item * p = remove(head);T res = p->value;delete p;return res;}T popBack() {if (!tail)return T();Item * p = remove(tail);T res = p->value;delete p;return res;}void clear() {while (head) {Item * p = head;head = p->next;delete p;}head = NULL;tail = NULL;count = 0;} };#endif // LVQUEUE_H_INCLUDED- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
以下是簡(jiǎn)單的test程序:
ThreadPool *thread3 = ThreadPool::createThreadPool(8);//得到線程池對(duì)象printf("task coming.\n");//test threadpoolfor(int i = 0;i < 15;i++){thread3->wakeupThread(thread11_func);//每隔一秒喚醒線程,thread11_func一個(gè)函數(shù)的地址sleep(1);thread3->wakeupThread(thread3_func);}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
下面是test程序運(yùn)行的結(jié)果,線程喚醒無一秒間隔
num = 8. task coming. wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . wakeupThread in . thread 0xd528c700 begin thread 0xd528c700 is waiting thread 0xd4a8b700 begin thread 0xd4a8b700 is waiting thread 0xd428a700 begin thread 0xd428a700 is waiting thread 0xd5a8d700 begin thread 0xd5a8d700 is waiting thread 0xd628e700 begin thread 0xd628e700 is waiting thread 0xd6a8f700 begin thread 0xd6a8f700 is waiting thread 0xd7290700 begin thread 0xd7290700 is waiting thread 0xd7a91700 begin thread 0xd7a91700 is waiting managerThread!. signal cond. num = 4. thread 0xd2286700 begin thread 0xd2a87700 begin thread 0xd3288700 begin thread 0xd3a89700 begin thread 0xd2286700 is waiting thread 0xd2a87700 is waiting thread 0xd3288700 is waiting thread 0xd3a89700 is waiting managerThread!. signal cond. managerThread!. signal cond. managerThread!. signal cond. managerThread!. signal cond. managerThread!. signal cond. managerThread!. signal cond. managerThread!. signal cond. managerThread!. managerThread!. managerThread!. managerThread!. managerThread!. rate = 4.82897 this is 0 task thread. wait len =22. thread 0xd528c700 done.length() = 7. thread 0xd528c700 begin thread 0xd528c700 is waiting managerThread!. signal cond. rate = 4.64646 this is 1 task thread. wait len =21. thread 0xd4a8b700 done.length() = 7. thread 0xd4a8b700 begin thread 0xd4a8b700 is waiting managerThread!. signal cond. rate = 4.64646 this is 2 task thread. wait len =20. thread 0xd428a700 done.length() = 7. thread 0xd428a700 begin thread 0xd428a700 is waiting managerThread!. signal cond. rate = 4.25101 this is 3 task thread. wait len =19. thread 0xd5a8d700 done.length() = 7. thread 0xd5a8d700 begin thread 0xd5a8d700 is waiting managerThread!. signal cond. rate = 4.23387 this is 4 task thread. wait len =18. thread 0xd628e700 done.length() = 7. thread 0xd628e700 begin thread 0xd628e700 is waiting managerThread!. signal cond. rate = 4.04858 this is 5 task thread. wait len =17. thread 0xd6a8f700 done.length() = 7. thread 0xd6a8f700 begin thread 0xd6a8f700 is waiting- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
可以看到一次性喚醒了30個(gè)線程,創(chuàng)建了8個(gè)線程的線程池,后來通過優(yōu)化計(jì)算又新增了4個(gè)線程到當(dāng)前線程池中,每喚醒一個(gè)線程執(zhí)行任務(wù),大概是6s的時(shí)間,執(zhí)行完后,又進(jìn)入等待喚醒信號(hào)的狀態(tài)。管理線程檢測(cè)到當(dāng)前所有線程都在執(zhí)行,便會(huì)阻塞當(dāng)前signal行為,直到有空余線程,馬上signal。
這些源代碼還有一些數(shù)據(jù)類型的封裝還沒公布出來,因?yàn)檫€在優(yōu)化中,所以準(zhǔn)備等到優(yōu)化完畢,將會(huì)把完整的源代碼交到GitHub上托管,小弟資歷尚淺,如有出錯(cuò)的地方,煩請(qǐng)不吝賜教。
總結(jié)
以上是生活随笔為你收集整理的C++强化之路之线程池开发整体框架(二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 卡塔尔世界杯遭抵制,德国队考虑退赛,这究
- 下一篇: 孔雀东南飞剧情介绍