RabbitMQ知多少
1.引言
RabbitMQ——Rabbit Message Queue的簡寫,但不能僅僅理解其為消息隊列,消息代理更合適。RabbitMQ 是一個由 Erlang 語言開發的AMQP(高級消息隊列協議)的開源實現,其內部結構如下:
RabbitMQ作為一個消息代理,主要和消息打交道,負責接收并轉發消息。RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分布式部署。適用于排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。
下面我們就來學習下RabbitMQ。
2. 環境搭建
本文主要基于Windows下使用Vs Code 基于.net core進行demo演示。開始之前我們需要準備好以下環境。
安裝Erlang運行環境
下載安裝Erlang。安裝RabbitMQ
下載安裝Windows版本的RabbitMQ。啟動RabbitMQ Server
點擊Windows開始按鈕,輸入RabbitMQ找到RabbitMQ Comman Prompt,以管理員身份運行。依次執行以下命令啟動RabbitMQ服務
rabbitmq-service installrabbitmq-service enablerabbitmq-service start執行rabbitmqlctl status檢查RabbitMQ狀態
安裝管理平臺插件
執行rabbitmq-plugins enable rabbitmq_management即可成功安裝,使用默認賬號密碼(guest/guest)登錄http://localhost:15672/即可。
3. Hello RabbitMQ
在開始之前我們先來了解下消息模型:
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,隊列再將消息發送到監聽的消費者。
下面我們我們通過demo來了解RabbitMQ的基本用法。
3.1.消息的發送和接收
創建RabbitMQ文件夾,打開命令提示符,分別創建兩個控制臺項目Send、Receive。
dotnet new console --name Send //創建發送端控制臺應用cd Send //進入Send目錄
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢復包 dotnet new console --name Receive //創建接收端控制臺應用
cd Receive //進入Receive目錄
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢復包
我們先來添加消息發送端邏輯:
//Send.cs public static void Main(string[] args) { ? ?//1.1.實例化連接工廠var factory = new ConnectionFactory() { HostName = "localhost" }; ?
//2. 建立連接using (var connection = factory.CreateConnection()){ ? ? ?
//3. 創建信道using (var channel = connection.CreateModel()){ ? ? ? ? ?
//4. 申明隊列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); ? ? ? ?
//5. 構建byte消息數據包string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";var body = Encoding.UTF8.GetBytes(message); ? ? ? ?
?//6. 發送數據包channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);}} }
再來完善消息接收端邏輯:
//Receive.cs ?省略部分代碼public static void Main(){ ? ?//1.實例化連接工廠var factory = new ConnectionFactory() { HostName = "localhost" }; ?
?//2. 建立連接using (var connection = factory.CreateConnection()){ ? ? ?
??//3. 創建信道using (var channel = connection.CreateModel()){ ? ? ? ? ? ?
??//4. 申明隊列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); ? ? ? ? ?
???//5. 構造消費者實例var consumer = new EventingBasicConsumer(channel); ? ? ? ? ? ?//6. 綁定消息接收后的事件委托consumer.Received += (model, ea) =>{ ? ? ? ? ? ? ? ?var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine(" [x] Received {0}", message);Thread.Sleep(6000);//模擬耗時Console.WriteLine (" [x] Done");}; ? ? ? ? ? ?
???//7. 啟動消費者channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}} }
先運行消息接收端,再運行消息發送端,結果如下圖。
從上面的代碼中可以看出,發送端和消費端的代碼前4步都是一樣的。主要的區別在于發送端調用channel.BasicPublish方法發送消息;而接收端需要實例化一個EventingBasicConsumer實例來進行消息處理邏輯。另外一點需要注意的是:消息接收端和發送端的隊列名稱(queue)必須保持一致,這里指定的隊列名稱為hello。
3.2. 循環調度
使用工作隊列的好處就是它能夠并行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了。我們先啟動兩個接收端,等待消息接收,再啟動一個發送端進行消息發送。
我們增加運行一個消費端后的運行結果:
從圖中可知,我們循環發送4條信息,兩個消息接收端按順序被循環分配。
默認情況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均每個消費者將獲得相同數量的消息。這種分發消息的方式叫做循環(round-robin)。
3.3. 消息確認
按照我們上面的demo,一旦RabbitMQ將消息發送到消費端,消息就會立即從內存中移出,無論消費端是否處理完成。在這種情況下,消息就會丟失。
為了確保一個消息永遠不會丟失,RabbitMQ支持消息確認(message acknowledgments)。當消費端接收消息并且處理完成后,會發送一個ack(消息確認)信號到RabbitMQ,RabbitMQ接收到這個信號后,就可以刪除掉這條已經處理的消息任務。但如果消費端掛掉了(比如,通道關閉、連接丟失等)沒有發送ack信號。RabbitMQ就會明白某個消息沒有正常處理,RabbitMQ將會重新將消息入隊,如果有另外一個消費端在線,就會快速的重新發送到另外一個消費端。
RabbitMQ中沒有消息超時的概念,只有當消費端關閉或奔潰時,RabbitMQ才會重新分發消息。
微調下Receive中的代碼邏輯:
//5. 構造消費者實例var consumer = new EventingBasicConsumer(channel);?//6. 綁定消息接收后的事件委托consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine(" [x] Received {0}", message);Thread.Sleep(6000);//模擬耗時Console.WriteLine(" [x] Done"); ? ?
? // 7. 發送消息確認信號(手動消息確認)channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};
? //8. 啟動消費者//autoAck:true;自動進行消息確認,當消費端接收到消息后,就自動發送ack信號,不管消息是否正確處理完畢//autoAck:false;關閉自動消息確認,通過調用BasicAck方法手動進行消息確認channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
主要改動的是將?autoAck:true修改為autoAck:fasle,以及在消息處理完畢后手動調用BasicAck方法進行手動消息確認。
從圖中可知,消息發送端連續發送4條消息,其中消費端1先被分配處理第一條消息,消費端2被循環分配第二條消息,第三條消息由于沒有空閑消費者仍然在隊列中。
在消費端2未處理完第一條消息之前,手動中斷(ctrl+c)。我們可以發現RabbitMQ在下一次分發時,會優先將被中斷的消息分發給消費端1處理。
3.4. 消息持久化
消息確認確保了即使消費端異常,消息也不會丟失能夠被重新分發處理。但是如果RabbitMQ服務端異常,消息依然會丟失。除非我們指定durable:true,否則當RabbitMQ退出或奔潰時,消息將依然會丟失。通過指定durable:true,并指定Persistent=true,來告知RabbitMQ將消息持久化。
//send.cs//4. 申明隊列(指定durable:true,告知rabbitmq對消息進行持久化)channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//將消息標記為持久性 - 將IBasicProperties.SetPersistent設置為truevar properties = channel.CreateBasicProperties(); properties.Persistent = true;
//5. 構建byte消息數據包string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!"; var body = Encoding.UTF8.GetBytes(message);
//6. 發送數據包(指定basicProperties)channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
將消息標記為持久性不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接受消息并且還沒有保存時,仍然有一個很短的時間窗口。RabbitMQ 可能只是將消息保存到了緩存中,并沒有將其寫入到磁盤上。持久化是不能夠一定保證的,但是對于一個簡單任務隊列來說已經足夠。如果需要確保消息隊列的持久化,可以使用publisher confirms.
3.5. 公平分發
RabbitMQ的消息分發默認按照消費端的數量,按順序循環分發。這樣僅是確保了消費端被平均分發消息的數量,但卻忽略了消費端的閑忙情況。這就可能出現某個消費端一直處理耗時任務處于阻塞狀態,某個消費端一直處理一般任務處于空置狀態,而只是它們分配的任務數量一樣。
但我們可以通過channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處于忙碌狀態時,不再分配任務。
//4. 申明隊列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處于忙碌狀態時
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
這時你需要注意的是如果所有的消費端都處于忙碌狀態,你的隊列可能會被塞滿。你需要注意這一點,要么添加更多的消費端,要么采取其他策略。
4. Exchange
細心的你也許發現上面的demo,生產者和消費者直接是通過相同隊列名稱進行匹配銜接的。消費者訂閱某個隊列,生產者創建消息發布到隊列中,隊列再將消息轉發到訂閱的消費者。這樣就會有一個局限性,即消費者一次只能發送消息到某一個隊列。
那消費者如何才能發送消息到多個消息隊列呢?
RabbitMQ提供了Exchange,它類似于路由器的功能,它用于對消息進行路由,將消息發送到多個隊列上。Exchange一方面從生產者接收消息,另一方面將消息推送到隊列。但exchange必須知道如何處理接收到的消息,是將其附加到特定隊列還是附加到多個隊列,還是直接忽略。而這些規則由exchange type定義,exchange的原理如下圖所示。
常見的exchange type 有以下幾種:
direct(明確的路由規則:消費端綁定的隊列名稱必須和消息發布時指定的路由名稱一致)
topic (模式匹配的路由規則:支持通配符)
fanout (消息廣播,將消息分發到exchange上綁定的所有隊列上)
下面我們就來一一這介紹它們的用法。
4.1 fanout
本著先易后難的思想,我們先來了解下fanout的廣播路由機制。fanout的路由機制如下圖,即發送到 fanout 類型exchange的消息都會分發到所有綁定該exchange的隊列上去。
生產者示例代碼:
// 生成隨機隊列名稱var queueName = channel.QueueDeclare().QueueName;//使用fanout exchange type,指定exchange名稱channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message);//發布到指定exchange,fanout類型無需指定routingKeychannel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);消費者示例代碼:
//申明fanout類型exchangechannel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");//申明隨機隊列名稱var queuename = channel.QueueDeclare ().QueueName;//綁定隊列到指定fanout類型exchange,無需指定路由鍵channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");4.2. direct
direct相對于fanout就屬于完全匹配、單播的模式,路由機制如下圖,即隊列名稱和消息發送時指定的路由完全匹配時,消息才會發送到指定隊列上。
生產者示例代碼:
// 生成隨機隊列名稱var queueName = channel.QueueDeclare().QueueName;//使用direct exchange type,指定exchange名稱channel.ExchangeDeclare(exchange: "directEC", type: "direct"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message);
//發布到direct類型exchange,必須指定routingKeychannel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);
消費者示例代碼:
//申明direct類型exchangechannel.ExchangeDeclare (exchange: "directEC", type: "direct");
//綁定隊列到direct類型exchange,需指定路由鍵routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");
4.3. topic
topic是direct的升級版,是一種模式匹配的路由機制。它支持使用兩種通配符來進行模式匹配:符號#和符號*。其中*匹配一個單詞,?#則表示匹配0個或多個單詞,單詞之間用.分割。如下圖所示。
生產者示例代碼:
// 生成隨機隊列名稱var queueName = channel.QueueDeclare().QueueName;//使用topic exchange type,指定exchange名稱channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message);//發布到topic類型exchange,必須指定routingKeychannel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);消費者示例代碼:
//申明topic類型exchangechannel.ExchangeDeclare (exchange: "topicEC", type: "topic");//申明隨機隊列名稱var queuename = channel.QueueDeclare ().QueueName;//綁定隊列到topic類型exchange,需指定路由鍵routingKeychannel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");5. RPC
RPC——Remote Procedure Call,遠程過程調用。
那RabbitMQ如何進行遠程調用呢?示意圖如下:
第一步,主要是進行遠程調用的客戶端需要指定接收遠程回調的隊列,并申明消費者監聽此隊列。
第二步,遠程調用的服務端除了要申明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽的結果的隊列中去。
遠程調用客戶端:
//申明唯一guid用來標識此次發送的遠程調用請求var correlationId = Guid.NewGuid().ToString();//申明需要監聽的回調隊列var replyQueue = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties();properties.ReplyTo = replyQueue;//指定回調隊列properties.CorrelationId = correlationId;//指定消息唯一標識string number = args.Length > 0 ? args[0] : "30"; var body = Encoding.UTF8.GetBytes(number); //發布消息channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);Console.WriteLine($"[*] Request fib({number})"); //
//創建消費者用于處理消息回調(遠程調用返回結果)var callbackConsumer = new EventingBasicConsumer(channel);channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);callbackConsumer.Received += (model, ea) =>{ ? ?
?//僅當消息回調的ID與發送的ID一致時,說明遠程調用結果正確返回。if (ea.BasicProperties.CorrelationId == correlationId){ ? ? ? ? var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";Console.WriteLine($"[x]: {responseMsg}");}};
遠程調用服務端:
//申明隊列接收遠程調用請求channel.QueueDeclare(queue: "rpc_queue", durable: false, ? ?exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); Console.WriteLine("[*] Waiting for message.");//請求處理邏輯
consumer.Received += (model, ea) => {var message = Encoding.UTF8.GetString(ea.Body); ?
?int n = int.Parse(message);Console.WriteLine($"Receive request of Fib({n})"); ?
??int result = Fib(n); ? ?//從請求的參數中獲取請求的唯一標識,在消息回傳時同樣綁定var properties = ea.BasicProperties;var replyProerties = channel.CreateBasicProperties();replyProerties.CorrelationId = properties.CorrelationId; ? ?//將遠程調用結果發送到客戶端監聽的隊列上channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo, ? ? ? ?basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString())); ? ?//手動發回消息確認channel.BasicAck(ea.DeliveryTag, false);Console.WriteLine($"Return result: Fib({n})= {result}"); }; channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
6. 總結
基于上面的demo和對幾種不同exchange路由機制的學習,我們發現RabbitMQ主要是涉及到以下幾個核心概念:
Publisher:生產者,消息的發送方。
Connection:網絡連接。
Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。
Exchange:交換器(路由器),負責消息的路由到相應隊列。
Binding:隊列與交換器間的關聯綁定。消費者將關注的隊列綁定到指定交換器上,以便Exchange能準確分發消息到指定隊列。
Queue:隊列,消息的緩沖存儲區。
Virtual Host:虛擬主機,虛擬主機提供資源的邏輯分組和分離。包含連接,交換,隊列,綁定,用戶權限,策略等。
Broker:消息隊列的服務器實體。
Consumer:消費者,消息的接收方。
這次作為入門就講到這里,下次我們來講解下EventBus + RabbitMQ如何實現事件的分發。
相關文章:
RabbitMQ系列教程之一:我們從最簡單的事情開始!Hello World
RabbitMQ系列教程之二:工作隊列(Work Queues)
RabbitMQ系列教程之三:發布/訂閱(Publish/Subscribe)
RabbitMQ系列教程之四:路由(Routing)
如何優雅的使用RabbitMQ
.NET 使用 RabbitMQ 圖文簡介
RabbitMQ 高可用集群搭建及電商平臺使用經驗總結
搭建高可用的rabbitmq集群 + Mirror Queue + 使用C#驅動連接
RabbitMQ消息隊列應用
體驗Rabbitmq強大的【優先級隊列】之輕松面對現實業務場景
原文地址:http://www.cnblogs.com/sheng-jie/p/7192690.html
.NET社區新聞,深度好文,微信中搜索dotNET跨平臺或掃描二維碼關注
總結
以上是生活随笔為你收集整理的RabbitMQ知多少的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET Exceptionless 日
- 下一篇: .NET Core快速入门教程 1、开篇