你还不了解Redis的发布/订阅功能与Redis的Stream吗
一、Redis 中的發布/訂閱功能
發布/ 訂閱系統?是 Web 系統中比較常用的一個功能。簡單點說就是?發布者發布消息,訂閱者接受消息,這有點類似于我們的報紙/ 雜志社之類的:?(借用前邊的一張圖)
圖片引用自:「消息隊列」看過來! - https://www.wmyskxz.com/2019/07/16/xiao-xi-dui-lie-kan-guo-lai/
從我們?前面(下方相關閱讀)?學習的知識來看,我們雖然可以使用一個?list?列表結構結合?lpush?和?rpop?來實現消息隊列的功能,但是似乎很難實現實現?消息多播?的功能:
為了支持消息多播,Redis?不能再依賴于那 5 種基礎的數據結構了,它單獨使用了一個模塊來支持消息多播,這個模塊就是?PubSub,也就是?PublisherSubscriber?(發布者/ 訂閱者模式)。
PubSub 簡介
我們從?上面的圖?中可以看到,基于?list?結構的消息隊列,是一種?Publisher?與?Consumer?點對點的強關聯關系,Redis?為了消除這樣的強關聯,引入了另一種概念:頻道?(channel):
當?Publisher?往?channel?中發布消息時,關注了指定?channel?的?Consumer?就能夠同時受到消息。但這里的?問題?是,消費者訂閱一個頻道是必須?明確指定頻道名稱?的,這意味著,如果我們想要?訂閱多個?頻道,那么就必須?顯式地關注多個?名稱。
為了簡化訂閱的繁瑣操作,Redis?提供了?模式訂閱?的功能?Pattern Subscribe,這樣就可以?一次性關注多個頻道?了,即使生產者新增了同模式的頻道,消費者也可以立即受到消息:
例如上圖中,所有?位于圖片下方的?Consumer?都能夠受到消息。
Publisher?往?wmyskxz.chat?這個?channel?中發送了一條消息,不僅僅關注了這個頻道的?Consumer 1?和?Consumer 2?能夠受到消息,圖片中的兩個?channel?都和模式?wmyskxz.*?匹配,所以?Redis?此時會同樣發送消息給訂閱了?wmyskxz.*?這個模式的?Consumer 3?和關注了在這個模式下的另一個頻道?wmyskxz.log?下的?Consumer 4?和?Consumer 5。
另一方面,如果接收消息的頻道是?wmyskxz.chat,那么?Consumer 3?也會受到消息。
快速體驗
在?Redis?中,PubSub?模塊的使用非常簡單,常用的命令也就下面這么幾條:
# 訂閱頻道: SUBSCRIBE channel [channel ....] # 訂閱給定的一個或多個頻道的信息 PSUBSCRIBE pattern [pattern ....] # 訂閱一個或多個符合給定模式的頻道 # 發布頻道: PUBLISH channel message # 將消息發送到指定的頻道 # 退訂頻道: UNSUBSCRIBE [channel [channel ....]] # 退訂指定的頻道 PUNSUBSCRIBE [pattern [pattern ....]] #退訂所有給定模式的頻道我們可以在本地快速地來體驗一下?PubSub:
具體步驟如下:
開啟本地 Redis 服務,新建兩個控制臺窗口;
在其中一個窗口輸入?SUBSCRIBE wmyskxz.chat?關注?wmyskxz.chat?頻道,讓這個窗口成為?消費者。
在另一個窗口輸入?PUBLISH wmyskxz.chat 'message'?往這個頻道發送消息,這個時候就會看到?另一個窗口實時地出現?了發送的測試消息。
實現原理
可以看到,我們通過很簡單的兩條命令,幾乎就可以簡單使用這樣的一個?發布/ 訂閱系統?了,但是具體是怎么樣實現的呢?
每個 Redis 服務器進程維持著一個標識服務器狀態?的?redis.h/redisServer?結構,其中就?保存著有訂閱的頻道?以及?訂閱模式?的信息:
struct redisServer {// ...dict *pubsub_channels; // 訂閱頻道list *pubsub_patterns; // 訂閱模式// ... };訂閱頻道原理
當客戶端訂閱某一個頻道之后,Redis 就會往?pubsub_channels?這個字典中新添加一條數據,實際上這個?dict?字典維護的是一張鏈表,比如,下圖展示的?pubsub_channels?示例中,client 1、client 2?就訂閱了?channel 1,而其他頻道也分別被其他客戶端訂閱:
SUBSCRIBE 命令
SUBSCRIBE?命令的行為可以用下列的偽代碼表示:
def SUBSCRIBE(client, channels):# 遍歷所有輸入頻道for channel in channels:# 將客戶端添加到鏈表的末尾redisServer.pubsub_channels[channel].append(client)通過?pubsub_channels?字典,程序只要檢查某個頻道是否為字典的鍵,就可以知道該頻道是否正在被客戶端訂閱;只要取出某個鍵的值,就可以得到所有訂閱該頻道的客戶端的信息。
PUBLISH 命令
了解?SUBSCRIBE,那么?PUBLISH?命令的實現也變得十分簡單了,只需要通過上述字典定位到具體的客戶端,再把消息發送給它們就好了:(偽代碼實現如下)
def PUBLISH(channel, message):# 遍歷所有訂閱頻道 channel 的客戶端for client in server.pubsub_channels[channel]:# 將信息發送給它們send_message(client, message)UNSUBSCRIBE 命令
使用?UNSUBSCRIBE?命令可以退訂指定的頻道,這個命令執行的是訂閱的反操作:它從?pubsub_channels?字典的給定頻道(鍵)中,刪除關于當前客戶端的信息,這樣被退訂頻道的信息就不會再發送給這個客戶端。
訂閱模式原理
正如我們上面說到了,當發送一條消息到?wmyskxz.chat?這個頻道時,Redis 不僅僅會發送到當前的頻道,還會發送到匹配于當前模式的所有頻道,實際上,pubsub_patterns?背后還維護了一個?redis.h/pubsubPattern?結構:
typedefstruct pubsubPattern {redisClient *client; // 訂閱模式的客戶端robj *pattern; // 訂閱的模式 } pubsubPattern;每當調用?PSUBSCRIBE?命令訂閱一個模式時,程序就創建一個包含客戶端信息和被訂閱模式的?pubsubPattern?結構,并將該結構添加到?redisServer.pubsub_patterns?鏈表中。
我們來看一個?pusub_patterns?鏈表的示例:
這個時候客戶端?client 3?執行?PSUBSCRIBE wmyskxz.java.*,那么?pubsub_patterns?鏈表就會被更新成這樣:
通過遍歷整個?pubsub_patterns?鏈表,程序可以檢查所有正在被訂閱的模式,以及訂閱這些模式的客戶端。
PUBLISH 命令
上面給出的偽代碼并沒有?完整描述?PUBLISH?命令的行為,因為?PUBLISH?除了將?message?發送到?所有訂閱?channel?的客戶端?之外,它還會將?channel?和?pubsub_patterns?中的?模式?進行對比,如果?channel?和某個模式匹配的話,那么也將?message?發送到?訂閱那個模式的客戶端。
完整描述?PUBLISH?功能的偽代碼定于如下:
def PUBLISH(channel, message):# 遍歷所有訂閱頻道 channel 的客戶端for client in server.pubsub_channels[channel]:# 將信息發送給它們send_message(client, message)# 取出所有模式,以及訂閱模式的客戶端for pattern, client in server.pubsub_patterns:# 如果 channel 和模式匹配if match(channel, pattern):# 那么也將信息發給訂閱這個模式的客戶端send_message(client, message)PUNSUBSCRIBE 命令
使用?PUNSUBSCRIBE?命令可以退訂指定的模式,這個命令執行的是訂閱模式的反操作:序會刪除?redisServer.pubsub_patterns?鏈表中,所有和被退訂模式相關聯的?pubsubPattern?結構,這樣客戶端就不會再收到和模式相匹配的頻道發來的信息。
PubSub 的缺點
盡管?Redis?實現了?PubSub?模式來達到了?多播消息隊列?的目的,但在實際的消息隊列的領域,幾乎?找不到特別合適的場景,因為它的缺點十分明顯:
沒有 Ack 機制,也不保證數據的連續:?PubSub 的生產者傳遞過來一個消息,Redis 會直接找到相應的消費者傳遞過去。如果沒有一個消費者,那么消息會被直接丟棄。如果開始有三個消費者,其中一個突然掛掉了,過了一會兒等它再重連時,那么重連期間的消息對于這個消費者來說就徹底丟失了。
不持久化消息:?如果 Redis 停機重啟,PubSub 的消息是不會持久化的,畢竟 Redis 宕機就相當于一個消費者都沒有,所有的消息都會被直接丟棄。
基于上述缺點,Redis 的作者甚至單獨開啟了一個 Disque 的項目來專門用來做多播消息隊列,不過該項目目前好像都沒有成熟。不過后來在 2018 年 6 月,Redis 5.0?新增了?Stream?數據結構,這個功能給 Redis 帶來了?持久化消息隊列,從此 PubSub 作為消息隊列的功能可以說是就消失了..
二、更為強大的 Stream | 持久化的發布/訂閱系統
Redis Stream?從概念上來說,就像是一個?僅追加內容?的?消息鏈表,把所有加入的消息都一個一個串起來,每個消息都有一個唯一的 ID 和內容,這很簡單,讓它復雜的是從 Kafka 借鑒的另一種概念:消費者組(Consumer Group)?(思路一致,實現不同):
上圖就展示了一個典型的?Stream?結構。每個 Stream 都有唯一的名稱,它就是 Redis 的?key,在我們首次使用?xadd?指令追加消息時自動創建。我們對圖中的一些概念做一下解釋:
Consumer Group:消費者組,可以簡單看成記錄流狀態的一種數據結構。消費者既可以選擇使用?XREAD?命令進行?獨立消費,也可以多個消費者同時加入一個消費者組進行?組內消費。同一個消費者組內的消費者共享所有的 Stream 信息,同一條消息只會有一個消費者消費到,這樣就可以應用在分布式的應用場景中來保證消息的唯一性。
last_delivered_id:用來表示消費者組消費在 Stream 上?消費位置?的游標信息。每個消費者組都有一個 Stream 內?唯一的名稱,消費者組不會自動創建,需要使用?XGROUP CREATE?指令來顯式創建,并且需要指定從哪一個消息 ID 開始消費,用來初始化?last_delivered_id?這個變量。
pending_ids:每個消費者內部都有的一個狀態變量,用來表示?已經?被客戶端?獲取,但是?還沒有 ack?的消息。記錄的目的是為了?保證客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失而沒有對消息進行處理。如果客戶端沒有 ack,那么這個變量里面的消息 ID 就會越來越多,一旦某個消息被 ack,它就會對應開始減少。這個變量也被 Redis 官方稱為?PEL?(Pending Entries List)。
消息 ID 和消息內容
1. 消息 ID
消息 ID 如果是由?XADD?命令返回自動創建的話,那么它的格式會像這樣:timestampInMillis-sequence?(毫秒時間戳-序列號),例如?1527846880585-5,它表示當前的消息是在毫秒時間戳?1527846880585?時產生的,并且是該毫秒內產生的第 5 條消息。
這些 ID 的格式看起來有一些奇怪,為什么要使用時間來當做 ID 的一部分呢??一方面,我們要?滿足 ID 自增?的屬性,另一方面,也是為了?支持范圍查找?的功能。由于 ID 和生成消息的時間有關,這樣就使得在根據時間范圍內查找時基本上是沒有額外損耗的。
當然消息 ID 也可以由客戶端自定義,但是形式必須是?"整數-整數",而且后面加入的消息的 ID 必須要大于前面的消息 ID。
2. 消息內容
消息內容就是普通的鍵值對,形如 hash 結構的鍵值對。
增刪改查示例
增刪改查命令很簡單,詳情如下:
xadd:追加消息
xdel:刪除消息,這里的刪除僅僅是設置了標志位,不影響消息總長度
xrange:獲取消息列表,會自動過濾已經刪除的消息
xlen:消息長度
del:刪除Stream
使用示例:
# *號表示服務器自動生成ID,后面順序跟著一堆key/value 127.0.0.1:6379> xadd codehole * name laoqian age 30 # 名字叫laoqian,年齡30歲 1527849609889-0 # 生成的消息ID 127.0.0.1:6379> xadd codehole * name xiaoyu age 29 1527849629172-0 127.0.0.1:6379> xadd codehole * name xiaoqian age 1 1527849637634-0 127.0.0.1:6379> xlen codehole (integer) 3 127.0.0.1:6379> xrange codehole - + # -表示最小值, +表示最大值 1) 1) 1527849609889-02) 1) "name"2) "laoqian"3) "age"4) "30" 2) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 3) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xrange codehole 1527849629172-0 + # 指定最小消息ID的列表 1) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 2) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xrange codehole - 1527849629172-0 # 指定最大消息ID的列表 1) 1) 1527849609889-02) 1) "name"2) "laoqian"3) "age"4) "30" 2) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 127.0.0.1:6379> xdel codehole 1527849609889-0 (integer) 1 127.0.0.1:6379> xlen codehole # 長度不受影響 (integer) 3 127.0.0.1:6379> xrange codehole - + # 被刪除的消息沒了 1) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 2) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> del codehole # 刪除整個Stream (integer) 1獨立消費示例
我們可以在不定義消費組的情況下進行 Stream 消息的?獨立消費,當 Stream 沒有新消息時,甚至可以阻塞等待。Redis 設計了一個單獨的消費指令?xread,可以將 Stream 當成普通的消息隊列(list)來使用。使用?xread?時,我們可以完全忽略?消費組(Consumer Group)?的存在,就好比 Stream 就是一個普通的列表(list):
# 從Stream頭部讀取兩條消息 127.0.0.1:6379> xread count 2 streams codehole 0-0 1) 1) "codehole"2) 1) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30"2) 1) 1527851493405-02) 1) "name"2) "yurui"3) "age"4) "29" # 從Stream尾部讀取一條消息,毫無疑問,這里不會返回任何消息 127.0.0.1:6379> xread count 1 streams codehole $ (nil) # 從尾部阻塞等待新消息到來,下面的指令會堵住,直到新消息到來 127.0.0.1:6379> xread block 0 count 1 streams codehole $ # 我們從新打開一個窗口,在這個窗口往Stream里塞消息 127.0.0.1:6379> xadd codehole * name youming age 60 1527852774092-0 # 再切換到前面的窗口,我們可以看到阻塞解除了,返回了新的消息內容 # 而且還顯示了一個等待時間,這里我們等待了93s 127.0.0.1:6379> xread block 0 count 1 streams codehole $ 1) 1) "codehole"2) 1) 1) 1527852774092-02) 1) "name"2) "youming"3) "age"4) "60" (93.11s)客戶端如果想要使用?xread?進行?順序消費,一定要?記住當前消費?到哪里了,也就是返回的消息 ID。下次繼續調用?xread?時,將上次返回的最后一個消息 ID 作為參數傳遞進去,就可以繼續消費后續的消息。
block 0?表示永遠阻塞,直到消息到來,block 1000?表示阻塞?1s,如果?1s?內沒有任何消息到來,就返回?nil:
127.0.0.1:6379> xread block 1000 count 1 streams codehole $ (nil) (1.07s)創建消費者示例
Stream 通過?xgroup create?指令創建消費組(Consumer Group),需要傳遞起始消息 ID 參數用來初始化?last_delivered_id?變量:
127.0.0.1:6379> xgroup create codehole cg1 0-0 # 表示從頭開始消費 OK # $表示從尾部開始消費,只接受新消息,當前Stream消息會全部忽略 127.0.0.1:6379> xgroup create codehole cg2 $ OK 127.0.0.1:6379> xinfo codehole # 獲取Stream信息1) length2) (integer) 3 # 共3個消息3) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) groups8) (integer) 2 # 兩個消費組9) first-entry # 第一個消息 10) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30" 11) last-entry # 最后一個消息 12) 1) 1527851498956-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xinfo groups codehole # 獲取Stream的消費組信息 1) 1) name2) "cg1"3) consumers4) (integer) 0 # 該消費組還沒有消費者5) pending6) (integer) 0 # 該消費組沒有正在處理的消息 2) 1) name2) "cg2"3) consumers # 該消費組還沒有消費者4) (integer) 05) pending6) (integer) 0 # 該消費組沒有正在處理的消息組內消費示例
Stream 提供了?xreadgroup?指令可以進行消費組的組內消費,需要提供?消費組名稱、消費者名稱和起始消息 ID。它同?xread?一樣,也可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的?PEL?(正在處理的消息)?結構里,客戶端處理完畢后使用?xack?指令?通知服務器,本條消息已經處理完畢,該消息 ID 就會從?PEL?中移除,下面是示例:
# >號表示從當前消費組的last_delivered_id后面開始讀 # 每當消費者讀取一條消息,last_delivered_id變量就會前進 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30" 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851493405-02) 1) "name"2) "yurui"3) "age"4) "29" 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851498956-02) 1) "name"2) "xiaoqian"3) "age"4) "1"2) 1) 1527852774092-02) 1) "name"2) "youming"3) "age"4) "60" # 再繼續讀取,就沒有新消息了 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > (nil) # 那就阻塞等待吧 127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole > # 開啟另一個窗口,往里塞消息 127.0.0.1:6379> xadd codehole * name lanying age 61 1527854062442-0 # 回到前一個窗口,發現阻塞解除,收到新消息了 127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527854062442-02) 1) "name"2) "lanying"3) "age"4) "61" (36.54s) 127.0.0.1:6379> xinfo groups codehole # 觀察消費組信息 1) 1) name2) "cg1"3) consumers4) (integer) 1 # 一個消費者5) pending6) (integer) 5 # 共5條正在處理的信息還有沒有ack 2) 1) name2) "cg2"3) consumers4) (integer) 0 # 消費組cg2沒有任何變化,因為前面我們一直在操縱cg15) pending6) (integer) 0 # 如果同一個消費組有多個消費者,我們可以通過xinfo consumers指令觀察每個消費者的狀態 127.0.0.1:6379> xinfo consumers codehole cg1 # 目前還有1個消費者 1) 1) name2) "c1"3) pending4) (integer) 5 # 共5條待處理消息5) idle6) (integer) 418715 # 空閑了多長時間ms沒有讀取消息了 # 接下來我們ack一條消息 127.0.0.1:6379> xack codehole cg1 1527851486781-0 (integer) 1 127.0.0.1:6379> xinfo consumers codehole cg1 1) 1) name2) "c1"3) pending4) (integer) 4 # 變成了5條5) idle6) (integer) 668504 # 下面ack所有消息 127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0 (integer) 4 127.0.0.1:6379> xinfo consumers codehole cg1 1) 1) name2) "c1"3) pending4) (integer) 0 # pel空了5) idle6) (integer) 745505QA 1:Stream 消息太多怎么辦??| Stream 的上限
很容易想到,要是消息積累太多,Stream 的鏈表豈不是很長,內容會不會爆掉就是個問題了。xdel?指令又不會刪除消息,它只是給消息做了個標志位。
Redis 自然考慮到了這一點,所以它提供了一個定長 Stream 功能。在?xadd?的指令提供一個定長長度?maxlen,就可以將老的消息干掉,確保最多不超過指定長度,使用起來也很簡單:
> XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-02) 1) "value"2) "2" 2) 1) 1526655000369-02) 1) "value"2) "3"如果使用?MAXLEN?選項,當 Stream 的達到指定長度后,老的消息會自動被淘汰掉,因此 Stream 的大小是恒定的。目前還沒有選項讓 Stream 只保留給定數量的條目,因為為了一致地運行,這樣的命令必須在很長一段時間內阻塞以淘汰消息。(例如在添加數據的高峰期間,你不得不長暫停來淘汰舊消息和添加新的消息)
另外使用?MAXLEN?選項的花銷是很大的,Stream 為了節省內存空間,采用了一種特殊的結構表示,而這種結構的調整是需要額外的花銷的。所以我們可以使用一種帶有?~?的特殊命令:
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...它會基于當前的結構合理地對節點執行裁剪,來保證至少會有?1000?條數據,可能是?1010?也可能是?1030。
QA 2:PEL 是如何避免消息丟失的?
在客戶端消費者讀取 Stream 消息時,Redis 服務器將消息回復給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是 PEL 里已經保存了發出去的消息 ID,待客戶端重新連上之后,可以再次收到 PEL 中的消息 ID 列表。不過此時?xreadgroup?的起始消息 ID 不能為參數?>?,而必須是任意有效的消息 ID,一般將參數設為?0-0,表示讀取所有的 PEL 消息以及自?last_delivered_id?之后的新消息。
Redis Stream Vs Kafka
Redis 基于內存存儲,這意味著它會比基于磁盤的 Kafka 快上一些,也意味著使用 Redis 我們?不能長時間存儲大量數據。不過如果您想以?最小延遲?實時處理消息的話,您可以考慮 Redis,但是如果?消息很大并且應該重用數據?的話,則應該首先考慮使用 Kafka。
另外從某些角度來說,Redis Stream?也更適用于小型、廉價的應用程序,因為?Kafka?相對來說更難配置一些。
參考資料
訂閱與發布——Redis 設計與實現
-?https://redisbook.readthedocs.io/en/latest/feature/pubsub.html
《Redis 深度歷險》 - 錢文品/ 著 -?https://book.douban.com/subject/30386804/
Introduction to Redis Streams【官方文檔】 -?https://redis.io/topics/streams-intro
Kafka vs. Redis: Log Aggregation Capabilities and Performance -?https://logz.io/blog/kafka-vs-redis/
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的你还不了解Redis的发布/订阅功能与Redis的Stream吗的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: csv文件导入sqlite
- 下一篇: arthas用的好好的,写个lambda