Redis 消息队列的三种方案(List、Streams、Pub/Sub)
現如今的互聯網應用大都是采用 分布式系統架構 設計的,所以 消息隊列 已經逐漸成為企業應用系統 內部通信 的核心手段,它具有 低耦合、可靠投遞、廣播、流量控制、最終一致性 等一系列功能。
當前使用較多的 消息隊列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等,而部分數據庫 如 Redis、MySQL 以及 phxsql ,如果硬搞的話,其實也可實現消息隊列的功能。
可能有人覺得,各種開源的 MQ 已經足夠使用了,為什么需要用 Redis 實現 MQ 呢?
- 有些簡單的業務場景,可能不需要重量級的 MQ 組件(相比 Redis 來說,Kafka 和 RabbitMQ 都算是重量級的消息隊列) 
那你有考慮過用 Redis 做消息隊列嗎?
這一章,我會結合消息隊列的特點和 Redis 做消息隊列的使用方式,以及實際項目中的使用,來和大家探討下 Redis 消息隊列的方案。
一、回顧消息隊列
消息隊列 是指利用 高效可靠 的 消息傳遞機制 進行與平臺無關的 數據交流,并基于數據通信來進行分布式系統的集成。
通過提供 消息傳遞 和 消息排隊 模型,它可以在 分布式環境 下提供 應用解耦、彈性伸縮、冗余存儲、流量削峰、異步通信、數據同步 等等功能,其作為 分布式系統架構 中的一個重要組件,有著舉足輕重的地位。
mq_overview現在回顧下,我們使用的消息隊列,一般都有什么樣的特點:
- 三個角色:生產者、消費者、消息處理中心 
- 異步處理模式:生產者 將消息發送到一條 虛擬的通道(消息隊列)上,而無須等待響應。消費者 則 訂閱 或是 監聽 該通道,取出消息。兩者互不干擾,甚至都不需要同時在線,也就是我們說的 松耦合 
- 可靠性:消息要可以保證不丟失、不重復消費、有時可能還需要順序性的保證 
撇開我們常用的消息中間件不說,你覺得 Redis 的哪些數據類型可以滿足 MQ 的常規需求~~
二、Redis 實現消息隊列
思來想去,只有 List 和 Streams 兩種數據類型,可以實現消息隊列的這些需求,當然,Redis 還提供了發布、訂閱(pub/sub) 模式。
我們逐一看下這 3 種方式的使用和場景。
2.1 List 實現消息隊列
Redis 列表是簡單的字符串列表,按照插入順序排序。你可以添加一個元素到列表的頭部(左邊)或者尾部(右邊)。
所以常用來做異步隊列使用。將需要延后處理的任務結構體序列化成字符串塞進 Redis 的列表,另一個線程從這個列表中輪詢數據進行處理。
Redis 提供了好幾對 List 指令,先大概看下這些命令,混個眼熟
List 常用命令
| LPUSH | LPUSH key value [value ...] | 將一個或多個值 value 插入到列表 key 的表頭如果有多個 value 值,那么各個 value 值按從左到右的順序依次插入到表頭 | 
| RPUSH | RPUSH key value [value ...] | 將一個或多個值 value 插入到列表 key 的表尾(最右邊) | 
| LPOP | LPOP key | 移除并返回列表 key 的頭元素。 | 
| BLPOP | BLPOP key [key ...] timeout | 移出并獲取列表的第一個元素, 如果列表沒有元素會阻塞列表直到等待超時或發現可彈出元素為止 | 
| RPOP | RPOP key | 移除并返回列表 key 的尾元素。 | 
| BRPOP | BRPOP key [key ...] timeout | 移出并獲取列表的最后一個元素, 如果列表沒有元素會阻塞列表直到等待超時或發現可彈出元素為止。 | 
| BRPOPLPUSH | BRPOPLPUSH source destination timeout | 從列表中彈出一個值,將彈出的元素插入到另外一個列表中并返回它;如果列表沒有元素會阻塞列表直到等待超時或發現可彈出元素為止。 | 
| RPOPLPUSH | RPOPLPUSH source destinationb | 命令 RPOPLPUSH 在一個原子時間內,執行以下兩個動作:將列表 source 中的最后一個元素(尾元素)彈出,并返回給客戶端。將 source 彈出的元素插入到列表 destination ,作為 destination 列表的的頭元素 | 
| LLEN | LLEN key | 返回列表 key 的長度。如果 key 不存在,則 key 被解釋為一個空列表,返回 0 .如果 key 不是列表類型,返回一個錯誤 | 
| LRANGE | LRANGE key start stop | 返回列表 key 中指定區間內的元素,區間以偏移量 start 和 stop 指定 | 
挑幾個彈入、彈出的命令就可以組合出很多姿勢
- LPUSH、RPOP ?左進右出 
- RPUSH、LPOP ?右進左出 
即時消費問題
通過 LPUSH,RPOP 這樣的方式,會存在一個性能風險點,就是消費者如果想要及時的處理數據,就要在程序中寫個類似 while(true) 這樣的邏輯,不停的去調用 RPOP 或 LPOP 命令,這就會給消費者程序帶來些不必要的性能損失。
所以,Redis 還提供了 BLPOP、BRPOP 這種阻塞式讀取的命令(帶 B-Bloking的都是阻塞式),客戶端在沒有讀到隊列數據時,自動阻塞,直到有新的數據寫入隊列,再開始讀取新數據。這種方式就節省了不必要的 CPU 開銷。
- LPUSH、BRPOP ?左進右阻塞出 
- RPUSH、BLPOP ?右進左阻塞出 
如果將超時時間設置為 0 時,即可無限等待,直到彈出消息
因為 Redis 單線程的特點,所以在消費數據時,同一個消息會不會同時被多個 consumer 消費掉,但是需要我們考慮消費不成功的情況。
可靠隊列模式 | ack 機制
以上方式中, List 隊列中的消息一經發送出去,便從隊列里刪除。如果由于網絡原因消費者沒有收到消息,或者消費者在處理這條消息的過程中崩潰了,就再也無法還原出這條消息。究其原因,就是缺少消息確認機制。
為了保證消息的可靠性,消息隊列都會有完善的消息確認機制(Acknowledge),即消費者向隊列報告消息已收到或已處理的機制。
Redis List 怎么搞一搞呢?
再看上邊的表格中,有兩個命令, ?RPOPLPUSH、BRPOPLPUSH (阻塞)從一個 list 中獲取消息的同時把這條消息復制到另一個 list 里(可以當做備份),而且這個過程是原子的。
這樣我們就可以在業務流程安全結束后,再刪除隊列元素,實現消息確認機制。
127.0.0.1:6379>?rpush?myqueue?one (integer)?1 127.0.0.1:6379>?rpush?myqueue?two (integer)?2 127.0.0.1:6379>?rpush?myqueue?three (integer)?3 127.0.0.1:6379>?rpoplpush?myqueue?queuebak "three" 127.0.0.1:6379>?lrange?myqueue?0?-1 1)?"one" 2)?"two" 127.0.0.1:6379>?lrange?queuebak?0?-1 1)?"three" redis-rpoplpush之前做過的項目中就有用到這樣的方式去處理數據,數據標識從一個 List 取出后放入另一個 List,業務操作安全執行完成后,再去刪除 List 中的數據,如果有問題的話,很好回滾。
當然,還有更特殊的場景,可以通過 zset 來實現延時消息隊列,原理就是將消息加到 zset 結構后,將要被消費的時間戳設置為對應的 score 即可,只要業務數據不會是重復數據就 OK。
2.2 訂閱與發布實現消息隊列
我們都知道消息模型有兩種
- 點對點:Point-to-Point(P2P) 
- 發布訂閱:Publish/Subscribe(Pub/Sub) 
List 實現方式其實就是點對點的模式,下邊我們再看下 Redis 的發布訂閱模式(消息多播),這才是“根正苗紅”的 Redis MQ
redis-pub_sub"發布/訂閱"模式同樣可以實現進程間的消息傳遞,其原理如下:
"發布/訂閱"模式包含兩種角色,分別是發布者和訂閱者。訂閱者可以訂閱一個或者多個頻道(channel),而發布者可以向指定的頻道(channel)發送消息,所有訂閱此頻道的訂閱者都會收到此消息。
Redis 通過 PUBLISH 、 SUBSCRIBE 等命令實現了訂閱與發布模式, 這個功能提供兩種信息機制, 分別是訂閱/發布到頻道和訂閱/發布到模式。
這個 頻道 和 模式 有什么區別呢?
頻道我們可以先理解為是個 Redis 的 key 值,而模式,可以理解為是一個類似正則匹配的 Key,只是個可以匹配給定模式的頻道。這樣就不需要顯式的去訂閱多個名稱了,可以通過模式訂閱這種方式,一次性關注多個頻道。
我們啟動三個 Redis 客戶端看下效果:
redis-subscribe先啟動兩個客戶端訂閱(subscribe) 名字叫 framework 的頻道,然后第三個客戶端往 framework 發消息,可以看到前兩個客戶端都會接收到對應的消息:
redis-publish我們可以看到訂閱的客戶端每次可以收到一個 3 個參數的消息,分別為:
- 消息的種類 
- 始發頻道的名稱 
- 實際的消息 
再來看下訂閱符合給定模式的頻道,這回訂閱的命令是 PSUBSCRIBE
redis-psubscribe我們往 java.framework 這個頻道發送了一條消息,不止訂閱了該頻道的 Consumer1 和 Consumer2 可以接收到消息,訂閱了模式 java.* 的 Consumer3 和 Consumer4 也可以接收到消息。
redis-psubscribe1Pub/Sub 常用命令:
| PSUBSCRIBE | PSUBSCRIBE pattern [pattern ...] | 訂閱一個或多個符合給定模式的頻道 | 
| PUBSUB | PUBSUB subcommand [argument [argument ...]] | 查看訂閱與發布系統狀態 | 
| PUBLISH | PUBLISH channel message | 將信息發送到指定的頻道 | 
| PUNSUBSCRIBE | PUNSUBSCRIBE [pattern [pattern ...]] | 退訂所有給定模式的頻道 | 
| SUBSCRIBE | SUBSCRIBE channel [channel ...] | 訂閱給定的一個或多個頻道的信息 | 
| UNSUBSCRIBE | UNSUBSCRIBE [channel [channel ...]] | 指退訂給定的頻道 | 
2.3 Streams 實現消息隊列
Redis 發布訂閱 (pub/sub) 有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。而且也沒有 Ack 機制來保證數據的可靠性,假設一個消費者都沒有,那消息就直接被丟棄了。
后來 Redis 的父親 Antirez,又單獨開啟了一個叫 Disque 的項目來完善這些問題,但是沒有做起來,github 的更新也定格在了 5 年前,所以我們就不討論了。
Redis 5.0 版本新增了一個更強大的數據結構——Stream。它提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。
它就像是個僅追加內容的消息鏈表,把所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內容。而且消息是持久化的。
redis-stream每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創建。
Streams 是 Redis 專門為消息隊列設計的數據類型,所以提供了豐富的消息隊列操作命令。
Stream 常用命令
| 添加消息到末尾,保證有序,可以自動生成唯一ID | XADD key ID field value [field value ...] | 
| 對流進行修剪,限制長度 | XTRIM key MAXLEN [~] count | 
| 刪除消息 | XDEL key ID [ID ...] | 
| 獲取流包含的元素數量,即消息長度 | XLEN key | 
| 獲取消息列表,會自動過濾已經刪除的消息 | XRANGE key start end [COUNT count] | 
| 以阻塞或非阻塞方式獲取消息列表 | XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] | 
| 創建消費者組 | XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [DELCONSUMER key groupname consumername] | 
| 讀取消費者組中的消息 | XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] | 
| 將消息標記為"已處理" | XACK key group ID [ID ...] | 
| 為消費者組設置新的最后遞送消息ID | XGROUP SETID [CREATE key groupname id-or-] [DESTROY key groupname] | 
| 刪除消費者 | XGROUP DELCONSUMER [CREATE key groupname id-or-] [DESTROY key groupname] | 
| 刪除消費者組 | XGROUP DESTROY [CREATE key groupname id-or-] [DESTROY key groupname] [DEL | 
| 顯示待處理消息的相關信息 | XPENDING key group [start end count] [consumer] | 
| 查看流和消費者組的相關信息 | XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP] | 
| 打印流信息 | XINFO STREAM [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP] | 
CRUD 工程師上線
增刪改查來一波
#?*?號表示服務器自動生成?ID,后面順序跟著一堆?key/value 127.0.0.1:6379>?xadd?mystream?*?f1?v1?f2?v2?f3?v3 "1609404470049-0"??##?生成的消息?ID,有兩部分組成,毫秒時間戳-該毫秒內產生的第1條消息#?消息ID?必須要比上個?ID?大 127.0.0.1:6379>?xadd?mystream?123?f4?v4?? (error)?ERR?The?ID?specified?in?XADD?is?equal?or?smaller?than?the?target?stream?top?item#?自定義ID 127.0.0.1:6379>?xadd?mystream?1609404470049-1?f4?v4 "1609404470049-1"#?-表示最小值?,?+?表示最大值,也可以指定最大消息ID,或最小消息ID,配合?-、+?使用 127.0.0.1:6379>?xrange?mystream?-?+ 1)?1)?"1609404470049-0"2)?1)?"f1"2)?"v1"3)?"f2"4)?"v2"5)?"f3"6)?"v3" 2)?1)?"1609404470049-1"2)?1)?"f4"2)?"v4"127.0.0.1:6379>?xdel?mystream?1609404470049-1 (integer)?1 127.0.0.1:6379>?xlen?mystream (integer)?1 #?刪除整個?stream 127.0.0.1:6379>?del?mystream (integer)?1獨立消費
xread 以阻塞或非阻塞方式獲取消息列表,指定 BLOCK 選項即表示阻塞,超時時間 0 毫秒(意味著永不超時)
#?從ID是0-0的開始讀前2條 127.0.0.1:6379>?xread?count?2?streams?mystream?0 1)?1)?"mystream"2)?1)?1)?"1609405178536-0"2)?1)?"f5"2)?"v5"2)?1)?"1609405198676-0"2)?1)?"f1"2)?"v1"3)?"f2"4)?"v2"#?阻塞的從尾部讀取流,開啟新的客戶端xadd后發現這里就讀到了,block?0?表示永久阻塞 127.0.0.1:6379>?xread?block?0?streams?mystream?$ 1)?1)?"mystream"2)?1)?1)?"1609408791503-0"2)?1)?"f6"2)?"v6" (42.37s)可以看到,我并沒有給流 mystream 傳入一個常規的 ID,而是傳入了一個特殊的 ID $這個特殊的 ID 意思是 XREAD 應該使用流 mystream 已經存儲的最大 ID 作為最后一個 ID。以便我們僅接收從我們開始監聽時間以后的新消息。這在某種程度上相似于 Unix 命令tail -f。
當然,也可以指定任意有效的 ID。
而且, XREAD 的阻塞形式還可以同時監聽多個 Strema,只需要指定多個鍵名即可。
127.0.0.1:6379>?xread?block?0?streams?mystream?yourstream?$?$創建消費者組
xread 雖然可以扇形分發到 N 個客戶端,然而,在某些問題中,我們想要做的不是向許多客戶端提供相同的消息流,而是從同一流向許多客戶端提供不同的消息子集。比如下圖這樣,三個消費者按輪訓的方式去消費一個 Stream。
redis-stream-cgRedis Stream 借鑒了很多 Kafka 的設計。
- Consumer Group:有了消費組的概念,每個消費組狀態獨立,互不影響,一個消費組可以有多個消費者 
- last_delivered_id :每個消費組會有個游標 last_delivered_id 在數組之上往前移動,表示當前消費組已經消費到哪條消息了 
- pending_ids :消費者的狀態變量,作用是維護消費者的未確認的 id。pending_ids 記錄了當前已經被客戶端讀取的消息,但是還沒有 ack。如果客戶端沒有 ack,這個變量里面的消息 ID 會越來越多,一旦某個消息被 ack,它就開始減少。這個 pending_ids 變量在 Redis 官方被稱之為 PEL,也就是 Pending Entries List,這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。 
Stream 不像 Kafak 那樣有分區的概念,如果想實現類似分區的功能,就要在客戶端使用一定的策略將消息寫到不同的 Stream。
- xgroup create:創建消費者組 
- xgreadgroup:讀取消費組中的消息 
- xack:ack 掉指定消息 
按消費組消費
Stream 提供了 xreadgroup 指令可以進行消費組的組內消費,需要提供消費組名稱、消費者名稱和起始消息 ID。它同 xread 一樣,也可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的 PEL(正在處理的消息) 結構里,客戶端處理完畢后使用 xack 指令通知服務器,本條消息已經處理完畢,該消息 ID 就會從 PEL 中移除。
#??消費組?mygroup1?中的?消費者?c1?從?mystream?中?消費組數據 #?>?號表示從當前消費組的?last_delivered_id?后面開始讀 #?每當消費者讀取一條消息,last_delivered_id?變量就會前進 127.0.0.1:6379>?xreadgroup?group?mygroup1?c1?count?1?streams?mystream?> 1)?1)?"mystream"2)?1)?1)?"1609727806627-0"2)?1)?"f1"2)?"v1"3)?"f2"4)?"v2"5)?"f3"6)?"v3" 127.0.0.1:6379>?xreadgroup?group?mygroup1?c1?count?1?streams?mystream?> 1)?1)?"mystream"2)?1)?1)?"1609727818650-0"2)?1)?"f4"2)?"v4" #?已經沒有消息可讀了???????????? 127.0.0.1:6379>?xreadgroup?group?mygroup1?c1?count?2?streams?mystream?> (nil)#?還可以阻塞式的消費 127.0.0.1:6379>?xreadgroup?group?mygroup1?c2?block?0?streams?mystream?> μ1)?1)?"mystream"2)?1)?1)?"1609728270632-0"2)?1)?"f5"2)?"v5" (89.36s)#?觀察消費組信息 127.0.0.1:6379>?xinfo?groups?mystream 1)?1)?"name"2)?"mygroup1"3)?"consumers"4)?(integer)?2??#?2個消費者5)?"pending"6)?(integer)?3???#?共?3?條正在處理的信息還沒有?ack7)?"last-delivered-id"8)?"1609728270632-0"127.0.0.1:6379>?xack?mystream?mygroup1?1609727806627-0??#?ack掉指定消息 (integer)?1嘗鮮到此結束,就不繼續深入了。
個人感覺,就目前來說,Stream 還是不能當做主流的 MQ 來使用的,而且使用案例也比較少,慎用。
寫在最后
- 當然,還有需要注意的就是,業務上避免過度復用一個 Redis。既用它做緩存、做計算,還拿它做任務隊列,這樣的話 Redis 會很累的。 
- 沒有絕對好的技術、只有對業務最友好的技術,共勉 
以夢為馬,越騎越傻。詩和遠方,越走越慌。不忘初心是對的,但切記要出發,加油吧,程序員。
參考
- 《Redis 設計與實現》 
- Redis 官網 
- https://segmentfault.com/a/1190000012244418 
- https://www.cnblogs.com/williamjie/p/11201654.html 
Spring Boot集成Redis,這個坑把我害慘了!
硬核Redis總結,看這篇就夠了!
Socket粘包問題終極解決方案—Netty版(2W字)!
關注我,每天陪你進步一點點!
總結
以上是生活随笔為你收集整理的Redis 消息队列的三种方案(List、Streams、Pub/Sub)的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Redis 过期策略与源码分析
- 下一篇: 面试题:彻底搞懂 Cookie 和 Se
