阻塞式和非阻塞式udp传输_NIO非阻塞网络编程三大核心理念
本次開(kāi)始NIO網(wǎng)絡(luò)編程,之前已經(jīng)說(shuō)過(guò)BIO,對(duì)于阻塞IO里面的問(wèn)題一定有了清晰的認(rèn)識(shí),在JDK1.4版本后,提供了新的JAVA IO操作非阻塞API,用意替換JAVA IO 和JAVA NetWorking相關(guān)的API。NIO其實(shí)有個(gè)名稱(chēng)叫new IO。
(一)NIO
① 介紹
java.nio全稱(chēng)java non-blocking IO(實(shí)際上是 new io),是指JDK 1.4 及以上版本里提供的新api(New IO) ,為所有的原始類(lèi)型(boolean類(lèi)型除外)提供緩存支持的數(shù)據(jù)容器,使用它可以提供非阻塞式的高伸縮性網(wǎng)絡(luò)。
HTTP2.0使用了多路復(fù)用的技術(shù),做到同一個(gè)連接并發(fā)處理多個(gè)請(qǐng)求,而且并發(fā)請(qǐng)求的數(shù)量比HTTP1.1大了好幾個(gè)數(shù)量級(jí)。
② 三大核心組件
高性能網(wǎng)絡(luò)編程的基礎(chǔ)組件,Buffer緩存區(qū)、Channel 通道、Selector 選擇器。
(二) Buffer緩存區(qū)
① 介紹
緩存區(qū)本質(zhì)上是一個(gè)可以寫(xiě)入數(shù)據(jù)的內(nèi)存塊(類(lèi)似數(shù)組),然后可以再次讀取。此內(nèi)存塊包含在NIO Buffer 對(duì)象中,該對(duì)象提供了一組方法,可以更輕松地使用內(nèi)存塊。
相比較直接對(duì)數(shù)組的操作。Buffer API 更加容易操作和管理。
② 使用Buffer進(jìn)行數(shù)據(jù)寫(xiě)入與讀取,需要進(jìn)行如下四個(gè)步驟
將數(shù)據(jù)寫(xiě)入緩沖區(qū)。
調(diào)用buffer.flip(),轉(zhuǎn)換為讀取模式。
緩沖區(qū)讀取數(shù)據(jù)。
調(diào)用buffer.clear() 或 buffer.compact() 消除緩沖區(qū)
③ Buffer工作原理
BUffer三個(gè)重要屬性,通過(guò)完成了數(shù)組的封裝。
1.capacity 容量:作為一個(gè)內(nèi)存塊,Buffer具有一定的固定大小,也稱(chēng)為【容量】。
2.position 位置:寫(xiě)入模式時(shí)代表寫(xiě)數(shù)據(jù)的位置。讀取模式時(shí)代表讀取數(shù)據(jù)的位置。
3.limit 限制:寫(xiě)入模式,限制等于buffer的容量,讀取模式下,limit等于寫(xiě)入的數(shù)據(jù)量。
④ 源碼
import java.nio.IntBuffer;
import java.nio.LongBuffer;
public class BufferDemo {
public static void main(String[] args) {
// 構(gòu)建一個(gè)byte字節(jié)緩沖區(qū),容量是4
//堆內(nèi)存
ByteBuffer byteBuffer = ByteBuffer.allocate(4);
//堆外內(nèi)存
// ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4);
// 默認(rèn)寫(xiě)入模式,查看三個(gè)重要的指標(biāo)
System.out.println(String.format("初始化:capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 寫(xiě)入2字節(jié)的數(shù)據(jù)
byteBuffer.put((byte) 1);
byteBuffer.put((byte) 2);
byteBuffer.put((byte) 3);
// 再看數(shù)據(jù)
System.out.println(String.format("寫(xiě)入3字節(jié)后,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 轉(zhuǎn)換為讀取模式(不調(diào)用flip方法,也是可以讀取數(shù)據(jù)的,但是position記錄讀取的位置不對(duì))
System.out.println("#######開(kāi)始讀取");
byteBuffer.flip();
byte a = byteBuffer.get();
System.out.println(a);
byte b = byteBuffer.get();
System.out.println(b);
System.out.println(String.format("讀取2字節(jié)數(shù)據(jù)后,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 繼續(xù)寫(xiě)入3字節(jié),此時(shí)讀模式下,limit=3,position=2.繼續(xù)寫(xiě)入只能覆蓋寫(xiě)入一條數(shù)據(jù)
// clear()方法清除整個(gè)緩沖區(qū)。compact()方法僅清除已閱讀的數(shù)據(jù)。轉(zhuǎn)為寫(xiě)入模式
byteBuffer.compact(); // buffer : 1 , 3
byteBuffer.put((byte) 3);
byteBuffer.put((byte) 4);
byteBuffer.put((byte) 5);
System.out.println(String.format("最終的情況,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// rewind() 重置position為0
// mark() 標(biāo)記position的位置
// reset() 重置position為上次mark()標(biāo)記的位置
}
}
⑤ ByteBuffer 內(nèi)存類(lèi)型
ByteBuffer 為性能關(guān)鍵型代碼提供了直接內(nèi)存(direct堆外)和非直接內(nèi)存(heap堆)兩種實(shí)現(xiàn),堆外內(nèi)存獲取的方式
ByteBuffer directBytebuffer = ByteBuffer.allocateDirect(noBytes);好處
進(jìn)行網(wǎng)絡(luò)IO 或者 文件IO時(shí)比heapBuffer 少一次拷貝,(file/socket —— OS memory —— jvm heap )GC會(huì)移動(dòng)對(duì)象內(nèi)存,在寫(xiě)file 或 socket的過(guò)程中,JVM的實(shí)現(xiàn)中,會(huì)先把數(shù)據(jù)復(fù)制到堆外,在進(jìn)行寫(xiě)入。
GC范圍之外,降低GC壓力,但實(shí)現(xiàn)了自動(dòng)管理。DirectByteBuffer 中 有一個(gè)Cleaner 對(duì)象(PhantomReference) ,Cleaner被GC前會(huì)執(zhí)行clean 方法,觸發(fā)DirectByteBuffer 中定義Deallocator
建議
性能確實(shí)可觀的時(shí)候才去使用,分配給大型,長(zhǎng)壽命(網(wǎng)絡(luò)傳輸,文件讀寫(xiě)場(chǎng)景)
通過(guò)虛擬機(jī)參數(shù)MaxDirectMemorySize限制大小,防止耗盡整個(gè)機(jī)器的內(nèi)存,在JVM之外的內(nèi)存無(wú)法監(jiān)控。
(三)Channel 通道
① 介紹
Channel的API 涵蓋了UDP、TCP網(wǎng)絡(luò)和文件IO,FileChannel,DatagramChannel,SocketChannel,ServerSocketChannel。
② 和標(biāo)準(zhǔn)IO Stream操作的區(qū)別
在一個(gè)通道內(nèi)進(jìn)行讀取和寫(xiě)入stream通常是單向的(input 或 output),可以非堵塞讀取和寫(xiě)入通道,通道中讀取或?qū)懭刖彌_區(qū)。
③ SocketChannel
SocketChannel用于建立TCP網(wǎng)絡(luò)連接,類(lèi)似java.net.Socket。有兩種創(chuàng)建socketChannel形式
1.客戶(hù)端主動(dòng)發(fā)起和服務(wù)器的連接
2.服務(wù)器獲取的新連接
write寫(xiě)
在尚未寫(xiě)入任何內(nèi)容時(shí)可能就返回了。需要在循環(huán)中調(diào)用write()
read讀
read() 方法可能直接返回而根本不讀取任何數(shù)據(jù),根據(jù)返回的int值判斷讀取了多少字節(jié)。
④ ServerSocketChannel
ServerSocketChannel 可能監(jiān)聽(tīng)新建立的TCP連接通道,類(lèi)似ServerSocket。
ServerSocketChannel.accepta()
如果該通道處于飛度賽模式,那么如何沒(méi)有掛起的連接,該方法將立即返回null。必須檢查返回的SocketChannel是否為null。
⑤ 源碼
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class NIOClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
while (!socketChannel.finishConnect()) {
// 沒(méi)連接上,則一直等待
Thread.yield();
}
Scanner scanner = new Scanner(System.in);
System.out.println("請(qǐng)輸入:");
// 發(fā)送內(nèi)容
String msg = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// 讀取響應(yīng)
System.out.println("收到服務(wù)端響應(yīng):");
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
scanner.close();
socketChannel.close();
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
/**
* 直接基于非阻塞的寫(xiě)法,一個(gè)線(xiàn)程處理輪詢(xún)所有請(qǐng)求
*/
public class NIOServer1 {
/**
* 已經(jīng)建立連接的集合
*/
private static ArrayList channels = new ArrayList<>();public static void main(String[] args) throws Exception {// 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
System.out.println("啟動(dòng)成功");while (true) {
SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道// tcp請(qǐng)求 讀取/響應(yīng)if (socketChannel != null) {
System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false); // 默認(rèn)是阻塞的,一定要設(shè)置為非阻塞
channels.add(socketChannel);
} else {// 沒(méi)有新連接的情況下,就去處理現(xiàn)有連接的數(shù)據(jù),處理完的就刪除掉
Iterator iterator = channels.iterator();while (iterator.hasNext()) {
SocketChannel ch = iterator.next();try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);if (ch.read(requestBuffer) == 0) {// 等于0,代表這個(gè)通道沒(méi)有數(shù)據(jù)需要處理,那就待會(huì)再處理continue;
}while (ch.isOpen() && ch.read(requestBuffer) != -1) {// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)if (requestBuffer.position() > 0) break;
}if(requestBuffer.position() == 0) continue; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來(lái)自:" + ch.getRemoteAddress());// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +"Content-Length: 11\r\n\r\n" +"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());while (buffer.hasRemaining()) {
ch.write(buffer);
}
iterator.remove();
} catch (IOException e) {
e.printStackTrace();
iterator.remove();
}
}
}
}// 用到了非阻塞的API, 再設(shè)計(jì)上,和BIO可以有很大的不同// 問(wèn)題: 輪詢(xún)通道的方式,低效,浪費(fèi)CPU
}
}
(四)Select選擇器
① 介紹
Selector 是一個(gè)Java NIO 組件,可以檢查一個(gè)或多個(gè)NIO通道,并確定哪些通道已準(zhǔn)備好進(jìn)行讀取或?qū)懭?#xff0c;實(shí)現(xiàn)單個(gè)線(xiàn)程可以管理多個(gè)通道,從而管理或多個(gè)網(wǎng)絡(luò)連接。
② selector 監(jiān)聽(tīng)多個(gè) channel的不同事件
Connect 連接(SelectionKey.OP_CONNECT)
Accept 準(zhǔn)備就緒(OP_ACCEPT)
Read 讀取(OP_READ)
Write 寫(xiě)入(OP_WRITE)
③ selector 選擇器
一個(gè)線(xiàn)程處理多個(gè)通道的核心概念理解:事件驅(qū)動(dòng)機(jī)制。
非堵塞的網(wǎng)絡(luò)通道下,開(kāi)發(fā)者通過(guò)Selector注冊(cè)對(duì)于通道感興趣的事件類(lèi)型,線(xiàn)程通過(guò)監(jiān)聽(tīng)事件來(lái)觸發(fā)響應(yīng)的代碼執(zhí)行(最底層hi操作系統(tǒng)的多路復(fù)用機(jī)制)
④ 源碼
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 結(jié)合Selector實(shí)現(xiàn)的非阻塞服務(wù)端(放棄對(duì)channel的輪詢(xún),借助消息通知機(jī)制)
*/
public class NIOServerV2 {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建網(wǎng)絡(luò)服務(wù)端ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
// 2. 構(gòu)建一個(gè)Selector選擇器,并且將channel注冊(cè)上去
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);// 將serverSocketChannel注冊(cè)到selector
selectionKey.interestOps(SelectionKey.OP_ACCEPT); // 對(duì)serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)
// 3. 綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("啟動(dòng)成功");
while (true) {
// 不再輪詢(xún)通道,改用下面輪詢(xún)事件的方式.select方法有阻塞效果,直到有事件通知才會(huì)有返回
selector.select();
// 獲取事件
Set selectionKeys = selector.selectedKeys();// 遍歷查詢(xún)結(jié)果e
Iterator iter = selectionKeys.iterator();while (iter.hasNext()) {// 被封裝的查詢(xún)結(jié)果
SelectionKey key = iter.next();
iter.remove();// 關(guān)注 Read 和 Accept兩個(gè)事件if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.attachment();// 將拿到的客戶(hù)端連接通道,注冊(cè)到selector上面
SocketChannel clientSocketChannel = server.accept(); // mainReactor 輪詢(xún)accept
clientSocketChannel.configureBlocking(false);
clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
System.out.println("收到新連接 : " + clientSocketChannel.getRemoteAddress());
}if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.attachment();try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)if (requestBuffer.position() > 0) break;
}if(requestBuffer.position() == 0) continue; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來(lái)自:" + socketChannel.getRemoteAddress());// TODO 業(yè)務(wù)操作 數(shù)據(jù)庫(kù) 接口調(diào)用等等// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +"Content-Length: 11\r\n\r\n" +"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
} catch (IOException e) {// e.printStackTrace();
key.cancel(); // 取消事件訂閱
}
}
}
selector.selectNow();
}// 問(wèn)題: 此處一個(gè)selector監(jiān)聽(tīng)所有事件,一個(gè)線(xiàn)程處理所有請(qǐng)求事件. 會(huì)成為瓶頸! 要有多線(xiàn)程的運(yùn)用
}
}
⑤ NIO 和 BIO 的區(qū)別
⑥ NIO Reactor的方式
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* NIO selector 多路復(fù)用reactor線(xiàn)程模型
*/
public class NIOServerV3 {
/** 處理業(yè)務(wù)操作的線(xiàn)程 */
private static ExecutorService workPool = Executors.newCachedThreadPool();
/**
* 封裝了selector.select()等事件輪詢(xún)的代碼
*/
abstract class ReactorThread extends Thread {
Selector selector;
LinkedBlockingQueue taskQueue = new LinkedBlockingQueue<>();/**
* Selector監(jiān)聽(tīng)到有事件后,調(diào)用這個(gè)方法
*/public abstract void handler(SelectableChannel channel) throws Exception;private ReactorThread() throws IOException {
selector = Selector.open();
}volatile boolean running = false;@Overridepublic void run() {// 輪詢(xún)Selector事件while (running) {try {// 執(zhí)行隊(duì)列中的任務(wù)
Runnable task;while ((task = taskQueue.poll()) != null) {
task.run();
}
selector.select(1000);// 獲取查詢(xún)結(jié)果
Set selected = selector.selectedKeys();// 遍歷查詢(xún)結(jié)果
Iterator iter = selected.iterator();while (iter.hasNext()) {// 被封裝的查詢(xún)結(jié)果
SelectionKey key = iter.next();
iter.remove();int readyOps = key.readyOps();// 關(guān)注 Read 和 Accept兩個(gè)事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {try {
SelectableChannel channel = (SelectableChannel) key.attachment();
channel.configureBlocking(false);
handler(channel);if (!channel.isOpen()) {
key.cancel(); // 如果關(guān)閉了,就取消這個(gè)KEY的訂閱
}
} catch (Exception ex) {
key.cancel(); // 如果有異常,就取消這個(gè)KEY的訂閱
}
}
}
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}private SelectionKey register(SelectableChannel channel) throws Exception {// 為什么register要以任務(wù)提交的形式,讓reactor線(xiàn)程去處理?// 因?yàn)榫€(xiàn)程在執(zhí)行channel注冊(cè)到selector的過(guò)程中,會(huì)和調(diào)用selector.select()方法的線(xiàn)程爭(zhēng)用同一把鎖// 而select()方法實(shí)在eventLoop中通過(guò)while循環(huán)調(diào)用的,爭(zhēng)搶的可能性很高,為了讓register能更快的執(zhí)行,就放到同一個(gè)線(xiàn)程來(lái)處理
FutureTask futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
taskQueue.add(futureTask);return futureTask.get();
}private void doStart() {if (!running) {
running = true;
start();
}
}
}private ServerSocketChannel serverSocketChannel;// 1、創(chuàng)建多個(gè)線(xiàn)程 - accept處理reactor線(xiàn)程 (accept線(xiàn)程)private ReactorThread[] mainReactorThreads = new ReactorThread[1];// 2、創(chuàng)建多個(gè)線(xiàn)程 - io處理reactor線(xiàn)程 (I/O線(xiàn)程)private ReactorThread[] subReactorThreads = new ReactorThread[8];/**
* 初始化線(xiàn)程組
*/private void newGroup() throws IOException {// 創(chuàng)建IO線(xiàn)程,負(fù)責(zé)處理客戶(hù)端連接以后socketChannel的IO讀寫(xiě)for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] = new ReactorThread() {@Overridepublic void handler(SelectableChannel channel) throws IOException {// work線(xiàn)程只負(fù)責(zé)處理IO處理,不處理accept事件
SocketChannel ch = (SocketChannel) channel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);while (ch.isOpen() && ch.read(requestBuffer) != -1) {// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)if (requestBuffer.position() > 0) break;
}if (requestBuffer.position() == 0) return; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(Thread.currentThread().getName() + "收到數(shù)據(jù),來(lái)自:" + ch.getRemoteAddress());// TODO 業(yè)務(wù)操作 數(shù)據(jù)庫(kù)、接口...
workPool.submit(() -> {
});// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +"Content-Length: 11\r\n\r\n" +"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}// 創(chuàng)建mainReactor線(xiàn)程, 只負(fù)責(zé)處理serverSocketChannelfor (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] = new ReactorThread() {
AtomicInteger incr = new AtomicInteger(0);@Overridepublic void handler(SelectableChannel channel) throws Exception {// 只做請(qǐng)求分發(fā),不做具體的數(shù)據(jù)讀取
ServerSocketChannel ch = (ServerSocketChannel) channel;
SocketChannel socketChannel = ch.accept();
socketChannel.configureBlocking(false);// 收到連接建立的通知之后,分發(fā)給I/O線(xiàn)程繼續(xù)去讀取數(shù)據(jù)int index = incr.getAndIncrement() % subReactorThreads.length;
ReactorThread workEventLoop = subReactorThreads[index];
workEventLoop.doStart();
SelectionKey selectionKey = workEventLoop.register(socketChannel);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
}
};
}
}/**
* 初始化channel,并且綁定一個(gè)eventLoop線(xiàn)程
*
* @throws IOException IO異常
*/private void initAndRegister() throws Exception {// 1、 創(chuàng)建ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);// 2、 將serverSocketChannel注冊(cè)到selectorint index = new Random().nextInt(mainReactorThreads.length);
mainReactorThreads[index].doStart();
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}/**
* 綁定端口
*
* @throws IOException IO異常
*/private void bind() throws IOException {// 1、 正式綁定端口,對(duì)外服務(wù)
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("啟動(dòng)完成,端口8080");
}public static void main(String[] args) throws Exception {
NIOServerV3 nioServerV3 = new NIOServerV3();
nioServerV3.newGroup(); // 1、 創(chuàng)建main和sub兩組線(xiàn)程
nioServerV3.initAndRegister(); // 2、 創(chuàng)建serverSocketChannel,注冊(cè)到mainReactor線(xiàn)程上的selector上
nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
}
}
PS:NIO為開(kāi)發(fā)者提供了功能豐富及強(qiáng)大的IO處理API,但是在應(yīng)用開(kāi)發(fā)的過(guò)程中,直接使用JDK提供的API,比較繁瑣,而且要想將性能進(jìn)行提升,光有NIO還是不夠的,還需要將多線(xiàn)程技術(shù)與之結(jié)合起來(lái)。因?yàn)榫W(wǎng)絡(luò)編程本身的復(fù)雜性,以及JDK API開(kāi)發(fā)的使用難度較高,所以開(kāi)源社區(qū)中,涌出來(lái)很多的JDK NIO進(jìn)行封裝了,增強(qiáng)后的網(wǎng)絡(luò)編程框架,例如:Netty、Mina等。
總結(jié)
以上是生活随笔為你收集整理的阻塞式和非阻塞式udp传输_NIO非阻塞网络编程三大核心理念的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: otis电梯服务器tt使用说明_南充私人
- 下一篇: java dijkstra算法代码_[转