Java NIO之选择器
1.簡介
前面的文章說了緩沖區,說了通道,本文就來說說 NIO 中另一個重要的實現,即選擇器 Selector。在更早的文章中,我簡述了幾種 IO 模型。如果大家看過之前的文章,并動手寫過代碼的話。再看 Java 的選擇器大概就會知道它是什么了,以及怎么用了。選擇器是 Java 多路復用模型的一個實現,可以同時監控多個非阻塞套接字通道。示意圖大致如下:
如果大家了解過多路復用模型,那應該也會知道幾種復用模型的實現。比如 select,poll 以及 Linux 下的 epoll 和 BSD 下的 kqueue。Java 的選擇器并非憑空創造,而是在底層操作系統提供的接口的基礎上封裝而來。相關的細節,我隨后會進行分析。
關于 Java 選擇器的簡介這里先說到這,接下來進入正題。
?2.基本操作及實現
本章我將對 Selector 的創建,通道的注冊,Selector 的選擇過程進行分析。內容篇幅較大,希望大家耐心看完。由于 Selector 相關類在不同操作系統下的實現是不同的,加之個人對 Linux epoll 更為熟悉,所以本文所分析的源碼也是和 epoll 相關的。好了,進入正題吧。
?2.1 創建選擇器
選擇器 Selector 是一個抽象類,所以不能直接創建。Selector 提供了一個 open 方法,通過 open 方法既可以創建選擇器實例。示例代碼如下:
| 1 | Selector selector = Selector.open(); |
上面的代碼比較簡單,只有一行。不過不要被表象迷惑,這行代碼僅是完整實現的冰山一角,更復雜的邏輯則隱藏在水面之下。
在簡介一節,我已經說了 Java 選擇器是對底層多路復用接口的一個包裝,這里的 open 方法也不例外。假設我們的 Java 運行在 Linux 平臺下,那么 open 最終所做的事情應該是調用操作系統的epoll_create函數,用于創建 epoll 實例。真實情況是不是如此呢?答案就在冰山深處,接下來就讓我們一起去求索吧。下面我們將沿著 open 方法一路走下去,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | public abstract class Selector implements Closeable {public static Selector open() throws IOException {// 創建 SelectorProvider,再通過其 openSelector 方法創建 Selectorreturn SelectorProvider.provider().openSelector();}// 省略無關代碼 }public abstract class SelectorProvider {public static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;// 創建默認的 SelectorProviderprovider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}} }public class DefaultSelectorProvider {private DefaultSelectorProvider() { }/*** 根據系統名稱創建相應的 SelectorProvider*/public static SelectorProvider create() {String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));if (osname.equals("SunOS"))return createProvider("sun.nio.ch.DevPollSelectorProvider");if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");// return new sun.nio.ch.PollSelectorProvider();}/*** 加載 SelectorProvider 類,并創建實例*/@SuppressWarnings("unchecked")private static SelectorProvider createProvider(String cn) {Class<SelectorProvider> c;try {c = (Class<SelectorProvider>)Class.forName(cn);} catch (ClassNotFoundException x) {throw new AssertionError(x);}try {return c.newInstance();} catch (IllegalAccessException | InstantiationException x) {throw new AssertionError(x);}} }/*** 創建完 SelectorProvider,接下來要調用 openSelector 方法* 創建 Selector 的繼承類了。*/ public class EPollSelectorProvider extends SelectorProviderImpl {public AbstractSelector openSelector() throws IOException {return new EPollSelectorImpl(this);} }class EPollSelectorImpl extends SelectorImpl {EPollSelectorImpl(SelectorProvider sp) throws IOException {// 調用父類構造方法super(sp);long pipeFds = IOUtil.makePipe(false);fd0 = (int) (pipeFds >>> 32);fd1 = (int) pipeFds;// 創建 EPollArrayWrapper,EPollArrayWrapper 是一個重要的實現pollWrapper = new EPollArrayWrapper();pollWrapper.initInterrupt(fd0, fd1);fdToKey = new HashMap<>();} }public abstract class SelectorImpl extends AbstractSelector {protected SelectorImpl(SelectorProvider sp) {super(sp);keys = new HashSet<SelectionKey>();selectedKeys = new HashSet<SelectionKey>();/* 初始化 publicKeys 和 publicSelectedKeys,* publicKeys 即 selector.keys() 方法所返回的集合,* publicSelectedKeys 則是 selector.selectedKeys() 方法返回的集合*/if (Util.atBugLevel("1.4")) {publicKeys = keys;publicSelectedKeys = selectedKeys;} else {publicKeys = Collections.unmodifiableSet(keys);publicSelectedKeys = Util.ungrowableSet(selectedKeys);}} }/*** EPollArrayWrapper 一個重要的實現,這一層再往下就是 C 代碼了*/ class EPollArrayWrapper {EPollArrayWrapper() throws IOException {// 調用 epollCreate 方法創建 epoll 文件描述符epfd = epollCreate();// the epoll_event array passed to epoll_wait// 初始化 pollArray,該對象用于存儲就緒文件描述符和事件int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;pollArray = new AllocatedNativeObject(allocationSize, true);pollArrayAddress = pollArray.address();// eventHigh needed when using file descriptors > 64kif (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)eventsHigh = new HashMap<>();}// epollCreate 方法是 native 類型的private native int epollCreate(); } |
以上代碼時 Java 層面的,Java 層調用棧最下面的類是 EPollArrayWrapper(源碼路徑可以在附錄中查找)。EPollArrayWrapper 是一個重要的實現,起著承上啟下的作用。上層是 Java 代碼,下層是 C 代碼。上層的代碼看完了,接下來看看冰山深處的 C 代碼:
| 1 2 3 4 5 6 7 8 9 10 | JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) {// 調用 epoll_create 函數創建 epoll 實例,并返回文件描述符 epfdint epfd = epoll_create(256);if (epfd < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");}return epfd; } |
上面的代碼很簡單,僅做了創建 epoll 實例這一件事??吹竭@里,答案就明了了。最后在附一張時序圖幫助大家理清代碼調用順序,如下:
?2.2 選擇鍵
?2.2.1 幾種事件
選擇鍵 SelectionKey 包含4種事件,分別是:
| 1 2 3 4 | public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4; |
事件之間可以通過或運算進行組合,比如:
| 1 | int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE; |
?2.2.2 兩種事件集合:interestOps 和 readyOps
interestOps 即感興趣的事件集合,通道調用 register 方法注冊時會設置此值,interestOps 可通過 SelectionKey interestOps() 方法獲取。readyOps 是就緒事件集合,可通過 SelectionKey readyOps() 獲取。
interestOps 和 readyOps 被聲明在 SelectionKey 子類 SelectionKeyImpl 中,代碼如下:
| 1 2 3 4 | public class SelectionKeyImpl extends AbstractSelectionKey {private volatile int interestOps;private int readyOps; } |
接下來再來看看與 readyOps 事件集合相關的幾個方法,如下:
| 1 2 3 4 | selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable(); |
以上方法從字面意思上就可以知道有什么用,這里就不解釋了。接下來以 isReadable 方法為例,簡單看一下這個方法是如何實現。
| 1 2 3 | public final boolean isReadable() {return (readyOps() & OP_READ) != 0; } |
上面說到可以通過或運算組合事件,這里則是通過與運算來測試某個事件是否在事件集合中。比如
| 1 2 3 | readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE = 0101, readyOps & OP_READ = 0101 & 0001 = 0001, readyOps & OP_CONNECT = 0101 & 1000 = 0 |
readyOps & OP_READ != 0,所以 OP_READ 在事件集合中。readyOps & OP_CONNECT == 0,所以 OP_CONNECT 不在事件集合中。
?2.2.3 attach 方法
attach 是一個好用的方法,通過這個方法,可以將對象暫存在 SelectionKey 中,待需要的時候直接取出來即可。比如本文對應的練習代碼實現了一個簡單的 HTTP 服務器,在讀取用戶請求數據后(即 selectionKey.isReadable() 為 true),會去解析請求頭,然后將請求頭信息通過 attach 方法放入 selectionKey 中。待通道可寫后,再從 selectionKey 中取出請求頭,并根據請求頭回復客戶端不同的消息。當然,這只是一個應用場景,attach 可能還有其他的應用場景,比如標識通道。不過其他的場景我沒使用過,就不說了。attach 使用方式如下:
| 1 2 | selectionKey.attach(obj); Object attachedObj = selectionKey.attachment(); |
?2.3 通道注冊
通道注冊即將感興趣的事件告知 Selector,待事件發生時,Selector 即可返回就緒事件,我們就可以去做后續的事情了。比如 ServerSocketChannel 通道通常對 OP_ACCEPT 事件感興趣,那么我們就可以把這個事件注冊給 Selector。待事件發生,即服務端接受客戶端連接后,我們即可獲取這個就緒的事件并做相應的操作。通道注冊的示例代碼如下:
| 1 2 | channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); |
起初我以為通道注冊操作會調用操作系統的 epoll_ctl 函數,但最終通過看源碼,發現自己的理解是錯的。既然通道注冊階段不調用 epoll_ctl 函數。那么,epoll_ctl 什么時候才會被調用呢?如果不調用 epoll_ctl,那么注冊過程都干了什么事情呢?關于第一個問題,本節還無法解答,不過第二個問題則可以說說。接下來讓我們深入通道類 register 方法的調用棧中去探尋答案吧。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {return register(sel, ops, null);}public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException; }public abstract class AbstractSelectableChannel extends SelectableChannel {private SelectionKey[] keys = null;public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {synchronized (regLock) {// 省去一些校驗代碼// 從 keys 數組中查找,查找條件為 k.selector() == selSelectionKey k = findKey(sel);// 如果 k 不為空,則修改 k 所感興趣的事件if (k != null) {k.interestOps(ops);k.attach(att);}// k 為空,則創建一個 SelectionKey,并存儲到 keys 數組中if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}} }public abstract class AbstractSelector extends Selector {protected abstract SelectionKey register(AbstractSelectableChannel ch,int ops, Object att); }public abstract class SelectorImpl extends AbstractSelector {protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {if (!(ch instanceof SelChImpl))throw new IllegalSelectorException();// 創建 SelectionKeyImpl 實例SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);k.attach(attachment);synchronized (publicKeys) {implRegister(k);}k.interestOps(ops);return k;} }class EPollSelectorImpl extends SelectorImpl {protected void implRegister(SelectionKeyImpl ski) {if (closed)throw new ClosedSelectorException();SelChImpl ch = ski.channel;int fd = Integer.valueOf(ch.getFDVal());// 存儲 fd 和 SelectionKeyImpl 的映射關系fdToKey.put(fd, ski);pollWrapper.add(fd);// 將 SelectionKeyImpl 實例存儲到 keys 中(這里的 keys 聲明在 SelectorImpl 類中),keys 集合可由 selector.keys() 方法獲取keys.add(ski);} }public class SelectionKeyImpl extends AbstractSelectionKey {public SelectionKey interestOps(int ops) {ensureValid();return nioInterestOps(ops);}public SelectionKey nioInterestOps(int ops) {if ((ops & ~channel().validOps()) != 0)throw new IllegalArgumentException();// 轉換并設置感興趣的事件channel.translateAndSetInterestOps(ops, this);// 設置 interestOps 變量interestOps = ops;return this;} }class SocketChannelImpl extends SocketChannel implements SelChImpl {public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {int newOps = 0;// 轉換事件if ((ops & SelectionKey.OP_READ) != 0)newOps |= PollArrayWrapper.POLLIN;if ((ops & SelectionKey.OP_WRITE) != 0)newOps |= PollArrayWrapper.POLLOUT;if ((ops & SelectionKey.OP_CONNECT) != 0)newOps |= PollArrayWrapper.POLLCONN;// 設置事件sk.selector.putEventOps(sk, newOps);} }class class EPollSelectorImpl extends SelectorImpl {public void putEventOps(SelectionKeyImpl ski, int ops) {if (closed)throw new ClosedSelectorException();SelChImpl ch = ski.channel;// 設置感興趣的事件pollWrapper.setInterest(ch.getFDVal(), ops);} }class EPollArrayWrapper {void setInterest(int fd, int mask) {synchronized (updateLock) {// 擴容 updateDescriptors 數組,并存儲文件描述符 fdint oldCapacity = updateDescriptors.length;if (updateCount == oldCapacity) {int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;int[] newDescriptors = new int[newCapacity];System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);updateDescriptors = newDescriptors;}updateDescriptors[updateCount++] = fd;// events are stored as bytes for efficiency reasonsbyte b = (byte)mask;assert (b == mask) && (b != KILLED);// 存儲事件setUpdateEvents(fd, b, false);}}private void setUpdateEvents(int fd, byte events, boolean force) {if (fd < MAX_UPDATE_ARRAY_SIZE) {if ((eventsLow[fd] != KILLED) || force) {eventsLow[fd] = events;}} else {Integer key = Integer.valueOf(fd);if (!isEventsHighKilled(key) || force) {eventsHigh.put(key, Byte.valueOf(events));}}} } |
到 setUpdateEvents 這個方法,整個調用棧就結束了。但是我們并未在調用棧中看到調用 epoll_ctl 函數的地方,也就是說,通道注冊時,并不會立即調用 epoll_ctl,而是先將事件集合 events 存放在 eventsLow。至于 epoll_ctl 函數何時調用的,需要大家繼續往下看了。
?2.4 選擇過程
?2.4.1 選擇方法
Selector 包含3種不同功能的選擇方法,分別如下:
- int select()
- int select(long timeout)
- int selectNow()
select() 是一個阻塞方法,僅在至少一個通道處于就緒狀態時才返回。
select(long timeout) 同樣也是阻塞方法,不過可對該方法設置超時時間(timeout > 0),使得線程不會被一直阻塞。如果 timeout = 0,會一直阻塞線程。
selectNow() 為非阻塞方法,調用后立即返回。
以上3個方法均返回 int 類型值,表示每次調用 select 或 selectNow 方法后,新就緒通道的數量。如果某個通道在上一次調用 select 方法時就已經處于就緒狀態,但并未將該通道對應的 SelectionKey 對象從 selectedKeys 集合中移除。假設另一個的通道在本次調用 select 期間處于就緒狀態,此時,select 返回1,而不是2。
?2.4.2 選擇過程
選擇方法用起來雖然簡單,但方法之下隱藏的邏輯還是比較復雜的。大致分為下面幾個步驟:
上面五個步驟對應于 EPollSelectorImpl 類中 doSelect 方法的邏輯,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | protected int doSelect(long timeout) throws IOException {if (closed)throw new ClosedSelectorException();// 處理已取消鍵集合,對應步驟1processDeregisterQueue();try {begin();// select 方法的核心,對應步驟2和3pollWrapper.poll(timeout);} finally {end();}// 處理已取消鍵集合,對應步驟4processDeregisterQueue();// 更新 selectedKeys 集合,并返回就緒通道數量,對應步驟5int numKeysUpdated = updateSelectedKeys();if (pollWrapper.interrupted()) {// Clear the wakeup pipepollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);synchronized (interruptLock) {pollWrapper.clearInterrupted();IOUtil.drain(fd0);interruptTriggered = false;}}return numKeysUpdated; } |
接下來,我們按照上面的步驟順序去分析代碼實現。先來看看步驟1對應的代碼:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | +----SelectorImpl.java void processDeregisterQueue() throws IOException {// Precondition: Synchronized on this, keys, and selectedKeysSet<SelectionKey> cks = cancelledKeys();synchronized (cks) {if (!cks.isEmpty()) {Iterator<SelectionKey> i = cks.iterator();// 遍歷 cancelledKeys,執行注銷操作while (i.hasNext()) {SelectionKeyImpl ski = (SelectionKeyImpl)i.next();try {// 執行注銷邏輯implDereg(ski);} catch (SocketException se) {throw new IOException("Error deregistering key", se);} finally {i.remove();}}}} }+----EPollSelectorImpl.java protected void implDereg(SelectionKeyImpl ski) throws IOException {assert (ski.getIndex() >= 0);SelChImpl ch = ski.channel;int fd = ch.getFDVal();// 移除 fd 和選擇鍵鍵的映射關系fdToKey.remove(Integer.valueOf(fd));// 從 epoll 實例中刪除事件pollWrapper.remove(fd);ski.setIndex(-1);// 從 keys 和 selectedKeys 中移除選擇鍵keys.remove(ski);selectedKeys.remove(ski);// 注銷選擇鍵deregister((AbstractSelectionKey)ski);// 注銷通道SelectableChannel selch = ski.channel();if (!selch.isOpen() && !selch.isRegistered())((SelChImpl)selch).kill(); } |
上面的代碼代碼邏輯不是很復雜,首先是獲取 cancelledKeys 集合,然后遍歷集合,并對每個選擇鍵及其對應的通道執行注銷操作。接下來再來看看步驟2和3對應的代碼,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | +----EPollArrayWrapper.java int poll(long timeout) throws IOException {// 調用 epoll_ctl 函數注冊事件,對應步驟3updateRegistrations();// 調用 epoll_wait 函數等待事件發生,對應步驟4updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated; }/*** Update the pending registrations.*/ private void updateRegistrations() {synchronized (updateLock) {int j = 0;while (j < updateCount) {// 獲取 fd 和 events,這兩個值在調用 register 方法時被存儲到數組中int fd = updateDescriptors[j];short events = getUpdateEvents(fd);boolean isRegistered = registered.get(fd);int opcode = 0;if (events != KILLED) {// 確定 opcode 的值if (isRegistered) {opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;} else {opcode = (events != 0) ? EPOLL_CTL_ADD : 0;}if (opcode != 0) {// 注冊事件epollCtl(epfd, opcode, fd, events);// 設置 fd 的注冊狀態if (opcode == EPOLL_CTL_ADD) {registered.set(fd);} else if (opcode == EPOLL_CTL_DEL) {registered.clear(fd);}}}j++;}updateCount = 0;}// 下面兩個均是 native 方法private native void epollCtl(int epfd, int opcode, int fd, int events);private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException; } |
看到 updateRegistrations 方法的實現,大家現在知道 epoll_ctl 這個函數是在哪里調用的了。在 3.2 節通道注冊的結尾給大家埋了一個疑問,這里就是答案了。注冊通道實際上只是先將事件收集起來,等調用 select 方法時,在一起通過 epoll_ctl 函數將事件注冊到 epoll 實例中。
上面 epollCtl 和 epollWait 方法是 native 類型的,接下來我們再來看看這兩個方法是如何實現的。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | +----EPollArrayWrapper.c JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) {struct epoll_event event;int res;event.events = events;event.data.fd = fd;// 調用 epoll_ctl 注冊事件RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");} }JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {struct epoll_event *events = jlong_to_ptr(address);int res;if (timeout <= 0) { /* Indefinite or no wait */// 調用 epoll_wait 等待事件RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);} else { /* Bounded wait; bounded restarts */res = iepoll(epfd, events, numfds, timeout);}if (res < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");}return res; } |
上面的C代碼沒什么復雜的邏輯,這里就不多說了。如果大家對 epoll_ctl 和 epoll_wait 函數不了解,可以參考 Linux man-page。關于 epoll 的示例,也可以參考我的另一篇文章“基于epoll實現簡單的web服務器”。
說完步驟2和3對應的代碼,接下來再來說說步驟4和5。由于步驟4和步驟1是一樣的,這里不再贅述。最后再來說說步驟5的邏輯。代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | +----EPollSelectorImpl.java private int updateSelectedKeys() {int entries = pollWrapper.updated;int numKeysUpdated = 0;for (int i=0; i<entries; i++) {/* 從 pollWrapper 成員變量的 pollArray 中獲取文件描述符,* pollArray 中的數據由 epoll_wait 設置*/int nextFD = pollWrapper.getDescriptor(i);SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));// ski is null in the case of an interruptif (ski != null) {// 從 pollArray 中獲取就緒事件集合int rOps = pollWrapper.getEventOps(i);/* 如果 selectedKeys 已包含選擇鍵,則選擇鍵必須由新的事件發生時,* 才會將 numKeysUpdated + 1*/ if (selectedKeys.contains(ski)) {if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;}} else {// 轉換并設置就緒事件集合ski.channel.translateAndSetReadyOps(rOps, ski);if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {// 更新 selectedKeys 集合,并將 numKeysUpdated + 1selectedKeys.add(ski);numKeysUpdated++;}}}}// 返回 numKeysUpdatedreturn numKeysUpdated; }+----SocketChannelImpl.java public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {int intOps = sk.nioInterestOps(); // Do this just once, it synchronizesint oldOps = sk.nioReadyOps();int newOps = initialOps;if ((ops & PollArrayWrapper.POLLNVAL) != 0) {return false;}if ((ops & (PollArrayWrapper.POLLERR| PollArrayWrapper.POLLHUP)) != 0) {newOps = intOps;sk.nioReadyOps(newOps);// No need to poll again in checkConnect,// the error will be detected therereadyToConnect = true;return (newOps & ~oldOps) != 0;}/* * 轉換事件*/if (((ops & PollArrayWrapper.POLLIN) != 0) &&((intOps & SelectionKey.OP_READ) != 0) &&(state == ST_CONNECTED))newOps |= SelectionKey.OP_READ;if (((ops & PollArrayWrapper.POLLCONN) != 0) &&((intOps & SelectionKey.OP_CONNECT) != 0) &&((state == ST_UNCONNECTED) || (state == ST_PENDING))) {newOps |= SelectionKey.OP_CONNECT;readyToConnect = true;}if (((ops & PollArrayWrapper.POLLOUT) != 0) &&((intOps & SelectionKey.OP_WRITE) != 0) &&(state == ST_CONNECTED))newOps |= SelectionKey.OP_WRITE;// 設置事件sk.nioReadyOps(newOps);// 如果新的就緒事件和老的就緒事件不相同,則返回true,否則返回 falsereturn (newOps & ~oldOps) != 0; } |
上面就是步驟5的邏輯了,簡單總結一下。首先是獲取就緒通道數量,然后再獲取這些就緒通道對應的文件描述符 fd,以及就緒事件集合 rOps。之后調用 translateAndSetReadyOps 轉換并設置就緒事件集合。最后,將選擇鍵添加到 selectedKeys 集合中,并累加 numKeysUpdated 值,之后返回該值。
以上就是選擇過程的代碼講解,貼了不少代碼,可能不太好理解。Java NIO 和操作系統接口關聯比較大,所以在學習 NIO 相關原理時,也應該去了解諸如 epoll 等系統調用的知識。沒有這些背景知識,很多東西看起來不太好懂。好了,本節到此結束。
?2.5 模板代碼
使用 NIO 選擇器編程時,主干代碼的結構一般比較固定。所以把主干代碼寫好后,就可以往里填業務代碼了。下面貼一個服務端的模板代碼,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress("localhost", 8080)); ssc.configureBlocking(false);Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT);while(true) {int readyNum = selector.select();if (readyNum == 0) {continue;}Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();while(it.hasNext()) {SelectionKey key = it.next();if(key.isAcceptable()) {// 接受連接} else if (key.isReadable()) {// 通道可讀} else if (key.isWritable()) {// 通道可寫}it.remove();} } |
?2.6 實例演示
原本打算將示例演示的代碼放在本節中展示,奈何文章篇幅已經很大了,所以決定把本節的內容獨立成文。在下一篇文章中,我將會演示使用 Java NIO 完成一個簡單的 HTTP 服務器。這里先貼張效果圖,如下:
?3.總結
到這里,本文差不多就要結束了。原本只是打算簡單說說 Selector 的用法,然后再寫一份實例代碼。但是后來發現這樣寫顯得比較空洞,沒什么深度。所以后來翻了一下 Selector 的源碼,大致理解了 Selector 的邏輯,然后就有了上面的分析。不過 Selector 的邏輯并不止我上面所說的那些,還有一些內容我現在還沒看,所以就沒有講。對于已寫出來的分析,由于我個人水平有限,難免會有錯誤。如果有錯誤,也歡迎大家指出來,共同進步!
好了,本文到此結束,感謝大家的閱讀。
?參考
- Java NIO Selector - jenkov.com
- Java NIO(6): Selector - 知乎
?附錄
文中貼的一些代碼是沒有包含在 JDK src.zip 包里的,這里單獨列舉出來,方便大家查找。
| DefaultSelectorProvider.java | jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java |
| EPollSelectorProvider.java | jdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java |
| SelectorImpl.java | jdk/src/share/classes/sun/nio/ch/SelectorImpl.java |
| EPollSelectorImpl.java | jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java |
| EPollArrayWrapper.java | jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java |
| SelectionKeyImpl.java | jdk/src/share/classes/sun/nio/ch/SelectionKeyImpl.java |
| SocketChannelImpl.java | jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java |
| EPollArrayWrapper.c | jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c |
- 本文鏈接:?https://www.tianxiaobo.com/2018/04/03/Java-NIO之選擇器/
from:?http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/?
總結
以上是生活随笔為你收集整理的Java NIO之选择器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java NIO之套接字通道
- 下一篇: 基于 Java NIO 实现简单的 HT