muduo网络库学习(四)事件驱动循环EventLoop
muduo的設(shè)計(jì)采用高并發(fā)服務(wù)器框架中的one loop per thread模式,即一個(gè)線程一個(gè)事件循環(huán)。
這里的loop,其實(shí)就是muduo中的EventLoop,所以到目前為止,不管是Poller,Channel還是TimerQueue都僅僅是單線程下的任務(wù),因?yàn)檫@些都依賴于EventLoop。這每一個(gè)EventLoop,其實(shí)也就是一個(gè)Reactor模型。
而多線程體現(xiàn)在EventLoop的上層,即在EventLoop上層有一個(gè)線程池,線程池中每一個(gè)線程運(yùn)行一個(gè)EventLoop,也就是Reactor + 線程池的設(shè)計(jì)模式
梳理一下
- 每個(gè)muduo網(wǎng)絡(luò)庫(kù)有一個(gè)事件驅(qū)動(dòng)循環(huán)線程池EventLoopThreadPool
- 每個(gè)線程池中有多個(gè)事件驅(qū)動(dòng)線程EventLoopThread
- 每個(gè)線程運(yùn)行一個(gè)EventLoop事件循環(huán)
- 每個(gè)EventLoop事件循環(huán)包含一個(gè)io復(fù)用Poller,一個(gè)計(jì)時(shí)器隊(duì)列TimerQueue
- 每個(gè)Poller監(jiān)聽(tīng)多個(gè)Channel,TimerQueue其實(shí)也是一個(gè)Channel
- 每個(gè)Channel對(duì)應(yīng)一個(gè)fd,在Channel被激活后調(diào)用回調(diào)函數(shù)
- 每個(gè)回調(diào)函數(shù)是在EventLoop所在線程執(zhí)行
- 所有激活的Channel回調(diào)結(jié)束后EventLoop繼續(xù)讓Poller監(jiān)聽(tīng)
所以調(diào)用回調(diào)函數(shù)的過(guò)程中是同步的,如果回調(diào)函數(shù)執(zhí)行時(shí)間很長(zhǎng),那么這個(gè)EventLoop所在線程就會(huì)等待很久之后才會(huì)再次調(diào)用poll。
整個(gè)muduo網(wǎng)絡(luò)庫(kù)實(shí)際上是由Reactor + 線程池實(shí)現(xiàn)的,線程池中每一個(gè)線程都是一個(gè)Reactor模型。在處理大并發(fā)的服務(wù)器任務(wù)上有很大優(yōu)勢(shì)。
簡(jiǎn)化的關(guān)系圖如下,EventLoop只涉及Poller,Channel(簡(jiǎn)單涉及TcpConnection)和TimerQueue。
- 白色三角,繼承
- 黑色菱形,聚合
一個(gè)事件驅(qū)動(dòng)循環(huán)EventLoop其實(shí)就是一個(gè)Reactor模型,是一個(gè)單線程任務(wù)。主要包含io復(fù)用函數(shù)Poller,定時(shí)器隊(duì)列TimerQueue以及激活隊(duì)列。其他的就是一些輔助變量
typedef std::vector<Channel*> ChannelList;bool looping_; /* atomic */std::atomic<bool> quit_;bool eventHandling_; /* atomic */bool callingPendingFunctors_; /* atomic */int64_t iteration_;/* 創(chuàng)建時(shí)保存當(dāng)前事件循環(huán)所在線程,用于之后運(yùn)行時(shí)判斷使用EventLoop的線程是否是EventLoop所屬的線程 */const pid_t threadId_;/* poll返回的時(shí)間,用于計(jì)算從激活到調(diào)用回調(diào)函數(shù)的延遲 */Timestamp pollReturnTime_;/* io多路復(fù)用 */std::unique_ptr<Poller> poller_;/* 定時(shí)器隊(duì)列 */std::unique_ptr<TimerQueue> timerQueue_;/* 喚醒當(dāng)前線程的描述符 */int wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client./* * 用于喚醒當(dāng)前線程,因?yàn)楫?dāng)前線程主要阻塞在poll函數(shù)上* 所以喚醒的方法就是手動(dòng)激活這個(gè)wakeupChannel_,即寫(xiě)入幾個(gè)字節(jié)讓Channel變?yōu)榭勺x* 注: 這個(gè)Channel也注冊(cè)到Poller中*/std::unique_ptr<Channel> wakeupChannel_;boost::any context_;// scratch variables/* * 激活隊(duì)列,poll函數(shù)在返回前將所有激活的Channel添加到激活隊(duì)列中* 在當(dāng)前事件循環(huán)中的所有Channel在Poller中*/ChannelList activeChannels_;/* 當(dāng)前執(zhí)行回調(diào)函數(shù)的Channel */Channel* currentActiveChannel_;/* * queueInLoop添加函數(shù)時(shí)給pendingFunctors_上鎖,防止多個(gè)線程同時(shí)添加* * mutable,突破const限制,在被const聲明的函數(shù)仍然可以更改這個(gè)變量*/mutable MutexLock mutex_;/* * 等待在當(dāng)前線程調(diào)用的回調(diào)函數(shù),* 原因是本來(lái)屬于當(dāng)前線程的回調(diào)函數(shù)會(huì)被其他線程調(diào)用時(shí),應(yīng)該把這個(gè)回調(diào)函數(shù)添加到它屬于的線程中* 等待它屬于的線程被喚醒后調(diào)用,以滿足線程安全性* * TcpServer::removeConnection是個(gè)例子* 當(dāng)關(guān)閉一個(gè)TcpConnection時(shí),需要調(diào)用TcpServer::removeConnection,但是這個(gè)函數(shù)屬于TcpServer,* 然而TcpServer和TcpConnection不屬于同一個(gè)線程,這就容易將TcpServer暴露給其他線程,* 萬(wàn)一其他線程析構(gòu)了TcpServer怎么辦(線程不安全)* 所以會(huì)調(diào)用EventLoop::runInLoop,如果要調(diào)用的函數(shù)屬于當(dāng)前線程,直接調(diào)用* 否則,就添加到這個(gè)隊(duì)列中,等待當(dāng)前線程被喚醒*/std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_最后一個(gè)變量std::vector<Functor> pendingFunctors_;比較不好理解,它是一個(gè)任務(wù)容器,存放的是將要執(zhí)行的回調(diào)函數(shù)。
準(zhǔn)備這么一個(gè)容器的原因在于
- 某個(gè)對(duì)象(通常是Channel或者TcpConnection)可能被另一個(gè)線程使用(這個(gè)線程不是這個(gè)對(duì)象所在線程),此時(shí)這個(gè)對(duì)象就等于暴露給其他線程了。這是非常不安全的,萬(wàn)一這個(gè)線程不小心析構(gòu)了這個(gè)對(duì)象,而這個(gè)對(duì)象所屬的那個(gè)線程正要訪問(wèn)這個(gè)對(duì)象(例如調(diào)用這個(gè)對(duì)象的接口),這個(gè)線程就會(huì)崩潰,因?yàn)樗L問(wèn)了一個(gè)本不存在的對(duì)象(已經(jīng)被析構(gòu))。
- 為了解決這個(gè)問(wèn)題,就需要盡量將對(duì)這個(gè)對(duì)象的操作移到它所屬的那個(gè)線程執(zhí)行(這里是調(diào)用這個(gè)對(duì)象的接口)以滿足線程安全性。又因?yàn)槊總€(gè)對(duì)象都有它所屬的事件驅(qū)動(dòng)循環(huán)EventLoop,這個(gè)EventLoop通常阻塞在poll上。可以保證的是EventLoop阻塞的線程就是它所屬的那個(gè)線程,所以調(diào)用poll的線程就是這個(gè)對(duì)象所屬的線程。這就 可以讓poll返回后再執(zhí)行想要調(diào)用的函數(shù),但是需要手動(dòng)喚醒poll,否則一直阻塞在那里會(huì)耽誤函數(shù)的執(zhí)行。
runInLoop和queueInLoop函數(shù)執(zhí)行的就是上述操作
/** 1.如果事件循環(huán)不屬于當(dāng)前這個(gè)線程,就不能直接調(diào)用回調(diào)函數(shù),應(yīng)該回到自己所在線程調(diào)用* 2.此時(shí)需要先添加到自己的隊(duì)列中存起來(lái),然后喚醒自己所在線程的io復(fù)用函數(shù)(poll)* 3.喚醒方法是采用eventfd,這個(gè)eventfd只有8字節(jié)的緩沖區(qū),向eventfd中寫(xiě)入數(shù)據(jù)另poll返回* 4.返回后會(huì)調(diào)用在隊(duì)列中的函數(shù),見(jiàn)EventLoop* * 舉例說(shuō)明什么時(shí)候會(huì)出現(xiàn)事件驅(qū)動(dòng)循環(huán)不屬于當(dāng)前線程的情況* 1.客戶端close連接,服務(wù)器端某個(gè)Channel被激活,原因?yàn)镋POLLHUP* 2.Channel調(diào)用回調(diào)函數(shù),即TcpConnection的handleClose* 3.handleClose調(diào)用TcpServer為它提供的回調(diào)函數(shù)removeConnection* 4.此時(shí)執(zhí)行的是TcpServer的removeConnection函數(shù),* 解釋 * 1.因?yàn)門(mén)cpServer所在線程和TcpConnection所在的不是同一個(gè)線程* 2.這就導(dǎo)致將TcpServer暴露給了TcpConnection所在線程* 3.因?yàn)門(mén)cpServer需要將這個(gè)關(guān)閉的TcpConnection從tcp map中刪除* 就需要調(diào)用自己的另一個(gè)函數(shù)removeConnectionInLoop* 4.為了實(shí)現(xiàn)線程安全性,也就是為了讓removeConnectionInLoop在TcpServer自己所在線程執(zhí)行* 需要先把這個(gè)函數(shù)添加到隊(duì)列中存起來(lái),等到回到自己的線程在執(zhí)行* 5.runInLoop中的queueInLoop就是將這個(gè)函數(shù)存起來(lái)* 6.而此時(shí)調(diào)用runInLoop的仍然是TcpConnection所在線程* 7.因?yàn)樽允贾两K,removeConnection這個(gè)函數(shù)都還沒(méi)有結(jié)束* * 如果調(diào)用runInLoop所在線程和事件驅(qū)動(dòng)循環(huán)線程是一個(gè)線程,那么直接調(diào)用回調(diào)函數(shù)就行了* * 在TcpServer所在線程中,EventLoop明明阻塞在poll上,這里為什么可以對(duì)它進(jìn)行修改* 1.線程相當(dāng)于一個(gè)人可以同時(shí)做兩件事情,一個(gè)EventLoop同時(shí)調(diào)用兩個(gè)函數(shù)就很正常了* 2.其實(shí)函數(shù)調(diào)用都是通過(guò)函數(shù)地址調(diào)用的,既然EventLoop可讀,就一定直到內(nèi)部函數(shù)的地址,自然可以調(diào)用* 3.而更改成員函數(shù),通過(guò)地址訪問(wèn),進(jìn)而修改,也是可以的*/ void EventLoop::runInLoop(Functor cb) {if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));} }當(dāng)然了,如果這個(gè)對(duì)象所屬線程和當(dāng)前線程相同,就沒(méi)有線程安全性的問(wèn)題,直接調(diào)用即可。否則,就需要添加到pendingFunctors_中,這正是queueInLoop的功效
/** 由runInLoop調(diào)用,也可直接調(diào)用,作用* 1.將相應(yīng)的回調(diào)函數(shù)存在事件驅(qū)動(dòng)循環(huán)的隊(duì)列中,等待回到自己線程再調(diào)用它* 2.激活自己線程的事件驅(qū)動(dòng)循環(huán)*/ void EventLoop::queueInLoop(Functor cb) {{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();} }此處需要上鎖保護(hù)pendingFunctors_以防止多個(gè)線程同時(shí)向它添加函數(shù)。這里的鎖體現(xiàn)了RAII方法,大括號(hào)是語(yǔ)句塊,把里面的變量作為臨時(shí)變量處理
因?yàn)镋ventLoop通常阻塞在poll上,所以添加到pendingFunctors_后需要手動(dòng)喚醒它,不然它一直阻塞在poll,會(huì)耽誤了函數(shù)的執(zhí)行。喚醒的方法是使用eventfd
函數(shù)用于創(chuàng)建一個(gè)eventfd文件描述符,這個(gè)描述符可用于進(jìn)程/線程間的等待/喚醒。原因是內(nèi)核只為eventfd維護(hù)一個(gè)uint64_t類型的計(jì)數(shù)器,大小應(yīng)該在64位。
參數(shù)initval是這個(gè)計(jì)數(shù)器的初值
flags是一些標(biāo)志,可以是下面幾個(gè)的或運(yùn)算結(jié)果
- EFD_NONBLOCK,非阻塞
- EFD_CLOEXEC,設(shè)置close-on-exec屬性,調(diào)用exec時(shí)會(huì)自動(dòng)close
- …
eventfd也可以使用write/read等io函數(shù)進(jìn)行讀寫(xiě),區(qū)別是
write每次只能寫(xiě)入8字節(jié)大小的數(shù)據(jù),內(nèi)核會(huì)將這8字節(jié)大小的數(shù)值加到計(jì)數(shù)器上
read一次性讀取這個(gè)計(jì)數(shù)器的值,并把緩沖區(qū)初始化為0。如果調(diào)用read時(shí)這個(gè)計(jì)數(shù)器值就是0,那么非阻塞時(shí)會(huì)返回EAGAIN,阻塞時(shí)會(huì)等待計(jì)數(shù)器的值變?yōu)榉?
可以把這個(gè)eventfd添加到poll中,在需要喚醒時(shí)寫(xiě)入8字節(jié)數(shù)據(jù),此時(shí)poll返回,執(zhí)行回調(diào)函數(shù),然后執(zhí)行在pendingFunctors_中的函數(shù)。
loop函數(shù)是EventLoop的事件驅(qū)動(dòng)循環(huán),所有的Reactor模型的loop函數(shù)都差不多。執(zhí)行的就是poll和回調(diào)函數(shù)的回調(diào),以及pendingFunctors_中函數(shù)的調(diào)用
/* * 事件驅(qū)動(dòng)主循環(huán)* * 1.每個(gè)TcpServer對(duì)應(yīng)一個(gè)事件驅(qū)動(dòng)循環(huán)線程池* 2.每個(gè)事件驅(qū)動(dòng)循環(huán)線程池對(duì)應(yīng)多個(gè)事件驅(qū)動(dòng)循環(huán)線程* 3.每個(gè)事件驅(qū)動(dòng)循環(huán)線程對(duì)應(yīng)一個(gè)事件驅(qū)動(dòng)主循環(huán)* 4.每個(gè)事件驅(qū)動(dòng)主循環(huán)對(duì)應(yīng)一個(gè)io多路復(fù)用函數(shù)* 5.每個(gè)io多路復(fù)用函數(shù)監(jiān)聽(tīng)多個(gè)Channel* 6.每個(gè)Channel對(duì)應(yīng)一個(gè)fd,也就對(duì)應(yīng)一個(gè)TcpConnection或者監(jiān)聽(tīng)套接字* 7.在poll返回后處理激活隊(duì)列中Channel的過(guò)程是同步的,也就是一個(gè)一個(gè)調(diào)用回調(diào)函數(shù)* 8.調(diào)用回調(diào)函數(shù)的線程和事件驅(qū)動(dòng)主循環(huán)所在線程是同一個(gè),也就是同步執(zhí)行回調(diào)函數(shù)* 9.線程池用在事件驅(qū)動(dòng)循環(huán)上層,也就是事件驅(qū)動(dòng)循環(huán)是線程池中的一個(gè)線程*/ void EventLoop::loop() {assert(!looping_);assertInLoopThread();looping_ = true;quit_ = false; // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){/* 清空激活隊(duì)列 */activeChannels_.clear();/* epoll_wait返回后會(huì)將所有就緒的Channel添加到激活隊(duì)列activeChannel中 */pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priorityeventHandling_ = true;/* 執(zhí)行所有在激活隊(duì)列中的Channel的回調(diào)函數(shù) */for (Channel* channel : activeChannels_){currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_);}currentActiveChannel_ = NULL;eventHandling_ = false;/* 執(zhí)行pendingFunctors_中的所有函數(shù) */doPendingFunctors();}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false; }Reactor模式的loop函數(shù)大多一個(gè)樣子
muduo中多了處理pendingFunctors_中的函數(shù),在自己的線程調(diào)用自己的函數(shù)是安全的
Channel的回調(diào)函數(shù)就是根據(jù)被激活原因調(diào)用不同的回調(diào)函數(shù),這些回調(diào)函數(shù)是在TcpConnection創(chuàng)建之初被設(shè)置的。
簡(jiǎn)單說(shuō)一下Channel和TcpConnection的關(guān)系
- 每個(gè)TcpConnection對(duì)象代表一個(gè)tcp連接,所以TcpConnection中需要保存用于服務(wù)器/客戶端通信的套接字,這個(gè)套接字就記錄在Channel中
- TcpConnection在創(chuàng)建之初會(huì)為Channel設(shè)置回調(diào)函數(shù),如果套接字可讀/可寫(xiě)/錯(cuò)誤/關(guān)閉等就會(huì)執(zhí)行TcpConnection中的函數(shù)
- TcpConnection在確定連接已經(jīng)建立后會(huì)向Poller注冊(cè)自己的Channel
Channel的handleEvent如下
tie_是TcpConnection的弱引用,在調(diào)用TcpConnection的函數(shù)之前判斷它是否還存在,如果被析構(gòu)了,那么提升的shared_ptr會(huì)是null
具體可以參考 muduo網(wǎng)絡(luò)庫(kù)學(xué)習(xí)(二)對(duì)套接字和監(jiān)聽(tīng)事件的封裝Channel
EventLoop沒(méi)有特別處理定時(shí)器任務(wù),原因是定時(shí)器任務(wù)TimerQueue也被轉(zhuǎn)換成一個(gè)文件描述符添加到Poller中,所以時(shí)間一到timerfd變?yōu)榭勺x,poll就會(huì)返回,就會(huì)調(diào)用回調(diào)函數(shù)。EventLoop只提供了runAt/runAfter/runEveny三個(gè)接口用于設(shè)置定時(shí)任務(wù)。這些在 muduo網(wǎng)絡(luò)庫(kù)學(xué)習(xí)(三)定時(shí)器TimerQueue的設(shè)計(jì)中有提及
/* * 定時(shí)器功能,由用戶調(diào)用runAt并提供當(dāng)事件到了執(zhí)行的回調(diào)函數(shù)* 時(shí)間在Timestamp設(shè)置,絕對(duì)時(shí)間,單位是微秒*/ TimerId EventLoop::runAt(Timestamp time, TimerCallback cb) {/* std::move,移動(dòng)語(yǔ)義,避免拷貝 */return timerQueue_->addTimer(std::move(cb), time, 0.0); }/** 如上,單位是微秒,相對(duì)時(shí)間*/ TimerId EventLoop::runAfter(double delay, TimerCallback cb) {Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, std::move(cb)); }/** 每隔多少微秒調(diào)用一次*/ TimerId EventLoop::runEvery(double interval, TimerCallback cb) {Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval); }幾個(gè)C++方面的知識(shí)點(diǎn)
- std::move,移動(dòng)語(yǔ)義,避免拷貝
- RAII,以鎖為例,構(gòu)造時(shí)上鎖,析構(gòu)時(shí)解鎖(函數(shù)返回時(shí)局部變量析構(gòu))
- 花括號(hào)語(yǔ)句塊
- std::unique_ptr,智能指針,不允許拷貝和賦值,獨(dú)一無(wú)二
- std::shared_ptr,智能指針,可以拷貝賦值,存在引用計(jì)數(shù)
- std::weak_ptr,弱引用,不增加引用計(jì)數(shù),必要時(shí)可通過(guò)lock函數(shù)提升為shared_ptr
總結(jié)
以上是生活随笔為你收集整理的muduo网络库学习(四)事件驱动循环EventLoop的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: muduo网络库学习(三)定时器Time
- 下一篇: 每天一道LeetCode-----合并两