我的WCF之旅(13):创建基于MSMQ的Responsive Service
一、One-way MEP V.S. Responsible Service
我們知道MSMQ天生就具有異步的特性,它只能以One-way的MEP(Message Exchange Pattern)進行通信。Client和Service之間采用One-way MEP的話就意味著Client調用Service之后立即返回,它無法獲得Service的執行結果,也無法捕捉Service運行的Exception。下圖簡單表述了基于MSMQ的WCF Service中Client和Service的交互。
?
但是在有些場景 中,這是無法容忍的。再拿我在上一篇文章的Order Delivery的例子來說。Client向Service提交了Order,卻無法確認該Order是否被Service正確處理,這顯然是不能接受的。我們今天就來討論一下,如何創建一個Responsive Service來解決這個問題:Client不再是對Service的執行情況一無所知,它可以獲知Order是否被Service正確處理了。
二、?Solution
雖然我們的目的很簡單:當Client向Service遞交了Order之后,能以某種方式獲知Order的執行結果;對于Service端來說,在正確把Order從Message Queue中獲取出來、并正確處理之后,能夠向Order的遞交者發送一個Acknowledge Message。為了簡單起見,這個Acknowledge Message包含兩組信息:
- Order No.: 被處理的Order的一個能夠為一標志它的ID。
- Exception: 如果處理失敗的Exception,如果成功處理為null。
要在WCF中實現這樣的目的,對于Request/Reply MEP來說是簡單而直接的:Client向Service遞交Order,并等待Service的Response,Service在處理接收到Order之后直接將處理結果 返回給Client就可以了。但是我們說過MSMQ天生就是異步的,我們只有采取一種間接的方式實現“曲線救國”。
我們的解決方案是:在每個Client Domain也創建一個基于MSMQ的本地的WCF Service,用于接收來自Order處理端發送的Acknowledge Message。對于處理Order 的Service來說,在正確處理Order之后,想對應的Client發送Acknowledge Message。下圖簡單演示整個過程:
?
三、Implementation
了解了上面的Solution之后,我們來看看該Solution在真正實現過程中有什么樣的困難。對于處理Order的Service來說,在向Client端發送Acknowledge Message的時候,它必須要知道該Order對應的Client的Response Service的MSMQ的Address以及其他和Operation相關的Context信息(在這里我們不需要,不過考慮到擴展性,我們把包括了address的Context的信息 封裝到一個了Class中,在這里叫做:OrderResponseContext)。而這些Context卻不能在Configuration中進行配置,因為他可以同時面臨著很多個Client:比如每個Client用于接收Response 的Message Queue的address都不一樣。所以這個OrderResponseContext必須通過對應的Client來提供。基于此,我們具有兩面兩種解決方式:
方式一、修改Service Contract,把OrderResponseContext當成是Operation的一個參數
這是我們最容易想到的,比如我們原來的Operation這樣定義:
namespace?Artech.ResponsiveQueuedService.Contract{
????[ServiceContract]
????[ServiceKnownType(typeof(Order))]
????public?interface?IOrderProcessor
????{
????????[OperationContract(IsOneWay?=?true)]
????????void?Submit(Order?order);
????}
}
現在變成:
namespace?Artech.ResponsiveQueuedService.Contract{
????[ServiceContract]
????[ServiceKnownType(typeof(Order))]
????public?interface?IOrderProcessor
????{
????????[OperationContract(IsOneWay?=?true)]
????????void?Submit(Order?order,?OrderResponseContext?responseContext);
????}
}
雖然這種方式看起來不錯,但是卻不值得推薦。在一般情況下,我們的Contract需要是很穩定的,一經確定就不能輕易更改,因為Contract是被交互的多方共同支持的,牽一發動全身;此外,從Service Contract代表的是Service的一個Interface,他是對業務邏輯的抽象、和具體實現無關,而對于我們的例子來說,我們僅僅是定義一個遞交Order的Operation,從業務邏輯來看,OrderResponseContext和抽象的業務邏輯毫無關系。基于此,我們需要尋求一種和Service Contract無關的解決方式:
方式二、將OrderResponseContext放到Soap Message 的Header中
其實我們要解決的問題很簡單,就是要把OrderResponseContext的信息置于Soap Message中發送到Service。而我們知道,Soap的Header具有極強的可伸縮性,原則上,我們可以把任何控制信息置于Header中。基于WCF的編程模式很容易地幫助我們實現對Soap Header的插入和獲取:
我們可以通過下面的方式獲得當前Operation Context的Incoming Message Headers和Outgoing Message Headers
OperationContext.Current.IncomingMessageHeadersOperationContext.Current.OutgoingMessageHeaders
如果我們要把一個OrderResponseContext 對象插入到當前Operation Context的Outgoing Message Headers中,我們可以通過下面的代碼來實現:
OrderResponseContext?context?=?new?OrderResponseContext();MessageHeader<OrderResponseContext>?header?=?new?MessageHeader<OrderResponseContext>(?context);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name",?"namespace"));
相應的,我們可以通過下面的代碼從Outgoing Message Headers OrderResponseContext的數據獲取的內容:
OrderResponseContext?context?=?OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name",?"namespace"));四、Sample
我們照例給出一個完整的Sample,下面是整個Solution的結構:
?
除了一貫使用的4層結構(Contract-Service-Hosting-Client),還為ResponseService增加了下面兩層:
- Localservice: 作為Client Domain的ResponseService。
- LocalHosting:Host Localservice。
1.Contract: ?Artech.ResponsiveQueuedService.Contract
Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor
using?System;using?System.Collections.Generic;
using?System.Text;
using?System.ServiceModel;
namespace?Artech.ResponsiveQueuedService.Contract
{
????[ServiceContract]
????[ServiceKnownType(typeof(Order))]
????public?interface?IOrderProcessor
????{
????????[OperationContract(IsOneWay?=?true)]
????????void?Submit(Order?order);
????}
}
Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse
?
using?System;using?System.Collections.Generic;
using?System.Text;
using?System.ServiceModel;
namespace?Artech.ResponsiveQueuedService.Contract
{
????[ServiceContract]
????public?interface??IOrderRessponse
????{
????????[OperationContract(IsOneWay?=true)]
????????void?SubmitOrderResponse(Guid?orderNo,FaultException?exception);
????}
}
接收來自Order processing端的Response:Order No.和Exception。
Data Contract: Artech.ResponsiveQueuedService.Contract.Order
using?System.Collections.Generic;
using?System.Text;
using?System.Runtime.Serialization;
namespace?Artech.ResponsiveQueuedService.Contract
{
????[DataContract]
????public?class?Order
????{
????????Private?Fields#region?Private?Fields
????????private?Guid?_orderNo;
????????private?DateTime?_orderDate;
????????private?Guid?_supplierID;
????????private?string?_supplierName;
????????#endregion
????????Constructors#region?Constructors
????????public?Order(Guid?orderNo,?DateTime?orderDate,?Guid?supplierID,?string?supplierName)
????????{
????????????this._orderNo?=?orderNo;
????????????this._orderDate?=?orderDate;
????????????this._supplierID?=?supplierID;
????????????this._supplierName?=?supplierName;
????????}
????????#endregion
????????Public?Properties#region?Public?Properties
????????[DataMember]
????????public?Guid?OrderNo
????????{
????????????get?{?return?_orderNo;?}
????????????set?{?_orderNo?=?value;?}
????????}
????????[DataMember]
????????public?DateTime?OrderDate
????????{
????????????get?{?return?_orderDate;?}
????????????set?{?_orderDate?=?value;?}
????????}
????????[DataMember]
????????public?Guid?SupplierID
????????{
????????????get?{?return?_supplierID;?}
????????????set?{?_supplierID?=?value;?}
????????}
????????[DataMember]
????????public?string?SupplierName
????????{
????????????get?{?return?_supplierName;?}
????????????set?{?_supplierName?=?value;?}
????????}
????????#endregion
????????Public?Methods#region?Public?Methods
????????public?override?string?ToString()
????????{
????????????string?description?=?string.Format("Order?No./t:?{0}/n/tOrder?Date/t:?{1}/n/tSupplier?No./t:?{2}/n/tSupplier?Name/t:?{3}",?
????????????????this._orderNo,?this._orderDate.ToString("yyyy/MM/dd"),?this._supplierID,?this._supplierName);
????????????return?description;
????????}
????????#endregion
????}
}
對Order的封裝。
Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext
?
using?System;using?System.Collections.Generic;
using?System.Text;
using?System.Runtime.Serialization;
using?System.ServiceModel;
namespace?Artech.ResponsiveQueuedService.Contract
{????
????[DataContract]
????public?class?OrderResponseContext
????{
????????private?Uri?_responseAddress;
????????[DataMember]
????????public?Uri?ResponseAddress
????????{
????????????get?{?return?_responseAddress;?}
????????????set?{?_responseAddress?=?value;?}
????????}
????????public?static?OrderResponseContext?Current
????????{
????????????get
????????????{
????????????????if?(OperationContext.Current?==?null)
????????????????{
????????????????????return?null;
????????????????}
????????????????return?OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext",?"Artech.ResponsiveQueuedService.Contract");
????????????}
????????????set
????????????{
????????????????MessageHeader<OrderResponseContext>?header?=?new?MessageHeader<OrderResponseContext>(value);
????????????????OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext",?"Artech.ResponsiveQueuedService.Contract"));
????????????}
????????}
????}
}
ResponseAddress代表Host在Client Domain的Response Service的Address。同過Current把OrderResponseContext插入到Outgoing Message Headers中、以及從Ingoing Message Headers取出OrderResponseContext對象。
2.Order Processing Service:Artech.ResponsiveQueuedService.Service
?
?
using?System;using?System.Collections.Generic;
using?System.Text;
using?Artech.ResponsiveQueuedService.Contract;
using?System.ServiceModel;
using?System.Net.Security;
namespace?Artech.ResponsiveQueuedService.Service
{
????public?class?OrderProcessorService:IOrderProcessor
????{
????????private?void?ProcessOrder(Order?order)
????????{
????????????if?(order.OrderDate?<?DateTime.Today)
????????????{
????????????????throw?new?Exception();
????????????}
????????}
????????IOrderProcessor?Members#region?IOrderProcessor?Members
????????public?void?Submit(Order?order)
????????{
????????????Console.WriteLine("Begin?to?process?the?order?of?the?order?No.:?{0}",?order.OrderNo);
????????????FaultException?exception=?null;
????????????if?(order.OrderDate?<?DateTime.Today)
????????????{
????????????????exception?=?new?FaultException(new?FaultReason("The?order?has?expried"),?new?FaultCode("sender"));
????????????????Console.WriteLine("It's?fail?to?process?the?order./n/tOrder?No.:?{0}/n/tReason:{1}",?order.OrderNo,?"The?order?has?expried");
????????????}
????????????else
????????????{
????????????????Console.WriteLine("It's?successful?to?process?the?order./n/tOrder?No.:?{0}",?order.OrderNo);
????????????}
????????????NetMsmqBinding?binding?=?new?NetMsmqBinding();
????????????binding.ExactlyOnce?=?false;
????????????binding.Security.Transport.MsmqAuthenticationMode?=?MsmqAuthenticationMode.None;
????????????binding.Security.Transport.MsmqProtectionLevel?=?ProtectionLevel.None;
????????????ChannelFactory<IOrderRessponse>?channelFacotry?=?new?ChannelFactory<IOrderRessponse>(binding);
????????????OrderResponseContext?responseContext?=?OrderResponseContext.Current;
????????????IOrderRessponse?channel?=?channelFacotry.CreateChannel(new?EndpointAddress(responseContext.ResponseAddress));
????????????using?(OperationContextScope?contextScope?=?new?OperationContextScope(channel?as?IContextChannel))
????????????{
????????????????channel.SubmitOrderResponse(order.OrderNo,?exception);
????????????}
????????}
????????#endregion
????}
}
在這里我們模擬了這樣的場景:先通過Order Date判斷Order是否過期,如果過期創建一個FaultException,否則正確處理該Order,然后通過OrderResponseContext.Current從Incoming Message Header中獲取封裝在OrderResponseContext對象中的Response Address,創建Binding并調用Response Service.
3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting
Configuration
?
<?xml?version="1.0"?encoding="utf-8"??><configuration>
??<appSettings>
????<add?key="msmqPath"?value="./private$/orderprocessor"/>
??</appSettings>
??<system.serviceModel>
????<bindings>
??????<netMsmqBinding>
????????<binding?name="MsmqBinding"?exactlyOnce="false"?useActiveDirectory="false">
??????????<security>
????????????<transport?msmqAuthenticationMode="None"?msmqProtectionLevel="None"?/>
??????????</security>
????????</binding>
??????</netMsmqBinding>
????</bindings>
????<services>
??????<service?name="Artech.ResponsiveQueuedService.Service.OrderProcessorService">
????????<endpoint?address="net.msmq://localhost/private/orderprocessor"?binding="netMsmqBinding"
????????????bindingConfiguration="MsmqBinding"?contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor"?/>
??????</service>
????</services>
??</system.serviceModel>
</configuration>
Program
?
using?System;using?System.Collections.Generic;
using?System.Text;
using?Artech.ResponsiveQueuedService.Service;
using?System.ServiceModel;
using?System.Configuration;
using?System.Messaging;
namespace?Artech.ResponsiveQueuedService.Hosting
{
????class?Program
????{
????????static?void?Main(string[]?args)
????????{
????????????string?path?=?ConfigurationManager.AppSettings["msmqPath"];
????????????if?(!MessageQueue.Exists(path))
????????????{
????????????????MessageQueue.Create(path);
????????????}
????????????using?(ServiceHost?host?=?new?ServiceHost(typeof(OrderProcessorService)))
????????????{
????????????????host.Opened?+=?delegate
????????????????{
????????????????????Console.WriteLine("The?Order?Processor?service?has?begun?to?listen");
????????????????};
????????????????host.Open();
????????????????Console.Read();
????????????}
????????}
????}
}
4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService
using?System;using?System.Collections.Generic;
using?System.Text;
using?Artech.ResponsiveQueuedService.Contract;
using?System.ServiceModel;
namespace?Artech.ResponsiveQueuedService.LocalService
{
????public?class?OrderRessponseService?:?IOrderRessponse
????{
????????IOrderRessponse?Members#region?IOrderRessponse?Members
????????public?void?SubmitOrderResponse(Guid?orderNo,?FaultException?exception)
????????{
????????????if?(exception?==?null)
????????????{
????????????????Console.WriteLine("It's?successful?to?process?the?order!/n/tOrder?No.:?{0}",orderNo);
????????????}
????????????else
????????????{
????????????????Console.WriteLine("It's?fail?to?process?the?order!/n/tOrder?No.:?{0}/n/tReason:?{1}",?orderNo,?exception.Message);
????????????}
????????}
????????#endregion
????}
}
5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting
Configuration
?
<?xml?version="1.0"?encoding="utf-8"??><configuration>
??<appSettings>
????<add?key="msmqPath"?value="./private$/orderresponse"/>
??</appSettings>
??<system.serviceModel>
????<bindings>
??????<netMsmqBinding>
????????<binding?name="msmqBinding"?exactlyOnce="false">
??????????<security>
????????????<transport?msmqAuthenticationMode="None"?msmqProtectionLevel="None"?/>
??????????</security>
????????</binding>
??????</netMsmqBinding>
????</bindings>
????<services>
??????<service?name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService">
????????<endpoint?address="net.msmq://localhost/private/orderresponse"?binding="netMsmqBinding"
????????????bindingConfiguration="msmqBinding"?contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse"?/>
??????</service>
????</services>
??</system.serviceModel>
</configuration>
Program
?
using?System;using?System.Collections.Generic;
using?System.Text;
using?Artech.ResponsiveQueuedService.LocalService;
using?System.Configuration;
using?System.ServiceModel;
using?System.Messaging;
namespace?Artech.ResponsiveQueuedService.LocalhHosting
{
????class?Program
????{
????????static?void?Main(string[]?args)
????????{
????????????string?path?=?ConfigurationManager.AppSettings["msmqPath"];
????????????if?(!MessageQueue.Exists(path))
????????????{
????????????????MessageQueue.Create(path);
????????????}
????????????using?(ServiceHost?host?=?new?ServiceHost(typeof(OrderRessponseService)))
????????????{
????????????????host.Opened?+=?delegate
????????????????{
????????????????????Console.WriteLine("The?Order?Response?service?has?begun?to?listen");
????????????????};
????????????????host.Open();
????????????????Console.Read();
????????????}
????????}
????}
}
6. Client: Artech.ResponsiveQueuedService.Client
Configuration:
?
<?xml?version="1.0"?encoding="utf-8"??><configuration>
??<appSettings>
????<add?key="msmqPath"?value="net.msmq://localhost/private/orderresponse"/>
??</appSettings>
??<system.serviceModel>
????<bindings>
??????<netMsmqBinding>
????????<binding?name="MsmqBinding"?exactlyOnce="false"?useActiveDirectory="false">
??????????<security>
????????????<transport?msmqAuthenticationMode="None"?msmqProtectionLevel="None"?/>
??????????</security>
????????</binding>
??????</netMsmqBinding>
????</bindings>
????<client>
??????<endpoint?address="net.msmq://localhost/private/orderprocessor"?binding="netMsmqBinding"
????????????bindingConfiguration="MsmqBinding"?contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor"?name="defaultEndpoint"?/>
????</client>
??</system.serviceModel>
</configuration>
Program:
?
using?System;using?System.Collections.Generic;
using?System.Text;
using?System.Configuration;
using?System.ServiceModel;
using?Artech.ResponsiveQueuedService.Contract;
using?System.Messaging;
namespace?Artech.ResponsiveQueuedService.Clinet
{
????class?Program
????{
????????static?void?Main(string[]?args)
????????{
????????????Order?order1?=?new?Order(Guid.NewGuid(),?DateTime.Today.AddDays(5),?Guid.NewGuid(),?"Supplier?A");
????????????Order?order2?=?new?Order(Guid.NewGuid(),?DateTime.Today.AddDays(-5),?Guid.NewGuid(),?"Supplier?A");
????????????string?path?=?ConfigurationManager.AppSettings["msmqPath"];
????????????Uri?address?=?new?Uri(path);
????????????OrderResponseContext?context?=?new?OrderResponseContext();
????????????context.ResponseAddress?=?address;
????????????ChannelFactory<IOrderProcessor>?channelFactory?=?new?ChannelFactory<IOrderProcessor>("defaultEndpoint");
????????????IOrderProcessor?orderProcessor?=?channelFactory.CreateChannel();
????????????using?(OperationContextScope?contextScope?=?new?OperationContextScope(orderProcessor?as?IContextChannel))
????????????{
????????????????Console.WriteLine("Submit?the?order?of?order?No.:?{0}",?order1.OrderNo);
????????????????OrderResponseContext.Current?=?context;
????????????????orderProcessor.Submit(order1);
????????????}
????????????using?(OperationContextScope?contextScope?=?new?OperationContextScope(orderProcessor?as?IContextChannel))
????????????{
????????????????Console.WriteLine("Submit?the?order?of?order?No.:?{0}",?order2.OrderNo);
????????????????OrderResponseContext.Current?=?context;
????????????????orderProcessor.Submit(order2);
????????????}
????????????Console.Read();
????????}
????}
}
我創建了兩個Order對象, 其中一個已經過期。從Configuration中取出Response Address并購建一個OrderResponseContext,然后分兩次將這兩個Order向Order Processing Service遞交。在調用Order Processing Order的Operation Context Scope中,通過OrderResponseContext.Current將OrderResponseContext對象插入Outcoming Message Header中。
我們現在運行一下整個程序,看看最終的輸出結果:
Client:
?
Order Processing:
?
Order Response:
Reference:
Build a Queued WCF Response Service
作者:Artech
出處:http://artech.cnblogs.com
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。
總結
以上是生活随笔為你收集整理的我的WCF之旅(13):创建基于MSMQ的Responsive Service的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Windows Phone 7项目实战之
- 下一篇: 一项调查结果:你的分析/挖掘计算机的配置