The ServerBootstrap Startup Flow
I. ServerBootstrap startup example code
    // The boss group accepts connections; the worker group handles I/O on accepted channels.
    EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("bossThread", false));
    EventLoopGroup workEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("workThread", false));
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossEventLoopGroup, workEventLoopGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new PojoServerIntitlizer());
    try {
        log.debug(" will start server");
        // sync() blocks until the bind has completed, so this is the bind future.
        ChannelFuture bindFuture = bootstrap.bind(port).sync();
        log.debug(" server is started");
        // Block until the server channel is closed.
        bindFuture.channel().closeFuture().sync();
        log.debug(" server is closed");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        bossEventLoopGroup.shutdownGracefully();
        workEventLoopGroup.shutdownGracefully();
        log.debug(" release event loop group");
    }

II. Initialization flow
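1. ServerBootstrap and Bootstrap both extend AbstractBootstrap and follow the template-method pattern. AbstractBootstrap has several important member variables:
group: the EventLoopGroup for the ServerSocketChannel, i.e. the boss event-loop thread pool of the accept reactor.
channelFactory: the factory that creates channels, such as a ServerSocketChannel or a SocketChannel.
handler: the event handler for the accept reactor.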
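To make the split between the parent-side state kept in the base class and the child-side state added by the server subclass more concrete, here is a minimal sketch of a self-typed fluent builder. It is a toy model, not Netty code: the class names BaseBootstrap and ServerLikeBootstrap are hypothetical, and plain strings stand in for EventLoopGroup and ChannelHandler.

```java
import java.util.Objects;

// Toy stand-ins for AbstractBootstrap / ServerBootstrap (hypothetical names).
// Strings replace the real EventLoopGroup and ChannelHandler types.
final class SelfTypedBuilderDemo {

    abstract static class BaseBootstrap<B extends BaseBootstrap<B>> {
        String group;    // stands in for the boss EventLoopGroup
        String handler;  // stands in for the parent channel's handler

        @SuppressWarnings("unchecked")
        private B self() {
            return (B) this;
        }

        B group(String group) {
            this.group = Objects.requireNonNull(group, "group");
            return self();
        }

        B handler(String handler) {
            this.handler = Objects.requireNonNull(handler, "handler");
            return self();
        }
    }

    static final class ServerLikeBootstrap extends BaseBootstrap<ServerLikeBootstrap> {
        String childGroup;    // stands in for the worker EventLoopGroup
        String childHandler;  // stands in for the handler of accepted channels

        ServerLikeBootstrap group(String parentGroup, String childGroup) {
            super.group(parentGroup);
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = Objects.requireNonNull(childGroup, "childGroup");
            return this;
        }

        ServerLikeBootstrap childHandler(String childHandler) {
            this.childHandler = Objects.requireNonNull(childHandler, "childHandler");
            return this;
        }
    }

    static ServerLikeBootstrap configure() {
        return new ServerLikeBootstrap()
                .group("boss", "worker")
                .handler("logging")           // inherited method, still returns ServerLikeBootstrap
                .childHandler("initializer"); // so subclass methods remain chainable
    }

    public static void main(String[] args) {
        ServerLikeBootstrap b = configure();
        System.out.println(b.group + " / " + b.childGroup + " / " + b.handler + " / " + b.childHandler);
    }
}
```

The type parameter B is what lets an inherited setter such as handler(...) return the concrete subclass, so calls to base-class and subclass methods can be mixed in one chain.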
The corresponding declarations (excerpt):

    public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
        volatile EventLoopGroup group;
        @SuppressWarnings("deprecation")
        private volatile ChannelFactory<? extends C> channelFactory;
        private volatile SocketAddress localAddress;
        private volatile ChannelHandler handler;
        // ...
    }

    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
        private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
        private volatile EventLoopGroup childGroup;
        private volatile ChannelHandler childHandler;
        // ...
    }

2. Setting the parameters.
bootstrap.group(bossEventLoopGroup, workEventLoopGroup): bossEventLoopGroup is stored as AbstractBootstrap's group, and workEventLoopGroup is stored as ServerBootstrap's childGroup.

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

channel(NioServerSocketChannel.class) creates a reflective channel factory for the given channel class; the factory instantiates that class by reflection. This sets AbstractBootstrap's channelFactory.

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")));
    }

.handler(new LoggingHandler(LogLevel.INFO)) sets AbstractBootstrap's handler.

    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }

.childHandler(new PojoServerIntitlizer()) sets the childHandler of the ServerBootstrap subclass.

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

III. Bind flow
1. The starting point is bootstrap.bind(port).sync(), which ends up calling AbstractBootstrap's doBind method.
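    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        // Excerpt: the full method also returns early when registration failed and
        // defers doBind0 through a listener when registration has not completed yet.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    }

2. doBind first calls initAndRegister, which creates the ServerSocketChannel and registers it with a NioEventLoop's selector. This is the JDK-level register(selector, 0, attachment) call covered in the previous article.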
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            // error handling omitted in this excerpt
        }
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }

3. newChannel uses the ReflectiveChannelFactory created earlier to instantiate a NioServerSocketChannel by reflection.
[Figure: class hierarchy of NioServerSocketChannel (image not preserved)]
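The reflective factory idea can be sketched with the JDK alone. The following is a simplified illustration, not Netty's ReflectiveChannelFactory itself: SimpleFactory, ReflectiveFactory and FakeServerChannel are hypothetical names, and the assumption is that the target class has a public no-argument constructor.

```java
import java.lang.reflect.Constructor;

// Simplified illustration of a reflection-based factory (hypothetical names).
final class ReflectiveFactoryDemo {

    interface SimpleFactory<T> {
        T newInstance();
    }

    static final class ReflectiveFactory<T> implements SimpleFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> type) {
            try {
                // Look up the public no-arg constructor once, up front.
                this.constructor = type.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        type.getSimpleName() + " has no public no-arg constructor", e);
            }
        }

        @Override
        public T newInstance() {
            try {
                // Every call creates a fresh object through the cached constructor.
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Unable to create an instance of " + constructor.getDeclaringClass(), e);
            }
        }
    }

    // Stand-in for a channel class such as NioServerSocketChannel.
    public static final class FakeServerChannel {
        public FakeServerChannel() { }
    }

    public static void main(String[] args) {
        SimpleFactory<FakeServerChannel> factory = new ReflectiveFactory<>(FakeServerChannel.class);
        FakeServerChannel first = factory.newInstance();
        FakeServerChannel second = factory.newInstance();
        System.out.println("distinct instances: " + (first != second));
    }
}
```

Resolving the constructor in the factory's own constructor means a class without a suitable constructor is rejected at configuration time rather than at bind time.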
AbstractChannel has two member variables, unsafe and pipeline. unsafe is the interface for the underlying network I/O operations, and pipeline is the handler chain that events flow through. In other words, every channel comes with its own pipeline.

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

3. Once the channel has been created, initialization follows. This calls the init method of the subclass (ServerBootstrap).
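init does two things:
   (1) It sets the channel's options and attributes.
   (2) It adds a new handler.
A handler is added to the ServerSocketChannel's pipeline to process the read events of the accept reactor, that is, new connections. ServerBootstrapAcceptor is the class that handles a new connection: it registers the accepted SocketChannel with a worker event loop of childGroup (and therefore with that event loop's selector) and installs childHandler as the child SocketChannel's handler. The details are covered in the section on accepting new connections.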
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        // currentChildOptions / currentChildAttrs are snapshots of the child options
        // and attributes; their declarations are omitted in this excerpt.
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // The acceptor is added from a task on the channel's event loop.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler,
                                currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

4. Back to step 2: initialization is complete, and the serverSocketChannel is now registered with a bossEventLoop's selector. ChannelFuture regFuture = config().group().register(channel); calls MultithreadEventLoopGroup.register, which takes the next NioEventLoop from the group's child event loops and registers the channel with it. The worker reactor uses the same mechanism to register its channels.
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

5. NioEventLoop is a SingleThreadEventLoop. It wraps the channel and the current eventLoop object in a promise and registers with it.
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

6. Next, the ServerSocketChannel's internal unsafe object is looked up and used for the registration. This unsafe is a NioMessageUnsafe.
    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

    public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
        @Override
        protected AbstractNioUnsafe newUnsafe() {
            return new NioMessageUnsafe();
        }
    }

7. NioMessageUnsafe inherits from AbstractUnsafe (via AbstractNioUnsafe), so AbstractUnsafe.register is the method that runs. It first calls the channel's doRegister, then triggers the pipeline's handlerAdded and channelRegistered callbacks, and channelActive as well if the channel is already active.
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(eventLoop, "eventLoop");
        AbstractChannel.this.eventLoop = eventLoop;
        if (eventLoop.inEventLoop()) {
            register0(promise);
        }
        // Excerpt: when the caller is not on the event loop thread (as with bind()
        // called from the main thread), the full method submits register0 to the
        // event loop as a task instead.
    }

    private void register0(ChannelPromise promise) {
        try {
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Excerpt: the full method closes the channel and fails the promise.
        }
    }

8. doRegister() is what registers the ServerSocketChannel with the NioEventLoop's selector. At this point no events are being listened for yet, because the interest set is 0.
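    // AbstractNioChannel
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // interestOps = 0: registered, but not yet interested in any event
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Flush the cancelled key out of the selector, then retry once.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

[Figure: the resulting SelectionKey (image not preserved)]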
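9. invokeHandlerAddedIfNeeded invokes the handlerAdded callback of the handler in the parent channel's pipeline, that is, the handler that was added in step 3 during initialization.
Only at this point is that handler's initChannel method actually triggered. It creates the ServerBootstrapAcceptor from step 3, the handler whose channelRead processes new child connections.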
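The deferred initialization described in step 9 can be pictured with a toy pipeline. This is a deliberately simplified model under stated assumptions, not Netty's DefaultChannelPipeline: MiniPipeline, Handler and Initializer are hypothetical names, everything runs on one thread, and the acceptor is added directly rather than through an event-loop task.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "handlerAdded is delayed until the channel is registered".
final class DeferredInitDemo {

    interface Handler {
        String name();
        default void handlerAdded(MiniPipeline pipeline) { }
    }

    static final class MiniPipeline {
        private final List<Handler> handlers = new ArrayList<>();
        private final List<Handler> pendingCallbacks = new ArrayList<>();
        private boolean registered;

        void addLast(Handler handler) {
            handlers.add(handler);
            if (registered) {
                handler.handlerAdded(this);    // already registered: notify immediately
            } else {
                pendingCallbacks.add(handler); // not registered yet: remember for later
            }
        }

        void remove(Handler handler) {
            handlers.remove(handler);
        }

        // Called once, right after the channel has been registered.
        void invokeHandlerAddedIfNeeded() {
            if (registered) {
                return;
            }
            registered = true;
            List<Handler> toNotify = new ArrayList<>(pendingCallbacks);
            pendingCallbacks.clear();
            for (Handler handler : toNotify) {
                handler.handlerAdded(this);
            }
        }

        List<String> names() {
            List<String> names = new ArrayList<>();
            for (Handler handler : handlers) {
                names.add(handler.name());
            }
            return names;
        }
    }

    static Handler named(String name) {
        return () -> name;
    }

    // Plays the role of the ChannelInitializer: installs the real handlers, then removes itself.
    static final class Initializer implements Handler {
        @Override
        public String name() {
            return "initializer";
        }

        @Override
        public void handlerAdded(MiniPipeline pipeline) {
            pipeline.addLast(named("logging"));
            pipeline.addLast(named("acceptor"));
            pipeline.remove(this);
        }
    }

    // Returns the handler names before and after registration.
    static List<List<String>> run() {
        MiniPipeline pipeline = new MiniPipeline();
        pipeline.addLast(new Initializer());
        List<String> beforeRegistration = pipeline.names();
        pipeline.invokeHandlerAddedIfNeeded();
        List<String> afterRegistration = pipeline.names();
        List<List<String>> result = new ArrayList<>();
        result.add(beforeRegistration);
        result.add(afterRegistration);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[initializer], [logging, acceptor]]
    }
}
```

Before registration the pipeline holds only the initializer; after registration the initializer has replaced itself with the handlers it installs.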
10. Initialization and registration are now complete, so we return to step 1 and run doBind0(regFuture, channel, localAddress, promise).

    // AbstractBootstrap
    private static void doBind0(final ChannelFuture regFuture, final Channel channel,
                                final SocketAddress localAddress, final ChannelPromise promise) {
        // Run the bind on the channel's event loop.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

11. channel.bind calls the pipeline's bind, which calls tail's bind. That is AbstractChannelHandlerContext.bind, because tail extends AbstractChannelHandlerContext. This bind searches from the tail toward the head for a handler that is flagged as handling bind (MASK_BIND). The call eventually reaches head: ChannelHandlerContext(DefaultChannelPipeline$HeadContext#0, [id: 0x159b9902])
    // AbstractChannel
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    // DefaultChannelPipeline
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

    // AbstractChannelHandlerContext
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(localAddress, "localAddress");
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
        // Walk toward the head until a context whose handler implements bind is found.
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
        EventExecutor executor = next.executor();
        next.invokeBind(localAddress, promise);
        return promise;
    }

12. The call finally arrives at head's bind, which calls bind on unsafe, the low-level I/O class.
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        @Override
        public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            unsafe.bind(localAddress, promise);
        }
        // ...
    }

13. This ends up in bind of AbstractUnsafe, the inner class of AbstractChannel.
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        boolean wasActive = isActive();
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        // The channel became active through this bind: fire channelActive later on the event loop.
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
        safeSetSuccess(promise);
    }

14. This in turn calls doBind on the concrete subclass of AbstractUnsafe's enclosing class, namely NioServerSocketChannel.doBind, which binds through the JDK's native ServerSocketChannel.
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

15. After a successful bind, pipeline.fireChannelActive() is called. The event starts at the head node and is passed on toward the tail. The head node's channelActive method triggers readIfIsAutoRead.

    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();
            readIfIsAutoRead();
        }
        // ...
    }

16. readIfIsAutoRead checks whether auto-read is enabled on the channel and, if so, starts registering read interest with the eventLoop's selector. The read call goes from the pipeline's read to tail's read and then searches toward the head for a handler that handles read, which turns out to be the head handler.
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }
        // ...
    }

17. The head handler's read calls unsafe.beginRead, which calls doBeginRead on the enclosing channel. doBeginRead is implemented in AbstractNioChannel:

    // AbstractNioChannel
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        // Only touch the key if the read interest bit is not set yet.
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

This adds the read event to the interest set of the channel's SelectionKey. For a NioServerSocketChannel, readInterestOp is SelectionKey.OP_ACCEPT.
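Stripped of the pipeline machinery, the bind flow boils down to three JDK-level operations: register with an empty interest set, bind, then switch on the accept interest. The sketch below shows that sequence with plain java.nio. It is an illustration under assumptions, not Netty code: the class and method names are made up, and it binds to an ephemeral loopback port with the backlog of 128 from the startup example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Plain-NIO illustration of: register(ops = 0) -> bind -> add OP_ACCEPT.
final class TwoPhaseRegistrationDemo {

    /** Returns { interestOps right after register, interestOps after enabling accept }. */
    static int[] registerBindThenListen() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // Phase 1 (compare doRegister): attach to the selector, interested in nothing.
            SelectionKey key = server.register(selector, 0);
            int afterRegister = key.interestOps();

            // Phase 2 (compare doBind): bind to an ephemeral loopback port, backlog 128.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);

            // Phase 3 (compare doBeginRead): set the accept bit only if it is missing.
            int ops = key.interestOps();
            if ((ops & SelectionKey.OP_ACCEPT) == 0) {
                key.interestOps(ops | SelectionKey.OP_ACCEPT);
            }
            return new int[] {afterRegister, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerBindThenListen();
        System.out.println("after register: " + ops[0] + ", after beginRead: " + ops[1]);
    }
}
```

Because the interest bit is checked before it is set, running phase 3 again leaves the key unchanged, which is why repeated read requests are harmless.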

IV. Accepting new connections
1. Recall steps 3 and 9 of the previous section: a handler, ServerBootstrapAcceptor, was added to the pipeline of the parent NioServerSocketChannel. It handles what the parent channel reads, which is new connection requests. When a new connection comes in, ServerBootstrapAcceptor.channelRead is triggered.
2. How does ServerBootstrapAcceptor.channelRead get called? When a client connects to the server, the underlying Java NIO ServerSocketChannel gets a SelectionKey.OP_ACCEPT ready event, which leads to a call to NioServerSocketChannel.doReadMessages:
    // NioServerSocketChannel
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
                // Wrap the accepted JDK channel; "this" becomes its parent channel.
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }

doReadMessages accepts on javaChannel() to obtain the SocketChannel of the new client connection, then instantiates a NioSocketChannel and passes in the NioServerSocketChannel object (this). The parent Channel of the newly created NioSocketChannel is therefore the NioServerSocketChannel instance.
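The JDK-level half of this step, an OP_ACCEPT ready event followed by accept(), can be tried in isolation. The following sketch uses only java.nio and assumes a loopback connection is possible in the environment; the class and method names are made up for illustration, and no Netty wrapping (NioSocketChannel, parent channel) is involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain-NIO illustration of: client connects -> OP_ACCEPT is ready -> accept() yields a SocketChannel.
final class AcceptDemo {

    /** Returns true if one connection was accepted through the selector. */
    static boolean acceptOneConnection() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A blocking client connect to the server's ephemeral port.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Wait (up to 5 seconds) for the accept-ready event.
                if (selector.select(5000) == 0) {
                    return false;
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        ServerSocketChannel ready = (ServerSocketChannel) key.channel();
                        // This is the point where Netty would wrap the result in a NioSocketChannel.
                        try (SocketChannel child = ready.accept()) {
                            return child != null;
                        }
                    }
                }
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptOneConnection());
    }
}
```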
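3. ServerBootstrapAcceptor.channelRead adds the child handler to the child channel's pipeline and registers the child channel with the selector of an eventLoop from the child group. The registration flow is the same as for the parent channel.

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler);
        setChannelOptions(child, childOptions, logger);
        setAttributes(child, childAttrs);
        try {
            // Excerpt: the full method also adds a listener that closes the child
            // if the registration fails.
            childGroup.register(child);
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

4. There is also a read-complete event. As in step 16 of the previous section, it adds read interest to the current channel's SelectionKey so that incoming data can be received.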
    // HeadContext
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }

This event fires repeatedly, of course. When read interest has already been set, it is not added a second time.