muduo源码client/server通信流程
今天來學習一下muduo源碼中client和server間的大致通信流程,以echo服務為例,先看一下echo服務的main函數代碼。
#include "examples/simple/echo/echo.h"
#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"

#include <unistd.h>

// using namespace muduo;
// using namespace muduo::net;

// Entry point of the echo example: one EventLoop (reactor) in the main
// thread, an EchoServer listening on port 2007, then run the loop forever.
int main()
{
  LOG_INFO << "pid = " << getpid();          // log this process's id
  muduo::net::EventLoop loop;                // the reactor owned by this thread
  muduo::net::InetAddress listenAddr(2007);  // listen address: port 2007
  EchoServer server(&loop, listenAddr);      // installs the echo callbacks
  server.start();                            // registers the listening fd with the poller
  loop.loop();                               // event loop runs until quit()
}
? ? ? ?先實例化一個loop,這會調用loop的構造函數,看一下loop的構造函數。
// Constructs the per-thread reactor: creates the poller backend, the timer
// queue, and an eventfd-based wakeup channel, and enforces the
// one-EventLoop-per-thread rule via a thread-local pointer.
EventLoop::EventLoop()
  : looping_(false),                 // true while loop() is running
    quit_(false),                    // set by quit() to break the event loop
    eventHandling_(false),           // true while dispatching active channels
    callingPendingFunctors_(false),  // true while draining the pending-functor queue
    iteration_(0),
    threadId_(CurrentThread::tid()),          // id of the thread that owns this loop
    poller_(Poller::newDefaultPoller(this)),  // poll(2) or epoll(7) backend
    timerQueue_(new TimerQueue(this)),        // timer queue driven by this loop
    wakeupFd_(createEventfd()),               // writing to this fd wakes the loop up
    wakeupChannel_(new Channel(this, wakeupFd_)),  // self-notification channel, managed by the poller
    currentActiveChannel_(NULL)               // channel currently being dispatched
{
  LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread)  // thread-local pointer: at most one EventLoop per thread
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;  // LOG_FATAL aborts the process
  }
  else
  {
    t_loopInThisThread = this;  // record this loop in the thread-local slot
  }
  // Reads on the wakeup fd are handled by the loop itself
  // (Channel::handleEventWithGuard will invoke EventLoop::handleRead).
  wakeupChannel_->setReadCallback(
      std::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();  // register wakeupFd_ with the poller
}
? ? ?loop就相當于一個reactor,loop構造函數new了epoll,創建了喚醒描述符,并注冊到epoll。接著對EchoServer進行了構造,調用了start函數。
EchoServer::EchoServer(muduo::net::EventLoop* loop,const muduo::net::InetAddress& listenAddr): server_(loop, listenAddr, "EchoServer") {server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1)); //連接到達server_.setMessageCallback(std::bind(&EchoServer::onMessage, this, _1, _2, _3)); //消息到達 }void EchoServer::start() {server_.start(); }? ? ??EchoServer構造函數中設置了連接達到和消息達到的回調函數,并構建了TcpServer。再看TcpServer的構造函數。
// Constructs the server: an Acceptor for the listening socket, an
// EventLoopThreadPool for per-connection IO loops, and default callbacks.
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),  // acceptor loop; must not be NULL
    ipPort_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),  // owns the listening socket
    threadPool_(new EventLoopThreadPool(loop, name_)),  // pool of IO loops for connections
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)  // counter used to build unique connection names
{
  /*
   * When a client connects, the Acceptor accepts the request and then calls
   * this callback, which creates the TcpConnection for the new socket.
   */
  acceptor_->setNewConnectionCallback(
      std::bind(&TcpServer::newConnection, this, _1, _2));
}
? ? ??TcpServer構造函數new了一個acceptor_,設置了當有新連接達到時該調用的回調函數。看一下acceptor的構造函數。
// Creates the non-blocking listening socket, binds it, and installs the
// read callback that will accept incoming connections.
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),  // create the listening socket fd
    acceptChannel_(loop, acceptSocket_.fd()),  // channel watching the listening fd
    listenning_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))  // spare fd, used to recover from EMFILE
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
  // When the listening fd becomes readable (a connection is pending),
  // the poller fires this callback -> Acceptor::handleRead.
  acceptChannel_.setReadCallback(
      std::bind(&Acceptor::handleRead, this));
}
? ? ?Acceptor構造函數中設置了當有可讀事件是該調用的構造函數,然后start()函數和loop()函數開始調用。
void TcpServer::start() {if (started_.getAndSet(1) == 0){threadPool_->start(threadInitCallback_);//啟動線程池,threadInitCallback_創建好所有線程后調用的回調函數assert(!acceptor_->listenning());loop_->runInLoop( //直接調用linsten函數std::bind(&Acceptor::listen, get_pointer(acceptor_)));} } void EventLoop::loop() {assert(!looping_);assertInLoopThread(); //事件循環必須在IO線程中,即創建該evenloop的線程looping_ = true;quit_ = false; // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){activeChannels_.clear(); //activeChannels_是一個vector等待io復用函數返回pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); //調用poll返回活動的事件,有可能是喚醒返回的++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priority 按優先級排序//處理IO事件eventHandling_ = true;for (Channel* channel : activeChannels_) //遍歷通道來進行處理{currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_); //pollReturnTime_是poll返回的時刻}currentActiveChannel_ = NULL; //處理完了賦空eventHandling_ = false;//執行方法隊列中的方法[方法隊列functors,我們可以跨線程的往里面添加新的方法,這些方法會在處理完io事件后執行]doPendingFunctors(); //這個設計也能夠進行計算任務} void Acceptor::listen() {loop_->assertInLoopThread(); //保證是在IO線程listenning_ = true;acceptSocket_.listen();acceptChannel_.enableReading(); 注冊可讀事件 }? ? ? ?start函數中在loop里面調用了Acceptor的listen函數注冊可讀事件(將可讀事件加入epoll),接著loop函數在while循環里面調用poller_->poll()。接著看一下epoll的poll函數。
// Waits on epoll for up to timeoutMs, fills *activeChannels with the
// channels that have pending events, and returns the wakeup timestamp.
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  LOG_TRACE << "fd total count " << channels_.size();
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),                 // events_ is pre-sized
                               static_cast<int>(events_.size()),  // capacity of the event array
                               timeoutMs);
  int savedErrno = errno;  // save errno before later calls can clobber it
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happened";
    fillActiveChannels(numEvents, activeChannels);  // translate epoll events into active channels
    // If the returned event count filled the array, double it for next time.
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << "nothing happened";  // timed out with no events
  }
  else
  {
    // error happens, log uncommon ones
    if (savedErrno != EINTR)  // EINTR (interrupted by a signal) is expected; ignore it
    {
      errno = savedErrno;
      LOG_SYSERR << "EPollPoller::poll()";
    }
  }
  return now;  // timestamp taken right after epoll_wait returned
}
? ? ? ?代碼中有調用epoll_wait函數。
當有新連接到達時,epoll返回,currentActiveChannel_->handleEvent(上述loop.loop函數中)被調用,看一下對應代碼。
//處理所有發生的事件,如果活著,底層調用handleEventWithGuard void Channel::handleEvent(Timestamp receiveTime) 事件到來調用handleEvent處理 {std::shared_ptr<void> guard; //守護if (tied_){guard = tie_.lock();if (guard){handleEventWithGuard(receiveTime);}}else{handleEventWithGuard(receiveTime);} }//處理所有發生的事件 //EPOLLIN :表示對應的文件描述符可以讀; //EPOLLOUT:表示對應的文件描述符可以寫; //EPOLLPRI:表示對應的文件描述符有緊急的數據可讀 //EPOLLERR:表示對應的文件描述符發生錯誤; //EPOLLHUP:表示對應的文件描述符被掛斷; //EPOLLET:表示對應的文件描述符有事件發生; void Channel::handleEventWithGuard(Timestamp receiveTime) { eventHandling_ = true; LOG_TRACE << reventsToString();if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) //判斷返回事件類型{if (logHup_){LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";}if (closeCallback_) closeCallback_();}if (revents_ & POLLNVAL) //不合法文件描述符{LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";}if (revents_ & (POLLERR | POLLNVAL)){if (errorCallback_) errorCallback_();}if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) //POLLRDHUP是對端關閉連接事件,如shutdown等{if (readCallback_) readCallback_(receiveTime);}if (revents_ & POLLOUT){if (writeCallback_) writeCallback_();}eventHandling_ = false; }? ? ? 是可讀事件,readCallback_被調用,[acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this))]有設置,則Acceptor::handleRead()函數被調用。
// Invoked by the poller when the listening fd is readable, i.e. a new
// connection is pending; accepts it and hands the fd to the server.
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);  // the actual accept(2) happens here
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);  // hand the new fd to TcpServer::newConnection
    }
    else  // no callback installed: close the client's fd immediately
    {
      sockets::close(connfd);
    }
  }
  else
  {
    LOG_SYSERR << "in Acceptor::handleRead";
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of libev.
    // Out of fds: release the reserved idle fd, accept-and-close the pending
    // connection so the client gets a clean close, then re-reserve the fd.
    if (errno == EMFILE)
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }
}
? ? ?newConnectionCallback_被調用,[acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));],則調用tcpServer中的newConnection函數。
// Called by the Acceptor for each accepted socket: picks an IO loop from
// the pool, creates a named TcpConnection, installs the callbacks, and
// finishes the handshake in the connection's own loop thread.
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();  // pick an IO loop thread for this TcpConnection
  /* build a unique name for the TcpConnection */
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;
  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  /*
   * Get the connection's local <address, port> from sockfd via
   * getsockname(int fd, struct sockaddr*, int *size);
   */
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  /* create a new TcpConnection object representing this tcp connection */
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  /* store it in the map of all connections, keyed by its unique name
     (server name + client <address, port>) */
  connections_[connName] = conn;
  /* install the user-provided callbacks on the connection */
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  /*
   * Close callback, set by TcpServer: removes the closed TcpConnection from
   * the map. When poll reports EPOLLHUP the tcp connection must be closed:
   * Channel's CloseCallback -> TcpConnection::handleClose -> removeConnection.
   */
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
  /*
   * After the connection is established, run connectEstablished in the
   * connection's own loop:
   * 1. the new TcpConnection lives in one of the pool's event loops
   * 2. so it belongs to the thread running that loop
   * 3. its methods should therefore be called in that thread
   * 4. hence runInLoop to execute the call in the owning loop's thread
   * 5. the current thread is TcpServer's main thread, not TcpConnection's;
   *    calling directly here would block accepting new clients
   * 6. this is not about thread safety per se — calling it here would not be
   *    unsafe, since this thread created the TcpConnection
   */
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
connectionCallback_和messageCallback_就是echo.cc中設置的,connectEstablished會調用了用戶設置的connectionCallback.
新連接到達之后就new了一個TcpConnection,看一下TcpConnection的構造函數。
// Wraps an accepted socket: RAII ownership of the fd via Socket, a Channel
// for readiness events, and the four event handlers wired into the channel.
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),  // initial state: connecting, not yet established
    reading_(true),
    socket_(new Socket(sockfd)),  // RAII management of the connected socket
    channel_(new Channel(loop, sockfd)),  // Channel manages reads/writes on the socket
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)  // output-buffer high-water mark: 64 MiB
{
  // Wire the four TcpConnection event handlers in as the channel's callbacks.
  channel_->setReadCallback(
      std::bind(&TcpConnection::handleRead, this, _1));
  channel_->setWriteCallback(
      std::bind(&TcpConnection::handleWrite, this));
  channel_->setCloseCallback(
      std::bind(&TcpConnection::handleClose, this));
  channel_->setErrorCallback(
      std::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);  // enable TCP keep-alive probes
}
設置了套接字可讀,可讀對應的回調函數。當client發送數據過來時,TcpConnection::handleRead被調用。
void TcpConnection::handleRead(Timestamp receiveTime) {loop_->assertInLoopThread();int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0){//調用回調函數,使用shared_from_this()得到自身的shared_ptr,延長了該對象的生命期,保證了它的生命期長過messageCallback_函數,messageCallback_能安全的使用它messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0){handleClose();}else{errno = savedErrno;LOG_SYSERR << "TcpConnection::handleRead";handleError();} }? ?最終調用到了用戶設置的messageCallback_,到此流程大致分析完畢。
tcpserver: setNewConnectionCallback //新連接到達(新連接到達由acceptor處理)
acceptor: setReadCallback //有可讀事件
listen: enableReading //注冊可讀事件
echo: setConnectionCallback //連接到達
      setMessageCallback //數據到達
TcpConnection: setReadCallback //可讀事件
               setWriteCallback //可寫事件
上面是各個類設置的對應的回調函數,當新連接達到時,acceptor處理,然后到tcpserver,然后到TcpConnection,再后面就是TcpConnection和echo進行交互。
總結
以上是生活随笔為你收集整理的muduo源码client/server通信流程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: git reset和git revert
- 下一篇: muduo之EventLoop