0726------Linux基础----------线程池
生活随笔
收集整理的這篇文章主要介紹了
0726------Linux基础----------线程池
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
#ifndef __DEF_H__
#define __DEF_H__#include <stddef.h>
#include <pthread.h>
#include <stdio.h>#define TRUE 1
#define FALSE 0
//任務結構體
typedef struct{void (*thread_function_ptr_) (void*);void *arg_;
}task_t;//隊列結構體
typedef struct node_t{task_t data_; //隊列結點內存放任務struct node_t *next_;
}node_t, *pnode_t;typedef struct{pnode_t head_;pnode_t tail_;size_t size_;
}queue_t;//線程池結構體
typedef struct{size_t size_; //線程池大小pthread_t *threads_; //線程id數組queue_t queue_; //任務隊列int is_started_; // 線程池狀態 是否開啟 c 沒有boolpthread_mutex_t mutex_; //互斥鎖 各線程互斥訪問任務隊列pthread_cond_t cond_; //條件隊列 同步主線程和各子線程
}pool_t;// 線程池的接口
void thread_pool_init(pool_t *pool, size_t size); //初始化線程池
void thread_pool_start(pool_t *pool); // 開啟線程池
void thread_pool_add_task(pool_t *pool, task_t task); //往線程池中加入新的任務
int thread_pool_get_task(pool_t *pool, task_t *task);//從線程池中取出任務
void thread_pool_stop(pool_t *pool); // 關閉線程池
void thread_pool_deatroy(pool_t *pool); //銷毀 線程池
int thread_pool_is_started(pool_t *pool);// 線程池是否開啟
size_t thread_pool_get_size_of_queue(pool_t *pool); //返回線程池任務隊列的大小
void* thread_pool_thread_func(void *); //每個線程執行的函數//隊列的接口
void queue_init(queue_t *queue); //初始化隊列 這里動態創建隊列
void queue_push(queue_t *queue, task_t task); // 入隊列
void queue_pop(queue_t *queue); //出隊列
void queue_destroy(queue_t *queue); //銷毀隊列
void queue_clear(queue_t *queue); //清空隊列
int queue_is_empty(queue_t *queue); //判斷隊列是否為空
size_t queue_size(queue_t *queue); //返回隊列的大小
task_t queue_top(queue_t *queue); // 返回隊首元素#endif#include "def.h"
#include <stdlib.h>
#include <assert.h>void queue_init(queue_t *queue){queue->head_ = NULL;queue->tail_ = NULL;queue->size_ = 0;
}void queue_push(queue_t *queue, task_t task){pnode_t pCur = (pnode_t )malloc(sizeof(node_t));pCur->data_ = task;pCur->next_ = NULL;if(queue_is_empty(queue)){queue->head_= queue->tail_ = pCur;}else{queue->tail_->next_ = pCur;queue->tail_ = pCur;}queue->size_++;
}void queue_pop(queue_t *queue){assert(!queue_is_empty(queue));pnode_t pCur = queue->head_;queue->head_ = queue->head_->next_;free(pCur);queue->size_ --;
}void queue_destroy(queue_t *queue){queue_clear(queue);
}
void queue_clear(queue_t *queue){while(!queue_is_empty(queue)){queue_pop(queue);}
}int queue_is_empty(queue_t *queue){return queue->size_ == 0;
}size_t queue_size(queue_t *queue){return queue->size_;
}
task_t queue_top(queue_t *queue){return queue->head_->data_;
}#include "def.h"
#include <stdio.h>
#include <stdlib.h>
#define POOL_SIZE 3
void quare(void *arg){int num = (int)arg;printf("%d * %d = %d\n", num, num, num * num);
}int main(int argc, const char *argv[])
{pool_t pool;task_t task;srand(10000);thread_pool_init(&pool, POOL_SIZE);thread_pool_start(&pool);while(1){task.thread_function_ptr_ = quare;task.arg_ = (void *)(rand()%100);thread_pool_add_task(&pool, task);sleep(1);}thread_pool_stop(&pool);thread_pool_destroy(&pool);return 0;
}
#include "def.h"
#include <stdio.h>/** 測試隊列*/void func_ptr(void *arg){printf("arg = %d\n", (int)arg);
}int main(int argc, const char *argv[])
{queue_t queue;queue_init(&queue);task_t task, task_2;task.thread_function_ptr_ = func_ptr;task.arg_ = (void *)10;queue_push(&queue, task);task_2 = queue_top(&queue);printf("task_2.arg = %d\n",(int)task_2.arg_);queue_pop(&queue);printf("queue_is_empty = %d\n", queue_is_empty(&queue));return 0;
}
#include "def.h"
#include <stdlib.h>
#include <assert.h>void thread_pool_init(pool_t *pool, size_t size){pool->size_ = size;pool->threads_ = (pthread_t *)malloc(pool->size_ * sizeof(pthread_t));queue_init(&pool->queue_);pool->is_started_ = FALSE;pthread_mutex_init(&pool->mutex_, NULL);pthread_cond_init(&pool->cond_, NULL);
}void *thread_pool_thread_func(void * arg){pool_t *pool = (pool_t *)arg;task_t task;while(1){int ret = thread_pool_get_task(pool, &task);if(ret == TRUE)task.thread_function_ptr_(task.arg_);else //此時說明線程池關閉break;}
}void thread_pool_start(pool_t *pool){if(pool->is_started_ == FALSE){pool->is_started_ = TRUE;int i;for(i = 0; i < pool->size_; i++){pthread_create(&pool->threads_[i], NULL,thread_pool_thread_func, (void*)pool);}}
}void thread_pool_add_task(pool_t *pool, task_t task){assert(pool->is_started_);pthread_mutex_lock(&pool->mutex_);queue_push(&pool->queue_, task); //將新任務加入任務隊列中去pthread_cond_signal(&pool->cond_);pthread_mutex_unlock(&pool->mutex_);
}int thread_pool_get_task(pool_t *pool, task_t *task){ // 根據返回值判斷是否成功取出任務pthread_mutex_lock(&pool->mutex_);while(queue_is_empty(&pool->queue_) && pool->is_started_ == TRUE){pthread_cond_wait(&pool->cond_, &pool->mutex_);}if(pool->is_started_ == FALSE){//有可能是關閉線程池時 被喚醒的pthread_mutex_unlock(&pool->mutex_);return FALSE;}*task = queue_top(&pool->queue_);queue_pop(&pool->queue_);pthread_mutex_unlock(&pool->mutex_);return TRUE;
}void thread_pool_stop(pool_t *pool){if(pool->is_started_ == FALSE)return;pool->is_started_ = FALSE;pthread_cond_broadcast(&pool->cond_); //喚醒所有睡眠線程 結束回收資源int i;for(i = 0; i < pool->size_; i++){pthread_join(pool->threads_[i], NULL);}queue_clear(&pool->queue_); // 清空任務隊列
}void thread_pool_destroy(pool_t *pool){ // 銷毀線程池thread_pool_stop(pool);pthread_mutex_destroy(&pool->mutex_); // 銷毀互斥鎖和條件變量pthread_cond_destroy(&pool->cond_);free(pool->threads_); //釋放動態分配的內存 線程數組和任務隊列queue_destroy(&pool->queue_);
}int thread_pool_is_started(pool_t *pool){return pool->is_started_ == TRUE;
}size_t thread_pool_get_size_of_queue(pool_t *pool){return pool->queue_.size_;
}
轉載于:https://www.cnblogs.com/monicalee/p/3874178.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的0726------Linux基础----------线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux环境下安装Tigase XMP
- 下一篇: mysql数据库表迁移