Redis源码剖析(六)事务模块
Redis允許客戶端開啟事務模式,在事務模式中,客戶端輸入的命令不會立即執行而是被保存在事務隊列中,只有當客戶端輸入事務運行命令時,Redis才會將事務隊列中的所有命令按照FIFO的順序一個個執行
一個事務從開始到結束通常會經歷三個階段
- 事務開始
- 命令入隊
- 事務執行
事務命令
客戶端可以使用MULTI命令開啟事務,隨后服務器會根據這個客戶端輸入的不同命令執行不同的操作
- 如果客戶端發送的命令為EXEC,DISCARD,WATCH,MULTI四個命令中的一個,那么服務器立刻執行這個命令,同時是否關閉事務取決于每個命令的功能
- 如果客戶端發送的命令為上述四個命令之外的其他命令,那么服務器不會立刻執行輸入的命令,而是將命令存放在事務隊列中
存儲結構
要想當輸入EXEC時一次性執行之前輸入的所有命令,就需要將之前的命令保存起來,Redis采用數組保存所有命令信息,在client的定義中可以找到
//server.h typedef struct client {...multiState mstate; /* 事務屬性,保存事務隊列以及事務狀態 */... } client;multiState是事務屬性結構,它保存著事務隊列以及事務隊列中命令個數,定義如下
//server.h /* 事務屬性 */ typedef struct multiState {multiCmd *commands; /* 事務隊列,保存多條命令 */ int count; /* 事務隊列中命令個數 */ ... } multiState;commands是一個事務隊列,實際上是一個multiCmd類型的數組,multiCmd結構保存一條命令的信息
//server.h /* 保存一條命令的信息 */ typedef struct multiCmd {robj **argv; /* 命令關鍵字和參數 */int argc; /* 命令參數個數 */struct redisCommand *cmd; /* 命令結構,主要包含命令處理函數 */ } multiCmd;可以看到,multiCmd中保存的實際上就是執行一條命令需要的三個信息,分別是
- 命令關鍵字和參數
- 參數個數
- 命令處理函數
在客戶端client的定義中也可以找到這三個變量的定義
//server.h typedef struct client {...int argc; robj **argv; struct redisCommand *cmd; ... } client;所以大體可以猜測,當要執行事務隊列中的命令時,只需要遍歷事務隊列,依次將這三個變量賦值給client中的對應變量,然后調用對應命令處理函數即可。后面會看到,事實也正是如此
事務實現
開啟事務
在Redis內部,事務的開啟十分簡單,僅僅是將客戶端的事務標志位打開,表示進行事務狀態,隨后的大多數操作都會先判斷該標志位以確定是將命令添加到事務隊列中,還是執行命令
//multi.c /* 開啟事務 */ void multiCommand(client *c) {/* 若已經開啟,則報錯 */if (c->flags & CLIENT_MULTI) {addReplyError(c,"MULTI calls can not be nested");return;}/* 設置CLIENT_MULTI標志代表客戶端已經開啟事務 */c->flags |= CLIENT_MULTI;addReply(c,shared.ok); }添加命令到事務隊列
在第一篇服務器與客戶端交互流程中得知,當客戶端輸入命令后,會存在一個解析命令的操作,將命令參數,參數個數以及命令處理函數找到,然后執行call函數,在這個函數中調用命令處理函數執行命令。但是一旦開啟事務功能,Redis就不能再執行命令了,如上所述,應該將命令添加到事務隊列中。
在processCommand函數中,可以看到對于這兩種情況的判斷
/* 處理客戶端輸入的命令 */ int processCommand(client *c) {.../* 從命令字典中查找該命令名字,返回redisCommand結構,其中包含命令處理函數 */c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);.../* 如果客戶端開啟事務,則不執行命令而是將命令添加到事務隊列中 */if (c->flags & CLIENT_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){queueMultiCommand(c);/* 回復客戶端當前命令已經添加到事務隊列中 */addReply(c,shared.queued);} else {/* 沒有開啟事務,執行執行命令 */call(c,CMD_CALL_FULL);...}return C_OK; }將命令添加到事務隊列中由queueMultiCommand函數完成,函數首先創建一個multiCmd對象,這個結構在上面提到過,保存著執行一條命令需要的三個元素,分別是命令參數,參數個數以及命令處理函數。而multiState結構是保存事務隊列的結構(實際是數組),在這里需要將新的命令添加到這個數組中
/* 將當前命令添加到客戶端的事務隊列中 */ void queueMultiCommand(client *c) {multiCmd *mc;int j;/* mstate是multiState類型,保存事務中所有命令的信息* 每增加一條命令到事務隊列中,都需要為原事務隊列重新申請n+1大小的空間* 多的那一個用來存儲當前命令*/c->mstate.commands = zrealloc(c->mstate.commands,sizeof(multiCmd)*(c->mstate.count+1));mc = c->mstate.commands+c->mstate.count;/* 保存執行一條命令所需的三個元素 */mc->cmd = c->cmd;mc->argc = c->argc;mc->argv = zmalloc(sizeof(robj*)*c->argc);/* 將參數復制到multiCmd結構中 */memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);/* 因為參數是robj*類型,所以引用計數加一 */for (j = 0; j < c->argc; j++)incrRefCount(mc->argv[j]);/* 事務隊列中命令個數加一 */c->mstate.count++; }執行命令
當客戶端輸入EXEC命令后,Redis會從客戶端的事務隊列中取出命令,按照保存的先后順序一個個執行,由于事務隊列中有命令的所有信息,所以可以執行調用處理函數。這部分操作由execCommand函數執行,因為執行一條命令是從客戶端對象client中取出命令參數,參數個數以及命令處理函數,所以這里就直接將隊列中的命令信息復制給客戶端對象的對應變量,然后和執行正常命令一樣調用call函數
/* 啟動事務命令 */ void execCommand(client *c) {int j;robj **orig_argv;int orig_argc;struct redisCommand *orig_cmd;int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? *//* CLIENT_MULTI標識代表當前客戶端是否開啟事務,如果沒有開啟,執行EXEC指令是沒有意義的 */if (!(c->flags & CLIENT_MULTI)) {addReplyError(c,"EXEC without MULTI");return;}/* CLIENT_DIRTY_CAS標識代表客戶端監視的鍵是否被修改過* 如果被修改過,那么執行事務就不再安全,直接返回 */if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :shared.nullmultibulk);discardTransaction(c);goto handle_monitor;}/* Exec all the queued commands *//* 開始執行事務,監視任務就可以結束了,將該客戶端的監視字典清空 */unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles *//* 臨時保存客戶端當前參數信息 */orig_argv = c->argv;orig_argc = c->argc;orig_cmd = c->cmd;addReplyMultiBulkLen(c,c->mstate.count);/* 開始執行客戶端數據隊列(數組)中的命令 */for (j = 0; j < c->mstate.count; j++) {/* 將每個命令作為客戶端當前的參數信息(這就是為什么需要臨時保存以前參數的原因) */c->argc = c->mstate.commands[j].argc;c->argv = c->mstate.commands[j].argv;c->cmd = c->mstate.commands[j].cmd;.../* 開始執行命令 */call(c,CMD_CALL_FULL);/* Commands may alter argc/argv, restore mstate. *//* call執行的命令可能會改變事務隊列中的當前命令,這里是確保其不被改變 */c->mstate.commands[j].argc = c->argc;c->mstate.commands[j].argv = c->argv;c->mstate.commands[j].cmd = c->cmd;}/* 還原客戶端以前的參數信息 */c->argv = orig_argv;c->argc = orig_argc;c->cmd = orig_cmd;... }函數有點長,不過還算好理解,先判斷客戶端是否開啟事務(利用CLIENT_MULTI標記),然后清空客戶端的監視鏈表(后面會提到),最后遍歷事務隊列,依次執行每個命令
需要注意的是,Redis在執行事務隊列中的命令時保證即使某條命令是錯誤的,也不會影響到其他命令的執行,即如果中間某條命令執行出錯,那么后面的命令仍然會繼續執行。另外,Redis不支持事務回滾功能,即不支持將已執行的命令撤銷
小結
Redis的事務功能還是比較簡單的,另外需要提的是,由于Redis是運行在單線程下的,而且對于客戶端的監聽是基于io多路復用函數的,所以對于客戶端的響應是串行的,不會出現當執行某個客戶端事務隊列中的命令時切換到另一個客戶端的情況。這保證了事務執行具有原子性,另外Redis設計與實現書中還講到Redis的事務具有一致性,隔離性和耐久性,有興趣的話可以翻閱書籍查看
總結
以上是生活随笔為你收集整理的Redis源码剖析(六)事务模块的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode-----为二叉
- 下一篇: 每天一道LeetCode-----杨辉三