ACE_Select_Reactor 一 ——入门
? ? ACE Reactor 框架實現(xiàn)了Reactor模式,允許事件驅(qū)動的應(yīng)用對源自許多不同事件源的事件做出反映,比如IO句柄,定時器,以及信號,應(yīng)用重新定義框架所定義的掛鉤方法。框架隨機對其進行分派來處理事件,Reactor負責(zé):(1)檢查多路分離器來自各種事件源的、不同類型的連接和數(shù)據(jù)事件,(2)將這些事件分派給應(yīng)用所定義的處理器,由它進行處理。
? ? 反應(yīng)式服務(wù)器響應(yīng)來自一個或多個事件源的時間,在理想情況下,對時間的響應(yīng)會足夠快,以使所有請求看起來像是被同時處理的,盡管事件處理通常是由單個線程處理的。同步事件多路分離器位于各個反應(yīng)式服務(wù)器的心臟處。這種機制檢測源自許多事件源的事件并對其作出響應(yīng),從而同步地使事件作為服務(wù)器正常執(zhí)行路徑的一部分提供給服務(wù)器。
? ? select()函數(shù)是最為常見的同步事件多路分離器。這個系統(tǒng)函數(shù)在同一組IO句柄上等待指定的事件發(fā)生,當一個或者是多個IO句柄開始活動時,或是在指定的時間過去后,select函數(shù)就會返回。
? ? ACE_Select_Reactor是ACE_Reactor接口的一種實現(xiàn),它使用select同步時間多路分離器函數(shù)來檢測IO和定時器事件,除了支持ACE_Reactor接口的所有特性外,ACE_Select_Reactor類還提供了以下能力:
? ? 1、它支持重入的反應(yīng)器調(diào)用,應(yīng)用可以從正在由統(tǒng)一反應(yīng)器分派的事件處理器中調(diào)用handle_event方法;
? ? 2、它可以被配置為同步化的或異步化的,在線程安全性和降低開銷之間進行折中;
? ? 3、它在再次調(diào)用select函數(shù)之前,分派其句柄集中的所有活動句柄,從而保證了公正性。
?
?
[cpp] view plain copy? ? 在上面的代碼中,在51行,由ACE_Thread_Manager單體派生一個線程,并讓其運行event_loop()函數(shù),在52行,由ACE_Thread_Manager單體派生一個線程,讓其運行controller()函數(shù)。
? ? 在從main函數(shù)返回之前,等待其他兩個線程推出,ACE_Thread_Manager:wait()還會收取兩個線程的退出狀態(tài),以免內(nèi)存泄漏。
ACE_Select_Reactor 2 —— 服務(wù)器網(wǎng)關(guān)
? ? ACE中的流包裝提供面向連接的通信。流數(shù)據(jù)傳輸包裝類包括ACE_SOCK_Stream和ACE_LSOCK_Stream,他們分別包裝TCP/IP和UNIX域Socket協(xié)議數(shù)據(jù)傳輸功能。連接建立類包括針對TCP/IP的ACE_SOCK_Connector和ACE_SOCK_Acceptor,以及針對UNIX域Socket的ACE_LSOCK_Connector和ACE_LSOCK_Acceptor。
?
?
[cpp] view plain copy? ? ?在上面的Server類中,創(chuàng)建了一個被動服務(wù)器,偵聽到來的客戶端連接,在連接建立之后,服務(wù)器接收來自客戶端的數(shù)據(jù),然后關(guān)閉鏈接。
? ? Server類包含的accept_connection()方法使用接收器來將連接接受“進”ACE_SOCK_Stream new_stream_。該操作完成的基本流程是:調(diào)用接收器上的accept(),并將流作為參數(shù)傳入其中。一旦連接已經(jīng)建立進流中,流的包裝方法send()和recv()就可以用來在新建立的鏈路上發(fā)送和接收數(shù)據(jù),還有一個空的ACE_INET_Addr也被傳入接收器的accept()方法,并在其中被設(shè)定為發(fā)起連接的遠程機器地址。
? ? 在連接建立后,服務(wù)器調(diào)用handle_connection()方法,它開始從客戶端那里收取一個預(yù)先知道的單詞,然后將流關(guān)閉。連接關(guān)閉通過調(diào)用流上的close()方法來完成,該方法會釋放所有的Socket資源并終止連接。
?
? ? http://acme-ltt.iteye.com/blog/1455556中提到的ACE_Select_Reactor,在static?ACE_THR_FUNC_RETURN?controller?(void?*arg)函數(shù)中,調(diào)用上述的Server類,搭建基于ACE_Select_Reactor的Socket服務(wù)器網(wǎng)關(guān)。
?
?
? ? 客戶端程序:
?
[cpp] view plain copy? ? ?客戶端由單個Client類表示。Client含有connect_to_server()和send_to_server()方法。
? ? connect_to_server()方法使用類型為ACE_SOCK_Connector的連接器來主動地建立連接。連接的設(shè)置通過調(diào)用連接器上的connect()方法來完成:傳入的參數(shù)為想要連接的機器的遠程地址,以及用于在其中建立連接的空ACE_SOCK_Stream 。遠程機器在運行時參數(shù)中指定。一旦connect()方法成功返回,通過使用ACE_SOCK_Stream封裝類中的send()和recv()方法族,流就可以用于在新建立的鏈路上發(fā)送和接收數(shù)據(jù)。
?
? ? 在該代碼中,一旦連接建立好,send_to_server()方法就會被調(diào)用,將一個iovec類型的數(shù)組,用sendv_n()方法,發(fā)送到服務(wù)器上。
采用 ACE Reactor 實現(xiàn)服務(wù)程序例子
此文版權(quán)屬于作者所有,任何人、媒體或者網(wǎng)站轉(zhuǎn)載、借用都必須征得作者本人同意!
ACE 使用方法及例子,網(wǎng)上有不少,下面貼一段我寫的采用 ACE Reactor 模式寫的 echo 服務(wù)的例子代碼,通過例子可以看出,采用 ACE 開發(fā)多客戶端的服務(wù)程序那是相當簡單的!
代碼中,handle_input(…)和 handle_output(…)都會對 _bufs?進行操作,因為這兩個函數(shù)都是運行在 reactor 的線程里,不會沖突,所以沒有必要對 _bufs 的操作進行鎖操作。
/* $Id: cpp.tpl 3412 2009-11-14 14:23:44Z luozhiyong $ */ /** * \file ACEReactorSvrSample.cpp * * \brief?采用ACE Reactor 實現(xiàn)服務(wù)程序例子 * * \version $Rev: 3412 $ * \author?? * \date???? 2009年09月08日08:17:10 * * \note?修改歷史:<br> * <table> *???? <tr><th>日期</th><th>修改人</th><th>內(nèi)容</th></tr> *???? <tr><td>2009-9-8</td><td></td><td>創(chuàng)建初稿</td> *???? </tr> * </table> */ #include?<ace/Message_Block.h> #include?<ace/Svc_Handler.h> #include?<ace/SOCK_Acceptor.h> #include?<ace/Acceptor.h> #include?<ace/Select_Reactor.h> #include?<list> #include?<string> ? #ifdef?_DEBUG #??? define?ACE_RT_OPT?"d" #else #??? define ACE_RT_OPT #endif ? #if?defined_DLL #??? define?ACE_LIB_THREAD_OPT #else #??? define ACE_LIB_THREAD_OPT "s" #endif ? #pragma?comment(lib,?"ACE"ACE_LIB_THREAD_OPT?ACE_RT_OPT?".lib") class?EchoService :?public?ACE_Event_Handler { public: typedef?ACE_SOCK_STREAM?stream_type; typedef?EchoService?my_type; typedef?ACE_Acceptor<my_type,?ACE_SOCK_ACCEPTOR>?acceptor_type; EchoService() { printf("EchoService創(chuàng)建\n"); } ~EchoService() { printf("EchoService銷毀\n"); } //?響應(yīng)socket 已經(jīng)打開,連接已經(jīng)建立事件 int?open(void*) { //?注冊讀事件 if?(reactor()->register_handler(this,ACE_Event_Handler::READ_MASK)) { //?無法注冊handler return?-1; } //?注冊寫事件 if?(reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK)) { //?無法注冊handler return?-1; } //?取消寫事件,等待有數(shù)據(jù)時喚醒 reactor()->cancel_wakeup(this,ACE_Event_Handler::WRITE_MASK); printf("EchoService已打開\n"); return?0; } //?響應(yīng)有數(shù)據(jù)可讀事件 int?handle_input(ACE_HANDLE) { char?buf[24]; ssize_t?c?=?_peer.recv(buf,sizeof(buf) - 1); if?(c?== 0) { //?連接已經(jīng)關(guān)閉 return?-1; } _bufs.push_back(std::string(buf,c)); if?(_bufs.size() == 1) { //?緩沖區(qū)尺寸為1 說明原來緩沖區(qū)為空,寫事件是取消的,這里喚醒它 reactor()->schedule_wakeup(this,ACE_Event_Handler::WRITE_MASK); } return?0; } //?響應(yīng)可以發(fā)送數(shù)據(jù)了事件 int?handle_output(ACE_HANDLE) { while?(!_bufs.empty()) { std::string&buf(*_bufs.begin()); char?const*??????s(buf.c_str()); char?const*const?e(s?+buf.size()); while?(s?!=e) { ssize_t?c(_peer.send(s,e?-?s)); if?(c?== -1 ||c?== 0) { //?發(fā)送不成功不論發(fā)送過程中是否發(fā)生阻塞, if?(ACE_OS::last_error() ==EWOULDBLOCK) { //?輸出緩沖區(qū)滿,無法再發(fā)送數(shù)據(jù)了(如果你還是繼續(xù)發(fā)送數(shù)據(jù),發(fā)送會阻塞的) break; }else{ //?連接已關(guān)閉 break; } }else{ s?+=?c; } } if?(s?==e) { _bufs.pop_front(); }else{ buf?=?std::string(s,e?-?s); break; } } if?(_bufs.empty()) { //?緩沖區(qū)為空,取消寫事件監(jiān)聽 reactor()->cancel_wakeup(this,ACE_Event_Handler::WRITE_MASK); } //?不論發(fā)送是否成功都返回0,因為,如果發(fā)送失敗,handle_input 也會發(fā)生讀失敗事件, //?錯誤處理有handle_input 返回-1 來觸發(fā) return?0; } int?handle_close(ACE_HANDLE?=?ACE_INVALID_HANDLE,?ACE_Reactor_Mask?mask?=ACE_Event_Handler::ALL_EVENTS_MASK) { if?(mask?==ACE_Event_Handler::WRITE_MASK) return?0; _peer.close(); delete?this; return?0; } //?這個函數(shù)主要給reactor::register_handler 時使用的 ACE_HANDLE?get_handle?()?const { return?_peer.get_handle(); } //?這個函數(shù)主要給acceptor 使用的 stream_type&?peer() { return?_peer; } //?這個函數(shù)主要給acceptor 使用的 int?close?(u_long?= 0) { return?handle_close(); } private: stream_type?_peer; std::list<std::string>_bufs; }; int?main(int?/*argc*/,char*?/*argv*/[]) { u_short?port?= 20001; ACE_Reactor::instance(newACE_Reactor(newACE_Select_Reactor,?true)); EchoService::acceptor_typeacceptor; ACE_INET_Addr?svrAddr(port); if?(acceptor.open(svrAddr)) { fprintf(stderr,"服務(wù)打開失敗:%s\n",ACE_OS::strerror(ACE_OS::last_error())); return?1; }else{ fprintf(stdout,"服務(wù)已打開,端口為:%u\n",port); ACE_Reactor::instance()->run_reactor_event_loop(); return?0; } }總結(jié)
以上是生活随笔為你收集整理的ACE_Select_Reactor 一 ——入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 高性能I/O设计模式Reactor和Pr
- 下一篇: ACE_Reactor(二)ACE_De