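Java NIO: Source Code Analysis of the Selector Mechanism (repost)
I never understood how a pipe wakes up a selector, so I went back to the JDK source code (downloaded from OpenJDK) and put together the following notes.
The walkthrough uses the demo that ships with Java NIO: OperationServer.java and OperationClient.java (see attachment).
The core code on the server side is: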
    public void initSelector() {
        try {
            selector = SelectorProvider.provider().openSelector();
            this.serverChannel1 = ServerSocketChannel.open();
            serverChannel1.configureBlocking(false);
            InetSocketAddress isa = new InetSocketAddress("localhost", this.port1);
            serverChannel1.socket().bind(isa);
            serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

Starting from the top, let's first see what SelectorProvider.provider() does:
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
        }
    }

Here provider = sun.nio.ch.DefaultSelectorProvider.create(); returns a different implementation class depending on the operating system; on Windows it returns a WindowsSelectorProvider.
The check if (provider != null) return provider; guarantees that the whole server program has only one WindowsSelectorProvider object.
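The singleton behaviour can be observed through the public API alone. The following is a minimal sketch; the class name ProviderSingletonDemo and its helper method are made up for illustration, and the provider class it prints depends on the platform and JDK version.

```java
import java.nio.channels.spi.SelectorProvider;

// Hypothetical demo class, not part of the JDK or of the original demo.
class ProviderSingletonDemo {

    // Returns true when two consecutive calls yield the very same provider object.
    static boolean sameProvider() {
        return SelectorProvider.provider() == SelectorProvider.provider();
    }

    public static void main(String[] args) {
        // The concrete class name varies by operating system and JDK.
        System.out.println(SelectorProvider.provider().getClass().getName());
        System.out.println("same instance: " + sameProvider());
    }
}
```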
Next, look at WindowsSelectorProvider.openSelector():

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }

The code of new WindowsSelectorImpl(SelectorProvider):

    WindowsSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        wakeupPipe = Pipe.open();
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

        // Disable the Nagle algorithm so that the wakeup is more immediate
        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
        (sink.sc).socket().setTcpNoDelay(true);
        wakeupSinkFd = ((SelChImpl)sink).getFDVal();

        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
    }

Pipe.open() is the key here. Its call chain is:

    public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
    }

In SelectorProvider:

    public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
    }
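Now look at how new PipeImpl() works. Note that the constructor below, like the native code that follows it, comes from the Unix build of the JDK. On Windows, PipeImpl builds the pipe from a pair of connected loopback sockets instead, which is why the WindowsSelectorImpl constructor can call setTcpNoDelay(true) on the sink. The wakeup principle is the same in both cases.

    PipeImpl(SelectorProvider sp) {
        long pipeFds = IOUtil.makePipe(true);
        int readFd = (int) (pipeFds >>> 32);
        int writeFd = (int) pipeFds;
        FileDescriptor sourcefd = new FileDescriptor();
        IOUtil.setfdVal(sourcefd, readFd);
        source = new SourceChannelImpl(sp, sourcefd);
        FileDescriptor sinkfd = new FileDescriptor();
        IOUtil.setfdVal(sinkfd, writeFd);
        sink = new SinkChannelImpl(sp, sinkfd);
    }

IOUtil.makePipe(true) is a native method: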
    /**
     * Returns two file descriptors for a pipe encoded in a long.
     * The read end of the pipe is returned in the high 32 bits,
     * while the write end is returned in the low 32 bits.
     */
    static native long makePipe(boolean blocking);
Its implementation:

    static int
    configureBlocking(int fd, jboolean blocking)
    {
        int flags = fcntl(fd, F_GETFL);
        int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

        return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
    }

    JNIEXPORT jlong JNICALL
    Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
    {
        int fd[2];

        if (pipe(fd) < 0) {
            JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
            return 0;
        }
        if (blocking == JNI_FALSE) {
            if ((configureBlocking(fd[0], JNI_FALSE) < 0)
                || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
                JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
                close(fd[0]);
                close(fd[1]);
                return 0;
            }
        }
        return ((jlong) fd[0] << 32) | (jlong) fd[1];
    }
As the Javadoc comment on makePipe() says, the high 32 bits hold the file descriptor (FD) of the pipe's read end and the low 32 bits hold the FD of the write end. That is why the return value of makePipe() is shifted right by 32 bits to get the read end and cast to int to get the write end.
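The packing scheme is easy to reproduce in plain Java. The sketch below is illustrative only: the class PipeFdCodec and its method names are invented here, and the mask on the low half is a defensive addition that the native code does not need, because real file descriptors are non-negative.

```java
// Hypothetical helper, written only to illustrate the encoding used by makePipe().
class PipeFdCodec {

    // Packs the read end into the high 32 bits and the write end into the low 32 bits.
    static long encode(int readFd, int writeFd) {
        // The mask keeps a negative writeFd from sign-extending into the high half.
        return ((long) readFd << 32) | (writeFd & 0xFFFFFFFFL);
    }

    // Same extraction as in the PipeImpl constructor: unsigned shift for the high half.
    static int readEnd(long pipeFds) {
        return (int) (pipeFds >>> 32);
    }

    // Same extraction as in the PipeImpl constructor: the cast keeps the low 32 bits.
    static int writeEnd(long pipeFds) {
        return (int) pipeFds;
    }

    public static void main(String[] args) {
        long packed = encode(5, 6);
        System.out.println(readEnd(packed) + " " + writeEnd(packed)); // prints "5 6"
    }
}
```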
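Back in the WindowsSelectorImpl constructor:

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

This line puts the FD of the pipe's read (source) end into pollWrapper. The selector polls the read end, and a byte written to the write (sink) end makes the read end readable. As we will see later, this is how the selector's wakeup() is implemented.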
The implementation of ServerSocketChannel.open():

    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

In SelectorProvider:

    public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }

So the newly created ServerSocketChannelImpl also holds a reference to the WindowsSelectorProvider (the sp passed to its constructor):

    ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        this.fd = Net.serverSocket(true); // open a socket and return its FD
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
    }
Then serverChannel1.register(selector, SelectionKey.OP_ACCEPT); binds the selector and the channel together. In other words, the FD created when the ServerSocketChannel was constructed is registered with the selector.
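The relationships set up by open() and register() can be checked from user code. This is a small sketch under a few assumptions: the class RegisterDemo is invented for illustration, and it binds to the loopback address on an ephemeral port (port 0) rather than the demo's port1.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical demo class; it only uses public java.nio APIs.
class RegisterDemo {

    // Returns four checks: shared provider, key links, interest set, keyFor lookup.
    static boolean[] check() {
        try (Selector selector = SelectorProvider.provider().openSelector();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port (an assumption of this sketch).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
            return new boolean[] {
                server.provider() == selector.provider(),
                key.channel() == server && key.selector() == selector,
                key.interestOps() == SelectionKey.OP_ACCEPT,
                server.keyFor(selector) == key
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (boolean ok : check()) {
            System.out.println(ok);
        }
    }
}
```

The SelectionKey returned by register() is the user-visible handle for the channel's registration with the selector.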
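At this point the server has finished starting up. The main objects created are:
WindowsSelectorProvider: a singleton
WindowsSelectorImpl, which contains:
    pollWrapper: holds the FDs registered on the selector, including the FD of the pipe's read end and the FD used by the ServerSocketChannel
    wakeupPipe: the pipe (really just two FDs, one for reading and one for writing)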
Now move on to run() in the server.
selector.select(); mainly ends up calling this method in WindowsSelectorImpl:

    protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        this.timeout = timeout; // set selector timeout
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
        } finally {
            end();
        }
        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
        finishLock.checkForException();
        processDeregisterQueue();
        int updated = updateSelectedKeys();
        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
        resetWakeupSocket();
        return updated;
    }
subSelector.poll() is the core: it polls the FDs stored in pollWrapper. It is implemented by calling the native method poll0:

    private int poll() throws IOException { // poll for the main thread
        return poll0(pollWrapper.pollArrayAddress,
                     Math.min(totalChannels, MAX_SELECTABLE_FDS),
                     readFds, writeFds, exceptFds, timeout);
    }

    private native int poll0(long pollAddress, int numfds,
                             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

    // These arrays will hold result of native select().
    // The first element of each array is the number of selected sockets.
    // Other elements are file descriptors of selected sockets.
    private final int[] readFds = new int[MAX_SELECTABLE_FDS + 1];   // FDs with read events
    private final int[] writeFds = new int[MAX_SELECTABLE_FDS + 1];  // FDs with write events
    private final int[] exceptFds = new int[MAX_SELECTABLE_FDS + 1]; // FDs with exceptional events
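poll0() watches the FDs in pollWrapper for activity and blocks until an event occurs on one of them. For example, because pollWrapper holds the ServerSocketChannel's FD, poll0() returns as soon as a client socket connects to the server socket. And because pollWrapper also holds the FD of the pipe's read end, poll0() also returns as soon as a byte is written to the pipe's write end. If neither happens, poll0() keeps blocking, which means selector.select() keeps blocking. If either happens, selector.select() returns. That is why run() in OperationServer uses while (true) {: after the selector has received and handled an event, it goes back to waiting in poll().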
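The same effect can be reproduced with the public java.nio.channels.Pipe API: a selector that watches the read end of a pipe returns from select() once a byte is written to the write end. The sketch below only imitates the idea from user code and is not how the JDK wires its internal wakeup pipe; the class name PipeWakeupSketch and the 5-second safety timeout are assumptions of this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class imitating the wakeup pipe with public APIs.
class PipeWakeupSketch {

    // Registers the pipe's read end, writes one byte to the write end,
    // and returns how many keys select() reports as ready.
    static int selectAfterPipeWrite() {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                // One byte with value 1, like the native wakeup code sends.
                sink.write(ByteBuffer.wrap(new byte[] {1}));
                // The timeout is only a safety net for this sketch.
                return selector.select(5_000);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys: " + selectAfterPipeWrite());
    }
}
```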
Now look at WindowsSelectorImpl.wakeup():

    public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                setWakeupSocket();
                interruptTriggered = true;
            }
        }
        return this;
    }

    // Sets Windows wakeup socket to a signaled state.
    private void setWakeupSocket() {
        setWakeupSocket0(wakeupSinkFd);
    }

    private native void setWakeupSocket0(int wakeupSinkFd);

The native side:

    JNIEXPORT void JNICALL
    Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                         jint scoutFd)
    {
        /* Write one byte into the pipe */
        const char byte = 1;
        send(scoutFd, &byte, 1, 0);
    }

So wakeup() wakes poll() by sending a single byte with the value 1 through the pipe's write end: send(scoutFd, &byte, 1, 0). Whenever needed, you can call selector.wakeup() to wake the selector.
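From the caller's side, the effect of wakeup() can be seen with a second thread. This is a minimal sketch with invented names (WakeupDemo, wokenBeforeTimeout); the 200 ms delay and the 10-second select timeout are arbitrary values chosen for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class: one thread blocks in select(), another calls wakeup().
class WakeupDemo {

    // Returns true if select() came back with 0 ready keys well before its timeout.
    static boolean wokenBeforeTimeout() {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(200); // give the main thread time to block
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup();
            });
            long start = System.nanoTime();
            waker.start();
            // No channels are registered, so only wakeup() or the timeout ends this call.
            int ready = selector.select(10_000);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            waker.join();
            return ready == 0 && elapsedMs < 5_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("woken by wakeup(): " + wokenBeforeTimeout());
    }
}
```

A select() that ends because of wakeup() reports 0 ready keys, which matches doSelect() above: when interruptTriggered is already set, it resets the wakeup socket and returns 0.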
Original article: http://goon.iteye.com/blog/1775421
Addendum: the implementation of DefaultSelectorProvider on Linux. As the code shows, if the kernel version is >= 2.6 the concrete SelectorProvider is EPollSelectorProvider; otherwise it is the default PollSelectorProvider.

    // sun.nio.ch.DefaultSelectorProvider
    public static SelectorProvider create() {
        PrivilegedAction pa = new GetPropertyAction("os.name");
        String osname = (String) AccessController.doPrivileged(pa);
        if ("SunOS".equals(osname)) {
            return new sun.nio.ch.DevPollSelectorProvider();
        }

        // use EPollSelectorProvider for Linux kernels >= 2.6
        if ("Linux".equals(osname)) {
            pa = new GetPropertyAction("os.version");
            String osversion = (String) AccessController.doPrivileged(pa);
            String[] vers = osversion.split("\\.", 0);
            if (vers.length >= 2) {
                try {
                    int major = Integer.parseInt(vers[0]);
                    int minor = Integer.parseInt(vers[1]);
                    if (major > 2 || (major == 2 && minor >= 6)) {
                        return new sun.nio.ch.EPollSelectorProvider();
                    }
                } catch (NumberFormatException x) {
                    // format not recognized
                }
            }
        }

        return new sun.nio.ch.PollSelectorProvider();
    }
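To see how the version check behaves on concrete inputs, the decision logic can be restated as a pure function that takes the two system property values and returns the simple name of the provider class. This is only an illustrative restatement of the create() method quoted above; the class ProviderChoiceSketch and the method choose are hypothetical, and the sample version strings are made up.

```java
// Hypothetical restatement of the selection logic, returning class names as strings.
class ProviderChoiceSketch {

    static String choose(String osName, String osVersion) {
        if ("SunOS".equals(osName)) {
            return "DevPollSelectorProvider";
        }
        if ("Linux".equals(osName)) {
            String[] vers = osVersion.split("\\.", 0);
            if (vers.length >= 2) {
                try {
                    int major = Integer.parseInt(vers[0]);
                    int minor = Integer.parseInt(vers[1]);
                    if (major > 2 || (major == 2 && minor >= 6)) {
                        return "EPollSelectorProvider";
                    }
                } catch (NumberFormatException x) {
                    // version format not recognized: fall through to the default
                }
            }
        }
        return "PollSelectorProvider";
    }

    public static void main(String[] args) {
        System.out.println(choose("Linux", "2.6.32"));  // EPollSelectorProvider
        System.out.println(choose("Linux", "2.4.20"));  // PollSelectorProvider
        System.out.println(choose("SunOS", "5.10"));    // DevPollSelectorProvider
    }
}
```

Only the first two dot-separated fields are parsed, so a version string such as 5.15.0-91-generic still selects epoll even though its third field is not a plain number.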
Reposted from: https://www.cnblogs.com/davidwang456/p/3831617.html