C语言实现简单的线程池【转】
轉自https://blog.csdn.net/hubi0952/article/details/8045094
線程池的基本原理
在傳統的服務器結構中,常用一個總的線程監聽有沒有新的客戶端連接服務器。每當有一個新的客戶端連接就開啟一個新的線程處理這個客戶端的信息,這個線程只服務于這個用戶,當客戶端和服務器關閉連接后服務器端就銷毀這個線程。
當服務器頻繁的有客戶端連接的時候就要頻繁的開辟與銷毀線程極大的占用了系統的資源。在大量用戶的情況下開辟和銷毀線程將浪費大量的時間和資源。線程池提供了一個解決外部大量用戶與服務器有限資源的矛盾。線程池和傳統的一個用戶對應一個線程的處理方法不同,它的基本思想就是在程序開始時就在內存中開辟一些線程,線程的數目是固定的,它們獨自形成一個類,屏蔽了對外的操作,而服務器只需要將數據包交給線程池就可以了。當有新的客戶請求到達時,不是新創建一個線程為其服務,而是從線程池中選擇一個空閑的線程為新的客戶請求服務,服務完畢后線程進入空閑線程池中。如果沒有線程空閑你的話就將數據報暫時積累,等待線程池內有線程空閑以后再進行處理。
通過對多個任務重用已經存在的線程對象,降低了對線程對象的創建和銷毀的開銷。當客戶請求時,由于線程對象已經存在,可以提高請求時間。
線程池的組成部分:
1、線程管理器:用于創建并管理線程池
2、工作線程:線程池中實際執行任務的線程。在初始化線程時會預先創建好固定數目的線程在線程池中,這些初始化的線程一般處于空閑狀態(阻塞(睡眠)),不占用CPU,占用較小的內存空間。
3、每個任務必須實現的接口,當線程池中的任務隊列(任務鏈表)中有可執行任務時,被空閑的工作線程調用執行(線程的閑與忙是通過互斥量實現的)。把任務抽象出來形成接口,可做到線程距具體的任務無關。
4、任務隊列:用來存放沒有處理的任務,提供一種緩沖機制,實現這種結構有好幾種方法,常用的有隊列,利用隊列的先進先出原理。
?
什么時候需要線程池?
如果一個應用需要頻繁的創建和銷毀線程,而任務執行的時間又非常短,這樣線程創建和銷毀帶來的開銷就不容忽視。這時候就需要線程池了。如果線程創建的和銷毀的時間相比任務執行時間可以忽略不計,則沒有必要使用線程池。
?
?
在Linux系統下用C語言創建一個線程池。線程池會維護一個任務鏈表(每個CThread_worker結構就是一個任務)。
任務隊列的結構和線程池的結構
/* *線程池里所有運行和等待的隊列都是一個CThread_worker結構 *由于所有的CThread_worker結構都在隊列中,所以是隊列中的一個節點 */ typedef struct worker {/*回調函數,當任務運行時會調用此函數,也可以聲明為其他形式*/void *(*process)(void *arg);/*回調函數的參數*/void *arg; struct worker *next; }CThread_worker;/*線程池的結構*/ typedef struct {pthread_mutex_t queue_lock;pthread_cond_t queue_ready;/*指向任務等待隊列的隊頭*/CThread_worker *queue_head;/*是否銷毀線程池*/int shutdown;/*線程ID,使用堆空間來分配內存*/pthread_t *threadid;/*線程池中線程的數目*/int max_thread_num;/*當前等待隊列的任務數目*/int cur_queue_size; }CThread_pool;?
pool_init()(線程池初始化函數預先創建好max_thread_num個線程),每個線程執行thread_routine()函數。
pool_init()實現
void pool_init(int max_thread_num) {pool = (CThread_pool *)malloc(sizeof(CThread_pool));if(pool == NULL){printf("pool_init1\n"); exit(-1);}/*初始化線程互斥鎖和條件變量*/pthread_mutex_init(&(pool->queue_lock), NULL);pthread_cond_init(&(pool->queue_ready), NULL);pool->queue_head = NULL; pool->shutdow = 0;pool->max_thread_num = max_thread_num;pool->threadid = (pthread *)malloc(sizeof(pthread) * max_thread_num);if(pool->threadid == NULL){printf("pool_init2\n"); exit(-1);}/*初始化任務隊列為0*/ pool->cur_queue_size = 0;int i = 0;/*創建max_thread_num數目的線程*/for(i-0; i<max_thread_num; i++){pthread_create(&pool->threadid[i], NULL, thread_routine, NULL)} }thread_routine的實現(每個線程都執行的函數)?
void *thread_routine(void *arg) {printf("starting thread 0x%x\n", pthread_self());while(1){/*因為線程中訪問到臨界資源(任意時刻只允許一個線程訪問的資源),所以要上鎖*/pthread_mutex_lock(&pool->queue_lock);/*如果任務等待隊列為空則線程阻塞,使用了條件變量*/while(pool->cur_queue_size == 0 && !pool->shutdown){/*pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒后會加鎖*/printf("thread 0x%x is waiting\n", pthread_self());pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));}if(shutdown){/*遇到break,continue,return等跳轉語句,千萬不要忘記先解鎖*/pthread_mutex_unlock(&(pool->queue_lock));printf("thread x%x will exit\n", phread_self());pthread_exit();}printf("thread 0x%x is starting to work\n", pthread_self());/*從任務隊列中取出任務*/CThread_worker *worker = pool->queue_head;pool->queue_head = worker->next; pool->cur_queue_size--;/*訪問臨界資源結束,要解鎖,以便其他線程訪問*/pthread_mutex_unlock(&(pool->queue_lock));/*執行任務等待隊列中的任務*/worker->process(worker->arg);free(worker);worker = NULL;}pthread_exit(NULL); }?
?pool_add_worker()函數向線程池的任務隊列中加入一個任務,加入后通過調用pthread_cond_signal (&(pool->queue_ready))喚醒一個處于阻塞狀態的線程(如果有的話)
void pool_add_worker(void *(*process)(void *arg), void *arg) {//為新的任務分配內存,然后添加到任務隊列中CThread_woker *newwork = (CThread_worker *)malloc(sizeof(CThread_worker));if(newwork == NULL){printf("pool_add_worker\n");exit(-1);}newwork->process = process;newwork->arg = arg;newwork->next = NULL; /*訪問到臨界資源要上鎖*/pthread_mutex_lock(&(pool->queue_lock));/*將新任務插入到隊尾*/CThread_worker *temp = pool->queue_head;if(temp == NULL){pool->queue_head = newwork;}else{while(temp->next != NULL){temp = temp->next;}temp->next = newwork;}/*任務等待隊列的任務數加1*/pool->cur_queue_size++;/*解鎖*/pthread_mutex_unlock(&(pool->queue_lock));/*發信號喚醒任意一個空閑的線程去處理新加入的任務*/pthread_cond_signal(&(pool->queue_ready));}?
?pool_destroy ()函數用于銷毀線程池,線程池任務鏈表中的任務不會再被執行,但是正在運行的線程會一直把任務運行完后再退出。
void pool_destory() {if(pool->shutdown){return -1; /*防止兩次調用*/}pool->shutdown = 1;/*喚醒所有等待的線程*/pthread_cond_broadcast(&(pool->queue_ready));/*阻塞等待線程退出,否則子線程先退出,主線程沒有回收就變成僵尸線程了*/int i;for(i=0; i<pool->max_thread_num; i++){pthread_join(pool->threadid[i], NULL);}free(pool->threadid);pool->threadid = NULL;/*銷毀等待隊列*/CThread_worker *temp;while(pool->queue_head != NULL){temp = pool->queue_head;pool->queue_head = temp->next;free(temp);}/*銷毀條件互斥鎖和條件變量*/pthread_mutex_destory(&(pool->queue_lock));pthread_cond_destory(&(pool->queue_ready));free(pool);/*銷毀后指針置空*/pool = NULL;return 0; }測試函數
void *myprocess (void *arg) {printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);sleep (1);/*休息一秒,延長任務的執行時間*/return NULL; }int main (int argc, char **argv) {pool_init (5);/*線程池中最多三個活動線程*//*連續向池中投入10個任務*/int *workingnum = (int *) malloc (sizeof (int) * 10);int i;for (i = 0; i < 10; i++){workingnum[i] = i;pool_add_worker (myprocess, &workingnum[i]);}/*等待所有任務完成*/sleep (5);/*銷毀線程池*/pool_destroy ();free (workingnum);return 0; }?
/*----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------*/
線程池的目的是為了減少線程創建、銷毀所帶來的代價,當有非常多的任務需要獨立的線程去做的時候,可以使用線程池,從線程池中喚醒一個空閑(睡眠狀態)線程來處理一個個的任務。
線程池中的每個子線程都是等價的。用線程信號量來控制子線程和任務的分配問題。設置一個信號量來表示任務隊列中的任務資源,每個子線程都會處于死循環中,每輪循環首先等待一個任務資源信號量,當等到之后,互斥的從任務隊列中摘取一個任務節點。任務節點中記錄著該任務所要執行的函數指針及其參數。之后子線程開始執行該任務。執行完后釋放一個信號量并進去下一輪循環。當信號量小于1(沒有信號量)時,子線程將會阻塞。
因此一個任務由哪一個線程執行,要看那個線程能夠獲取到對應的信號量資源。
具體實現:
?任務隊列由雙向鏈表構造,每個節點包含一個任務的函數指針和參數指針。
一般一個簡單的線程池有下列組件:
1、線程池管理器(用于創建并管理線程池)
2、工作線程(線程池中的線程)
3、任務接口(task,每個任務必須實現的接口,以供工作線程調度任務的執行)
4、任務隊列(用于存放沒有處理的任務。提供一種緩沖機制)
?
線程池工作的基本邏輯:
1.首先 線程池初始化時 會創建出很多條線程,但他們沒任務可執行時,就會調用條件變量cond,讓自己沉睡。因此線程池一被創建,就躺著很多條沉睡的線程。
2.線程的執行函數中,有個while(1)循環,讓線程有任務時執行任務,沒任務時就調用pthread_cond_wait()來沉睡。
3.當有任務加入線程池的任務列表時,會通過調用pthread_cond_signal()來喚醒一條線程(add_task()函數),然后線程執行完就繼續執行下一條任務或沉睡。
4.當線程池中的 shundown變量變成true時,便會調用pthread_cond_broadcase()喚醒所有沉睡的線程,使線程們自己退出
總結
以上是生活随笔為你收集整理的C语言实现简单的线程池【转】的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: b站视频素材哪里找?怎么找b站素材?
- 下一篇: 论文阅读:ThinLTO: Scalab