NetCore基于EasyNetQ的高级API使用RabbitMq
一、消息隊列
?消息隊列作為分布式系統中的重要組件,常用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ。至于各種消息隊列的優缺點比較,在這里就不做擴展了,網上資源很多。
?更多內容可參考?消息隊列及常見消息隊列介紹。我在這里選用的是RabbitMq。
官網地址:http://www.rabbitmq.com
安裝和配置:Windows下RabbitMq安裝及配置
二、RabbitMq簡單介紹?
?RabbitMQ是一款基于AMQP(高級消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、? ?Erlang等。在RabbitMq中首先要弄清楚的概念是 交換機、隊列、綁定。基本的消息通訊步驟就是首先定義ExChange,然后定義隊列,然后綁定交換機和隊列。
?需要明確的一點兒是,發布者在發送消息是,并不是把消息直接發送到隊列中,而是發送到Exchang,然后由交互機根據定義的消息匹配規則,在將消息發送到隊列中。
?Exchange有四種消息消息分發規則:direct,topic,fanout,header。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了。
?詳細的概念介紹推薦查看:消息隊列之RabbitMq
三、EasyNetQ使用
?Easynetq是一個簡單易用的Rabbitmq Net客戶端。同時支持 NetFramework和NetCore。GitHub地址。它是針對RabbitMq Net客戶端的進一步封裝。關于EasyNetQ的簡單使用推薦教程:EasyNetQ的介紹。
?本文主要介紹基于EasyNeq的高級API的使用。EasyNetQ的作者在核心的IBus接口中盡量避免暴露AMQP中的交換機、隊列、綁定這些概念,使用者即使不去了解這些概念,也能完成消息的發送接收。這相當簡潔,但某些情況下,基于應用場景的需要,我們需要自定義交換機、隊列、綁定這些信息,EasyNetQ允許你這么做,這些都是通過IAdvanceBus接口實現。
3.1 項目裝備
?這里為了演示,首先新建一個項目,包括一個發布者,兩個接收者,一個公共的類庫
安裝EasyNetQ: NuGet>Install-Package EasyNetQ
3.2 簡單封裝
在Common項目里面是針對Easynetq的使用封裝,主要目錄如下
?
?在RabbitMq文件夾下,是針對消息發送接收的簡單封裝。
?首先來看下RabbitMqManage,主要的發送和訂閱操作都在這個類中。其中ISend接口定義了發送消息的規范,SendMessageManage是ISend的實現。IMessageConsume接口定義訂閱規范。
?MesArg 和PushMsg分別是訂閱和發送需用到的參數類。RabbitMQManage是暴露在外的操作類。
?首先看發送的代碼
在EasyNetQ中對于異步發送消息的時候,消息是否送達Broker只需要查看異步發送方法最終執行成功還是失敗,成功就表示消息送達,如果失敗可以將失敗后的消息存入數據庫中,然后用后臺線程輪詢
數據庫表,將失敗后的消息進行重新 發送。這種方式還可以進一步變成消息表,就是先將要發送的消息存入消息表中,然后后臺線程輪詢消息表來進行消息發送。一般這種方式被廣泛用于分布式事務中,
將本地數據庫操作和消息表寫入放入同一個本地事務中,來保證消息發送和本地數據操作的同步成功,因為我的系統中,分布式事務的涉及很少,所以就沒這樣去做,只是簡單的在異步發送的時候監控下
是否發送失敗,然后針對失敗的消息做一個重新發送的機制。這里,推薦大佬的NetCore分布式事務解決方案 CAP?GitHub地址。
?接著看一下消息訂閱接收涉及的代碼
在訂閱中我定義了一個接口,最終業務代碼中,所有的消息訂閱類,都需要繼續此接口
最后,我們來看下對外使用的操作類
這里面主要封裝了消息的發送和訂閱,以及IBus單例的創建。在后續的消息發送和訂閱主要就通過此處來實現。我們看到一開始的類目結構中還有一個RaExMessageHandleJob類,這個類就是一個后臺
循環任務,用來監測數據庫中是否保存了發送失敗的消息,如果有,則將消息取出,嘗試重新發送。在此就不做多的介紹,大家可以根據自己的實際需求來實現。
3.3 發布者
?現在來看一下消息發布者的代碼
?
?主要的發送代碼都在Send類中,其中appsettings.json里面配置了Rabbitmq的連接地址,TestDto只是一個為了方便演示的參數類。
?下面看一下Program里面的代碼
?很簡單的一個發送消息調用。
?然后來看一下Send類中的代碼
3.4 消費者
?首先來看下消費者端的目錄結構
?
其中appsettings.json中配置Rabbitmq的連接信息,Program中只是簡單調用消息訂閱
主要的消息訂閱代碼都在MessageManage文件夾下,MessageManService用于定義消息訂閱類型
Consume文件夾下主要定義了消息的業務處理
可以看到,所有的類都集成自我們定義的接口IMessageConsume。
四、總結
在EasyNetQ中如果需要消費者確認功能,則需要在Rabbitmq的連接配置中設置publisherConfirms=true,這將會開啟自動確認。在使用高級api定義交換機和隊列時可以自己定義多種參數,比如消息是否持久化,消息最大長度等等,具體大家可以去看官方文檔,上面有詳細介紹。Easynetq會自動去捕獲消費異常的消息并將其放入到錯誤隊列中,而且官方提供了重新發送錯誤隊列中消息的方法,當然你也可以自己去監視錯誤列隊,對異常消息進行處理。EasyNetQ里面作者針對消息的發布確認和消費確認都做了封裝。在EasyNetQ中發布消息的時候如果選用的同步發送,只要沒有拋出異常,我們就可以認為任務消息已經正確到達Broker,而異步發送的話需要我們自己去監視Task是否成功 。如果開啟了自動確認,并不需要我們在消息處理的方法體中手動返回ack信息,只要消息被 正確處理就會自動ack。雖然RabbitMq中也有事務消息,但由于性能比較差,并不推薦使用。其實,只要我們能明確消息是否發布成功和消費成功,就將會很容易在這個基礎上擴展出分布式事務的處理。
原文地址:https://www.cnblogs.com/dandan123/p/10097711.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結
以上是生活随笔為你收集整理的NetCore基于EasyNetQ的高级API使用RabbitMq的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 开源库支付库Magicodes.Pay发
- 下一篇: 【.NET Core项目实战-统一认证平