Redis源码剖析(二)io多路复用函数及事件驱动流程
作為服務器監聽客戶端請求的方法,io多路復用起到了不可忽略的作用,利用io復用監聽的方法叫Reactor模式,在前一篇也提到過,使用io復用是現在常用的提高并發性的方法,而且效果顯著。
通常io多路復用連同事件回調是一起出現的,在將文件描述符(套接字)注冊到io多路復用函數中時,同時也需要保存當這個文件描述符被激活時調用的函數(稱作回調函數),這樣,使用者無需考慮何時事件被激活又何時調用相應處理函數,只管注冊即可,執行回調函數的任務由Reactor接管,極大提高了并發性
在C語言中,回調函數通常是以函數指針的形式出現的(參考libevent)
在C++語言中,回調函數可以是函數指針,但是通常會是通過std::bind綁定的std::function對象,當然隨著C++11的出現,也可以以lambda代替std::bind
既然Redis是C語言實現的,就老老實實使用函數指針好了,不過在此之前,先簡單復習一下io多路復用函數
io多路復用函數
Linux平臺三種io多路復用函數的區別
在不同的平臺(linux,window),存在著不同的io復用函數,以Linux平臺為例,就有select,poll,epoll三種,這三種的區別主要在于監聽事件的底層方法不同,從而導致效率的差異
- select是早期Linux引入的io復用函數,底層采用輪詢的方法判斷每個文件描述符是否被激活。所謂輪詢就是一遍遍的遍歷,依次判斷每一個文件描述符的狀態,效率可想而知,慢
- poll是在select之后引入的,使用方法上稍微簡單于select,但是仍然沒有擺脫輪詢帶來的問題
- epoll作為輪詢的終結者,底層沒有采用輪詢的方法,而是基于事件回調的。簡單的說,就是在內核中當文件描述符被激活時都會調用一個回調函數,epoll根據回調函數直接定位文件描述符,極大提高了效率,同時也減輕了CPU的負擔,不用一遍遍輪詢
當然,除了效率問題,三者在使用上也是存在諸多差異
select接口
/* * maxfds : 最大的文件描述符 + 1* readfs : 可讀事件集* writefds : 可寫事件集* exceptfds : 其它(錯誤)事件集* tvptr : 超時時間*/ int select(int maxfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* tvptr);其中fd_set結構保存的是需要監聽的文件描述符,select將可讀,可寫,其它(錯誤)事件分開監聽,返回被激活描述符的個數。但是仍需要一個一個遍歷使用FD_ISSET判斷是否被激活
poll接口
/* * fdarray[] : 監聽事件集* nfds : 監聽事件個數* timeout : 超時時間*/ int poll(struct pollfd fdarray[], nfds_t nfds, int timeout);在pollfd結構中保存需要監聽的文件描述符,需要監聽的事件,激活原因。使用起來比select簡便的多
epoll接口
/* * epollfd : epoll文件描述符,用于監聽所有的注冊事件* events : 保存所有激活事件* maxevents : events最大可容納的激活事件個數* timeout : 超時時間*/ int epoll_wait(int epollfd, struct epoll_event* events, int maxevents, int timeout);epoll_event結構保存了監聽的文件描述符,監聽的事件以及激活原因,與select和poll不同的是,epoll_wait直接將所有激活的事件保存在events中,這樣就不需要一個個遍歷判斷哪個激活了
Redis對io多路復用的封裝
接下來以epoll為例,了解Redis內部是如何封裝io多路復用的
為了將所有io復用統一,Redis為所有io復用統一了類型名aeApiState,對于epoll而言,類型成員就是調用epoll_wait所需要的參數
//ae_epoll.c typedef struct aeApiState {int epfd; //epollfd,文件描述符struct epoll_event *events; //保存激活的事件(epoll_event) } aeApiState;為什么保存兩個就夠了呢,epoll_wait明明需要4個參數。原因是在Redis初始化時,已經將保存激活事件的數組(events)的容量調至最大,所以maxevents只需要設置成最大即可,無需保存。對于超時時間,Redis的策略是在時間事件中找到最早超時的那個,計算還有多久到達超時時間,將這個時間差(相對時間)作為io復用的超時時間
這么設計的原因是如果Redis中沒有時間事件,那么io復用函數可以一直阻塞在那里直到有事件被激活,如果有時間事件,為了不影響超時事件的回調,需要在事件超時時從io復用中返回,那么設置成超時時間是最合適的(這一點和libevent的策略相同)
接下來就是一些對epoll接口的封裝了,包括創建epoll(epoll_create),注冊事件(epoll_ctl),刪除事件(epoll_ctl),阻塞監聽(epoll_wait)等
創建epoll就是簡單的為aeApiState申請內存空間,然后將返回的指針保存在事件驅動循環中
//ae_epoll.c /* 創建epollfd,即調用::epoll_create */ static int aeApiCreate(aeEventLoop *eventLoop) {/* 申請內存 */aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;/* events用于保存激活的事件,需要足夠大的空間(不小于epoll_create時傳入的參數) *//* eventLoop->setsize是初始化時設置的最大文件描述符個數 */state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}/* 創建epoll文件描述符 */state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}/* 保存io復用數據成員到事件驅動中 */eventLoop->apidata = state;return 0; }注冊事件和刪除事件就是對epoll_ctl的封裝,根據操作不同選擇不同的參數,以注冊事件為例
//ae_epoll.c /* * 將文件描述符和對應事件注冊到io多路復用中* 即調用::epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event)*/ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {/* 從事件驅動中獲取io復用 */aeApiState *state = eventLoop->apidata;/* 用于傳給epoll_ctl的參數 */struct epoll_event ee = {0}; /* avoid valgrind warning *//* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. *//* 判斷是否是第一次注冊,如果是則添加否則是修改 */int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;/* 合并以前的監聽事件,因為不一定是首次添加 */mask |= eventLoop->events[fd].mask; /* Merge old events *//* 根據監聽事件的不同設置struct epoll_event中的events字段 */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;/* 保存監聽的文件描述符 */ee.data.fd = fd;/* 調用接口 */if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0; }阻塞監聽是對epoll_wait的封裝,在返回后將激活的事件保存在事件驅動中
//ae_epoll.c /* 阻塞監聽,即調用::epoll_wait(epollfd, struct epoll_event*, int, struct timeval*); */ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;/* 時間單位是毫秒 */retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);/* 有事件被激活 */if (retval > 0) {int j;numevents = retval;/* 保存所有激活的事件,將其文件描述符和激活原因保存在fired數組中 */for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;/* fired數組中只保存文件描述符和激活原因* 當需要獲取激活事件時,根據文件描述符從eventLoop->events數組中查找 */eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}/* 返回激活事件的個數 */return numevents; }事件驅動循環流程
io復用的封裝實現完成,那么Redis是何時調用io復用函數的呢,這就需要從server.c/main函數入手,可以猜測到當main函數初始化工作完成后,就需要進行事件驅動循環,而在循環中,會調用io復用函數進行監聽
在初始化完成后,main函數調用了aeMain函數,傳入的參數就是服務器的事件驅動
//server.c int main(int argc, char **argv) {/* 一系列的初始化工作 */...aeMain(server.el);... }在ae_epoll.c中可以找到aeMain函數,這個函數便是一直在循環,每次循環會調用aeProcessEvents函數
//ae_epoll.c void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;/* 一直循環監聽 */while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS);} }可以猜測,aeProcessEvents函數中一定調用io復用函數進行監聽,當io復用返回后,執行每個激活事件的回調函數,這個函數比較長,但是還是蠻好理解的
/* 每次事件循環都會調用一次該函數 */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {int processed = 0, numevents;if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;/* 為io復用函數尋找超時時間(通常是最先超時的時間事件的時間(相對時間)) *//* redis中有時間事件 */if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);/* 根據最早超時的那個時間事件獲取超時的相對時間 */if (shortest) {long now_sec, now_ms;/* 獲取當前時間 */aeGetTime(&now_sec, &now_ms);tvp = &tv;/* 計算時間差(相對時間) */long long ms =(shortest->when_sec - now_sec)*1000 +shortest->when_ms - now_ms;if (ms > 0) {tvp->tv_sec = ms/1000;tvp->tv_usec = (ms % 1000)*1000;} else {tvp->tv_sec = 0;tvp->tv_usec = 0;}} else {/* 如果沒有時間事件,那么io復用要么一直等,要么不等,取決于flags的設置 *//* 傳入的struct timeval*是NULL表示一直等直到有事件被激活* 傳入的timeval->tv_src = timeval->tv_usec = 0表示不等,直接返回 */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}/* 調用io復用函數,返回被激活事件的個數,所有被激活的事件保存在epollLoop->fired數組中 */numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {/* fired只保存的文件描述符和激活原因,實際的文件事件仍需要從events數組中取出 */aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* 根據激活原因調用回調函數(先執行可讀,再執行可寫) */if (fe->mask & mask & AE_READABLE) {rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}/* 處理可能超時的時間事件 */if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */ }至此一次事件驅動循環就執行完畢,里面的細節比較多,比如如何為io復用函數尋找超時時間,如果從激活事件調用回調函數,如果處理已超時事件等
Redis對于時間事件是采用鏈表的形式記錄的,這導致每次尋找最早超時的那個事件都需要遍歷整個鏈表,容易造成性能瓶頸。而libevent是采用最小堆記錄時間事件,尋找最早超時事件只需要O(1)的復雜度
如何選擇合適的io多路復用函數
到目前位置還有一個問題沒有解決,既然有那么多io復用函數,Redis怎么知道應該選擇哪個呢,Redis的策略是選擇當前平臺存在的,效率最高的io復用函數
#ifdef HAVE_EVPORT #include "ae_evport.c" #else#ifdef HAVE_EPOLL#include "ae_epoll.c"#else#ifdef HAVE_KQUEUE#include "ae_kqueue.c"#else#include "ae_select.c"#endif#endif #endif小結
其實任何一個基于網絡請求的程序在這部分的內容都是相似的,無非就是Reactor模式的實現,不過畢竟Redis主要內容在數據庫方面,網絡這一塊不會太過苛刻,如果只是想要學習服務器設計,可以參考libevent(C語言),muduo(C++語言)
總結
以上是生活随笔為你收集整理的Redis源码剖析(二)io多路复用函数及事件驱动流程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode-----将二叉
- 下一篇: Redis源码剖析(三)字典结构的设计与