QuickFIX 源码分析
FIX是Financial Information eXchange的簡稱。FIX是一種專門為實(shí)時(shí)電子證券交易設(shè)計(jì)的標(biāo)準(zhǔn)消息協(xié)議。FIX協(xié)議由FIX protocol, Ltd(FPL)所有并維護(hù)。FIX協(xié)議的網(wǎng)址為http://www.fixprotocol.org
 QuickFix/J是實(shí)現(xiàn)了FIX協(xié)議所有版本及其功能的開源軟件,100%使用JAVA實(shí)現(xiàn)。
 QuickFix/J的網(wǎng)址為http://www.quickfixj.org
 QuickFix/J的源代碼可以從http://sourceforge.net/projects/quickfixj/files/QuickFIX_J 下載,也可以去QuickFix/J的官方網(wǎng)站,進(jìn)入下載頁面下載源代碼。
 那么能用QuickFix/J做什么事情呢?關(guān)注股票的兄弟們一定留意過LevelII這個(gè)名詞,他是中國股票交易新行情的簡稱。簡單說,可以將QuickFix/J的代碼改造一下,就用來接受深圳證券交易的LevelII行情數(shù)據(jù)。當(dāng)然接受行情不是免費(fèi)的,需要諸多的商務(wù)手續(xù),但是本文僅僅討論QuickFix/J的開源代碼的設(shè)計(jì)和實(shí)現(xiàn),并且側(cè)重于QuickFix/J的客戶端實(shí)現(xiàn)。服務(wù)器端留在以后的文章介紹。關(guān)于上海證券交易所的LevelII數(shù)據(jù)的格式和接受,跟深圳的有諸多的不同,也留在以后討論。
 首先QuickFixJ代碼功能主要有兩大部分,一部分是Fix協(xié)議數(shù)據(jù)的解析,另外一部分是客戶端跟服務(wù)器端建立連接并維持回話,傳輸數(shù)據(jù)。第一部分將主要介紹QuickFix/J的傳輸部分的實(shí)現(xiàn)。
 (一) QuickFix/J傳輸功能部分
 QuickFix/J的連接管理和傳輸功能是基于MINA框架實(shí)現(xiàn)的。MINA是什么?MINA是Apache旗下的一個(gè)網(wǎng)絡(luò)應(yīng)用框架,能夠幫助大家輕松的開發(fā)高性能、高擴(kuò)展性的網(wǎng)絡(luò)程序。它使用NIO在傳輸協(xié)議(比如TCP/IP,UDP/IP)之上提供了抽象的、事件驅(qū)動的、異步處理的API。MINA的網(wǎng)址為http://mina.apache.org。
 A). QuickFix/J客戶端用到的主要類的功能說明(V1.5.0)
a) 遍歷配置文件取得所有[session]節(jié)的配置并創(chuàng)建相應(yīng)的FixSession (如果[session]中沒有指定ConnectionType或者明確指定了ConnectionType為initiator,則建立FixSession(quickfix.Session),其他類型的ConnectionType無效,如acceptor)。配置文件中可以指定多個(gè)[session]。
b) 通過已經(jīng)生成的FixSession和傳入的eventHandlingStrategy創(chuàng)建IoSessionInitiator,并保存入initiators(Set類型的緩存)中。
那么FixSession和IoSessionInitiator有什么區(qū)別呢?請參考5、6。
c) 啟動、關(guān)閉客戶端(initiator)。啟動initiator時(shí)首先啟動應(yīng)用層的SessionTimer(請參考2),然后啟動連接層的initiator(IoSessionInitiator)。關(guān)閉initiator時(shí),先關(guān)閉連接層的initiator(IoSessionInitiator),再關(guān)閉應(yīng)用層的SessionTimer。
a) fixSession維護(hù)Session內(nèi)部消息的自增序列號、自動錯(cuò)誤恢復(fù)、與通信對方(counterpart)建立通信信道(communication channel)。
b) Session是獨(dú)立于特定的傳輸層協(xié)議的。Session被新建時(shí),消息序列號置為1,每次通信序列號自增,直到Session被重置(reset)。每個(gè)Session能夠跨越多個(gè)傳輸連接(并非同時(shí)跨越,而是說第一次網(wǎng)絡(luò)連接斷開后,隨后重連,雖然底層的網(wǎng)絡(luò)連接已經(jīng)是新建的了,但是Session還能保持跟斷網(wǎng)之前是同一個(gè)Session)。
c) fixSession中核心邏輯在next方法中。next(message)首先檢查SessionTime,如果超過1秒未刷新則刷新時(shí)間戳;如果發(fā)現(xiàn)Session不存在,則執(zhí)行reset,重置Session。然后取到消息的header,msgType進(jìn)行檢查,首先beginString不正確,拋異常并退出。然后從dataDictionaryProvider取得數(shù)據(jù)字典,驗(yàn)證數(shù)據(jù)字典。然后根據(jù)消息類型,分別回調(diào)用戶接口。回調(diào)用戶函數(shù)的入口在驗(yàn)證完數(shù)據(jù)字典之后,請注意verify(message)函數(shù),所有的普通消息通過這個(gè)函數(shù)去回調(diào)application的fromApp(message, sessionID)的。verify -> veriry -> fromCallback -> fromAdmin/fromApp。關(guān)于消息的解析,其中普通Message是通過quickfix.MessageUtils.parse將String類型的消息解析成Message。
a) 初始化,即用eventHandlingStrategy創(chuàng)建Initiator,然后注冊此SocketInitiator所管理的全部Session,然后啟動Initiator,最后調(diào)用eventHandlingStrategy.blockInThread()在另外的后臺線程中去處理SessionTimer收到的插入隊(duì)列的消息。啟動Initiator做的事情依次是:先啟動SessionTimer去監(jiān)聽從傳輸層過來的消息,如果沒有Logon則先Logon,然后在收到消息后回調(diào)用戶代碼處理消息;啟動reconnectTask去建立和維護(hù)傳輸層的網(wǎng)絡(luò)連接。
b) 啟動Initiator,在另外的線程中后臺(Daemon)處理消息。
c) 阻塞Initiator,在同一線程中處理消息。
d) 停止Initiator。分為強(qiáng)制停止和非強(qiáng)制停止。強(qiáng)制或者非強(qiáng)制Logout所有FixSession,停止連接層的Initiator,取消注冊所有此SocketInitiator所管理的全部Session。
e) 關(guān)于a) b)如何處理來自底層的消息的邏輯,請參考11和12。因?yàn)檫@里所謂的處理消息實(shí)際上是直接或者間接調(diào)用了SingleThreadedEventHandlingStrategy的block處理消息。
a) 為了不阻塞輸入,那么就需要一個(gè)eventQueue來臨時(shí)快速的存儲收到的所有消息。
b) onMessage接到底層傳入的消息包裝成SessionMessageEvent,首先將其存入eventQueue。
c) 那么SessionMessageEvent里面有什么?SessionEvent僅僅是把fixSession和Message包裝到一起,并且提供了處理Message的方法processMessage。可以這樣理解,。
d) getSessionConnector獲取需要處理應(yīng)用級的connector以便處理eventQueue中的消息。
e) getMessage從eventQueue中取出SessionMessageEvent待處理。
f) block就是應(yīng)用程序級別處理消息的入口。block判斷HandlingMessage是否應(yīng)該繼續(xù)運(yùn)新,如果是則從消息隊(duì)列中取出SessionMessageEvent,調(diào)用其中的processMessage去處理該Message。
g) processMessage如何處理了收到的消息呢?它會調(diào)用fixSession的next方法,將消息傳給Session,由fixSession再接力將消息回調(diào)到用戶手中。請參考5。
h) 也許你會注意到處理消息的block在run中始終被調(diào)用,而且沒有任何sleep時(shí)間,難道它在沒有消息的時(shí)候始終不停的死循環(huán)運(yùn)行且絲毫不休息?CPU會保持100%?實(shí)際上效果不是這樣的,其中的秘密在于它使用了BlockingQueue做到了和sleep相同效果的事情。在沒有消息的時(shí)候,這個(gè)循環(huán)會每休息一秒再執(zhí)行下一次循環(huán)。如何做到這樣的效果呢?原因是如果eventQueue中如果沒有消息,而該eventQueue設(shè)置了阻塞超時(shí)1000毫秒,則取消息的操作會等待最多1000毫秒,如果沒有等到消息則超時(shí)退出不再等待,執(zhí)行完畢本次循環(huán),如果等到了則按照正常流程處理消息。這樣做最大的好處就是,如果eventQueue中有事件,那么就會連續(xù)不斷的處理,如果沒有消息,就會休息timeout毫秒再查看。
i) blockInThread,在新啟動的后臺線程中處理SessionMessageEvent
a) 由于是每個(gè)Session對應(yīng)一個(gè)線程,因此該策略內(nèi)部需要一個(gè)稱之為dispatchersMap作為緩存為每個(gè)Session保存響應(yīng)的處理線程(MessageDispatchingThread)引用。
b) 當(dāng)onMessage收到來自底層的輸入消息時(shí),根據(jù)輸入的fixSession從dispatchers中取到相應(yīng)的處理線程,并將該消息加入(enqueue)到該線程內(nèi)部的消息隊(duì)列中待處理。
c) 每個(gè)dispatcher(MessageDispatchingThread類型)內(nèi)部均維護(hù)了自己的消息隊(duì)列,和單線程模式不同在于,消息隊(duì)列中的消息僅僅是Message,不是SessionMessageEvent。處理Message的邏輯從單線程中的SessionMessageEvent中移出到dispatcher中。
:是一個(gè)接口,為指定的session protocol或者application version提供數(shù)據(jù)字典。getSessionDataDictionary根據(jù)提供的beginString 即協(xié)議版本獲取相應(yīng)的數(shù)據(jù)字典。getApplicationDataDictionary根據(jù)提供的application version ID和custom application ID獲取數(shù)據(jù)字典。application version ID在FIXT.1.1之前由BeginString字段確定。custom application ID是可選值,不是必須的。
B). 網(wǎng)絡(luò)數(shù)據(jù)在QuickFix/J中的流向
ConnectTask -> ioConnector.connect(sockAddress, ioHandler) -> MINA建立和服務(wù)器端的通信。收到網(wǎng)絡(luò)數(shù)據(jù),ioConnector觸發(fā)相應(yīng)事件,并把事件交給ioHandler(InitiatorIoHandler)的processMessage -> processMessage中調(diào)用eventHandlingStrategy.onMessage(quickfixSession, message) ,將消息向外回調(diào) -> SingleThreadedEventHandlingStrategy.onMessage(quickfixSession, message) 將收到的消息入隊(duì)(enQueue)到eventQueue -> SingleThreadedEventHandlingStrategy.blockInThread中啟動單獨(dú)的后臺線程,依次從eventQueue取出消息處理,向session回調(diào) -> SessionMessageEvent.quickfixSession.next(message) -> quickFixSession.next根據(jù)msgType判斷回調(diào) ->逐層回調(diào) (verify -> veriry -> fromCallback -> fromAdmin/fromApp),從fromAdmin/fromApp(msg, sessionID)回調(diào)用戶處理邏輯。
總結(jié)
以上是生活随笔為你收集整理的QuickFIX 源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 一文看懂IC芯片生产流程:从设计到制造与
- 下一篇: 语音算法论文中frame-level,s
