AspNetCore结合Redis实践消息队列
這是年中首發在博客園上的文章,個人覺得是AspNetCore結合Redis做的一次比較優秀的消息隊列重構,其中對于點對點/發布-訂閱的思路應該也是面試必考題。
引言
.Net?TPL?Dataflow是一個進程內數據流管道,應對高并發、低延遲的要求非常有效, 但在實際Docker部署的過程中, 有一個問題一直無法回避:
單體程序部署的瞬間(服務不可用)會有少量流量無法處理;
更糟糕的情況下,迭代部署的這個版本有問題,上線后無法工作, 導致更多流量沒有處理。
????背負神圣使命(巨大壓力)的程序猿心生一計,為何不將單體程序改成分布式:
增加服務ReceiverApp,ReceiverApp只接受數據,WebApp只處理數據。
知識儲備
? ? 消息隊列和訂閱發布作為老生常談的兩個知識點被反復提及,按照JMS的規范, 官方稱為點對點(point to point, queue)和發布/訂閱(publish/subscribe,topic)
點對點
????生產者發送消息到Message Queue中,然后消費者從隊列中取出消息并消費。
隊列會保留消息,直到他們被消費或超時;?
①?MQ支持多消費者,但每個消息只能被一個消費者處理
②?發送者和消費者在時間上沒有依賴性,當發送者發送消息之后,不管消費者有沒有在運行(甚至不管有沒有消費者),都不會影響到消息被發送到隊列
③ 一般消費者在消費之后需要向隊列應答成功
如果你希望發送的每個消息都應該被成功處理,你應該使用p2p模型
發布/訂閱
消息生產者將消息發布到Channel,在此之前已有多個消費者訂閱該通道。
和點對點方式不同,發布到特定通道的消息會被通道訂閱者收到。
通道沒有暫存隊列機制,發布的消息只能被當前收聽的訂閱者接收到
① 每個消息可以有多個訂閱者
② 發布者和消費者有時間上依賴性:某通道的訂閱者,必須先創建該通道訂閱,才能收到消息
發布消息至通道,不關注訂閱者是誰;訂閱者可收聽自己感興趣的多個通道(類似于topic),也不關注發布者是誰。
③ 故如果沒有訂閱者,發布的消息將得不到處理;
頭腦風暴
Redis內置的List數據結構能形成輕量級消息隊列的效果;Redis原生支持發布/訂閱?模型
如上分析, Pub/Sub模型在訂閱者宕機的時候,發布的消息得不到處理,故此模型不能用于強業務的數據接收和處理。
本次采用的消息隊列模型:
解耦業務:新建ReceiverApp作為生產者,專注于接收并發送到隊列;原有的WebApp作為消費者專注數據處理。
起到削峰填谷的作用,若縮放出多個WebApp消費者容器,還能形成負載均衡的效果。?
需要關注Redis操作List結構的兩個命令( 左進右出,右進左出同理):
? ? LPUSH? &? RPOP/BRPOP
Brpop中的B 表示"Block",是一個rpop命令的阻塞版本:若指定List沒有新元素,在給定時間內,該命令會阻塞當前redis客戶端連接,直到超時返回nil
AspNetCore編程實踐
本次使用AspNetCore 完成RedisMQ的實踐,引入Redis國產第三方開源庫CSRedisCore
生產者ReceiverApp
生產者使用LPush命令向Redis List數據結構寫入消息。
------------------截取自Startup.cs------------------------- public void ConfigureServices(IServiceCollection services) {// Redis客戶端要定義成單例, 不然在大流量并發收數的時候, 會造成redis client來不及釋放。另一方面也確認api控制器不是單例模式,var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");RedisHelper.Initialization(csredis);services.AddSingleton(csredis);services.AddMvc(); } ------------------截取自數據接收Controller------------------- [Route("batch")] [HttpPost] public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs) {if (!ModelState.IsValid)throw new ArgumentException("Http Body Payload Error.");var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}"; eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);if (eqidPairs != null && eqidPairs.Any())RedisHelper.LPush(redisKey, eqidPairs.ToArray());await Task.CompletedTask;}消費者WebApp
????根據以上RedisMQ思路,事件消費方式是拉取pull,故需要輪詢Redis? List數據結構,這里使用AspNetCore內置的BackgroundService后臺服務類后臺輪詢消費:
關注后臺Job中的循環接收方法。
public class BackgroundJob : BackgroundService {private?readonly?IEqidPairHandler?_eqidPairHandler;private readonly CSRedisClient[] _cSRedisClients;private readonly IConfiguration _conf;private readonly ILogger _logger;public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory){_eqidPairHandler = eqidPairHandler;_cSRedisClients = csRedisClients;_conf = conf;_logger = loggerFactory.CreateLogger(nameof(BackgroundJob));}protected override async Task ExecuteAsync(CancellationToken stoppingToken){_logger.LogInformation("Service starting");if (_cSRedisClients[0] == null){_cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0);}RedisHelper.Initialization(_cSRedisClients[0]);while (!stoppingToken.IsCancellationRequested){var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";var?eqidpair?=?RedisHelper.BRPop(5,?key);if (eqidpair != null)await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));//?強烈建議無論如何休眠一段時間,防止突發大流量導致WebApp進程CPU滿載,自行根據場景設置合理休眠時間await Task.Delay(10, stoppingToken);}_logger.LogInformation("Service stopping");} }迭代驗證
使用docker-compose單機部署Nginx,ReceiverApp,WebApp容器。
docker-compose up指令默認只會重建[Service配置或Image變更]的容器。
If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation,?docker-compose up?picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the?--no-recreate?flag.
做一次迭代驗證,更新docke-compose.yml文件WebApp服務的鏡像版本,
docker-compose up;
下圖顯示僅 數據處理容器 WebApp被Recreate:
Nice,分布式改造完成,效果很明顯,現在可以放心安全的迭代核心WebApp數據處理程序。
+?https://redis.io/commands/brpop
+?https://redis.io/commands/lpush
文字+制圖,均為原創,
掃碼點贊,
讓干貨飛一會。
............
往期推薦??
TPL Dataflow組件應對高并發,低延遲要求
docker stack,docker-compose前世今生
點贊的朋友年后老板加雞腿!
總結
以上是生活随笔為你收集整理的AspNetCore结合Redis实践消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET ORM FreeSql 第一个
- 下一篇: 提升Azure App Service的