Thrift异步IO服务器源码分析
http://yanyiwu.com/work/2014/12/06/thrift-tnonblockingserver-analysis.html
最近在使用?libevent?開發項目,想起之前寫?Thrift源碼剖析?的時候說到關于 TNonblockingServer 以后會單獨寫一篇解析, 現在是時候了,就這篇了。
以下內容依然是基于?thrift-0.9.0?。
概述
現在隨著 Node.js 的興起,很多人著迷 eventloop , 經常是不明真相就會各種追捧,其實 eventloop 只是 一種高并發的解決方案。Thrift 的 TNonblockingServer 就是該解決方案的典型實現之一。
而且,Thrift 的 TNonblockingServer 實現代碼干凈,注釋豐富, 并沒有用到什么奇淫巧計,核心就是使用libevent?進行異步 驅動和狀態的轉換,只要有些?libevent?經驗的人就很容易 能看懂。
想進一步了解?libevent?可以看看?C1000K之Libevent源碼分析?。
事件注冊
Thrift 使用?libevent?作為服務的事件驅動器, libevent 其實就是 epoll 更高級的封裝而已(在linux下是epoll),而struct event?事件是?libevent?編程的最小單元,只要是使用?libevent?就會使用到它,或者是包裝它。
整個 TNonblockingServer 有三個關鍵的地方和 libevent 有關。
1. listener_event
第一種是服務的監聽事件也就是服務負責 listen 和 accept 的 主 socket ,如下。
// Register the server event event_set(&serverEvent_,listenSocket_,EV_READ | EV_PERSIST,TNonblockingIOThread::listenHandler,server_);當新的連接請求進來的時候,TNonblockingIOThread::listenHandler 函數被 觸發,在 TNonblockingIOThread::listenHandler 里主要負責 accept 新連接。
2. pipe_event
第二種比較有意思,這個事件對應的文件描述符是 socket pair ,使用 evutil_socketpair 創建,其實也是調用linux接口 socketpair 搞出來的。這個東西不是之前理解的 網絡通信套接字,在這里可以把它理解成一個管道來使用,如下:
// Create an event to be notified when a task finishes event_set(¬ificationEvent_,getNotificationRecvFD(),EV_READ | EV_PERSIST,TNonblockingIOThread::notifyHandler,this);代碼里面的 getNotificationRecvFD 就是拿這個 socket pair 管道的讀文件描述符, 也就是當這個 socketpair 管道有數據可讀時,該事件就會被觸發,也就是回調函數 TNonblockingIOThread::notifyHandler 會被調用。
其實第二種事件非常好理解,可以類比多線程編程里面的任務隊列, 不同線程之間共享著同一個任務隊列來進行消息的傳遞。 而在 TNonblockingServer 里面,則通過該管道進行消息的傳遞。
3. connection_event
第三種是每個連接的狀態變化事件,每一個 TConnection 代表一個連接, 每一個 TConnection 含有一個 socket 文件描述符,并且當 TConnection 生成之后,會為它注冊一個事件,負責對該 socket 的異步讀寫。 如下:
event_set(&event_, tSocket_->getSocketFD(), eventFlags_,TConnection::eventHandler, this);注意到,每個連接都會注冊一個 第三種事件, 也就是說,程序的整個運行過程中,假設并發連接數為 n , 則第三種事件的數量也為 n,而第一種和第二種始終 只有一個事件。 所以真個程序運行過程中事件的數量是【2 + n】。
socket狀態轉移
因為是異步編程,每個socket都必須設置為非阻塞。 當可讀的事件發生時,則讀,可寫的事件發生時,則寫。 讀和寫兩種操作會互相交替進行,所以我們需要用 狀態值來進行不同的邏輯處理。
TNonblockingServer 里的狀態值有以下三種:
/// Three states for sockets: recv frame size, recv data, and send mode enum TSocketState {SOCKET_RECV_FRAMING,SOCKET_RECV,SOCKET_SEND };需要補充說明的是,要和 Thrift 的 TNonblockingServer 通信,則客戶端 需要使用
shared_ptr<TTransport> transport(new TFramedTransport(socket));來作為傳輸工具,就是因為 TNonblockingServer 的 socket recv 數據是 按 frame 來一幀幀的接受。所以第一個狀態值 SOCKET_RECV_FRAMING 代表進入該狀態就是有幀頭(數據包的大小)可以讀取, 而第二個狀態值 SOCKET_RECV 代表有數據可以讀取,先讀完幀頭才讀該數據。 第三個狀態 SOCKET_SEND 代表有數據可以發送。
每次 rpc 調用的過程的狀態轉移先后過程如下:
SOCKET_RECV_FRAMING -> SOCKET_RECV -> SOCKET_SEND這三個狀態都有可能被重復調用,取決于數據包的大小。
每次 socket 狀態轉移靠 workSocket 函數完成:
/** * Libevent handler called (via our static wrapper) when the connection * socket had something happen. Rather than use the flags [libevent] passed, * we use the connection state to determine whether we need to read or * write the socket. */ void workSocket();app狀態轉移
上面的 socket 狀態轉移,是針對每個連接的數據收發狀態轉移, 和 socket 緊密相關,而這里的 app狀態轉移則是針對整個 rpc 遠程函數調用(不過每次rpc調用其實也是建立在某個連接的基礎之上)。
app狀態的代碼如下:
enum TAppState {APP_INIT,APP_READ_FRAME_SIZE,APP_READ_REQUEST,APP_WAIT_TASK,APP_SEND_RESULT,APP_CLOSE_CONNECTION };狀態的轉移順序如下:
每次app狀態轉移由 TConnetion::transition 函數完成:
/** * This is called when the application transitions from one state into * another. This means that it has finished writing the data that it needed * to, or finished receiving the data that it needed to. */ void transition();狀態3 -> 狀態4 -> 狀態5 轉移很關鍵,涉及到線程池和主線程的交互。 請看下文。
任務的線程池
總所周知的是,異步服務器最適合的場景是高并發,IO 密集型程序。 對于 CPU 密集型的應用場景一般使用多線程服務來解決。 而對于 RPC 服務,TNonblockingServer 想使用異步 IO 來應對高并發。 但是對于 rpc 遠程函數調用,如果被方法的函數是 CPU 密集型的函數, 則運行該函數的過程整個主線程就會被阻塞,也就是傳說中的 【block the whole world】, 對于此,TNonblockingServer 的解決方法是將該函數包裝成一個任務, 然后扔進線程池,以此來避免主線程的阻塞。
線程池本身沒什么好說的,但是在 TNonblockingServer 里 面需要了解的就是 線程池和主線程的交互:
當 TConnetion 的 app狀態 進入 APP_READ_REQUEST 之后 讀取完請求數據之后,則將任務包裝好扔進線程池。 并且將狀態改變(APP_READ_REQUEST -> APP_WAIT_TASK):
// The application is now waiting on the task to finish appState_ = APP_WAIT_TASK;并且將該連接標識為 Idle ,如下函數:
// Set this connection idle so that [libevent] doesn't process more // data on it while we're still waiting for the threadmanager to // finish this task setIdle();setIdle 的目的在于將該連接對應的 socket事件標志位清空, 也就是在 Idle階段不再關心該 socket是否有數據可讀或者可寫。
而當線程池里的某個 Task 運行完畢后,則會觸發主線程的 pipe_event (上文中的已注冊事件種的第二種事件),告知主線程任務已完成。 如下:
// Signal completion back to the libevent thread via a pipe if (!connection_->notifyIOThread()) {throw TException("TNonblockingServer::Task::run: failed write on notify pipe"); }主線程收到通知之后,則會從 狀態4(APP_WAIT_TASK) 轉 移向 狀態5(APP_SEND_RESULT) ,進入向 客戶端發送函數調用結果的過程。
總結
Thrift 的 TNonblockingServer 注釋很豐富,原理清晰。 個人認為基本上是事件驅動服務器的入門教科書級代碼了, 事件驅動服務器核心在于狀態轉移, 因為事件驅動的原因,每次轉換 事件我們都需要保存當前的狀態。 沒啥,都是狀態而已。
哦對了,在下讀源碼的時候習慣加?cout?,然后跑起來看結果, 文末有一份運行示例可以幫助理解,有興趣的可以看看, 修改后的源碼在?MyTNonblockingServer?。
總結
以上是生活随笔為你收集整理的Thrift异步IO服务器源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 由浅入深了解Thrift(三)——Thr
- 下一篇: sar