线程池设计中的惊群问题
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
? ? ? 多線(xiàn)程編程已經(jīng)是現(xiàn)在網(wǎng)絡(luò)編程中常用的編程技術(shù),設(shè)計(jì)一個(gè)良好的線(xiàn)程池庫(kù)顯得尤為重要。在 UNIX(WIN32下可以采用類(lèi)似的方法,acl 庫(kù)中的線(xiàn)程池是跨平臺(tái)的) 環(huán)境下設(shè)計(jì)線(xiàn)程池庫(kù)主要是如何用好如下系統(tǒng) API:
? ? ? 1、pthread_cond_signal/pthread_cond_broadcast:生產(chǎn)者線(xiàn)程通知線(xiàn)程池中的某個(gè)或一些消費(fèi)者線(xiàn)程池,接收處理任務(wù);
? ? ? 2、pthread_cond_wait:線(xiàn)程池中的消費(fèi)者線(xiàn)程等待線(xiàn)程條件變量被通知;
? ? ? 3、pthread_mutex_lock/pthread_mutex_unlock:線(xiàn)程互斥鎖的加鎖及解鎖函數(shù)。
?
? ? ? 下面的代碼示例是大家常見(jiàn)的線(xiàn)程池的設(shè)計(jì)方式:
?
// 線(xiàn)程任務(wù)類(lèi)型定義 struct thread_job {struct thread_job *next; // 指向下一個(gè)線(xiàn)程任務(wù)void (*func)(void*); // 應(yīng)用回調(diào)處理函數(shù) void *arg; // 回調(diào)函數(shù)的參數(shù)... };// 線(xiàn)程池類(lèi)型定義 struct thread_pool {int max_threads; // 線(xiàn)程池中最大線(xiàn)程數(shù)限制int curr_threads; // 當(dāng)前線(xiàn)程池中總的線(xiàn)程數(shù)int idle_threads; // 當(dāng)前線(xiàn)程池中空閑的線(xiàn)程數(shù)pthread_mutex_t mutex; // 線(xiàn)程互斥鎖pthread_cond_t cond; // 線(xiàn)程條件變量thread_job *first; // 線(xiàn)程任務(wù)鏈表的表頭thread_job *last; // 線(xiàn)程任務(wù)鏈表的表尾... }// 線(xiàn)程池中的消費(fèi)者線(xiàn)程處理過(guò)程 static void *consumer_thread(void *arg) {struct thread_pool *pool = (struct thread_pool*) arg;struct thread_job *job;int status;// 該消費(fèi)者線(xiàn)程需要先加鎖pthread_mutex_lock(&pool->mutex);while (1) {if (pool->first != NULL) {// 有線(xiàn)程任務(wù)時(shí),則取出并在下面進(jìn)行處理job = pool->first;pool->first = job->next;if (pool->last == job)pool->last = NULL;// 解鎖,允許其它消費(fèi)者線(xiàn)程加鎖或生產(chǎn)者線(xiàn)程添加新的任務(wù)pthread_mutex_unlock(&pool->mutex);// 回調(diào)應(yīng)用的處理函數(shù)job->func(job->arg);// 釋放動(dòng)態(tài)分配的內(nèi)存free(job);// 重新去加鎖pthread_mutex_lock(&pool->mutex);} else {pool->idle_threads++;// 在調(diào)用 pthread_cond_wait 等待線(xiàn)程條件變量被通知且自動(dòng)解鎖status = pthread_cond_wait(&pool->cond, &pool->mutex);pool->idle_threads--;if (status == 0)continue;// 等待線(xiàn)程條件變量異常,則該線(xiàn)程需要退出pool->curr_threads--;pthread_mutex_unlock(&pool->mutex);break;}}return NULL; }// 生產(chǎn)者線(xiàn)程調(diào)用此函數(shù)添加新的處理任務(wù) void add_thread_job(struct thread_pool *pool, void (*func)(void*), void *arg) {// 動(dòng)態(tài)分配任務(wù)對(duì)象struct thread_job *job = (struct thread_job*) calloc(1, sizeof(*job));job->func = func;job->arg = arg;pthread_mutex_lock(&pool->mutex);// 將新任務(wù)添加進(jìn)線(xiàn)程池的任務(wù)鏈表中if (pool->first == NULL)pool->first = job;elsepool->last->next = job;pool->last = job;job->next = NULL;if (pool->idle_threads > 0) {// 如果有空閑消費(fèi)者線(xiàn)程,則通知空閑線(xiàn)程進(jìn)行處理,同時(shí)需要解鎖pthread_mutex_unlock(&pool->mutex);pthread_cond_signal(&pool->cond);} else if (pool->curr_threads < pool->max_threads) {// 如果未超過(guò)最大線(xiàn)程數(shù)限制,則創(chuàng)建一個(gè)新的消費(fèi)者線(xiàn)程pthread_t id;pthread_attr_t attr;pthread_attr_init(&attr);// 將線(xiàn)程屬性設(shè)為分離模式,這樣當(dāng)線(xiàn)程退出時(shí)其資源自動(dòng)由系統(tǒng)回收pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);// 創(chuàng)建一個(gè)消費(fèi)者線(xiàn)程if (pthread_create(&id, &attr, consumer_thread, pool) == 0)pool->curr_threads++;pthread_mutex_unlock(&pool->mutex);pthread_attr_destroy(&attr);} }// 創(chuàng)建線(xiàn)程池對(duì)象 struct thread_pool *create_thread_pool(int max_threads) {struct thread_pool *pool = (struct thread_pool*) calloc(1, sizeof(*pool));pool->max_threads = max_threads;pthread_mutex_init(&pool->mutex);pthread_cond_init(&pool->cond);...return pool; }/// // 使用上面線(xiàn)程池的示例如下:// 由消費(fèi)者線(xiàn)程回調(diào)的處理過(guò)程 static void thread_callback(void* arg) {... }void test(void) {struct thread_pool *pool = create_thread_pool(100);int i;// 循環(huán)添加 1000000 次線(xiàn)程處理任務(wù)for (i = 0; i < 1000000; i++)add_thread_job(pool, thread_callback, NULL); }?
?
? ? ? 乍一看去,似乎也沒(méi)有什么問(wèn)題,象很多經(jīng)典的開(kāi)源代碼中也是這樣設(shè)計(jì)的,但有一個(gè)重要問(wèn)題被忽視了:線(xiàn)程池設(shè)計(jì)中的驚群現(xiàn)象。大家可以看到,整個(gè)線(xiàn)程池只有一個(gè)線(xiàn)程條件變量和線(xiàn)程互斥鎖,生產(chǎn)者線(xiàn)程和消費(fèi)者線(xiàn)程(即線(xiàn)程池中的子線(xiàn)程)正是通過(guò)這兩個(gè)變量進(jìn)行同步的。生產(chǎn)者線(xiàn)程每添加一個(gè)新任務(wù),都會(huì)調(diào)用 pthread_cond_signal 一次,由操作系統(tǒng)喚醒一個(gè)在線(xiàn)程條件變量等待的消費(fèi)者線(xiàn)程,但如果查看 pthread_cond_signal API 的系統(tǒng)幫助,你會(huì)發(fā)現(xiàn)其中有一句話(huà):調(diào)用此函數(shù)后,系統(tǒng)會(huì)喚醒在相同條件變量上等待的一個(gè)或多個(gè)線(xiàn)程。而正是這句模棱兩可的話(huà)沒(méi)有引起很多線(xiàn)程池設(shè)計(jì)者的注意,這也是整個(gè)線(xiàn)程池中消費(fèi)者線(xiàn)程收到信號(hào)通知后產(chǎn)生驚群現(xiàn)象的根源所在,并且是消費(fèi)者線(xiàn)程數(shù)量越多,驚群現(xiàn)象越嚴(yán)重----意味著 CPU 占用越高,線(xiàn)程池的調(diào)度性能越低。
? ? ? 要想避免如上線(xiàn)程池設(shè)計(jì)中的驚群?jiǎn)栴},在仍然共用一個(gè)線(xiàn)程互斥鎖的條件下,給每一個(gè)消費(fèi)者線(xiàn)程創(chuàng)建一個(gè)線(xiàn)程條件變量,生產(chǎn)者線(xiàn)程在添加任務(wù)時(shí),找到空閑的消費(fèi)者線(xiàn)程,將任務(wù)置入該消費(fèi)者的任務(wù)隊(duì)列中同時(shí)只通知 (pthread_cond_signal) 該消費(fèi)者的線(xiàn)程條件變量,消費(fèi)者線(xiàn)程與生產(chǎn)者線(xiàn)程雖然共用相同的線(xiàn)程互斥鎖(因?yàn)橛腥仲Y源及調(diào)用 pthread_cond_wait 所需),但線(xiàn)程條件變量的通知過(guò)程卻是定向通知的,未被通知的消費(fèi)者線(xiàn)程不會(huì)被喚醒,這樣驚群現(xiàn)象也就不會(huì)產(chǎn)生了。
? ? ? 當(dāng)然,還有一些設(shè)計(jì)上的細(xì)節(jié)需要注意,比如:當(dāng)沒(méi)有空閑消費(fèi)者線(xiàn)程時(shí),需要將任務(wù)添加進(jìn)線(xiàn)程池的全局任務(wù)隊(duì)列中,消費(fèi)者線(xiàn)程處理完自己的任務(wù)后需要查看一下線(xiàn)程池中的全局任務(wù)隊(duì)列中是否還有未處理的任務(wù)。
? ? ? 更多的線(xiàn)程池的設(shè)計(jì)細(xì)節(jié)請(qǐng)參考 acl (https://sourceforge.net/projects/acl/) 庫(kù)中 lib_acl/src/thread/acl_pthread_pool.c 中的代碼。
?
?參考:
線(xiàn)程編程常見(jiàn)API簡(jiǎn)介(上)
線(xiàn)程編程常見(jiàn)API簡(jiǎn)介(中)
線(xiàn)程編程常見(jiàn)API簡(jiǎn)介(下)
使用 acl_cpp 庫(kù)編寫(xiě)多線(xiàn)程程序
利用ACL庫(kù)開(kāi)發(fā)高并發(fā)半駐留式線(xiàn)程池程序
多線(xiàn)程開(kāi)發(fā)時(shí)線(xiàn)程局部變量的使用
再談線(xiàn)程局部變量
?
acl 庫(kù)下載:https://sourceforge.net/projects/acl/
github:https://github.com/zhengshuxin/acl
svn:svn checkout svn://svn.code.sf.net/p/acl/code/trunk acl-code
qq 群:242722074轉(zhuǎn)載于:https://my.oschina.net/u/568966/blog/309536
總結(jié)
以上是生活随笔為你收集整理的线程池设计中的惊群问题的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: CentOS6.3 下启动Oracle
- 下一篇: 一种导致android开发时无法生成R.