源码分析netty服务器创建过程vs java nio服务器创建
1.Java NIO服務端創建
首先,我們通過一個時序圖來看下如何創建一個NIO服務端并啟動監聽,接收多個客戶端的連接,進行消息的異步讀寫。
?
示例代碼(參考文獻【2】):
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.Iterator; import java.util.Set;/*** User: mihasya* Date: Jul 25, 2010* Time: 9:09:03 AM*/ public class JServer {public static void main (String[] args) {ServerSocketChannel sch = null;Selector sel = null;try {// setup the socket we're listening for connections on.InetSocketAddress addr = new InetSocketAddress(8400);sch = ServerSocketChannel.open();sch.configureBlocking(false);sch.socket().bind(addr);// setup our selector and register the main socket on it sel = Selector.open();sch.register(sel, SelectionKey.OP_ACCEPT);} catch (IOException e) {System.out.println("Couldn't setup server socket");System.out.println(e.getMessage());System.exit(1);}// fire up the listener thread, pass it our selectorListenerThread listener = new ListenerThread(sel);listener.run();}/** the thread is completely unnecessary, it could all just happen* in main()*/ class ListenerThread extends Thread {Selector sel = null;ListenerThread(Selector sel) {this.sel = sel;}public void run() {while (true) {// our canned response for nowByteBuffer resp = ByteBuffer.wrap(new String("got it\n").getBytes());try {// loop over all the sockets that are ready for some activitywhile (this.sel.select() > 0) {Set keys = this.sel.selectedKeys();Iterator i = keys.iterator();while (i.hasNext()) {SelectionKey key = (SelectionKey)i.next();if (key.isAcceptable()) {// this means that a new client has hit the port our main// socket is listening on, so we need to accept the connection// and add the new client socket to our select pool for reading// a command laterSystem.out.println("Accepting connection!");// this will be the ServerSocketChannel we initially registered// with the selector in main()ServerSocketChannel sch = (ServerSocketChannel)key.channel();SocketChannel ch = sch.accept();ch.configureBlocking(false);ch.register(this.sel, SelectionKey.OP_READ);} else if (key.isReadable()) {// one of our client sockets has received a command and// we're now ready to read it inSystem.out.println("Accepting command!"); SocketChannel ch = (SocketChannel)key.channel();ByteBuffer buf = ByteBuffer.allocate(200);ch.read(buf);buf.flip();Charset charset = Charset.forName("UTF-8");CharsetDecoder decoder = charset.newDecoder();CharBuffer cbuf = decoder.decode(buf);System.out.print(cbuf.toString());// re-register this socket with the selector, this time// for writing since we'll want to write something to it// on the next go-aroundch.register(this.sel, SelectionKey.OP_WRITE);} else if (key.isWritable()) {// we are ready to send a response to one of the client sockets// we had read a command from previouslySystem.out.println("Sending response!");SocketChannel ch = (SocketChannel)key.channel();ch.write(resp);resp.rewind();// we may get another command from this guy, so prepare// to read again. We could also close the channel, but// that sort of defeats the whole purpose of doing asyncch.register(this.sel, SelectionKey.OP_READ);}i.remove();}}} catch (IOException e) {System.out.println("Error in poll loop");System.out.println(e.getMessage());System.exit(1);}}} } }從上面的代碼可以看出java nio的通用步驟:
1.打開ServerSocketChannel,用于監聽客戶端的連接,它是所有客戶端連接的父通道,綁定監聽端口,設置客戶端連接方式為非阻塞模式。
2.打開多路復用器并啟動服務端監聽線程,將ServerSocketChannel注冊到Reactor線程的多路復用器Selector上,監聽ACCEPT狀態。
3.多路復用器監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手后,與客戶端建立物理鏈路。
?
2. Netty服務端創建
?2.1 打開ServerSocketChannel
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)創建一個ServerSocketChannel的過程:
/*** Create a new instance*/public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}調用newSocket方法:
private static ServerSocketChannel newSocket(SelectorProvider provider) {try {/*** Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.** See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.*/return provider.openServerSocketChannel();} catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);}}其中的provider.openServerSocketChannel()就是java nio的實現。設置非阻塞模式包含在父類中:
/*** Create a new instance** @param parent the parent {@link Channel} by which this instance was created. May be {@code null}* @param ch the underlying {@link SelectableChannel} on which it operates* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}*/protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", e2);}}throw new ChannelException("Failed to enter non-blocking mode.", e);}}?
?
2.2?打開多路復用器過程
NioEventLoop負責調度和執行Selector輪詢操作,選擇準備就緒的Channel集合,相關代碼如下:
@Overrideprotected void run() {for (;;) {boolean oldWakenUp = wakenUp.getAndSet(false);try {if (hasTasks()) {selectNow();} else {select(oldWakenUp);// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and// 'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and// 'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {processSelectedKeys();runAllTasks();} else {final long ioStartTime = System.nanoTime();processSelectedKeys();final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}if (isShuttingDown()) {closeAll();if (confirmShutdown()) {break;}}} catch (Throwable t) {logger.warn("Unexpected exception in the selector loop.", t);// Prevent possible consecutive immediate failures that lead to// excessive CPU consumption.try {Thread.sleep(1000);} catch (InterruptedException e) {// Ignore. }}}}2.2.1 綁定處理的key
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}以processSelectedKeysPlain為例:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {// check if the set is empty and if so just return to not create garbage by// creating a new Iterator every time even if there is nothing to process.// See https://github.com/netty/netty/issues/597if (selectedKeys.isEmpty()) {return;}Iterator<SelectionKey> i = selectedKeys.iterator();for (;;) {final SelectionKey k = i.next();final Object a = k.attachment();i.remove();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (!i.hasNext()) {break;}if (needsToSelectAgain) {selectAgain();selectedKeys = selector.selectedKeys();// Create the iterator again to avoid ConcurrentModificationExceptionif (selectedKeys.isEmpty()) {break;} else {i = selectedKeys.iterator();}}}}?
?2.3 綁定端口,接收請求:
// Bind and start to accept incoming connections.ChannelFuture f = b.bind(PORT).sync();調用bind程序
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.executor = channel.eventLoop();}doBind0(regFuture, channel, localAddress, promise);}});return promise;}}最終的綁定由dobind0來完成
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new OneTimeTask() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}具體實現:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {// close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {// Connection already closed - no need to handle write.return;}}if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush();}if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}?
參考文獻
【1】http://www.infoq.com/cn/articles/netty-server-create
【2】https://github.com/mihasya/sample-java-nio-server/blob/master/src/JServer.java
轉載于:https://www.cnblogs.com/davidwang456/p/5050607.html
總結
以上是生活随笔為你收集整理的源码分析netty服务器创建过程vs java nio服务器创建的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring与memcache的整合
- 下一篇: windows下spark开发环境配置