muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool
muduo是支持多線程的網(wǎng)絡(luò)庫,在muduo網(wǎng)絡(luò)庫學(xué)習(xí)(七)用于創(chuàng)建服務(wù)器的類TcpServer中也提及了TcpServer中有一個(gè)事件驅(qū)動循環(huán)線程池,線程池中存在大量線程,每個(gè)線程運(yùn)行一個(gè)EventLoop::loop。
線程池的作用體現(xiàn)在
- 用戶啟動TcpServer服務(wù)器時(shí)創(chuàng)建大量子線程,每個(gè)子線程創(chuàng)建一個(gè)EventLoop并開始執(zhí)行EventLoop::loop
- 主線程的線程池保存著創(chuàng)建的這些線程和EventLoop
- 當(dāng)Acceptor接收到客戶端的連接請求后返回TcpServer,TcpServer創(chuàng)建TcpConnection用于管理tcp連接
- TcpServer從事件驅(qū)動線程池中取出一個(gè)EventLoop,并將這個(gè)EventLoop傳給TcpConnection的構(gòu)造函數(shù)
- TcpConnection創(chuàng)建用于管理套接字的Channel并注冊到從線程池中取出的EventLoop的Poller中
- 服務(wù)器繼續(xù)監(jiān)聽
這個(gè)池子既是一個(gè)線程池,又是一個(gè)EventLoop池,二者是等價(jià)的,一個(gè)EventLoop對應(yīng)一個(gè)線程
這種方式稱為one loop per thread即reactor + 線程池
線程池的定義比較簡單,唯一復(fù)雜的地方是由主線程創(chuàng)建子線程,子線程創(chuàng)建EventLoop并執(zhí)行EventLoop::loop,主線程返回創(chuàng)建的EventLoop給線程池并保存起來,比較繞。
線程池EventLoopThreadPool定義如下
成員變量和函數(shù)沒什么特別的,其中
- baseLoop_是主線程所在的事件驅(qū)動循環(huán),即TcpServer所在的那個(gè)主線程,這個(gè)事件驅(qū)動循環(huán)通常只負(fù)責(zé)監(jiān)聽客戶端連接請求,即Acceptor的Channel。
- 兩個(gè)vector保存著所有子線程即每個(gè)子線程對應(yīng)的EventLoop。事件驅(qū)動循環(huán)線程被封裝在EventLoopThread中,EventLoopThread中使用的Thread才是真正的線程封裝
線程池是由TcpServer啟動的,在TcpServer::start函數(shù)中(由用戶調(diào)用)
/* * 開啟事件驅(qū)動循環(huán)線程池,將Acceptor的Channel添加到EventLoop中,注冊到Poller上* 此時(shí)還沒有調(diào)用EventLoop::loop(),所以還沒有開啟監(jiān)聽*/ void TcpServer::start() {if (started_.getAndSet(1) == 0){/* 啟動線程池 */threadPool_->start(threadInitCallback_);assert(!acceptor_->listenning());/* * Acceptor和TcpServer在同一個(gè)線程,通常會直接調(diào)用 * std::bind只能值綁定,如果傳入智能指針會增加引用計(jì)數(shù),這里傳遞普通指針* 因?yàn)門cpServer沒有銷毀,所以不用擔(dān)心Accepor會銷毀*/loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));} }TcpServer::start中調(diào)用threadPool_->start()用于啟動線程池,傳入的參數(shù)是創(chuàng)建好所有線程后調(diào)用的回調(diào)函數(shù),也是由用戶提供
void EventLoopThreadPool::start(const ThreadInitCallback& cb) {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;/* 創(chuàng)建一定數(shù)量的線程(事件驅(qū)動循環(huán)) */for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/* EventLoopThread,事件驅(qū)動循環(huán)線程*/EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));/* 創(chuàng)建新線程,返回新線程的事件驅(qū)動循環(huán)EventLoop */loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);} }創(chuàng)建線程的過程有些繞,在這里梳理一下
Thread類通過調(diào)用pthread_create創(chuàng)建子線程
/* EventLoopThread::startLoop函數(shù)中調(diào)用,用于創(chuàng)建子線程 */ void Thread::start() {assert(!started_);started_ = true;// FIXME: move(func_)/* 創(chuàng)建子線程為線程函數(shù)提供的參數(shù),封裝起來就可以實(shí)現(xiàn)傳遞多個(gè)參數(shù) */detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);/* 創(chuàng)建線程,子線程調(diào)用detail::startThread,主線程繼續(xù)向下執(zhí)行 */if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)){started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{/* 如果線程創(chuàng)建成功,主線程阻塞在這里 */latch_.wait();assert(tid_ > 0);} }創(chuàng)建成功后,主線程阻塞在latch_.wait()上(條件變量),等待子線程執(zhí)行threadFunc之前被喚醒
這里不太明白原因,主線程為什么需要阻塞在這里
ThreadData是線程數(shù)據(jù)類,將線程函數(shù)用到的所有參數(shù)都存在這里面即可,線程函數(shù)為detail::startThread,進(jìn)而調(diào)用runInThread
runInLoop調(diào)用latch_->countDown,此時(shí)會喚醒主線程,主線程回到startLoop中由于loop_為null阻塞在wait上。而此時(shí)子線程正在準(zhǔn)備調(diào)用threadFunc(在EventLoopThread創(chuàng)建之初將EventLoopThread::threadFunc傳遞給Thread,用于在創(chuàng)建完線程后調(diào)用)
/* * 創(chuàng)建線程后間接調(diào)用的函數(shù),用于執(zhí)行EventLoopThread::threadFunc* 這個(gè)函數(shù)在EventLoopThread構(gòu)造時(shí)傳給Thread對象的* EventLoopThread::startLoop函數(shù)中調(diào)用Thread對象的Thread::start函數(shù)* Thread::start中創(chuàng)建子線程,子線程調(diào)用detail::startThread,進(jìn)而調(diào)用detail::runInThread* detail::runInLoop調(diào)用EventLoopThread::threadFunc,創(chuàng)建EventLoop,喚醒主線程,子線程執(zhí)行l(wèi)oop循環(huán)* 轉(zhuǎn)了一大圈又回到EventLoopThread中*/void runInThread(){*tid_ = muduo::CurrentThread::tid();tid_ = NULL;latch_->countDown();latch_ = NULL;muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();/* 給當(dāng)前線程命名 */::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);try{/* EventLoopThread::threadFunc() */func_();muduo::CurrentThread::t_threadName = "finished";}catch (const Exception& ex){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());fprintf(stderr, "stack trace: %s\n", ex.stackTrace());abort();}catch (const std::exception& ex){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());abort();}catch (...){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());throw; // rethrow}}兜兜轉(zhuǎn)轉(zhuǎn)又回到了EventLoopThread,此時(shí)主線程阻塞在EventLoopThread::startInLoop的wait上,子線程在EventLoopThread::threadFunc中,準(zhǔn)備創(chuàng)建一個(gè)EventLoop然后喚醒主線程,并開啟事件循環(huán)
/* * 傳遞給線程的回調(diào)函數(shù),當(dāng)創(chuàng)建線程后,在detail::runInLoop會回調(diào)這個(gè)函數(shù)* 此函數(shù)創(chuàng)建一個(gè)事件驅(qū)動循環(huán),并開啟事件監(jiān)聽(loop)*/ void EventLoopThread::threadFunc() {/* 子線程創(chuàng)建事件驅(qū)動循環(huán) */EventLoop loop;if (callback_){callback_(&loop);}{/* 上鎖后賦值給loop_ */MutexLockGuard lock(mutex_);loop_ = &loop;/* * pthread_cond_signal(pthread_cond_t&)喚醒一個(gè)wait的線程* 此時(shí)主線程發(fā)現(xiàn)loop_已經(jīng)不為null,隨后返回到EventLoopThreadPool中*/cond_.notify();}/* 子線程開啟事件監(jiān)聽,進(jìn)入無限循環(huán),不返回 */loop.loop();//assert(exiting_);loop_ = NULL; }子線程將一直停留在loop.loop()上,主線程由于被子線程喚醒,發(fā)現(xiàn)loop_已經(jīng)不為null,說明已經(jīng)創(chuàng)建了一個(gè)線程,同時(shí)也在那個(gè)線程中創(chuàng)建了一個(gè)事件驅(qū)動循環(huán),所以主線程返回,將創(chuàng)建好的事件驅(qū)動循環(huán)返回給線程池保存起來,當(dāng)有新的TcpConnection被創(chuàng)建后取出一個(gè)用來監(jiān)聽tcp連接
void EventLoopThreadPool::start(const ThreadInitCallback& cb) {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;/* 創(chuàng)建一定數(shù)量的線程(事件驅(qū)動循環(huán)) */for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/* EventLoopThread,事件驅(qū)動循環(huán)線程*/EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));/* 創(chuàng)建新線程,返回新線程的事件驅(qū)動循環(huán)EventLoop *//* EventLoopThread主線程返回后,將事件驅(qū)動循環(huán)保存下來,然后繼續(xù)創(chuàng)建線程 */loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);} }至此線程池的創(chuàng)建工作完成,每一個(gè)線程都運(yùn)行著EventLoop::loop,進(jìn)行EventLoop::loop -> Poller::poll -> Channel::handleEvent -> TcpConnection::handle* -> EventLoop::doPendingFunctors -> EventLoop::loop的工作。
如果提供了回調(diào)函數(shù),在創(chuàng)建完成后也會執(zhí)行,但通常用戶不會在意線程池的創(chuàng)建工作,所以一般都不提供
創(chuàng)建完成后,線程池唯一的工作就是在新建TcpConnection時(shí)從池子中取出一個(gè)EventLoop傳給TcpConnection
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {loop_->assertInLoopThread();/* 從事件驅(qū)動線程池中取出一個(gè)線程給TcpConnection */EventLoop* ioLoop = threadPool_->getNextLoop();/* 為TcpConnection生成獨(dú)一無二的名字 */char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();/* * 根據(jù)sockfd獲取tcp連接在本地的<地址,端口>* getsockname(int fd, struct sockaddr*, int *size);*/InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessary/* 創(chuàng)建一個(gè)新的TcpConnection代表一個(gè)Tcp連接 */TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));... }EventLoopThreadPool::getNextLoop函數(shù)如下,用于取出一個(gè)EventLoop。
如果線程池中沒有線程,就返回主線程的EventLoop,此時(shí)只有一個(gè)EventLoop在運(yùn)行,即TcpServer的那個(gè)
總結(jié)
以上是生活随笔為你收集整理的muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode----位运算实
- 下一篇: 每天一道LeetCode-----给定字