我的WCF之旅(12):使用MSMQ进行Reliable Messaging(转载)
一、為什么要使用MSMQ
在一個分布式的環境中,我們往往需要根據具體的 情況采用不同的方式進行數據的傳輸。比如在一個Intranet內,我們一般通過TCP進行高效的數據通信;而在一個Internet的環境中,我們則通 常使用Http進行跨平臺的數據交換。而這些通信方式具有一個顯著的特點,那就是他們是基于Connection的,也就是說,交互雙方在進行通信的時候 必須保證有一個可用的Connection存在于他們之間。而在某些時候,比如那些使用撥號連接的用戶、以及使用便攜式計算機的用戶,我們不能保證在他們 和需要訪問的Server之間有一個的可靠的連接,在這種情況下,基于Messaging Queue的連接就顯得尤為重要了。我們今天就來談談在WCF中如何使用MSMQ。
MSMQ不僅僅是作為支持客戶端連接工具而存在,合理的使用MSMQ可以在很大程度上提升系統的Performance和Scalability。我們先來看看MSMQ能給我們帶來怎樣的好處:
1.MSMQ是基于Disconnection
MSMQ通過Message Queue進行通信,這種通信方式為離線工作成為了可能。比如在介紹MSMQ時都會提到的Order Delivery的例子:在一個基于B2C的系統中,訂單從各種各樣的客戶傳來,由于 客戶的各異性,不能保證每個客戶在每時每刻都和用于接收訂單的Server保持一個可靠的連接,我們有時候甚至允許客戶即使在離線的 情況下也可以遞交訂單(雖然訂單不能發送到訂單的接收方,但是我們可以通過某種機制保證先在本地保存該訂單,一旦連接建立,則馬上向接收方遞交訂單),而 MSMQ則有效地提供了這樣的機制:Server端建立一個Message Queue來接收來個客戶的訂單,客戶端通過向該Message Queue發送承載了訂單數據的Message實現訂單的遞交。如果在客戶離線的情況下,他仍然可以通過客戶端程序進行訂單遞交的操作,存儲著訂單數據的 Message會被暫時保存在本地的Message Queue中,一旦客戶聯機,MSMQ將Message從中取出,發送到真正的接收方,而這個動作對于用戶的透明的。
2.MSMQ天生是One-way、異步的
在MSMQ中,Message始終以One-way的方式進行發送,所以MSMQ具有天生的異步特性。所以MSMQ使用于那些對于用戶的請求,Server端無需立即響應的場景。也就是說Server對數據的處理無需和Client的數據的發送進行同步,它可以獨自地按照自己的Schedule進行工作。這可以避免峰值負載。比如Server端可以在一個相對低負載的時段(比如深夜)來對接收到的Order進行批處理,而無需一天24小時一直進行Order的監聽、接收和處理。
3.MSMQ能夠提供高質量的Reliable Messaging
我們知道,在一般的情況下,如果Client端 以異步的方式對Service進行調用就意味著:Client無法獲知Message是否成功抵達Service端;也不會獲得Service端執行的結 果和出錯信息。但是我們仍然說MSMQ為我們提供了可靠的傳輸(Reliable Messaging),這主要是因為MSMQ為我們提供一些列Reliable Messaging的機制:
- 超時機制(Timeout):可以設置發送和接收的時間,超出該時間則被認為操作失敗。
- 確認機制(Acknowledgement):當Message成功抵達Destination Queue,或者被成功接收,向發送端發送一個Acknowledgement message用以確認操作的狀態。
- 日志機制(Journaling):當Message被發送或接收后,被Copy一份存放在Journal Queue中。
此外,MSMQ還提供了死信隊列(Dead letter Queue)用以保存發送失敗的message。這一切保證了保證了Reliable Messaging。
二、?MSMQ在WCF的運用
在WCF中,MSMQ提供的數據傳輸功能被封裝在一個Binding中,提供WCF Endpoint之間、以及Endpoint和現有的基于MSMQ的Application進行通信的實現。為此WCF為我們提供了兩種不同的built-in binding:
- NetMsmqBinding: 從提供的功能和使用 方式上看,NetMsmqBinding和一般使用的binding,比如basicHttpBinding,netTcpBinding沒有什么區別: 在兩個Endpoint之間實現了數據的通信,所不同的是,它提供的是基于MSMQ的Reliable Messaging。從變成模式上看,和一般的binding完全一樣。
- MsmqIntegrationBinding: 從命名上我們可以看出,MsmqIntegrationBinding主要用于需要將我們的WCF Application和現有的基于MSMQ的Application集成的情況。MsmqIntegrationBinding實現了WCF Endpoint和某個Message Queue進行數據的通信,具體來說,就是實現了單一的向某個Message Queue 發送Message,和從某個Message Queue中接收Message的功能。從編程模式上看,也有所不同,比如Operation只接收一個MsmqMessage<T>的參數。
這是Client和Service通信的圖示:
三、MSMQ和Transaction
MSMQ提供對Transaction的支持。在一般的情況下,MSMQ通過Message Queue Transaction實現對Transaction的原生的支持,借助Message Queue Transaction,可以把基于一個或多個Message Queue的相關操作納入同一個Transaction中。
Message Queue Transaction僅僅限于基于Message Queue的操作,倘若操作涉及到另外一些資源,比如SQL Server, 則可以使用基于DTC的分布式Transaction。
對于WCF中MSMQ,由于Client和Service的相對獨立(可能Client發送Message到Service處理Message會相隔很長一段時間),所以Client和Service的操作只能納入不同的Transaction中,如下圖。
四、Sample1:NetMsmqBinding
我們首先做一個基于NetMsmqBinding Sample,實現的功能就是我們開篇所提出的Order Delivery。我們說過,NetMsmqBinding和一般的binding在實現的功能和變成模式上完全一樣。下面是我們熟悉的4層結構:
1.Contract
DataContract:Order & OrderItem
using?System;using?System.Collections.Generic;
using?System.Text;
using?System.Runtime.Serialization;
namespace?Artech.QueuedService.Contract
{
????[DataContract]
????[KnownType(typeof(OrderItem))]
????public?class?Order
????{
????????Private?Fields
????????Constructors
????????Public?Properties
????????Public?Methods
????}
}
using?System.Collections.Generic;
using?System.Text;
using?System.Runtime.Serialization;
namespace?Artech.QueuedService.Contract
{
????[DataContract]
????public?class?OrderItem
????{
????????Private?Fields
????????Constructors
????????Public?Properties
????}
}
?
ServiceContract: IOrderProcessor
using?System;using?System.Collections.Generic;
using?System.Text;
using?System.ServiceModel;
namespace?Artech.QueuedService.Contract
{
????[ServiceContract]?
????[ServiceKnownType(typeof(Order))]
????public?interface?IOrderProcessor
????{
????????[OperationContract(IsOneWay?=?true)]
????????void?Submit(Order?order);
????}
}
2.Service:IOrderProcessor:
using?System.Collections.Generic;
using?System.Text;
using?Artech.QueuedService.Contract;
using?System.ServiceModel;
namespace?Artech.QueuedService.Service
{
????public?class?OrderProcessorService:IOrderProcessor
????{
????????ISubmitOrder?Members
????}
}
using?System;
using?System.Collections.Generic;
using?System.Text;
using?Artech.QueuedService.Contract;
namespace?Artech.QueuedService.Service
{
????public?static?class?Orders
????{
????????private?static?IDictionary<Guid,?Order>?_orderList?=?new?Dictionary<Guid,?Order>();
????????public?static?void?Add(Order?order)
????????{
????????????_orderList.Add(order.OrderNo,?order);
????????}
????????public?static?Order?GetOrder(Guid?orderNo)
????????{
????????????return?_orderList[orderNo];
????????}
????}
}
3.Hosting
Configuration
<?xml?version="1.0"?encoding="utf-8"??><configuration>
????<system.serviceModel>
????????<bindings>
????????????<netMsmqBinding>
????????????????<binding?name="msmqBinding">
????????????????????<security>
????????????????????????<transport?msmqAuthenticationMode="None"?msmqProtectionLevel="None"?/>
????????????????????????<message?clientCredentialType="None"?/>
????????????????????</security>
????????????????</binding>
????????????</netMsmqBinding>
????????</bindings>
????????<services>
????????????<service?name="Artech.QueuedService.Service.?OrderProcessorService">
????????????????<endpoint?address="net.msmq://localhost/private/orders"?binding="netMsmqBinding"
????????????????????bindingConfiguration="msmqBinding"?contract="Artech.QueuedService.Contract.IOrderProcessor"?/>
????????????</service>
????????</services>
????</system.serviceModel>
</configuration>
在默認的情況下,netMsmqBinding 的msmqAuthenticationMode為WindowsDomain,由于基于WindowsDomain必須安裝AD,利于在本機模擬,我把msmqAuthenticationMode改為None,相應的ProtectionLevel和clientCredentialType改為None。
Program:
using?System;using?System.Collections.Generic;
using?System.Text;
using?System.Messaging;
using?System.ServiceModel;
using?Artech.QueuedService.Service;
namespace?Artech.QueuedService.Hosting
{
????class?Program
????{
????????static?void?Main(string[]?args)
????????{
????????????string?path?=?@".\private$\orders";
????????????if(!MessageQueue.Exists(path))
????????????{
????????????????MessageQueue.Create(path,true);
????????????}
????????????using?(ServiceHost?host?=?new?ServiceHost(typeof(OrderProcessorService)))
????????????{
????????????????host.Opened?+=?delegate
????????????????{
????????????????????Console.WriteLine("Service?has?begun?to?listen\n\n");
????????????????};
????????????????host.Open();
????????????????Console.Read();
????????????}
????????}
????}
}
在Host Service之前,通過MessageQueue.Create創建一個Message Queue,第二個參數為表明Queue是否支持Transaction的indicator,這里支持Transaction。
4.Client:
Configuration
<?xml?version="1.0"?encoding="utf-8"??><configuration>
????<system.serviceModel>
????????<bindings>
????????????<netMsmqBinding>
????????????????<binding?name="msmqBinding">
????????????????????<security>
????????????????????????<transport?msmqAuthenticationMode="None"?msmqProtectionLevel="None"?/>
????????????????????????<message?clientCredentialType="None"?/>
????????????????????</security>
????????????????</binding>
????????????</netMsmqBinding>
????????</bindings>
????????<client>
????????????<endpoint?address="net.msmq://localhost/private/orders"?binding="netMsmqBinding"
????????????????bindingConfiguration="msmqBinding"?contract="Artech.QueuedService.Contract.IOrderProcessor"
????????????????name="defaultEndpoint"?/>
????????</client>
????</system.serviceModel>
</configuration>
Program
using?System;using?System.Collections.Generic;
using?System.Text;
using?Artech.QueuedService.Contract;
using?System.ServiceModel;
using?System.Transactions;
namespace?Artech.QueuedService.Client
{
????class?Program
????{
????????static?void?Main(string[]?args)
????????{
????????????ChannelFactory<IOrderProcessor>?channelFactory?=?new?ChannelFactory<IOrderProcessor>("defaultEndpoint");
????????????IOrderProcessor?channel?=?channelFactory.CreateChannel();
????????????Order?order?=?new?Order(Guid.NewGuid(),DateTime.Today,Guid.NewGuid(),"A?Company");
????????????order.OrderItems.Add(new?OrderItem(Guid.NewGuid(),"PC",5000,20));
????????????order.OrderItems.Add(new?OrderItem(Guid.NewGuid(),"Printer",7000,2));
????????????Console.WriteLine("Submit?order?to?server");
????????????using?(TransactionScope?scope?=?new?TransactionScope(TransactionScopeOption.Required))
????????????{?
?????????????????channel.Submit(order);
?????????????????scope.Complete();
????????????}???????
????????????Console.Read();
????????}
????}
}
先后運行Host和Client,Host端有下面的輸出:
轉載于:https://www.cnblogs.com/zpc870921/archive/2012/09/10/2678629.html
總結
以上是生活随笔為你收集整理的我的WCF之旅(12):使用MSMQ进行Reliable Messaging(转载)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JS获取HTML元素属性【转】
- 下一篇: 前些天做的一个物联网架构三层扩展