muduo之EventLoop
生活随笔
收集整理的這篇文章主要介紹了
muduo之EventLoop
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
? ? ? ? ?EventLoop.cc就相當于一個reactor,多線程之間的函數調用(用eventfd喚醒),epoll處理,超時隊列處理,對channel的處理。運行loop的進程被稱為IO線程,EventLoop提供了一些API確保相應函數在IO線程中調用,確保沒有用互斥量保護的變量只能在IO線程中使用,也封裝了超時隊列的基本操作。
EventLoop.h
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is a public header file, it must only include public header files.#ifndef MUDUO_NET_EVENTLOOP_H #define MUDUO_NET_EVENTLOOP_H#include <atomic> #include <functional> #include <vector>#include <boost/any.hpp>#include "muduo/base/Mutex.h" #include "muduo/base/CurrentThread.h" #include "muduo/base/Timestamp.h" #include "muduo/net/Callbacks.h" #include "muduo/net/TimerId.h"namespace muduo { namespace net {class Channel; class Poller; class TimerQueue;/// /// Reactor, at most one per thread. /// /// This is an interface class, so don't expose too much details. class EventLoop : noncopyable {public:typedef std::function<void()> Functor;EventLoop();~EventLoop(); // force out-line dtor, for std::unique_ptr members.////// Loops forever.////// Must be called in the same thread as creation of the object.///void loop();/// Quits loop.////// This is not 100% thread safe, if you call through a raw pointer,/// better to call through shared_ptr<EventLoop> for 100% safety.void quit();////// Time when poll returns, usually means data arrival.///Timestamp pollReturnTime() const { return pollReturnTime_; }int64_t iteration() const { return iteration_; }/// Runs callback immediately in the loop thread./// It wakes up the loop, and run the cb./// If in the same loop thread, cb is run within the function./// Safe to call from other threads.void runInLoop(Functor cb);/// Queues callback in the loop thread./// Runs after finish pooling./// Safe to call from other threads.void queueInLoop(Functor cb);size_t queueSize() const;// timers////// Runs callback at 'time'./// Safe to call from other threads.///TimerId runAt(Timestamp time, TimerCallback cb);////// Runs callback after @c delay seconds./// Safe to call from other threads.///TimerId runAfter(double delay, TimerCallback cb);////// Runs callback every @c interval seconds./// Safe to call from other threads.///TimerId runEvery(double interval, TimerCallback cb);////// Cancels the timer./// Safe to call from other threads.///void cancel(TimerId timerId);// internal usagevoid wakeup();void updateChannel(Channel* channel);void removeChannel(Channel* channel);bool hasChannel(Channel* channel);// pid_t threadId() const { return threadId_; }void assertInLoopThread(){if (!isInLoopThread()){abortNotInLoopThread();}}bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }// bool callingPendingFunctors() const { return callingPendingFunctors_; }bool eventHandling() const { return eventHandling_; }void setContext(const boost::any& context){ context_ = context; }const boost::any& getContext() const{ return context_; }boost::any* getMutableContext(){ return &context_; }static EventLoop* getEventLoopOfCurrentThread();private:void abortNotInLoopThread();void handleRead(); // waked upvoid doPendingFunctors();void printActiveChannels() const; // DEBUGtypedef std::vector<Channel*> ChannelList;bool looping_; /* atomic */std::atomic<bool> quit_;bool eventHandling_; /* atomic */bool callingPendingFunctors_; /* atomic */int64_t iteration_;const pid_t threadId_;Timestamp pollReturnTime_;std::unique_ptr<Poller> poller_;std::unique_ptr<TimerQueue> timerQueue_;int wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client.std::unique_ptr<Channel> wakeupChannel_;boost::any context_;// scratch variablesChannelList activeChannels_;Channel* currentActiveChannel_;mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_); };} // namespace net } // namespace muduo#endif // MUDUO_NET_EVENTLOOP_HEventLoop.cc
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com)#include "muduo/net/EventLoop.h"#include "muduo/base/Logging.h" #include "muduo/base/Mutex.h" #include "muduo/net/Channel.h" #include "muduo/net/Poller.h" #include "muduo/net/SocketsOps.h" #include "muduo/net/TimerQueue.h"#include <algorithm>#include <signal.h> #include <sys/eventfd.h> #include <unistd.h>using namespace muduo; using namespace muduo::net;namespace { __thread EventLoop* t_loopInThisThread = 0;const int kPollTimeMs = 10000;int createEventfd() {int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_SYSERR << "Failed in eventfd";abort();}return evtfd; }#pragma GCC diagnostic ignored "-Wold-style-cast" class IgnoreSigPipe {public:IgnoreSigPipe(){::signal(SIGPIPE, SIG_IGN);// LOG_TRACE << "Ignore SIGPIPE";} }; #pragma GCC diagnostic error "-Wold-style-cast"IgnoreSigPipe initObj; } // namespaceEventLoop* EventLoop::getEventLoopOfCurrentThread() {return t_loopInThisThread; }//創建了EventLoop對象的線程稱為IO線程 EventLoop::EventLoop(): looping_(false), //判斷是否在loopquit_(false), //判斷是否退出的標志eventHandling_(false), //處理handevent的標志callingPendingFunctors_(false), //判斷當前是不是在執行方法隊列iteration_(0),threadId_(CurrentThread::tid()), //當前線程IDpoller_(Poller::newDefaultPoller(this)), //創建一個 poll 或 epoll 對象timerQueue_(new TimerQueue(this)), //創建一個計時器wakeupFd_(createEventfd()), //發送喚醒loop消息的描述符,隨便寫點消息即可喚醒wakeupChannel_(new Channel(this, wakeupFd_)), //wakeupChannel_用來自己給自己通知的一個通道,該通道會納入到poller來管理currentActiveChannel_(NULL) //當前活躍的channel鏈表指針 {LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread) //判斷是否是本線程的loop,是一個loop類型的指針{LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_; //用LOG_FATAL終止abort它}else{t_loopInThisThread = this; //this賦給線程局部數據指針}//設定wakeupChannel的回調函數,即EventLoop自己的的handleRead函數wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this)); //channel->handleEventWithGuard會調用到handleRead// we are always reading the wakeupfdwakeupChannel_->enableReading(); //注冊wakeupFd_到poller }EventLoop::~EventLoop() {LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_<< " destructs in thread " << CurrentThread::tid();wakeupChannel_->disableAll(); //從監聽隊列fd里移除wakeupChannel_->remove(); //移除epoll里面的channel::close(wakeupFd_);t_loopInThisThread = NULL; }void EventLoop::loop() {assert(!looping_);assertInLoopThread(); //事件循環必須在IO線程中,即創建該evenloop的線程looping_ = true; //是否正在循環quit_ = false; // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){activeChannels_.clear(); //activeChannels_是一個vector等待io復用函數返回pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); //調用poll返回活動的事件,有可能是喚醒返回的++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priority 按優先級排序//處理IO事件eventHandling_ = true;for (Channel* channel : activeChannels_) //遍歷通道來進行處理{currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_); //pollReturnTime_是poll返回的時刻}currentActiveChannel_ = NULL; //處理完了賦空eventHandling_ = false;//執行方法隊列中的方法[方法隊列functors,我們可以跨線程的往里面添加新的方法,這些方法會在處理完io事件后執行]doPendingFunctors(); //這個設計也能夠進行計算任務}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false; }void EventLoop::quit() {quit_ = true; //設置退出標志// There is a chance that loop() just executes while(!quit_) and exits,// then EventLoop destructs, then we are accessing an invalid object.// Can be fixed using mutex_ in both places.if (!isInLoopThread()){wakeup(); //喚醒} }//在I/O線程中調用某個函數 //實際上就是如果是I/O線程主動調用該函數想要執行,那就同步執行該函數。如果是其他線程施加給I/O線程的任務,那么其他線程就需要把回調函數加入I/O線程的隊列,等待異步執行 void EventLoop::runInLoop(Functor cb) {if (isInLoopThread()) //判斷是否是本線程的loop{cb();}else{queueInLoop(std::move(cb)); } }void EventLoop::queueInLoop(Functor cb)//把方法添加到隊列中,該方法會出現在多個線程中,操作要加鎖 {{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));//std::function支持移動初始化,所以這里用move提升性能。(減少一次拷貝)}if (!isInLoopThread() || callingPendingFunctors_)//如果調用的queneInLoop的線程不是IO線程,那么喚醒{//如果在IO線程調用queueInLoop(),而此時正在調用pending functor,由于doPendingFunctors()調用的Functor可能再次調用queueInLoop(cb),這是queueInLoop()就必須wakeup(),否則新增的cb可能就不能及時調用了wakeup();} }size_t EventLoop::queueSize() const {MutexLockGuard lock(mutex_);return pendingFunctors_.size(); }TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)//在指定的時間調用callback {return timerQueue_->addTimer(std::move(cb), time, 0.0); }TimerId EventLoop::runAfter(double delay, TimerCallback cb)//等一段時間調用callback {Timestamp time(addTime(Timestamp::now(), delay));//微妙return runAt(time, std::move(cb)); }TimerId EventLoop::runEvery(double interval, TimerCallback cb)//以固定的間隔反復的調用callback {Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval); }void EventLoop::cancel(TimerId timerId) //取消timer {return timerQueue_->cancel(timerId); }void EventLoop::updateChannel(Channel* channel) //更新通道,用epoll_ctl更新fd {assert(channel->ownerLoop() == this); //判斷channel的loop是不是當前loopassertInLoopThread(); poller_->updateChannel(channel); }void EventLoop::removeChannel(Channel* channel) //移除通道,將channel從ChannelMap移除并EPOLL_CTL_DEL掉fd {assert(channel->ownerLoop() == this); //表示當前的loopassertInLoopThread();if (eventHandling_) //正在處理channel{assert(currentActiveChannel_ == channel || //當前的channel或不是活躍的channelstd::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());}poller_->removeChannel(channel); }bool EventLoop::hasChannel(Channel* channel)//查找事件分發器是否在channels_中 {assert(channel->ownerLoop() == this);assertInLoopThread();return poller_->hasChannel(channel); }void EventLoop::abortNotInLoopThread() {LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this<< " was created in threadId_ = " << threadId_<< ", current thread id = " << CurrentThread::tid(); }void EventLoop::wakeup() {uint64_t one = 1;ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); //隨便寫點數據進去就喚醒了if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";} }void EventLoop::handleRead() //讀取喚醒的數據 {uint64_t one = 1;ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";} }// 1. 不是簡單的在臨界區內依次調用functor,而是把回調列表swap到functors中,這一方面減小了 //臨界區的長度,意味著不會阻塞其他線程的queueInLoop(),另一方面也避免了死鎖(因為Functor可能再次調用quueInLoop) // 2. 由于doPendingFunctors()調用的Functor可能再次調用queueInLoop(cb),這是queueInLoop()就必須wakeup(),否則新增的cb可能就不能及時調用了 // 3. muduo沒有反復執行doPendingFunctors()直到pendingFunctors為空,這是有意的,否則I/O線程可能陷入死循環,無法處理I/O事件 void EventLoop::doPendingFunctors() {std::vector<Functor> functors;callingPendingFunctors_ = true;//注意這里的臨界區,這里使用了一個棧上變量functors和pendingFunctors交換{MutexLockGuard lock(mutex_);functors.swap(pendingFunctors_); //pendingFunctors_是存放Functor的vector}//此處其它線程就可以往pendingFunctors添加任務for (const Functor& functor : functors){functor();}callingPendingFunctors_ = false; }void EventLoop::printActiveChannels() const {for (const Channel* channel : activeChannels_){LOG_TRACE << "{" << channel->reventsToString() << "} ";} }? ? ?注釋很詳細了,不多說。
總結
以上是生活随笔為你收集整理的muduo之EventLoop的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: muduo源码client/server
- 下一篇: muduo之TcpServer