muduo网络库学习(五)服务器监听类Acceptor及Tcp连接TcpConnection的建立与关闭
通常服務器在處理客戶端連接請求時,為了不阻塞在accept函數上,會將監聽套接字注冊到io復用函數中,當客戶端請求連接時,監聽套接字變為可讀,隨后在回調函數調用accept接收客戶端連接。muduo將這一部分封裝成了Acceptor類,用于執行接收客戶端請求的任務。
類的定義如下,主要就是監聽套接字變為可讀的回調函數
class EventLoop; class InetAddress;/// /// Acceptor of incoming TCP connections. /// /* * 對TCP socket, bind, listen, accept的封裝 * 將sockfd以Channel的形式注冊到EventLoop的Poller中,檢測到sockfd可讀時,接收客戶端*/ class Acceptor : noncopyable {public:typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);~Acceptor();/* 由服務器TcpServer設置的回調函數,在接收完客戶端請求后執行,用于創建TcpConnection */void setNewConnectionCallback(const NewConnectionCallback& cb){ newConnectionCallback_ = cb; }bool listenning() const { return listenning_; }/* 調用listen函數,轉為監聽套接字,同時將監聽套接字添加到Poller中 */void listen();private:/* 回調函數,當有客戶端請求連接時執行(監聽套接字變為可讀) */void handleRead();/* 事件驅動主循環 */EventLoop* loop_;/* 封裝socket的一些接口 */Socket acceptSocket_;/* Channel,保存著sockfd,被添加到Poller中,等待被激活 */Channel acceptChannel_;/* * 當有客戶端連接時首先內部接收連接,然后調用的用戶提供的回調函數* 客戶端套接字和地址作為參數傳入*/NewConnectionCallback newConnectionCallback_;bool listenning_;/* * Tcp連接建立的流程* 1.服務器調用socket,bind,listen開啟監聽套接字監聽客戶端請求* 2.客戶端調用socket,connect連接到服務器* 3.第一次握手客戶端發送SYN請求分節(數據序列號)* 4.服務器接收SYN后保存在本地然后發送自己的SYN分節(數據序列號)和ACK確認分節告知客戶端已收到* 同時開啟第二次握手* 5.客戶端接收到服務器的SYN分節和ACK確認分節后保存在本地然后發送ACK確認分節告知服務器已收到* 此時第二次握手完成,客戶端connect返回* 此時,tcp連接已經建立完成,客戶端tcp狀態轉為ESTABLISHED,而在服務器端,新建的連接保存在內核tcp* 連接的隊列中,此時服務器端監聽套接字變為可讀,等待服務器調用accept函數取出這個連接* 6.服務器接收到客戶端發來的ACK確認分節,服務器端調用accept嘗試找到一個空閑的文件描述符,然后* 從內核tcp連接隊列中取出第一個tcp連接,分配這個文件描述符用于這個tcp連接* 此時服務器端tcp轉為ESTABLISHED,三次握手完成,tcp連接建立* * 服務器啟動時占用的一個空閑文件描述符,/dev/null,作用是解決文件描述符耗盡的情況* 原理如下:* 當服務器端文件描述符耗盡,當客戶端再次請求連接,服務器端由于沒有可用文件描述符* 會返回-1,同時errno為EMFILE,意為描述符到達hard limit,無可用描述符,此時服務器端* accept函數在獲取一個空閑文件描述符時就已經失敗,還沒有從內核tcp連接隊列中取出tcp連接* 這會導致監聽套接字一直可讀,因為tcp連接隊列中一直有客戶端的連接請求* * 所以服務器在啟動時打開一個空閑描述符/dev/null(文件描述符),先站著'坑‘,當出現上面* 情況accept返回-1時,服務器暫時關閉idleFd_讓出'坑',此時就會多出一個空閑描述符* 然后再次調用accept接收客戶端請求,然后close接收后的客戶端套接字,優雅的告訴* 客戶端關閉連接,然后再將'坑'占上*/int idleFd_;};一個不好理解的變量是idleFd_;,它是一個文件描述符,這里是打開"/dev/null"文件后返回的描述符,用于解決服務器端描述符耗盡的情況。
如果當服務器文件描述符耗盡后,服務器端accept還沒等從tcp連接隊列中取出連接請求就已經失敗返回了,此時內核tcp隊列中一直有客戶端請求,內核會一直通知監聽套接字,導致監聽套接字一直處于可讀,在下次直接poll函數時會直接返回。
解決的辦法就是在服務器剛啟動時就預先占用一個文件描述符,通??梢允谴蜷_一個文件,這里是"/dev/null"。此時服務器就有一個空閑的文件描述符了,當出現上述情況無法取得tcp連接隊列中的請求時,先關閉這個文件讓出一個文件描述符,此時調用accept函數再次接收,由于已經有一個空閑的文件描述符了,accept會正常返回,將連接請求從tcp隊列中取出,然后優雅的關閉這個tcp連接(調用close函數),最后再打開"/dev/null"這個文件把”坑“占住。
成員函數的實現也有比較重點的地方,首先是構造函數
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),acceptChannel_(loop, acceptSocket_.fd()),listenning_(false),idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) {assert(idleFd_ >= 0);/* * setsockopt設置套接字選項SO_REUSEADDR,對于端口bind,如果這個地址/端口處于TIME_WAIT,也可bind成功* int flag = 1;* setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));*/acceptSocket_.setReuseAddr(true);/** setsockopt設置套接字選項SO_REUSEPORT,作用是對于多核cpu,允許在同一個<ip, port>對上運行多個相同服務器* 內核會采用負載均衡的的方式分配客戶端的連接請求給某一個服務器*/acceptSocket_.setReusePort(reuseport);acceptSocket_.bindAddress(listenAddr);/* Channel設置讀事件的回調函數,此時還沒有開始監聽這個Channel,需要調用Channel::enableReading() */acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); }構造函數中為用于監聽的套接字設置了SO_REUSEPORT和SO_REUSEADDR屬性,一個是端口重用,一個是地址重用。
- SO_REUSEPORT,端口重用,可以使用還處于TIME_WAIT狀態的端口
- SO_REUSEADDR,地址重用,服務器可以同時建立多個用于監聽的socket,每個socket綁定的地址端口都相同,內核會采用負載均衡的方法將每個將每個客戶端請求分配給某一個socket,可以很大程序的提高并發性,充分利用CPU資源
acceptChannel_用于保存這個用于監聽的套接字,綁定回調函數,在合適的時機注冊到Poller上(調用listen時)
void Acceptor::listen() {loop_->assertInLoopThread();listenning_ = true;acceptSocket_.listen();/* * 開始監聽Channel,也就是設置fd關心的事件(EPOLLIN/EPOLLOUT等),然后添加到Poller中 * Poller中保存著所有注冊到EventLoop中的Channel*/acceptChannel_.enableReading(); }比較重要的是事件處理函數,當監聽套接字可讀時,調用accept接收客戶端請求,如果描述符耗盡,釋放idleFd_重新accept,然后關閉,再占用idleFd_
/** 當有客戶端嘗試連接服務器時,監聽套接字變為可讀,epoll_wait/poll返回* EventLoop處理激活隊列中的Channel,調用對應的回調函數* 監聽套接字的Channel的回調函數是handleRead(),用于接收客戶端請求*/ void Acceptor::handleRead() {loop_->assertInLoopThread();InetAddress peerAddr;//FIXME loop until no moreint connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0){// string hostport = peerAddr.toIpPort();// LOG_TRACE << "Accepts of " << hostport;/* * 如果設置了回調函數,那么就調用,參數是客戶端套接字和地址/端口* 否則就關閉連接,因為并沒有要處理客戶端的意思* * 這個回調函數是TcpServer中的newConnection,用于創建一個TcpConnection連接*/if (newConnectionCallback_){newConnectionCallback_(connfd, peerAddr);}else{sockets::close(connfd);}}else{LOG_SYSERR << "in Acceptor::handleRead";// Read the section named "The special problem of// accept()ing when you can't" in libev's doc.// By Marc Lehmann, author of libev.// /* 解決服務器端描述符耗盡的情況,原因見.h文件 */if (errno == EMFILE){::close(idleFd_);idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);::close(idleFd_);idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);}} }對文件描述符耗盡的處理比較重要,以前沒怎么接觸過
回調函數中調用的newConnectionCallback_函數是在Acceptor創建之初由TcpServer設置的(TcpServer表示服務器,內有一個監聽類Acceptor),這個函數主要用于初始化一個TcpConnection,一個TcpConnection對象代表著一個tcp連接
TcpConnection的定義主要都是寫set*函數,成員變量比較多,但是重要的就
- 事件驅動循環loop_
- 用于tcp通信的socket_
- 用于監聽sockfd的channel_
- 輸入輸出緩沖區inputBuffer_/outputBuffer_
- 由TcpServer提供的各種回調函數
首先是構造函數的實現,主要是為Channel提供各種回調函數
/* * 構造函數,設置當fd就緒時調用的回調函數* Channel代表一個對fd事件的監聽*/ TcpConnection::TcpConnection(EventLoop* loop,const string& nameArg,int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr): loop_(CHECK_NOTNULL(loop)),name_(nameArg),state_(kConnecting),reading_(true),socket_(new Socket(sockfd)),channel_(new Channel(loop, sockfd)),localAddr_(localAddr),peerAddr_(peerAddr),highWaterMark_(64*1024*1024) {/* 設置各種回調函數 */channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this<< " fd=" << sockfd;/** 設置KEEP-ALIVE屬性,如果客戶端很久沒有和服務器通訊,tcp會自動判斷客戶端是否還處于連接(類似心跳包)* * int setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &sockopt, static_cast<socklen_t>(sizeof(sockopt)));*/socket_->setKeepAlive(true); }回調函數的設置對應了Channel的hanleEvent函數中根據不同激活原因調用不同回調函數(handleEvent調用handleEventWithGuard)。
另外,handleEvent中的tie_是對TcpConnection的弱引用(在后面設置),因為回調函數都是TcpConnection的,所以在調用之前需要確保TcpConnection沒有被銷毀,所以將tie_提升為shared_ptr判斷TcpConnection是否還存在,之后再調用TcpConnection的一系列回調函數
當TcpServer創建完TcpConnection后,會設置各種會調用書,然后調用TcpConnection的connectEstablished函數,主要用于將Channel添加到Poller中,同時調用用戶提供的連接建立成功后的回調函數
TcpServer創建并設置TcpConnection的部分,可以看到,TcpServer會將用戶提供的所有回調函數都傳給TcpConnection,然后執行TcpConnection的connectEstablished函數,這個函數的執行要放到它所屬的那個事件驅動循環線程做,不要阻塞TcpServer線程(這個地方不是為了線程安全性考慮,因為TcpConnection本身就是在TcpServer線程創建的,暴露給TcpServer線程很正常,而且TcpServer中也記錄著所有創建的TcpConnection,這里的主要目的是不阻塞TcpServer線程,讓它繼續監聽客戶端請求)
connectEstablished函數如下
/* * 1.創建服務器(TcpServer)時,創建Acceptor,設置接收到客戶端請求后執行的回調函數* 2.Acceptor創建監聽套接字,將監聽套接字綁定到一個Channel中,設置可讀回調函數為Acceptor的handleRead* 3.服務器啟動,調用Acceptor的listen函數創建監聽套接字,同時將Channel添加到Poller中* 4.有客戶端請求連接,監聽套接字可讀,Channel被激活,調用可讀回調函數(handleRead)* 5.回調函數接收客戶端請求,獲得客戶端套接字和地址,調用TcpServer提供的回調函數(newConnection)* 6.TcpServer的回調函數中創建TcpConnection代表這個tcp連接,設置tcp連接各種回調函數(由用戶提供給TcpServer)* 7.TcpServer讓tcp連接所屬線程調用TcpConnection的connectEstablished* 8.connectEstablished開啟對客戶端套接字的Channel的可讀監聽,然后調用用戶提供的回調函數*/ void TcpConnection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);/* Channel中對TcpConnection的弱引用在這里設置 */channel_->tie(shared_from_this());/* 設置對可讀事件的監聽,同時將Channel添加到Poller中 */channel_->enableReading();/* 用戶提供的回調函數,在連接建立成功后調用 */connectionCallback_(shared_from_this()); }至此tcp連接建立完成,在用戶提供的回調函數中,傳入的參數便是這個TcpConnection的shared_ptr,用戶可以使用TcpConnection::send操作向客戶端發送消息(放到后面)
有連接的建立就有連接的關閉,當客戶端主動關閉(調用close)時,服務器端對應的Channel被激活,激活原因為EPOLLHUP,表示連接已關閉,此時會調用TcpConnection的回調函數handleClose,在這個函數中,TcpConnection處理執行各種關閉動作,包括
- 將Channel從Poller中移除
- 調用TcpServer提供的關閉回調函數,將自己從TcpServer的tcp連接map中移除
- 調用客戶提供的關閉回調函數(如果有的話)
connectionCallback_是由用戶提供的,連接建立/關閉時調用,主要調用closeCallback_函數(由TcpServer)提供
這個函數主要就存在線程不安全的問題,原因就是此時的線程是TcpConnection所在線程
函數執行順序為:
EventLoop::loop->Poller::poll->Channel::handleEvent->TcpConnection::handleClose->TcpServer::removeConnection
此時就將TcpServer暴露給其他線程,導致線程不安全的問題
為了減輕線程不安全帶來的危險,盡量將線程不安全的函數縮短,muduo中使用runInLoop直接將要調用的函數放到自己線程執行,轉換到線程安全,所以這部分只有這一條語句是線程不安全的
removeConnectionInLoop函數如下
/* * 這個函數是線程安全的,因為是由TcpServer所在事件驅動循環調用的*/ void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) {loop_->assertInLoopThread();LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();size_t n = connections_.erase(conn->name());(void)n;assert(n == 1);EventLoop* ioLoop = conn->getLoop();/* * 為什么不能用runInLoop, why? *//* * std::bind綁定函數指針,注意是值綁定,也就是說conn會復制一份到bind上* 這就會延長TcpConnection生命期,否則* 1.此時對于TcpConnection的引用計數為2,參數一個,connections_中一個* 2.connections_刪除掉TcpConnection后,引用計數為1* 3.removeConnectionInLoop返回,上層函數handleClose返回,引用計數為0,會被析構* 4.bind會值綁定,conn復制一份,TcpConnection引用計數加1,就不會導致TcpConnection被析構*/ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn)); }比較重要的地方是TcpConnection生命期的問題,注釋中也有提及。因為muduo中對象使用智能指針shared_ptr存儲的,所以只有當shard_ptr的引用計數為0時才會析構它保存的對象。對于TcpConnection而言,它的引用計數在
所以如果在第7步不使用std::bind增加TcpConnection生命期的話,TcpConnection可能在handleClose函數返回后就銷毀了,根本不能執行TcpConnection::connectDestroyed函數
總結
以上是生活随笔為你收集整理的muduo网络库学习(五)服务器监听类Acceptor及Tcp连接TcpConnection的建立与关闭的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode-----将链表
- 下一篇: 每天一道LeetCode-----删除序