bio linux 创建_不断升级,Java之BIO、NIO、AIO的演变
一、前言
一句話概括BIO NIO AIO:
第一階段,服務(wù)端采用同步阻塞的BIO;
第二階段,服務(wù)端采用同步阻塞的線程池的BIO;
第三階段,JDK4之后服務(wù)端采用同步非阻塞的NIO;
第四階段,JDK7之后服務(wù)端采用異步非阻塞的AIO。
Java BIO 對應(yīng) Linux 同步非阻塞IO,
Java NIO 對應(yīng) Linux 信號(hào)驅(qū)動(dòng)IO,
Java AIO 對應(yīng) Linux 異步IO。
二、手寫B(tài)IO(等待數(shù)據(jù)阻塞、數(shù)據(jù)從內(nèi)核復(fù)制到用戶空間阻塞)
2.1 曙光:同步與異步,阻塞與非阻塞
同步與異步
同步: 同步就是發(fā)起一個(gè)調(diào)用后,被調(diào)用者未處理完請求之前,調(diào)用不返回。
異步: 異步就是發(fā)起一個(gè)調(diào)用后,立刻得到被調(diào)用者的回應(yīng)表示已接收到請求,但是被調(diào)用者并沒有返回結(jié)果,此時(shí)我們可以處理其他的請求,被調(diào)用者通常依靠事件,回調(diào)等機(jī)制來通知調(diào)用者其返回結(jié)果。
小結(jié):同步和異步的區(qū)別最大在于異步的話調(diào)用者不需要等待處理結(jié)果,被調(diào)用者會(huì)通過回調(diào)等機(jī)制來通知調(diào)用者其返回結(jié)果。
阻塞和非阻塞
阻塞: 阻塞就是發(fā)起一個(gè)請求,調(diào)用者一直等待請求結(jié)果返回,也就是當(dāng)前線程會(huì)被掛起,無法從事其他任務(wù),只有當(dāng)條件就緒才能繼續(xù)。
非阻塞: 非阻塞就是發(fā)起一個(gè)請求,調(diào)用者不用一直等著結(jié)果返回,可以先去干其他事情。
小結(jié):同步和阻塞對應(yīng),異步和非阻塞對應(yīng),但是也存在同步非阻塞,間隔時(shí)間就調(diào)用。
tip1:同步與異步,對于響應(yīng)方來說的,同步響應(yīng)還是異步響應(yīng);阻塞和非阻塞,對于請求方來說的,阻塞請求方還是不阻塞請求方。
tip2:對于等待數(shù)據(jù)階段,Linux五種IO中,第一種同步阻塞,第二種同步非阻塞,第三種基于消息通信的異步非阻塞;對于等待數(shù)據(jù)階段+復(fù)制數(shù)據(jù)階段,Linux五種IO中,第一種到第四種都是同步阻塞,因?yàn)閺?fù)制數(shù)據(jù)的時(shí)間都是同步阻塞,只有第五種才是真正意義上的異步非阻塞。
2.2 概要:BIO
同步阻塞I/O模式,一個(gè)請求一個(gè)應(yīng)答,數(shù)據(jù)的讀取寫入必須阻塞在一個(gè)線程內(nèi)等待其完成。
采用 BIO 通信模型 的服務(wù)端,通常由一個(gè)獨(dú)立的 Acceptor 線程負(fù)責(zé)監(jiān)聽客戶端的連接。我們一般通過在 while(true) 循環(huán)中服務(wù)端會(huì)調(diào)用 accept() 方法等待接收客戶端的連接的方式監(jiān)聽請求,請求一旦接收到一個(gè)連接請求,就可以建立通信套接字在這個(gè)通信套接字上進(jìn)行讀寫操作,此時(shí)不能再接收其他客戶端連接請求,只能等待同當(dāng)前連接的客戶端的操作執(zhí)行完成, 不過可以通過多線程來支持多個(gè)客戶端的連接,如上圖所示。
如果要讓 BIO 通信模型 能夠同時(shí)處理多個(gè)客戶端請求,就必須使用多線程(主要原因是 socket.accept()、 socket.read()、 socket.write() 涉及的三個(gè)主要函數(shù)都是同步阻塞的),也就是說它在接收到客戶端連接請求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理,處理完成之后,通過輸出流返回應(yīng)答給客戶端,線程銷毀。這就是典型的 一請求一應(yīng)答通信模型 。我們可以設(shè)想一下如果這個(gè)連接不做任何事情的話就會(huì)造成不必要的線程開銷,不過可以通過 線程池機(jī)制 改善,線程池還可以讓線程的創(chuàng)建和回收成本相對較低。使用FixedThreadPool 可以有效的控制了線程的最大數(shù)量,保證了系統(tǒng)有限的資源的控制,實(shí)現(xiàn)了N(客戶端請求數(shù)量):M(處理客戶端請求的線程數(shù)量)的偽異步I/O模型(N 可以遠(yuǎn)遠(yuǎn)大于 M),下面一節(jié)"偽異步 BIO"中會(huì)詳細(xì)介紹到。
我們再設(shè)想一下當(dāng)客戶端并發(fā)訪問量增加后這種模型會(huì)出現(xiàn)什么問題?
在 Java 虛擬機(jī)中,線程是寶貴的資源,線程的創(chuàng)建和銷毀成本很高,除此之外,線程的切換成本也是很高的。尤其在 Linux 這樣的操作系統(tǒng)中,線程本質(zhì)上就是一個(gè)進(jìn)程,創(chuàng)建和銷毀線程都是重量級(jí)的系統(tǒng)函數(shù)。如果并發(fā)訪問量增加會(huì)導(dǎo)致線程數(shù)急劇膨脹可能會(huì)導(dǎo)致線程堆棧溢出、創(chuàng)建新線程失敗等問題,最終導(dǎo)致進(jìn)程宕機(jī)或者僵死,不能對外提供服務(wù)。
2.3 手寫B(tài)IO:服務(wù)端使用同步阻塞單線程的BIO,客戶端使用命令行連接
public class BIOServer { public static void main(String[] args) { try (ServerSocket serverSocket = new ServerSocket(8081)) { System.out.println("服務(wù)端端口號(hào):" + serverSocket.getLocalSocketAddress()); while (true) { Socket clientSocket = serverSocket.accept(); System.out.println("連接來自:" + clientSocket.getRemoteSocketAddress()); try (Scanner input = new Scanner(clientSocket.getInputStream())) { while (true) { String request = input.nextLine(); if ("quit".equals(request)) break; System.out.println("客戶端端口:" + clientSocket.getLocalSocketAddress()); System.out.println("客戶端輸入:" + request); String response = "服務(wù)端響應(yīng)" + request; clientSocket.getOutputStream().write(response.getBytes()); } } } } catch (Exception e) { e.printStackTrace(); } }}
運(yùn)行成功
2.4 概要:偽異步IO
為了解決同步阻塞I/O面臨的一個(gè)鏈路需要一個(gè)線程處理的問題,后來有人對它的線程模型進(jìn)行了優(yōu)化一一一后端通過一個(gè)線程池來處理多個(gè)客戶端的請求接入,形成客戶端個(gè)數(shù)M:線程池最大線程數(shù)N的比例關(guān)系,其中M可以遠(yuǎn)遠(yuǎn)大于N.通過線程池可以靈活地調(diào)配線程資源,設(shè)置線程的最大值,防止由于海量并發(fā)接入導(dǎo)致線程耗盡。
采用線程池和任務(wù)隊(duì)列可以實(shí)現(xiàn)一種叫做偽異步的 I/O 通信框架,它的模型圖如上圖所示。當(dāng)有新的客戶端接入時(shí),將客戶端的 Socket 封裝成一個(gè)Task(該任務(wù)實(shí)現(xiàn)java.lang.Runnable接口,變成一個(gè)可以多線程的類)投遞到后端的線程池中進(jìn)行處理,JDK 的線程池維護(hù)一個(gè)消息隊(duì)列和 N 個(gè)活躍線程,對消息隊(duì)列中的任務(wù)進(jìn)行處理。由于線程池可以設(shè)置消息隊(duì)列的大小和最大線程數(shù),因此,它的資源占用是可控的,無論多少個(gè)客戶端并發(fā)訪問,都不會(huì)導(dǎo)致資源的耗盡和宕機(jī)。
偽異步I/O通信框架采用了線程池實(shí)現(xiàn),因此避免了為每個(gè)請求都創(chuàng)建一個(gè)獨(dú)立線程造成的線程資源耗盡問題。不過因?yàn)樗牡讓尤稳皇峭阶枞腂IO模型,因此無法從根本上解決問題。
2.5 手寫偽異步IO:服務(wù)端使用同步阻塞多線程的BIO,客戶端使用命令行連接
public class ClientHandler implements Runnable { //這就是Task private final Socket clientSocket; // Task類中注入一個(gè)Socket 是客戶端socket public ClientHandler(Socket clientSocket) { this.clientSocket = clientSocket; } @Override public void run() { //Task類中國的run()方法 try (Scanner input = new Scanner(clientSocket.getInputStream())) { //讀取輸入流 while (true) { String request = input.nextLine(); // 讀取輸入的數(shù)據(jù) if ("quit".equals(request)) break; //如果輸入的是quit,表示客戶端程序結(jié)束了,這邊結(jié)束服務(wù)端程序 System.out.println("客戶端端口:" + clientSocket.getLocalSocketAddress()+ " 客戶端輸入:" + request); // 打印收到的請求報(bào)文 String response = "server response: " + request; //組裝響應(yīng)體 clientSocket.getOutputStream().write(response.getBytes()); // 發(fā)送響應(yīng)體 } } catch (Exception e) { System.out.println(e); } }}public class ServerThreadPool { public static void main(String[] args){ ExecutorService executor= Executors.newFixedThreadPool(3); // 擁有三個(gè)線程的executor,在哪里使用 try(ServerSocket serverSocket=new ServerSocket(8082)){ // 新建服務(wù)端socket System.out.println("服務(wù)端端口號(hào):"+serverSocket.getLocalSocketAddress()); while(true){ Socket clientSocket=serverSocket.accept(); // 服務(wù)端socket接收請求 System.out.println("連接來自:"+clientSocket.getRemoteSocketAddress()); // 通過accept阻塞后,打印收到的請求 executor.execute(new ClientHandler(clientSocket)); // 這里使用executor,當(dāng)還有空閑線程的時(shí)候,使用空閑線程執(zhí)行run方法來執(zhí)行read write,沒有空閑線程,這里阻塞 } }catch (Exception e){ e.printStackTrace(); } }}其實(shí),Client不是一定要用命令行,Client代碼也可以自己寫出來。在客戶端創(chuàng)建多個(gè)線程依次連接服務(wù)端并向其發(fā)送"當(dāng)前時(shí)間+:hello world",服務(wù)端會(huì)為每個(gè)客戶端線程創(chuàng)建一個(gè)線程來處理,如下:
客戶端代碼代替命令行
public class IOClient { public static void main(String[] args) { // TODO 創(chuàng)建多個(gè)線程,模擬多個(gè)客戶端連接服務(wù)端 new Thread(() -> { try { Socket socket = new Socket("127.0.0.1", 3333); while (true) { try { // 沒休眠兩秒鐘,不斷write到服務(wù)端 socket.getOutputStream().write((new Date() + ": hello world").getBytes()); Thread.sleep(2000); } catch (Exception e) { } } } catch (IOException e) { } }).start(); }}服務(wù)端
public class IOServer { public static void main(String[] args) throws IOException { // TODO 服務(wù)端處理客戶端連接請求 ServerSocket serverSocket = new ServerSocket(3333); // 接收到客戶端連接請求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理 new Thread(() -> { while (true) { try { // 阻塞方法獲取新的連接 Socket socket = serverSocket.accept(); // 子線程接收請求,不會(huì)阻塞main線程 // 每一個(gè)新的連接都創(chuàng)建一個(gè)線程,負(fù)責(zé)讀取數(shù)據(jù),接受請求后,新建一個(gè)子線程來處理read write new Thread(() -> { try { int len; byte[] data = new byte[1024]; InputStream inputStream = socket.getInputStream(); // 按字節(jié)流方式讀取數(shù)據(jù) while ((len = inputStream.read(data)) != -1) { // 讀入數(shù)據(jù) System.out.println(new String(data, 0, len)); // 打印 } } catch (IOException e) { } }).start(); } catch (IOException e) { } } }).start(); }}小結(jié)
在活動(dòng)連接數(shù)不是特別高(小于單機(jī)1000)的情況下,這種模型是比較不錯(cuò)的,可以讓每一個(gè)連接專注于自己的 I/O 并且編程模型簡單,也不用過多考慮系統(tǒng)的過載、限流等問題。線程池本身就是一個(gè)天然的漏斗,可以緩沖一些系統(tǒng)處理不了的連接或請求。但是,當(dāng)面對十萬甚至百萬級(jí)連接的時(shí)候,傳統(tǒng)的 BIO 模型是無能為力的。因此,我們需要一種更高效的 I/O 處理模型來應(yīng)對更高的并發(fā)量。
三、手寫NIO(等待數(shù)據(jù)不阻塞、數(shù)據(jù)從內(nèi)核復(fù)制到用戶空間阻塞)
3.1 NIO:JDK4之后服務(wù)端采用同步非阻塞NIO(客戶端不變)
NIO是一種同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架,對應(yīng) java.nio 包,提供了 Channel , Selector,Buffer等抽象。
NIO中的N可以理解為Non-blocking,不單純是New。它支持面向緩沖的,基于通道的I/O操作方法。 NIO提供了與傳統(tǒng)BIO模型中的 Socket 和 ServerSocket 相對應(yīng)的 SocketChannel 和 ServerSocketChannel 兩種不同的套接字通道實(shí)現(xiàn),兩種通道都支持阻塞和非阻塞兩種模式。阻塞模式使用就像傳統(tǒng)中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。對于低負(fù)載、低并發(fā)的應(yīng)用程序,可以使用同步阻塞I/O來提升開發(fā)速率和更好的維護(hù)性;對于高負(fù)載、高并發(fā)的(網(wǎng)絡(luò))應(yīng)用,應(yīng)使用 NIO 的非阻塞模式來開發(fā)。
tip1:服務(wù)端channel和客戶端channel:服務(wù)端只有啟動(dòng)的時(shí)候有一個(gè)channel,但是,服務(wù)端每收到一個(gè)客戶端連接,就有一個(gè)客戶端channel,n個(gè)客戶端連接就有n個(gè)客戶端channel,一個(gè)客戶端斷開連接會(huì)銷毀這個(gè)客戶端channel,同時(shí)取消在select中的selectionKey。
tip2:服務(wù)端channel和客戶端channel設(shè)置為非阻塞的時(shí)機(jī):服務(wù)端channel在啟動(dòng)五步中就設(shè)置為非阻塞,accept連接之后得到的客戶端channel設(shè)置為非阻塞;
3.2 服務(wù)端使用同步非阻塞單線程的NIO(Reactor單線程模式,一個(gè)線程接收請求,一個(gè)線程執(zhí)行操作),客戶端使用命令行連接
public class NioServer { //三要素 Channel Selector Buffer public static void main(String[] args) throws Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // channel.open() serverSocketChannel.configureBlocking(false); // 設(shè)置為阻塞為false serverSocketChannel.bind(new InetSocketAddress(8083)); // 設(shè)置服務(wù)端端口 System.out.println("服務(wù)端端口:" + serverSocketChannel.getLocalAddress()); // 打印服務(wù)端ip:port Selector selector = Selector.open(); // selector.open()得到一個(gè)selector對象 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// channel注冊到selector中 ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//新建1024 Bytebuffer 緩沖區(qū) while (true) { // 服務(wù)端永不停止 int select = selector.select(); // selector選擇 這里阻塞,要有客戶端連接accept,或者客戶端輸入才能通過 if (0 == select) continue; // Set selectionKeys = selector.selectedKeys(); // 得到一個(gè)SelectKey集合 Iterator iterator = selectionKeys.iterator(); //得到對應(yīng)的迭代器,用來遍歷 while (iterator.hasNext()) { SelectionKey key = iterator.next(); // 遍歷每一個(gè)key if (key.isAcceptable()) { // key是可接受的那就接受 key.isAcceptable() 和 channel.accept() ServerSocketChannel channel = (ServerSocketChannel) key.channel(); // 服務(wù)端的channel SocketChannel clientChannel = channel.accept(); // 客戶端的SocketChannel,類似io中的Socket System.out.println("客戶端接口:" + clientChannel.getRemoteAddress()); // 從客戶端的SocketChannel打印客戶端ip:port clientChannel.configureBlocking(false); // 客戶端channel設(shè)置阻塞為false clientChannel.register(selector, SelectionKey.OP_READ); // 客戶端Socketchannel注冊到selector } if (key.isReadable()) { // 是可讀的就讀 key.isReadable() 到 channel.read(byteBuffer) SocketChannel channel = (SocketChannel) key.channel(); channel.read(byteBuffer); // 讀取buffer里面的內(nèi)容,從客戶端讀入,這個(gè)key已經(jīng)建立連接,所以使用channel讀入 String request = new String(byteBuffer.array()).trim(); byteBuffer.clear(); // 每次循環(huán)之前或之后要清空buffer System.out.println("客戶端端口:" + channel.getRemoteAddress() + " 客戶端輸入:" + request); String response = "server request" + request; channel.write(ByteBuffer.wrap(response.getBytes())); // 寫出到客戶端 } iterator.remove(); //一定要使用迭代器刪除 } } }}運(yùn)行結(jié)果:
實(shí)際上,accept和讀寫操作可以交給子線程來完成,不一定要在main線程上完成,如下:
public class NIOServer { public static void main(String[] args) throws IOException { // 1. serverSelector負(fù)責(zé)輪詢是否有新的連接,服務(wù)端監(jiān)測到新的連接之后,不再創(chuàng)建一個(gè)新的線程, // 而是直接將新連接綁定到clientSelector上,這樣就不用 IO 模型中 1w 個(gè) while 循環(huán)在死等 Selector serverSelector = Selector.open(); // 2. clientSelector負(fù)責(zé)輪詢連接是否有數(shù)據(jù)可讀 Selector clientSelector = Selector.open(); new Thread(() -> { try { // 對應(yīng)IO編程中服務(wù)端啟動(dòng) ServerSocketChannel listenerChannel = ServerSocketChannel.open(); // 打開 listenerChannel.socket().bind(new InetSocketAddress(3333)); listenerChannel.configureBlocking(false); listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT); // 服務(wù)端channel注冊到服務(wù)端selector上面去 while (true) { // 監(jiān)測是否有新的連接,這里的1指的是阻塞的時(shí)間為 1ms if (serverSelector.select(1) > 0) { // 服務(wù)端select Set set = serverSelector.selectedKeys(); Iterator keyIterator = set.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { try { // (1) 每來一個(gè)新連接,不需要?jiǎng)?chuàng)建一個(gè)線程,而是直接注冊到clientSelector SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); clientChannel.register(clientSelector, SelectionKey.OP_READ); // 客戶端selector注冊到客戶端selector上面去 } finally { keyIterator.remove(); } } } } } } catch (IOException ignored) { } }).start(); new Thread(() -> { // 這個(gè)線程負(fù)責(zé)客戶端讀寫 try { while (true) { // (2) 批量輪詢是否有哪些連接有數(shù)據(jù)可讀,這里的1指的是阻塞的時(shí)間為 1ms if (clientSelector.select(1) > 0) { Set set = clientSelector.selectedKeys(); Iterator keyIterator = set.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isReadable()) { try { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // (3) 面向 Buffer clientChannel.read(byteBuffer); byteBuffer.flip(); System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer).toString()); } finally { keyIterator.remove(); key.interestOps(SelectionKey.OP_READ); } } } } } } catch (IOException ignored) { } }).start(); }}3.3 合理封裝:Reactor單線程模型的NIO,一個(gè)線程接收請求accept,一個(gè)線程執(zhí)行IO操作read-write
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.charset.Charset;import java.util.Iterator;import java.util.Set;public class NIOServer { private static final Charset charset = Charset.forName("utf-8"); private static Selector selector ; static { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { try { // channel起手四句:ServerSocketChannel.open(); channel設(shè)置阻塞為false // channel.bind 端口號(hào) channel.register(selector) ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(8080)); serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); new Reactor().start(); } catch (IOException e) { e.printStackTrace(); } } private static class Reactor extends Thread{ @Override public void run() { try { select(); } catch (IOException e) { e.printStackTrace(); } } private void select() throws IOException{ while (selector.select()>0){ //如果沒有準(zhǔn)備好的通道,這里會(huì)阻塞住,減少CPU消耗,有準(zhǔn)備好的通過,馬上使用 Set selectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); if (key.isAcceptable()){ accept(key); }else if (key.isReadable()){ read(key); } iterator.remove(); //必須要從集合中移除!否則下次事件發(fā)生時(shí)不會(huì)被感知到 } } } private void read(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(100); int num = channel.read(byteBuffer); if (num > 0) { byteBuffer.flip(); String data = charset.decode(byteBuffer).toString(); // 解析出來 System.out.println(data); //打印出來 }else if (num == -1){ // num=-1代表連接已關(guān)閉 channel.close(); } } private void accept(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel channel = serverChannel.accept(); if (channel != null) { InetSocketAddress localAddress = (InetSocketAddress) channel.getLocalAddress(); String hostName = localAddress.getHostName(); System.out.println("接收到來自" + hostName + "的請求"); channel.configureBlocking(false); //監(jiān)聽這個(gè)接收到的socket channel.register(selector, SelectionKey.OP_READ); // 注冊到客戶端channel上面去 } } }}注意1:這里將所有發(fā)生的事件都交給單個(gè)線程去處理,如果性能不夠,可以開個(gè)線程池去處理事件,
比如,Reactor三種模式:單線程、線程池、主從線程池。
注意2:這里的select模型和redis單線程處理多個(gè)連接請求有相似之處;
注意3:解釋一下 while (selector.select()>0)
對于while (selector.select()>0)這句,表示如果沒有準(zhǔn)備好的通道,這里會(huì)阻塞住,減少CPU消耗,有準(zhǔn)備好的通過,馬上使用。
問題:為什么大家都不愿意用 JDK 原生 NIO 進(jìn)行開發(fā)呢?
回答:
1、原生NIO開發(fā)代碼比較復(fù)雜,需要熟練Channel Buffer Selector的使用,增加程序員的負(fù)擔(dān);
2、JDK 的 NIO 底層由 epoll 實(shí)現(xiàn),該實(shí)現(xiàn)飽受詬病的空輪詢 bug 會(huì)導(dǎo)致 cpu 飆升 100%;
3、已封裝的框架做好了,Netty 的出現(xiàn)很大程度上改善了 JDK 原生 NIO 所存在的一些讓人難以忍受的問題,完成封裝好了NIO的操作,提供了Reactor三種模式的實(shí)現(xiàn)。
四、手寫AIO(等待數(shù)據(jù)不阻塞、數(shù)據(jù)從內(nèi)核復(fù)制到用戶空間不阻塞)
4.1 概要:AIO
AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改進(jìn)版 NIO 2,它是異步非阻塞的IO模型。異步 IO 是基于事件和回調(diào)機(jī)制實(shí)現(xiàn)的,也就是應(yīng)用操作之后會(huì)直接返回,不會(huì)堵塞在那里,當(dāng)后臺(tái)處理完成,操作系統(tǒng)會(huì)通知相應(yīng)的線程進(jìn)行后續(xù)的操作。
AIO 是異步IO的縮寫,雖然 NIO 在網(wǎng)絡(luò)操作中,提供了非阻塞的方法,但是 NIO 的 IO 行為還是同步的。對于 NIO 來說,我們的業(yè)務(wù)線程是在 IO 操作準(zhǔn)備好時(shí),得到通知,接著就由這個(gè)線程自行進(jìn)行 IO 操作,IO操作本身是同步的。
4.2 手寫AIO
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;public class AIOServer { public void startListen(int port) throws InterruptedException { try { AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.accept(null,new CompletionHandler() { @Override public void completed(AsynchronousSocketChannel socketChannel, Void attachment) { serverSocketChannel.accept(null,this); //收到連接后,應(yīng)該調(diào)用accept方法等待新的連接進(jìn)來 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); socketChannel.read(byteBuffer,byteBuffer, new CompletionHandler() { @Override public void completed(Integer num, ByteBuffer attachment) { if (num > 0){ attachment.flip(); System.out.println(new String(attachment.array()).trim()); }else { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("read error"); exc.printStackTrace(); } }); } @Override public void failed(Throwable exc, Void attachment) { System.out.println("accept error"); exc.printStackTrace(); } }); } catch (IOException e) { e.printStackTrace(); }//模擬去做其他事情 while (true){ Thread.sleep(1000); } } public static void main(String[] args) throws InterruptedException { AIOServer aioServer = new AIOServer(); aioServer.startListen(8080); }}五、面試金手指
5.1 BIO
BIO中是accept read write都是同步阻塞操作,這是無法改變,所有要改善bio,必須使用線程池,但是線程的創(chuàng)建和銷毀的代價(jià)比較大,特別是對于linux這種,一個(gè)線程就是一個(gè)進(jìn)程的,但是沒辦法,只有這樣一種方式。
BIO中是accept read write都是同步阻塞操作:同步阻塞I/O模式,一個(gè)請求一個(gè)應(yīng)答,數(shù)據(jù)的讀取寫入必須阻塞在一個(gè)線程內(nèi)等待其完成。采用 BIO 通信模型 的服務(wù)端,通常由一個(gè)獨(dú)立的 Acceptor 線程負(fù)責(zé)監(jiān)聽客戶端的連接。我們一般通過在 while(true) 循環(huán)中服務(wù)端會(huì)調(diào)用 accept() 方法等待接收客戶端的連接的方式監(jiān)聽請求,請求一旦接收到一個(gè)連接請求,就可以建立通信套接字在這個(gè)通信套接字上進(jìn)行讀寫操作,此時(shí)不能再接收其他客戶端連接請求,只能等待同當(dāng)前連接的客戶端的操作執(zhí)行完成, 不過可以通過多線程來支持多個(gè)客戶端的連接。
多線程,就是偽異步IO:如果要讓 BIO 通信模型 能夠同時(shí)處理多個(gè)客戶端請求,就必須使用多線程(主要原因是 socket.accept()、 socket.read()、 socket.write() 涉及的三個(gè)主要函數(shù)都是同步阻塞的),也就是說它在接收到客戶端連接請求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理,處理完成之后,通過輸出流返回應(yīng)答給客戶端,線程銷毀。這就是典型的 一請求一應(yīng)答通信模型 。我們可以設(shè)想一下如果這個(gè)連接不做任何事情的話就會(huì)造成不必要的線程開銷,不過可以通過 線程池機(jī)制 改善,線程池還可以讓線程的創(chuàng)建和回收成本相對較低。使用FixedThreadPool 可以有效的控制了線程的最大數(shù)量,保證了系統(tǒng)有限的資源的控制,實(shí)現(xiàn)了N(客戶端請求數(shù)量):M(處理客戶端請求的線程數(shù)量)的偽異步I/O模型(N 可以遠(yuǎn)遠(yuǎn)大于 M),下面一節(jié)"偽異步 BIO"中會(huì)詳細(xì)介紹到。
多線程,偽異步IO的代價(jià)大,但是沒辦法:在 Java 虛擬機(jī)中,線程是寶貴的資源,線程的創(chuàng)建和銷毀成本很高,除此之外,線程的切換成本也是很高的。尤其在 Linux 這樣的操作系統(tǒng)中,線程本質(zhì)上就是一個(gè)進(jìn)程,創(chuàng)建和銷毀線程都是重量級(jí)的系統(tǒng)函數(shù)。如果并發(fā)訪問量增加會(huì)導(dǎo)致線程數(shù)急劇膨脹可能會(huì)導(dǎo)致線程堆棧溢出、創(chuàng)建新線程失敗等問題,最終導(dǎo)致進(jìn)程宕機(jī)或者僵死,不能對外提供服務(wù)。
5.2 偽異步IO
線程池:為了解決同步阻塞I/O面臨的一個(gè)鏈路需要一個(gè)線程處理的問題,后來有人對它的線程模型進(jìn)行了優(yōu)化一一一后端通過一個(gè)線程池來處理多個(gè)客戶端的請求接入,形成客戶端個(gè)數(shù)M:線程池最大線程數(shù)N的比例關(guān)系,其中M可以遠(yuǎn)遠(yuǎn)大于N.通過線程池可以靈活地調(diào)配線程資源,設(shè)置線程的最大值,防止由于海量并發(fā)接入導(dǎo)致線程耗盡。
偽異步IO:采用線程池和任務(wù)隊(duì)列可以實(shí)現(xiàn)一種叫做偽異步的 I/O 通信框架,它的模型圖如上圖所示。當(dāng)有新的客戶端接入時(shí),將客戶端的 Socket 封裝成一個(gè)Task(該任務(wù)實(shí)現(xiàn)java.lang.Runnable接口,變成一個(gè)可以多線程的類)投遞到后端的線程池中進(jìn)行處理,JDK 的線程池維護(hù)一個(gè)消息隊(duì)列和 N 個(gè)活躍線程,對消息隊(duì)列中的任務(wù)進(jìn)行處理。由于線程池可以設(shè)置消息隊(duì)列的大小和最大線程數(shù),因此,它的資源占用是可控的,無論多少個(gè)客戶端并發(fā)訪問,都不會(huì)導(dǎo)致資源的耗盡和宕機(jī)。
偽異步IO還是BIO:偽異步I/O通信框架采用了線程池實(shí)現(xiàn),因此避免了為每個(gè)請求都創(chuàng)建一個(gè)獨(dú)立線程造成的線程資源耗盡問題。不過因?yàn)樗牡讓尤稳皇峭阶枞腂IO模型,因此無法從根本上解決問題。
5.3 NIO
NIO中的N可以理解為Non-blocking,不單純是New。它支持面向緩沖的,基于通道的I/O操作方法。NIO提供了與傳統(tǒng)BIO模型中的 Socket 和 ServerSocket 相對應(yīng)的 SocketChannel 和ServerSocketChannel
兩種不同的套接字通道實(shí)現(xiàn),兩種通道都支持阻塞和非阻塞兩種模式。阻塞模式使用就像傳統(tǒng)中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。對于低負(fù)載、低并發(fā)的應(yīng)用程序,可以使用同步阻塞I/O來提升開發(fā)速率和更好的維護(hù)性;對于高負(fù)載、高并發(fā)的(網(wǎng)絡(luò))應(yīng)用,應(yīng)使用NIO 的非阻塞模式來開發(fā)。
5.4 NIO與IO的區(qū)別
問題:NIO與IO區(qū)別?
回答:首先,從 NIO 流是同步非阻塞,而 IO 流是同步阻塞 IO 說起;然后,從 NIO 的3個(gè)核心組件/特性為 NIO帶來的一些改進(jìn)來分析。
第一,IO流是阻塞的,NIO流是不阻塞的。
(1)Java IO的各種流是阻塞的。這意味著,當(dāng)一個(gè)線程調(diào)用 accept() read() 或 write()時(shí),該線程被阻塞,直到有一些數(shù)據(jù)被讀取,或數(shù)據(jù)完全寫入。該線程在此期間不能再干任何事情了。
(2)Java NIO使我們可以進(jìn)行非阻塞IO操作。
非阻塞讀read:單線程中從通道讀取數(shù)據(jù)到buffer,同時(shí)可以繼續(xù)做別的事情,當(dāng)數(shù)據(jù)讀取到buffer中后,線程再繼續(xù)處理數(shù)據(jù)。
非阻塞寫write:一個(gè)線程請求寫入一些數(shù)據(jù)到某通道,但不需要等待它完全寫入,這個(gè)線程同時(shí)可以去做別的事情。
第二,NIO三要素Buffer(緩沖區(qū)) IO 面向流(Stream oriented),而 NIO 面向緩沖區(qū)(Buffer oriented)。
Buffer是一個(gè)對象,它包含一些要寫入或者要讀出的數(shù)據(jù)。在NIO類庫中加入Buffer對象,體現(xiàn)了新庫與原I/O的一個(gè)重要區(qū)別。在面向流的I/O中,可以將數(shù)據(jù)直接寫入或者將數(shù)據(jù)直接讀到Stream**對象中。雖然 Stream 中也有 Buffer 開頭的擴(kuò)展類,但只是流的包裝類,還是從流讀到緩沖區(qū),而 NIO 卻是直接讀到 Buffer 中進(jìn)行操作。
在NIO厙中,所有數(shù)據(jù)都是用緩沖區(qū)buffer處理的。在讀取數(shù)據(jù)時(shí),它是直接讀到緩沖區(qū)中的;在寫入數(shù)據(jù)時(shí),寫入到緩沖區(qū)中。任何時(shí)候訪問NIO中的數(shù)據(jù),都是通過緩沖區(qū)進(jìn)行操作。最常用的緩沖區(qū)是 ByteBuffer,一個(gè) ByteBuffer 提供了一組功能用于操作 byte 數(shù)組。除了ByteBuffer,還有其他的一些緩沖區(qū),事實(shí)上,每一種Java基本類型(除了Boolean類型)都對應(yīng)有一種緩沖區(qū)。
值得注意的是,Buffer只對執(zhí)行IO操作的線程有用(read/write),對執(zhí)行連接的線程是沒有用的(accept)。
第三,NIO三要素Channel (通道),NIO 通過Channel(通道) 進(jìn)行讀寫。
通道是雙向的,可讀也可寫,而流的讀寫是單向的。無論讀寫,通道channel 只能和Buffer交互。因?yàn)?Buffer,通道可以異步地讀寫。
在這里,需要知道的是,NIO中的所有IO都是從 Channel(通道) 開始的。
從通道進(jìn)行數(shù)據(jù)讀取 :創(chuàng)建一個(gè)緩沖區(qū),然后請求通道讀取數(shù)據(jù)。
從通道進(jìn)行數(shù)據(jù)寫入 :創(chuàng)建一個(gè)緩沖區(qū),填充數(shù)據(jù),并要求通道寫入數(shù)據(jù)。
如下圖所示:
第四,NIO三要素Selectors(選擇器) NIO有選擇器,而IO沒有。
選擇器用于使用單個(gè)線程處理多個(gè)通道,因此,它需要較少的線程來處理這些通道。另一方面,線程之間的切換對于操作系統(tǒng)來說是昂貴的,因此,為了提高系統(tǒng)效率選擇器是有用的。操作過程中,Channel注冊到選擇器中,如下圖所示:
小結(jié):從BIO到NIO,分下NIO三要素:Buffer Channel Selector
Buffer:表示滿了才讀,這就是linux五種IO中的同步非阻塞IO,即recvfrom輪詢;
Channel:ServerSocket 變?yōu)?ServerSocketChannel,Socket變?yōu)镾ocketChannel;
Selector:用來選擇Channel,所有的Channel都注冊在Selector上面。
5.5 AIO
異步 IO 是基于事件和回調(diào)機(jī)制實(shí)現(xiàn)的,也就是應(yīng)用操作之后會(huì)直接返回,不會(huì)堵塞在那里,當(dāng)后臺(tái)處理完成,操作系統(tǒng)會(huì)通知相應(yīng)的線程進(jìn)行后續(xù)的操作。
AIO 是異步IO的縮寫,雖然 NIO 在網(wǎng)絡(luò)操作中,提供了非阻塞的方法,但是 NIO 的 IO 行為還是同步的。對于 NIO 來說,我們的業(yè)務(wù)線程是在 IO 操作準(zhǔn)備好時(shí),得到通知,接著就由這個(gè)線程自行進(jìn)行 IO 操作,IO操作本身是同步的。
小結(jié):三種IO要完成的功能是一樣的:BIO也是accept read write三個(gè)操作,NIO也是accept read write三個(gè)操作,AIO也是accept read write三個(gè)操作,任何io都是accept read write三個(gè)操作。
六、尾聲
不斷升級(jí),Java之BIO、NIO、AIO的演變,完成了。
天天打碼,天天進(jìn)步!!!
如果覺得本文有用,可以關(guān)注+轉(zhuǎn)發(fā),您的鼓勵(lì)就是我創(chuàng)作的最大動(dòng)力。
總結(jié)
以上是生活随笔為你收集整理的bio linux 创建_不断升级,Java之BIO、NIO、AIO的演变的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 林肯大陆挂挡不走怎么办
- 下一篇: 5位随机数重复的概率 php_php防止