redis源码客户端和服务端通信过程
? ? ? ? ? 最近想學習一下redis源碼,先看一下redis通信流程。由于功力有限,不足之處望大家指正。服務端和客戶端通信,一般都是服務端先啟動,那先從服務端的源碼看起。
? ? ? ? ?首先啟動服務端會做一些初始化動作,初始化事件處理器狀態(tài),先看一下事件處理器狀態(tài)的結(jié)構(gòu)
//事件處理器的狀態(tài) typedef struct aeEventLoop {// 目前已注冊的最大描述符int maxfd; /* highest file descriptor currently registered */// 目前已追蹤的最大描述符int setsize; /* max number of file descriptors tracked */// 用于生成時間事件 idlong long timeEventNextId;// 最后一次執(zhí)行時間事件的時間time_t lastTime; /* Used to detect system clock skew */// 已注冊的文件事件aeFileEvent *events; /* Registered events */// 已就緒的文件事件aeFiredEvent *fired; /* Fired events */// 時間事件aeTimeEvent *timeEventHead;// 事件處理器的開關(guān)int stop;// 多路復用庫的私有數(shù)據(jù)void *apidata; /* This is used for polling API specific data */// 在處理事件前要執(zhí)行的函數(shù)aeBeforeSleepProc *beforesleep;} aeEventLoop;? ? ? ? 下面要對事件處理器進行初始化。
//事件狀態(tài) typedef struct aeApiState {// epoll_event 實例描述符int epfd;// 事件槽struct epoll_event *events; } aeApiState; static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;// 初始化事件槽空間state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}// 創(chuàng)建 epoll 實例state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}// 賦值給 eventLoopeventLoop->apidata = state;return 0; }? 上面創(chuàng)建的epoll句柄和初始化的事件槽保存到傳入的eventLoop事件對象中。這個事件對象保存在全局的一個redisserver中,redisServer中結(jié)構(gòu)體成員很多,這里只展示一個
struct redisServer {//...// 事件狀態(tài)aeEventLoop *el;// 一個鏈表,保存了所有客戶端狀態(tài)結(jié)構(gòu)list *clients; /* List of active clients *//... };aeEventLoop *el 存儲剛才創(chuàng)建的事件狀態(tài)
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {int s, rv;char _port[6]; /* strlen("65535") */struct addrinfo hints, *servinfo, *p;snprintf(_port,6,"%d",port);memset(&hints,0,sizeof(hints));hints.ai_family = af;hints.ai_socktype = SOCK_STREAM;hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {anetSetError(err, "%s", gai_strerror(rv));return ANET_ERR;}for (p = servinfo; p != NULL; p = p->ai_next) {if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)continue;if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;goto end;}if (p == NULL) {anetSetError(err, "unable to bind socket");goto error;}error:s = ANET_ERR; end:freeaddrinfo(servinfo);return s; }上面的函數(shù)用來打開監(jiān)聽端口
// 為 TCP 連接關(guān)聯(lián)連接應答(accept)處理器// 用于接受并應答客戶端的 connect() 調(diào)用for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, //使文件讀關(guān)聯(lián)一個函數(shù)acceptTcpHandler,NULL) == AE_ERR){redisPanic("Unrecoverable error creating server.ipfd file event.");}} /** 根據(jù) mask 參數(shù)的值,監(jiān)聽 fd 文件的狀態(tài),* 當 fd 可用時,執(zhí)行 proc 函數(shù)*/ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData) {if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}if (fd >= eventLoop->setsize) return AE_ERR;// 取出文件事件結(jié)構(gòu)aeFileEvent *fe = &eventLoop->events[fd];// 監(jiān)聽指定 fd 的指定事件if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;// 設置文件事件類型,以及事件的處理器fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;// 私有數(shù)據(jù)fe->clientData = clientData;// 如果有需要,更新事件處理器的最大 fdif (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK; }aeCreateFileEvent函數(shù)用來注冊回調(diào)用,參數(shù)aeEventLoop *eventLoop就是前面初始化的事件處理器的狀態(tài),當AE_READABLE產(chǎn)生時就會調(diào)用acceptTcpHandler函數(shù),這時是有客戶端connect了。前面已經(jīng)初始化了一定數(shù)量的處理器,aeApiAddEvent把所有的事件對象都注冊到epoll,后面接著設置對應AE_READABLE和AE_WRITABLE對應的回調(diào)函數(shù)。
/* File event structure** 文件事件結(jié)構(gòu)*/ typedef struct aeFileEvent {// 監(jiān)聽事件類型掩碼,// 值可以是 AE_READABLE 或 AE_WRITABLE ,// 或者 AE_READABLE | AE_WRITABLEint mask; /* one of AE_(READABLE|WRITABLE) */// 讀事件處理器aeFileProc *rfileProc;// 寫事件處理器aeFileProc *wfileProc;// 多路復用庫的私有數(shù)據(jù)void *clientData;} aeFileEvent;上面是文件事件結(jié)構(gòu)的結(jié)構(gòu)體,對應的讀和寫的回調(diào)函數(shù)都保存在aeFileEvent(文件事件)中,aeFileEvent(文件事件)就是aeEventLoop(事件處理器狀態(tài))的成員,aeEventLoop(事件處理器狀態(tài))就是redisServer結(jié)構(gòu)體中aeEventLoop *el(事件狀態(tài)成員),所有的這些都保存在全局的redisServer結(jié)構(gòu)體中。接下來就是事件處理器主循環(huán)中
/** 事件處理器的主循環(huán)*/ void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {// 如果有需要在事件處理前執(zhí)行的函數(shù),那么運行它if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);// 開始處理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS);//一直循環(huán)調(diào)用這個函數(shù)等到消息} } //處理所有已到達的時間事件,以及所有已就緒的文件事件。 int aeProcessEvents(aeEventLoop *eventLoop, int flags) {int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;// 獲取最近的時間事件if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {// 如果時間事件存在的話// 那么根據(jù)最近可執(zhí)行時間事件和現(xiàn)在時間的時間差來決定文件事件的阻塞時間long now_sec, now_ms;/* Calculate the time missing for the nearest* timer to fire. */// 計算距今最近的時間事件還要多久才能達到// 并將該時間距保存在 tv 結(jié)構(gòu)中aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}// 時間差小于 0 ,說明事件已經(jīng)可以執(zhí)行了,將秒和毫秒設為 0 (不阻塞)if (tvp->tv_sec < 0) tvp->tv_sec = 0;if (tvp->tv_usec < 0) tvp->tv_usec = 0;} else {// 執(zhí)行到這一步,說明沒有時間事件// 那么根據(jù) AE_DONT_WAIT 是否設置來決定是否阻塞,以及阻塞的時間長度/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {// 設置文件事件不阻塞tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */// 文件事件可以阻塞直到有事件到達為止tvp = NULL; /* wait forever */}}// 處理文件事件,阻塞時間由 tvp 決定numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {// 從已就緒數(shù)組中獲取事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */// 讀事件if (fe->mask & mask & AE_READABLE) {// rfired 確保讀/寫事件只能執(zhí)行其中一個rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}// 寫事件if (fe->mask & mask & AE_WRITABLE) {printf("can writable\n");if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}/* Check time events */// 執(zhí)行時間事件if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */ } /** 獲取可執(zhí)行事件*/ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;// 等待時間retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);//epoll_wait用于向用戶進程返回ready list// 有至少一個事件就緒?if (retval > 0) {int j;// 為已就緒事件設置相應的模式// 并加入到 eventLoop 的 fired 數(shù)組中numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}// 返回已就緒事件個數(shù)return numevents; }aeProcessEvents一直被循環(huán)調(diào)用用來處理就緒的文件事件(時間事件這里不考慮),通過調(diào)用aeApiPoll中的epoll_wait等待事件的促發(fā)。
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; /* A fired event** 已就緒事件*/ typedef struct aeFiredEvent {// 已就緒文件描述符int fd;// 事件類型掩碼,// 值可以是 AE_READABLE 或 AE_WRITABLE// 或者是兩者的或int mask;} aeFiredEvent;上面列出了epoll結(jié)構(gòu)體和aeFiredEvent(已就緒事件結(jié)構(gòu)體),aeFiredEvent屬于事件處理器狀態(tài)(aeEventLoop)成員,并循環(huán)保存就緒文件事件對應中已就緒描述符和其類型,這些又都保存在事件處理器狀態(tài)(aeEventLoop)中。函數(shù)返回到aeProcessEvents中,然后走對應的回調(diào)(這時候還沒講回調(diào)關(guān)聯(lián)對應的函數(shù))。
現(xiàn)在假如有客戶端來連接了,按前面說的,套接字變的可讀,acceptTcpHandler被調(diào)用,acceptTcpHandler函數(shù)接收客戶端的連接,并為客戶端創(chuàng)建狀態(tài),并注冊讀取客戶端命令的函數(shù)readQueryFromClient。并把創(chuàng)建的客戶端保存在redisServer里面的list *clients里面。
/** 創(chuàng)建一個新客戶端*/ redisClient *createClient(int fd) {printf("-----------%s--------\n",__FUNCTION__);// 分配空間redisClient *c = zmalloc(sizeof(redisClient));/* passing -1 as fd it is possible to create a non connected client.* This is useful since all the Redis commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */// 當 fd 不為 -1 時,創(chuàng)建帶網(wǎng)絡連接的客戶端// 如果 fd 為 -1 ,那么創(chuàng)建無網(wǎng)絡連接的偽客戶端// 因為 Redis 的命令必須在客戶端的上下文中使用,所以在執(zhí)行 Lua 環(huán)境中的命令時// 需要用到這種偽終端if (fd != -1) {// 非阻塞anetNonBlock(NULL,fd);// 禁用 Nagle 算法anetEnableTcpNoDelay(NULL,fd);// 設置 keep aliveif (server.tcpkeepalive)anetKeepAlive(NULL,fd,server.tcpkeepalive);// 綁定讀事件到事件 loop (開始接收命令請求)if (aeCreateFileEvent(server.el,fd,AE_READABLE, //客戶端連接上之后,再為客戶端關(guān)聯(lián)一個讀數(shù)據(jù)的函數(shù)。之前關(guān)聯(lián)的建立連接readQueryFromClient, c) == AE_ERR) //沒有建立連接之前關(guān)聯(lián)建立函數(shù),建立連接之后關(guān)聯(lián)讀數(shù)據(jù)的函數(shù){close(fd);zfree(c);return NULL;}}// 初始化各個屬性// 默認數(shù)據(jù)庫selectDb(c,0);// 套接字c->fd = fd;// 名字c->name = NULL;// 回復緩沖區(qū)的偏移量c->bufpos = 0;// 查詢緩沖區(qū)c->querybuf = sdsempty();// 查詢緩沖區(qū)峰值c->querybuf_peak = 0;// 命令請求的類型c->reqtype = 0;// 命令參數(shù)數(shù)量c->argc = 0;// 命令參數(shù)c->argv = NULL;// 當前執(zhí)行的命令和最近一次執(zhí)行的命令c->cmd = c->lastcmd = NULL;// 查詢緩沖區(qū)中未讀入的命令內(nèi)容數(shù)量c->multibulklen = 0;// 讀入的參數(shù)的長度c->bulklen = -1;// 已發(fā)送字節(jié)數(shù)c->sentlen = 0;// 狀態(tài) FLAGc->flags = 0;// 創(chuàng)建時間和最后一次互動時間c->ctime = c->lastinteraction = server.unixtime;// 認證狀態(tài)c->authenticated = 0;// 復制狀態(tài)c->replstate = REDIS_REPL_NONE;// 復制偏移量c->reploff = 0;// 通過 ACK 命令接收到的偏移量c->repl_ack_off = 0;// 通過 AKC 命令接收到偏移量的時間c->repl_ack_time = 0;// 客戶端為從服務器時使用,記錄了從服務器所使用的端口號c->slave_listening_port = 0;// 回復鏈表c->reply = listCreate();// 回復鏈表的字節(jié)量c->reply_bytes = 0;// 回復緩沖區(qū)大小達到軟限制的時間c->obuf_soft_limit_reached_time = 0;// 回復鏈表的釋放和復制函數(shù)listSetFreeMethod(c->reply,decrRefCountVoid);listSetDupMethod(c->reply,dupClientReplyValue);// 阻塞類型c->btype = REDIS_BLOCKED_NONE;// 阻塞超時c->bpop.timeout = 0;// 造成客戶端阻塞的列表鍵c->bpop.keys = dictCreate(&setDictType,NULL);// 在解除阻塞時將元素推入到 target 指定的鍵中// BRPOPLPUSH 命令時使用c->bpop.target = NULL;c->bpop.numreplicas = 0;c->bpop.reploffset = 0;c->woff = 0;// 進行事務時監(jiān)視的鍵c->watched_keys = listCreate();// 訂閱的頻道和模式c->pubsub_channels = dictCreate(&setDictType,NULL);c->pubsub_patterns = listCreate();c->peerid = NULL;listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);listSetMatchMethod(c->pubsub_patterns,listMatchObjects);// 如果不是偽客戶端,那么添加到服務器的客戶端鏈表中if (fd != -1) listAddNodeTail(server.clients,c);// 初始化客戶端的事務狀態(tài)initClientMultiState(c);// 返回客戶端return c; } /* With multiplexing we need to take per-client state.* Clients are taken in a liked list.** 因為 I/O 復用的緣故,需要為每個客戶端維持一個狀態(tài)。** 多個客戶端狀態(tài)被服務器用鏈表連接起來。*/ typedef struct redisClient {// 套接字描述符int fd;// 當前正在使用的數(shù)據(jù)庫redisDb *db;// 當前正在使用的數(shù)據(jù)庫的 id (號碼)int dictid;// 客戶端的名字robj *name; /* As set by CLIENT SETNAME */// 查詢緩沖區(qū)sds querybuf;// 查詢緩沖區(qū)長度峰值size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */// 參數(shù)數(shù)量int argc;// 參數(shù)對象數(shù)組robj **argv;// 記錄被客戶端執(zhí)行的命令struct redisCommand *cmd, *lastcmd;// 請求的類型:內(nèi)聯(lián)命令還是多條命令int reqtype;// 剩余未讀取的命令內(nèi)容數(shù)量int multibulklen; /* number of multi bulk arguments left to read */// 命令內(nèi)容的長度long bulklen; /* length of bulk argument in multi bulk request */// 回復鏈表list *reply;// 回復鏈表中對象的總大小unsigned long reply_bytes; /* Tot bytes of objects in reply list */// 已發(fā)送字節(jié),處理 short write 用int sentlen; /* Amount of bytes already sent in the currentbuffer or object being sent. */// 創(chuàng)建客戶端的時間time_t ctime; /* Client creation time */// 客戶端最后一次和服務器互動的時間time_t lastinteraction; /* time of the last interaction, used for timeout */// 客戶端的輸出緩沖區(qū)超過軟性限制的時間time_t obuf_soft_limit_reached_time;// 客戶端狀態(tài)標志int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */// 當 server.requirepass 不為 NULL 時// 代表認證的狀態(tài)// 0 代表未認證, 1 代表已認證int authenticated; /* when requirepass is non-NULL */// 復制狀態(tài)int replstate; /* replication state if this is a slave */// 用于保存主服務器傳來的 RDB 文件的文件描述符int repldbfd; /* replication DB file descriptor */// 讀取主服務器傳來的 RDB 文件的偏移量off_t repldboff; /* replication DB file offset */// 主服務器傳來的 RDB 文件的大小off_t repldbsize; /* replication DB file size */sds replpreamble; /* replication DB preamble. */// 主服務器的復制偏移量long long reploff; /* replication offset if this is our master */// 從服務器最后一次發(fā)送 REPLCONF ACK 時的偏移量long long repl_ack_off; /* replication ack offset, if this is a slave */// 從服務器最后一次發(fā)送 REPLCONF ACK 的時間long long repl_ack_time;/* replication ack time, if this is a slave */// 主服務器的 master run ID// 保存在客戶端,用于執(zhí)行部分重同步char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */// 從服務器的監(jiān)聽端口號int slave_listening_port; /* As configured with: SLAVECONF listening-port */// 事務狀態(tài)multiState mstate; /* MULTI/EXEC state */// 阻塞類型int btype; /* Type of blocking op if REDIS_BLOCKED. */// 阻塞狀態(tài)blockingState bpop; /* blocking state */// 最后被寫入的全局復制偏移量long long woff; /* Last write global replication offset. */// 被監(jiān)視的鍵list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */// 這個字典記錄了客戶端所有訂閱的頻道// 鍵為頻道名字,值為 NULL// 也即是,一個頻道的集合dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */// 鏈表,包含多個 pubsubPattern 結(jié)構(gòu)// 記錄了所有訂閱頻道的客戶端的信息// 新 pubsubPattern 結(jié)構(gòu)總是被添加到表尾list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */sds peerid; /* Cached peer ID. *//* Response buffer */// 回復偏移量int bufpos;// 回復緩沖區(qū)char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient;當客戶端發(fā)送命令過來時,epoll返回,readQueryFromClient被調(diào)用,注意回調(diào)函數(shù)的轉(zhuǎn)變。沒建立連接之前是關(guān)聯(lián)acceptTcpHandler,建立連接之后關(guān)聯(lián)readQueryFromClient函數(shù)讀取客戶端的數(shù)據(jù)。
/** 讀取客戶端的查詢緩沖區(qū)內(nèi)容*/ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {printf("-----------%s--------\n",__FUNCTION__);redisClient *c = (redisClient*) privdata;int nread, readlen;size_t qblen;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);// 設置服務器的當前客戶端server.current_client = c;// 讀入長度(默認為 16 MB)readlen = REDIS_IOBUF_LEN;/* If this is a multi bulk request, and we are processing a bulk reply* that is large enough, try to maximize the probability that the query* buffer contains exactly the SDS string representing the object, even* at the risk of requiring more read(2) calls. This way the function* processMultiBulkBuffer() can avoid copying buffers to create the* Redis Object representing the argument. */if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= REDIS_MBULK_BIG_ARG){int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);if (remaining < readlen) readlen = remaining;}// 獲取查詢緩沖區(qū)當前內(nèi)容的長度// 如果讀取出現(xiàn) short read ,那么可能會有內(nèi)容滯留在讀取緩沖區(qū)里面// 這些滯留內(nèi)容也許不能完整構(gòu)成一個符合協(xié)議的命令,qblen = sdslen(c->querybuf);// 如果有需要,更新緩沖區(qū)內(nèi)容長度的峰值(peak)if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;// 為查詢緩沖區(qū)分配空間c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);// 讀入內(nèi)容到查詢緩存nread = read(fd, c->querybuf+qblen, readlen);//接收客戶端發(fā)送過來的數(shù)據(jù)到// 讀入出錯if (nread == -1) {if (errno == EAGAIN) {nread = 0;} else {redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));freeClient(c);return;}// 遇到 EOF} else if (nread == 0) {redisLog(REDIS_VERBOSE, "Client closed connection");freeClient(c);return;}if (nread) {// 根據(jù)內(nèi)容,更新查詢緩沖區(qū)(SDS) free 和 len 屬性// 并將 '\0' 正確地放到內(nèi)容的最后sdsIncrLen(c->querybuf,nread);// 記錄服務器和客戶端最后一次互動的時間c->lastinteraction = server.unixtime;// 如果客戶端是 master 的話,更新它的復制偏移量if (c->flags & REDIS_MASTER) c->reploff += nread;} else {// 在 nread == -1 且 errno == EAGAIN 時運行server.current_client = NULL;return;}// 查詢緩沖區(qū)長度超出服務器最大緩沖區(qū)長度// 清空緩沖區(qū)并釋放客戶端if (sdslen(c->querybuf) > server.client_max_querybuf_len) {sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();bytes = sdscatrepr(bytes,c->querybuf,64);redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClient(c);return;}// 從查詢緩存重讀取內(nèi)容,創(chuàng)建參數(shù),并執(zhí)行命令// 函數(shù)會執(zhí)行到緩存中的所有內(nèi)容都被處理完為止processInputBuffer(c);server.current_client = NULL; }收到客戶端的命令之后就要分析并執(zhí)行命令,然后被結(jié)果返給客戶端。readQueryFromClient->processInputBuffer->processCommand->addReply->prepareClientToWrite。prepareClientToWrite這個函數(shù)就是注冊回復客戶端的函數(shù)sendReplyToClient。
int prepareClientToWrite(redisClient *c) {// LUA 腳本環(huán)境所使用的偽客戶端總是可寫的if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;// 客戶端是主服務器并且不接受查詢,// 那么它是不可寫的,出錯if ((c->flags & REDIS_MASTER) &&!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;// 無連接的偽客戶端總是不可寫的if (c->fd <= 0) return REDIS_ERR; /* Fake client */// 一般情況,為客戶端套接字安裝寫處理器到事件循環(huán)if (c->bufpos == 0 && listLength(c->reply) == 0 &&(c->replstate == REDIS_REPL_NONE ||c->replstate == REDIS_REPL_ONLINE) &&aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c) == AE_ERR) return REDIS_ERR;return REDIS_OK; }每一個階段都關(guān)聯(lián)一個回調(diào)函數(shù),當事件觸發(fā)后走回調(diào)函數(shù)。
總結(jié)
以上是生活随笔為你收集整理的redis源码客户端和服务端通信过程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机组成原理:储存系统和结构
- 下一篇: redis源码epoll用法