RabbitMQ 官方NET教程(二)【工作队列】
這篇中我們將會創建一個工作隊列用來在工作者(consumer)間分發耗時任務。
工作隊列的主要任務是:避免立刻執行資源密集型任務和避免必須等待其完成。相反地,我們進行任務調度:我們把任務封裝為消息發送給隊列。工作進行在后臺運行并不斷的從隊列中取出任務然后執行。當你運行了多個工作進程時,任務隊列中的任務將會被這些工作進程共享執行。
這樣的概念在web應用中極其有用,當在很短的HTTP請求間需要執行復雜的任務。
準備
在本教程的前面部分,我們發送了一個包含Hello World!的消息。 現在我們將發送代替復雜任務的字符串。 我們沒有一個現實世界的任務,比如圖像被調整大小,或者是要渲染的pdf文件,所以假設我們很忙 - 通過使用Thread.sleep()函數來假冒它。 我們將把字符串中的點數作為其復雜度; 每個點都將占“work”的一秒鐘。 例如,由Hello...描述的假任務將需要三秒鐘。
我們將稍后從之前的例子中修改Send程序,以允許從命令行發送任意消息。 這個程序會將任務安排到我們的工作隊列中,所以讓我們命名為NewTask:
有些幫助從命令行參數獲取消息:
private static string GetMessage(string[] args) {return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }我們的舊的Receive.cs腳本還需要一些更改:它需要為消息體中的每個點偽造一秒的工作時間。 它將處理RabbitMQ發送的消息并執行任務,因此我們將其復制到Worker項目并修改:
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);我們假任務到模擬執行時間:
int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);循環調度
使用任務隊列的優點之一是能夠輕松地并行工作。 如果我們正在建立積壓的工作,我們可以增加更多的工作者,這樣可以輕松擴展。
首先,我們同時嘗試運行兩個Worker實例。 他們都會從隊列中獲取消息,但是究竟如何? 讓我們來看看。
你需要三個控制臺打開。 兩個將運行Worker程序。 這些控制臺將是我們兩個消費者 - C1和C2。
# shell 1 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C# shell 2 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C
在第三個我們將發布新的任務。 一旦您已經開始使用消費者,您可以發布一些消息:
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."讓我們看看送給我們workers的內容:
# shell 1 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' # shell 2 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'默認情況下,RabbitMQ將按順序將每條消息發送給下一個消費者。 平均每個消費者將獲得相同數量的消息。 這種分發消息的方式叫做循環(round-robin)。 與三名或更多的workers一起嘗試。
消息應答(message acknowledgments)
執行一個任務需要花費幾秒鐘。你可能會擔心當一個消費者在執行任務時發生中斷。使用我們當前的代碼,一旦RabbitMQ向客戶發送消息,它立即將其從內存中刪除。在這種情況下,如果殺死正在執行任務的某個工作者,我們會丟失它正在處理的信息。我們也會丟失已經轉發給這個工作者且它還未執行的消息。
但是我們不想失去任何任務。如果一個worker掛了,我們希望把這個任務交給另一個工作者。
為了確保消息永遠不會丟失,RabbitMQ支持消息確認。從消費者發送一個確認信息告訴RabbitMQ已經收到,處理了特定的消息,然后RabbitMQ可以自由刪除它。
如果消費者死機(其通道關閉,連接關閉或TCP連接丟失),而不發送確認信息,RabbitMQ將會明白消息未被完全處理并重新排隊。如果同時有其他消費者在線,則會迅速將其重新提供給另一個消費者。這樣就可以確保沒有消息丟失,即使工作者偶然死亡。
沒有任何消息超時; RabbitMQ將在消費者掛了時重新發送消息。如果消費者處理一個信息需要耗費特別特別長的時間是允許的。
消息確認默認情況下打開。 在前面的例子中,我們通過將noAck(“no manual acks”)參數設置為true來明確地將其關閉。 一旦完成任務,現在該刪除這個標志并發送正確的確認。
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);使用這個代碼,我們可以確定即使在處理消息時,使用CTRL + C殺死一個工作者,也不會丟失任何東西。工作者掛了之后不久,所有未確認的消息將被重新發送。
忘記確認
丟失BasicAck是一個常見的錯誤。 這是一個容易的錯誤,但后果是嚴重的。
當您的客戶端退出(可能看起來像隨機重新傳遞)時,消息將被重新傳遞,但是RabbitMQ將會消耗越來越多的內存,因為它將無法釋放任何未包含的消息。
為了調試這種錯誤,您可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在Windows上:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged消息持久化(Message durability)
我們已經學會了如何確保即使消費者死亡,任務也不會丟失。 但是如果RabbitMQ服務器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它會忘記隊列和消息,除非你告訴它不要丟失。需要兩件事來確保消息不會丟失:我們需要將所有隊列和消息標記為持久化。
首先,我們需要確保RabbitMQ不會丟失我們的隊列。 為了這樣做,我們需要將其聲明為持久的:
channel.QueueDeclare(queue: "hello",durable: true,exclusive: false,autoDelete: false,arguments: null);雖然這個命令本身是正確的,但是在我們目前的設置中是不行的。 這是因為我們已經定義了一個非持久化的名為hello的隊列。 RabbitMQ不允許您重新定義具有不同參數的現有隊列,并會向嘗試執行此操作的任何程序返回錯誤。 但是有一個快速的解決方法 - 讓我們用不同的名稱聲明一個隊列,例如task_queue:
channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);這個queueDeclare更改需要應用于生產者和消費者代碼。
在這一點上,我們確信,即使RabbitMQ重新啟動,task_queue隊列也不會丟失。 現在我們需要將我們的消息標記為持久性 - 將IBasicProperties.SetPersistent設置為true。
var properties = channel.CreateBasicProperties(); properties.Persistent = true;注意消息持久性
將消息標記為持久性不能完全保證消息不會丟失。 雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接受消息并且還沒有保存時,仍然有一個很短的時間窗口。 此外,RabbitMQ不會對每個消息執行`fsync`(同步內存中所有已修改的文件數據到儲存設備) - 它可能只是保存到緩存中,而不是真正寫入磁盤。 持久性保證不強,但對我們的簡單任務隊列來說已經足夠了。 如果您需要更強大的保證,那么您可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。公平轉發(Fair dispatch)
或許會發現,目前的消息轉發機制(Round-robin)并非是我們想要的。例如,這樣一種情況,對于兩個消費者,有一系列的任務,奇數任務特別耗時,而偶數任務卻很輕松,這樣造成一個消費者一直繁忙,另一個消費者卻很快執行完任務后等待。
造成這樣的原因是因為RabbitMQ僅僅是當消息到達隊列進行轉發消息。并不在乎有多少任務消費者并未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。
注意隊列大小
如果所有的工作者都處于繁忙狀態,你的隊列有可能被填充滿。你可能會觀察隊列的使用情況,然后增加工作者,或者使用別的什么策略。完整的代碼
NewTask.cs 類:
using System; using RabbitMQ.Client; using System.Text;class NewTask {public static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;channel.BasicPublish(exchange: "",routingKey: "task_queue",basicProperties: properties,body: body);Console.WriteLine(" [x] Sent {0}", message);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}private static string GetMessage(string[] args){return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");} }Worker.cs類:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading;class Worker {public static void Main(){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: "task_queue",noAck: false,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}} }轉載于:https://www.cnblogs.com/Wulex/p/6965057.html
總結
以上是生活随笔為你收集整理的RabbitMQ 官方NET教程(二)【工作队列】的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 识别不了socket未知的名称或服务
- 下一篇: Android application捕