bio阻塞的缺点_java 中的 BIO/NIO/AIO 详解
java 的 IO 演進之路
我們在前面學習了 linux 的 5 種 I/O 模型詳解
下面我們一起來學習下如何使用 java 實現 BIO/NIO/AIO 這 3 種不同的網絡 IO 模型編程。
BIO 編程
BIO 作為最基礎的 IO 版本,實現起來比較簡單。
Server
import?java.io.BufferedReader;import?java.io.IOException;
import?java.io.InputStreamReader;
import?java.io.PrintWriter;
import?java.net.ServerSocket;
import?java.net.Socket;
/**
?*?
?BIO?服務端?
?*?@author?老馬嘯西風
?*/
public?class?TimeServer?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????final?int?port?=?8088;
????????ServerSocket?serverSocket?=?new?ServerSocket(port);
????????System.out.println("server?started?at?port?"?+?port);
????????//?循環監聽
????????while?(true)?{
????????????Socket?socket?=?serverSocket.accept();
????????????System.out.println("客戶端連接成功");
????????????//?讀取客戶端的信息
????????????BufferedReader?bufferedReader?=?new?BufferedReader(new?InputStreamReader(socket.getInputStream()));
????????????System.out.println("Server?Recevie:?"?+?bufferedReader.readLine());
????????????//?讀取客戶端的信息
????????????PrintWriter?printWriter?=?new?PrintWriter(socket.getOutputStream(),?true);
????????????String?currentTime?=?System.currentTimeMillis()+"";
????????????printWriter.println(currentTime);
????????}
????}
}
client
import?java.io.BufferedReader;import?java.io.IOException;
import?java.io.InputStreamReader;
import?java.io.PrintWriter;
import?java.net.Socket;
/**
?*?
?BIO?客戶端?
?*
?*?@author?老馬嘯西風
?*/
public?class?TimeClient?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????final?int?port?=?8088;
????????try(Socket?clientSocket?=?new?Socket("127.0.0.1",?port))?{
????????????System.out.println("Client?started?at?port?"?+?port);
????????????//?寫入信息
????????????PrintWriter?printWriter?=?new?PrintWriter(clientSocket.getOutputStream(),?true);
????????????printWriter.println("hello?bio");
????????????//?讀取反饋
????????????BufferedReader?bufferedReader?=?new?BufferedReader(new?InputStreamReader(clientSocket.getInputStream()));
????????????System.out.println("client?recevie:?"?+?bufferedReader.readLine());
????????}
????}
}
啟動測試
啟動服務端
啟動客戶端
client?recevie:?1568643464491
Process?finished?with?exit?code?0
再次查看服務端日志
客戶端連接成功
Server?Recevie:?hello?bio
線程池版本
BIO 的缺點
缺點其實非常明顯,每次都要創建一個線程去處理。
比如我的實現是直接阻塞當前線程的,這當然非常的不友好。
可以使用線線程池的方式進行優化改進。
線程版本
public?class?TimeThreadServer?{????public?static?void?main(String[]?args)?throws?IOException?{
????????final?int?port?=?8088;
????????ServerSocket?serverSocket?=?new?ServerSocket(port);
????????System.out.println("server?started?at?port?"?+?port);
????????//?循環監聽
????????while?(true)?{
????????????Socket?socket?=?serverSocket.accept();
????????????System.out.println("客戶端連接成功");
????????????new?ServerHandler(socket).start();
????????}
????}
????static?class?ServerHandler?extends?Thread?{
????????private?final?Socket?socket;
????????ServerHandler(Socket?socket)?{
????????????this.socket?=?socket;
????????}
????????@Override
????????public?void?run()?{
????????????try?{
????????????????//?讀取客戶端的信息
????????????????BufferedReader?bufferedReader?=?new?BufferedReader(new?InputStreamReader(socket.getInputStream()));
????????????????System.out.println("Server?Recevie:?"?+?bufferedReader.readLine());
????????????????//?讀取客戶端的信息
????????????????PrintWriter?printWriter?=?new?PrintWriter(socket.getOutputStream(),?true);
????????????????String?currentTime?=?System.currentTimeMillis()+"";
????????????????printWriter.println(currentTime);
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
}
線程池版本
public?static?void?main(String[]?args)?throws?IOException?{????final?int?port?=?8088;
????ServerSocket?serverSocket?=?new?ServerSocket(port);
????System.out.println("server?started?at?port?"?+?port);
????ExecutorService?executorService?=?Executors.newFixedThreadPool(2);
????//?循環監聽
????while?(true)?{
????????Socket?socket?=?serverSocket.accept();
????????System.out.println("客戶端連接成功");
????????//?線程池處理
????????executorService.submit(new?ServerHandler(socket));
????}
}
其他代碼保持不變。
優缺點
線程池版本的 BIO 又被稱作偽異步 IO。
屬于在 NIO 還沒有流行之前的一種實戰解決方案。
這種方式的性能和 BIO 想比較提升了很多,實現起來也比較簡單,但是可靠性相對較差。
NIO 基本概念
Buffer
Java NIO Buffers用于和NIO Channel交互。正如你已經知道的,我們從channel中讀取數據到buffers里,從buffer把數據寫入到channels.
buffer 本質上就是一塊內存區,可以用來寫入數據,并在稍后讀取出來。
這塊內存被NIO Buffer包裹起來,對外提供一系列的讀寫方便開發的接口。
Channel
Java NIO Channel通道和流非常相似,主要有以下幾點區別:
通道可以讀也可以寫,流一般來說是單向的(只能讀或者寫)。
通道可以異步讀寫。
通道總是基于緩沖區Buffer來讀寫。
Selector
用單線程處理多個channels的好處是我需要更少的線程來處理channel。
實際上,你甚至可以用一個線程來處理所有的channels。
從操作系統的角度來看,切換線程開銷是比較昂貴的,并且每個線程都需要占用系統資源,因此暫用線程越少越好。
需要留意的是,現代操作系統和CPU在多任務處理上已經變得越來越好,所以多線程帶來的影響也越來越小。
如果一個CPU是多核的,如果不執行多任務反而是浪費了機器的性能。不過這些設計討論是另外的話題了。
簡而言之,通過Selector我們可以實現單線程操作多個channel。
NIO 實現方式
NIO 采取通道(Channel)和緩沖區(Buffer)來傳輸和保存數據,它是非阻塞式的 I/O,即在等待連接、讀寫數據(這些都是在一線程以客戶端的程序中會阻塞線程的操作)的時候,程序也可以做其他事情,以實現線程的異步操作。
考慮一個即時消息服務器,可能有上千個客戶端同時連接到服務器,但是在任何時刻只有非常少量的消息需要讀取和分發(如果采用線程池或者一線程一客戶端方式,則會非常浪費資源),這就需要一種方法能阻塞等待,直到有一個通道可以進行 I/O 操作。
NIO 的 Selector 選擇器就實現了這樣的功能,一個 Selector 實例可以同時檢查一組信道的 I/O 狀態,它就類似一個觀察者,只要我們把需要探知的 SocketChannel 告訴 Selector,我們接著做別的事情,當有事件(比如,連接打開、數據到達等)發生時,它會通知我們,傳回一組 SelectionKey,我們讀取這些 Key,就會獲得我們剛剛注冊過的 SocketChannel,然后,我們從這個 Channel 中讀取數據,接著我們可以處理這些數據。
Selector 內部原理實際是在做一個對所注冊的 Channel 的輪詢訪問,不斷的輪詢(目前就這一個算法),一旦輪詢到一個 Channel 有所注冊的事情發生,比如數據來了,它就會讀取 Channel 中的數據,并對其進行處理。
要使用選擇器,需要創建一個 Selector 實例,并將其注冊到想要監控的信道上(通過 Channel 的方法實現)。
最后調用選擇器的 select()方法,該方法會阻塞等待,直到有一個或多個信道準備好了 I/O 操作或等待超時,或另一個線程調用了該選擇器的 wakeup()方法。
現在,在一個單獨的線程中,通過調用 select()方法,就能檢查多個信道是否準備好進行 I/O 操作,由于非阻塞 I/O 的異步特性,在檢查的同時,我們也可以執行其他任務。
服務端
步驟
(1)創建一個 Selector 實例;
(2)將其注冊到各種信道,并指定每個信道上感興趣的I/O操作;
(3)重復執行:
調用一種?select()?方法;獲取選取的鍵列表;
對于已選鍵集中的每個鍵:
????獲取信道,并從鍵中獲取附件(如果為信道及其相關的?key?添加了附件的話);
????確定準備就緒的操縱并執行,如果是?accept?操作,將接收的信道設置為非阻塞模式,并注冊到選擇器;
????如果需要,修改鍵的興趣操作集;
????從已選鍵集中移除鍵。
代碼實現
import?java.io.IOException;import?java.net.InetSocketAddress;
import?java.nio.ByteBuffer;
import?java.nio.channels.SelectionKey;
import?java.nio.channels.Selector;
import?java.nio.channels.ServerSocketChannel;
import?java.nio.channels.SocketChannel;
import?java.util.Iterator;
import?java.util.concurrent.TimeUnit;
/**
?*?@author?binbin.hou
?*?@since?1.0.0
?*/
public?class?NioTcpServer?{
????/**
?????*?緩沖區的長度
?????*/
????private?static?final?int?BUFSIZE?=?256;
????/**
?????*?select方法等待信道準備好的最長時間
?????*/
????private?static?final?int?TIMEOUT?=?3000;
????/**
?????*?監聽的端口號
?????*/
????private?static?final?int?PORT?=?18888;
????public?static?void?main(String[]?args)?throws?IOException,?InterruptedException?{
????????//?1.?實例化一個通道
????????ServerSocketChannel?serverSocketChannel?=?ServerSocketChannel.open();
????????//?設置為非阻塞模式
????????serverSocketChannel.configureBlocking(false);
????????//?綁定監聽的端口
????????serverSocketChannel.socket().bind(new?InetSocketAddress(PORT));
????????System.out.println("Server?started?listen?on:?"?+?PORT);
????????//?2.?構建一個?Selector,用于監聽?Channel?的狀態
????????Selector?selector?=?Selector.open();
????????serverSocketChannel.register(selector,?SelectionKey.OP_ACCEPT);
????????//3.?不斷循環等待
????????while?(true)?{
????????????//3.1?循環等待直到有通道已經準備好
????????????if(selector.select(TIMEOUT)?==?0)?{
????????????????System.out.println(".");
????????????????TimeUnit.SECONDS.sleep(1);
????????????????continue;
????????????}
????????????//3.2?遍歷多有的?key
????????????Iterator?selectionKeySetIter?=?selector.selectedKeys().iterator();while(selectionKeySetIter.hasNext())?{
????????????????SelectionKey?selectionKey?=?selectionKeySetIter.next();//?accept?I/O形式if(selectionKey.isAcceptable())?{
????????????????????ServerSocketChannel?serverSocketChannel1?=?(ServerSocketChannel)?selectionKey.channel();//?獲取客戶端?channel
????????????????????SocketChannel?socketChannel?=?serverSocketChannel1.accept();
????????????????????socketChannel.configureBlocking(false);//?選擇器注冊監聽的事件,同時制定關聯的附件
????????????????????socketChannel.register(selectionKey.selector(),?SelectionKey.OP_READ?|?SelectionKey.OP_WRITE,
????????????????????????????ByteBuffer.allocate(BUFSIZE));
????????????????}//?客戶端信道已經準備好了讀取數據到?bufferif(selectionKey.isReadable())?{//?讀取代碼
????????????????????SocketChannel?socketChannel?=?(SocketChannel)?selectionKey.channel();//?獲取對應的附件信息
????????????????????ByteBuffer?byteBuffer?=?(ByteBuffer)?selectionKey.attachment();long?bufferRead?=?socketChannel.read(byteBuffer);//客戶端關閉的鏈接??梢园踩P閉if(bufferRead?==?-1)?{
????????????????????????socketChannel.close();
????????????????????}?else?{//?緩沖區讀取到了數據,將其感興趣的操作設置為可讀可寫。
????????????????????????selectionKey.interestOps(SelectionKey.OP_READ?|?SelectionKey.OP_WRITE);//?打印讀取的內容
????????????????????????System.out.println("Server?read:?"?+?new?String(byteBuffer.array()));
????????????????????}
????????????????}//?寫入處理if(selectionKey.isValid()
????????????????????&&?selectionKey.isWritable())?{
????????????????????SocketChannel?socketChannel?=?(SocketChannel)?selectionKey.channel();//?獲取附件
????????????????????ByteBuffer?byteBuffer?=?(ByteBuffer)?selectionKey.attachment();//?重置緩沖區,準備將數據寫入到信道
????????????????????byteBuffer.flip();
????????????????????socketChannel.write(byteBuffer);//Tells?whether?there?are?any?elements?between?the?current?position?and?the?limit.//?如果已經全部寫入到信道,則將該信道感興趣的操作標識為讀if(!byteBuffer.hasRemaining())?{
????????????????????????selectionKey.interestOps(SelectionKey.OP_READ);
????????????????????}//?為讀取更多的數據騰出空間
????????????????????byteBuffer.compact();
????????????????}//?手動刪除
????????????????selectionKeySetIter.remove();
????????????}
????????}
????}
}
客戶端
代碼實現
import?java.io.IOException;import?java.net.InetSocketAddress;
import?java.nio.ByteBuffer;
import?java.nio.channels.SocketChannel;
import?java.util.concurrent.TimeUnit;
/**
?*?@author?binbin.hou
?*?@since?1.0.0
?*/
public?class?NioTcpClient?{
????/**
?????*?監聽的端口號
?????*/
????private?static?final?int?PORT?=?18888;
????public?static?void?main(String[]?args)?throws?IOException,?InterruptedException?{
????????//1.?設置為非阻塞
????????SocketChannel?socketChannel?=?SocketChannel.open();
????????socketChannel.configureBlocking(false);
????????socketChannel.connect(new?InetSocketAddress(PORT));
????????//2.?連接中...
????????while?(!socketChannel.finishConnect())?{
????????????System.out.println(".");
????????????TimeUnit.SECONDS.sleep(1);
????????}
????????System.out.println("\n");
????????//3.?寫入/讀取信息
????????String?info?=?"hello?nio?test";
????????ByteBuffer?readBuffer?=?ByteBuffer.allocate(info.length());
????????ByteBuffer?writeBuffer?=?ByteBuffer.wrap(info.getBytes());
????????int?totalReceivedBytes?=?0;
????????int?receivedBytes?=?0;
????????while?(totalReceivedBytes?????????????//?循環寫入
????????????while?(writeBuffer.hasRemaining())?{
????????????????socketChannel.write(writeBuffer);
????????????}
????????????receivedBytes?=?socketChannel.read(readBuffer);
????????????//?說明服務端中斷
????????????if(receivedBytes?==?-1)?{
????????????????throw?new?RuntimeException("Server?has?been?shut?done.");
????????????}
????????????totalReceivedBytes?+=?receivedBytes;
????????}
????????System.out.println("Client?received?from?server:?"?+?new?String(readBuffer.array()));
????????socketChannel.close();
????}
}
測試
運行服務端
服務端
運行客戶端
客戶端
服務端
.
.
Server?read:?hello?nio?test???????????????????????????????????????????????????????????????????????????????????????????????????????????????.
.
.
JDK AIO
jdk7中新增了一些與文件(網絡)I/O相關的一些api。這些API被稱為NIO.2,或稱為AIO(Asynchronous I/O)。
AIO最大的一個特性就是異步能力,這種能力對socket與文件I/O都起作用。
實現方式
Future 方式
即提交一個 I/O 操作請求(accept/read/write),返回一個 Future。
然后您可以對 Future 進行檢查(調用get(timeout)),確定它是否完成,或者阻塞 IO 操作直到操作正常完成或者超時異常。
使用 Future 方式很簡單,需要注意的是,因為Future.get()是同步的,所以如果不仔細考慮使用場合,使用 Future 方式可能很容易進入完全同步的編程模式,從而使得異步操作成為一個擺設。
如果這樣,那么原來舊版本的 Socket API 便可以完全勝任,大可不必使用異步 I/O.
Callback 方式
即提交一個 I/O 操作請求,并且指定一個 CompletionHandler。
當異步 I/O 操作完成時,便發送一個通知,此時這個 CompletionHandler 對象的 completed 或者 failed 方法將會被調用。
性能
因為AIO的實施需充分調用OS參與,IO需要操作系統支持、并發也同樣需要操作系統的支持,所以性能方面不同操作系統差異會比較明顯。
Future 實現方式
Server
import?java.io.IOException;import?java.net.InetSocketAddress;
import?java.nio.ByteBuffer;
import?java.nio.channels.AsynchronousServerSocketChannel;
import?java.nio.channels.AsynchronousSocketChannel;
import?java.nio.charset.Charset;
import?java.util.concurrent.ExecutionException;
import?java.util.concurrent.Future;
import?java.util.concurrent.TimeUnit;
import?java.util.concurrent.TimeoutException;
public?class?AioFutureServer?{
????private?static?final?int?DEFAULT_PORT?=?12345;
????private?AsynchronousServerSocketChannel?serverSocketChannel;
????public?AioFutureServer()?throws?IOException?{
????????serverSocketChannel?=?AsynchronousServerSocketChannel.open();
????????serverSocketChannel.bind(new?InetSocketAddress(DEFAULT_PORT));
????????System.out.println("Server?listen?on?port:?"?+?DEFAULT_PORT);
????}
????public?void?startWithFuture()?throws?InterruptedException,
????????????ExecutionException,?TimeoutException?{
????????while?(true)?{
????????????//?循環接收客戶端請求
????????????Future?future?=?serverSocketChannel.accept();//?get()?是為了確保?accept?到一個連接
????????????AsynchronousSocketChannel?socket?=?future.get();
????????????handleWithFuture(socket);
????????}
????}/**
?????*?處理未來的信息
?????*?@param?channel?異步客戶端
?????*/private?void?handleWithFuture(AsynchronousSocketChannel?channel)?throws?InterruptedException,?ExecutionException,?TimeoutException?{
????????ByteBuffer?readBuf?=?ByteBuffer.allocate(8);
????????readBuf.clear();//?一次可能讀不完while?(true)?{//get?是為了確保?read?完成,超時時間可以有效避免DOS攻擊,如果客戶端一直不發送數據,則進行超時處理
????????????Integer?integer?=?channel.read(readBuf).get(10,?TimeUnit.SECONDS);
????????????System.out.println("read:?"?+?integer);if?(integer?==?-1)?{break;
????????????}
????????????readBuf.flip();
????????????System.out.println("received:?"?+?Charset.forName("UTF-8").decode(readBuf));
????????????readBuf.clear();
????????}
????}public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?ExecutionException,?TimeoutException?{new?AioFutureServer().startWithFuture();
????}
}
客戶端
import?java.io.IOException;import?java.net.InetSocketAddress;
import?java.nio.ByteBuffer;
import?java.nio.channels.AsynchronousServerSocketChannel;
import?java.nio.channels.AsynchronousSocketChannel;
import?java.nio.charset.Charset;
import?java.util.concurrent.ExecutionException;
import?java.util.concurrent.Future;
import?java.util.concurrent.TimeUnit;
import?java.util.concurrent.TimeoutException;
public?class?AioClient?{
????private?static?final?int?DEFAULT_PORT?=?12345;
????public?static?void?main(String[]?args)?throws?IOException,?ExecutionException,?InterruptedException?{
????????AsynchronousSocketChannel?client?=?AsynchronousSocketChannel.open();
????????client.connect(new?InetSocketAddress("localhost",?DEFAULT_PORT)).get();
????????client.write(ByteBuffer.wrap("123456789".getBytes()));
????}
}
測試
啟動服務端
啟動客戶端
服務端日志
read:?8received:?12345678
read:?1
received:?9
Exception?in?thread?"main"?java.util.concurrent.ExecutionException:?java.io.IOException:?指定的網絡名不再可用。
Callback 模式
服務端
import?java.io.IOException;import?java.net.InetSocketAddress;
import?java.nio.ByteBuffer;
import?java.nio.channels.AsynchronousServerSocketChannel;
import?java.nio.channels.AsynchronousSocketChannel;
import?java.nio.channels.CompletionHandler;
import?java.nio.charset.Charset;
import?java.util.concurrent.TimeUnit;
public?class?AioCompletionServer?{
????private?static?final?int?DEFAULT_PORT?=?12345;
????private?AsynchronousServerSocketChannel?serverSocketChannel;
????public?AioCompletionServer()?throws?IOException?{
????????serverSocketChannel?=?AsynchronousServerSocketChannel.open();
????????serverSocketChannel.bind(new?InetSocketAddress(DEFAULT_PORT));
????????System.out.println("Server?listen?on?port:?"?+?DEFAULT_PORT);
????}
????/**
?????*?使用回調的方式
?????*/
????public?void?startWithCompletionHandler()?{
????????serverSocketChannel.accept(null,
????????????????new?CompletionHandler()?{@Overridepublic?void?completed(AsynchronousSocketChannel?result,?Object?attachment)?{//?再此接收客戶端連接
????????????????????????serverSocketChannel.accept(null,?this);//?處理結果
????????????????????????handleWithCompletionHandler(result);
????????????????????}@Overridepublic?void?failed(Throwable?exc,?Object?attachment)?{
????????????????????????exc.printStackTrace();
????????????????????}
????????????????});
????}/**
?????*?處理異步的結果
?????*?@param?channel?客戶端信道
?????*/private?void?handleWithCompletionHandler(final?AsynchronousSocketChannel?channel)?{try?{final?long?timeout?=?10L;final?ByteBuffer?buffer?=?ByteBuffer.allocate(8);//?再次讀取,還是一種回調的方式。
????????????channel.read(buffer,?timeout,?TimeUnit.SECONDS,?null,?new?CompletionHandler()?{@Overridepublic?void?completed(Integer?result,?Object?attachment)?{
????????????????????System.out.println("read:"?+?result);if?(result?==?-1)?{try?{
????????????????????????????channel.close();
????????????????????????}?catch?(IOException?e)?{
????????????????????????????e.printStackTrace();
????????????????????????}return;
????????????????????}
????????????????????buffer.flip();
????????????????????System.out.println("received?message:"?+?Charset.forName("UTF-8").decode(buffer));
????????????????????buffer.clear();//?遞歸調用,直到結束為止。
????????????????????channel.read(buffer,?timeout,?TimeUnit.SECONDS,?null,?this);
????????????????}@Overridepublic?void?failed(Throwable?exc,?Object?attachment)?{
????????????????????exc.printStackTrace();
????????????????}
????????????});
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}public?static?void?main(String[]?args)?throws?IOException,?InterruptedException?{new?AioCompletionServer().startWithCompletionHandler();//?沉睡等待處理。
????????TimeUnit.SECONDS.sleep(100);
????}
}
客戶端
同上
小結
本文講述了 jdk 實現的 bio/nio/aio 的方式,你是否會感覺 jdk 中的 api 設計過于復雜呢?
下一節我們將通過 netty 框架實現上述功能,并講述我們為什么要選擇 netty 作為網絡開發的基本工具。
希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。
我是老馬,期待與你的下次相遇。
總結
以上是生活随笔為你收集整理的bio阻塞的缺点_java 中的 BIO/NIO/AIO 详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 小米新款智能触控笔曝光 名为Focus
- 下一篇: 不止特斯拉,外媒测试 61 款车型发现丰