【BCVP】实现基于 Redis 的消息队列
聆聽自己的聲音
如果自己學不動了,或者感覺沒有動力的時候,看看書,聽聽音樂,跑跑步,休息兩天,重新出發,偷懶雖好,可不要貪杯。
話說上回書我們說到了,Redis的使用修改《【BCVP更新】StackExchange.Redis的異步開發方式》,通過異步的時候,基本上會解決StackExRedis組件使用過程中,可能在并發的時候遇到的問題,而且該組件也是微軟官方推薦的(參考微軟微服務框架eShopOnContainers),如果一定要抬杠說不好用,其實是沒必要的。那今天我們繼續往下說,簡單說下如何基于Redis實現消息隊列。
目前在市面上比較主流的消息隊列中間件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ等這幾種。當然常見的還是基于RabbitMQ來實現的,Redis份額稍微小了一點,但是因為Redis的倉儲、緩存等多個方面的好處,使得Redis也是很火。
1
什么是消息隊列
這個其實我今天不打算重點講,因為我詳細每個人能看這篇文章,肯定都知道消息隊列的相關內容,但是為了不那么突兀,我就從網上粘貼幾塊基本概念,了解一二:
基本概念:
消息隊列(英語:Message queue)是一種進程間通信或同一進程的不同線程間的通信方式,軟件的貯列用來處理一系列的輸入,通常是來自用戶。
消息隊列提供了異步的通信協議,每一個貯列中的紀錄包含詳細說明的數據,包含發生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列交互。消息會保存在隊列中,直到接收者取回它。
最終可以實現解耦的目的。
下面通過一個簡單的架構模型來解釋:
Producer:消息生產者,負責產生和發送消息到Broker。
Broker:消息處理中心。負責消息存儲、確認、重試等,一般其中會包含多個Queue。
Consumer:消息消費者,負責從 Broker 中獲取消息,并進行相應處理。
有哪些優缺點:
從上邊的定義中,我們可以看出來,優點主要是三塊:
異步、流量削峰與流控、解耦。
這三個優點在高并發等三高場景還是很有必要的,甚至說是十分必要的。
典型的廣播模式,一個消息可以發布到多個消費者;
消息即時發送,消息不用等待消費者讀取,消費者會自動接收到信道發布的消息;
比如我們某寶下訂單,或某6搶車票,那都是放到隊列里緩沖的,要是都用服務端等待,可能早就崩了,當然實際上比這個復雜的多。
而且,通過訂閱發布的模式,異步執行,這樣就會大大緩解時間壓力。
但是,隨之而來的弊端也是有的:
比如為了異步,就是接收者必須輪詢消息隊列,才能收到最近的消息。然后還有就是不能達到實時性,說白了就是用空間換時間,從而降低瓶頸。
消息一旦發布,不能接收。換句話就是發布時若客戶端不在線,則消息丟失,不能尋回。不能保證每個消費者接收的時間是一致的。若消費者客戶端出現消息積壓,到一定程度,會被強制斷開,導致消息意外丟失。
五種常見模式
簡單模式Hello World
功能:一個生產者P發送消息到隊列Q,一個消費者C接收
工作隊列模式Work Queue
功能:一個生產者,多個消費者,每個消費者獲取到的消息唯一,多個消費者只有一個隊列
發布/訂閱模式Publish/Subscribe
功能:一個生產者發送的消息會被多個消費者獲取。一個生產者、一個交換機、多個隊列、多個消費者
路由模式Routing
說明:生產者發送消息到交換機并且要指定路由key,消費者將隊列綁定到交換機時需要指定路由key
通配符(主題)模式Topic
說明:生產者P發送消息到交換機X,type=topic,交換機根據綁定隊列的routing key的值進行通配符匹配;
更多具體的內容呢,自己感興趣多去搜索下吧,肯定還是有很多其他問題的,我這里就不鋪開了講了,下邊咱們就說說,如何在Blog.Core里添加隊列吧。
2
訂閱發布相關配置案例
案例有很多,自己可以根據情況自定義。
那既然要講東西,肯定不能隨便放一個算法,肯定是需要一個小demo,一個應用場景,這樣更有助于初學者去理解,之前考慮了很多,一直沒有想好在BlogCore里邊使用什么案例場景來說一說消息隊列,最后實在是沒辦法,只能說日志了,萬事不決就說日志,好像軟件開發都是這么舉例的。
這里說一下,假設我們自定義了一個日志記錄的方法,就是在txt里寫數據,其實我現在也是這么用的,平時肯定會一邊查一邊寫,如果并發高一下,肯定就會出現死鎖或者異常的出現,那我們就可以把寫日志放到消息隊列里,緩沖一下,然后在寫一個訂閱者,專門來“盯著”隊列,一有消息傳過來,就寫到日志文件里,這樣就能很好的實現相應的目的。如果不緩沖下,有時候日志可能高達幾萬條,瞬間爆炸。
那說了這個小場景,接下來就簡單的模擬一下吧。
1、定義消息隊列操作類與接口
既然要發布和訂閱消息,肯定就需要有相應的操作方法,在上一篇文章中,我新建了一個RedisBasketRepository.cs的操作類,那我們還繼續在這個類文件里寫吧,注意,這個實現類和接口,已經注冊到服務容器了,如果你第一次操作,可以參考文章開頭上篇文章內容:
我這里只是簡單的Copy出來幾個做例子,總的一共有12個,當然你也可以自定義增加或刪除某些不必要的,核心的可以看出來,都是根據redisKey來操作的:
Task<RedisValue[]> ListRangeAsync(string redisKey);Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1);Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1);Task<long> ListRightPushAsync(string redisKey, IEnumerable<string> redisValue, int db = -1);Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class;Task<T> ListRightPopAsync<T>(string redisKey, int db = -1) where T : class;Task<string> ListLeftPopAsync(string redisKey, int db = -1);Task<string> ListRightPopAsync(string redisKey, int db = -1);Task<long> ListLengthAsync(string redisKey, int db = -1);Task<IEnumerable<string>> ListRangeAsync(string redisKey, int db = -1);Task<IEnumerable<string>> ListRangeAsync(string redisKey, int start, int stop, int db = -1);Task<long> ListDelRangeAsync(string redisKey, string redisValue, long type = 0, int db = -1);Task?ListClearAsync(string?redisKey,?int?db?=?-1);2、如何發布消息與接收消息
上邊定義好了相應的操作方法以后,就很簡單了,我們來發布一條消息來試試:
就是這么簡單,構造函數注入以后,直接調用相應的方法,就把消息msg推送到了隊列里了,這里的redisKey,我用了常量定義,具體可操作Blog.Core源代碼。
現在是發布消息特別簡單,只需要一行接口,那如何去獲取呢,在上邊的獲取方法中,我們定義的是:
Task<RedisValue[]> ListRangeAsync(string redisKey);這個方法也是可以的,只不過我們需要對其進行轉換,畢竟存的msg是字符串string類型的,但是這里的返回類型的RedisValue[],所以需要劈里啪啦轉化一下。
但是這里有一個問題,就是如何去定時獲取呢,也就是如何設計一個訂閱者進行消費消息呢,這需要思考下,當然比較簡單的就是while(true){},可能平時就是這么使用的,不過還是不是那么爽快,可以寫一個組件來處理,簡單快捷,正好,有一個大佬已經封裝好了,我們可以直接拿來用,如果你有什么問題,可以給他提issue。
3、InitQ組件來訂閱消息
在nuget中,可以直接安裝組個組件:
他的開源地址是:
https://github.com/wmowm/Initq
使用方法很簡單,可以參考他的README里的介紹:
1、先添加服務
/// <summary> /// Redis 消息隊列 啟動服務 /// </summary> public static class RedisInitMqSetup {public static void AddRedisInitMqSetup(this IServiceCollection services){if (services == null) throw new ArgumentNullException(nameof(services));services.AddInitQ(m =>{//時間間隔m.SuspendTime = 5000;//redis服務器地址m.ConnectionString = "127.0.0.1:6379";//對應的訂閱者類,需要new一個實例對象,當然你也可以傳參,比如日志對象m.ListSubscribe = new List<IRedisSubscribe>() { new RedisSubscribe()};//顯示日志m.ShowLog = false;});} }2、定義訂閱者
public class RedisSubscribe : IRedisSubscribe{[Subscribe(RedisMqKey.Loging)]private async Task SubRedisLoging(string msg){Console.WriteLine($"隊列{RedisMqKey.Loging} 消費到/接受到 消息:{msg}");await Task.CompletedTask;}}整體很簡單,繼承接口,然后添加上特性,這個特性里的參數,就是我們消息發布的時候的那個key,然后方法的參數,就是對應的消息msg,是不是很簡單。
當然這里你可以傳遞一個日志的對象實例,這樣就把日志信息分流到了隊列里,然后隊列走到這個訂閱者里,由這里進行緩沖,然后把日志填充到日志文件,從而達到減峰的目的。
最終的效果可以看看:
好啦,今天的redis消息隊列已經說完了,還是很簡單的,其中重點還是那五種模式要自己好好了解下,然后整體過程自己把握把握,至于RabbitMQ,這個以后再說吧。
END
掃碼關注
老張的哲學
更多精彩等著你
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的【BCVP】实现基于 Redis 的消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于技术规划的想法
- 下一篇: 初识ABP vNext(8):ABP特征