实现一个压缩Remoting传输数据的Sink:CompressionSink (转载)
在前兩講《初探.Net Remoting服務端 Loading Remoting配置內容的過程》《初探.Net Remoting客戶端 Loading Remoting配置內容的過程》中,我已經分析了Remoting的Sink機制。接下來,就提供一個具體的範例:CompressionSink(原始SourceCode源於Advanced .Net Remoting 1st Ed.)。CompressionSink通過在客戶端和服務端各自插入一個數據壓縮-解壓縮的Sink,目的是希望減少大數據量傳遞對網絡帶寬的佔用,提高傳輸效率。下載SourceCode。BTW,這個壓縮Sink相對比較穩定,大家可以在各自的項目中放心使用。:-)
詳細設計:
提供一個Assembly: CompressionSink.dll
它包括:
    客戶端:
        CompressionSink.CompressionClientSinkProvider類和CompressionSink.CompressionClientSink類
    服務端:
        CompressionSink.CompressionServerSinkProvider類和CompressionSink.CompressionServerSink類
    壓縮類:CompressionHelper
    壓縮內核:NZipLib庫。
客戶端的配置文件 :
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <system.runtime.remoting>
        <application>
            <channels>
                <channel ref="http">
                    <clientProviders>
                        <formatter ref="soap" />
                        <provider type="CompressionSink.CompressionClientSinkProvider, CompressionSink" />
                    </clientProviders>
                </channel>
            </channels>
            <client>
                <wellknown type="Service.SomeSAO, Service" url="http://localhost:5555/SomeSAO.soap" />
            </client>
        </application>
    </system.runtime.remoting>
</configuration>
服務端的配置文件:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <system.runtime.remoting>
        <application>
            <channels>
                <channel ref="http" port="5555">
                    <serverProviders>
                        <provider type="CompressionSink.CompressionServerSinkProvider, CompressionSink" />
                        <formatter ref="soap"/>
                    </serverProviders>
                </channel>
            </channels>
            <service>
                <wellknown mode="Singleton" type="Service.SomeSAO, Service" objectUri="SomeSAO.soap" />
            </service>
        </application>
    </system.runtime.remoting>
</configuration>
?
/// <summary>
/// Client-side sink provider that places a <see cref="CompressionClientSink"/>
/// in front of the sinks created by the next provider in the chain.
/// </summary>
public class CompressionClientSinkProvider : IClientChannelSinkProvider
{
    private IClientChannelSinkProvider _nextProvider;

    /// <summary>
    /// Creates the provider. Both arguments are currently ignored.
    /// </summary>
    /// <param name="properties">Provider properties (unused).</param>
    /// <param name="providerData">Provider data (unused).</param>
    public CompressionClientSinkProvider(IDictionary properties, ICollection providerData)
    {
        // not yet needed
    }

    /// <summary>
    /// Gets or sets the next provider in the client sink provider chain.
    /// </summary>
    public IClientChannelSinkProvider Next
    {
        get { return _nextProvider; }
        set { _nextProvider = value; }
    }

    /// <summary>
    /// Builds the remainder of the sink chain through the next provider and
    /// wraps it in a <see cref="CompressionClientSink"/>.
    /// </summary>
    /// <param name="channel">Channel for which the sink chain is being built.</param>
    /// <param name="url">URL of the remote object.</param>
    /// <param name="remoteChannelData">Channel data describing the remote channel.</param>
    /// <returns>
    /// The compression sink heading the chain, or <c>null</c> when the next
    /// provider did not produce a sink.
    /// </returns>
    public IClientChannelSink CreateSink(IChannelSender channel, string url, object remoteChannelData)
    {
        // create other sinks in the chain
        IClientChannelSink next = _nextProvider.CreateSink(channel, url, remoteChannelData);

        // Without a downstream sink the compression sink has nothing to
        // forward to, so report "no sink" instead of wrapping null.
        if (next == null)
        {
            return null;
        }

        // put our sink on top of the chain and return it
        return new CompressionClientSink(next);
    }
}
?
/// <summary>
/// Client channel sink that compresses outgoing request streams and
/// decompresses incoming response streams via <c>CompressionHelper</c>.
/// </summary>
// NOTE(review): the first line of this class declaration was lost in the
// extracted listing; the header is reconstructed by analogy with
// CompressionServerSink - confirm the base class against the original source.
public class CompressionClientSink : BaseChannelSinkWithProperties,
    IClientChannelSink
{
    private IClientChannelSink _nextSink;

    /// <summary>
    /// Creates the sink.
    /// </summary>
    /// <param name="next">The sink to which calls are forwarded.</param>
    public CompressionClientSink(IClientChannelSink next)
    {
        _nextSink = next;
    }

    /// <summary>
    /// Gets the next sink in the client sink chain.
    /// </summary>
    public IClientChannelSink NextChannelSink
    {
        get { return _nextSink; }
    }

    /// <summary>
    /// Compresses the request stream, pushes this sink onto the sink stack
    /// and forwards the asynchronous request to the next sink.
    /// </summary>
    public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
        IMessage msg,
        ITransportHeaders headers,
        Stream stream)
    {
        // generate a compressed stream using NZipLib
        stream = CompressionHelper.getCompressedStreamCopy(stream);

        // push onto stack and forward the request
        sinkStack.Push(this, null);
        _nextSink.AsyncProcessRequest(sinkStack, msg, headers, stream);
    }

    /// <summary>
    /// Decompresses the response stream of an asynchronous call and hands it
    /// to the sink stack for further processing.
    /// </summary>
    public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
        object state,
        ITransportHeaders headers,
        Stream stream)
    {
        // inflate (decompress) the response
        stream = CompressionHelper.getUncompressedStreamCopy(stream);

        // forward the response
        sinkStack.AsyncProcessResponse(headers, stream);
    }

    /// <summary>
    /// Delegates request-stream creation to the next sink.
    /// </summary>
    public Stream GetRequestStream(IMessage msg,
        ITransportHeaders headers)
    {
        return _nextSink.GetRequestStream(msg, headers);
    }

    /// <summary>
    /// Synchronous call path: compresses the request, forwards it to the next
    /// sink and decompresses the response that comes back.
    /// </summary>
    public void ProcessMessage(IMessage msg,
        ITransportHeaders requestHeaders,
        Stream requestStream,
        out ITransportHeaders responseHeaders,
        out Stream responseStream)
    {
        // generate a compressed stream using NZipLib
        Stream compressedRequest =
            CompressionHelper.getCompressedStreamCopy(requestStream);

        // forward the call to the next sink
        Stream compressedResponse;
        _nextSink.ProcessMessage(msg,
            requestHeaders,
            compressedRequest,
            out responseHeaders,
            out compressedResponse);

        // inflate (decompress) the response
        responseStream =
            CompressionHelper.getUncompressedStreamCopy(compressedResponse);
    }
}
?
/// <summary>
/// Server-side sink provider that places a <see cref="CompressionServerSink"/>
/// in front of the sinks created by the next provider in the chain.
/// </summary>
public class CompressionServerSinkProvider : IServerChannelSinkProvider
{
    private IServerChannelSinkProvider _nextProvider;

    /// <summary>
    /// Creates the provider. Both arguments are currently ignored.
    /// </summary>
    /// <param name="properties">Provider properties (unused).</param>
    /// <param name="providerData">Provider data (unused).</param>
    public CompressionServerSinkProvider(IDictionary properties, ICollection providerData)
    {
        // not yet needed
    }

    /// <summary>
    /// Gets or sets the next provider in the server sink provider chain.
    /// </summary>
    public IServerChannelSinkProvider Next
    {
        get { return _nextProvider; }
        set { _nextProvider = value; }
    }

    /// <summary>
    /// Builds the remainder of the sink chain through the next provider and
    /// wraps it in a <see cref="CompressionServerSink"/>.
    /// </summary>
    /// <param name="channel">Channel for which the sink chain is being built.</param>
    /// <returns>The compression sink heading the chain.</returns>
    public IServerChannelSink CreateSink(IChannelReceiver channel)
    {
        // create other sinks in the chain
        IServerChannelSink next = _nextProvider.CreateSink(channel);

        // put our sink on top of the chain and return it
        return new CompressionServerSink(next);
    }

    /// <summary>
    /// Intentionally empty: this provider contributes no channel data.
    /// </summary>
    /// <param name="channelData">Channel data store (left untouched).</param>
    public void GetChannelData(IChannelDataStore channelData)
    {
        // not yet needed
    }
}
?
using System;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Messaging;
using System.IO;
namespace CompressionSink
{
    /// <summary>
    /// Server channel sink that decompresses incoming request streams and
    /// compresses outgoing response streams via <c>CompressionHelper</c>.
    /// </summary>
    public class CompressionServerSink : BaseChannelSinkWithProperties,
        IServerChannelSink
    {
        private IServerChannelSink _nextSink;

        /// <summary>
        /// Creates the sink.
        /// </summary>
        /// <param name="next">The sink to which calls are forwarded.</param>
        public CompressionServerSink(IServerChannelSink next)
        {
            _nextSink = next;
        }

        /// <summary>
        /// Gets the next sink in the server sink chain.
        /// </summary>
        public IServerChannelSink NextChannelSink
        {
            get { return _nextSink; }
        }

        /// <summary>
        /// Compresses the response stream of an asynchronously processed call
        /// and hands it to the sink stack for further processing.
        /// </summary>
        public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
            object state,
            IMessage msg,
            ITransportHeaders headers,
            Stream stream)
        {
            // compressing the response
            stream = CompressionHelper.getCompressedStreamCopy(stream);

            // forwarding to the stack for further processing
            sinkStack.AsyncProcessResponse(msg, headers, stream);
        }

        /// <summary>
        /// This sink does not supply a response stream of its own.
        /// </summary>
        /// <returns>Always <c>null</c>.</returns>
        public Stream GetResponseStream(IServerResponseChannelSinkStack sinkStack,
            object state,
            IMessage msg,
            ITransportHeaders headers)
        {
            return null;
        }

        /// <summary>
        /// Decompresses the request, forwards it to the next sink and, when the
        /// call completed synchronously, compresses the response.
        /// </summary>
        /// <returns>The processing status reported by the next sink.</returns>
        public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
            IMessage requestMsg,
            ITransportHeaders requestHeaders,
            Stream requestStream,
            out IMessage responseMsg,
            out ITransportHeaders responseHeaders,
            out Stream responseStream)
        {
            // uncompressing the request
            Stream localRequestStream =
                CompressionHelper.getUncompressedStreamCopy(requestStream);

            // pushing onto stack and forwarding the call
            sinkStack.Push(this, null);
            Stream localResponseStream;
            ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
                requestMsg,
                requestHeaders,
                localRequestStream,
                out responseMsg,
                out responseHeaders,
                out localResponseStream);

            // Only a synchronously completed call has a response to compress
            // here. For other statuses (asynchronous or one-way processing)
            // the stream is passed through untouched instead of being handed
            // to the compressor, which would fail on a missing stream; an
            // asynchronous response is compressed in AsyncProcessResponse.
            if (srvProc == ServerProcessing.Complete && localResponseStream != null)
            {
                responseStream =
                    CompressionHelper.getCompressedStreamCopy(localResponseStream);
            }
            else
            {
                responseStream = localResponseStream;
            }

            // returning status information
            return srvProc;
        }
    }
}
?
/// <summary>
/// Stream helpers that produce compressed / uncompressed in-memory copies of
/// a stream using the NZipLib deflater and inflater stream classes.
/// </summary>
public class CompressionHelper
{
    // Size of the scratch buffer used when copying between streams.
    private const int BufferSize = 4096;

    /// <summary>
    /// Reads <paramref name="inStream"/> to its end and returns a new
    /// in-memory stream holding the deflated data, positioned at offset 0.
    /// </summary>
    /// <param name="inStream">Stream whose remaining content is compressed.</param>
    /// <returns>A rewound <see cref="MemoryStream"/> with the compressed bytes.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="inStream"/> is null.</exception>
    public static Stream getCompressedStreamCopy(Stream inStream)
    {
        if (inStream == null)
        {
            throw new ArgumentNullException("inStream");
        }

        MemoryStream outStream = new MemoryStream();
        Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
        DeflaterOutputStream compressStream = new DeflaterOutputStream(outStream, deflater);

        byte[] buffer = new byte[BufferSize];
        int count;
        while ((count = inStream.Read(buffer, 0, buffer.Length)) > 0)
        {
            compressStream.Write(buffer, 0, count);
        }

        // Finish() completes the compressed data; the compressing stream is
        // deliberately not closed because outStream is returned to the caller.
        compressStream.Finish();

        // Rewinding is essential: otherwise the following sinks fail when they
        // process the stream (per the author's note, the original source code
        // failed at run time because this step was missing).
        outStream.Seek(0, SeekOrigin.Begin);
        return outStream;
    }

    /// <summary>
    /// Inflates <paramref name="inStream"/> to its end and returns a new
    /// in-memory stream holding the uncompressed data, positioned at offset 0.
    /// The inflating wrapper around <paramref name="inStream"/> is closed
    /// before returning.
    /// </summary>
    /// <param name="inStream">Stream containing deflated data.</param>
    /// <returns>A rewound <see cref="MemoryStream"/> with the uncompressed bytes.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="inStream"/> is null.</exception>
    public static Stream getUncompressedStreamCopy(Stream inStream)
    {
        if (inStream == null)
        {
            throw new ArgumentNullException("inStream");
        }

        MemoryStream outStream = new MemoryStream();
        InflaterInputStream unCompressStream = new InflaterInputStream(inStream);
        try
        {
            byte[] buffer = new byte[BufferSize];
            int count;
            while ((count = unCompressStream.Read(buffer, 0, buffer.Length)) > 0)
            {
                outStream.Write(buffer, 0, count);
            }
        }
        finally
        {
            // Close the inflater stream even when a read fails part-way.
            unCompressStream.Close();
        }

        // Rewinding is essential: otherwise the following sinks fail when they
        // process the stream (per the author's note, the original source code
        // failed at run time because this step was missing).
        outStream.Seek(0, SeekOrigin.Begin);
        return outStream;
    }
}
BTW,這個Sink還可以擴展,比如判斷需要壓縮的Stream的大小,如果很大,就壓縮,否則不壓縮(可以在responseHeaders和requestHeaders中添加是否已經壓縮的標記)。
轉載於:https://www.cnblogs.com/daitengfei/archive/2006/04/04/366366.html
總結
以上是生活隨筆為你收集整理的《實現一個壓縮Remoting傳輸數據的Sink:CompressionSink(轉載)》的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring学习笔记01-BeanFac
- 下一篇: C++学习——string