.Net使用RabbitMQ详解
序言
這里原來(lái)有一句話,觸犯啦天條,被閹割!!!!
首先不去討論我的日志組件怎么樣。因?yàn)橛行┤罩拘枰呔W(wǎng)絡(luò),有的又不需要走網(wǎng)路,也是有性能與業(yè)務(wù)場(chǎng)景的多般變化在其中,就把他拋開,我們只談消息RabbitMQ。
那么什么是RabbitMQ,它是用來(lái)解決什么問(wèn)題的,性能如何,又怎么用?我會(huì)在下面一一闡述,如有錯(cuò)誤,不到之處,還望大家不吝賜教。
RabbitMQ簡(jiǎn)介
必須一提的是rabbitmq是由LShift提供的一個(gè)消息隊(duì)列協(xié)議(AMQP)的開源實(shí)現(xiàn),由以高性能、健壯以及可伸縮性出名的Erlang寫成(因此也是繼承了這些優(yōu)點(diǎn))。
百度百科對(duì)RabbitMQ闡述也非常明確,建議去看下,還有amqp協(xié)議。
RabbitMQ官網(wǎng):http://www.rabbitmq.com/?如果你要下載安裝,那么必須先把Erlang語(yǔ)言裝上。
RabbitMQ的.net客戶端,可以在nuget中輸入rabbitmq輕松獲得。
RabbitMQ與其他消息隊(duì)列的對(duì)比,早有仙人給寫出來(lái)。?Message Queue Shootout
這篇文章中的測(cè)試案例為:1百萬(wàn)條1k的消息,每秒種的收發(fā)情況如下圖。
如果你安裝好啦,rabbitmq,他會(huì)提供一個(gè)操作監(jiān)控頁(yè)面,頁(yè)面如下,他幾乎提供啦,對(duì)rabbitmq的所有操作,與監(jiān)控,所以,你裝上后,自己多看看,多操作下。
?RabbitMQ中的一些名詞闡述與消息從投遞到消費(fèi)的整個(gè)過(guò)程
從上圖的標(biāo)題中可以看到一些陌生的英文單詞,讓我們感覺(jué)一無(wú)所知,更無(wú)從操作,那么我給大家弄啦一個(gè)圖片大家可以看下,或許對(duì)您理解這些新鮮的單詞有所幫助。
看過(guò)這些名詞,之后,或許你還毫無(wú)頭緒,那么我把消息從生產(chǎn)到消費(fèi)的整個(gè)流程給大家說(shuō)一下,或許會(huì)更深入一點(diǎn),其中Exchange,與Queue都是可以設(shè)置相關(guān)屬性,隊(duì)列的持久化,交換器類型制定。
Note:首先這個(gè)過(guò)程走分三個(gè)部分,1、客戶端(生產(chǎn)消息隊(duì)列),2、RabbitMQ服務(wù)端(負(fù)責(zé)路由規(guī)則的綁定與消息的分發(fā)),3、客戶端(消費(fèi)消息隊(duì)列中的消息)
Note:由圖可以看出,一個(gè)消息可以走一次網(wǎng)絡(luò)卻被分發(fā)到不同的消息隊(duì)列中,然后被多個(gè)的客戶端消費(fèi),那么這個(gè)過(guò)程就是RabbitMQ的核心機(jī)制,RabbitMQ的路由類型與消費(fèi)模式。
RabbitMQ中Exchange的類型
類型有4種,direct,fanout,topic,headers。其中headers不常用,本篇不做介紹,其他三種類型,會(huì)做詳細(xì)介紹。
那么這些類型是什么意思呢?就是Exchange與隊(duì)列進(jìn)行綁定后,消息根據(jù)exchang的類型,按照不同的綁定規(guī)則分發(fā)消息到消息隊(duì)列中,可以是一個(gè)消息被分發(fā)給多個(gè)消息隊(duì)列,也可以是一個(gè)消息分發(fā)到一個(gè)消息隊(duì)列。具體請(qǐng)看下文。
介紹之初還要說(shuō)下RoutingKey,這是個(gè)什么玩意呢?他是exchange與消息隊(duì)列綁定中的一個(gè)標(biāo)識(shí)。有些路由類型會(huì)按照標(biāo)識(shí)對(duì)應(yīng)消息隊(duì)列,有些路由類型忽略routingkey。具體看下文。
1、Exchange類型direct
他是根據(jù)交換器名稱與routingkey來(lái)找隊(duì)列的。
Note:消息從client發(fā)出,傳送給交換器ChangeA,RoutingKey為routingkey.ZLH,那么不管你發(fā)送給Queue1,還是Queue2一個(gè)消息都會(huì)保存在Queue1,Queue2,Queue3,三個(gè)隊(duì)列中。這就是交換器的direct類型的路由規(guī)則。只要找到路由器與routingkey綁定的隊(duì)列,那么他有多少隊(duì)列,他就分發(fā)給多少隊(duì)列。
2、Exchange類型fanout
這個(gè)類型忽略Routingkey,他為廣播模式。
Note:消息從客戶端發(fā)出,只要queue與exchange有綁定,那么他不管你的Routingkey是什么他都會(huì)將消息分發(fā)給所有與該exchang綁定的隊(duì)列中。
3、Exchange類型topic
這個(gè)類型的路由規(guī)則如果你掌握啦,那是相當(dāng)?shù)暮糜?#xff0c;與靈活。他是根據(jù)RoutingKey的設(shè)置,來(lái)做匹配的,其中這里還有兩個(gè)通配符為:
*,代表任意的一個(gè)詞。例如topic.zlh.*,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
#,代表任意多個(gè)詞。例如topic.#,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
Note:這個(gè)圖看上去很亂,但是他是根據(jù)匹配符做匹配的,這里我建議你自己做下消息隊(duì)列的具體操作。
具體操作如下
public static void Producer(int value){try{var qName = "lhtest1";var exchangeName = "fanoutchange1";var exchangeType = "fanout";//topic、fanoutvar routingKey = "*";var uri = new Uri("amqp://192.168.10.121:5672/");var factory = new ConnectionFactory{UserName = "123",Password = "123",RequestedHeartbeat = 0,Endpoint = new AmqpTcpEndpoint(uri)};using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){//設(shè)置交換器的類型channel.ExchangeDeclare(exchangeName, exchangeType);//聲明一個(gè)隊(duì)列,設(shè)置隊(duì)列是否持久化,排他性,與自動(dòng)刪除channel.QueueDeclare(qName, true, false, false, null);//綁定消息隊(duì)列,交換器,routingkeychannel.QueueBind(qName, exchangeName, routingKey);var properties = channel.CreateBasicProperties();//隊(duì)列持久化properties.Persistent = true;var m = new QMessage(DateTime.Now, value+"");var body = Encoding.UTF8.GetBytes(DoJson.ModelToJson<QMessage>(m));//發(fā)送信息channel.BasicPublish(exchangeName, routingKey, properties, body);}}}catch (Exception ex){Console.WriteLine(ex.Message);}}消息隊(duì)列的消費(fèi)與消息確認(rèn)Ack
1、消息隊(duì)列的消費(fèi)
Note:如果一個(gè)消息隊(duì)列中有大量消息等待操作時(shí),我們可以用多個(gè)客戶端來(lái)處理消息,這里的分發(fā)機(jī)制是采用負(fù)載均衡算法中的輪詢。第一個(gè)消息給A,下一個(gè)消息給B,下下一個(gè)消息給A,下下下一個(gè)消息給B......以此類推。
2、為啦保證消息的安全性,保證此消息被正確處理后才能在服務(wù)端的消息隊(duì)列中刪除。那么rabbitmq提供啦ack應(yīng)答機(jī)制,來(lái)實(shí)現(xiàn)這一功能。
ack應(yīng)答有兩種方式:1、自動(dòng)應(yīng)答,2、手動(dòng)應(yīng)答。具體實(shí)現(xiàn)如下。
public static void Consumer(){try{var qName = "lhtest1";var exchangeName = "fanoutchange1";var exchangeType = "fanout";//topic、fanoutvar routingKey = "*";var uri = new Uri("amqp://192.168.10.121:5672/");var factory = new ConnectionFactory{UserName = "123",Password = "123",RequestedHeartbeat = 0,Endpoint = new AmqpTcpEndpoint(uri)};using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.ExchangeDeclare(exchangeName, exchangeType);channel.QueueDeclare(qName, true, false, false, null);channel.QueueBind(qName, exchangeName, routingKey);//定義這個(gè)隊(duì)列的消費(fèi)者QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);//false為手動(dòng)應(yīng)答,true為自動(dòng)應(yīng)答channel.BasicConsume(qName, false, consumer);while (true){BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body;var messageStr = Encoding.UTF8.GetString(bytes);var message = DoJson.JsonToModel<QMessage>(messageStr);Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);//如果是自動(dòng)應(yīng)答,下下面這句代碼不用寫啦。if ((Convert.ToInt32(message.Title) % 2) == 1){channel.BasicAck(ea.DeliveryTag, false);}}}}}catch (Exception ex){Console.WriteLine(ex.Message);}}轉(zhuǎn):http://www.cnblogs.com/knowledgesea/p/5296008.html
總結(jié)
以上是生活随笔為你收集整理的.Net使用RabbitMQ详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: ASp.net中Froms验证方式
- 下一篇: 奥迪发动机高温后手刹锁死了