Analysis of the NioEventLoop Loading Flow
I. First, let's look at how NioEventLoopGroup is created and initialized.

    EventLoopGroup workEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("clientThread", false));

1. Start with the class inheritance diagram.
2. The parent class MultithreadEventLoopGroup implements the channel registration interface, and next() returns the next available NioEventLoop.

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

3. The actual NioEventLoop array is stored in the children field of the parent class MultithreadEventExecutorGroup.

    private final EventExecutor[] children;

The constructor is where the concrete event executors (NioEventLoop instances) are created; each one is handed the executor (the thread pool) that will later run it.

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
            }
        }
        chooser = chooserFactory.newChooser(children);
    }

4. newChild is the method that creates each event executor (NioEventLoop). The call lands in NioEventLoopGroup.newChild, which constructs a new NioEventLoop instance. The selectorProvider passed in here is SelectorProvider.provider().

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

NioEventLoopGroup (strictly speaking, MultithreadEventExecutorGroup) internally maintains a children array of type EventExecutor whose size is nThreads; this array is what makes the group a thread pool.
If a pool size is given when NioEventLoopGroup is instantiated, nThreads is that value; otherwise it defaults to the number of processor cores * 2.
MultithreadEventExecutorGroup calls the abstract method newChild to populate the children array. newChild is implemented in NioEventLoopGroup and returns a NioEventLoop instance.
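
The relationship between the group, its children array and the abstract newChild factory can be sketched without Netty. The following is a minimal illustration in plain Java, not Netty's code: GroupSketch, Executor, Group and LoopGroup are hypothetical names, newChild takes only an index, and "0 means use the default" is an assumption made for the sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class GroupSketch {
    /** Hypothetical stand-in for an event executor; not a Netty type. */
    interface Executor {
        String name();
    }

    /** Parent class: owns the children array and fills it through the abstract factory method. */
    abstract static class Group {
        private final Executor[] children;
        private final AtomicInteger idx = new AtomicInteger();

        protected Group(int nThreads) {
            // Assumption for this sketch: 0 (or less) means "use cores * 2".
            int size = nThreads > 0 ? nThreads : defaultThreads();
            children = new Executor[size];
            for (int i = 0; i < size; i++) {
                children[i] = newChild(i); // the subclass decides the concrete type
            }
        }

        static int defaultThreads() {
            return Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
        }

        protected abstract Executor newChild(int index);

        int size() {
            return children.length;
        }

        /** Round-robin over the children, as next() does in the group. */
        Executor next() {
            return children[Math.floorMod(idx.getAndIncrement(), children.length)];
        }
    }

    /** Concrete group: only supplies the factory method. */
    static final class LoopGroup extends Group {
        LoopGroup(int nThreads) {
            super(nThreads);
        }

        @Override
        protected Executor newChild(int index) {
            return () -> "loop-" + index;
        }
    }

    public static void main(String[] args) {
        LoopGroup group = new LoopGroup(3);
        System.out.println(group.size());        // 3
        System.out.println(group.next().name()); // loop-0
        System.out.println(group.next().name()); // loop-1
    }
}
```

The parent never names the concrete child type; it only knows how many children to create and how to hand them out, which is the same division of labour described above.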
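II. Next, let's look at how NioEventLoop is created and initialized.
1. Start with the class diagram.
2. The parent class SingleThreadEventExecutor is the core: it is responsible for producing and consuming the task queue and for starting the thread. Its executor field is the thread pool, which creates the thread and runs the tasks.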

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

        static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
                SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

        private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

        private final Queue<Runnable> taskQueue;
        private volatile Thread thread;
        private final Executor executor;

        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                            RejectedExecutionHandler rejectedHandler) {
            super(parent);
            this.addTaskWakesUp = addTaskWakesUp;
            this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
            this.executor = ThreadExecutorMap.apply(executor, this);
            this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
            this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }
        // ...
    }

3. NioEventLoop itself is mainly responsible for managing the selector and monitoring events.

    public final class NioEventLoop extends SingleThreadEventLoop {
        /**
         * The NIO {@link Selector}.
         */
        private Selector selector;
        private Selector unwrappedSelector;
        private SelectedSelectionKeySet selectedKeys;
        private final SelectorProvider provider;
        private final SelectStrategy selectStrategy;
        private volatile int ioRatio = 50;

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) {
            super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                    rejectedExecutionHandler);
            this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
            this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
            final SelectorTuple selectorTuple = openSelector();
            this.selector = selectorTuple.selector;
            this.unwrappedSelector = selectorTuple.unwrappedSelector;
        }
        // ...
    }

4. final SelectorTuple selectorTuple = openSelector(); creates a custom SelectorTuple that holds both the original selector and an optimized one. The optimization replaces the selectedKeys and publicSelectedKeys fields of SelectorImpl with a SelectedSelectionKeySet, which stores the selected keys in a growable array. The JDK's own SelectorImpl keeps its selected keys in a HashSet, which is backed by a HashMap and therefore stores an extra value per entry, so the optimized version saves memory and improves performance. provider.openSelector creates the native JDK selector, that is, the IO multiplexer.

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

[Figures: the selector's selected-key set before and after the optimization]
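
To make the array-backed idea concrete, here is a minimal sketch of a Set that only appends into a growable array. It is an illustration written for this article, not Netty's SelectedSelectionKeySet: the class name ArrayKeySet, the initial capacity of 4 and the reset() helper are assumptions, and the set deliberately does not de-duplicate or support removal.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Append-only, array-backed set sketch (hypothetical; no de-duplication, no removal). */
final class ArrayKeySet<E> extends AbstractSet<E> {
    Object[] keys = new Object[4]; // small initial capacity, chosen for the demo
    int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // double the array when full
        }
        keys[size++] = e; // plain array store: no hashing, no entry object
        return true;
    }

    /** Clears the used slots so the keys can be collected, then resets the size. */
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[i++];
            }
        };
    }

    public static void main(String[] args) {
        ArrayKeySet<String> set = new ArrayKeySet<>();
        for (int i = 0; i < 10; i++) {
            set.add("key-" + i);
        }
        System.out.println(set.size() + " keys, capacity " + set.keys.length); // 10 keys, capacity 16
        set.reset();
        System.out.println(set.isEmpty()); // true
    }
}
```

Adding a key is a single array store, and the event loop can walk keys[0..size) directly, which is where the saving over a HashSet comes from.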
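III. How a channel is registered with a NioEventLoop (the event loop thread is started during registration)
1. The registration actually happens during Bootstrap initialization, which was outlined earlier when discussing the Bootstrap startup flow.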

    this.channel = bootstrap.connect(host, port).sync().channel();

This ends up in the initAndRegister method of AbstractBootstrap.

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

2. This calls register on the NioEventLoopGroup, which uses next() to pick the next NioEventLoop in round-robin order and registers the channel with it.

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

The EventExecutor array used by the chooser is the same array that the NioEventLoopGroup holds.

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

3. NioEventLoop's register is defined in SingleThreadEventLoop. Internally it calls the register method of the NioSocketChannel's Unsafe, the low-level IO object.

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

4. The register method lives in AbstractUnsafe, inside AbstractChannel (the base class of NioSocketChannel). It first binds the given eventLoop to the channel's eventLoop field, then performs the registration on the event loop.

    AbstractChannel.this.eventLoop = eventLoop;

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(eventLoop, "eventLoop");
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
            }
        }
    }

5. Then, in the execute method of SingleThreadEventExecutor, the registration task is added to the queue and the thread is started, which begins the event loop.

    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

6. Starting the thread: the state check uses a CAS atomic operation, which guarantees that the thread is started only once even if this method is called from several threads at the same time.
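
The start-once guarantee can be demonstrated in isolation. The sketch below is plain JDK code, not Netty's: StartOnceSketch, the starts counter and the race helper are hypothetical names, and incrementing a counter stands in for actually launching the loop thread. Several threads call startThread() at the same moment and the counter records how many of them won the CAS.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class StartOnceSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final AtomicIntegerFieldUpdater<StartOnceSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(StartOnceSketch.class, "state");

    private volatile int state = ST_NOT_STARTED;
    final AtomicInteger starts = new AtomicInteger(); // how many callers actually "started the thread"

    void startThread() {
        // Cheap volatile read first, then the CAS decides the single winner.
        if (state == ST_NOT_STARTED
                && STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            starts.incrementAndGet(); // stand-in for doStartThread()
        }
    }

    /** Lets `callers` threads race on startThread() and returns the number of starts. */
    static int race(int callers) {
        StartOnceSketch loop = new StartOnceSketch();
        CountDownLatch go = new CountDownLatch(1);
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    go.await(); // line everyone up at the same starting point
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                loop.startThread();
            });
            threads[i].start();
        }
        go.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return loop.starts.get();
    }

    public static void main(String[] args) {
        System.out.println(race(8)); // 1
    }
}
```

Only the caller whose compareAndSet moves the state from ST_NOT_STARTED to ST_STARTED proceeds; every other caller sees either the updated state or a failed CAS and returns immediately.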

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

7. Next, doStartThread uses the executor that was configured at construction time to actually create and run the thread. The submitted task calls the run method of the current object. run is abstract in SingleThreadEventExecutor (the template method pattern); the real implementation is in NioEventLoop.

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                }
            }
        });
    }

8. NioEventLoop's run method processes IO events and runs the other tasks (registering, connecting and binding channels) as well as scheduled tasks. The ioRatio setting controls how the loop's time is split, as a percentage, between IO processing and task processing.
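
The split that ioRatio controls comes down to one line of arithmetic in run(): runAllTasks(ioTime * (100 - ioRatio) / ioRatio). A small worked example makes the effect of different values visible. This is a sketch for illustration only; IoRatioSketch and taskBudgetNanos are hypothetical names, and the 1..100 range check is an assumption of the sketch.

```java
final class IoRatioSketch {
    /** Time budget for tasks, given the time just spent on IO, using the formula from run(). */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected 1..100)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // assume 1 ms was spent in processSelectedKeys()
        System.out.println(taskBudgetNanos(ioTime, 50)); // 1000000: tasks get as long as IO took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 250000: tasks get a quarter of the IO time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 4000000: tasks get four times the IO time
    }
}
```

With the default ioRatio of 50 the two kinds of work get equal time; raising the value favours IO, lowering it favours queued tasks.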

    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    rebuildSelector0();
                    selectCnt = 0;
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0);
                }
            } catch (CancelledKeyException e) {
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

9. The select timeout is derived from the deadline of the next scheduled task. 995,000 ns (just under 1 ms) is added before converting to milliseconds so that the result rounds up, and the selector's select is then called with at most that timeout.
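
The rounding in select(long deadlineNanos) is easy to check with a few values. The sketch below reproduces only the arithmetic, under the assumption that deadlineToDelayNanos returns the remaining delay clamped at zero; SelectTimeoutSketch and timeoutMillis are hypothetical names, and the input is the delay until the next scheduled task rather than an absolute deadline.

```java
final class SelectTimeoutSketch {
    /** Milliseconds to block in select(), given the nanoseconds left until the next scheduled task. */
    static long timeoutMillis(long delayNanos) {
        // Add 995,000 ns so that anything from 5 microseconds upward rounds up to at least 1 ms.
        long padded = Math.max(0L, delayNanos + 995_000L);
        return padded / 1_000_000L;
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(4_999L));     // 0: closer than 5 microseconds, use selectNow()
        System.out.println(timeoutMillis(5_000L));     // 1
        System.out.println(timeoutMillis(1_000_000L)); // 1
        System.out.println(timeoutMillis(1_500_000L)); // 2
        System.out.println(timeoutMillis(-2_000_000L)); // 0: the deadline has already passed
    }
}
```

A result of 0 corresponds to the selectNow() branch, which matches the source comment that the timeout is only 0 when the deadline is within 5 microseconds.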

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        // Timeout will only be 0 if deadline is within 5 microsecs
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

10. If the strategy returned by select is greater than 0, there are IO events to handle. processSelectedKeys is called first to process them. Once that is done, the time allowed for the other task queues is computed from ioRatio, and runAllTasks(long timeoutNanos) is called to run the queued tasks.

    if (strategy > 0) {
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            final long ioTime = System.nanoTime() - ioStartTime;
            ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }

IV. The IO event handling flow
1. The processSelectedKeys code in NioEventLoop:

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

2. Because the selected-key set has already been replaced by the optimized one, processSelectedKeysOptimized is invoked. The selectedKeys inside the selector is the very set held in NioEventLoop's selectedKeys field, so the loop walks this event list directly and handles each IO event in turn. The attachment of each SelectionKey is the NioSocketChannel.

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
        }
    }

3. NioEventLoop's processSelectedKey first obtains the set of ready IO events.
If it is a client connect event, interest in OP_CONNECT is cancelled first (otherwise the event would fire repeatedly), and unsafe.finishConnect is called.
If it is a writable event, the write buffer is flushed, which frees memory.
If it is a readable event or an accept event for a new client, unsafe.read is called to handle the read.

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

4. The connect event. After NioSocketChannel calls SocketUtils.connect, it registers interest in OP_CONNECT if the connection did not complete immediately.

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

5. Once the connection succeeds, the IO event loop in NioEventLoop receives the OP_CONNECT event. It is handled in processSelectedKeysOptimized, where readyOps can be seen to have the value 8, that is, OP_CONNECT.
6. processSelectedKey is then triggered. It cancels interest in the connect event and calls unsafe.finishConnect() to signal that the connection has succeeded.
7. finishConnect:335, AbstractNioChannel$AbstractNioUnsafe calls pipeline().fireChannelActive() on the outer class of the inner Unsafe class, which eventually triggers the public void channelActive(ChannelHandlerContext ctx) callback of every handler.

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        boolean active = isActive();

        // trySuccess() will return false if a user cancelled the connection attempt.
        boolean promiseSet = promise.trySuccess();

        // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
        // because what happened is what happened.
        if (!wasActive && active) {
            pipeline().fireChannelActive();
        }
    }

8. With the connect event covered, now the read event. When data arrives, readyOps is 1, that is, OP_READ, and unsafe.read is called to read the data.
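
The readyOps values seen in steps 5 and 8 (8 and 1) are the JDK's SelectionKey bit flags. The following sketch decodes such a value and shows the bit operation that removes OP_CONNECT from an interest set; it uses only java.nio.channels.SelectionKey, and ReadyOpsSketch, decode and clearConnect are hypothetical helper names.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsSketch {
    /** Lists the operation names whose bits are set in readyOps. */
    static List<String> decode(int readyOps) {
        List<String> names = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            names.add("OP_READ");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            names.add("OP_WRITE");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            names.add("OP_CONNECT");
        }
        if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
            names.add("OP_ACCEPT");
        }
        return names;
    }

    /** Removes OP_CONNECT from an interest set, the same bit operation processSelectedKey uses. */
    static int clearConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    public static void main(String[] args) {
        System.out.println(decode(8));       // [OP_CONNECT]
        System.out.println(decode(1));       // [OP_READ]
        System.out.println(decode(5));       // [OP_READ, OP_WRITE]
        System.out.println(clearConnect(9)); // 1: only OP_READ remains
    }
}
```

Because each operation is a separate bit, one key can report several ready operations at once, which is why processSelectedKey tests each flag with a bitwise AND instead of comparing for equality.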
9. AbstractNioByteChannel$NioByteUnsafe.read is called. It reads the data and then fires the pipeline's fireChannelRead event.

    public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        }
    }

10. The path by which data from the network socket is read into the byte buffer:
io.netty.buffer.PooledByteBuf

    public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
        try {
            return in.read(this.internalNioBuffer(index, length));
        } catch (ClosedChannelException var5) {
            return -1;
        }
    }

11. pipeline.fireChannelRead(byteBuf) is then triggered, passing the received data through the pipeline to the downstream handlers. One point to note: the data is read in a loop and fireChannelRead is called on every iteration. When there is no more data to read, fireChannelReadComplete is fired as the read-complete callback.
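
The read-in-a-loop behaviour can be imitated with JDK channels alone. The sketch below is a simplification, not Netty's read(): ReadLoopSketch and readAll are hypothetical names, the event strings stand in for pipeline callbacks, and the continue condition (the buffer was filled and fewer than maxReads reads have happened) is an assumed stand-in for allocHandle.continueReading().

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

final class ReadLoopSketch {
    /** Reads `data` in chunks of at most bufferSize bytes and records the callbacks that would fire. */
    static List<String> readAll(byte[] data, int bufferSize, int maxReads) {
        List<String> events = new ArrayList<>();
        ReadableByteChannel in = Channels.newChannel(new ByteArrayInputStream(data));
        try {
            int reads = 0;
            int n;
            do {
                ByteBuffer buf = ByteBuffer.allocate(bufferSize); // fresh buffer per read
                n = in.read(buf);
                if (n <= 0) {
                    break; // nothing left to read
                }
                events.add("channelRead(" + n + ")"); // one callback per chunk
                reads++;
                // Assumed condition: keep going only while buffers come back full.
            } while (n == bufferSize && reads < maxReads);
            events.add("channelReadComplete"); // fired once, after the loop
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(readAll(new byte[10], 4, 16));
        // [channelRead(4), channelRead(4), channelRead(2), channelReadComplete]
    }
}
```

A handler that needs a complete message therefore cannot rely on a single channelRead call; it has to accumulate chunks, and channelReadComplete only marks the end of one read pass, not the end of a message.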