RabbitMQ教程C#版 - 工作队列
先決條件
本教程假定RabbitMQ已經安裝,并運行在localhost標準端口(5672)。如果你使用不同的主機、端口或證書,則需要調整連接設置。
從哪里獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表聯系我們。
1.工作隊列
(使用.NET客戶端)
在第一篇教程RabbitMQ教程C#版 “Hello World”中,我們編寫了兩個程序,用于從一個指定的隊列發送和接收消息。在本文中,我們將創建一個工作隊列,用于在多個工作線程間分發耗時的任務。
工作隊列(又名:任務隊列)背后的主要想法是避免立即執行資源密集型、且必須等待其完成的任務。相反的,我們把這些任務安排在稍后完成。我們可以將任務封裝為消息并把它發送到隊列中,在后臺運行的工作進程將從隊列中取出任務并最終執行。當您運行多個工作線程,這些任務將在這些工作線程之間共享。
這個概念在Web應用程序中特別有用,因為在一個HTTP請求窗口中無法處理復雜的任務。
2.準備
我們將略微修改上一個示例中的Send程序,以其可以在命令行發送任意消息。
這個程序將調度任務到我們的工作隊列中,所以讓我們把它命名為NewTask:
像教程[1],我們需要生成兩個項目:
dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Worker
dotnet add package RabbitMQ.Client
dotnet restore
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
? ? ? ? ? ? ? ? ? ? ?routingKey: "task_queue",
? ? ? ? ? ? ? ? ? ? ?basicProperties: properties,
? ? ? ? ? ? ? ? ? ? ?body: body);
從命令行參數獲取消息的幫助方法:
private static string GetMessage(string[] args)
{
? ? return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
我們舊的Receive.cs腳本也需要進行一些更改:它需要為消息體中的每個點模擬一秒種的時間消耗。它將處理由RabbitMQ發布的消息,并執行任務,因此我們把它復制到Worker項目并修改:
// 構建消費者實例。
var consumer = new EventingBasicConsumer(channel);
// 綁定消息接收事件。
consumer.Received += (model, ea) =>
{
? ? var body = ea.Body;
? ? var message = Encoding.UTF8.GetString(body);
? ? Console.WriteLine(" [x] Received {0}", message);
? ? // 模擬耗時操作。
? ? int dots = message.Split('.').Length - 1;
? ? Thread.Sleep(dots * 1000);
? ? Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
模擬虛擬任務的執行時間:
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
3.循環調度
使用任務隊列的優點之一是能夠輕松地并行工作。如果我們正在積累積壓的工作,我們僅要增加更多的工作者,并以此方式可以輕松擴展。
首先,我們嘗試同時運行兩個Worker實例。他們都會從隊列中獲取消息,但究竟如何?讓我們來看看。
您需要打開三個控制臺,兩個運行Worker程序,這些控制臺作為我們的兩個消費者 - C1和C2。
# shell 1
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
在第三個控制臺中,我們將發布一些新的任務。一旦你已經運行了消費者,你可以嘗試發布幾條消息:
# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
讓我們看看有什么發送到了我們的Worker程序:
# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默認情況下,RabbitMQ會按順序將每條消息發送給下一個消費者。消費者數量平均的情況下,每個消費者將會獲得相同數量的消息。這種分配消息的方式稱為循環(Round-Robin)。請嘗試開啟三個或更多的Worker程序來驗證。
4.消息確認
處理一項任務可能會需要幾秒鐘的時間。如果其中一個消費者開啟了一項長期的任務并且只完成了部分就掛掉了,您可能想知道會發生什么?在我們當前的代碼中,一旦RabbitMQ把消息分發給了消費者,它會立即將這條消息標記為刪除。在這種情況下,如果您停掉某一個Worker,我們將會丟失這條正在處理的消息,也將丟失所有分發到該Worker但尚未處理的消息。
但是我們不想丟失任何一個任務。如果一個Worker掛掉了,我們希望這個任務能被重新分發給其他Worker。
為了確保消息永遠不會丟失,RabbitMQ支持消息確認機制。消費者回發一個確認信號Ack(nowledgement)給RabbitMQ,告訴它某個消息已經被接收、處理并且可以自由刪除它。
如果一個消費者在還沒有回發確認信號之前就掛了(其通道關閉,連接關閉或者TCP連接丟失),RabbitMQ會認為該消息未被完全處理,并將其重新排隊。如果有其他消費者同時在線,該消息將會被會迅速重新分發給其他消費者。這樣,即便Worker意外掛掉,也可以確保消息不會丟失。
沒有任何消息會超時;當消費者死亡時,RabbitMQ將會重新分發消息。即使處理消息需要非常非常長的時間也沒關系。
默認情況下,手動消息確認模式是開啟的。在前面的例子中,我們通過將autoAck(“自動確認模式”)參數設置為true來明確地關閉手動消息確認模式。一旦完成任務,是時候刪除這個標志并且從Worker手動發送一個恰當的確認信號給RabbitMQ。
// 構建消費者實例。
var consumer = new EventingBasicConsumer(channel);
// 綁定消息接收事件。
consumer.Received += (model, ea) =>
{
? ? var body = ea.Body;
? ? var message = Encoding.UTF8.GetString(body);
? ? Console.WriteLine(" [x] Received {0}", message);
? ??
? ? // 模擬耗時操作。
? ? int dots = message.Split('.').Length - 1;
? ? Thread.Sleep(dots * 1000);
? ? Console.WriteLine(" [x] Done");
? ??
? ? // 手動發送消息確認信號。
? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// autoAck:false - 關閉自動消息確認,調用`BasicAck`方法進行手動消息確認。
// autoAck:true? - 開啟自動消息確認,當消費者接收到消息后就自動發送ack信號,無論消息是否正確處理完畢。
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
使用上面這段代碼,我們可以確定的是,即使一個Worker在處理消息時,我們通過使用CTRL + C來終止它,也不會丟失任何消息。Worker掛掉不久,所有未確認的消息將會被重新分發。
忘記確認
遺漏BasicAck是一個常見的錯誤。這是一個很簡單的錯誤,但導致的后果卻是嚴重的。當客戶端退出時(看起來像是隨機分發的),消息將會被重新分發,但是RabbitMQ會吃掉越來越多的內存,因為它不能釋放未確認的消息。
為了調試這種錯誤,您可以使用rabbitmqctl來打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,刪除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
5.消息持久化
我們已經學習了如何確保即使消費者掛掉,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務還是會丟失。
當RabbitMQ退出或崩潰時,它會忘記已存在的隊列和消息,除非告訴它不要這樣做。為了確保消息不會丟失,有兩件事是必須的:我們需要將隊列和消息標記為持久。
首先,我們需要確保RabbitMQ永遠不會丟失我們的隊列。為了做到這一點,我們需要把隊列聲明是持久的(Durable):
// 聲明隊列,通過指定durable參數為`true`,對消息進行持久化處理。?
channel.QueueDeclare(queue: "hello",
? ? ? ? ? ? ? ? ? ? ?durable: true,
? ? ? ? ? ? ? ? ? ? ?exclusive: false,
? ? ? ? ? ? ? ? ? ? ?autoDelete: false,
? ? ? ? ? ? ? ? ? ? ?arguments: null);
雖然這個命令本身是正確的,但是它在當前設置中不會起作用。那是因為我們已經定義過一個名為hello的隊列,并且這個隊列不是持久化的。RabbitMQ不允許使用不同的參數重新定義已經存在的隊列,并會向嘗試執行該操作的程序返回一個錯誤。但有一個快速的解決辦法 - 讓我們用不同的名稱聲明一個隊列,例如task_queue:
channel.QueueDeclare(queue: "task_queue",
? ? ? ? ? ? ? ? ? ? ?durable: true,
? ? ? ? ? ? ? ? ? ? ?exclusive: false,
? ? ? ? ? ? ? ? ? ? ?autoDelete: false,
? ? ? ? ? ? ? ? ? ? ?arguments: null);
注意,該聲明隊列QueueDeclare方法的更改需要同時應用于生產者和消費者代碼。
此時,我們可以確定的是,即使RabbitMQ重新啟動,task_queue隊列也不會丟失。現在我們需要將我們的消息標記為持久的(Persistent)?- 通過將IBasicProperties.Persistent設置為true。
// 將消息標記為持久性。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
關于消息持久性的說明
將消息標記為Persistent并不能完全保證消息不會丟失。盡管它告訴RabbitMQ將消息保存到磁盤,但當RabbitMQ接收到消息并且尚未保存消息時仍有一段時間間隔。此外,RabbitMQ不會為每條消息執行fsync(2)?- 它可能只是保存到緩存中,并沒有真正寫入磁盤。消息的持久化保證并不健壯,但對于簡單的任務隊列來說已經足夠了。如果您需要一個更加健壯的保證,可以使用發布者確認。
6.公平調度
您可能已經注意到調度仍然無法完全按照我們期望的方式工作。例如,在有兩個Worker的情況下,假設所有奇數消息都很龐大、偶數消息都很輕量,那么一個Worker將會一直忙碌,而另一個Worker幾乎不做任何工作。是的,RabbitMQ并不知道存在這種情況,它仍然會平均地分發消息。
發生這種情況是因為RabbitMQ只是在消息進入隊列后就將其分發。它不會去檢查每個消費者所擁有的未確認消息的數量。它只是盲目地將第n條消息分發給第n位消費者。
為了改變上述這種行為,我們可以使用參數設置prefetchCount = 1的basicQos方法。
這就告訴RabbitMQ同一時間不要給一個Worker發送多條消息。或者換句話說,不要向一個Worker發送新的消息,直到它處理并確認了前一個消息。
相反,它會這個消息調度給下一個不忙碌的Worker。
channel.BasicQos(0, 1, false);
關于隊列大小的說明
如果所有的Worker都很忙,您的隊列可能會被填滿。請留意這一點,可以嘗試添加更多的Worker,或者使用其他策略。
7.組合在一起
我們NewTask.cs類的最終代碼:
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
? ? public static void Main(string[] args)
? ? {
? ? ? ? // 實例化連接工廠。
? ? ? ? var factory = new ConnectionFactory() { HostName = "localhost" };
? ? ? ??
? ? ? ? // 創建連接、信道。
? ? ? ? using(var connection = factory.CreateConnection())
? ? ? ? using(var channel = connection.CreateModel())
? ? ? ? {
? ? ? ? ? ? // 聲明隊列,標記為持久性。
? ? ? ? ? ? channel.QueueDeclare(queue: "task_queue",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: true,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);
? ? ? ? ? ??
? ? ? ? ? ? // 獲取發送消息。
? ? ? ? ? ? var message = GetMessage(args);
? ? ? ? ? ? var body = Encoding.UTF8.GetBytes(message);
? ? ? ? ? ??
? ? ? ? ? ? // 將消息標記為持久性。
? ? ? ? ? ? var properties = channel.CreateBasicProperties();
? ? ? ? ? ? properties.Persistent = true;
? ? ? ? ? ??
? ? ? ? ? ? // 發送數據包。
? ? ? ? ? ? channel.BasicPublish(exchange: "",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?routingKey: "task_queue",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?basicProperties: properties,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?body: body);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ? ? ? ? Console.WriteLine(" [x] Sent {0}", message);
? ? ? ? }
? ? ? ? Console.WriteLine(" Press [enter] to exit.");
? ? ? ? Console.ReadLine();
? ? }
? ? private static string GetMessage(string[] args)
? ? {
? ? ? ? return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
? ? }
}
(NewTask.cs源碼)
還有我們的Worker.cs:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
? ? public static void Main()
? ? {
? ? ? ? // 實例化連接工廠。
? ? ? ? var factory = new ConnectionFactory() { HostName = "localhost" };
? ? ? ??
? ? ? ? ?// 創建連接、信道。
? ? ? ? using(var connection = factory.CreateConnection())
? ? ? ? using(var channel = connection.CreateModel())
? ? ? ? {
? ? ? ? ? ? // 聲明隊列,標記為持久性。
? ? ? ? ? ? channel.QueueDeclare(queue: "task_queue",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: true,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);
? ? ? ? ? ??
? ? ? ? ? ? // 告知RabbitMQ,在未收到當前Worker的消息確認信號時,不再分發給消息,確保公平調度。
? ? ? ? ? ? channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
? ? ? ? ? ? Console.WriteLine(" [*] Waiting for messages.");
? ? ? ? ? ? // 構建消費者實例。
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);
? ? ? ? ? ??
? ? ? ? ? ? // 綁定消息接收事件。
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? var body = ea.Body;
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(body);
? ? ? ? ? ? ? ? Console.WriteLine(" [x] Received {0}", message);
? ? ? ? ? ? ? ? // 模擬耗時操作。
? ? ? ? ? ? ? ? int dots = message.Split('.').Length - 1;
? ? ? ? ? ? ? ? Thread.Sleep(dots * 1000);
? ? ? ? ? ? ? ? Console.WriteLine(" [x] Done");
? ? ? ? ? ? ? ? // 手動發送消息確認信號。
? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
? ? ? ? ? ? };
? ? ? ? ? ??
? ? ? ? ? ? channel.BasicConsume(queue: "task_queue",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoAck: false,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?consumer: consumer);
? ? ? ? ? ? Console.WriteLine(" Press [enter] to exit.");
? ? ? ? ? ? Console.ReadLine();
? ? ? ? }
? ? }
}
(Worker.cs源碼)
使用消息確認機制和BasicQ您可以創建一個工作隊列。即使RabbitMQ重新啟動,通過持久性選項也可讓任務繼續存在。
有關IModel方法和IBasicProperties的更多信息,您可以在線瀏覽RabbitMQ .NET客戶端API參考。
現在,我們可以繼續閱讀教程[3],學習如何向多個消費者發送相同的消息。
8.寫在最后
本文翻譯自RabbitMQ官方教程C#版本。本文介紹如與官方有所出入,請以官方最新內容為準。
水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。
原文鏈接:RabbitMQ tutorial - Work Queues
實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
最后更新:2018-04-03
相關文章:?
.net core 使用Redis的發布訂閱
RabbitMQ知多少
RabbitMQ系列教程之四:路由(Routing)
RabbitMQ系列教程之三:發布/訂閱(Publish/Subscribe)
RabbitMQ系列教程之二:工作隊列(Work Queues)
如何優雅的使用RabbitMQ
.NET 使用 RabbitMQ 圖文簡介
RabbitMQ 高可用集群搭建及電商平臺使用經驗總結
.NET Core 使用RabbitMQ
ASP.NET Core Web API下事件驅動型架構的實現(三):基于RabbitMQ的事件總線
RabbitMQ教程C#版 “Hello World”
原文地址:https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html?
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結
以上是生活随笔為你收集整理的RabbitMQ教程C#版 - 工作队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 这个拖后腿的“in”
- 下一篇: IdentityServer4实战 -