基于事件的 NIO 多线程服务器
2019獨角獸企業重金招聘Python工程師標準>>>
JDK1.4 的 NIO 有效解決了原有流式 IO 存在的線程開銷的問題,在 NIO 中使用多線程,主要目的已不是為了應對每個客戶端請求而分配獨立的服務線程,而是通過多線程充分使用用多個 CPU 的處理能力和處理中的等待時間,達到提高服務能力的目的。?
多線程的引入,容易為本來就略顯復雜的 NIO 代碼進一步降低可讀性和可維護性。引入良好的設計模型,將不僅帶來高性能、高可靠的代碼,也將帶來一個愜意的開發過程。
線程模型
NIO 的選擇器采用了多路復用(Multiplexing)技術,可在一個選擇器上處理多個套接字, 通過獲取讀寫通道來進行 IO 操作。由于網絡帶寬等原因,在通道的讀、寫操作中是容易出現等待的, 所以在讀、寫操作中引入多線程,對性能提高明顯,而且可以提高客戶端的感知服務質量。所以本文的模型將主要通過使用讀、寫線程池 來提高與客戶端的數據交換能力。
如下圖所示,服務端接受客戶端請求后,控制線程將該請求的讀通道交給讀線程池,由讀線程池分配線程完成對客戶端數據的讀取操作;當讀線程完成讀操作后,將數據返回控制線程,進行服務端的業務處理;完成 業務處理后,將需回應給客戶端的數據和寫通道提交給寫線程池,由寫線程完成向客戶端發送回應數據的操作。
(NIO 多線程服務器模型)
同時整個服務端的流程處理,建立于事件機制上。在?[接受連接->讀->業務處理->寫?->關閉連接?]這個 過程中,觸發器將觸發相應事件,由事件處理器對相應事件分別響應,完成服務器端的業務處理。?
下面我們就來詳細看一下這個模型的各個組成部分。
相關事件定義 在這個模型中,我們定義了一些基本的事件:
(1)onAccept:當服務端收到客戶端連接請求時,觸發該事件。通過該事件我們可以知道有新的客戶端呼入。該事件可用來控制服務端的負載。例如,服務器可設定同時只為一定數量客戶端提供服務,當同時請求數超出數量時,可在響應該事件時直接拋出異常,以拒絕新的連接。?
(2)onAccepted:當客戶端請求被服務器接受后觸發該事件。該事件表明一個新的客戶端與服務器正式建立連接。?
(3)onRead:當客戶端發來數據,并已被服務器控制線程正確讀取時,觸發該事件 。該事件通知各事件處理器可以對客戶端發來的數據進行實際處理了。需要注意的是,在本模型中,客戶端的數據讀取是由控制線程交由讀線程完成的,事件處理器不需要在該事件中進行專門的讀操作,而只需將控制線程傳來的數據進行直接處理即可。?
(4)onWrite:當客戶端可以開始接受服務端發送數據時觸發該事件,通過該事件,我們可以向客戶端發送回應數據。 在本模型中,事件處理器只需要在該事件中設置?
(5)onClosed:當客戶端與服務器斷開連接時觸發該事件。?
(6)onError:當客戶端與服務器從連接開始到最后斷開連接期間發生錯誤時觸發該事件。通過該事件我們可以知道有什么錯誤發生。
事件回調機制的實現
在這個模型中,事件采用廣播方式,也就是所有在冊的事件處理器都能獲得事件通知。這樣可以將不同性質的業務處理,分別用不同的處理器實現,使每個處理器的業務功能盡可能單一。?
如下圖:整個事件模型由監聽器、事件適配器、事件觸發器、事件處理器組成。
(事件模型)
監聽器(Serverlistener):這是一個事件接口,定義需監聽的服務器事件,如果您需要定義更多的事件,可在這里進行擴展。?
事件適配器(EventAdapter):對 Serverlistener 接口實現一個適配器 (EventAdapter),這樣的好處是最終的事件處理器可以只處理所關心的事件。?
事件觸發器(Notifier):用于在適當的時候通過觸發服務器事件,通知在冊的事件處理器對事件做出響應。觸發器以 Singleton 模式實現,統一控制整個服務器端的事件,避免造成混亂。?
事件處理器(Handler):繼承事件適配器,對感興趣的事件進行響應處理,實現業務處理。以下是一個簡單的事件處理器實現,它響應 onRead 事件,在終端打印出從客戶端讀取的數據。?
事件處理器的注冊。為了能讓事件處理器獲得服務線程的事件通知,事件處理器需在觸發器中注冊。?
實現 NIO 多線程服務器
NIO 多線程服務器主要由主控服務線程、讀線程和寫線程組成。
(線程模型)
主控服務線程(Server):主控線程將創建讀、寫線程池,實現監聽、接受客戶端請求,同時將讀、寫通道提交由相應的讀線程(Reader)和寫服務線程 (Writer) ,由讀寫線程分別完成對客戶端數據的讀取和對客戶端的回應操作。
public?class?Server?implements?Runnable?{?....?private?static?int?MAX_THREADS?=?4;?public?Server(int?port)?throws?Exception?{?....?//?創建無阻塞網絡套接selector?=?Selector.open();?sschannel?=?ServerSocketChannel.open();?sschannel.configureBlocking(false);?address?=?new?InetSocketAddress(port);?ServerSocket?ss?=?sschannel.socket();?ss.bind(address);?sschannel.register(selector,?SelectionKey.OP_ACCEPT);?}?public?void?run()?{?System.out.println("Server?started?...");?System.out.println("Server?listening?on?port:?"?+?port);?//?監聽while?(true)?{?try?{?int?num?=?0;?num?=?selector.select();?if?(num?>?0)?{?Set?selectedKeys?=?selector.selectedKeys();?Iterator?it?=?selectedKeys.iterator();?while?(it.hasNext())?{?SelectionKey?key?=?(SelectionKey)?it.next();?it.remove();?//?處理?IO?事件if?(?(key.readyOps()?&?SelectionKey.OP_ACCEPT)?==?SelectionKey.OP_ACCEPT)?{?//?Accept?the?new?connection?ServerSocketChannel?ssc?=?(ServerSocketChannel)?key.channel();notifier.fireOnAccept();?SocketChannel?sc?=?ssc.accept();?sc.configureBlocking(false);?//?觸發接受連接事件Request?request?=?new?Request(sc);?notifier.fireOnAccepted(request);?//?注冊讀操作?,?以進行下一步的讀操作sc.register(selector,??SelectionKey.OP_READ,?request);}?else?if?(?(key.readyOps()?&?SelectionKey.OP_READ)?==?SelectionKey.OP_READ?)?{?//?提交讀服務線程讀取客戶端數據Reader.processRequest(key);??key.cancel();?}?else?if?(?(key.readyOps()?&?SelectionKey.OP_WRITE)?==?SelectionKey.OP_WRITE?)?{?//?提交寫服務線程向客戶端發送回應數據Writer.processRequest(key);??key.cancel();?}?}?}?else?{?addRegister();??//?在?Selector?中注冊新的寫通道}?}?catch?(Exception?e)?{?notifier.fireOnError("Error?occured?in?Server:?"?+?e.getMessage());?continue;?}?}?}?....? }讀線程(Reader):使用線程池技術,通過多個線程讀取客戶端數據,以充分利用網絡數據傳輸的時間,提高讀取效率。
public?class?Reader?extends?Thread?{?public?void?run()?{?while?(true)?{?try?{?SelectionKey?key;?synchronized?(pool)?{?while?(pool.isEmpty())?{?pool.wait();?}?key?=?(SelectionKey)?pool.remove(0);?}?//?讀取客戶端數據,并觸發?onRead?事件read(key);??}?catch?(Exception?e)?{?continue;?}?}?}?....?}寫線程(Writer):和讀操作一樣,使用線程池,負責將服務器端的數據發送回客戶端。
?public?final?class?Writer?extends?Thread?{?public?void?run()?{?while?(true)?{?try?{?SelectionKey?key;?synchronized?(pool)?{?while?(pool.isEmpty())?{?pool.wait();?}?key?=?(SelectionKey)?pool.remove(0);?}?//?向客戶端發送數據,然后關閉連接,并分別觸發?onWrite,onClosed?事件write(key);?}?catch?(Exception?e)?{?continue;?}?}?}?....?}具體應用
NIO 多線程模型的實現告一段落,現在我們可以暫且將 NIO 的各個 API 和煩瑣的調用方法拋于腦后,專心于我們的實際應用中。?
我們用一個簡單的 TimeServer(時間查詢服務器)來看看該模型能帶來多么簡潔的開發方式。?
在這個 TimeServer 中,將提供兩種語言(中文、英文)的時間查詢服務。我們將讀取客戶端的查詢命令(GB/EN),并回應相應語言格式的當前時間。在應答客戶的請求的同時,服務器將進行日志記錄。做為示例,對日志記錄,我們只是簡單地將客戶端的訪問時間和 IP 地址輸出到服務器的終端上。
實現時間查詢服務的事件處理器(TimeHandler):?
實現日志記錄服務的事件處理器(LogHandler):
public?class?LogHandler?extends?EventAdapter?{?public?LogHandler()?{?}?public?void?onClosed(Request?request)?throws?Exception?{?String?log?=?new?Date().toString()?+?"?from?"?+?request.getAddress().toString();?System.out.println(log);?}?public?void?onError(String?error)?{?System.out.println("Error:?"?+?error);?}? }啟動程序:
public?class?Start?{?public?static?void?main(String[]?args)?{?try?{?LogHandler?loger?=?new?LogHandler();?TimeHandler?timer?=?new?TimeHandler();?Notifier?notifier?=?Notifier.getNotifier();?notifier.addlistener(loger);?notifier.addlistener(timer);?System.out.println("Server?starting?...");?Server?server?=?new?Server(5100);?Thread?tServer?=?new?Thread(server);?tServer.start();?}?catch?(Exception?e)?{?System.out.println("Server?error:?"?+?e.getMessage());?System.exit(-1);?}?}?}小結
通過例子我們可以看到,基于事件回調的 NIO 多線程服務器模型,提供了清晰直觀的實現方式,可讓開發者從 NIO 及多線程的技術細節中擺脫出來,集中精力關注具體的業務實現。
附錄
演示程序(?nioserver.zip)。
轉載于:https://my.oschina.net/u/167082/blog/205428
總結
以上是生活随笔為你收集整理的基于事件的 NIO 多线程服务器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何把Eclipse改成中文版
- 下一篇: 理解关键的渲染路径