Redis源码简要分析
在文章的開頭我們把所有服務端文件列出來,并且標示出其作用:
adlist.c //雙向鏈表
ae.c //事件驅動
ae_epoll.c //epoll接口, linux用
ae_kqueue.c //kqueue接口, freebsd用
ae_select.c //select接口, windows用
anet.c //網絡處理
aof.c //處理AOF文件
config.c //配置文件解析
db.c //DB處理
dict.c //hash表
intset.c //轉換為數字類型數據
multi.c //事務,多條命令一起打包處理
networking.c //讀取、解析和處理客戶端命令
object.c //各種對像的創建與銷毀,string、list、set、zset、hash
rdb.c //redis數據文件處理
redis.c //程序主要文件
replication.c //數據同步master-slave
sds.c //字符串處理
sort.c //用于list、set、zset排序
t_hash.c //hash類型處理
t_list.c //list類型處理
t_set.c //set類型處理
t_string.c //string類型處理
t_zset.c //zset類型處理
ziplist.c //節省內存方式的list處理
zipmap.c //節省內存方式的hash處理
zmalloc.c //內存管理
上面基本是redis最主要的處理文件,部分沒有列出來,如VM之類的,就不在這里講了。
首先我們來回顧一下redis的一些基本知識:
1、redis有N個DB(默認為16個DB),并且每個db有一個hash表負責存放key,同一個DB不能有相同的KEY,但是不同的DB可以相同的KEY;
2、支持的幾種數據類型:string、hash、list、set、zset;
3、redis可以使用aof來保存寫操作日志(也可以使用快照方式保存數據文件)
對于數據類型在這里簡單的介紹一下(網上有圖,下面我貼上圖片可能更容易理解)
1、對于一個string對像,直接存儲內容;
2、對于一個hash對像,當成員數量少于512的時候使用zipmap(一種很省內存的方式實現hash table),反之使用hash表(key存儲成員名,value存儲成員數據);
3、對于一個list對像,當成員數量少于512的時候使用ziplist(一種很省內存的方式實現list),反之使用雙向鏈表(list);
4、對于一個set對像,使用hash表(key存儲數據,內容為空)
5、對于一個zset對像,使用跳表(skip list),關于跳表的相關內容可以查看本blog的跳表學習筆記;
下面正式進入源代碼的分析
1、首先是初始化配置,initServerConfig(redis.c:759)
void initServerConfig() {
server.port = REDIS_SERVERPORT;
server.bindaddr = NULL;
server.unixsocket = NULL;
server.ipfd = -1;
server.sofd = -1;
2.在初始化配置中調用了populateCommandTable(redis.c:925)函數,該函數的目地是將命令集分布到一個hash table中,大家可以看到每一個命令都對應一個處理函數,因為redis支持的命令集還是蠻多,所以如果要靠if分支來做命令處理的話即繁瑣效率還底, 因此放到hash table中,在理想的情況下只需一次就能定位命令的處理函數。
void populateCommandTable(void) {
int j;
int numcommands = sizeof(readonlyCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = readonlyCommandTable+j;
int retval;
retval = dictAdd(server.commands, sdsnew(c->name), c);
assert(retval == DICT_OK);
}
}
3、對參數的解析,redis-server有一個參數(可以不需要),這個參數是指定配置文件路徑,然后由函數loadServerConfig(config.c:28)加載所有配置
if (argc == 2) {
if (strcmp(argv[1], “-v”) == 0 ||
strcmp(argv[1], “–version”) == 0) version();
if (strcmp(argv[1], “–help”) == 0) usage();
resetServerSaveParams();
loadServerConfig(argv[1]);
4、初始化服務器initServer(redis.c:836),?該函數初始化一些服務器信息,包括創建事件處理對像、db、socket、客戶端鏈表、公共字符串等。
void initServer() {
int j;
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();//設置信號處理
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
5、在上面初始化服務器中有一段代碼是創建事件驅動,aeCreateTimeEvent是創建一個定時器,下面創建的定時器將會每毫秒調用?serverCron函數,而aeCreateFileEvent是創建網絡事件驅動,當server.ipfd和server.sofd有數據可讀的情 況將會分別調用函數acceptTcpHandler和acceptUnixHandler。
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) oom(“creating file event”);
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) oom(“creating file event”);
6、接下來就是初始化數據,如果開啟了AOF,那么會調用loadAppendOnlyFile(aof.c:216)去加載AOF文件,在AOF?文件中存放了客戶端的命令,函數將數據讀取出來然后依次去調用命令集去處理,當AOF文件很大的時候勢必為影響客戶端的請求,所以每處理1000條命令就 會去嘗試接受和處理客戶端的請求,其代碼在aof.c第250行;?但是如果沒有開啟AOF并且有rdb的情況,會調用rdbLoad(redis.c:873)嘗試去加載rdb文件,理所當然的在加載rdb文件的內部也 會考慮文件太大而影響客戶端請求,所以跟AOF一樣,每處理1000條也會嘗試去接受和處理客戶端請求。
7、當所有初始化工作做完之后,服務端就開始正式工作了
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
8、大家都知道redis是單線程模式,所有的請求、處理都是在同一個線程里面進行,也就是一個無限循環,在這個無限循環的內部有兩件事要做,第一 件就是調用通過aeSetBeforeSleepProc函數設置的回調函數,第二件就是開始接受客戶端的請求和處理,所以我們可以在第7節看到設置了回 調函數為beforeSleep,但是這個beforeSleep到底有什么作用呢?我們在第9節再詳細講述。對于aeMain(ae.c:375)就是 整個程序的主要循環。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
9、在beforeSleep內部一共有三部分,第一部分對vm進行處理(即第一個if塊),這里我們略過;第二部分是釋放客戶端的阻塞操作,在?redis里有兩個命令BLPOP和BRPOP需要使用這些操作(彈出列表頭或者尾,實現方式見t_list.c:862行的?blockingPopGenericCommand函數),當指定的key不存在或者列表為空的情況下,那么客戶端會一直阻塞,直到列表有數據時,服務 端就會去執行lpop或者rpop并返回給客戶端,那么什么時候需要用到BLPOP和BRPOP呢?大家平時肯定用redis做過隊列,最常見的處理方式 就是使用llen去判斷隊列有沒有數據,如果有數據就去取N條,然后處理,如果沒有就sleep(3),然后繼續循環,其實這里就可以使用BLPOP或者?BRPOP來輕松實現,而且可以減少請求,具體怎么實現留給大家思考;第三部分就是flushAppendOnlyFile(aof.c:60),這個函 數主要目的是將aofbuf的數據寫到文件,那aofbuf是什么呢?他是AOF的一個緩沖區,所以客戶端的命令都會在處理完后把這些命令追加到這個緩沖 區中,然后待一輪數據處理完之后統一寫到文件(所以aof也是不能100%保證數據不丟失的,因為如果當redis正在處理這些命令的情況下服務就掛掉, 那么這部分的數據是沒有保存到硬盤的),大家都知道寫數據到文件并不是立即寫到硬盤,只是保存到一個文件緩沖區中,什么情況下會把緩沖區的數據轉到硬盤 呢?只要滿足如下三種條件的一種就能將數據真正存到硬盤:1、手動調用刷新緩沖區;2、緩沖區已滿;3、程序正常退出。因此redis將數據寫到文件緩沖 區之后會判斷是否需要刷到硬盤,server.appendfsync有兩種方式,第一種(APPENDFSYNC_ALWAYS):無條件刷新,即每次 寫文件都會保存到硬盤,第二種(APPENDFSYNC_EVERYSEC):每隔一秒保存到硬盤。
10、接下來我們開始講解aeProcessEvents(ae.c:275)的處理流程,首先我們來回顧一下第5節設置的定時器和監聽?socket事件處理,其中socket事件處理會回調acceptTcpHandler(networking.c:410)和定時器回調函數?serverCron(redis.c:519),在aeProcessEvents的內部有兩部分需要處理,第一部分是調用aeApiPoll判斷?socket是否有數據可讀,整個服務端的socket里面要分監聽socket和客戶端socket,當有客戶端鏈接服務器時,會觸發監聽socket?的事件處理函數,也就是acceptTcpHandler,而acceptTcpHandler會去調用?createClient(networking.c:13)創建客戶端對像,然后為這個客戶端設置事件處理函數?readQueryFromClient(networking.c:827),所以當客戶端有消息時就會觸發客戶端socket事件處理函數,處理數據部分講在后面詳細講解,接下來的第二部分就是定時器,每次在socket部分處理完后就用調用processTimeEvents(ae.c:212)來處理定時器,那么內部實現也很簡單,當設置定時器的時候就會計算好應該觸發的時間,所以這里就 只需要判斷當前時間是否大于或者等于應該觸發的時間即可。那么這個定時器到底做了什么呢?請繼續第11節。
11、我們繼續跟蹤源代碼serverCron(redis.c:519),整個函數分為七個部分,第一部分:在服務端打印一些關于DB的信息(包 括key數量,內存使用量等);第二部分:判斷DB的hash table是否需要擴展大小tryResizeHashTables(redis.c:432);第三部分:關閉太長時間沒有通信的鏈接closeTimedoutClients(networking.c:629);第四部分:保存rdb文件?rdbSaveBackground(rdb.c:507),當然也是在需要保存的情況才會保存,即設置save參數;第五部分:清除過期的key,當然 這里不是清除全部,他只是隨機取出一些activeExpireCycle(redic.c:477);第六部分:虛擬內存交換部分,將一部分key轉到 虛擬內存中,這里的key也是隨機抽取的, vmSwapOneObjectBlocking(vm.c:521);第七部分:主從同 步,replicationCron(replication.c:500)。
12、在第10節中我們講到客戶端socket處理函數readQueryFromClient,這里我們一層層分析,首先是從客戶端讀取數據,然 后調用processInputBuffer,在內部先是判斷類型,然后調用processInlineBuffer或者?processMultibulkBuffer解析參數,解析后的參數由argv存儲參數,其類型是一個指向指針的指針,其中argv[0]是命令名稱, 后面就是命令參數,argc存儲參數數量;然后調用processCommand(redis.c:979)處理命令,在內部調用?lookupCommand(redis.c:940)獲取命令對應的函數,然后調用freeMemoryIfNeeded(redis.c:1385)?判斷是否需要釋放一些內存,接下來就是調用call(redis.c:954)去執行命令,執行命令后會調用feedAppendOnlyFile(aof.c:137)把命令行保存到aofbuf中,然后判斷是否需要同步數據到slave,如果需要則調用replicationFeedSlaves(replication.c:10),接下來就是判斷是否需要將數據發送到監控端,如果需要則調用replicationFeedMonitors(replication.c:82),到這里整個服務流程就結束了。至于每條命令如何執行,大家可以去 查看以t_開頭的幾個文件。下面是一張整個服務的流程圖。
注:redis源代碼為2.2.8,希望大家看文章的時候配合源代碼一起看,更容易理解
Redis通過定義一個 struct redisServer 類型的全局變量server 來保存服務器的相關信息(比如:配置信息,統計信息,服務器狀態等等)。啟動時通過讀取配置文件里邊的信息對server進行初始化(如果沒有指定配置文件,將使用默認值對sever進行初始化),初始化的內容有:起監聽端口,綁定有新連接時的回調函數,綁定服務器的定時函數,虛擬內存初始化,log初始化等等。
啟動
初始化服務器配置
先來看看redis 的main函數的入口
Redis.c:1694
| int?main(int?argc,?char?**argv)?{? ????time_t?start;? ????initServerConfig();? ????if?(argc?==?2)?{? ????????if?(strcmp(argv[1],?"-v")?==?0?||? ????????????strcmp(argv[1],?"--version")?==?0)?version();? ????????if?(strcmp(argv[1],?"--help")?==?0)?usage();? ????????resetServerSaveParams();? ????????loadServerConfig(argv[1]);? ????}?else?if?((argc?>?2))?{? ????????usage();? ????}?else?{? ????????...? ????}? ????if?(server.daemonize)?daemonize();? ????initServer();? ????... |
- initServerConfig初始化全局變量 server 的屬性為默認值。
- 如果命令行指定了配置文件, resetServerSaveParams重置對落地備份的配置(即重置為默認值)并讀取配置文件的內容對全局變量 server 再進行初始化 ,沒有在配置文件中配置的將使用默認值。
- 如果服務器配置成后臺執行,則對服務器進行 daemonize。
- initServer初始化服務器,主要是設置信號處理函數,初始化事件輪詢,起監聽端口,綁定有新連接時的回調函數,綁定服務器的定時函數,初始化虛擬內存和log等等。
- 創建服務器監聽端口。
Redis.c:923
| ????if?(server.port?!=?0)?{? ????????server.ipfd=?anetTcpServer(server.neterr,server.port,server.bindaddr);? ????????if?(server.ipfd?==?ANET_ERR)?{? ????????????redisLog(REDIS_WARNING,?"Opening?port?%d:?%s",? ????????????????server.port,?server.neterr);? ????????????exit(1);? ????????}? ????} |
- anetTcpServer創建一個socket并進行監聽,然后把返回的socket fd賦值給server.ipfd。
事件輪詢結構體定義
先看看事件輪詢的結構體定義
Ae.h:88
| /*?State?of?an?event?based?program?*/? typedef?struct?aeEventLoop?{? ????int?maxfd;? ????long?long?timeEventNextId;? ????aeFileEvent?events[AE_SETSIZE];?/*?Registered?events?*/? ????aeFiredEvent?fired[AE_SETSIZE];?/*?Fired?events?*/? ????aeTimeEvent?*timeEventHead;? ????int?stop;? ????void?*apidata;?/*?This?is?used?for?polling?API?specific?data?*/? ????aeBeforeSleepProc?*beforesleep;? }?aeEventLoop; |
- maxfd是最大的文件描述符,主要用來判斷是否有文件事件需要處理(ae.c:293)和當使用select 來處理網絡IO時作為select的參數(ae_select.c:50)。
- timeEventNextId 是下一個定時事件的ID。
- events[AE_SETSIZE]用于保存通過aeCreateFileEvent函數創建的文件事件,在sendReplyToClient函數和freeClient函數中通過調用aeDeleteFileEvent函數刪除已經處理完的事件。
- fired[AE_SETSIZE]用于保存已經觸發的文件事件,在對應的網絡I/O函數中進行賦值(epoll,select,kqueue),不會對fired進行刪除操作,只會一直覆蓋原來的值。然后在aeProcessEvents函數中對已經觸發的事件進行處理。
- timeEventHead 是定時事件鏈表的頭,定時事件的存儲用鏈表實現。
- Stop 用于停止事件輪詢處理。
- apidata 用于保存輪詢api需要的數據,即aeApiState結構體,對于epoll來說,aeApiState結構體的定義如下:
| typedef?struct?aeApiState?{? ????int?epfd;? ????struct?epoll_event?events[AE_SETSIZE];? }?aeApiState; |
- beforesleep 是每次進入處理事件時執行的函數。
創建事件輪詢
Redis.c:920
| ??server.el?=?aeCreateEventLoop();? Ae.c:55? aeEventLoop?*aeCreateEventLoop(void)?{? ????aeEventLoop?*eventLoop;? ????int?i;? ????eventLoop?=?zmalloc(sizeof(*eventLoop));? ????if?(!eventLoop)?return?NULL;? ????eventLoop->timeEventHead?=?NULL;? ????eventLoop->timeEventNextId?=?0;? ????eventLoop->stop?=?0;? ????eventLoop->maxfd?=?-1;? ????eventLoop->beforesleep?=?NULL;? ????if?(aeApiCreate(eventLoop)?==?-1)?{? ????????zfree(eventLoop);? ????????return?NULL;? ????}? /*?Events?with?mask?==?AE_NONE?are?not?set.?So?let's?initialize? ?*?the?vector?with?it.?*/? ????for?(i?=?0;?i?<?AE_SETSIZE;?i++)? ????????eventLoop->events[i].mask?=?AE_NONE;? ????return?eventLoop;? } |
綁定定時函數和有新連接時的回調函數
redis.c:973
| aeCreateTimeEvent(server.el,?1,?serverCron,?NULL,?NULL);? if?(server.ipfd?>?0?&&? ????aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,? acceptTcpHandler,NULL)?==?AE_ERR)?oom("creating?file?event"); |
- aeCreateTimeEvent創建定時事件并綁定回調函數serverCron,這個定時事件第一次是超過1毫秒就有權限執行,如果其他事件的處理時間比較長,可能會出現超過一定時間都沒執行情況。這里的1毫秒只是超過后有可執行的權限,并不是一定會執行。第一次執行后,如果還要執行,是由定時函數的返回值確定的,在processTimeEvents(ae.c:219)中,當調用定時回調函數后,獲取定時回調函數的返回值,如果返回值不等于-1,則設置定時回調函數的下一次觸發時間為當前時間加上定時回調函數的返回值,即調用間隔時間。serverCron的返回值是100ms,表明從二次開始,每超過100ms就有權限執行。(定時回調函數serverCron用于更新lru時鐘,更新服務器的狀態,打印一些服務器信息,符合條件的情況下對hash表進行重哈希,啟動后端寫AOF或者檢查后端寫AOF或者備份是否完成,檢查過期的KEY等等)
- aeCreateFileEvent創建監聽端口的socket fd的文件讀事件(即注冊網絡io事件)并綁定回調函數acceptTcpHandler。
進入事件輪詢
初始化后將進入事件輪詢
Redis.c:1733
| ????aeSetBeforeSleepProc(server.el,beforeSleep);? ????aeMain(server.el);? ????aeDeleteEventLoop(server.el); |
- 設置每次進入事件處理前會執行的函數beforeSleep。
- 進入事件輪詢aeMain。
- 退出事件輪詢后刪除事件輪詢,釋放事件輪詢占用內存aeDeleteEventLoop(不過沒在代碼中發現有執行到這一步的可能,服務器接到shutdown命令時通過一些處理后直接就通過exit退出了,可能是我看錯了,待驗證)。
事件輪詢函數aeMain
看看aeMain的內容
Ae.c:382
| void?aeMain(aeEventLoop?*eventLoop)?{? ????eventLoop->stop?=?0;? ????while?(!eventLoop->stop)?{? ????????if?(eventLoop->beforesleep?!=?NULL)? ????????????eventLoop->beforesleep(eventLoop);? ????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS);? ????}? } |
- 每次進入事件處理前,都會調用設置的beforesleep,beforeSleep函數主要是處理被阻塞的命令和根據配置寫AOF。
- aeProcessEvents處理定時事件和網絡io事件。
啟動完畢,等待客戶端請求
到進入事件輪詢函數后,redis的啟動工作就做完了,接下來就是等待客戶端的請求了。
接收請求
新連接到來時的回調函數
在綁定定時函數和有新連接時的回調函數中說到了綁定有新連接來時的回調函數acceptTcpHandler,現在來看看這個函數的具體內容
Networking.c:427
| void?acceptTcpHandler(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{? ????int?cport,?cfd;? ????char?cip[128];? ????REDIS_NOTUSED(el);? ????REDIS_NOTUSED(mask);? ????REDIS_NOTUSED(privdata);? ????cfd?=?anetTcpAccept(server.neterr,?fd,?cip,?&cport);? ????if?(cfd?==?AE_ERR)?{? ????????redisLog(REDIS_WARNING,"Accepting?client?connection:?%s",?server.neterr);? ????????return;? ????}? ????redisLog(REDIS_VERBOSE,"Accepted?%s:%d",?cip,?cport);? ????acceptCommonHandler(cfd);? } |
- anetTcpAccept 函數 accept新連接,返回的cfd是新連接的socket fd。
- acceptCommonHandler 函數是對新建立的連接進行處理,這個函數在使用 unix socket 時也會被用到。
接收客戶端的新連接
接下來看看anetTcpAccept函數的具體內容
| Anet.c:330? int?anetTcpAccept(char?*err,?int?s,?char?*ip,?int?*port)?{? ????int?fd;? ????struct?sockaddr_in?sa;? ????socklen_t?salen?=?sizeof(sa);? ????if?((fd?=?anetGenericAccept(err,s,(struct?sockaddr*)&sa,&salen))?==?ANET_ERR)? ????????return?ANET_ERR;? ????if?(ip)?strcpy(ip,inet_ntoa(sa.sin_addr));? ????if?(port)?*port?=?ntohs(sa.sin_port);? ????return?fd;? } |
再進去anetGenericAccept 看看
Anet.c:313
| static?int?anetGenericAccept(char?*err,?int?s,?struct?sockaddr?*sa,?socklen_t?*len)?{? ????int?fd;? ????while(1)?{? ????????fd?=?accept(s,sa,len);? ????????if?(fd?==?-1)?{? ????????????if?(errno?==?EINTR)? ????????????????continue;? ????????????else?{? ????????????????anetSetError(err,?"accept:?%s",?strerror(errno));? ????????????????return?ANET_ERR;? ????????????}? ????????}? ????????break;? ????}? ????return?fd;? } |
- anetTcpAccept 函數中調用anetGenericAccept 函數進行接收新連接,anetGenericAccept函數在 unix socket 的新連接處理中也會用到。
- anetTcpAccept 函數接收新連接后,獲取客戶端得ip,port 并返回。
創建redisClient進行接收處理
anetTcpAccept 運行完后,返回新連接的socket fd, 然后返回到調用函數acceptTcpHandler中,繼續執行acceptCommonHandler 函數
Networking.c:403
| static?void?acceptCommonHandler(int?fd)?{? ????redisClient?*c;? ????if?((c?=?createClient(fd))?==?NULL)?{? ????????redisLog(REDIS_WARNING,"Error?allocating?resoures?for?the?client");? ????????close(fd);?/*?May?be?already?closed,?just?ingore?errors?*/? ????????return;? ????}? ????/*?If?maxclient?directive?is?set?and?this?is?one?client?more...?close?the? ?????*?connection.?Note?that?we?create?the?client?instead?to?check?before? ?????*?for?this?condition,?since?now?the?socket?is?already?set?in?nonblocking? ?????*?mode?and?we?can?send?an?error?for?free?using?the?Kernel?I/O?*/? ????if?(server.maxclients?&&?listLength(server.clients)?>?server.maxclients)?{? ????????char?*err?=?"-ERR?max?number?of?clients?reached\r\n";? ????????/*?That's?a?best?effort?error?message,?don't?check?write?errors?*/? ????????if?(write(c->fd,err,strlen(err))?==?-1)?{? ????????????/*?Nothing?to?do,?Just?to?avoid?the?warning...?*/? ????????}? ????????freeClient(c);? ????????return;? ????}? ????server.stat_numconnections++;? } |
- 創建一個 redisClient 來處理新連接,每個連接都會創建一個 redisClient 來處理。
- 如果配置了最大并發客戶端,則對現有的連接數進行檢查和處理。
- 最后統計連接數。
綁定有數據可讀時的回調函數
Networking.c:15
| redisClient?*createClient(int?fd)?{? ????redisClient?*c?=?zmalloc(sizeof(redisClient));? ????c->bufpos?=?0;? ????anetNonBlock(NULL,fd);? ????anetTcpNoDelay(NULL,fd);? ????if?(aeCreateFileEvent(server.el,fd,AE_READABLE,? ????????readQueryFromClient,?c)?==?AE_ERR)? ????{? ????????close(fd);? ????????zfree(c);? ????????return?NULL;? ????}? ????selectDb(c,0);? ????c->fd?=?fd;? ????c->querybuf?=?sdsempty();? c->reqtype?=?0;? ...? } |
- 創建新連接的socket fd對應的文件讀事件,綁定回調函數readQueryFromClient。
- 如果創建成功,則對 redisClient 進行一系列的初始化,因為 redisClient 是通用的,即不管是什么命令的請求,都是通過創建一個 redisClient 來處理的,所以會有比較多的字段需要初始化。
createClient 函數執行完后返回到調用處acceptCommonHandler函數,然后從acceptCommonHandler函數再返回到acceptTcpHandler函數。
接收請求完畢,準備接收客戶端得數據
到此為止,新連接到來時的回調函數acceptTcpHandler執行完畢,在這個回調函數中創建了一個redisClient來處理這個客戶端接下來的請求,并綁定了接收的新連接的讀文件事件。當有數據可讀時,網絡i/o輪詢(比如epoll)會有事件觸發,此時綁定的回調函數readQueryFromClient將會調用來處理客戶端發送過來的數據。
讀取客戶端請求的數據
在綁定有數據可讀時的回調函數中的createClient函數中綁定了一個有數據可讀時的回調函數readQueryFromClient函數,現在看看這個函數的具體內容
Networking.c:874
| void?readQueryFromClient(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{? ????redisClient?*c?=?(redisClient*)?privdata;? ????char?buf[REDIS_IOBUF_LEN];? ????int?nread;? ????REDIS_NOTUSED(el);? ????REDIS_NOTUSED(mask);? ????server.current_client?=?c;? ????nread?=?read(fd,?buf,?REDIS_IOBUF_LEN);? ????if?(nread?==?-1)?{? ????????if?(errno?==?EAGAIN)?{? ????????????nread?=?0;? ????????}?else?{? ????????????redisLog(REDIS_VERBOSE,?"Reading?from?client:?%s",strerror(errno));? ????????????freeClient(c);? ????????????return;? ????????}? ????}?else?if?(nread?==?0)?{? ????????redisLog(REDIS_VERBOSE,?"Client?closed?connection");? ????????freeClient(c);? ????????return;? ????}? ????if?(nread)?{? ????????c->querybuf?=?sdscatlen(c->querybuf,buf,nread);? ????????c->lastinteraction?=?time(NULL);? ????}?else?{? ????????server.current_client?=?NULL;? ????????return;? ????}? ????if?(sdslen(c->querybuf)?>?server.client_max_querybuf_len)?{? ????????sds?ci?=?getClientInfoString(c),?bytes?=?sdsempty();? ????????bytes?=?sdscatrepr(bytes,c->querybuf,64);? ????????redisLog(REDIS_WARNING,"Closing?client?that?reached?max?query?buffer?length:?%s?(qbuf?initial?bytes:?%s)",?ci,?bytes);? ????????sdsfree(ci);? ????????sdsfree(bytes);? ????????freeClient(c);? ????????return;? ????}? ????processInputBuffer(c);? ????server.current_client?=?NULL;? } |
- 調用系統函數read來讀取客戶端傳送過來的數據,調用read后對讀取過程中被系統中斷的情況(nread == -1 && errno == EAGAIN),客戶端關閉的情況(nread == 0)進行了判斷處理。
- 如果讀取的數據超過限制(1GB)則報錯。
- 讀取完后進入processInputBuffer進行協議解析。
請求協議
從readQueryFromClient函數讀取客戶端傳過來的數據,進入processInputBuffer函數進行協議解析,可以把processInputBuffer函數看作是輸入數據的協議解析器
Networking.c:835
| void?processInputBuffer(redisClient?*c)?{? ????/*?Keep?processing?while?there?is?something?in?the?input?buffer?*/? ????while(sdslen(c->querybuf))?{? ????????/*?Immediately?abort?if?the?client?is?in?the?middle?of?something.?*/? ????????if?(c->flags?&?REDIS_BLOCKED?||?c->flags?&?REDIS_IO_WAIT)?return;? ????????/*?REDIS_CLOSE_AFTER_REPLY?closes?the?connection?once?the?reply?is? ?????????*?written?to?the?client.?Make?sure?to?not?let?the?reply?grow?after? ?????????*?this?flag?has?been?set?(i.e.?don't?process?more?commands).?*/? ????????if?(c->flags?&?REDIS_CLOSE_AFTER_REPLY)?return;? ????????/*?Determine?request?type?when?unknown.?*/? ????????if?(!c->reqtype)?{? ????????????if?(c->querybuf[0]?==?'*')?{? ????????????????c->reqtype?=?REDIS_REQ_MULTIBULK;? ????????????}?else?{? ????????????????c->reqtype?=?REDIS_REQ_INLINE;? ????????????}? ????????}? ????????if?(c->reqtype?==?REDIS_REQ_INLINE)?{? ????????????if?(processInlineBuffer(c)?!=?REDIS_OK)?break;? ????????}?else?if?(c->reqtype?==?REDIS_REQ_MULTIBULK)?{? ????????????if?(processMultibulkBuffer(c)?!=?REDIS_OK)?break;? ????????}?else?{? ????????????redisPanic("Unknown?request?type");? ????????}? ????????/*?Multibulk?processing?could?see?a?<=?0?length.?*/? ????????if?(c->argc?==?0)?{? ????????????resetClient(c);? ????????}?else?{? ????????????/*?Only?reset?the?client?when?the?command?was?executed.?*/? ????????????if?(processCommand(c)?==?REDIS_OK)? ????????????????resetClient(c);? ????????}? ????}? } |
- Redis支持兩種協議,一種是inline,一種是multibulk。inline協議是老協議,現在一般只在命令行下的redis客戶端使用,其他情況一般是使用multibulk協議。
- 如果客戶端傳送的數據的第一個字符時‘*’,那么傳送數據將被當做multibulk協議處理,否則將被當做inline協議處理。Inline協議的具體解析函數是processInlineBuffer,multibulk協議的具體解析函數是processMultibulkBuffer。
- 當協議解析完畢,即客戶端傳送的數據已經解析出命令字段和參數字段,接下來進行命令處理,命令處理函數是processCommand。
Inline請求協議
Networking.c:679
| int?processInlineBuffer(redisClient?*c)?{? ????...? } |
- 根據空格分割客戶端傳送過來的數據,把傳送過來的命令和參數保存在argv數組中,把參數個數保存在argc中,argc的值包括了命令參數本身。即set key value命令,argc的值為3。詳細解析見協議詳解
Multibulk請求協議
Multibulk協議比inline協議復雜,它是二進制安全的,即傳送數據可以包含不安全字符。Inline協議不是二進制安全的,比如,如果set key value命令中的key或value包含空白字符,那么inline協議解析時將會失敗,因為解析出來的參數個數與命令需要的的參數個數會不一致。
協議格式
| *<number?of?arguments>?CR?LF? $<number?of?bytes?of?argument?1>?CR?LF? <argument?data>?CR?LF? ...? $<number?of?bytes?of?argument?N>?CR?LF? <argument?data>?CR?LF |
協議舉例
| *3? $3? SET? $5? mykey? $7? myvalue |
具體解析代碼位于
Networking.c:731
| int?processMultibulkBuffer(redisClient?*c)?{? ...? } |
詳細解析見協議詳解
處理命令
當協議解析完畢,則表示客戶端的命令輸入已經全部讀取并已經解析成功,接下來就是執行客戶端命令前的準備和執行客戶端傳送過來的命令
Redis.c:1062
| /*?If?this?function?gets?called?we?already?read?a?whole? ?*?command,?argments?are?in?the?client?argv/argc?fields.? ?*?processCommand()?execute?the?command?or?prepare?the? ?*?server?for?a?bulk?read?from?the?client.? ?*? ?*?If?1?is?returned?the?client?is?still?alive?and?valid?and? ?*?and?other?operations?can?be?performed?by?the?caller.?Otherwise? ?*?if?0?is?returned?the?client?was?destroied?(i.e.?after?QUIT).?*/? int?processCommand(redisClient?*c)?{? ...? ?/*?Now?lookup?the?command?and?check?ASAP?about?trivial?error?conditions? ??*?such?as?wrong?arity,?bad?command?name?and?so?forth.?*/? c->cmd?=?c->lastcmd?=?lookupCommand(c->argv[0]->ptr);? ...? call(c);? ...? } |
- lookupCommand先根據客戶端傳送過來的數據查找該命令并找到命令的對應處理函數。
- Call函數調用該命令函數來處理命令,命令與對應處理函數的綁定位于。
Redi.c:72
| struct?redisCommand?*commandTable;? struct?redisCommand?readonlyCommandTable[]?=?{? {"get",getCommand,2,0,NULL,1,1,1},? ...? } |
回復請求
回復請求位于對應的命令中,以get命令為例
T_string.c:67
| void?getCommand(redisClient?*c)?{? ????getGenericCommand(c);? } |
T_string.c:52
| int?getGenericCommand(redisClient?*c)?{? ????robj?*o;? ????if?((o?=?lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk))?==?NULL)? ????????return?REDIS_OK;? ????if?(o->type?!=?REDIS_STRING)?{? ????????addReply(c,shared.wrongtypeerr);? ????????return?REDIS_ERR;? ????}?else?{? ????????addReplyBulk(c,o);? ????????return?REDIS_OK;? ????}? } |
- getGenericCommand在getset 命令中也會用到。
- lookupKeyReadOrReply是以讀數據為目的查詢key函數,并且如果該key不存在,則在該函數中做不存在的回包處理。
- 如果該key存在,則返回該key對應的數據,addReply函數以及以addReply函數開頭的都是回包函數。
綁定寫數據的回調函數
接下來看看addReply函數里的內容
Networking.c:190
| void?addReply(redisClient?*c,?robj?*obj)?{? ????if?(_installWriteEvent(c)?!=?REDIS_OK)?return;? ????...? } |
Networking.c:64
| int?_installWriteEvent(redisClient?*c)?{? ????if?(c->fd?<=?0)?return?REDIS_ERR;? ????if?(c->bufpos?==?0?&&?listLength(c->reply)?==?0?&&? ????????(c->replstate?==?REDIS_REPL_NONE?||? ?????????c->replstate?==?REDIS_REPL_ONLINE)?&&? ????????aeCreateFileEvent(server.el,?c->fd,?AE_WRITABLE,? ????????sendReplyToClient,?c)?==?AE_ERR)?return?REDIS_ERR;? ????return?REDIS_OK;? } |
- addReply函數一進來就先調用綁定寫數據的回調函數installWriteEvent。
- installWriteEvent函數中創建了一個文件寫事件和綁定寫事件的回調函數為sendReplyToClient。
準備寫的數據內容
??? addReply函數一進來后就綁定寫數據的回調函數,接下來就是準備寫的數據內容
Networking.c:190
| void?addReply(redisClient?*c,?robj?*obj)?{? ????if?(_installWriteEvent(c)?!=?REDIS_OK)?return;? ????redisAssert(!server.vm_enabled?||?obj->storage?==?REDIS_VM_MEMORY);? ????/*?This?is?an?important?place?where?we?can?avoid?copy-on-write? ?????*?when?there?is?a?saving?child?running,?avoiding?touching?the? ?????*?refcount?field?of?the?object?if?it's?not?needed.? ?????*? ?????*?If?the?encoding?is?RAW?and?there?is?room?in?the?static?buffer? ?????*?we'll?be?able?to?send?the?object?to?the?client?without? ?????*?messing?with?its?page.?*/? ????if?(obj->encoding?==?REDIS_ENCODING_RAW)?{? ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK)? ????????????_addReplyObjectToList(c,obj);? ????}?else?{? ????????/*?FIXME:?convert?the?long?into?string?and?use?_addReplyToBuffer()? ?????????*?instead?of?calling?getDecodedObject.?As?this?place?in?the? ?????????*?code?is?too?performance?critical.?*/? ????????obj?=?getDecodedObject(obj);? ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK)? ????????????_addReplyObjectToList(c,obj);? ????????decrRefCount(obj);? ????}? } |
- 先嘗試把要返回的內容添加到發送數據緩沖區中(redisClient->buf),如果該緩沖區的大小已經放不下這次想放進去的數據,或者已經有數據在排隊(redisClient->reply 鏈表不為空),則把數據添加到發送鏈表的尾部。
給客戶端答復數據
在綁定寫數據的回調函數中看到綁定了回調函數sendReplyToClient,現在來看看這個函數的主要內容
Networking.c:566
| void?sendReplyToClient(aeEventLoop?*el,?int?fd,?...)?{? ????...? while(c->bufpos?>?0?||?listLength(c->reply))?{? ????...? ????if(c->bufpos?>?0){? ????????...? ????????????nwritten=write(fd,...,c->bufpos-c->sentlen);? ????????????...? ????????}?else?{? ????????????o?=?listNodeValue(listFirst(c->reply));? ????????????...? ????????????nwritten=write(fd,...,objlen-c->sentlen);? ????????????...? ????????}? ????}? } |
- 通過調用系統函數write給客戶端發送數據,如果緩沖區有數據就把緩沖區的數據發送給客戶端,緩沖區的數據發送完了,如果有排隊數據,則繼續發送。
退出
Redis 服務器的退出是通過shutdown命令來退出的,退出前會做一系列的清理工作
Db.c:347
| void?shutdownCommand(redisClient?*c)?{? ????if?(prepareForShutdown()?==?REDIS_OK)? ????????exit(0);? ????addReplyError(c,"Errors?trying?to?SHUTDOWN.?Check?logs.");? } |
總結
框架從啟動,接收請求,讀取客戶端數據,請求協議解析,處理命令,回復請求,退出對redis運行的整個流程做了一個梳理。對整個redis的運作和框架有了一個初步的了解。
總結
以上是生活随笔為你收集整理的Redis源码简要分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cJSON库源码分析
- 下一篇: 移动开发框架剖析