| 當 OS 平臺支持異步操作時,一種高效而方便的實現高性能 Web 服務器的方法是使用前攝式事件分派。使用前攝式事件分派模型設計的 Web 服務器通過一或多個線程控制來處理異步操作的完成。這樣,通過集成完成事件多路分離(completion event demultiplexing)和事件處理器分派,前攝器模式簡化了異步的 Web 服務器。 異步的 Web 服務器將這樣來利用前攝器模式:首先讓 Web 服務器向 OS 發出異步操作,并將回調方法登記到 Completion Dispatcher(完成分派器),后者將在操作完成時通知 Web 服務器。于是 OS 代表 Web 服務器執行操作,并隨即在一個周知的地方將結果排隊。Completion Dispatcher 負責使完成通知出隊,并執行適當的、含有應用特有的 Web 服務器代碼的回調。 使用前攝器模式的主要優點是可以啟動多個并發操作,并可并行運行,而不要求應用必須擁有多個線程。操作被應用異步地啟動,它們在 OS 的 I/O 子系統中運行直到完成。發起操作的線程現在可以服務 另外的請求了。 在ACE中,可以通過ACE_Proactor實現前攝器模式。實現方式如下。 1。創建服務處理器: Proactor框架中服務處理器均派生自ACE_Service_Handler,它和Reactor框架的事件處理器非常類似。當發生IO操作完成事件時,會觸發相應的事件完成會調函數。 2。實現服務處理器IO操作 Proactor框架中所有的IO操作都由相應的異步操作類來完成,這些異步操作類都繼承自ACE_Asynch_Operation。常用的有以下幾種。 ACE_Asynch_Read_Stream, 提供從TCP/IP socket連接中進行異步讀操作. ACE_Asynch_Write_Stream, 提供從TCP/IP socket連接中進行異步寫操作. 使用這些操作類的一般方式如下: 初始化 將相關的操作注冊到服務處理器中,一般可通過調用其open方法實現。 發出IO操作 發出異步IO操作請求,該操作不會阻塞,具體的IO操作過程由操作系統異步完成。 IO操作完成回調處理 異步IO操作完成后,OS會觸發服務處理器中的相應回調函數,可通過該函數的ACE_Asynch_Result參數獲取相應的返回值。 3。使用連接器或接受器和遠端進行連接 ACE為Proactor框架提供了兩個工廠類來建立TCP/IP連接。 ACE_Asynch_Acceptor, 用于被動地建立連接 ACE_Asynch_Connector 用于主動地建立連接 當遠端連接建立時,連接器或接受器便會創建相應的服務處理器,從而可以實現服務處理。 4。啟動Proactor事件分發處理 啟動事件分發處理只需如下調用: ????while(true) ????????ACE_Proactor::instance ()->handle_events (); 5。程序示例 服務器端: 服務器端簡單的實現了一個EchoServer,流程如下: 當客戶端建立連接時,首先發出一個異步讀的異步請求,當讀完成時,將所讀的數據打印出來,并發出一個新的異步請求。 #include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Acceptor.h"
class HA_Proactive_Service : public ACE_Service_Handler { public: ~HA_Proactive_Service () { if (this->handle () != ACE_INVALID_HANDLE) ACE_OS::closesocket (this->handle ()); }
virtual void open (ACE_HANDLE h, ACE_Message_Block&) { ???? this->handle (h); ???? if (this->reader_.open (*this) != 0 ) ???? { ???????? ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p/n"), ???????????? ACE_TEXT ("HA_Proactive_Service open"))); ???????? delete this; ???????? return; ???? }
???? ACE_Message_Block *mb = new ACE_Message_Block(buffer,1024); ???? if (this->reader_.read (*mb, mb->space ()) != 0) ???? { ???????? ACE_OS::printf("Begin read fail/n"); ???????? delete this; ???????? return; ???? }
???? return; }
//異步讀完成后會調用此函數 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { ???? ACE_Message_Block &mb = result.message_block (); ???? if (!result.success () || result.bytes_transferred () == 0) ???? { ???????? mb.release (); ???????? delete this; ???????? return; ???? }
???? mb.copy("");????//為字符串添加結束標記'/0' ???? ACE_OS::printf("rev:/t%s/n",mb.rd_ptr()); ???? mb.release();
???? ACE_Message_Block *nmb = new ACE_Message_Block(buffer,1024); ???? if (this->reader_.read (*nmb, nmb->space ()) != 0)
???? return; }
private: ACE_Asynch_Read_Stream reader_; char buffer[1024]; };
int main(int argc, char *argv[]) { ????int port=3000; ????ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor; ???? ????if (acceptor.open (ACE_INET_Addr (port)) == -1) ????????return -1;
????while(true) ????????ACE_Proactor::instance ()->handle_events (); ???? ????return 0; } 客戶端: 客戶端代碼比較簡單,就是每隔1秒鐘將當前的系統時間轉換為字符串形式通過異步形式發送給服務器,發送完成后,釋放時間字符的內存空間。 #include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Connector.h"
class HA_Proactive_Service : public ACE_Service_Handler { public: ~HA_Proactive_Service () { if (this->handle () != ACE_INVALID_HANDLE) ACE_OS::closesocket (this->handle ()); }
virtual void open (ACE_HANDLE h, ACE_Message_Block&) { ???? this->handle (h); ???? if (this->writer_.open (*this) != 0 ) ???? { ???????? ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p/n"), ???????????? ACE_TEXT ("HA_Proactive_Service open"))); ???????? delete this; ???????? return; ???? }
???? ACE_OS::printf("connceted");
???? for(int i=0;i<10;i++)????//每隔秒中發送時間至服務器 ???? { ???????? ACE_OS::sleep(1); ???????? time_t now = ACE_OS::gettimeofday().sec(); ???????? char *time = ctime(&now);????????//獲取當前時間的字符串格式 ???????? ACE_Message_Block *mb = new ACE_Message_Block(100); ???????? mb->copy(time);
???????? if (this->writer_.write(*mb,mb->length()) !=0) ???????? { ???????????? ACE_OS::printf("Begin read fail/n"); ???????????? delete this; ???????????? return; ???????? } ???? }
???? return; }
//異步寫完成后會調用此函數 virtual void handle_write_dgram (const ACE_Asynch_Write_Stream::Result &result) { ???? ACE_Message_Block &mb = result.message_block (); ???? mb.release(); ???? return; }
private: ACE_Asynch_Write_Stream writer_; };
int main(int argc, char *argv[]) { ???? ????ACE_INET_Addr addr(3000,"192.168.1.142");
????HA_Proactive_Service *client = new HA_Proactive_Service(); ????ACE_Asynch_Connector<HA_Proactive_Service> connector; ???? ????connector.open(); ????if (connector.connect(addr) == -1) ????????return -1;
????while(true) ????????ACE_Proactor::instance ()->handle_events (); ???? ????return 0; } |