ESFramework介绍之(23)―― AgileTcp
??? 前面已經介紹了ITcp接口,而AgileTcp就是ESFramework給出的ITcp的參考實現。在之前,我曾經講解過模擬完成端口的Tcp組件實現和異步Tcp組件實現,在它們的基礎之上,我更改了處理策略,而形成了AgileTcp,目的是更清晰的結構、更高的效率。這個策略會在后面講到。
??? Tcp組件主要控制著系統與終端用戶的所有消息的進出,ITcp接口描述了這個組件的外貌,告訴外部如何使用Tcp組件、如何與Tcp組件交互。而從實現的角度來看,我們必須理清Tcp組件的職責:
(1)?管理所有已經建立的Tcp連接
(2)?管理與每個連接相對應接收緩沖區
(3)?管理所有的工作者線程
(4)?處理長度大于接收緩沖區的消息
??? 我們來看看如何滿足這些職責。
??? 由于每個連接都對應著一個接收緩沖區,所以可以將它們封裝在一起形成ContextKey(連接上下文):
???
??? public?class?ContextKey
????{????????
????????private?byte[]??buffer?;??????????//封裝接收緩沖區
????????private?ISafeNetworkStream?netStream?=?null?;????????????
????????private?volatile?bool??????isDataManaging?=?false?;
????????
????????public?ContextKey(ISafeNetworkStream?net_Stream?,int?buffSize)
????????{
????????????this.netStream?=?net_Stream?;????????????
????????????this.buffer????=?new?byte[buffSize]?;????????????
????????}
????????#region?NetStream??
????????public?ISafeNetworkStream?NetStream
????????{
????????????get
????????????{
????????????????return?this.netStream?;
????????????}
????????}????????????
????????public?byte[]?Buffer
????????{
????????????get
????????????{
????????????????return?this.buffer?;
????????????}????????????
????????}????????
????????public?bool?IsDataManaging
????????{
????????????get
????????????{
????????????????return?this.isDataManaging?;
????????????}
????????????set
????????????{
????????????????this.isDataManaging?=?value?;
????????????}
????????}
????????private?bool?firstMessageExist?=?false?;
????????public??bool?FirstMessageExist?
????????{
????????????get
????????????{
????????????????return?this.firstMessageExist?;
????????????}
????????????set
????????????{
????????????????this.firstMessageExist?=?value?;
????????????}
????????}
????????#endregion????????????
????}????
??? ContextKey中封裝的是ISafeNetworkStream而不是NetworkStream,原因可參見這里。
??? IsDataManaging屬性表明工作線程是否正在處理本連接對應的緩沖區中的數據,FirstMessageExist屬性用于標志接收到的第一條消息,因為系統可能需要對接收到的第一條消息做一些特殊的處理。
??? 任何時刻,可能都有成千上萬個連接存活著;任何時刻,都可能有新的連接建立、或已有的連接被釋放。所有這些ContextKey對象需要被管理起來,這就是上下文管理器IContextKeyManager:
????{
????????void?InsertContextKey(ContextKey?context_key)?;
????????void?DisposeAllContextKey()?;????????
????????void?RemoveContextKey(int?streamHashCode)?;
????????ISafeNetworkStream?GetNetStream(int?streamHashCode)?;
????????int????????????ConnectionCount?{get?;}
????????ICollection?ContextKeyList{get?;}
????????event?CbSimpleInt?StreamCountChanged?;????????
????}????
??? 說到上下文管理器,先要講講如何標志每個不同的上下文了?使用連接字,連接字是Tcp連接的Hashcode,它們與連接一一對應,并且不會重復。所以在源碼中你經常會看到名為“streamHashCode”的參數和變量。由于Tcp連接與streamHashCode一一對應,所以GetNetStream方法的實現就非常簡單。不知道你是否記得,RoundedMessage類中有個ConnectID字段,它就是連接字,與streamHashCode意義一樣。根據此字段,你可以清楚的知道這個RoundedMessage來自于哪個連接。
??? 關于工作者線程,很幸運的是,我們可以直接使用.NET提供的后臺線程池,而不必要再去手動管理,這可以省卻一些麻煩。當然你也可以使用ThreadPool類,甚至你可以從頭開始實現自己的線程池組件,這也是不困難的。
????
??? 我經常被問到,接收緩沖區應該開辟多大?這取決于你的應用,但是有一點是錯不了的――緩沖區的大小至少要大于消息頭Header的大小,否則麻煩就多了。根據我的經驗,一般緩沖區的大小至少應該能容納所有接收消息中的60%-80%。對于大于緩沖區大小的消息,ESFramework采用的策略是使用緩沖區池IBufferPool。
????{
????????byte[]?RentBuffer(int?minSize)?;
????????void???GivebackBuffer(byte[]?buffer)?;
????}
??? 通過上面的介紹我們已經知道如何滿足Tcp組件的職責,現在我們來看看更細的實現策略:
(1)?使用Checker線程。
??? 使用Checker線程是AgileTcp組件的區別于模擬完成端口的Tcp組件實現和異步Tcp組件的主要特色。當AgileTcp啟動時,Checker線程也隨之啟動,這個線程的主要工作就是檢查已經存在的每個連接上是否有數據要接收(還記得Select網絡模型),這可以通過NetworkStream.DataAvailable屬性知道。如果發現某個連接上有待接收的數據,就將其放到工作者線程中去處理,并設置前面提到的ContextKey.IsDataManaging屬性,然后再判斷下個連接,如此循環下去。
????????{
????????????while(!?this.stop)
????????????{
????????????????foreach(ContextKey?key?in?this.contextKeyManager.ContextKeyList)
????????????????{
????????????????????if(this.stop)
????????????????????{
????????????????????????break?;
????????????????????}
????????????????????if((!?key.IsDataManaging)?&&?key.NetStream.DataAvailable)
????????????????????{????????????????????????
????????????????????????key.IsDataManaging?=?true?;????
????????????????????????CbContextKey?cb?=?new?CbContextKey(this.DataManaging)?;
????????????????????????cb.BeginInvoke(key?,null?,null?)?;
????????????????????}????????????????????
????????????????}
????????????????System.Threading.Thread.Sleep(50)?;
????????????}
????????}
(2)?將消息頭的解析置于Tcp組件之中
??? 將消息頭解析置于Tcp組件之中這個方案我層考慮了非常久,原因是,這會破壞Tcp組件的單純性,使得Tcp組件與協議(Contract)有所關聯。最終采用這個策略的第一個理由是清晰,第二個理由是效率。清晰在于簡化了ContextKey結構,避免了使用消息分裂器這樣復雜的算法組件(如果大家看過我以前關于通信方案的文章,一定會得到這樣的答案)。效率在于,當在此解析了Header之后,后面所有的處理器都可以使用這個Header對象了,而不用在自己去解析。這也是NetMessage類中有個Header字段的原因。
(3)?針對于某個連接,只有當上一個消息處理完并將回復發送后(如果有回復的話),才會去接收下一個消息。
??? 這個策略會使很多事情變得簡單,而且不會影響任何有用的特性。由于不會在處理消息的時候去接收下一個消息,所以可以直接處理接收緩沖區中的數據,而不需要將數據從接收緩沖區拷貝到另外的地方去處理。這又對效率提高有所幫助。
??? 綜上所述,我們可以總結工作者線程要做的事情:首先,從連接上接收MessageHeaderSize個字節,解析成Header,然后在接收Header. MessageBodyLength個字節,即是Body,接著構造成RoundedMessage對象交給消息分配器去處理,最后將得到的處理結果發送出去。代碼如下所示:
DataManaging??????? private?void?DataManaging(ContextKey?key)
????????{????
????????????int?streamHashCode?=?key.NetStream.GetHashCode()?;????
????????????int?headerLen?=?this.contractHelper.MessageHeaderLength?;
????????????
????????????while((key.NetStream.DataAvailable)?&&?(!?this.stop))
????????????{
????????????????byte[]?rentBuff?=?null?;//每次分派的消息中,最多有一個rentBuff
????????????????try
????????????????{
????????????????????#region?構造?RoundedMessage
????????????????????NetHelper.RecieveData(key.NetStream?,key.Buffer?,0?,headerLen)?;
????????????????????IMessageHeader?header?=?this.contractHelper.ParseMessageHeader(key.Buffer?,0)?;????
????????????????????if(!?this.contractHelper.ValidateMessageToken(header))
????????????????????{
????????????????????????this.DisposeOneConnection(streamHashCode?,DisconnectedCause.MessageTokenInvalid)?;
????????????????????????return?;
????????????????????}
????????????????????RoundedMessage?requestMsg?=?new?RoundedMessage()?;
????????????????????requestMsg.ConnectID??????=?streamHashCode?;
????????????????????requestMsg.Header?????????=?header?;
????????????????????
????????????????????if(!?key.FirstMessageExist)
????????????????????{
????????????????????????requestMsg.IsFirstMessage?=?true?;
????????????????????????key.FirstMessageExist?????=?true?;
????????????????????}
????????????????????if((headerLen?+?header.MessageBodyLength)?>?this.maxMessageSize)
????????????????????{
????????????????????????this.DisposeOneConnection(streamHashCode?,DisconnectedCause.MessageSizeOverflow)?;
????????????????????????return?;
????????????????????}
????????????????
????????????????????if(header.MessageBodyLength?>0?)
????????????????????{
????????????????????????if((header.MessageBodyLength?+?headerLen)?<=?this.recieveBuffSize)
????????????????????????{
????????????????????????????NetHelper.RecieveData(key.NetStream?,key.Buffer?,0?,header.MessageBodyLength)?;
????????????????????????????requestMsg.Body?=?key.Buffer?;????????????????????????????
????????????????????????}
????????????????????????else
????????????????????????{????????????????????????
????????????????????????????rentBuff?=?this.bufferPool.RentBuffer(header.MessageBodyLength)?;????????????????????????
????????????????????????????NetHelper.RecieveData(key.NetStream?,rentBuff?,0?,header.MessageBodyLength)?;
????????????????????????????requestMsg.Body?=?rentBuff?;????????????????????????????
????????????????????????}
????????????????????}
????????????????????#endregion????????????????????
????????????????
????????????????????bool?closeConnection?=?false?;
????????????????????NetMessage?resMsg?=?this.tcpStreamDispatcher.DealRequestData(requestMsg?,ref?closeConnection)?;
????????????????????if(rentBuff?!=?null)
????????????????????{
????????????????????????this.bufferPool.GivebackBuffer(rentBuff)?;
????????????????????}
????????????????????if(closeConnection)
????????????????????{
????????????????????????this.DisposeOneConnection(streamHashCode?,DisconnectedCause.OtherCause)?;
????????????????????????return?;
????????????????????}
????????????????????if((resMsg?!=?null)?&&(!?this.stop))
????????????????????{????????????????????
????????????????????????byte[]?bRes?=?resMsg.ToStream()?;
????????????????????????key.NetStream.Write(bRes?,0?,bRes.Length)?;
????????????????????????if(this.ServiceCommitted?!=?null)
????????????????????????{????????????????????????????????
????????????????????????????this.ServiceCommitted(streamHashCode?,resMsg)?;
????????????????????????}
????????????????????}
????????????????}
????????????????catch(Exception?ee)
????????????????{
????????????????????if(ee?is?System.IO.IOException)?//正在讀寫流的時候,連接斷開
????????????????????{
????????????????????????this.DisposeOneConnection(streamHashCode?,DisconnectedCause.NetworkError)?;
????????????????????????break?;
????????????????????}
????????????????????else
????????????????????{
????????????????????????this.esbLogger.Log(ee.Message?,"ESFramework.Network.Tcp.AgileTcp"?,ErrorLevel.Standard)?;
????????????????????}
????????????????????ee?=?ee?;????????????????????
????????????????}
????????????}
????????????key.IsDataManaging?=?false?;
????????}
??? AgileTcp組件的主要原理差不多就這些了,這種實現有個缺點,不知大家發現沒有。那就是當客戶端主動斷開連接或掉線時,AgileTcp組件可能感受不到(除非對應的連接上正在發送或接收數據,此時會拋出異常),因為當連接斷開時,key.NetStream.DataAvailable不會拋出異常,而是仍然返回false。這是個問題,幸好有補救的辦法,一是要求客戶端下線的時候給服務器發送Logoff消息,二是使用定時掉線檢查器(IUserOnLineChecker)。當服務器檢查或發現某用戶下線時,即可調用ITcpClientsController.DisposeOneConnection方法來釋放對應的連接和Context。(你應該還記得ITcp接口是從ITcpClientsController繼承的)。關于這個問題,你有更好的解決辦法嗎?
??? 感謝關注!
上一篇文章:ESFramework介紹之(21)-- Tcp組件接口ITcp介紹
轉到??:ESFramework 可復用的通信框架(序)
轉載于:https://www.cnblogs.com/zhuweisky/archive/2006/04/13/374025.html
總結
以上是生活随笔為你收集整理的ESFramework介绍之(23)―― AgileTcp的全部內容,希望文章能夠幫你解決所遇到的問題。