[译]RabbitMQ教程C#版 - 发布订阅
先決條件
本教程假定RabbitMQ已經安裝,并運行在localhost標準端口(5672)。如果你使用不同的主機、端口或證書,則需要調整連接設置。
從哪里獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表聯系我們。
1.發布/訂閱
(使用.NET客戶端)
在教程[2]中,我們創建了一個工作隊列,假設在工作隊列中的每一個任務都只被分發給一個Worker。那么在這一章節,我們要做與之完全不同的事,那就是我們將要把一條消息分發給多個消費者。這種模式被稱為“發布/訂閱”。
為了說明、體現這種模式,我們將會建一個簡單的日志系統。它將會包含兩個程序 - 第一個用來發送日志消息,第二個用來接收并打印它們。
在我們建立的日志系統中,每個接收程序的運行副本都會收到消息。這樣我們就可以運行一個接收程序接收消息并將日志寫入磁盤;同時運行另外一個接收程序接收消息并將日志打印到屏幕上。
實質上,發布的日志消息將會被廣播給所有的接收者。
2.交換器
在教程的前幾部分,我們是發送消息到隊列并從隊列中接收消息?,F在是時候介紹Rabbit中完整的消息傳遞模型了。
讓我們快速回顧一下前面教程中的內容:
生產者是發送消息的用戶應用程序。
隊列是存儲消息的緩沖區。
消費者是接收消息的用戶應用程序。
在RabbitMQ中,消息傳遞模型的核心理念是生產者從來不會把任何消息直接發送到隊列,其實,通常生產者甚至不知道消息是否會被分發到任何隊列中。
然而,生產者只能把消息發送給交換器。交換器非常簡單,一方面它接收來自生產者的消息,另一方面又會把接收的消息推送到隊列中。交換器必須明確知道該如何處理收到的消息,應該追加到一個特定隊列中?還是應該追加到多個隊列中?或者應該把它丟棄?這些規則都被定義在交換器類型中。
目前有這幾種的交換器類型可用:direct,topic,headers和fanout。我們重點關注最后一個 -- fanout,讓我們來創建一個這種類型的交換器,將其命名為logs:
channel.ExchangeDeclare("logs", "fanout");fanout類型交換器非常簡單。正如您可能從名字中猜出的那樣,它會把收到的所有消息廣播到它已知的所有隊列中。這恰巧是我們的日志系統所需要的。
列舉交換器
要列舉出服務器上的交換器,您可以使用非常有用的rabbitmqctl命令行工具:
執行上述命令后,出現的列表中將會有一些amq.*交換器和默認(未命名)交換器。這些是默認創建的,不過目前您可能用不到它們。
默認交換器
在教程的前些部分,我們對交換器這一概念還一無所知,但仍然可以把消息發送到隊列。之所以這樣,是因為我們使用了一個用空字符串("")標識的默認交換器。
回顧一下我們之前如何發布消息:
var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: body);第一個參數就是交換器的名稱,空字符串表示默認或匿名交換器:將消息路由到routingKey指定的隊列(如果存在)中。
現在,我們可以把消息發布到我們指定的交換器:
var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs",routingKey: "",basicProperties: null,body: body);3.臨時隊列
您是否還記得之前我們使用過的隊列,它們都有一個特定的名稱(記得應該是hello和task_queue吧)。給隊列命名對我們來說是至關重要的 -- 因為我們可能需要多個Worker指向同一個隊列;當您想要在生產者和消費者之間共享隊列時,給隊列一個名稱也是非常重要的。
但是,我們創建的日志系統并不希望如此。我們希望監聽所有的日志消息,而不僅僅是其中一部分。我們也只對目前流動的消息感興趣,而不是舊消息。為解決這個問題,我們需要做好兩件事。
首先,我們無論何時連接Rabbit,都需要一個新的、空的隊列。要做到這一點,我們可以使用隨機名稱來創建隊列,或許,甚至更好的方案是讓服務器為我們選擇一個隨機隊列名稱。
其次,一旦我們與消費者斷開連接,與之相關的隊列應該被自動刪除。
在.NET客戶端中,如果不向QueueDeclare()方法提供任何參數,實際上就是創建了一個非持久化、獨占、且自動刪除的隨機命名隊列:
var queueName = channel.QueueDeclare().QueueName;您可以在隊列指南中了解更多關于exclusive參數和其他隊列屬性的信息。
此時,queueName包含一個隨機隊列名稱。例如,它看起來可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
4.綁定
我們已經創建好了一個fanout交換器和一個隊列。現在我們需要告訴交換器把消息發送到我們的隊列。而交換器和隊列之間的關系就稱之為綁定。
// 把一個隊列綁定到指定交換器。channel.QueueBind(queue: queueName, ? ? ? ? ? ? ? ? ?exchange: "logs", ? ? ? ? ? ? ? ? ?routingKey: "");從現在起,logs交換器會把消息追加到我們的隊列中。
列舉綁定
您可以使用(您或許已經猜到了),列舉出現有的綁定。
5.組合在一起
生產者程序負責分發消息,這與之前的教程看起來沒有太大區別。
最重要的變化是我們現在想把消息發布到我們的logs交換器,而不是匿名交換器。在發送時我們需要提供一個路由鍵routingKey,但是對于fanout交換器,它的值可以被忽略。這里是EmitLog.cs文件的代碼:
(EmitLog.cs源碼)
如你所見,在建立連接后,我們聲明了交換器。這一步非常有必要,因為發布消息到一個不存在的交換器,這種情況是被禁止的。
如果沒有隊列綁定到交換器上,消息將會丟失,但這對我們來說并沒有什么沒問題;如果沒有消費者正在監聽,我們是可以放心地把消息丟棄的。
ReceiveLogs.cs的代碼:
(ReceiveLogs.cs源碼)
按照教程[1]中的設置說明生成EmitLogs和ReceiveLogs項目。
如果您想把日志保存到文件中,只需打開一個控制臺并輸入:
cd ReceiveLogs dotnet run > logs_from_rabbit.log如果你想在屏幕上看到日志,我可以新開一個終端并運行:
cd ReceiveLogs dotnet run當然,分發日志需要輸入:
cd EmitLog dotnet run使用rabbitmqctl list_bindings命令,您可以驗證代碼是否真正創建了我們想要的綁定和隊列。當有兩個ReceiveLogs.cs程序運行時,您應該看到如下所示的內容:
sudo rabbitmqctl list_bindings# => Listing bindings ...# => logs ? ?exchange ? ? ? ?amq.gen-JzTY20BRgKO-HjmUJj0wLg ?queue ? ? ? ? ? []# => logs ? ?exchange ? ? ? ?amq.gen-vso0PVvyiRIL2WoV3i48Yg ?queue ? ? ? ? ? []# => ...done.對執行結果的解釋簡潔明了:來自logs交換器的數據轉發到了兩個由服務器隨機分配名稱的隊列。這正是我們期待的結果。
想要了解如何監聽消息的這一塊內容,讓我們繼續閱讀教程[4]。
6.寫在最后
本文翻譯自RabbitMQ官方教程C#版本。本文介紹如與官方有所出入,請以官方最新內容為準。
水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。
原文鏈接:RabbitMQ tutorial - Publish/Subscribe
實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
最后更新:2018-06-11
作者:Esofar
出處:http://www.cnblogs.com/esofar/p/rabbitmq-publish-subscribe.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結
以上是生活随笔為你收集整理的[译]RabbitMQ教程C#版 - 发布订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET Core微服务之基于Ocelo
- 下一篇: 用ASP.NET Core 2.1 建立