RabbitMQ一个简单可靠的方案(.Net Core实现)
前言
最近需要使用到消息隊列相關技術,于是重新接觸RabbitMQ。其中遇到了不少可靠性方面的問題,歸納了一下,大概有以下幾種:
1. 臨時異常,如數據庫網絡閃斷、http請求臨時失效等;
2. 時序異常,如A任務依賴于B任務,但可能由于調度或消費者分配的原因,導致A任務先于B任務執行;
3. 業務異常,由于系統測試不充分,上線后發現某幾個或某幾種消息無法正常處理;
4. 系統異常,業務中間件無法正常操作,如網絡中斷、數據庫宕機等;
5. 非法異常,一些偽造、攻擊類型的消息。?
針對這些異常,我采用了一種基于消息審計、消息重試、消息檢索、消息重發的方案。
?
方案
?
?
1. 消息均使用Exchange進行通訊,方式可以是direct或topic,不建議fanout。
2. 根據業務在Exchange下分配一個或多個Queue,同時設置一個審計線程(Audit)監聽所有Queue,用于記錄消息到MongoDB,同時又不阻塞正常業務處理。
3.?生產者(Publisher)在發布消息時,基于AMQP協議,生成消息標識MessageId和時間戳Timestamp,根據消息業務添加頭信息Headers便于跟蹤。
4.?消費者(Comsumer)消息處理失敗時,則把消息發送到重試交換機(Retry Exchange),并設置過期(重試)時間及更新重試次數;如果超過重試次數則刪除消息。
5. 重試交換機Exchange設置死信交換機(Dead Letter Exchange),消息過期后自動轉發到業務交換機(Exchange)。
6.?WebApi可以根據消息標識MessageId、時間戳Timestamp以及頭信息Headers在MongoDB中對消息進行檢索或重試。
?
注:選擇MongoDB作為存儲介質的主要原因是其對頭信息(headers)的動態查詢支持較好,同等的替代產品還可以是Elastic Search這些。
?
生產者(Publisher)
1. 設置斷線自動恢復
var factory = new ConnectionFactory{Uri = new Uri("amqp://guest:guest@192.168.132.137:5672"),AutomaticRecoveryEnabled = true };?
2. 定義Exchange,模式為direct
channel.ExchangeDeclare("Exchange", "direct");?
3. 根據業務定義QueueA和QueueB
channel.QueueDeclare("QueueA", true, false, false);channel.QueueBind("QueueA", "Exchange", "RouteA");channel.QueueDeclare("QueueB", true, false, false);channel.QueueBind("QueueB", "Exchange", "RouteB");?
4. 啟動消息發送確認機制,即需要收到RabbitMQ服務端的確認消息
channel.ConfirmSelect();?
5. 設置消息持久化
var properties = channel.CreateBasicProperties();properties.Persistent = true;?
6. 生成消息標識MessageId、時間戳Timestamp以及頭信息Headers
properties.MessageId = Guid.NewGuid().ToString("N");properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Headers = new Dictionary<string, object> {{ "key", "value" + i}};?
7. 發送消息,偶數序列發送到QueueA(RouteA),奇數序列發送到QueueB(RouteB)
channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);?
8. 確定收到RabbitMQ服務端的確認消息
var isOk = channel.WaitForConfirms();if (!isOk){throw new Exception("The message is not reached to the server!");}?完整代碼
效果:QueueA和QueueB各一條消息,QueueAudit兩條消息
? 注:Exchange下必須先聲明Queue才能接收到消息,上述代碼并沒有QueueAudit的聲明;需要手動聲明,或者先執行下面的消費者程序進行聲明。
?
正常消費者(ComsumerA)
1. 設置預取消息,避免公平輪訓問題,可以根據需要設置預取消息數,這里是1
_channel.BasicQos(0, 1, false);
?
2. 聲明Exchange和Queue
_channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueA", true, false, false);_channel.QueueBind("QueueA", "Exchange", "RouteA");?
3. 編寫回調函數
注:設置了RabbitMQ的斷線恢復機制,當RabbitMQ連接不可用時,與MQ通訊的操作會拋出AlreadyClosedException的異常,導致主線程退出,哪怕連接恢復了,程序也無法恢復,因此,需要捕獲處理該異常。
?
異常消費者(ComsumerB)
1. 設置預取消息
_channel.BasicQos(0, 1, false);?
2. 聲明Exchange和Queue
_channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueB", true, false, false);_channel.QueueBind("QueueB", "Exchange", "RouteB");?
3.? 設置死信交換機(Dead Letter Exchange)
  var retryDic = new Dictionary<string, object>  {
4. 重試設置,3次重試;第一次1秒,第二次10秒,第三次30秒
_retryTime = new List<int> {1 * 1000,10 * 1000,30 * 1000 };?
5. 獲取當前重試次數
var retryCount = 0;if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount")){retryCount = (int)ea.BasicProperties.Headers["retryCount"];_logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...");}?
6. 發生異常,判斷是否可以重試
private bool CanRetry(int retryCount){return retryCount <= _retryTime.Count - 1;}?7. 可以重試,則啟動重試機制
審計消費者(Audit Comsumer)
1. 聲明Exchange和Queue
_channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueAudit", true, false, false);_channel.QueueBind("QueueAudit", "Exchange", "RouteA");_channel.QueueBind("QueueAudit", "Exchange", "RouteB");?
2. 排除死信Exchange轉發過來的重復消息
if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death")){...}?
3. 生成消息實體
var message = new Message{MessageId = ea.BasicProperties.MessageId,Body = ea.Body,Exchange = ea.Exchange,Route = ea.RoutingKey};4. RabbitMQ會用bytes來存儲字符串,因此,要把頭中bytes轉回字符串
if (ea.BasicProperties.Headers != null){var headers = new Dictionary<string, object>();foreach (var header in ea.BasicProperties.Headers){if (header.Value is byte[] bytes){headers[header.Key] = Encoding.UTF8.GetString(bytes);}else{headers[header.Key] = header.Value;}}message.Headers = headers;}5. 把Unix格式的Timestamp轉成UTC時間
if (ea.BasicProperties.Timestamp.UnixTime > 0){message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);message.Timestamp = offset.UtcDateTime;}?
6. 消息存入MongoDB
_mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);?
MongoDB記錄:
?
重試記錄:
?
消息檢索及重發(WebApi)
1. 通過消息Id檢索消息
?
2. 通過頭消息檢索消息
?
3. 消息重發,會重新生成MessageId
?
Ack,Nack,Reject的關系
1. 消息處理成功,執行Ack,RabbitMQ會把消息從隊列中刪除。
2. 消息處理失敗,執行Nack或者Reject:
a) 當requeue=true時,消息會重新回到隊列,然后當前消費者會馬上再取回這條消息;
b) 當requeue=false時,如果Exchange有設置Dead Letter Exchange,則消息會去到Dead Letter Exchange;
c) 當requeue=false時,如果Exchange沒設置Dead Letter Exchange,則消息從隊列中刪除,效果與Ack相同。
?
3. Nack與Reject的區別在于:Nack可以批量操作,Reject只能單條操作。
RabbitMQ自動恢復
連接(Connection)恢復
1. 重連(Reconnect)
2. 恢復連接監聽(Listeners)
3. 重新打開通道(Channels)
4. 恢復通道監聽(Listeners)
5. 恢復basic.qos,publisher confirms以及transaction設置
?
拓撲(Topology)恢復
1. 重新聲明交換機(Exchanges)
2. 重新聲明隊列(Queues)
3. 恢復所有綁定(Bindings)
4. 恢復所有消費者(Consumers)
?
異常處理機制
1. 臨時異常,如數據庫網絡閃斷、http請求臨時失效等
通過短時間重試(如1秒后)的方式處理,也可以考慮Nack/Reject來實現重試(時效性更高)。
?
2. 時序異常,如A任務依賴于B任務,但可能由于調度或消費者分配的原因,導致A任務先于B任務執行
通過長時間重試(如1分鐘、30分鐘、1小時、1天等),等待B任務先執行完的方式處理。
3. 業務異常,由于系統測試不充分,上線后發現某幾個或某幾種消息無法正常處理
等系統修正后,通過消息重發的方式處理。
?
4. 系統異常,業務中間件無法正常操作,如網絡中斷、數據庫宕機等
等系統恢復后,通過消息重發的方式處理。
?
5. 非法異常,一些偽造、攻擊類型的消息
多次重試失敗后,消息從隊列中被刪除,也可以針對此業務做進一步處理。
?
源碼地址
https://github.com/ErikXu/RabbitMesage
相關文章:
- CAP帶你輕松玩轉ASP.NETCore消息隊列 
- .NetCore Cap 結合 RabbitMQ 實現消息訂閱 
- [譯]RabbitMQ教程C#版 - 發布訂閱 
原文地址:?https://www.cnblogs.com/Erik_Xu/p/9515208.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com 
總結
以上是生活随笔為你收集整理的RabbitMQ一个简单可靠的方案(.Net Core实现)的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: ASP.NET Core应用程序的参数配
- 下一篇: 分布式事务解决方案以及 .Net Cor
