首先繼續回憶下,之前子線程執行操作里面有一個未涉及的內容ngx_process_events_and_timers,今天我們就來研究下這個函數。
本篇文章來自于:http://blog.csdn.net/lengzijian/article/details/7601730
先來看一下第十九節的部分截圖:
今天主要講解的就是事件驅動函數,圖中的紅色部分:
?
[cpp]?view plaincopyprint?
src/event/ngx_event.c????void??ngx_process_events_and_timers(ngx_cycle_t?*cycle)??{??????ngx_uint_t??flags;??????ngx_msec_t??timer,?delta;????????if?(ngx_timer_resolution)?{??????????timer?=?NGX_TIMER_INFINITE;??????????flags?=?0;????????}?else?{??????????timer?=?ngx_event_find_timer();??????????flags?=?NGX_UPDATE_TIME;??????}????????????/*?????ngx_use_accept_mutex變量代表是否使用accept互斥體?????默認是使用,可以通過accept_mutex?off;指令關閉;?????accept?mutex?的作用就是避免驚群,同時實現負載均衡?????*/??????if?(ngx_use_accept_mutex)?{????????????????????/*?????????ngx_accept_disabled變量在ngx_event_accept函數中計算。?????????如果ngx_accept_disabled大于0,就表示該進程接受的鏈接過多,?????????因此放棄一次爭搶accept?mutex的機會,同時將自己減一。?????????然后,繼續處理已有連接上的事件。?????????nginx就利用這一點實現了繼承關于連接的基本負載均衡。?????????*/??????????if?(ngx_accept_disabled?>?0)?{??????????????ngx_accept_disabled--;????????????}?else?{??????????????/*?????????????嘗試鎖accept?mutex,只有成功獲取鎖的進程,才會將listen套接字放到epoll中。?????????????因此,這就保證了只有一個進程擁有監聽套接口,故所有進程阻塞在epoll_wait時,?????????????才不會驚群現象。?????????????*/??????????????if?(ngx_trylock_accept_mutex(cycle)?==?NGX_ERROR)?{??????????????????return;??????????????}????????????????if?(ngx_accept_mutex_held)?{??????????????????/*?????????????????如果進程獲得了鎖,將添加一個?NGX_POST_EVENTS?標志。?????????????????這個標志的作用是將所有產生的事件放入一個隊列中,等釋放后,在慢慢來處理事件。?????????????????因為,處理時間可能會很耗時,如果不先施放鎖再處理的話,該進程就長時間霸占了鎖,?????????????????導致其他進程無法獲取鎖,這樣accept的效率就低了。?????????????????*/??????????????????flags?|=?NGX_POST_EVENTS;????????????????}?else?{??????????????????/*?????????????????沒有獲得所得進程,當然不需要NGX_POST_EVENTS標志。?????????????????但需要設置延時多長時間,再去爭搶鎖。?????????????????*/??????????????????if?(timer?==?NGX_TIMER_INFINITE??????????????????????||?timer?>?ngx_accept_mutex_delay)??????????????????{??????????????????????timer?=?ngx_accept_mutex_delay;??????????????????}??????????????}??????????}??????}????????delta?=?ngx_current_msec;????????????/*接下來,epoll要開始wait事件,?????ngx_process_events的具體實現是對應到epoll模塊中的ngx_epoll_process_events函數?????這里之后會詳細講解的哦?????*/??????(void)?ngx_process_events(cycle,?timer,?flags);??????//統計本次wait事件的耗時??????delta?=?ngx_current_msec?-?delta;????????ngx_log_debug1(NGX_LOG_DEBUG_EVENT,?cycle->log,?0,?????????????????????"timer?delta:?%M",?delta);????????/*?????ngx_posted_accept_events是一個事件隊列,暫存epoll從監聽套接口wait到的accept事件。?????前文提到的NGX_POST_EVENTS標志被使用后,會將所有的accept事件暫存到這個隊列?????*/??????if?(ngx_posted_accept_events)?{??????????ngx_event_process_posted(cycle,?&ngx_posted_accept_events);??????}??????//所有accept事件處理完之后,如果持有鎖的話,就釋放掉。??????if?(ngx_accept_mutex_held)?{??????????ngx_shmtx_unlock(&ngx_accept_mutex);??????}????????????/*?????delta是之前統計的耗時,存在毫秒級的耗時,就對所有時間的timer進行檢查,?????如果timeout?就從time?rbtree中刪除到期的timer,同時調用相應事件的handler函數處理?????*/??????if?(delta)?{??????????ngx_event_expire_timers();??????}????????ngx_log_debug1(NGX_LOG_DEBUG_EVENT,?cycle->log,?0,?????????????????????"posted?events?%p",?ngx_posted_events);????????/*?????處理普通事件(連接上獲得的讀寫事件),?????因為每個事件都有自己的handler方法,?????*/??????if?(ngx_posted_events)?{??????????if?(ngx_threaded)?{??????????????ngx_wakeup_worker_thread(cycle);????????????}?else?{??????????????ngx_event_process_posted(cycle,?&ngx_posted_events);??????????}??????}??}??
?
?
之前有說過accept事件,其實他就是監聽套接口上是否有新來的事件,下面介紹下accept時間的handler方法:
ngx_event_accept:
?
[cpp]?view plaincopyprint?
src/event/ngx_event_accept.c????void??ngx_event_accept(ngx_event_t?*ev)??{??????socklen_t??????????socklen;??????ngx_err_t??????????err;??????ngx_log_t?????????*log;??????ngx_socket_t???????s;??????ngx_event_t???????*rev,?*wev;??????ngx_listening_t???*ls;??????ngx_connection_t??*c,?*lc;??????ngx_event_conf_t??*ecf;??????u_char?????????????sa[NGX_SOCKADDRLEN];????????????//省略部分代碼????????lc?=?ev->data;??????ls?=?lc->listening;??????ev->ready?=?0;????????ngx_log_debug2(NGX_LOG_DEBUG_EVENT,?ev->log,?0,?????????????????????"accept?on?%V,?ready:?%d",?&ls->addr_text,?ev->available);????????do?{??????????socklen?=?NGX_SOCKADDRLEN;??????????//accept一個新的連接??????????s?=?accept(lc->fd,?(struct?sockaddr?*)?sa,?&socklen);??????????//省略部分代碼????????????????????/*?????????accept到一個新的連接后,就重新計算ngx_accept_disabled的值,?????????它主要是用來做負載均衡,之前有提過。??????????????????這里,我們可以看到他的就只方式?????????“總連接數的八分之一???-???剩余的連接數“?????????總連接指每個進程設定的最大連接數,這個數字可以再配置文件中指定。??????????????????所以每個進程到總連接數的7/8后,ngx_accept_disabled就大于零,連接超載了??????????????????*/????????????ngx_accept_disabled?=?ngx_cycle->connection_n?/?8????????????????????????????????-?ngx_cycle->free_connection_n;????????????????????//獲取一個connection??????????c?=?ngx_get_connection(s,?ev->log);????????????//為新的鏈接創建起一個memory?pool??????????//連接關閉的時候,才釋放pool????????????c->pool?=?ngx_create_pool(ls->pool_size,?ev->log);??????????if?(c->pool?==?NULL)?{??????????????ngx_close_accepted_connection(c);??????????????return;??????????}????????????c->sockaddr?=?ngx_palloc(c->pool,?socklen);??????????if?(c->sockaddr?==?NULL)?{??????????????ngx_close_accepted_connection(c);??????????????return;??????????}????????????ngx_memcpy(c->sockaddr,?sa,?socklen);????????????log?=?ngx_palloc(c->pool,?sizeof(ngx_log_t));??????????if?(log?==?NULL)?{??????????????ngx_close_accepted_connection(c);??????????????return;??????????}????????????/*?set?a?blocking?mode?for?aio?and?non-blocking?mode?for?others?*/????????????if?(ngx_inherited_nonblocking)?{??????????????if?(ngx_event_flags?&?NGX_USE_AIO_EVENT)?{??????????????????if?(ngx_blocking(s)?==?-1)?{??????????????????????ngx_log_error(NGX_LOG_ALERT,?ev->log,?ngx_socket_errno,????????????????????????????????????ngx_blocking_n?"?failed");??????????????????????ngx_close_accepted_connection(c);??????????????????????return;??????????????????}??????????????}????????????}?else?{??????????????//我們使用epoll模型,這里我們設置連接為nonblocking??????????????if?(!(ngx_event_flags?&?(NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)))?{??????????????????if?(ngx_nonblocking(s)?==?-1)?{??????????????????????ngx_log_error(NGX_LOG_ALERT,?ev->log,?ngx_socket_errno,????????????????????????????????????ngx_nonblocking_n?"?failed");??????????????????????ngx_close_accepted_connection(c);??????????????????????return;??????????????????}??????????????}??????????}????????????*log?=?ls->log;??????????//初始化新的連接??????????c->recv?=?ngx_recv;??????????c->send?=?ngx_send;??????????c->recv_chain?=?ngx_recv_chain;??????????c->send_chain?=?ngx_send_chain;????????????c->log?=?log;??????????c->pool->log?=?log;????????????c->socklen?=?socklen;??????????c->listening?=?ls;??????????c->local_sockaddr?=?ls->sockaddr;????????????c->unexpected_eof?=?1;????#if?(NGX_HAVE_UNIX_DOMAIN)??????????if?(c->sockaddr->sa_family?==?AF_UNIX)?{??????????????c->tcp_nopush?=?NGX_TCP_NOPUSH_DISABLED;??????????????c->tcp_nodelay?=?NGX_TCP_NODELAY_DISABLED;??#if?(NGX_SOLARIS)??????????????/*?Solaris's?sendfilev()?supports?AF_NCA,?AF_INET,?and?AF_INET6?*/??????????????c->sendfile?=?0;??#endif??????????}??#endif????????????rev?=?c->read;??????????wev?=?c->write;????????????wev->ready?=?1;????????????if?(ngx_event_flags?&?(NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))?{??????????????/*?rtsig,?aio,?iocp?*/??????????????rev->ready?=?1;??????????}????????????if?(ev->deferred_accept)?{??????????????rev->ready?=?1;??#if?(NGX_HAVE_KQUEUE)??????????????rev->available?=?1;??#endif??????????}????????????rev->log?=?log;??????????wev->log?=?log;????????????/*??????????*?TODO:?MT:?-?ngx_atomic_fetch_add()??????????*?????????????or?protection?by?critical?section?or?light?mutex??????????*??????????*?TODO:?MP:?-?allocated?in?a?shared?memory??????????*???????????-?ngx_atomic_fetch_add()??????????*?????????????or?protection?by?critical?section?or?light?mutex??????????*/????????????c->number?=?ngx_atomic_fetch_add(ngx_connection_counter,?1);????????????????????if?(ngx_add_conn?&&?(ngx_event_flags?&?NGX_USE_EPOLL_EVENT)?==?0)?{??????????????if?(ngx_add_conn(c)?==?NGX_ERROR)?{??????????????????ngx_close_accepted_connection(c);??????????????????return;??????????????}??????????}????????????log->data?=?NULL;??????????log->handler?=?NULL;????????????????????/*?????????這里listen?handler很重要,它將完成新連接的最后初始化工作,?????????同時將accept到的新的連接放入epoll中;掛在這個handler上的函數,?????????就是ngx_http_init_connection?在之后http模塊中在詳細介紹?????????*/??????????ls->handler(c);????????????if?(ngx_event_flags?&?NGX_USE_KQUEUE_EVENT)?{??????????????ev->available--;??????????}????????}?while?(ev->available);??}??
?
?
accpt事件的handler方法也就是如此了。之后就是每個連接的讀寫事件handler方法,這一部分會直接將我們引入http模塊,我們還不急,還要學習下nginx經典模塊epoll。
總結
以上是生活随笔為你收集整理的nginx 源码学习笔记(二十一)—— event 模块(二) ——事件驱动核心ngx_process_events_and_timers的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。