[原]tornado源码分析系列(三)[网络层 IOLoop类]
生活随笔
收集整理的這篇文章主要介紹了
[原]tornado源码分析系列(三)[网络层 IOLoop类]
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
引言:由于都是在工作當中抽出時間看源代碼,所以更新速度比較慢,但是還是希望通過對好的源碼的分析和探討,大家相互學習,發現不好的地方共同討論。
上次講了IOLoop中的幾個重要的方法,inistance() 和 add_handler() .. 今天看看Demo中一個最重要的方法,start(),順帶用stop()收尾
def start(self):"""Starts the I/O loop.The loop will run until one of the I/O handlers calls stop(), whichwill make the loop stop after the current event iteration completes."""if self._stopped:self._stopped = Falsereturnself._running = Truewhile True:# Never use an infinite timeout here - it can stall epollpoll_timeout = 0.2# Prevent IO event starvation by delaying new callbacks# to the next iteration of the event loop.callbacks = self._callbacksself._callbacks = []#先運行注冊了的回調函數for callback in callbacks:self._run_callback(callback)if self._callbacks:poll_timeout = 0.0#檢查超時事件#方法是,在timeout這個bisect的排序的列表,每次取出頭部最小的一個#將deadline與當前時間比較,如果 <= 當前時間,就認為超時,然后調用相應的超時處理的回調函數#這里不好理解的是deadline <= 當前時間 , 如果說deadline 大于當前時間,就代表還沒有到#超時條件#循環檢查,直到超時事件處理完成#值得一說的是在libevent中是使用了最小堆每次取出當前的最小deadline#由于最小堆的特性,每次從頭取出的都是最小的#Nginx的網絡模塊是用的紅黑樹來做,原理也是一樣的if self._timeouts:now = time.time()while self._timeouts and self._timeouts[0].deadline <= now:timeout = self._timeouts.pop(0)self._run_callback(timeout.callback)#處理完了超時時間之后,需要將epoll最大阻塞時間改為小于當前最小超時時間的絕對值#不然可能在epoll返回后,本來不屬于超時事件的事件被超時if self._timeouts:milliseconds = self._timeouts[0].deadline - nowpoll_timeout = min(milliseconds, poll_timeout)#判斷“反應堆”是否結束#結束有兩個方式,一個是設置_running 標志位,第二個就是往寫管道寫入"x"if not self._running:break#從注釋中可以看出,每次進入epoll等待事件之前都需要把sigalrm清空,以免在#epoll阻塞期間收到信號,在epoll完成后重新設置if self._blocking_signal_threshold is not None:# clear alarm so it doesn't fire while poll is waiting for# events.signal.setitimer(signal.ITIMER_REAL, 0, 0)#進入epoll循環try:event_pairs = self._impl.poll(poll_timeout)except Exception, e:#在 epoll和 select 阻塞過程當中,經常會收到系統或者其他方式發過來的信號,這#時候系統的 errno 會被設置為 EINTR ,如果將遇到這樣的情況,直接重啟epoll就可以#如果不是這樣的錯誤,則看做是致命錯誤# Depending on python version and IOLoop implementation,# different exception types may be thrown and there are# two ways EINTR might be signaled:# * e.errno == errno.EINTR# * e.args is like (errno.EINTR, 'Interrupted system call')if (getattr(e, 'errno', None) == errno.EINTR or(isinstance(getattr(e, 'args', None), tuple) andlen(e.args) == 2 and e.args[0] == errno.EINTR)):continueelse:raise#將被阻塞的sigalarm 還原 , 第二個參數是最大阻塞閾值if self._blocking_signal_threshold is not None:signal.setitimer(signal.ITIMER_REAL,self._blocking_signal_threshold, 0)# Pop one fd at a time from the set of pending fds and run# its handler. Since that handler may perform actions on# other file descriptors, there may be reentrant calls to# this IOLoop that update self._events#將新的事件加入到待處理隊列中,現代非阻塞的網絡庫都使用的是這種方式self._events.update(event_pairs)#作者在寫這段代碼的過程當中不是使用的簡單的順序遍歷這個隊列,而使用的方式是#將就緒事件逐個彈出,以防止在處理過程當中就緒事件發生改變while self._events:fd, events = self._events.popitem()#在處理過程當中,常常會遇到客戶端異常終止的情況#一般情況下如果讀取錯誤,服務端會產生一個 sigpipe信號#這時候需要忽略這個信號#這里我有一個疑問就是為什么在add_handler 的時候 handler是經過 context.wrap包裝過的#而在這里是直接調用,按道理應該是通過_running_callback調用,不過這里顯然處理了異常情況了try:self._handlers[fd](fd, events)except (KeyboardInterrupt, SystemExit):raiseexcept (OSError, IOError), e:if e.args[0] == errno.EPIPE:# Happens when the client closes the connectionpasselse:logging.error("Exception in I/O handler for fd %d",fd, exc_info=True)except:logging.error("Exception in I/O handler for fd %d",fd, exc_info=True)# reset the stopped flag so another start/stop pair can be issuedself._stopped = False#將定時事件清空if self._blocking_signal_threshold is not None:signal.setitimer(signal.ITIMER_REAL, 0, 0)
這段代碼中值得注意的部分就是在幾個方面:
1.超時事件的處理,timeout是一個排序后的列表,每次都是取得最前面最小的一個
2.在開始epoll循環的過程當中,設置阻塞sigalarm
3.在處理事件過程當中忽略sigpipe信號
4.在處理就緒事件過程當中,是通過每次pop一個來處理,而不是一次遍歷
stop()函數
def stop(self):"""Stop the loop after the current event loop iteration is complete.If the event loop is not currently running, the next call to start()will return immediately.To use asynchronous methods from otherwise-synchronous code (such asunit tests), you can start and stop the event loop like this:ioloop = IOLoop()async_method(ioloop=ioloop, callback=ioloop.stop)ioloop.start()ioloop.start() will return after async_method has run its callback,whether that callback was invoked before or after ioloop.start."""self._running = Falseself._stopped = Trueself._wake()
簡單的設置標志位后,向管道發送"x"停止事件循環
總結:IOLoop差不多就是這些內容,利用python簡單和高可讀性,看網絡模塊的實現會讓我們更加的專注于
實現,而不是繁瑣的基礎代碼的使用過程。
后面將看看IOStream類,是建立在IOLoop的一個上層封裝,實現了基本的buffer事件
?
轉載于:https://www.cnblogs.com/Bozh/archive/2012/07/19/2598696.html
總結
以上是生活随笔為你收集整理的[原]tornado源码分析系列(三)[网络层 IOLoop类]的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 无情的网名微信
- 下一篇: “自题诗后属杨家”上一句是什么