你对Java网络编程了解的如何?Java NIO 网络编程 | Netty前期知识(二)
本文主要講解NIO的簡(jiǎn)介、NIO和傳統(tǒng)阻塞I/O有什么區(qū)別、NIO模型和傳統(tǒng)I/O模型之間的對(duì)比、以及圍繞NIO的三大組件來(lái)講解,理論代碼相結(jié)合。
很喜歡一句話:"沉下去,再浮上來(lái)"。
我想我們會(huì)變的不一樣。
一、Java NIO 簡(jiǎn)介
在 Java 1.4 中引入了 NIO 框架(java.nio 包),提供了 Channel、Selector、Buffer 等新的抽象,可以構(gòu)建多路復(fù)用的、同步非阻塞 IO 程序,同時(shí)提供了更接近操作系統(tǒng)底層的高性能數(shù)據(jù)操作方式
同步非阻塞:
Java NIO 的非阻塞模式:
- 非阻塞讀:一個(gè)線程從某一個(gè)通道發(fā)送請(qǐng)求或者讀取數(shù)據(jù)時(shí),它僅能得到目前可用的數(shù)據(jù),如果目前沒(méi)有數(shù)據(jù)可用時(shí),就什么都不會(huì)獲取,但是并不會(huì)像原生IO一樣,阻塞等待著,反而是直到數(shù)據(jù)變得可以讀取之前,此線程可以去繼續(xù)做其他事情。
- 非阻塞寫(xiě):和非阻塞讀一樣,一個(gè)線程發(fā)送請(qǐng)求寫(xiě)入一些數(shù)據(jù)到某通道,在等待它完成寫(xiě)入這段時(shí)間中,無(wú)需阻塞等待,這個(gè)線程可以同時(shí)去做其他的事情。
- 總結(jié)起來(lái)就是NIO 是可以做到用一個(gè)線程來(lái)處理多個(gè)操作的。
- 通俗理解:NIO 是可以做到用一個(gè)線程來(lái)處理多個(gè)操作的。假設(shè)有 10000 個(gè)請(qǐng)求過(guò)來(lái),根據(jù)實(shí)際情況,可以分配 50 或者 100 個(gè)線程來(lái)處理。不像之前的阻塞 IO 那樣,非得分配 10000 個(gè)。
Java NIO 由以下幾個(gè)核心部分組成:
- Channels (通道)
- Buffers (緩沖區(qū))
- Selector (選擇器)
雖然Java NIO除此之外,仍有很多類和組件,但是總的來(lái)說(shuō)仍然是靠Channel,Buffer 和 Selector 構(gòu)成了核心的API。其余更多的是為了能讓這三個(gè)核心組件更方便的使用。之后的講解也會(huì)偏向于此。三個(gè)核心部分模型圖:
簡(jiǎn)單說(shuō)明:
二、傳統(tǒng)阻塞BIO存在的問(wèn)題
我們先來(lái)回憶一下傳統(tǒng)阻塞IO的經(jīng)典編程模型:
public static void main(String[] args) throws Exception {//1. 創(chuàng)建一個(gè)線程池ExecutorService newCachedThreadPool = new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());//2、創(chuàng)建ServerSocketServerSocket serverSocket = new ServerSocket(8888);while (true) {//3.偵聽(tīng)要與此套接字建立的連接并接受它。 該方法阻塞,直到建立連接。final Socket socket = serverSocket.accept();//4、就創(chuàng)建一個(gè)線程,與之通訊(單獨(dú)寫(xiě)一個(gè)方法)newCachedThreadPool.execute(() -> {//可以和客戶端通訊handler(socket);});} }/*** 編寫(xiě)一個(gè)handler方法,和客戶端通訊,讀取客戶端發(fā)過(guò)來(lái)的信息* @param socket*/ public static void handler(Socket socket) {try {byte[] bytes = new byte[1024];//通過(guò)socket獲取輸入流InputStream inputStream = socket.getInputStream();//循環(huán)的讀取客戶端發(fā)送的數(shù)據(jù)while (true) {int read = inputStream.read(bytes);if (read != -1) {//輸出客戶端發(fā)送的數(shù)據(jù)System.out.println(new String(bytes, 0, read));} else {break;}}} catch (Exception e) {e.printStackTrace();} finally {System.out.println("關(guān)閉和client的連接");try {socket.close();} catch (Exception e) {e.printStackTrace();}} }這是一個(gè)經(jīng)典的一個(gè)連接一個(gè)線程的模型,使用多線程的主要原因在于socket.accept()、socket.read()、socket.write()三個(gè)主要函數(shù)都是同步阻塞的,即沒(méi)有成功連接或沒(méi)有數(shù)據(jù)讀、寫(xiě)時(shí),系統(tǒng)都是阻塞,在這種情況下如果是單線程的話就會(huì)一直阻塞在哪里;但CPU是被釋放出來(lái)的,開(kāi)啟多線程,就可以讓CPU去處理更多的事情。
不過(guò),這個(gè)模型是存在問(wèn)題的,線程池,雖然可以讓線程的創(chuàng)建和回收成本變低。但是線程池本生是有局限的的,在并發(fā)數(shù)并不是特別高的時(shí)候(小于單機(jī)1000),使用線程池搭用這種模型還是可行的,并竟線程池是一個(gè)天然的漏斗,不用過(guò)多考慮負(fù)載、限流等問(wèn)題,編程也簡(jiǎn)單。但是仍然是解決不了根本問(wèn)題的,根本問(wèn)題在于嚴(yán)重的依賴于線程。但是大家都知道哈,線程是個(gè)特別貴的資源。
為什么說(shuō)線程貴呢?
主要表現(xiàn)在以下幾個(gè)方面:
所以說(shuō),使用BIO面臨十萬(wàn)或百萬(wàn)請(qǐng)求時(shí),是無(wú)能為力的。所以必然需要更加高效的I/O處理模型。
三、NIO和BIO的區(qū)別
NIO適用于連接數(shù)目多且連接比較短的架構(gòu),比如聊天服務(wù)器,彈幕系統(tǒng),服務(wù)器間通訊等,編程比較復(fù)雜,JDK1.4開(kāi)始支持。
四、I/O模型之間的對(duì)比
學(xué)習(xí)I/O,I/O模型是必須知道的一點(diǎn)。I/O模型共有五種,分別是:
不過(guò)在這只描述了前三種。
傳統(tǒng)阻塞式I/O
模型圖如下:Java中原生的IO便是這種。
特點(diǎn):
當(dāng)用戶線程發(fā)出IO請(qǐng)求之后,內(nèi)核會(huì)去查看數(shù)據(jù)是否就緒,如果沒(méi)有就緒就會(huì)等待數(shù)據(jù)就緒,而用戶線程就會(huì)處于阻塞狀態(tài),用戶線程交出CPU。當(dāng)數(shù)據(jù)就緒之后,內(nèi)核會(huì)將數(shù)據(jù)拷貝到用戶線程,并返回結(jié)果給用戶線程,用戶線程才解除block狀態(tài)。
非阻塞式I/O
模型圖如下:
特點(diǎn):
當(dāng)用戶線程發(fā)起一個(gè)read操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。如果結(jié)果是一個(gè)error時(shí),它就知道數(shù)據(jù)還沒(méi)有準(zhǔn)備好,于是就返回到用戶進(jìn)程去執(zhí)行其他任務(wù),等過(guò)一段時(shí)間后在去查看數(shù)據(jù)是否準(zhǔn)備好。一旦內(nèi)核中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶線程的請(qǐng)求,那么它馬上就將數(shù)據(jù)拷貝到了用戶線程,然后返回。所以事實(shí)上,在非阻塞IO模型中,用戶線程需要不斷地詢問(wèn)內(nèi)核數(shù)據(jù)是否就緒,也就說(shuō)非阻塞IO不會(huì)交出CPU,而會(huì)一直占用CPU。
注意:對(duì)于非阻塞IO也有一個(gè)非常嚴(yán)重的問(wèn)題,在while循環(huán)中需要不斷地去詢問(wèn)內(nèi)核數(shù)據(jù)是否就緒,這樣會(huì)導(dǎo)致CPU占用率非常高,因此一般情況下很少使用while循環(huán)方式來(lái)讀取數(shù)據(jù)。
多路復(fù)用I/O模型
Java NIO使用的即是這種模型。
多路復(fù)用IO主要用于處理多個(gè)IO連接時(shí)候的場(chǎng)景。在多路復(fù)用IO模型中,會(huì)有一個(gè)線程不斷去輪詢多個(gè)socket的狀態(tài),只有當(dāng)socket真正有讀寫(xiě)事件時(shí),才會(huì)真正調(diào)用實(shí)際的IO讀寫(xiě)操作。因?yàn)樵诙嗦窂?fù)用IO模型中,只需使用一個(gè)線程就可以管理多個(gè)socket,系統(tǒng)無(wú)需重復(fù)建立新的線程和銷毀線程,也不必維護(hù)等,另一方面同時(shí)也避免了多線程之間的上下文切換導(dǎo)致的開(kāi)銷。并且只有在真正有socket讀寫(xiě)事件進(jìn)行時(shí),才會(huì)使用IO資源,大大減少了資源占用,極大的提高了效率。
I/O多路復(fù)用比傳統(tǒng)I/O好在哪里?
也許有小伙伴會(huì)說(shuō),我可以采用多線程+ 阻塞IO達(dá)到類似的效果,但是你需要考慮到多線程+阻塞IO,沒(méi)有改變的是每個(gè)socket還是對(duì)應(yīng)著一個(gè)線程,仍然無(wú)法解決根本問(wèn)題,并且尤其是對(duì)于長(zhǎng)連接來(lái)說(shuō),線程的資源一直不釋放,如果后面陸續(xù)還有很多連接的話,就會(huì)造成系統(tǒng)的極大壓力。
而使用多路復(fù)用IO模式,通過(guò)一個(gè)線程就可以去管理多個(gè)socket,并且只有當(dāng)socket真正有讀寫(xiě)事件發(fā)生才會(huì)占用資源來(lái)進(jìn)行實(shí)際的讀寫(xiě)操作。所以多路復(fù)用IO模式一方面減少了創(chuàng)建和銷毀線程的操作,同時(shí)也避免了多線程之間切換的開(kāi)銷,并且提高了并發(fā)數(shù)。
I/O多路復(fù)用為何比非阻塞式I/O模型效率高?
因?yàn)樵诜亲枞鸌O中,不斷地詢問(wèn)socket狀態(tài)是通過(guò)用戶線程去進(jìn)行的,但是在多路復(fù)用IO模型中,輪詢每個(gè)socket狀態(tài)是內(nèi)核在進(jìn)行的,這個(gè)效率要比用戶線程要高的多。
注意事項(xiàng):
多路復(fù)用IO模型是通過(guò)輪詢的方式來(lái)檢測(cè)是否有事件到達(dá),并且對(duì)到達(dá)的事件逐一進(jìn)行響應(yīng)。因此對(duì)于多路復(fù)用IO模型來(lái)說(shuō),如果一旦事件響應(yīng)體很大,那么就有可能會(huì)導(dǎo)致后續(xù)的事件遲遲得不到處理,并且會(huì)影響到新的事件輪詢。
在Java NIO中的理解是:
Selector就是一個(gè)socket選擇器,它不停地查看所有與他綁定的socket是否準(zhǔn)備完成,哪一個(gè)io準(zhǔn)備完成,它就會(huì)處理對(duì)應(yīng)的channel。
但是終究選擇什么樣的IO模型,還是需要根據(jù)實(shí)際問(wèn)題實(shí)際分析的。
五、三大核心組件 | Buffer
基本介紹
Java NIO中的Buffer用于和NIO通道進(jìn)行交互。數(shù)據(jù)是從通道讀入緩沖區(qū),從緩沖區(qū)寫(xiě)入到通道中的。
緩沖區(qū)本質(zhì)上是一塊可以寫(xiě)入數(shù)據(jù),然后可以從中讀取數(shù)據(jù)的內(nèi)存。這塊內(nèi)存被包裝成NIO Buffer對(duì)象,并提供了一組方法,用來(lái)方便的訪問(wèn)該塊內(nèi)存。
Buffer結(jié)構(gòu)
Buffer是一個(gè)頂級(jí)的抽象類,底下有很多的實(shí)現(xiàn)類,這里是小小的一個(gè)圖哈。從這張圖可用看出Java 中的基本數(shù)據(jù)類型(boolean 除外),都有一個(gè)Buffer對(duì)應(yīng),不過(guò)其中ByteBuffer使用的最多。
Buffer API:
public abstract class Buffer {public final int capacity() ;//返回此緩沖區(qū)的容量。public final int position();//返回此緩沖區(qū)的位置。public Buffer position(int newPosition) ;//設(shè)置此緩沖區(qū)的位置。 如果標(biāo)記已定義且大于新位置,則將其丟棄。public final int limit();//返回此緩沖區(qū)的限制。public Buffer limit(int newLimit) ;//設(shè)置此緩沖區(qū)的限制。 public Buffer mark() ;//在其位置設(shè)置此緩沖區(qū)的標(biāo)記。public Buffer reset();//將此緩沖區(qū)的位置重置為先前標(biāo)記的位置。public Buffer clear() ;//清除此緩沖區(qū)。 將位置設(shè)置為零,將限制設(shè)置為容量,并丟棄標(biāo)記。public Buffer flip();//翻轉(zhuǎn)這個(gè)緩沖區(qū)。 將限制設(shè)置為當(dāng)前位置,然后將位置設(shè)置為零。 如果定義了標(biāo)記,則將其丟棄。public Buffer rewind();//倒帶此緩沖區(qū)。 位置設(shè)置為零,標(biāo)記被丟棄。public final int remaining() ;//返回當(dāng)前位置和限制之間的元素?cái)?shù)。public final boolean hasRemaining();//告訴當(dāng)前位置和限制之間是否有任何元素。public abstract boolean isReadOnly();//告訴這個(gè)緩沖區(qū)是否是只讀的。// -----------jdk1.6引入----------public abstract boolean hasArray();//告訴此緩沖區(qū)是否由可訪問(wèn)數(shù)組支持。public abstract Object array();//返回支持此緩沖區(qū)的數(shù)組(可選操作) 。public abstract int arrayOffset();//返回緩沖區(qū)第一個(gè)元素在此緩沖區(qū)的后備數(shù)組中的偏移量public abstract boolean isDirect();//判斷此緩沖區(qū)是否為direct 。 }六、三大核心組件 | Channel
基本介紹
Java NIO的通道類似流,但又有些不同:
- 既可以從通道中讀取數(shù)據(jù),又可以寫(xiě)數(shù)據(jù)到通道。但流的讀寫(xiě)通常是單向的。
- 通道可以異步地讀寫(xiě)。
- 通道中的數(shù)據(jù)總是要先讀到一個(gè)Buffer,或者總是要從一個(gè)Buffer中寫(xiě)入。
常用Channel的實(shí)現(xiàn)類
以下是Java NIO中最常用Channel的通道的實(shí)現(xiàn):
- FileChannel 從文件中讀寫(xiě)數(shù)據(jù)。
- DatagramChannel 能通過(guò)UDP讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)。
- SocketChannel 能通過(guò)TCP讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)。
- ServerSocketChannel可以監(jiān)聽(tīng)新進(jìn)來(lái)的TCP連接,像Web服務(wù)器那樣。對(duì)每一個(gè)新進(jìn)來(lái)的連接都會(huì)創(chuàng)建一個(gè)SocketChannel。
結(jié)構(gòu)圖:
小小的應(yīng)用案例:
場(chǎng)景:使用一個(gè)Buffer和FileChannel完成文件讀取、寫(xiě)入。
my.txt—>you.txt
public class NIOFileChannel {public static void main(String[] args) throws Exception {FileInputStream fileInputStream = new FileInputStream("my.txt");FileChannel fileChannel01 = fileInputStream.getChannel();FileOutputStream fileOutputStream = new FileOutputStream("you.txt");FileChannel fileChannel02 = fileOutputStream.getChannel();ByteBuffer byteBuffer = ByteBuffer.allocate(512);//循環(huán)讀取while (true) {//清空 buffer 緩沖區(qū)byteBuffer.clear(); int read = fileChannel01.read(byteBuffer);System.out.println("read = " + read);//表示讀完if (read == -1) { break;}//將 buffer 中的數(shù)據(jù)寫(xiě)入到 fileChannel02--2.txtbyteBuffer.flip();fileChannel02.write(byteBuffer);}//關(guān)閉相關(guān)的流fileInputStream.close();fileOutputStream.close();} }七、三大核心組件 | Selector
基本介紹
Selector(選擇器)是Java NIO中能夠檢測(cè)一到多個(gè)NIO通道,并能夠知曉通道是否為諸如讀寫(xiě)事件做好準(zhǔn)備的組件。這樣,才使得一個(gè)單獨(dú)的線程可以管理多個(gè)channel,從而管理多個(gè)網(wǎng)絡(luò)連接。
在這里只要知道使用Selector能夠處理多個(gè)通道就足夠了,其他的暫時(shí)先不去瞄了哈。
模型圖:
Selector | API:
public abstract class Selector implements Closeable {public static Selector open() ;//打開(kāi)一個(gè)選擇器。得到一個(gè)選擇器對(duì)象public abstract boolean isOpen();//告訴這個(gè)選擇器是否打開(kāi)public abstract SelectorProvider provider(); //返回創(chuàng)建此頻道的提供者。public abstract Set<SelectionKey> keys(); //返回此選擇器的鍵集public abstract Set<SelectionKey> selectedKeys();//返回此選擇器的選定鍵集。public abstract int selectNow() throws IOException;//不阻塞,立馬返還 ,其對(duì)應(yīng)的通道已準(zhǔn)備好進(jìn)行 I/O 操作。public abstract int select(long timeout) throws IOException;//監(jiān)控所有注冊(cè)的通道,當(dāng)其中有IO操作時(shí),將對(duì)應(yīng)的SelectionKey加入到內(nèi)部集合中并返回,參數(shù)用來(lái)設(shè)置超時(shí)時(shí)間。public abstract Selector wakeup();//使尚未返回的第一個(gè)選擇操作立即返回。如果另一個(gè)線程當(dāng)前在選擇操作中被阻塞,那么該調(diào)用將立即返回。 public abstract void close() throws IOException;//關(guān)閉此選擇器。 }因?yàn)橐Y(jié)合其他組件才能體現(xiàn)出Selector,所以案例放在后文中了。
八、NIO 網(wǎng)絡(luò)編程分析圖
模型圖如下:
流程分析:
服務(wù)器端啟動(dòng),打開(kāi)服務(wù)器的ServerSocketChannel,創(chuàng)建并且打開(kāi)Selector,將ServerSocketChannel注冊(cè)到selector上
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);Selector 進(jìn)行監(jiān)聽(tīng) select 方法, 返回有事件發(fā)生的通道的個(gè)數(shù)
if (selector.select(1000) == 0) //這里我寫(xiě)的是每秒刷新一次客戶端啟動(dòng),打開(kāi)客戶端的SocketChannel,綁定服務(wù)端地址,向服務(wù)端發(fā)送連接請(qǐng)求
SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(inetSocketAddress)服務(wù)端Selector監(jiān)聽(tīng)到注冊(cè)事件發(fā)生后,通過(guò)selector.selectedKeys()方法拿到SelectionKey,會(huì)和該 Selector 關(guān)聯(lián)(集合)
//通過(guò)selector得到selectionKey Set<SelectionKey> selectionKeys = selector.selectedKeys();再通過(guò) java.nio.channels.SelectionKey 反向獲取 SocketChannel , 方法 channel()
SocketChannel channel = (SocketChannel) selectionKey.channel();可以通過(guò)得到的channel , 完成業(yè)務(wù)處理
九、快速入門(mén)案例(一)
一直寫(xiě)理論,不寫(xiě)代碼,看著發(fā)愁。所以一起來(lái)看看這個(gè)簡(jiǎn)單的入門(mén)案例吧。
場(chǎng)景:NIO 入門(mén)案例,實(shí)現(xiàn)服務(wù)器端和客戶端之間的數(shù)據(jù)簡(jiǎn)單通訊(非阻塞)
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set;public class NioServerTest {public static void main(String[] args) throws Exception{//1、創(chuàng)建一個(gè)ServerSocketChannel對(duì)象,綁定端口并配置成非阻塞模式。ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8888), 1024);//2、下面這句必需要,否則ServerSocketChannel會(huì)使用阻塞的模式,那就不是NIO了serverSocketChannel.configureBlocking(false);//3、把ServerSocketChannel交給Selector監(jiān)聽(tīng)Selector selector = Selector.open();//4、注冊(cè)進(jìn)SelectorserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//5、循環(huán),不斷的從Selector中獲取準(zhǔn)備就緒的Channel,最開(kāi)始的時(shí)候Selector只監(jiān)聽(tīng)了一個(gè)ServerSocketChannel//但是后續(xù)有客戶端連接時(shí),會(huì)把客戶端對(duì)應(yīng)的Channel也交給Selector對(duì)象while (true) {//6、一直監(jiān)聽(tīng),每秒刷新一次,等待客戶端的連接if (selector.select(1000) == 0){continue;}//獲取所有的準(zhǔn)備就緒的Channel,SelectionKey中包含中Channel信息Set<SelectionKey> selectionKeySet = selector.selectedKeys();//遍歷,每個(gè)Channel都可處理for (SelectionKey selectionKey : selectionKeySet) {//如果Channel已經(jīng)無(wú)效了,則跳過(guò)(如Channel已經(jīng)關(guān)閉了)if(!selectionKey.isValid()) {continue;}//判斷Channel具體的就緒事件,如果是有客戶端連接,則建立連接if (selectionKey.isAcceptable()) {System.out.println("連接成功!!!");//當(dāng)確定有selectionKey時(shí),說(shuō)明必有socketChannel,Server接收后得到SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);//把客戶端的Channel交給Selector監(jiān)控,之后如果有數(shù)據(jù)可以讀取時(shí),會(huì)被select出來(lái)socketChannel.register(selector, SelectionKey.OP_READ);}//如果有客戶端可以讀取請(qǐng)求了,則讀取請(qǐng)求然后返回?cái)?shù)據(jù)if (selectionKey.isReadable()) {System.out.println(readFromSelectionKey(selectionKey));}}//處理完成后把返回的Set清空,如果不清空下次還會(huì)再返回這些Key,導(dǎo)致重復(fù)處理selectionKeySet.clear();}}//從客戶端讀取數(shù)據(jù)的廬江private static String readFromSelectionKey(SelectionKey selectionKey) throws Exception{//從SelectionKey中包含選取出來(lái)的Channel的信息把Channel獲取出來(lái)SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());//讀取數(shù)據(jù)到ByteBuffer中ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int len = socketChannel.read(byteBuffer);//如果讀到-1,說(shuō)明數(shù)據(jù)已經(jīng)傳輸完成了,可以并閉if (len < 0) {socketChannel.close();selectionKey.cancel();return "";} else if(len == 0) { //什么都沒(méi)讀到return "";}byteBuffer.flip();doWrite(selectionKey, "Hello Nio");return new String(byteBuffer.array(), 0, len);}private static void doWrite(SelectionKey selectionKey, String responseMessage) throws Exception{System.err.println("Output message...");SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());ByteBuffer byteBuffer = ByteBuffer.allocate(responseMessage.getBytes().length);byteBuffer.put(responseMessage.getBytes());byteBuffer.flip();socketChannel.write(byteBuffer);} }客戶端:
package com.crush.nio.nio2;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Set;/** * @Author: crush * @Date: 2021-08-24 16:01 * version 1.0 */public class NioClientTest { public static void main(String[] args) throws Exception { //創(chuàng)建一個(gè)SocketChannel對(duì)象,配置成非阻塞模式 SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); //創(chuàng)建一個(gè)選擇器,并把SocketChannel交給selector對(duì)象 Selector selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); //發(fā)起建立連接的請(qǐng)求,這里會(huì)立即返回,當(dāng)連接建立完成后,SocketChannel就會(huì)被選取出來(lái) socketChannel.connect(new InetSocketAddress("localhost", 8888)); //遍歷,不段的從Selector中選取出已經(jīng)就緒的Channel,在這個(gè)例子中,Selector只監(jiān)控了一個(gè)SocketChannel while (true) { selector.select(1000); Set<SelectionKey> selectionKeySet = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeySet) { if (!selectionKey.isValid()) { continue; } //連接建立完成后的操作:直接發(fā)送請(qǐng)求數(shù)據(jù) if (selectionKey.isConnectable()) { if (socketChannel.finishConnect()) { socketChannel.register(selector, SelectionKey.OP_READ); doWriteRequest(((SocketChannel) selectionKey.channel())); } } //如果當(dāng)前已經(jīng)可以讀數(shù)據(jù)了,說(shuō)明服務(wù)端已經(jīng)響應(yīng)完了,讀取數(shù)據(jù) if (selectionKey.isReadable()) { doRead(selectionKey); } } //最后同樣要清除所有的Key selectionKeySet.removeAll(selectionKeySet); } } //發(fā)送請(qǐng)求 private static void doWriteRequest(SocketChannel socketChannel) throws Exception { System.err.println("start connect..."); //創(chuàng)建ByteBuffer對(duì)象,會(huì)放入數(shù)據(jù) ByteBuffer byteBuffer = ByteBuffer.allocate("Hello Server!".getBytes().length); byteBuffer.put("Hello Server!".getBytes()); byteBuffer.flip(); //寫(xiě)數(shù)據(jù) socketChannel.write(byteBuffer); if (!byteBuffer.hasRemaining()) { System.err.println("Send request success..."); } } //讀取服務(wù)端的響應(yīng) private static void doRead(SelectionKey selectionKey) throws Exception { SocketChannel socketChannel = ((SocketChannel) selectionKey.channel()); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int len = socketChannel.read(byteBuffer); System.out.println("Recv:" + new String(byteBuffer.array(), 0, len)); }}先啟動(dòng)服務(wù)器端,再啟動(dòng)客戶端,直接可以在客戶端看到輸出消息。
十、快速入門(mén)案例(二)
/*** @Author: crush* @Date: 2021-08-24 16:33* version 1.0*/ public class GroupChatServer {//定義屬性private Selector selector;private ServerSocketChannel listenChannel;private static final int PORT = 6667;//構(gòu)造器//初始化工作public GroupChatServer() {try {//得到選擇器selector = Selector.open();//ServerSocketChannellistenChannel = ServerSocketChannel.open();//綁定端口listenChannel.socket().bind(new InetSocketAddress(PORT));//設(shè)置非阻塞模式listenChannel.configureBlocking(false);//將該 listenChannel 注冊(cè)到 selectorlistenChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void listen() {try {//循環(huán)處理while (true) {int count = selector.select();if (count > 0) { //有事件處理// 遍歷得到 selectionKey 集合Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {//取出 selectionkeySelectionKey key = iterator.next();//監(jiān)聽(tīng)到 acceptif (key.isAcceptable()) {SocketChannel sc = listenChannel.accept();sc.configureBlocking(false);//將該 sc 注冊(cè)到 seletorsc.register(selector, SelectionKey.OP_READ);//提示System.out.println(sc.getRemoteAddress() + " 上線 ");}if (key.isReadable()) {//通道發(fā)送read事件,即通道是可讀的狀態(tài)// 處理讀(專門(mén)寫(xiě)方法..)readData(key);}//當(dāng)前的 key 刪除,防止重復(fù)處理iterator.remove();}} else {System.out.println("等待....");}}} catch (Exception e) {e.printStackTrace();} finally {//發(fā)生異常處理....}}//讀取客戶端消息public void readData(SelectionKey key) {SocketChannel channel = null;try {//得到 channelchannel = (SocketChannel) key.channel();//創(chuàng)建 bufferByteBuffer buffer = ByteBuffer.allocate(1024);int count = channel.read(buffer);//根據(jù) count 的值做處理if (count > 0) {//把緩存區(qū)的數(shù)據(jù)轉(zhuǎn)成字符串String msg = new String(buffer.array());//輸出該消息System.out.println("form客戶端:" + msg);//向其它的客戶端轉(zhuǎn)發(fā)消息(去掉自己),專門(mén)寫(xiě)一個(gè)方法來(lái)處理sendInfoToOtherClients(msg, channel);}} catch (IOException e) {try {System.out.println(channel.getRemoteAddress() + "離線了..");//取消注冊(cè)key.cancel();//關(guān)閉通道channel.close();} catch (IOException e2) {e2.printStackTrace();}}}//轉(zhuǎn)發(fā)消息給其它客戶(通道)private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {System.out.println("服務(wù)器轉(zhuǎn)發(fā)消息中...");//遍歷所有注冊(cè)到 selector 上的 SocketChannel,并排除 selffor (SelectionKey key : selector.keys()) {//通過(guò) key 取出對(duì)應(yīng)的 SocketChannelChannel targetChannel = key.channel();//排除自己if (targetChannel instanceof SocketChannel && targetChannel != self) {//轉(zhuǎn)型SocketChannel dest = (SocketChannel) targetChannel;//將 msg 存儲(chǔ)到 bufferByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());//將 buffer 的數(shù)據(jù)寫(xiě)入通道dest.write(buffer);}}}public static void main(String[] args) {//創(chuàng)建服務(wù)器對(duì)象GroupChatServer groupChatServer = new GroupChatServer();groupChatServer.listen();} }客戶端:
/*** @Author: crush* @Date: 2021-08-24 16:33* version 1.0*/ public class GroupChatClient {//定義相關(guān)的屬性private final String HOST = "127.0.0.1";//服務(wù)器的ipprivate final int PORT = 6667;//服務(wù)器端口private Selector selector;private SocketChannel socketChannel;private String username;//構(gòu)造器,完成初始化工作public GroupChatClient() throws IOException {selector = Selector.open();//連接服務(wù)器socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));//設(shè)置非阻塞socketChannel.configureBlocking(false);//將 channel 注冊(cè)到selectorsocketChannel.register(selector, SelectionKey.OP_READ);//得到 usernameusername = socketChannel.getLocalAddress().toString().substring(1);System.out.println(username + " is ok...");}//向服務(wù)器發(fā)送消息public void sendInfo(String info) {info = username + " 說(shuō):" + info;try {socketChannel.write(ByteBuffer.wrap(info.getBytes()));} catch (IOException e) {e.printStackTrace();}}//讀取從服務(wù)器端回復(fù)的消息public void readInfo() {try {int readChannels = selector.select();if (readChannels > 0) {//有可以用的通道Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if (key.isReadable()) {//得到相關(guān)的通道SocketChannel sc = (SocketChannel) key.channel();//得到一個(gè) BufferByteBuffer buffer = ByteBuffer.allocate(1024);//讀取sc.read(buffer);//把讀到的緩沖區(qū)的數(shù)據(jù)轉(zhuǎn)成字符串String msg = new String(buffer.array());System.out.println(msg.trim());}}iterator.remove(); //刪除當(dāng)前的 selectionKey,防止重復(fù)操作} else {//System.out.println("沒(méi)有可以用的通道...");}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {//啟動(dòng)我們客戶端GroupChatClient chatClient = new GroupChatClient();//啟動(dòng)一個(gè)線程,每個(gè) 3 秒,讀取從服務(wù)器發(fā)送數(shù)據(jù)new Thread() {public void run() {while (true) {chatClient.readInfo();try {Thread.currentThread().sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}}.start();//發(fā)送數(shù)據(jù)給服務(wù)器端Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String s = scanner.nextLine();chatClient.sendInfo(s);}} }測(cè)試:
十一、自言自語(yǔ)
個(gè)人感受:越學(xué)習(xí)到后面,就越感覺(jué)基礎(chǔ)的重要性。很多技術(shù)都只會(huì)去用,很多時(shí)候甚至都不懂它的設(shè)計(jì)理念也不懂為什么要那樣做。
最近在持續(xù)更新中,如果你覺(jué)得對(duì)你有所幫助,也感興趣的話,關(guān)注我吧,讓我們一起學(xué)習(xí),一起討論吧。
在學(xué)習(xí)路上充滿好奇心,明白思考的重要性,是支持我一直學(xué)習(xí)下去的積極推動(dòng)力吧。希望你也能喜歡上編程!😁
熱愛(ài)生活,享受生活!!!無(wú)論在哪里,無(wú)論此時(shí)的生活多么艱難,都要記得熱愛(ài)生活!!!相信總會(huì)有光來(lái)的。
你好,我是博主寧在春,Java學(xué)習(xí)路上的一顆小小的種子,也希望有一天能扎根長(zhǎng)成蒼天大樹(shù)。
希望與君共勉😁
我們:待別時(shí)相見(jiàn)時(shí),都已有所成。
本文是個(gè)人的學(xué)習(xí)筆記,夾雜著個(gè)人理解,如有不對(duì),請(qǐng)不嗇賜教。課程是來(lái)自于尚硅谷-韓順平老師Netty課程
參考:
五種IO模型詳解及優(yōu)缺點(diǎn)
五種IO模型及其比較
Java NIO淺析
NIO非阻塞網(wǎng)絡(luò)編程原理
總結(jié)
以上是生活随笔為你收集整理的你对Java网络编程了解的如何?Java NIO 网络编程 | Netty前期知识(二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 你对Java网络编程了解的如何?Java
- 下一篇: 如何使用Docker安装Mycat中间件