I / O神秘化
I / O類型
根據(jù)操作的阻塞或非阻塞性質(zhì)以及IO準(zhǔn)備/完成事件通知的同步或異步性質(zhì),可以使用四種不同的方法來完成IO。
同步阻塞I / O
IO操作在此阻止應(yīng)用程序,直到應(yīng)用程序完成為止,這構(gòu)成了大多數(shù)Web服務(wù)器中每個連接模型的典型線程的基礎(chǔ)。
當(dāng)調(diào)用阻塞的read()或write()時,將有一個上下文切換到內(nèi)核,IO操作將在此發(fā)生,并且數(shù)據(jù)將被復(fù)制到內(nèi)核緩沖區(qū)。 之后,內(nèi)核緩沖區(qū)將被轉(zhuǎn)移到用戶空間應(yīng)用程序級緩沖區(qū),并且應(yīng)用程序線程將被標(biāo)記為可運行
然后,應(yīng)用程序?qū)⒔獬枞⒆x取用戶空間緩沖區(qū)中的數(shù)據(jù)。
每個連接線程模型試圖通過將連接限制在線程中來限制這種阻塞的影響,以使其他并發(fā)連接的處理不會被一個連接上的I / O操作阻塞。 只要連接壽命短并且數(shù)據(jù)鏈接延遲不是那么糟糕,這很好。 但是在
如果連接壽命長或延遲長,則如果使用固定大小的線程池,線程將很長一段時間會被這些連接阻塞,導(dǎo)致新連接餓死,因為阻塞的線程無法在運行中重新用于服務(wù)新連接被阻止的狀態(tài),否則
如果使用新線程為每個連接提供服務(wù),將導(dǎo)致在系統(tǒng)內(nèi)產(chǎn)生大量線程,這可能會占用大量資源,并且對于高并發(fā)負(fù)載來說,上下文轉(zhuǎn)換成本很高。
同步非阻塞I / O
在此模式下,設(shè)備或連接被配置為非阻塞,因此不會阻塞read()和write()操作。 這通常意味著如果無法立即滿足該操作,它將返回錯誤代碼,指示該操作將阻塞(POSIX中的EWOULDBLOCK)或設(shè)備
暫時不可用(POSIX中為EAGAIN)。 在設(shè)備準(zhǔn)備就緒并讀取所有數(shù)據(jù)之前,應(yīng)由應(yīng)用程序輪詢。 但是,這不是很有效,因為這些調(diào)用中的每一個都會導(dǎo)致上下文切換到內(nèi)核并返回,而不管是否讀取了某些數(shù)據(jù)。
具有就緒事件的異步非阻塞I / O
早期模式的問題在于,應(yīng)用程序必須進行輪詢并且忙于等待才能完成工作。 在準(zhǔn)備好讀寫設(shè)備時如何通知應(yīng)用程序會更好嗎? 這正是此模式為您提供的。 使用特殊的系統(tǒng)調(diào)用(因平臺而異–對于Linux為select()/ poll()/ epoll(),對于BSD為kqueue(),對于Solaris為/ dev / poll),應(yīng)用程序注冊了獲取I / O準(zhǔn)備就緒的興趣來自特定設(shè)備的特定I / O操作(讀或?qū)?#xff09;的信息(Linux術(shù)語中是文件描述符,因為所有套接字都是使用文件描述符抽象的)。 之后,將調(diào)用此系統(tǒng)調(diào)用,該系統(tǒng)調(diào)用將阻塞,直到至少其中一個注冊文件描述符準(zhǔn)備就緒。 一旦這是真的,準(zhǔn)備進行I / O的文件描述符將作為
系統(tǒng)調(diào)用的返回,并且可以在應(yīng)用程序線程中的循環(huán)中按順序進行服務(wù)。
就緒的連接處理邏輯通常包含在用戶提供的事件處理程序中,該事件處理程序仍將必須發(fā)出非阻塞的read()/ write()調(diào)用,以從設(shè)備到內(nèi)核并最終向內(nèi)核獲取數(shù)據(jù)。
用戶空間緩沖區(qū),導(dǎo)致上下文切換到內(nèi)核。 而且,通常沒有絕對的保證可以使用該設(shè)備執(zhí)行預(yù)期的I / O,因為操作系統(tǒng)提供的只是該設(shè)備可能已準(zhǔn)備好進行感興趣的I / O操作的指示。在這種情況下,阻止read()或write()可以使您擺脫困境。 但是,這應(yīng)該是規(guī)范之外的例外。
因此,總體思路是以異步方式獲取就緒事件,并注冊一些事件處理程序以在觸發(fā)此類事件通知后進行處理。 因此,您可以看到,所有這些操作都可以在單個線程中完成,同時在不同的連接之間進行多路復(fù)用,這主要是由于select()的特性(這里我選擇一個代表性的系統(tǒng)調(diào)用),該特性可以一次返回多個套接字的就緒狀態(tài)。 這是這種操作模式的吸引力的一部分,在這種操作模式下,一個線程一次可以服務(wù)大量連接。 這個
模式通常稱為“非阻止I / O”模型。
Java通過其NIO API提取了特定于平臺的系統(tǒng)調(diào)用實現(xiàn)之間的差異。 套接字/文件描述符使用Channels進行抽象,并且Selector封裝選擇系統(tǒng)調(diào)用。 對獲取就緒事件感興趣的應(yīng)用程序向選擇器注冊一個Channel(通常是一個由ServerSocketChannel上的accept()獲得的SocketChannel),并獲得一個SelectionKey,它用作保存Channel和注冊信息的句柄。 然后在Selector上執(zhí)行阻塞的select()調(diào)用,該調(diào)用將返回一組SelectionKeys,然后可以對其進行處理
使用應(yīng)用程序指定的事件處理程序一個接一個地處理。
具有完成事件的異步和非阻塞I / O
就緒事件僅能通知您設(shè)備/套接字已準(zhǔn)備就緒,請執(zhí)行某些操作。 應(yīng)用程序仍然必須進行從設(shè)備/套接字到用戶空間緩沖區(qū)的數(shù)據(jù)讀取(更準(zhǔn)確地指示操作系統(tǒng)通過系統(tǒng)調(diào)用來這樣做),直到從設(shè)備一直到用戶空間緩沖區(qū)。 將作業(yè)委派給操作系統(tǒng)在后臺運行,并在完成作業(yè)后通過將所有數(shù)據(jù)從設(shè)備傳輸?shù)絻?nèi)核緩沖區(qū),最后傳輸?shù)綉?yīng)用程序級緩沖區(qū),讓它通知您是否很好? 這就是這種模式(通常稱為“異步I / O”模式)背后的基本思想。 為此,需要操作系統(tǒng)支持AIO操作。 在Linux中,此支持存在于2.6的aio POSIX API中,而對于Windows,則以“ I / O完成端口”的形式存在。
借助NIO2,Java已通過其AsynchronousChannel API增強了對該模式的支持。
操作系統(tǒng)支持
為了支持就緒和完成事件通知,不同的操作系統(tǒng)提供了不同的系統(tǒng)調(diào)用。 對于就緒事件,可以在基于Linux的系統(tǒng)中使用select()和poll()。 但是,較新的epoll()變體是首選的,因為它比select()或poll()更有效率。 select()遭受這樣的事實,即選擇時間隨所監(jiān)視的描述符數(shù)量線性增加。 覆蓋文件描述符數(shù)組引用似乎是臭名昭著的。 因此,每次稱為描述符數(shù)組時,都需要從單獨的副本中重新填充它。 無論如何都不是一個優(yōu)雅的解決方案。
可以通過兩種方式配置epoll()變體。 即邊沿觸發(fā)和水平觸發(fā)。 在邊緣觸發(fā)的情況下,僅當(dāng)在關(guān)聯(lián)的描述符上檢測到事件時,它才會發(fā)出通知。 在事件觸發(fā)的通知中說,您的應(yīng)用程序處理程序僅讀取內(nèi)核輸入緩沖區(qū)的一半。 現(xiàn)在,即使有一些數(shù)據(jù)要讀取,它下次也不會在此描述符上得到通知,除非設(shè)備準(zhǔn)備發(fā)送更多數(shù)據(jù)而導(dǎo)致文件描述符事件。 另一方面,級別觸發(fā)的配置將在每次要讀取數(shù)據(jù)時觸發(fā)通知。
根據(jù)版本,類似的系統(tǒng)調(diào)用以BSD形式的kqueue和/ dev / poll或Solaris中的“ Event Completion”形式出現(xiàn)。 Windows等效為“ I / O完成端口”。
但是,至少在Linux情況下,AIO模式的情況有所不同。 Linux在套接字上對aio的支持似乎充其量是陰暗的,有人暗示它實際上是在內(nèi)核級別使用就緒事件,而在應(yīng)用程序級別上對完成事件提供了異步抽象。 但是Windows似乎通過“ I / O完成端口”再次支持此類。
設(shè)計 I / O模式101
在軟件開發(fā)中,到處都有模式。 I / O沒什么不同。 與NIO和AIO模型相關(guān)的I / O模式有以下幾種。
反應(yīng)堆模式
有幾個組件參與此模式。 我將首先瀏覽它們,這樣很容易理解該圖。
Reactor Initiator:這是通過配置和啟動調(diào)度程序來啟動非阻塞服務(wù)器的組件。 首先,它將綁定服務(wù)器套接字,并向解復(fù)用器注冊它,以使客戶端連接接受就緒事件。 然后,將為分派器注冊每種就緒事件類型的事件處理程序?qū)崿F(xiàn)(讀/寫/接受等)。 接下來,調(diào)度程序事件循環(huán)將被調(diào)用以處理事件通知。
調(diào)度程序:定義用于注冊,除去和調(diào)度事件處理程序的接口,這些事件處理程序負(fù)責(zé)對連接事件做出反應(yīng),這些事件包括連接接受,一組連接上的數(shù)據(jù)輸入/輸出和超時事件。 為了服務(wù)于客戶端連接,相關(guān)的事件處理程序(例如:accept事件處理程序)將向解復(fù)用器注冊接受的客戶端通道(用于底層客戶端套接字的包裝器)以及就緒事件的類型,以偵聽該特定通道。 之后,調(diào)度程序線程將在多路分解器上為已注冊的通道集調(diào)用阻塞準(zhǔn)備狀態(tài)選擇操作。 一旦一個或多個注冊通道準(zhǔn)備好進行I / O,調(diào)度程序?qū)⑹褂米允录幚沓绦蛑鹨环?wù)與每個就緒通道相關(guān)聯(lián)的每個返回的“句柄”。 這些事件處理程序不要占用調(diào)度程序線程,這一點很重要,因為它將延遲調(diào)度程序為其他就緒連接提供服務(wù)的時間。 由于事件處理程序中的常規(guī)邏輯包括向/從就緒連接傳輸數(shù)據(jù),這將阻塞直到所有數(shù)據(jù)在用戶空間和內(nèi)核空間數(shù)據(jù)緩沖區(qū)之間正常傳輸之前,因此,這些處理程序?qū)⒃谂c線程不同的線程中運行池。
句柄:一旦向解復(fù)用器注冊了通道,就封裝了連接通道和就緒信息,返回句柄。 多路復(fù)用器就緒選擇操作將返回一組就緒的句柄。 Java NIO的等效項是SelectionKey。
解復(fù)用器:等待一個或多個已注冊連接通道的就緒事件。 Java NIO等效于Selector。
事件處理程序:指定具有鉤子方法的接口,用于分配連接事件。 這些方法需要由特定于應(yīng)用程序的事件處理程序?qū)崿F(xiàn)來實現(xiàn)。
具體事件處理程序:包含用于從基礎(chǔ)連接讀取/寫入數(shù)據(jù)以及執(zhí)行所需處理或從傳遞的Handle發(fā)起客戶端連接接受協(xié)議的邏輯。
事件處理程序通常在線程池的單獨線程中運行,如下圖所示。
此模式的簡單回顯服務(wù)器實現(xiàn)如下(沒有事件處理程序線程池)。
public class ReactorInitiator {private static final int NIO_SERVER_PORT = 9993;public void initiateReactiveServer(int port) throws Exception {ServerSocketChannel server = ServerSocketChannel.open();server.socket().bind(new InetSocketAddress(port));server.configureBlocking(false);Dispatcher dispatcher = new Dispatcher();dispatcher.registerChannel(SelectionKey.OP_ACCEPT, server);dispatcher.registerEventHandler(SelectionKey.OP_ACCEPT, new AcceptEventHandler(dispatcher.getDemultiplexer()));dispatcher.registerEventHandler(SelectionKey.OP_READ, new ReadEventHandler(dispatcher.getDemultiplexer()));dispatcher.registerEventHandler(SelectionKey.OP_WRITE, new WriteEventHandler());dispatcher.run(); // Run the dispatcher loop}public static void main(String[] args) throws Exception {System.out.println('Starting NIO server at port : ' +NIO_SERVER_PORT);new ReactorInitiator().initiateReactiveServer(NIO_SERVER_PORT);}}public class Dispatcher {private Map<Integer, EventHandler> registeredHandlers =new ConcurrentHashMap<Integer, EventHandler>();private Selector demultiplexer;public Dispatcher() throws Exception {demultiplexer = Selector.open();}public Selector getDemultiplexer() {return demultiplexer;}public void registerEventHandler(int eventType, EventHandler eventHandler) {registeredHandlers.put(eventType, eventHandler);}// Used to register ServerSocketChannel with the// selector to accept incoming client connectionspublic void registerChannel(int eventType, SelectableChannel channel) throws Exception {channel.register(demultiplexer, eventType);}public void run() {try {while (true) { // Loop indefinitelydemultiplexer.select();Set<SelectionKey> readyHandles =demultiplexer.selectedKeys();Iterator<SelectionKey> handleIterator =readyHandles.iterator();while (handleIterator.hasNext()) {SelectionKey handle = handleIterator.next();if (handle.isAcceptable()) {EventHandler handler =registeredHandlers.get(SelectionKey.OP_ACCEPT);handler.handleEvent(handle);// Note : Here we don't remove this handle from// selector since we want to keep listening to// new client connections}if (handle.isReadable()) {EventHandler handler =registeredHandlers.get(SelectionKey.OP_READ);handler.handleEvent(handle);handleIterator.remove();}if (handle.isWritable()) {EventHandler handler =registeredHandlers.get(SelectionKey.OP_WRITE);handler.handleEvent(handle);handleIterator.remove();}}}} catch (Exception e) {e.printStackTrace();}}}public interface EventHandler {public void handleEvent(SelectionKey handle) throws Exception;}public class AcceptEventHandler implements EventHandler {private Selector demultiplexer;public AcceptEventHandler(Selector demultiplexer) {this.demultiplexer = demultiplexer;}@Overridepublic void handleEvent(SelectionKey handle) throws Exception {ServerSocketChannel serverSocketChannel =(ServerSocketChannel) handle.channel();SocketChannel socketChannel = serverSocketChannel.accept();if (socketChannel != null) {socketChannel.configureBlocking(false);socketChannel.register(demultiplexer, SelectionKey.OP_READ);}}}public class ReadEventHandler implements EventHandler {private Selector demultiplexer;private ByteBuffer inputBuffer = ByteBuffer.allocate(2048);public ReadEventHandler(Selector demultiplexer) {this.demultiplexer = demultiplexer;}@Overridepublic void handleEvent(SelectionKey handle) throws Exception {SocketChannel socketChannel =(SocketChannel) handle.channel();socketChannel.read(inputBuffer); // Read data from clientinputBuffer.flip();// Rewind the buffer to start reading from the beginningbyte[] buffer = new byte[inputBuffer.limit()];inputBuffer.get(buffer);System.out.println('Received message from client : ' +new String(buffer));inputBuffer.flip();// Rewind the buffer to start reading from the beginning// Register the interest for writable readiness event for// this channel in order to echo back the messagesocketChannel.register(demultiplexer, SelectionKey.OP_WRITE, inputBuffer);}}public class WriteEventHandler implements EventHandler {@Overridepublic void handleEvent(SelectionKey handle) throws Exception {SocketChannel socketChannel =(SocketChannel) handle.channel();ByteBuffer inputBuffer = (ByteBuffer) handle.attachment();socketChannel.write(inputBuffer);socketChannel.close(); // Close connection}}前攝者模式
該模式基于異步I / O模型。 主要組成部分如下。
主動發(fā)起者:這是發(fā)起異步操作以接受客戶端連接的實體。 這通常是服務(wù)器應(yīng)用程序的主線程。 將完成處理程序與完成調(diào)度程序一起注冊以處理連接接受異步事件通知。
異步操作處理器:負(fù)責(zé)異步執(zhí)行I / O操作,并向應(yīng)用程序級別完成處理程序提供完成事件通知。 這通常是操作系統(tǒng)公開的異步I / O接口。
異步操作:異步操作由異步操作處理器在單獨的內(nèi)核線程中運行以完成操作。
完成分配器:負(fù)責(zé)異步操作完成時回調(diào)到應(yīng)用程序完成處理程序。 當(dāng)異步操作處理器完成異步啟動的操作時,完成調(diào)度程序?qū)⒋硭鼒?zhí)行應(yīng)用程序回調(diào)。 通常,根據(jù)事件的類型將事件通知處理委托給適當(dāng)?shù)耐瓿商幚沓绦颉?
完成處理程序:這是應(yīng)用程序?qū)崿F(xiàn)的接口,用于處理異步事件完成事件。
讓我們看看如何使用Java 7中添加的新Java NIO.2 API來實現(xiàn)此模式(作為簡單的回顯服務(wù)器)。
public class ProactorInitiator {static int ASYNC_SERVER_PORT = 4333;public void initiateProactiveServer(int port)throws IOException {final AsynchronousServerSocketChannel listener =AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));AcceptCompletionHandler acceptCompletionHandler =new AcceptCompletionHandler(listener);SessionState state = new SessionState();listener.accept(state, acceptCompletionHandler);}public static void main(String[] args) {try {System.out.println('Async server listening on port : ' +ASYNC_SERVER_PORT);new ProactorInitiator().initiateProactiveServer(ASYNC_SERVER_PORT);} catch (IOException e) {e.printStackTrace();}// Sleep indefinitely since otherwise the JVM would terminatewhile (true) {try {Thread.sleep(Long.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}} }public class AcceptCompletionHandlerimplementsCompletionHandler<AsynchronousSocketChannel, SessionState> {private AsynchronousServerSocketChannel listener;public AcceptCompletionHandler(AsynchronousServerSocketChannel listener) {this.listener = listener;}@Overridepublic void completed(AsynchronousSocketChannel socketChannel,SessionState sessionState) {// accept the next connectionSessionState newSessionState = new SessionState();listener.accept(newSessionState, this);// handle this connectionByteBuffer inputBuffer = ByteBuffer.allocate(2048);ReadCompletionHandler readCompletionHandler =new ReadCompletionHandler(socketChannel, inputBuffer);socketChannel.read(inputBuffer, sessionState, readCompletionHandler);}@Overridepublic void failed(Throwable exc, SessionState sessionState) {// Handle connection failure...}}public class ReadCompletionHandler implementsCompletionHandler<Integer, SessionState> {private AsynchronousSocketChannel socketChannel;private ByteBuffer inputBuffer;public ReadCompletionHandler(AsynchronousSocketChannel socketChannel,ByteBuffer inputBuffer) {this.socketChannel = socketChannel;this.inputBuffer = inputBuffer;}@Overridepublic void completed(Integer bytesRead, SessionState sessionState) {byte[] buffer = new byte[bytesRead];inputBuffer.rewind();// Rewind the input buffer to read from the beginninginputBuffer.get(buffer);String message = new String(buffer);System.out.println('Received message from client : ' +message);// Echo the message back to clientWriteCompletionHandler writeCompletionHandler =new WriteCompletionHandler(socketChannel);ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);socketChannel.write(outputBuffer, sessionState, writeCompletionHandler);}@Overridepublic void failed(Throwable exc, SessionState attachment) {//Handle read failure.....}}public class WriteCompletionHandler implementsCompletionHandler<Integer, SessionState> {private AsynchronousSocketChannel socketChannel;public WriteCompletionHandler(AsynchronousSocketChannel socketChannel) {this.socketChannel = socketChannel;}@Overridepublic void completed(Integer bytesWritten, SessionState attachment) {try {socketChannel.close();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, SessionState attachment) {// Handle write failure.....}}public class SessionState {private Map<String, String> sessionProps =new ConcurrentHashMap<String, String>();public String getProperty(String key) {return sessionProps.get(key);}public void setProperty(String key, String value) {sessionProps.put(key, value);}} 每種類型的事件完成(接受/讀取/寫入)由實現(xiàn)CompletionHandler接口(接受/讀取/ WriteCompletionHandler等)的單獨的完成處理程序處理。 在這些連接處理程序內(nèi)部管理狀態(tài)轉(zhuǎn)換。 附加的SessionState參數(shù)可用于
在一系列完成事件中保持客戶端會話的特定狀態(tài)。
NIO框架(HTTPCore)
如果您正在考慮實現(xiàn)基于NIO的HTTP服務(wù)器,那么您很幸運。 Apache HTTPCore庫為使用NIO處理HTTP流量提供了出色的支持。 API通過內(nèi)置的HTTP請求處理功能,在NIO層之上提供了更高級別的抽象。下面給出了一個最小的非阻塞HTTP服務(wù)器實現(xiàn),該實現(xiàn)為任何GET請求返回一個虛擬輸出。
public class NHttpServer {public void start() throws IOReactorException {HttpParams params = new BasicHttpParams();// Connection parametersparams.setIntParameter(HttpConnectionParams.SO_TIMEOUT, 60000).setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024).setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK, true).setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true);final DefaultListeningIOReactor ioReactor =new DefaultListeningIOReactor(2, params);// Spawns an IOReactor having two reactor threads// running selectors. Number of threads here is// usually matched to the number of processor cores// in the system// Application specific readiness event handlerServerHandler handler = new ServerHandler();final IOEventDispatch ioEventDispatch =new DefaultServerIOEventDispatch(handler, params);// Default IO event dispatcher encapsulating the// event handlerListenerEndpoint endpoint = ioReactor.listen(new InetSocketAddress(4444));// start the IO reactor in a new separate threadThread t = new Thread(new Runnable() {public void run() {try {System.out.println('Listening in port 4444');ioReactor.execute(ioEventDispatch);} catch (InterruptedIOException ex) {ex.printStackTrace();} catch (IOException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}});t.start();// Wait for the endpoint to become ready,// i.e. for the listener to start accepting requests.try {endpoint.waitFor();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args)throws IOReactorException {new NHttpServer().start();}}public class ServerHandler implements NHttpServiceHandler {private static final int BUFFER_SIZE = 2048;private static final String RESPONSE_SOURCE_BUFFER ='response-source-buffer';// the factory to create HTTP responsesprivate final HttpResponseFactory responseFactory;// the HTTP response processorprivate final HttpProcessor httpProcessor;// the strategy to re-use connectionsprivate final ConnectionReuseStrategy connStrategy;// the buffer allocatorprivate final ByteBufferAllocator allocator;public ServerHandler() {super();this.responseFactory = new DefaultHttpResponseFactory();this.httpProcessor = new BasicHttpProcessor();this.connStrategy = new DefaultConnectionReuseStrategy();this.allocator = new HeapByteBufferAllocator();}@Overridepublic void connected(NHttpServerConnection nHttpServerConnection) {System.out.println('New incoming connection');}@Overridepublic void requestReceived(NHttpServerConnection nHttpServerConnection) {HttpRequest request =nHttpServerConnection.getHttpRequest();if (request instanceof HttpEntityEnclosingRequest) {// Handle POST and PUT requests} else {ContentOutputBuffer outputBuffer =new SharedOutputBuffer(BUFFER_SIZE, nHttpServerConnection, allocator);HttpContext context =nHttpServerConnection.getContext();context.setAttribute(RESPONSE_SOURCE_BUFFER, outputBuffer);OutputStream os =new ContentOutputStream(outputBuffer);// create the default response to this requestProtocolVersion httpVersion =request.getRequestLine().getProtocolVersion();HttpResponse response =responseFactory.newHttpResponse(httpVersion, HttpStatus.SC_OK,nHttpServerConnection.getContext());// create a basic HttpEntity using the source// channel of the response pipeBasicHttpEntity entity = new BasicHttpEntity();if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {entity.setChunked(true);}response.setEntity(entity);String method = request.getRequestLine().getMethod().toUpperCase();if (method.equals('GET')) {try {nHttpServerConnection.suspendInput();nHttpServerConnection.submitResponse(response);os.write(new String('Hello client..').getBytes('UTF-8'));os.flush();os.close();} catch (Exception e) {e.printStackTrace();}} // Handle other http methods}}@Overridepublic void inputReady(NHttpServerConnection nHttpServerConnection,ContentDecoder contentDecoder) {// Handle request enclosed entities here by reading// them from the channel}@Overridepublic void responseReady(NHttpServerConnection nHttpServerConnection) {try {nHttpServerConnection.close();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void outputReady(NHttpServerConnection nHttpServerConnection,ContentEncoder encoder) {HttpContext context = nHttpServerConnection.getContext();ContentOutputBuffer outBuf =(ContentOutputBuffer) context.getAttribute(RESPONSE_SOURCE_BUFFER);try {outBuf.produceContent(encoder);} catch (IOException e) {e.printStackTrace();}}@Overridepublic void exception(NHttpServerConnection nHttpServerConnection,IOException e) {e.printStackTrace();}@Overridepublic void exception(NHttpServerConnection nHttpServerConnection,HttpException e) {e.printStackTrace();}@Overridepublic void timeout(NHttpServerConnection nHttpServerConnection) {try {nHttpServerConnection.close();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void closed(NHttpServerConnection nHttpServerConnection) {try {nHttpServerConnection.close();} catch (IOException e) {e.printStackTrace();}}}IOReactor類基本上將使用處理預(yù)備事件的ServerHandler實現(xiàn)包裝解復(fù)用器功能。
Apache Synapse(開源ESB)包含一個基于NIO的HTTP服務(wù)器的良好實現(xiàn),其中NIO用于為每個實例擴展大量客戶端,并且隨著時間的推移內(nèi)存使用量一直保持不變。 該實現(xiàn)還包含與Axis2傳輸框架集成一起內(nèi)置的良好調(diào)試和服務(wù)器統(tǒng)計信息收集機制。 可以在[1]中找到。
結(jié)論
在執(zhí)行I / O時,有多種選擇會影響服務(wù)器的可伸縮性和性能。 上面的每種I / O機制各有利弊,因此應(yīng)根據(jù)預(yù)期的可伸縮性和性能特征以及這些方法的易維護性來做出決定。 這結(jié)束了我關(guān)于I / O的篇幅較長的文章。 隨時提供您可能有的建議,更正或評論。 可以從此處下載文章中概述的服務(wù)器的完整源代碼以及客戶端。
相關(guān)鏈接
我在此過程中經(jīng)歷了許多參考。 以下是一些有趣的內(nèi)容。
[1] http://www.ibm.com/developerworks/java/library/j-nio2-1/index.html [2] http://www.ibm.com/developerworks/linux/library/l-async / [3] http://lse.sourceforge.net/io/aionotes.txt [4] http://wknight8111.blogspot.com/?tag=aio [5] http://nick-black.com/dankwiki /index.php/Fast_UNIX_Servers [6] http://today.java.net/pub/a/today/2007/02/13/architecture-of-highly-scalable-nio-server.html [7] Java NIO ,作者:羅恩·希欽斯[8] http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf [9] http://www.cs.wustl.edu/~schmidt/PDF/proactor.pdf [10] http://www.kegel.com/c10k.html參考: I / O來自Source Open博客,由我們的JCG合作伙伴 Buddhika Chamith揭秘。
翻譯自: https://www.javacodegeeks.com/2012/08/io-demystified.html
總結(jié)
- 上一篇: ADF:动态视图对象
- 下一篇: 配置CDI对话的超时