【Java】NIO中Selector的select方法源码分析
該篇博客的有些內(nèi)容和在之前介紹過了,在這里再次涉及到的就不詳細說了,如果有不理解請看【Java】NIO中Channel的注冊源碼分析, 【Java】NIO中Selector的創(chuàng)建源碼分析
?
Selector的創(chuàng)建在Windows下默認(rèn)生成WindowsSelectorImpl對象,那么Selector的select方法使用的就是WindowsSelectorImpl的select方法,而在WindowsSelectorImpl下并沒有覆蓋這個方法,而是由其基類SelectorImpl實現(xiàn)的:
1 public int select() throws IOException { 2 return this.select(0L); 3 }這個方法調(diào)用了另一個重載的方法:
1 public int select(long var1) throws IOException { 2 if (var1 < 0L) { 3 throw new IllegalArgumentException("Negative timeout"); 4 } else { 5 return this.lockAndDoSelect(var1 == 0L ? -1L : var1); 6 } 7 }首先對var1參數(shù)的合法性進行判斷,無參傳入進來的是0,實則交給lockAndDoSelect方法去完成,并且令參數(shù)為-1。
private int lockAndDoSelect(long var1) throws IOException {synchronized(this) {if (!this.isOpen()) {throw new ClosedSelectorException();} else {Set var4 = this.publicKeys;int var10000;synchronized(this.publicKeys) {Set var5 = this.publicSelectedKeys;synchronized(this.publicSelectedKeys) {var10000 = this.doSelect(var1);}}return var10000;}} }在方法執(zhí)行時先使用同步塊包裹,使用this作為鎖;進入同步塊先判斷當(dāng)前的Selector對象是否關(guān)閉了,因為在初始化時就是開啟狀態(tài),只有在關(guān)閉后isOpen才是false;isOpen是由AbstractSelector實現(xiàn)的:
1 private AtomicBoolean selectorOpen = new AtomicBoolean(true); 2 public final boolean isOpen() { 3 return selectorOpen.get(); 4 } 5 public final void close() throws IOException { 6 boolean open = selectorOpen.getAndSet(false); 7 if (!open) 8 return; 9 implCloseSelector(); 10 }可以看到在AbstractSelector中使用了原子化Boolean值表示開啟關(guān)閉。
回到SelectorImpl的lockAndDoSelect,若是Selector已經(jīng)關(guān)閉則拋出ClosedSelectorException異常,否則分別以publicKeys以及publicSelectedKeys為鎖,最終的實現(xiàn)交給抽象方法doSelect完成;
1 protected abstract int doSelect(long var1) throws IOException;其中publicKeys是供外部訪問的SelectionKey集合,publicSelectedKeys是供外部訪問并且已經(jīng)就緒的SelectionKey集合。
因為使用的是WindowsSelectorImpl,所以來看看WindowsSelectorImpl的doSelect實現(xiàn):
1 protected int doSelect(long var1) throws IOException { 2 if (this.channelArray == null) { 3 throw new ClosedSelectorException(); 4 } else { 5 this.timeout = var1; 6 this.processDeregisterQueue(); 7 if (this.interruptTriggered) { 8 this.resetWakeupSocket(); 9 return 0; 10 } else { 11 this.adjustThreadsCount(); 12 this.finishLock.reset(); 13 this.startLock.startThreads(); 14 15 try { 16 this.begin(); 17 18 try { 19 this.subSelector.poll(); 20 } catch (IOException var7) { 21 this.finishLock.setException(var7); 22 } 23 24 if (this.threads.size() > 0) { 25 this.finishLock.waitForHelperThreads(); 26 } 27 } finally { 28 this.end(); 29 } 30 31 this.finishLock.checkForException(); 32 this.processDeregisterQueue(); 33 int var3 = this.updateSelectedKeys(); 34 this.resetWakeupSocket(); 35 return var3; 36 } 37 } 38 }首先判斷channelArray是否為空,上一篇博客說了channelArray是一個SelectionKeyImpl數(shù)組,SelectionKeyImpl負(fù)責(zé)記錄Channel和SelectionKey狀態(tài),channelArray是根據(jù)連接的Channel數(shù)量動態(tài)維持的,初始化大小是8。
1 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8];SelectionKeyImpl是SelectionKey的子類,只有當(dāng)Selector調(diào)用close方法時,在回調(diào)函數(shù)中才會令channelArray=null,所以這還是檢測Selector是否關(guān)閉了。
接著繼續(xù),在前面?zhèn)魅氲膌ong類型的參數(shù)是-1,在這里令超時時間timeout就等于-1,
接著調(diào)用processDeregisterQueue方法來取消準(zhǔn)備撤銷的集合
所謂的準(zhǔn)備撤銷的集合是因為SelectionKey對象在調(diào)用cancel方法時,會使Selector將其加入cancelledKeys,僅僅如此,真真的取消是在Selector調(diào)用selector方法時執(zhí)行
SelectionKey的cancel方法是在AbstractSelectionKey中實現(xiàn)的:
1 public final void cancel() { 2 // Synchronizing "this" to prevent this key from getting canceled 3 // multiple times by different threads, which might cause race 4 // condition between selector's select() and channel's close(). 5 synchronized (this) { 6 if (valid) { 7 valid = false; 8 ((AbstractSelector)selector()).cancel(this); 9 } 10 } 11 }這個方法在上一篇講過,可以看到基本上什么都沒做,僅僅時調(diào)用了與它關(guān)聯(lián)的Selector對象(AbstractSelector)的cancel方法:
AbstractSelector的cancel方法:
cancelledKeys就是所謂的準(zhǔn)備撤銷的集合,可以看到AbstractSelector的cancel方法僅僅是把此時請求取消的SelectionKey對象加入到cancelledKeys集合中,并沒有多余的操作。
回到doSelect方法,processDeregisterQueue這個方法的實現(xiàn)是在SelectorImpl中:
1 void processDeregisterQueue() throws IOException { 2 Set var1 = this.cancelledKeys(); 3 synchronized(var1) { 4 if (!var1.isEmpty()) { 5 Iterator var3 = var1.iterator(); 6 7 while(var3.hasNext()) { 8 SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next(); 9 10 try { 11 this.implDereg(var4); 12 } catch (SocketException var11) { 13 throw new IOException("Error deregistering key", var11); 14 } finally { 15 var3.remove(); 16 } 17 } 18 } 19 20 } 21 }這個方法的邏輯比較簡單,首先得到準(zhǔn)備撤銷的集合cancelledKeys,判斷是否有請求取消的,若有那么就進行遍歷,實際的取消操作主要邏輯交給了抽象方法implDereg執(zhí)行,最后再從集合中刪除這個SelectionKeyImpl對象。
implDereg方法的實現(xiàn)是在WindowsSelectorImpl中:
1 protected void implDereg(SelectionKeyImpl var1) throws IOException { 2 int var2 = var1.getIndex(); 3 4 assert var2 >= 0; 5 6 Object var3 = this.closeLock; 7 synchronized(this.closeLock) { 8 if (var2 != this.totalChannels - 1) { 9 SelectionKeyImpl var4 = this.channelArray[this.totalChannels - 1]; 10 this.channelArray[var2] = var4; 11 var4.setIndex(var2); 12 this.pollWrapper.replaceEntry(this.pollWrapper, this.totalChannels - 1, this.pollWrapper, var2); 13 } 14 15 var1.setIndex(-1); 16 } 17 18 this.channelArray[this.totalChannels - 1] = null; 19 --this.totalChannels; 20 if (this.totalChannels != 1 && this.totalChannels % 1024 == 1) { 21 --this.totalChannels; 22 --this.threadsCount; 23 } 24 25 this.fdMap.remove(var1); 26 this.keys.remove(var1); 27 this.selectedKeys.remove(var1); 28 this.deregister(var1); 29 SelectableChannel var7 = var1.channel(); 30 if (!var7.isOpen() && !var7.isRegistered()) { 31 ((SelChImpl)var7).kill(); 32 } 33 34 }首先獲取SelectionKeyImpl的下標(biāo)Index,這個下標(biāo)就是其在channelArray中的下標(biāo),檢驗下標(biāo)的合法性;
在同步塊內(nèi),首先檢驗這個SelectionKeyImpl對象是否是數(shù)組的最后一個元素,若不是那么就直接用最后一個元素覆蓋當(dāng)前位置的SelectionKeyImpl對象,同時還需要將pollWrapper中最后一個元素對應(yīng)的Channel描述符和事件響應(yīng)覆蓋到相應(yīng)位置。無論該SelectionKeyImpl對象是否是最后一個,都將其下標(biāo)置為-1,防止再次訪問。
再完成上述操作后,channelArray中的最后一個元素必然是不需要的,直接置為null,再totalChannels再自減。
接著根據(jù)totalChannels的數(shù)量來判斷是否需要減少輪詢線程的個數(shù),這和注冊時同理,就不再多說。
然后在fdMap中移除掉該SelectionKeyImpl和Channel的描述符映射(fdMap保存的是Channel的描述符和SelectionKeyImpl的映射關(guān)系,在上一篇提到過),keys和selectedKeys中同樣也需要移除(keys所有注冊了的SelectionKey集合,selectedKeys是所有有事件就緒的SelectionKey集合)。
這些操作僅僅是刪除了其在Selector中的映射關(guān)系,而真正的Channel的(雖說是SelectionKey的cancel方法,實則是Channel要取消對某一事件的響應(yīng))取消操作是在deregister中執(zhí)行:
deregister方法在AbstractSelector中實現(xiàn):
可以看到直接獲取SelectionKey對應(yīng)的channel對象,然后調(diào)用AbstractSelectableChannel的removeKey方法:
1 void removeKey(SelectionKey k) { 2 synchronized (keyLock) { 3 for (int i = 0; i < keys.length; i++) 4 if (keys[i] == k) { 5 keys[i] = null; 6 keyCount--; 7 } 8 ((AbstractSelectionKey)k).invalidate(); 9 } 10 }前面的遍歷很簡單,通過遍歷Channel的所有綁定的SelectionKey,即keys,直接將要取消的置為null,keyCount再自減,最后調(diào)用SelectionKey(AbstractSelectionKey)的invalidate方法:
1 void invalidate() { 2 valid = false; 3 }直接設(shè)置valid屬性為false,表明不可用。
回到implDereg中,最后一步操作,檢查Channel的活躍性,若是Channel既沒有打開且當(dāng)且也沒有注冊了的SelectionKey,那么直接“殺死”該Channel。
而這個kill方法,在不同的Channel中有不同的實現(xiàn),
SocketChannelImpl中:
其中state表示SocketChannelImpl的狀態(tài),一共有六種:
1 private static final int ST_UNINITIALIZED = -1; // 尚未初始化 2 private static final int ST_UNCONNECTED = 0; // 尚未建立連接 3 private static final int ST_PENDING = 1; // 未決狀態(tài) 4 private static final int ST_CONNECTED = 2; // 連接狀態(tài) 5 private static final int ST_KILLPENDING = 3; // KILL的未決狀態(tài) 6 private static final int ST_KILLED = 4; // KILL狀態(tài) 7 private int state = -1;這樣就很清晰,若是SocketChannelImpl尚未初始化直接變?yōu)镵ILL狀態(tài),否則檢查再次檢查Channel的活躍性,若是不活躍就斷言為false,直接結(jié)束,否則“殺死”。
接下來的判斷中的readerThread和writerThread,我在看完SocketChannelImpl后,發(fā)現(xiàn)一直都是賦值的0,并不知道會在何時發(fā)生修改,而且這兩個成員的賦值都是在有數(shù)據(jù)讀、寫操作后,若是有知道的朋友想請教一下!
這個就先不討論了,但是通過它們的賦值都是發(fā)生在有數(shù)據(jù)讀、寫操作后,那么就可以明白,若是完成了讀、寫,那么直接變?yōu)镵ILL狀態(tài),否則,等待讀、寫完成,就變?yōu)镵ILL的未決狀態(tài)。
其中 nd.close(this.fd),nd是Socket描述符,fd是文件描述符,這就是由操作系統(tǒng)來關(guān)閉Socket描述符對應(yīng)的文件描述符。
ServerSocketChannelImpl中kill:
1 private static final int ST_UNINITIALIZED = -1; // 尚未初始化 2 private static final int ST_INUSE = 0; // 使用中 3 private static final int ST_KILLED = 1; // KILL狀態(tài) 4 private int state = -1; 5 6 public void kill() throws IOException { 7 Object var1 = this.stateLock; 8 synchronized(this.stateLock) { 9 if (this.state != 1) { 10 if (this.state == -1) { 11 this.state = 1; 12 } else { 13 assert !this.isOpen() && !this.isRegistered(); 14 15 nd.close(this.fd); 16 this.state = 1; 17 } 18 } 19 } 20 }ServerSocketChannelImpl就要簡單一點,基本上一樣,由于ServerSocketChannel只能注冊ACCEPT事件響應(yīng),所以就沒有判斷讀、寫。
implDereg方法結(jié)束,processDeregisterQueue也徹底結(jié)束,再回到doSelect方法
接著檢驗interruptTriggered,表示是否觸發(fā)中斷。
interruptTriggered初始化時就是false,表示未觸發(fā)中斷,而在調(diào)用close或者wakeup方法時會觸發(fā)中斷,賦值true;
先看wakeup方法:
1 public Selector wakeup() { 2 Object var1 = this.interruptLock; 3 synchronized(this.interruptLock) { 4 if (!this.interruptTriggered) { 5 this.setWakeupSocket(); 6 this.interruptTriggered = true; 7 } 8 9 return this; 10 } 11 }可以看到核心是setWakeupSocket方法,當(dāng)目前沒有觸發(fā)中斷調(diào)用setWakeupSocket:
1 private void setWakeupSocket() { 2 this.setWakeupSocket0(this.wakeupSinkFd); 3 } 4 private native void setWakeupSocket0(int var1);在講Selector的創(chuàng)建時說過,在Selector創(chuàng)建時會產(chǎn)生一對SocketChannel,分別是SourceChannelImpl和SinkChannelImpl,wakeupSinkFd是SinkChannelImpl的描述符。
再來看看setWakeupSocket0的實現(xiàn):
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,jint scoutFd) {/* Write one byte into the pipe */const char byte = 1;send(scoutFd, &byte, 1, 0); }雖然是用C寫的,但是依舊很清晰,就是通過這個雙向通道的sink端向source發(fā)送一個字節(jié)的數(shù)據(jù),這樣source端描述符就進入就緒狀態(tài),就能被select感知到,Selector便被喚醒。
再來看下close方法,在AbstractSelector中實現(xiàn)的:
1 public final void close() throws IOException { 2 boolean open = selectorOpen.getAndSet(false); 3 if (!open) 4 return; 5 implCloseSelector(); 6 }核心是implCloseSelector,在SelectorImpl中實現(xiàn):
1 public void implCloseSelector() throws IOException { 2 this.wakeup(); 3 synchronized(this) { 4 Set var2 = this.publicKeys; 5 synchronized(this.publicKeys) { 6 Set var3 = this.publicSelectedKeys; 7 synchronized(this.publicSelectedKeys) { 8 this.implClose(); 9 } 10 } 11 12 } 13 }一開始就直接調(diào)用wakeup方法喚醒,然后調(diào)用implClose方法:
implClose是在WindowsSelectorImpl中實現(xiàn)的:
根據(jù)channelArray和pollWrapper是否為null來檢驗是否有必要關(guān)閉資源,后面就是對一些資源的關(guān)閉,可以看到關(guān)閉了我們一開始建立的雙向通道,取消了所有注冊事件,順便“殺死”不活躍的Channel,刪除所有映射關(guān)系,將所有輪詢線程從阻塞中喚醒,關(guān)于makeZombie和startLock后面給出。
再次回到doSelect上,若是發(fā)生了中斷,調(diào)用resetWakeupSocket方法恢復(fù)中斷:
1 private void resetWakeupSocket() { 2 Object var1 = this.interruptLock; 3 synchronized(this.interruptLock) { 4 if (this.interruptTriggered) { 5 this.resetWakeupSocket0(this.wakeupSourceFd); 6 this.interruptTriggered = false; 7 } 8 } 9 }resetWakeupSocket0也是一個native方法,和setWakeupSocket0正好互補,用來讀取setWakeupSocket0中發(fā)送的數(shù)據(jù),再將interruptTriggered設(shè)置為false,最后doSelect將會立即返回0,而不會調(diào)用poll操作。
在doSelect判斷沒有觸發(fā)中斷后,首先調(diào)用adjustThreadsCount調(diào)整輪詢線程數(shù)量:
1 private void adjustThreadsCount() { 2 int var1; 3 if (this.threadsCount > this.threads.size()) { 4 for(var1 = this.threads.size(); var1 < this.threadsCount; ++var1) { 5 WindowsSelectorImpl.SelectThread var2 = new WindowsSelectorImpl.SelectThread(var1); 6 this.threads.add(var2); 7 var2.setDaemon(true); 8 var2.start(); 9 } 10 } else if (this.threadsCount < this.threads.size()) { 11 for(var1 = this.threads.size() - 1; var1 >= this.threadsCount; --var1) { 12 ((WindowsSelectorImpl.SelectThread)this.threads.remove(var1)).makeZombie(); 13 } 14 } 15 16 }threads是用ArrayList存放的:
1 private final List<WindowsSelectorImpl.SelectThread> threads = new ArrayList();邏輯比較簡單,通過檢查threadsCount的數(shù)量和threads的大小比較,若是threadsCount大于threads,則產(chǎn)生一個新的輪詢線程SelectThread,將其加入threads,并且設(shè)置輪詢線程是守護線程,直接啟動;若是threadsCount小于threads,則移除并喚醒多余的輪詢線程;若是threadsCount等于threads什么都不做。
來看一下SelectThread這個輪詢線程具體是怎么工作的:
1 private final class SelectThread extends Thread { 2 private final int index; 3 final WindowsSelectorImpl.SubSelector subSelector; 4 private long lastRun; 5 private volatile boolean zombie; 6 7 private SelectThread(int var2) { 8 this.lastRun = 0L; 9 this.index = var2; 10 this.subSelector = WindowsSelectorImpl.this.new SubSelector(var2); 11 this.lastRun = WindowsSelectorImpl.this.startLock.runsCounter; 12 } 13 14 void makeZombie() { 15 this.zombie = true; 16 } 17 18 boolean isZombie() { 19 return this.zombie; 20 } 21 22 public void run() { 23 for(; !WindowsSelectorImpl.this.startLock.waitForStart(this); WindowsSelectorImpl.this.finishLock.threadFinished()) { 24 try { 25 this.subSelector.poll(this.index); 26 } catch (IOException var2) { 27 WindowsSelectorImpl.this.finishLock.setException(var2); 28 } 29 } 30 31 } 32 }在構(gòu)造方法中對幾個成員完成初始化,index對應(yīng)的是其在ArrayList中的下標(biāo),lastRun 和startLock有關(guān)等會再說,subSelector是真正執(zhí)行輪詢的對象;zombie是一個標(biāo)志,在startLock中會使用到。
再來看run方法,核心就是調(diào)用subSelector的poll方法,而何時調(diào)用該方法由startLock來決定。
StartLock的定義:
1 private final class StartLock { 2 private long runsCounter; 3 4 private StartLock() { 5 } 6 7 private synchronized void startThreads() { 8 ++this.runsCounter; 9 this.notifyAll(); 10 } 11 12 private synchronized boolean waitForStart(WindowsSelectorImpl.SelectThread var1) { 13 while(this.runsCounter == var1.lastRun) { 14 try { 15 WindowsSelectorImpl.this.startLock.wait(); 16 } catch (InterruptedException var3) { 17 Thread.currentThread().interrupt(); 18 } 19 } 20 21 if (var1.isZombie()) { 22 return true; 23 } else { 24 var1.lastRun = this.runsCounter; 25 return false; 26 } 27 } 28 }在startThreads方法中,僅僅是通過synchronized 包裹,使runsCounter自增,然后notifyAll喚醒所有持有StartLock對象鎖的阻塞。
在WindowsSelectorImpl中StartLock對象有且只有一份,對于所有SelectThread來說StartLock是公共的
waitForStart方法需要結(jié)合SelectThread的run方法來看,首先先檢驗SelectThread的lastRun成員是否和runsCounter相等,若是相等直接阻塞,等待startThreads方法將其喚醒;若是不相等,說明它的run是在startThreads之后運行的,需要將lastRun更新后再執(zhí)行。
回到SelectThread中,我們再來看看SubSelector的定義:
1 private final class SubSelector { 2 private final int pollArrayIndex; 3 private final int[] readFds; 4 private final int[] writeFds; 5 private final int[] exceptFds; 6 7 private SubSelector() { 8 this.readFds = new int[1025]; 9 this.writeFds = new int[1025]; 10 this.exceptFds = new int[1025]; 11 this.pollArrayIndex = 0; 12 } 13 14 private SubSelector(int var2) { 15 this.readFds = new int[1025]; 16 this.writeFds = new int[1025]; 17 this.exceptFds = new int[1025]; 18 this.pollArrayIndex = (var2 + 1) * 1024; 19 } 20 ...... 21 }其中無參構(gòu)造是WindowsSelectorImpl使用的,單參構(gòu)造由SelectThread使用。
之前在講Channel的注冊時說過,每1024個注冊了的Channel會開啟一個SelectThread輪詢,如果是1024個以內(nèi),那么直接由WindowsSelectorImpl輪詢,不交給SelectThread處理,超過1024則WindowsSelectorImpl和SelectThread一起輪詢。
readFds 、writeFds、exceptFds 分別對應(yīng)讀、寫、異常描述符 ,在SubSelector構(gòu)造中初始化大小都是1025,多出來的一個就是前面說過的wakeupSourceFd描述符,用于喚醒,所以是1025。pollArrayIndex 對應(yīng)其在pollWrapper中的wakeupSourceFd描述符的起始位置。
再來看看poll方法:
1 private int poll() throws IOException { 2 return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout); 3 } 4 5 private int poll(int var1) throws IOException { 6 return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress + (long)(this.pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024, WindowsSelectorImpl.this.totalChannels - (var1 + 1) * 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout); 7 } 8 9 private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);無參poll方法是WindowsSelectorImpl執(zhí)行的,單參poll是由SelectThread執(zhí)行;
最后都調(diào)用poll0這個native方法,這個方法是真正的輪詢核心,交由操作系統(tǒng)來完成。
其中pollArrayAddress是pollArray在內(nèi)存空間的起始位置,在poll()中直接定位到最開始,而在poll(int var1)中通過加上pollArrayIndex * PollArrayWrapper.SIZE_POLLFD這個偏移量定位。
PollArrayWrapper.SIZE_POLLFD是8,表示pollWrapper中存放的一對Channel描述符和事件響應(yīng)共8位,0-3位保存Channel描述符fdVal,4-7位保存事件響應(yīng)events。
第二個參數(shù)表明需要底層輪詢的描述符fd個數(shù),最后一個是超時時間,若是底層超時是會結(jié)束的。
還是回到doSelect方法,在adjustThreadsCount調(diào)整完輪詢線程后,調(diào)用finishLock的reset方法
finishLock定義如下:
這個和startLock很相似,也是WindowsSelectorImpl持有,有且僅有一份,所有SelectThread共享,reset方法用來記錄在當(dāng)前select方法執(zhí)行時需要的輪詢線程個數(shù),在SelectThread的run方法中執(zhí)行完poll方法后,會執(zhí)行threadFinished,首先this.threadsToFinish == WindowsSelectorImpl.this.threads.size()的判斷是為幫助喚醒所有處于poll阻塞的輪詢。SelectThread執(zhí)行完畢,就需要讓threadsToFinish自減,至于notify的喚醒和后面有關(guān)系。
doSelect中執(zhí)行完finishLock的reset后,就需要調(diào)用startLock的startThreads喚醒所有輪詢線程工作。接著調(diào)用begin方法:
begin方法在AbstractSelector中實現(xiàn):
若是中斷器interruptor=null,就創(chuàng)建一個,當(dāng)當(dāng)前線程阻塞在I/O操作上并且發(fā)生了線程級別的中斷時,就會調(diào)用wakeup方法喚醒Selector。
doSelect中begin完畢后,調(diào)用subSelector的poll方法輪詢;若是poll上有事件就緒,那么就不會阻塞,繼續(xù)往下進行;若poll上沒有事件就緒就會等待SelectThread上的事件就緒,通過threadFinished將其喚醒;若是SelectThread上也沒有事件就緒就會一直阻塞,除非被外部喚醒,或者調(diào)用的是select的單參方法,會阻塞到超時結(jié)束。
接著判斷是否有輪詢線程的工作,調(diào)用waitForHelperThreads等待輪詢線程的結(jié)束:
1 private synchronized void waitForHelperThreads() { 2 if (this.threadsToFinish == WindowsSelectorImpl.this.threads.size() { 3 WindowsSelectorImpl.this.wakeup(); 4 } 5 6 while(this.threadsToFinish != 0) { 7 try { 8 WindowsSelectorImpl.this.finishLock.wait(); 9 } catch (InterruptedException var2) { 10 Thread.currentThread().interrupt(); 11 } 12 } 13 14 }waitForHelperThreads方法就呼應(yīng)了threadFinished方法,若是threadsToFinish != 0說明還有輪詢線程沒有結(jié)束,就wait阻塞,一直等到threadsToFinish == 0時再將其喚醒。
當(dāng)所有輪詢結(jié)束后,調(diào)用end方法:
1 protected final void end() { 2 AbstractInterruptibleChannel.blockedOn(null); 3 }這個方法是處理發(fā)生中斷,具體就不詳細介紹了。
然后調(diào)用finishLock的checkForException方法檢查異常,這個沒啥好說的,然后又調(diào)用processDeregisterQueue來取消可能在select輪詢時發(fā)生的SelectionKeyl的撤銷。
接著調(diào)用updateSelectedKeys方法:
1 private long updateCount = 0L; 2 3 private int updateSelectedKeys() { 4 ++this.updateCount; 5 byte var1 = 0; 6 int var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount); 7 8 WindowsSelectorImpl.SelectThread var3; 9 for(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) { 10 var3 = (WindowsSelectorImpl.SelectThread)var2.next(); 11 } 12 13 return var4; 14 }updateCount記錄更新次數(shù),即select調(diào)用次數(shù);然后調(diào)用subSelector的processSelectedKeys方法,得到poll返回的就緒的Channel描述符,也就是得到事件就緒的Channel個數(shù),同理也就需要得到所有SelectThread中的。
其中processSelectedKeys方法如下:
1 private int processSelectedKeys(long var1) { 2 byte var3 = 0; 3 int var4 = var3 + this.processFDSet(var1, this.readFds, Net.POLLIN, false); 4 var4 += this.processFDSet(var1, this.writeFds, Net.POLLCONN | Net.POLLOUT, false); 5 var4 += this.processFDSet(var1, this.exceptFds, Net.POLLIN | Net.POLLCONN | Net.POLLOUT, true); 6 return var4; 7 }分別對讀、寫、異常都處理了,主要還是調(diào)用processFDSet方法:
1 private int processFDSet(long var1, int[] var3, int var4, boolean var5) { 2 int var6 = 0; 3 4 for(int var7 = 1; var7 <= var3[0]; ++var7) { 5 int var8 = var3[var7]; 6 if (var8 == WindowsSelectorImpl.this.wakeupSourceFd) { 7 synchronized(WindowsSelectorImpl.this.interruptLock) { 8 WindowsSelectorImpl.this.interruptTriggered = true; 9 } 10 } else { 11 WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8); 12 if (var9 != null) { 13 SelectionKeyImpl var10 = var9.ski; 14 if (!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) { 15 if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) { 16 if (var9.clearedCount != var1) { 17 if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) { 18 var9.updateCount = var1; 19 ++var6; 20 } 21 } else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) { 22 var9.updateCount = var1; 23 ++var6; 24 } 25 26 var9.clearedCount = var1; 27 } else { 28 if (var9.clearedCount != var1) { 29 var10.channel.translateAndSetReadyOps(var4, var10); 30 if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) { 31 WindowsSelectorImpl.this.selectedKeys.add(var10); 32 var9.updateCount = var1; 33 ++var6; 34 } 35 } else { 36 var10.channel.translateAndUpdateReadyOps(var4, var10); 37 if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) { 38 WindowsSelectorImpl.this.selectedKeys.add(var10); 39 var9.updateCount = var1; 40 ++var6; 41 } 42 } 43 44 var9.clearedCount = var1; 45 } 46 } 47 } 48 } 49 } 50 51 return var6; 52 }這個方法其實就是把poll0方法輪詢的描述符結(jié)果放入傳入的數(shù)組中,然后通過遍歷這個數(shù)組,得到相應(yīng)的Channel描述符,因為之前通過fdMap保存了Channel的描述符和SelectionKeyImpl的映射關(guān)系,那么就可以根據(jù)Channel描述符找到對應(yīng)的SelectionKeyImpl對象,再根據(jù)傳入的狀態(tài)值var4來更新Channel的狀態(tài),最后將其保存在selectedKeys集合中供外部訪問。
Selector的select方法到此全部結(jié)束。
轉(zhuǎn)載于:https://www.cnblogs.com/a526583280/p/10890215.html
總結(jié)
以上是生活随笔為你收集整理的【Java】NIO中Selector的select方法源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pandas处理csv
- 下一篇: 猫半夜叫得像婴儿哭是为什么(不外乎这6种