深入redis内部--事件处理机制
1. redis事件的定義
/* State of an event based program */ typedef struct aeEventLoop {int maxfd; /* highest file descriptor currently registered */int setsize; /* max number of file descriptors tracked */long long timeEventNextId; /*下一個定時器的id*/time_t lastTime; /* Used to detect system clock skew */aeFileEvent *events; /* Registered events注冊的文件事件 */aeFiredEvent *fired; /* Fired events 已注銷的文件事件 */aeTimeEvent *timeEventHead; /*定時器事件鏈表的首部*/int stop;
void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep;
} aeEventLoop;
? 1.1 事件定義
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
2.封裝事件處理的實現
/* Include the best multiplexing layer supported by this system.* The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT #include "ae_evport.c" #else#ifdef HAVE_EPOLL#include "ae_epoll.c"#else#ifdef HAVE_KQUEUE#include "ae_kqueue.c"#else#include "ae_select.c"#endif#endif #endif?3.事件處理的主函數
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS);} }? 3.1事件處理過程
/* Process every pending time event, then every pending file event* (that may be registered by time event callbacks just processed).* Without special flags the function sleeps until some file event* fires, or when the next time event occurs (if any).** If flags is 0, the function does nothing and returns.* if flags has AE_ALL_EVENTS set, all the kind of events are processed.* if flags has AE_FILE_EVENTS set, file events are processed.* if flags has AE_TIME_EVENTS set, time events are processed.* if flags has AE_DONT_WAIT set the function returns ASAP until all* the events that's possible to process without to wait are processed.** The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {long now_sec, now_ms;/* Calculate the time missing for the nearest* timer to fire. */aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}if (tvp->tv_sec < 0) tvp->tv_sec = 0;if (tvp->tv_usec < 0) tvp->tv_usec = 0;} else {/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */if (fe->mask & mask & AE_READABLE) {rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}/* Check time events */if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */ } /* Process time events */ static int processTimeEvents(aeEventLoop *eventLoop) {int processed = 0;aeTimeEvent *te;long long maxId;time_t now = time(NULL);/* If the system clock is moved to the future, and then set back to the* right value, time events may be delayed in a random way. Often this* means that scheduled operations will not be performed soon enough.** Here we try to detect system clock skews, and force all the time* events to be processed ASAP when this happens: the idea is that* processing events earlier is less dangerous than delaying them* indefinitely, and practice suggests it is. */if (now < eventLoop->lastTime) {te = eventLoop->timeEventHead;while(te) {te->when_sec = 0;te = te->next;}}eventLoop->lastTime = now;te = eventLoop->timeEventHead;maxId = eventLoop->timeEventNextId-1;while(te) {long now_sec, now_ms;long long id;if (te->id > maxId) {te = te->next;continue;}aeGetTime(&now_sec, &now_ms);if (now_sec > te->when_sec ||(now_sec == te->when_sec && now_ms >= te->when_ms)){int retval;id = te->id;retval = te->timeProc(eventLoop, id, te->clientData);processed++;/* After an event is processed our time event list may* no longer be the same, so we restart from head.* Still we make sure to don't process events registered* by event handlers itself in order to don't loop forever.* To do so we saved the max ID we want to handle.** FUTURE OPTIMIZATIONS:* Note that this is NOT great algorithmically. Redis uses* a single time event so it's not a problem but the right* way to do this is to add the new elements on head, and* to flag deleted elements in a special way for later* deletion (putting references to the nodes to delete into* another linked list). */if (retval != AE_NOMORE) {aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);} else {aeDeleteTimeEvent(eventLoop, id);}te = eventLoop->timeEventHead;} else {te = te->next;}}return processed; }?
?
?
?
?
?
轉載于:https://www.cnblogs.com/davidwang456/p/3506340.html
總結
以上是生活随笔為你收集整理的深入redis内部--事件处理机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 13走了,14来了,新的一年,新的开始。
- 下一篇: 用C语言实现Ping程序功能---转