NIO和AIO
摘要: 本系列基于煉數(shù)成金課程,為了更好的學習,做了系列的記錄。 本文主要介紹
- 什么是NIO
- Buffer
- Channel
- 網(wǎng)絡(luò)編程
- AIO
IO感覺上和多線程并沒有多大關(guān)系,但是NIO改變了線程在應(yīng)用層面使用的方式,也解決了一些實際的困難。而AIO是異步IO和前面的系列也有點關(guān)系。在此,為了學習和記錄,也寫一篇文章來介紹NIO和AIO。
1. 什么是NIO
NIO是New I/O的簡稱,與舊式的基于流的I/O方法相對,從名字看,它表示新的一套Java I/O標 準。它是在Java 1.4中被納入到JDK中的,并具有以下特性
- NIO是基于塊(Block)的,它以塊為基本單位處理數(shù)據(jù) (硬盤上存儲的單位也是按Block來存儲,這樣性能上比基于流的方式要好一些)
- 為所有的原始類型提供(Buffer)緩存支持
- 增加通道(Channel)對象,作為新的原始 I/O 抽象
- 支持鎖(我們在平時使用時經(jīng)常能看到會出現(xiàn)一些.lock的文件,這說明有線程正在使用這把鎖,當線程釋放鎖時,會把這個文件刪除掉,這樣其他線程才能繼續(xù)拿到這把鎖)和內(nèi)存映射文件的文件訪問接口
- 提供了基于Selector的異步網(wǎng)絡(luò)I/O
所有的從通道中的讀寫操作,都要經(jīng)過Buffer,而通道就是io的抽象,通道的另一端就是操縱的文件。
2. Buffer
Java中Buffer的實現(xiàn)。基本的數(shù)據(jù)類型都有它對應(yīng)的Buffer
Buffer的簡單使用例子:
package test;import java.io.File; import java.io.FileInputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel;public class Test {public static void main(String[] args) throws Exception {FileInputStream fin = new FileInputStream(new File("d:\\temp_buffer.tmp"));FileChannel fc = fin.getChannel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);fc.read(byteBuffer);fc.close();byteBuffer.flip();//讀寫轉(zhuǎn)換} }下面的例子是使用NIO來復(fù)制文件:
public static void nioCopyFile(String resource, String destination)throws IOException {FileInputStream fis = new FileInputStream(resource);FileOutputStream fos = new FileOutputStream(destination);FileChannel readChannel = fis.getChannel(); // 讀文件通道FileChannel writeChannel = fos.getChannel(); // 寫文件通道ByteBuffer buffer = ByteBuffer.allocate(1024); // 讀入數(shù)據(jù)緩存while (true) {buffer.clear();int len = readChannel.read(buffer); // 讀入數(shù)據(jù)if (len == -1) {break; // 讀取完畢}buffer.flip();writeChannel.write(buffer); // 寫入文件}readChannel.close();writeChannel.close();}這里要區(qū)別下容量和上限,比如一個Buffer有10KB,那么10KB就是容量,我將5KB的文件讀到Buffer中,那么上限就是5KB。
下面舉個例子來理解下這3個重要的參數(shù):
public static void main(String[] args) throws Exception {ByteBuffer b = ByteBuffer.allocate(15); // 15個字節(jié)大小的緩沖區(qū)System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()+ " position=" + b.position());for (int i = 0; i < 10; i++) {// 存入10個字節(jié)數(shù)據(jù)b.put((byte) i);}System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()+ " position=" + b.position());b.flip(); // 重置positionSystem.out.println("limit=" + b.limit() + " capacity=" + b.capacity()+ " position=" + b.position());for (int i = 0; i < 5; i++) {System.out.print(b.get());}System.out.println();System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()+ " position=" + b.position());b.flip();System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()+ " position=" + b.position());}此時position從0到10,capactiy和limit不變。
該操作會重置position,通常,將buffer從寫模式轉(zhuǎn)換為讀 模式時需要執(zhí)行此方法 flip()操作不僅重置了當前的position為0,還將limit設(shè)置到當前position的位置 。
limit的意義在于,來確定哪些數(shù)據(jù)是有意義的,換句話說,從position到limit之間的數(shù)據(jù)才是有意義的數(shù)據(jù),因為是上次操作的數(shù)據(jù)。所以flip操作往往是讀寫轉(zhuǎn)換的意思。
意義同上。
而Buffer中大多數(shù)的方法都是去改變這3個參數(shù)來達到某些功能的:
public final Buffer rewind() public final Buffer clear() public final Buffer flip()2.1 文件映射到內(nèi)存
public static void main(String[] args) throws Exception {RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw");FileChannel fc = raf.getChannel();// 將文件映射到內(nèi)存中MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0,raf.length());while (mbb.hasRemaining()) {System.out.print((char) mbb.get());}mbb.put(0, (byte) 98); // 修改文件raf.close();}3. Channel
多線程網(wǎng)絡(luò)服務(wù)器的一般結(jié)構(gòu):
簡單的多線程服務(wù)器:
public static void main(String[] args) throws Exception {ServerSocket echoServer = null;Socket clientSocket = null;try {echoServer = new ServerSocket(8000);} catch (IOException e) {System.out.println(e);}while (true) {try {clientSocket = echoServer.accept();System.out.println(clientSocket.getRemoteSocketAddress()+ " connect!");tp.execute(new HandleMsg(clientSocket));} catch (IOException e) {System.out.println(e);}}}這里的tp是一個線程池,HandleMsg是處理消息的類。
static class HandleMsg implements Runnable{ 省略部分信息 public void run(){ try { is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); os = new PrintWriter(clientSocket.getOutputStream(), true); // 從InputStream當中讀取客戶端所發(fā)送的數(shù)據(jù) String inputLine = null; long b=System. currentTimeMillis (); while ((inputLine = is.readLine()) != null){ os.println(inputLine); } long e=System. currentTimeMillis (); System. out.println ("spend:"+(e - b)+" ms "); } catch (IOException e) { e.printStackTrace(); }finally{ 關(guān)閉資源 } } } public static void main(String[] args) throws Exception {Socket client = null;PrintWriter writer = null;BufferedReader reader = null;try {client = new Socket();client.connect(new InetSocketAddress("localhost", 8000));writer = new PrintWriter(client.getOutputStream(), true);writer.println("Hello!");writer.flush();reader = new BufferedReader(new InputStreamReader(client.getInputStream()));System.out.println("from server: " + reader.readLine());} catch (Exception e) {} finally {// 省略資源關(guān)閉}}為每一個客戶端使用一個線程,如果客戶端出現(xiàn)延時等異常,線程可能會被占用很長時間。因為數(shù)據(jù)的準備和讀取都在這個線程中。此時,如果客戶端數(shù)量眾多,可能會消耗大量的系統(tǒng)資源。
解決方案:
使用非阻塞的NIO (讀取數(shù)據(jù)不等待,數(shù)據(jù)準備好了再工作)
為了體現(xiàn)NIO使用的高效。這里先模擬一個低效的客戶端來模擬因網(wǎng)絡(luò)而延時的情況:
private static ExecutorService tp= Executors.newCachedThreadPool(); private static final int sleep_time=1000*1000*1000; public static class EchoClient implements Runnable{ public void run(){ try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 8000)); writer = new PrintWriter(client.getOutputStream(), true); writer.print("H"); LockSupport.parkNanos(sleep_time); writer.print("e"); LockSupport.parkNanos(sleep_time); writer.print("l"); LockSupport.parkNanos(sleep_time); writer.print("l"); LockSupport.parkNanos(sleep_time); writer.print("o"); LockSupport.parkNanos(sleep_time); writer.print("!"); LockSupport.parkNanos(sleep_time); writer.println(); writer.flush(); }catch(Exception e){}}} spend:6000ms spend:6000ms spend:6000ms spend:6001ms spend:6002ms spend:6002ms spend:6002ms spend:6002ms spend:6003ms spend:6003ms while ((inputLine = is.readLine()) != null)如果用NIO來處理這個問題會怎么做呢?
NIO有一個很大的特點就是:把數(shù)據(jù)準備好了再通知我
selector是一個選擇器,它可以選擇某一個Channel,然后做些事情。
一個線程可以對應(yīng)一個selector,而一個selector可以輪詢多個Channel,而每個Channel對應(yīng)了一個Socket。
與上面一個線程對應(yīng)一個Socket相比,使用NIO后,一個線程可以輪詢多個Socket。
當selector調(diào)用select()時,會查看是否有客戶端準備好了數(shù)據(jù)。當沒有數(shù)據(jù)被準備好時,select()會阻塞。平時都說NIO是非阻塞的,但是如果沒有數(shù)據(jù)被準備好還是會有阻塞現(xiàn)象。
當有數(shù)據(jù)被準備好時,調(diào)用完select()后,會返回一個SelectionKey,SelectionKey表示在某個selector上的某個Channel的數(shù)據(jù)已經(jīng)被準備好了。
只有在數(shù)據(jù)準備好時,這個Channel才會被選擇。
這樣NIO實現(xiàn)了一個線程來監(jiān)控多個客戶端。
而剛剛模擬的網(wǎng)絡(luò)延遲的客戶端將不會影響NIO下的線程,因為某個Socket網(wǎng)絡(luò)延遲時,數(shù)據(jù)還未被準備好,selector是不會選擇它的,而會選擇其他準備好的客戶端。
selectNow()與select()的區(qū)別在于,selectNow()是不阻塞的,當沒有客戶端準備好數(shù)據(jù)時,selectNow()不會阻塞,將返回0,有客戶端準備好數(shù)據(jù)時,selectNow()返回準備好的客戶端的個數(shù)。
主要代碼:
package test;import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.AbstractSelector; import java.nio.channels.spi.SelectorProvider; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class MultiThreadNIOEchoServer {public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();class EchoClient {private LinkedList<ByteBuffer> outq;EchoClient() {outq = new LinkedList<ByteBuffer>();}public LinkedList<ByteBuffer> getOutputQueue() {return outq;}public void enqueue(ByteBuffer bb) {outq.addFirst(bb);}}class HandleMsg implements Runnable {SelectionKey sk;ByteBuffer bb;public HandleMsg(SelectionKey sk, ByteBuffer bb) {super();this.sk = sk;this.bb = bb;}@Overridepublic void run() {// TODO Auto-generated method stubEchoClient echoClient = (EchoClient) sk.attachment();echoClient.enqueue(bb);sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);selector.wakeup();}}private Selector selector;private ExecutorService tp = Executors.newCachedThreadPool();private void startServer() throws Exception {selector = SelectorProvider.provider().openSelector();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);InetSocketAddress isa = new InetSocketAddress(8000);ssc.socket().bind(isa);// 注冊感興趣的事件,此處對accpet事件感興趣SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);for (;;) {selector.select();Set readyKeys = selector.selectedKeys();Iterator i = readyKeys.iterator();long e = 0;while (i.hasNext()) {SelectionKey sk = (SelectionKey) i.next();i.remove();if (sk.isAcceptable()) {doAccept(sk);} else if (sk.isValid() && sk.isReadable()) {if (!geym_time_stat.containsKey(((SocketChannel) sk.channel()).socket())) {geym_time_stat.put(((SocketChannel) sk.channel()).socket(),System.currentTimeMillis());}doRead(sk);} else if (sk.isValid() && sk.isWritable()) {doWrite(sk);e = System.currentTimeMillis();long b = geym_time_stat.remove(((SocketChannel) sk.channel()).socket());System.out.println("spend:" + (e - b) + "ms");}}}}private void doWrite(SelectionKey sk) {// TODO Auto-generated method stubSocketChannel channel = (SocketChannel) sk.channel();EchoClient echoClient = (EchoClient) sk.attachment();LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();ByteBuffer bb = outq.getLast();try {int len = channel.write(bb);if (len == -1) {disconnect(sk);return;}if (bb.remaining() == 0) {outq.removeLast();}} catch (Exception e) {// TODO: handle exceptiondisconnect(sk);}if (outq.size() == 0) {sk.interestOps(SelectionKey.OP_READ);}}private void doRead(SelectionKey sk) {// TODO Auto-generated method stubSocketChannel channel = (SocketChannel) sk.channel();ByteBuffer bb = ByteBuffer.allocate(8192);int len;try {len = channel.read(bb);if (len < 0) {disconnect(sk);return;}} catch (Exception e) {// TODO: handle exceptiondisconnect(sk);return;}bb.flip();tp.execute(new HandleMsg(sk, bb));}private void disconnect(SelectionKey sk) {// TODO Auto-generated method stub//省略略干關(guān)閉操作}private void doAccept(SelectionKey sk) {// TODO Auto-generated method stubServerSocketChannel server = (ServerSocketChannel) sk.channel();SocketChannel clientChannel;try {clientChannel = server.accept();clientChannel.configureBlocking(false);SelectionKey clientKey = clientChannel.register(selector,SelectionKey.OP_READ);EchoClient echoClinet = new EchoClient();clientKey.attach(echoClinet);InetAddress clientAddress = clientChannel.socket().getInetAddress();System.out.println("Accepted connection from "+ clientAddress.getHostAddress());} catch (Exception e) {// TODO: handle exception}}public static void main(String[] args) {// TODO Auto-generated method stubMultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();try {echoServer.startServer();} catch (Exception e) {// TODO: handle exception}}}當用之前模擬的那個延遲的客戶端時,這次的時間消耗就在2ms到11ms之間了。性能提升是很明顯的。
總結(jié):
NIO會將數(shù)據(jù)準備好后,再交由應(yīng)用進行處理,數(shù)據(jù)的讀取/寫入過程依然在應(yīng)用線程中完成,只是將等待的時間剝離到單獨的線程中去。
節(jié)省數(shù)據(jù)準備時間(因為Selector可以復(fù)用)
5. AIO
AIO的特點:
讀完了再通知我
不會加快IO,只是在讀完后進行通知
使用回調(diào)函數(shù),進行業(yè)務(wù)處理
AIO的相關(guān)代碼:
AsynchronousServerSocketChannel
server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress (PORT)); public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);示例代碼:
server.accept(null,new CompletionHandler<AsynchronousSocketChannel, Object>() {final ByteBuffer buffer = ByteBuffer.allocate(1024);public void completed(AsynchronousSocketChannel result,Object attachment) {System.out.println(Thread.currentThread().getName());Future<Integer> writeResult = null;try {buffer.clear();result.read(buffer).get(100, TimeUnit.SECONDS);buffer.flip();writeResult = result.write(buffer);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {try {server.accept(null, this);writeResult.get();result.close();} catch (Exception e) {System.out.println(e.toString());}}}@Overridepublic void failed(Throwable exc, Object attachment) {System.out.println("failed: " + exc);}});這里使用了Future來實現(xiàn)即時返回,關(guān)于Future請參考上一篇
在理解了NIO的基礎(chǔ)上,看AIO,區(qū)別在于AIO是等讀寫過程完成后再去調(diào)用回調(diào)函數(shù)。
NIO是同步非阻塞的
AIO是異步非阻塞的
由于NIO的讀寫過程依然在應(yīng)用線程里完成,所以對于那些讀寫過程時間長的,NIO就不太適合。
而AIO的讀寫過程完成后才被通知,所以AIO能夠勝任那些重量級,讀寫過程長的任務(wù)。
原文鏈接:https://my.oschina.net/hosee/blog/615269
總結(jié)
- 上一篇: JDK7 AIO介绍
- 下一篇: Java语法基础-1