redis 发布订阅实际案例_Redis源码分析之发布订阅+慢查询+排序以及监视器
發(fā)布訂閱
發(fā)布訂閱就是一個經典的觀察者模式,其中通道是指channel字符串本身,而模式是指正則表達式,進行匹配。結合Redis設計與實現(xiàn)一書
數(shù)據(jù)結構
基本數(shù)據(jù)結構
在client對象中,分別記錄了,當前client訂閱的通道和模式。
| 1234 | struct client{dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */}; |
當然當開啟訂閱后,客戶端的flag會設置對應標記:
| 12345678 | void subscribeCommand(client *c) { int j; for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); c->flags |= CLIENT_PUBSUB;} |
在redisServer對象中,分別記錄了,當前訂閱的通道和模式所對應的客戶端client。
| 1234 | struct redisServer { dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */}; |
模式要比通道多一個match內容,因此redis使用pubsubPattern對象進行記錄,差異體現(xiàn)在redisServer的記錄中。
| 1234 | typedef struct pubsubPattern { client *client; robj *pattern;} pubsubPattern; |
Redis設計與實現(xiàn)一書中一張圖,很直觀:
引用計數(shù)
一個通道或模式可能被多個客戶端引用,因此發(fā)布消息之后,只有等到引用計數(shù)為0時,才是真正的釋放對象。
| 12345678910111213141516171819 | void decrRefCount(robj *o) { if (o->refcount == 1) { switch(o->type) { case OBJ_STRING: freeStringObject(o); break; case OBJ_LIST: freeListObject(o); break; case OBJ_SET: freeSetObject(o); break; case OBJ_ZSET: freeZsetObject(o); break; case OBJ_HASH: freeHashObject(o); break; case OBJ_MODULE: freeModuleObject(o); break; case OBJ_STREAM: freeStreamObject(o); break; default: serverPanic("Unknown object type"); break; } zfree(o); } else { if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0"); if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--; }} |
訂閱消息
訂閱消息實際很簡單,就是設置對應的client和server中的值:
客戶端就是將對應的模式或者通道添加至對應的客戶端中
| 123456789101112131415161718192021222324252627282930 | /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */int pubsubSubscribeChannel(client *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ de = dictFind(server.pubsub_channels,channel); if (de == NULL) { clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval;} |
模式
模式其實比較特殊,因為他支持正則表達式,所以沒法放入hash中,所以直接用list來保存。
| 12345678910111213141516171819202122 | /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */int pubsubSubscribePattern(client *c, robj *pattern) { int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; pubsubPattern *pat; listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); } /* Notify the client */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval;} |
這里的match是函數(shù)指針,而match的設置是在創(chuàng)建client時候設置的,見createClient函數(shù):
需要注意的是模式情況下,判斷某個客戶端是否已經包含了該模式,是直接通過listMatchObjects函數(shù),功能比較簡單(注意要與后文中的發(fā)布消息時,那個匹配分開):
| 123 | int listMatchObjects(void *a, void *b) { return equalStringObjects(a,b);} |
再看看equalStringObjects函數(shù),算了我們直接到最底層的函數(shù)吧:
| 1234567891011121314151617181920212223242526272829303132333435 | int compareStringObjects(robj *a, robj *b) { return compareStringObjectsWithFlags(a,b,REDIS_COMPARE_BINARY);}int compareStringObjectsWithFlags(robj *a, robj *b, int flags) { serverAssertWithInfo(NULL,a,a->type == OBJ_STRING && b->type == OBJ_STRING); char bufa[128], bufb[128], *astr, *bstr; size_t alen, blen, minlen; if (a == b) return 0; if (sdsEncodedObject(a)) { astr = a->ptr; alen = sdslen(astr); } else { alen = ll2string(bufa,sizeof(bufa),(long) a->ptr); astr = bufa; } if (sdsEncodedObject(b)) { bstr = b->ptr; blen = sdslen(bstr); } else { blen = ll2string(bufb,sizeof(bufb),(long) b->ptr); bstr = bufb; } if (flags & REDIS_COMPARE_COLL) { return strcoll(astr,bstr); } else { int cmp; minlen = (alen < blen) ? alen : blen; cmp = memcmp(astr,bstr,minlen); if (cmp == 0) return alen-blen; return cmp; }} |
害,最后居然是memcmp。
發(fā)布消息
發(fā)布消息就是讀取該模式或者主題下鎖監(jiān)聽的客戶端,向他們發(fā)送消息
| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 | /* Publish a message */int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; listNode *ln; listIter li; /* Send to clients listening for that channel */ de = dictFind(server.pubsub_channels,channel); if (de) { list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message); receivers++; } } /* Send to clients listening to matching channels */ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; } } decrRefCount(channel); } return receivers;} |
模式
模式總是特殊的,模式采用了Glob風格的正則表達式來進行匹配。
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 | /* Glob-style pattern matching. */int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase){ while(patternLen && stringLen) { switch(pattern[0]) { case '*': while (pattern[1] == '*') { pattern++; patternLen--; } if (patternLen == 1) return 1; /* match */ while(stringLen) { if (stringmatchlen(pattern+1, patternLen-1, string, stringLen, nocase)) return 1; /* match */ string++; stringLen--; } return 0; /* no match */ break; case '?': if (stringLen == 0) return 0; /* no match */ string++; stringLen--; break; case '[': { int not, match; pattern++; patternLen--; not = pattern[0] == '^'; if (not) { pattern++; patternLen--; } match = 0; while(1) { if (pattern[0] == '\\' && patternLen >= 2) { pattern++; patternLen--; if (pattern[0] == string[0]) match = 1; } else if (pattern[0] == ']') { break; } else if (patternLen == 0) { pattern--; patternLen++; break; } else if (pattern[1] == '-' && patternLen >= 3) { int start = pattern[0]; int end = pattern[2]; int c = string[0]; if (start > end) { int t = start; start = end; end = t; } if (nocase) { start = tolower(start); end = tolower(end); c = tolower(c); } pattern += 2; patternLen -= 2; if (c >= start && c <= end) match = 1; } else { if (!nocase) { if (pattern[0] == string[0]) match = 1; } else { if (tolower((int)pattern[0]) == tolower((int)string[0])) match = 1; } } pattern++; patternLen--; } if (not) match = !match; if (!match) return 0; /* no match */ string++; stringLen--; break; } case '\\': if (patternLen >= 2) { pattern++; patternLen--; } /* fall through */ default: if (!nocase) { if (pattern[0] != string[0]) return 0; /* no match */ } else { if (tolower((int)pattern[0]) != tolower((int)string[0])) return 0; /* no match */ } string++; stringLen--; break; } pattern++; patternLen--; if (stringLen == 0) { while(*pattern == '*') { pattern++; patternLen--; } break; } } if (patternLen == 0 && stringLen == 0) return 1; return 0;} |
排序
數(shù)據(jù)結構
| 123456789101112131415 | typedef struct _redisSortObject { //存儲的是對象指針,因此不會深復制 robj *obj; union { //排序權重,適合數(shù)值類型 double score; //自定義排序時使用的比較對象 robj *cmpobj; } u;} redisSortObject;typedef struct _redisSortOperation { int type; robj *pattern;} redisSortOperation; |
可以看出他支持模式匹配。
限制
參數(shù)
ALPHA
ASC和DESC
BY和alpha
limit
Get
store
慢查詢
數(shù)據(jù)結構
在server中會有l(wèi)ist進行記錄相應的慢查詢日志。
| 1234 | list *slowlog; /* SLOWLOG list of commands */ long long slowlog_entry_id; /* SLOWLOG current entry ID */ long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ |
list中記錄著慢查詢日志實體,其結構如下:
| 1234567891011121314 | #define SLOWLOG_ENTRY_MAX_ARGC 32#define SLOWLOG_ENTRY_MAX_STRING 128/* This structure defines an entry inside the slow log list */typedef struct slowlogEntry { robj **argv; int argc; long long id; /* Unique entry identifier. */ long long duration; /* Time spent by the query, in microseconds. */ time_t time; /* Unix time at which the query was executed. */ sds cname; /* Client name. */ sds peerid; /* Client network address. */} slowlogEntry; |
配置文件
操作
插入
| 1234567891011121314 | /* Push a new entry into the slow log. * This function will make sure to trim the slow log accordingly to the * configured max length. */void slowlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long duration) { if (server.slowlog_log_slower_than < 0) return; /* Slowlog disabled */ if (duration >= server.slowlog_log_slower_than) listAddNodeHead(server.slowlog, slowlogCreateEntry(c,argv,argc,duration)); /* Remove old entries if needed. */ while (listLength(server.slowlog) > server.slowlog_max_len) listDelNode(server.slowlog,listLast(server.slowlog));} |
新插入的慢查詢日志放置在鏈表的表頭
受參數(shù)限制,會更新慢查詢隊列
查詢
支持獲取指定個數(shù)的慢查詢日志,如果不指定默認值為10.
監(jiān)視器
當在客戶端執(zhí)行monitor命令后,會將該客戶端加入到monitor的鏈表中。其組成為:
格式為:時間戳 +數(shù)據(jù)庫id號+ (客戶端ip+port)+ 命令+命令參數(shù)
在call函數(shù)中,會執(zhí)行replicationFeedMonitors函數(shù),向所有的monitors發(fā)送指令.
秒殺小玩意
發(fā)現(xiàn)好多地方介紹秒殺很復雜,我感覺不需要那么復雜吧。不就下面幾個點嗎?
總結
以上是生活随笔為你收集整理的redis 发布订阅实际案例_Redis源码分析之发布订阅+慢查询+排序以及监视器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: u盘装系统后一直蓝屏怎么办 如何解决U盘
- 下一篇: crypto安装_CryptoPP库在L