说说 RabbiMQ 的应答模式
RabbiMQ 我們都很熟悉了,是很常用的一個開源消息隊列。搞懂 RabbiMQ 的應答模式對我們排查錯誤很有幫助,也能避免一些坑。本文說說 RabbiMQ 的應答模式。
生產者發出一條消息給 RabbiMQ ,RabbiMQ 將消息推送給消費者,消費者處理完消息后告訴 RabbiMQ,我已經接收到消息并處理了,RabbiMQ 收到通知后會將消息從隊列中刪除。消費者通知 MQ 的這個過程就是消息的應答。在 RabbiMQ 中有兩種應答模式:自動應答和手動應答。
版本
dotNET Core :3.1
RabbitMQ:3.8.2
RabbitMQ.Client:6.2.1
自動應答
當 RabbiMQ 開啟了消息的自動應答,一旦 RabbiMQ 將消息分發給了消費者,就會將消息從內存中刪除。這種情況下,如果正在執行的消費者掛掉,就會丟失正在處理的消息。
生產者代碼
static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("RabbitMQ連接成功,請輸入消息,輸入exit退出");channel.QueueDeclare("oec2003",?false,?false,?false,?null);string?input;do{input?=?Console.ReadLine();var?body?=?Encoding.UTF8.GetBytes(input);channel.BasicPublish("",?"oec2003",?null,?body);}while?(input.Trim().ToLower()?!=?"exit");} }消費者代碼
static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("消費者開始監聽......");channel.QueueDeclare("oec2003",?false,?false,?false,?null);EventingBasicConsumer?consumer?=?new?EventingBasicConsumer(channel);consumer.Received?+=?(ch,?ea)?=>{string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");System.Threading.Thread.Sleep(10000);};channel.BasicConsume("oec2003",?true,?consumer);Console.ReadKey();} }channel.BasicConsume 方法的第二個參數設置為 true 表示自動應答;
開啟自動應答后,消息是生產者發布后,當有消費者連接上后,所有的消息都會被自動確認,并且從內存中刪除,這時如果消費者進程掛掉,沒有處理的消息會丟失,正在處理中的消息也不會被重新投遞;
自動應答的好處是消息隊列不會處于堵塞狀態,但代價有點大,生產環境中還是不建議使用。
手動應答
手動應答,當消費者接收到消息處理完后,需要發送一個回執,告訴 RabbiMQ 服務端,這時 RabbiMQ 才會將該消息刪除。
生產者的代碼和上面的一樣,消費者代碼需要做相關調整,如下:
static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("消費者開始監聽......");EventingBasicConsumer?consumer?=?new?EventingBasicConsumer(channel);consumer.Received?+=?(ch,?ea)?=>{string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");channel.BasicAck(ea.DeliveryTag,?false);};channel.BasicConsume("oec2003",?false,?consumer);Console.ReadKey();} }channel.BasicConsume 方法的第二個參數設置為 false ,表示手動應答模式;
在處理完消息后調用 channel.BasicAck(ea.DeliveryTag, false); 來進行應答,告訴 RabbiMQ 消息已經收到,RabbiMQ 收到這個回執后,才會刪除消息。
可能遇到的問題
流量控制問題
在手動模式下,生產者發送消息后消息會從 Ready 進入到 Unacked 中,當消費者進行應答之后消息從 Unacked 中刪除。
如果消息的產生速度遠遠大于消費者的處理速度,這時消息就會都在消費者處進行積壓了。我們會看到 Unacked 中的數量會越來越大,這樣消費者的壓力就會越來越大,這時就需要使用 Qos 來進行限流。
Qos
在消費者中使用 channel.BasicQos(0, 2, false); 來進行 Qos 的設置,如下圖:
BasicQos 方法有三個參數:
prefetchSize:批量獲取消息的總大小,0為不限制;
prefetchCount:每次處理消息的個數,比如 prefetchCount 設置為 2 ,那么處于 Unacked 狀態的消息最多就 2 條,當其中一條進行了得到了應答后,才會從 Ready 中轉入一條到 Unacked
global:設置為 true 表示對 channel 進行控制,否則對每個消費者進行限制,一個 channel 可以有多個消費者
為什么使用 Qos :
提高服務穩定性,因為有 prefetchCount 參數的控制,不會有海量的數據涌進來導致消費者服務掛掉;
提高吞吐量,當隊列有多個消費者時,每個消費者的能力不一樣,我們可以通過 prefetchCount 參數來合理安排每個消費者的處理能力,不會出現有的空閑,有的積壓。
prefetchCount 是一個非常關鍵的參數,當消費者處理消息時,出現一些異常情況,導致無法進行 Ack 應答,沒有應答的數量大于等于 prefetchCount 時,隊列就會發生堵塞。所以我們一定要確保消息的處理能夠被異常捕獲,并在 finally 中進行 Ack 應答,代碼如下:
try {string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");if?(message?==?"error"){throw?new?Exception("mq?error");}else?if?(message?==?"sleep"){System.Threading.Thread.Sleep(60000);} } catch?(Exception) {//處理異常 } finally {channel.BasicAck(ea.DeliveryTag,?false); }一旦隊列堵塞了,一種處理方式就是斷掉客戶端,這樣,處在 Unacked 中的消息會重新回到 Ready 中,會重新進行投遞進行消費。
總結
1、自動應答模式需要慎用,特別是生產環境;
2、不開啟 Qos ,消費者可能會面臨很大壓力,但消息不會堵塞(測試過 500 個未進行 Ack 沒有造成堵塞),現在不確定在沒有 Qos 的情況下,有沒有默認的最大 prefetchCount ;
3、開啟 Qos ,prefetchCount 的值很關鍵,并且需要做好異常處理,防止堵塞。
希望本文對您有所幫助!
總結
以上是生活随笔為你收集整理的说说 RabbiMQ 的应答模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenTelemetry - 云原生下
- 下一篇: 微软2020开源回顾:止不住的挨骂,停不