ACE_Reactor(二)ACE_Dev_Poll_Reactor
ACE_Reactor一些重要的細節
看下具體ACE_Dev_Poll_Reactor的實現,如何將一個處理集和handle關聯起來,代碼如下:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
step1
首先就是要將handle和處理這個handle的event_handler綁定,若已有處理的event_handler,則只單純將可能新增的mask添加進監測的epoll函數參數中。
這里的handler_rep_是一個Handler_Repository類的指針,其內部是一個map表可以對handle和event_handler進行有效關聯對應和存儲。
step2
從上面可以看出,如果是epoll的話,且注冊了notify handler, 則epoll_wait會自動將EPOLLONESHOT標識添加上,這樣在事件被監測到得分發處理時就不用再手動的去suspend。
Notify機制的好處是:
1. 讓反應器擁有了處理無限處理器的能力
2. 其次是提供了必要時解除反應器事件檢查阻塞的能力。
Reactor的notify()讓用戶直接提供給Reactor待通知反應器的指針,而這些處理器無需注冊到反應器上,從而提供了無限的擴展能力。
不過在ACE中顯然Notify機制有不同的實現方式。一種是采用ACE_Pipe的實現,這個屬于默認的方式。另一種則是內存隊列保存Notify消息。你可以通過定義ACE_HAS_REACTOR_NOTIFICATION_QUEUE的宏編譯ACE,這樣ACE將不使用ACE_Pipe作為Notify消息的管道,而使用一個自己的內存隊列保存Notify消息。
但是需要注意的是,在使用ACE_Pipe的實現中,如果使用不當,可能會造成嚴重的后果。
如果你用不到Notify機制,最好在ACE_Reactor初始化的時候徹底關閉Notify機制。很多Reactor的初始化函數都提供了關閉notify pipe的方式。比如ACE_Select_Reactor_T的open函數的disable_notify_pipe參數。當其為1的時候表示關閉notify 管道。
潛在的風險:
1. 處理器被銷毀后,排隊等候的對應通知才被分派
2. ACE_Pipe的數據緩沖是有限的,大量通知到來可能會造成阻塞甚至死鎖
因為通知的信息(包括處理器指針和掩碼)以流數據的方式被寫入ACE_Pipe,不能進行遍歷查找,所以當對應處理器被銷毀后,如果其在ACE_Pipe的數據區中還有存儲有通知,則ACE_Reactor將會在分派中使用懸空指針,造成未知的后果。另外,在ACE_Pipe的數據緩沖已滿情況下,在處理器的回調中依然發送通知,就會因阻塞發生死鎖。
通過
int ACE_Dev_Poll_Reactor ::notify(eh, mask,timeout) {//pass over both the event_handler* and * the mask o allow the caller to dictate which Event_Handler method the receiver invokes.Note that this call can timeout.n = this->notify_handle_->notify(eh, mask, timeout);return n=-1?-1:0; }- 1
- 2
- 3
- 4
- 5
- 6
在源碼的owner接口中,看到了這樣一句注釋
//There is no need to set the owner of the event loop.Multiple threads may invoke the event loop simulataneously.
說明有可能多個線程同時調用ACE_Dev_Poll_Reactor的event_loop函數,網上很多示例代碼寫的都比較簡單,例如:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
這里有一個問題,監測事件觸發和同時執行的是不是就只有一個main函數的這一個主線程就完全搞定了。現在看是這樣的,但是實際中應該是不同線程都可以去調用上述的run_reactor_event_loop()來完成事件的監測和觸發才對。
上述的Notify的實現機制,在其open函數中可能可以看出點東西來:
int ACE_Dev_Poll_Reactor_Notify ::open(ACE_Reactor_Impl*r , ACE_Timer_Queue*,in disable_notify_pipe) {if(0==disable_notify_pipe){this->dp_reactor=(ACE_Dev_Poll_Reactor*)r;if(this->notification_pipe_.read_handle()==-1)return -1;#if defined (F_SETFD)ACE_OS::fcntl(this->notification_pipe_.read_handle(),F_SETFD,1);ACE_OS::fcntl(this->notification_pipe_.write_handle(),F_SETFD,1); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)if(this->notification_queue_.open()==-1)return -1;if(ACE_OS::set_flags(this->notification_pipe_.write_handle(),ACE_NONBLOCK) == -1)return -1; #endifif(ACE_OS::set_flags(this->notification_pipe_.read_handle(),ACE_NONBLOCK) == -1)return -1; }return 0; }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
若是不用pipe機制作為Notify通知機制實現,則這個open直接就return掉了。
再來看:
int ACE_Dev_Poll_Reactor ::dispatch_io_event(Token_Guard &guard) {{CGuard(this->repo_loc_)//對于handler的respository訪問,因此加鎖保護//step1 獲取event_handlerEvent_Tuple *info=this->handle_rep_.find(handle);if(info==0) return 0;//說明已經沒有對應的event_handler了if(info->suspended) return 0;//說明其他線程可能已經更改了這個handle的mask。short revents=(pollfd*)pfds->revents;//即已觸發的事件//step2 根據revents來指定disp_mask=WIRTE/READ/EXCPET_MASK;callback=handle_output/input/exception;//step3 將當前的event_handler標識置為suspended為true,以防止被重復觸發if(eh!=this->notify_handler_)info->suspended=true;}//step4 將notify的event_handler直接處理,分發,且注意不要suspend和resume notify的handler,因為如果要這樣做可能引起無休止的暫停和恢復,又要去經常等待獲取token。if(eh==this->notify_handler_)//說明是notify handler,直接處理{ACE_Notification_Buffer b;notify_handler_->dequeue_one(b);guard.release_token();return notify_handler_->dispatch_notify(b);//直接去分發處理notify消息}//step5 執行handler中對應的回調{ACE_Dev_Poll_Handler_Guard eh_guard(eh);guard.release_token();status=this->upcall(eh, callback,handle);}//step6 檢查回調的返回值status,是否等于0去恢復由于執行回調而暫停的handle//step7 檢查handle對應的回調的返回值,若小于0則需要remove_handler_i }而上述upcall在實際的內聯實現文件中:int ACE_Dev_Poll_Reactor ::upcall(eventhandler,callback,handle) {do {status=(event_handler->*callback)(handle);}while(status>0 && event_handler != this->notify_handler_);return status; }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
上面雖然對于注冊handler和handle關聯以及最終事件觸發后如何調用handler中的回調函數理清了。但是有一點,如何完成多路復用和事件分離,并沒有很清楚。這里就要提到
int ACE_Dev_Poll_Reactor::work_pending_i ( ACE_Time_Value * max_wait_time ) [protected]01071 { 01072 ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending_i"); 01073 01074 if (this->deactivated_) 01075 return 0; 01076 01077 #if defined (ACE_HAS_EVENT_POLL) 01078 if (this->start_pevents_ != this->end_pevents_) 01079 #else 01080 if (this->start_pfds_ != this->end_pfds_) 01081 #endif /* ACE_HAS_EVENT_POLL */ 01082 return 1; // We still have work_pending(). Do not poll for 01083 // additional events. 01084 01085 ACE_Time_Value timer_buf (0); 01086 ACE_Time_Value *this_timeout = 0; 01087 01088 this_timeout = this->timer_queue_->calculate_timeout (max_wait_time, 01089 &timer_buf); 01090 01091 // Check if we have timers to fire. 01092 const int timers_pending = 01093 ((this_timeout != 0 && max_wait_time == 0) 01094 || (this_timeout != 0 && max_wait_time != 0 01095 && *this_timeout != *max_wait_time) ? 1 : 0); 01096 01097 const long timeout = 01098 (this_timeout == 0 01099 ? -1 /* Infinity */ 01100 : static_cast<long> (this_timeout->msec ())); 01101 01102 #if defined (ACE_HAS_EVENT_POLL) 01103 01104 // Wait for events. 01105 const int nfds = ::epoll_wait (this->poll_fd_, 01106 this->events_, 01107 this->size_, 01108 static_cast<int> (timeout)); 01109 01110 if (nfds > 0) 01111 { 01112 this->start_pevents_ = this->events_; 01113 this->end_pevents_ = this->start_pevents_ + nfds; 01114 } 01115 01116 #else 01117 01118 struct dvpoll dvp; 01119 01120 dvp.dp_fds = this->dp_fds_; 01121 dvp.dp_nfds = this->size_; 01122 dvp.dp_timeout = timeout; // Milliseconds 01123 01124 // Poll for events 01125 const int nfds = ACE_OS::ioctl (this->poll_fd_, DP_POLL, &dvp); 01126 01127 // Retrieve the results from the pollfd array. 01128 this->start_pfds_ = dvp.dp_fds; 01129 01130 // If nfds == 0 then end_pfds_ == start_pfds_ meaning that there is 01131 // no work pending. If nfds > 0 then there is work pending. 01132 // Otherwise an error occurred. 01133 if (nfds > -1) 01134 this->end_pfds_ = this->start_pfds_ + nfds; 01135 #endif /* ACE_HAS_EVENT_POLL */ 01136 01137 // If timers are pending, override any timeout from the poll. 01138 return (nfds == 0 && timers_pending != 0 ? 1 : nfds); 01139 }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
上述代碼是5.5.2版本而在最新的6.3.3版本中這里的epoll_wait 第三個參數即MaxEvents是直接用魔數1來指定的。也即每次最多返回1個觸發的事件。
其他的線程當然也可以調用handle_events來監測和處理事件,但是在handle_events中對這些其他的線程也做了限制,即必須是token的owner才能執行,否則直接return。
而如何成為owner,則需要調用函數Token_Guard 的函數acquire_quietly,其代碼如下:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
這樣就變成了串行化的使用epoll獲取poll去監測事件,然后監測到線程去處理,而沒有成為token的owner的線程則持續等待。
參考:
http://blog.csdn.net/fullsail/article/details/2901800
總結
以上是生活随笔為你收集整理的ACE_Reactor(二)ACE_Dev_Poll_Reactor的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ACE_Select_Reactor 一
- 下一篇: ACE反应器(Reactor)模式