Redis源码剖析(五)订阅与发布
Redis提供了訂閱和發布的功能,允許客戶端訂閱一個或多個頻道,當其他客戶端向某個頻道發送消息時,服務器會將消息轉發給所有訂閱該頻道的客戶端
這一點有點像群聊的功能,一個客戶端將消息發往群中(向某個頻道發送消息),所有在群中的客戶端(訂閱該頻道的客戶端)都會收到這個消息。事實也正是如此,接下來將會看到,服務器采用字典保存每個頻道(鍵)和訂閱該頻道的所有客戶端(值),每當其他客戶端向某個頻道發送消息時,服務器便從字典中獲取所有訂閱該頻道的客戶端,依次將消息發送。每個頻道,可以看成是每個群,一個頻道的所有訂閱客戶端,可以看成該群的所有群成員,唯一不同的是,向頻道發送消息的那個客戶端并不需要訂閱同樣的頻道,也就是該客戶端并不需要也在群中
稍后會看到,除了訂閱特定頻道,Redis也允許客戶端進行模式訂閱,即一次訂閱所有匹配的頻道
Redis的訂閱與發布功能由PUBLISH, SUBSCRIBE, PSUBSCRIBE等命令組成
普通訂閱
使用SUBSCRIBE [頻道名]即可訂閱特定頻道,頻道名可以自定義,也可以同時訂閱多個頻道,只需要后面添加多個頻道名即可
127.0.0.1:6379> SUBSCRIBE "news.redis" //訂閱"news.redis"頻道 Reading messages... (press Ctrl-C to quit) 1) "subscribe" //命令關鍵字 2) "news.redis" //頻道名 3) (integer) 1 //訂閱該頻道的客戶端數量使用PUBLISH [頻道名] [消息]即可向特定頻道發送消息
127.0.0.1:6379> PUBLISH "news.redis" "send a message" //向"news.redis"頻道發送消息 (integer) 1 //返回發送給了多少個客戶端 127.0.0.1:6379>此時,如果再查看訂閱news.redis頻道的那個客戶端,會發現終端上打印出”send a message”信息
//PUBLISH之前 127.0.0.1:6379> SUBSCRIBE "news.redis" Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.redis" 3) (integer) 1//PUBLISH之后 1) "message" //消息類型 2) "news.redis" //頻道名 3) "send a message" //信息不過如果一個客戶端處于訂閱狀態,它好像就不能執行其他操作了
存儲結構
實現一個訂閱與發布功能十分簡單,開篇也提到了,只需要將每個頻道以及它的訂閱者記錄在字典中,如果客戶端向某個頻道發送消息,則在字典中查找該頻道的所有訂閱者,依次將消息發送過去即可。
在深入源代碼之前,先看兩個結構的定義,一個是客戶端,一個是服務器,它們都定義在server.h頭文件中
//server.h typedef struct client {...dict *pubsub_channels; ... } client; //server.h struct redisServer {...dict *pubsub_channels; ... };這兩個結構都太長了,不過目前用得到的其實就一個pubsub_channels變量,根據類型得知該變量是一個字典(以下簡稱為訂閱字典),兩個變量的作用分別是
- 客戶端的訂閱字典記錄著當前客戶端訂閱的所有頻道,鍵是頻道名,值為空
- 服務器的訂閱字典記錄著所有頻道以及每個頻道的訂閱者,鍵是頻道名,值是客戶端鏈表
到這里其實可以簡單猜測訂閱功能是如何實現的,當某個客戶端使用SUBSCRIBE命令訂閱一個或多個頻道時,Redis會將<頻道名,客戶端>這個鍵值對添加到服務器的訂閱字典中,同時也會將頻道名添加到客戶端自己的訂閱字典中
而當客戶端使用PUBLISH命令向某個頻道發送消息時,Redis會在訂閱字典中獲取該頻道的所有訂閱者(客戶端),依次將消息發送給客戶端。如果該頻道不存在或沒有訂閱者,則不執行任何操作
訂閱功能
訂閱功能由subscribeCommand函數完成,函數主要任務是遍歷每一參數(頻道名),調用pubsubSubscribeChannel函數將頻道名和客戶端添加到訂閱字典中
//pubsub.c /* 訂閱命令 */ void subscribeCommand(client *c) {int j;/* 將客戶端和它訂閱的頻道進行關聯,添加到訂閱字典中* 鍵是頻道名,值是客戶端 */for (j = 1; j < c->argc; j++)pubsubSubscribeChannel(c,c->argv[j]);/* 標記當前客戶端訂閱過某些頻道 */c->flags |= CLIENT_PUBSUB; }pubsubSubscribeChannel函數完成實際的添加操作
//pubsub.c /* * 將客戶端和它訂閱的頻道進行關聯,添加到客戶端和服務器兩個訂閱字典中 * * 注:服務器和客戶端都有訂閱字典,分別是* c->pubsub_channels* server.pubsub_channels*/ int pubsubSubscribeChannel(client *c, robj *channel) {dictEntry *de;list *clients = NULL;int retval = 0;/* 判斷當前客戶端是否已經訂閱了該頻道,如果是則不進行處理,否則添加到客戶端的訂閱字典中 *//* 注意這里添加的是客戶端的訂閱字典,該字典記錄當前客戶端訂閱的所有頻道 */if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {retval = 1;/* 所有的robj對象都是基于引用計數的,因為已將其添加到字典中,所有引用計數加一 */incrRefCount(channel);/* 從服務器的訂閱字典中尋找該頻道對應的鍵節點鏈表(記錄所有訂閱該頻道的客戶端鏈表) */de = dictFind(server.pubsub_channels,channel);if (de == NULL) {/* 服務器訂閱字典中沒有關于該頻道的記錄,創建該頻道對應的客戶端鏈表 */clients = listCreate();/* 將<頻道,客戶端鏈表>添加到服務器的訂閱字典中 */dictAdd(server.pubsub_channels,channel,clients);/* 頻道的引用計數加一 ,因為在字典中也有一份*/incrRefCount(channel);} else {/* 服務器訂閱字典中有關于該頻道的記錄,直接將客戶端鏈表返回 */ clients = dictGetVal(de);}/* 將當前客戶端連接到鏈表上 */listAddNodeTail(clients,c);}/* 通知客戶端訂閱成功 */ addReply(c,shared.mbulkhdr[3]);addReply(c,shared.subscribebulk);addReplyBulk(c,channel);addReplyLongLong(c,clientSubscriptionsCount(c));return retval; }至此訂閱操作完成,可以發現訂閱僅僅是將頻道名和客戶端這個鍵值對添加到訂閱字典中,并不執行其他操作。
退訂功能
有訂閱就有退訂,退訂命令是UNSUBSCRIBE,有unsubscribeCommand函數執行。不過既然訂閱功能是阻塞的,怎么執行退訂啊…
退訂分兩種,一種是退訂當前客戶端訂閱的所有頻道,此時退訂命令不帶參數。另一種則帶參數,僅退訂參數指出的頻道
//pubsub.c /* 退訂命令 */ void unsubscribeCommand(client *c) {if (c->argc == 1) {/* 退訂當前客戶端訂閱的所有頻道 */pubsubUnsubscribeAllChannels(c,1);} else {int j;/* 退訂參數指出的頻道 */for (j = 1; j < c->argc; j++)pubsubUnsubscribeChannel(c,c->argv[j],1);}/* 客戶端訂閱的頻道數為0時,改變標志 */if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; }退訂所有頻道是遍歷當前客戶端的訂閱字典,對訂閱的每個頻道調用pubsubUnsubscribeChannel函數,實際上和指定參數效果相同,所以就直接看退訂參數指定頻道的函數好了
//pubsub.c /* * 退訂* c : 客戶端* channel : 要退訂的頻道* notify : 退訂后是否通知客戶端*/ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {dictEntry *de;list *clients;listNode *ln;int retval = 0;incrRefCount(channel); /* channel may be just a pointer to the same objectwe have in the hash tables. Protect it... *//* 從客戶端訂閱字典中刪除關于該頻道的訂閱信息 */if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {/* 刪除成功,表示這個客戶端訂閱過channel */retval = 1;/* 從服務器訂閱字典中查找關于該頻道的所有訂閱信息,返回鍵節點 */de = dictFind(server.pubsub_channels,channel);serverAssertWithInfo(c,NULL,de != NULL);/* 從鍵節點中獲取客戶端鏈表 */clients = dictGetVal(de);/* 從客戶端鏈表中搜索當前退訂的客戶端 */ln = listSearchKey(clients,c);serverAssertWithInfo(c,NULL,ln != NULL);/* 將鏈表節點ln從鏈表clients中刪除 */listDelNode(clients,ln);/* 如果該頻道只有該客戶端訂閱過,那么刪除后客戶端鏈表為空,從服務器訂閱字典中刪除該頻道的信息 */if (listLength(clients) == 0) {dictDelete(server.pubsub_channels,channel);}}/* 如果要求通知,則通知客戶端 */if (notify) {addReply(c,shared.mbulkhdr[3]);addReply(c,shared.unsubscribebulk);addReplyBulk(c,channel);addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));}decrRefCount(channel); /* it is finally safe to release it */return retval; }退訂函數雖然長了點,但是還是蠻好理解的,僅僅是將客戶端和頻道的關聯信息從訂閱字典中刪除
普通訂閱的信息發布
Redis的發布功能由PUBLISH命令實現,底層由pubsubPublishMessage函數實現,該函數向訂閱特定頻道的所有客戶端發送消息。訂閱分兩種,一個是普通訂閱(如上),另一個是模式訂閱,所以函數中也分為向普通訂閱的客戶端發送消息和向模式訂閱的客戶端發送消息。因為還沒有接觸模式訂閱,所以先看普通訂閱的發布好了
普通訂閱的發送消息僅僅是在服務器的訂閱字典中尋找特定頻道的所有訂閱者,依次將消息發送就完成了,比較簡單
//pubsub.c /* 發送通知信息 */ /* * channel : 通知信息* message : 事件名稱*/ int pubsubPublishMessage(robj *channel, robj *message) {int receivers = 0;dictEntry *de;listNode *ln;listIter li;/* 服務器的訂閱字典保存著所有頻道和它的所有訂閱者 *//* 從該字典中查找頻道channel的訂閱者,返回鍵節點 */de = dictFind(server.pubsub_channels,channel);if (de) {/* 從鍵節點中獲取訂閱該頻道的客戶端鏈表 */list *list = dictGetVal(de); listNode *ln;listIter li;/* 將迭代器方向設置為從頭到尾 */listRewind(list,&li);/* 遍歷客戶端鏈表的所有客戶端,發送通知信息 */while ((ln = listNext(&li)) != NULL) {client *c = ln->value;addReply(c,shared.mbulkhdr[3]);addReply(c,shared.messagebulk);addReplyBulk(c,channel);addReplyBulk(c,message);receivers++;}}...return receivers; }模式訂閱
Redis允許客戶端使用正則表達式訂閱一組頻道,命令格式為PSUBSCRIBE [頻道名]
127.0.0.1:6379> PSUBSCRIBE "news.redi[xy]" //訂閱"news.redix"和"news.rediy"兩個頻道 Reading messages... (press Ctrl-C to quit) 1) "psubscribe" //命令關鍵字 2) "news.redi[xy]" //頻道名 3) (integer) 1 //訂閱該頻道的客戶端數量此時,如果打開另一個客戶端,不管是向news.redix頻道發送還是向news.rediy頻道發送消息,上面這個客戶端都會接收到消息
127.0.0.1:6379> PUBLISH "news.redix" "send to news.redix" //向"news.redix"頻道發送消息 (integer) 1 127.0.0.1:6379> PUBLISH "news.rediy" "send to news.rediy" //向"news.rediy"頻道發送消息 (integer) 1 127.0.0.1:6379> 127.0.0.1:6379> PSUBSCRIBE "news.redi[xy]" //訂閱"news.redix"和"news.rediy"兩個頻道 Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "news.redi[xy]" 3) (integer) 1 1) "pmessage" //從頻道news.redix接收到消息 2) "news.redi[xy]" 3) "news.redix" //頻道名 4) "send to news.redix" //消息內容 1) "pmessage" //從頻道news.rediy接收到消息 2) "news.redi[xy]" 3) "news.rediy" //頻道名 4) "send to news.rediy" //消息內容以下將用正則表達式表示的頻道稱為頻道組,如”news.redi[xy]”就是一個頻道組
存儲結構
由于模式訂閱的頻道名代表一組頻道,所以不能用字典存儲,因為字典的鍵是已知的,當然可以將用正則表達式代表的頻道的所有可能都計算處然后添加到字典中,不過Redis不會這么做,論誰誰都不會,因為結果集太大了。
所以字典在這里算是沒有用武之地了,Redis采用鏈表將每個客戶端和它訂閱的頻道組記錄起來,每當向特定頻道發布消息時,Redis就會遍歷這個鏈表判斷每個客戶端的頻道組是否可以和當前頻道匹配,如果匹配則向該客戶端發送消息。當然,每個客戶端也有這么個鏈表記錄自己訂閱的頻道組,在它們的定義中可以清楚的看到
//server.h typedef struct client {dict *pubsub_channels; //訂閱字典list *pubsub_patterns; //模式訂閱鏈表 } client; //server.h struct redisServer {dict *pubsub_channels; //訂閱字典list *pubsub_patterns; //模式訂閱鏈表 };與訂閱字典相同,模式訂閱鏈表在客戶端和服務器的作用也不相同
- 客戶端的模式訂閱鏈表保存當前客戶端訂閱的所有頻道組
- 服務器的模式訂閱鏈表保存所有客戶端訂閱的所有頻道組(鏈表中可能有多個節點指向的客戶端相同,但是頻道組不同)
客戶端鏈表的節點保存的是頻道組
服務器鏈表的節點保存的結構是pubsubPattern類型,該結構記錄著客戶端和一個頻道組
//server.h typedef struct pubsubPattern {client *client; //客戶端robj *pattern; //頻道組 } pubsubPattern;有了訂閱模塊的基礎,到這里可以猜測模式訂閱也僅僅是將客戶端和其模式訂閱的頻道組組成pubsubPattern添加到服務器的模式訂閱鏈表中,將頻道組添加到客戶端的模式訂閱鏈表中,并不做其他處理
模式訂閱功能
事實也正是如此,模式訂閱功能由pubsubSubscribePattern函數實現
//pubsub.c /* 模式訂閱 */ int pubsubSubscribePattern(client *c, robj *pattern) {int retval = 0;/* 從客戶端的模式訂閱鏈表中查找要訂閱的模式,如果不存在,才進行添加 */if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {retval = 1;/* pubsubPattern結構記錄著客戶端c和頻道組pattern */pubsubPattern *pat;/* 將頻道組添加到客戶端模式訂閱鏈表尾部 */listAddNodeTail(c->pubsub_patterns,pattern);incrRefCount(pattern);/* 申請空間,組裝pubsubPattern結構 */pat = zmalloc(sizeof(*pat));pat->pattern = getDecodedObject(pattern);pat->client = c;/* 將訂閱節點添加到服務器的訂閱鏈表中 */listAddNodeTail(server.pubsub_patterns,pat);/* 因為客戶端的訂閱鏈表只需要記錄自己訂閱頻道組即可,所以無需存儲pubsubPattern結構* 而服務器需要記錄每個客戶端和其頻道組,二者都需要記錄,所以存儲pubsubPattern結構 */}/* Notify the client */addReply(c,shared.mbulkhdr[3]);addReply(c,shared.psubscribebulk);addReplyBulk(c,pattern);addReplyLongLong(c,clientSubscriptionsCount(c));return retval; }模式退訂功能
退訂和訂閱是相反的,對于模式訂閱的退訂也是如此,僅僅是將頻道組從模式訂閱鏈表中刪除,需要注意的是要退訂就退訂整個頻道組,Redis不支持將特定頻道從頻道組中去除
//pubsub.c /* 退訂模式 */ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {listNode *ln;pubsubPattern pat;int retval = 0;incrRefCount(pattern); /* Protect the object. May be the same we remove *//* 從客戶端自己的模式訂閱鏈表中查找相應模式 */if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {retval = 1;/* 如果找到,則刪除鏈表節點 */listDelNode(c->pubsub_patterns,ln);pat.client = c;pat.pattern = pattern;/* 從服務器的模式訂閱鏈表中查找,然后刪除 */ln = listSearchKey(server.pubsub_patterns,&pat);listDelNode(server.pubsub_patterns,ln);}/* Notify the client */if (notify) {addReply(c,shared.mbulkhdr[3]);addReply(c,shared.punsubscribebulk);addReplyBulk(c,pattern);addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));}decrRefCount(pattern);return retval; }模式訂閱的信息發布
最后一個就是關于向模式訂閱發布消息的實現了,在上面訂閱模塊處,僅僅看到了pubsubPublishMessage函數向訂閱特定頻道的客戶端發送消息。而實際上,它還有一部分是向模式訂閱的客戶端發送消息,方法是遍歷模式訂閱鏈表,對于每一個節點判斷其頻道組是否和當前頻道匹配,如果匹配,則向客戶端發送消息
//pubsub.c /* 發送通知信息 */ /* * channel : 通知信息* message : 事件名稱*/ int pubsubPublishMessage(robj *channel, robj *message) {int receivers = 0;dictEntry *de;listNode *ln;listIter li;... //這里省略普通訂閱的發布功能/* 查找模式訂閱的客戶端 *//* 這里就體現了為什么訂閱頻道和客戶端是用字典存儲,而模式訂閱則用鏈表存儲 * 因為訂閱可以直接使用哈希表定位,而模式訂閱類似正則匹配,需要判斷當前的頻道是否* 是匹配的模式訂閱,然后發送給訂閱者,哈希表在這里是沒有作用的 */if (listLength(server.pubsub_patterns)) {/* 迭代器方向設置為從頭到尾 */listRewind(server.pubsub_patterns,&li);channel = getDecodedObject(channel);/* 遍歷服務器的模式訂閱鏈表 */while ((ln = listNext(&li)) != NULL) {/* 獲取每個節點的頻道組 */pubsubPattern *pat = ln->value;/* 判斷頻道組是否和當前頻道匹配,如果匹配,則發送通知信息 */if (stringmatchlen((char*)pat->pattern->ptr,sdslen(pat->pattern->ptr),(char*)channel->ptr,sdslen(channel->ptr),0)) {addReply(pat->client,shared.mbulkhdr[4]);addReply(pat->client,shared.pmessagebulk);addReplyBulk(pat->client,pat->pattern);addReplyBulk(pat->client,channel);addReplyBulk(pat->client,message);receivers++;}}decrRefCount(channel);}return receivers; }可以看到,對于模式訂閱,Redis會使用stringmatchlen函數進行正則匹配,如果匹配成功,說明該客戶端關注的頻道組中包含當前頻道,那么就需要將消息發送給客戶端
小結
本篇注意是對Redis訂閱與發布功能的分析,源碼比較簡單,對于訂閱功能,僅僅是將客戶端和頻道名(組)記錄在某個數據結構中,當有其他客戶端向某個頻道執行發布功能時,檢查數據結構中那些訂閱了該頻道的客戶端,并向其發送消息
總結
以上是生活随笔為你收集整理的Redis源码剖析(五)订阅与发布的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 每天一道LeetCode-----计算字
- 下一篇: 每天一道LeetCode-----为二叉
