netty4.0.x源码分析—bootstrap
Bootstrap的意思就是引導,輔助的意思,在編寫服務端或客戶端程序時,我們都需要先new一個bootstrap,然后基于這個bootstrap調用函數,添加eventloop和handler,可見對bootstrap進行分析還是有必要的。
1、bootstrap結構圖
bootstrap的結構比較簡單,涉及的類和接口很少,如下圖所示,其中Bootstrap則是客戶端程序用的引導類,ServerBootstrap是服務端程序用的引導類。
2、serverbootstrap分析
這部分,專門對serverbootstrap進行分析,bootstrap過程大同小異就不作詳細的分析了。下面是我們編寫服務端代碼的一般化過程,整個分析過程將基于下面這段代碼中用到的函數進行。
?
// Configure the bootstrap.EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new HexDumpProxyInitializer(remoteHost, remotePort)).childOption(ChannelOption.AUTO_READ, false).bind(localPort).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}?
?
先看關鍵代碼(注意這里面的部分函數是在AbstractBootstrap中定義的)
?
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;/*** Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and* {@link Channel}'s.*/public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;}屬性值ChildGroup,ChildHandler,是用來處理accpt的Channel的。group函數其實就是將parentGroup和ChildGroup進行賦值,其中parentGroup用于處理accept事件,ChildGroup用于處理accpt的Channel的IO事件。
?
?
//channel函數的實現定義在抽象父類中,其實就是通過newInstance函數生成一個具體的channel對象。 <pre name="code" class="java"> /*** The {@link Class} which is used to create {@link Channel} instances from.* You either use this or {@link #channelFactory(ChannelFactory)} if your* {@link Channel} implementation has no no-args constructor.*/public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}return channelFactory(new BootstrapChannelFactory<C>(channelClass));}/*** {@link ChannelFactory} which is used to create {@link Channel} instances from* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}* is not working for you because of some more complex needs. If your {@link Channel} implementation* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for* simplify your code.*/@SuppressWarnings("unchecked")public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");}if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");}this.channelFactory = channelFactory;return (B) this;}<pre name="code" class="java"> private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz;BootstrapChannelFactory(Class<? extends T> clazz) {this.clazz = clazz;}@Overridepublic T newChannel() {try {return clazz.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);}}@Overridepublic String toString() {return clazz.getSimpleName() + ".class";}}Channel函數比較簡單,其實就是通過newInstance函數,生成一個具體的Channel對象,例如服務端的NioServerSocketChannel。
?
?
/*** Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.*/public ServerBootstrap childHandler(ChannelHandler childHandler) {if (childHandler == null) {throw new NullPointerException("childHandler");}this.childHandler = childHandler;return this;}上面的函數即給serverbootstrap的childHandler賦值。
?
?
/*** Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set* {@link ChannelOption}.*/public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {if (childOption == null) {throw new NullPointerException("childOption");}if (value == null) {synchronized (childOptions) {childOptions.remove(childOption);}} else {synchronized (childOptions) {childOptions.put(childOption, value);}}return this;}上面的函數是指定accpt的channel的屬性,channel有很多屬性,比如SO_TIMEOUT時間,Buf長度等等。
?
?
/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind() {validate();SocketAddress localAddress = this.localAddress;if (localAddress == null) {throw new IllegalStateException("localAddress not set");}return doBind(localAddress);}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(String inetHost, int inetPort) {return bind(new InetSocketAddress(inetHost, inetPort));}<pre name="code" class="java"> /*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(SocketAddress localAddress) {validate();if (localAddress == null) {throw new NullPointerException("localAddress");}return doBind(localAddress);}private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regPromise = initAndRegister();final Channel channel = regPromise.channel();final ChannelPromise promise = channel.newPromise();if (regPromise.isDone()) {doBind0(regPromise, channel, localAddress, promise);} else {regPromise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {doBind0(future, channel, localAddress, promise);}});}return promise;}<pre name="code" class="java"> private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}Bind函數層層調用過來之后,最后就調用Channel的bind函數了,下面再看channel的bind函數是如何處理的。定義在AbstractChannel中:
?
?
@Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}channel的bind函數,最終就是調用pipeline的bind,而pipeline的bind實際上就是調用contexthandler的bind,之個之前分析write和flush的時候說過了。所以這里直接看contexthandler的bind函數。下面是定義:
?
?
@Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {if (localAddress == null) {throw new NullPointerException("localAddress");}validatePromise(promise, false);final DefaultChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeBind(localAddress, promise);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeBind(localAddress, promise);}});}return promise;}<pre name="code" class="java"> private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {try {((ChannelOutboundHandler) handler).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}}最終調用Handler的bind函數,還記得之前說的outbound類型的事件嗎,這類事件提供了默認的實現方法,HeadHandler的bind函數,下面是它的定義:
?
?
@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)throws Exception {unsafe.bind(localAddress, promise);}我們又看到了unsafe這個苦力了,最終的操作還是得由它來完成啊,趕緊去看看這個bind函數吧,
@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {if (!ensureOpen(promise)) {return;}// See: https://github.com/netty/netty/issues/576if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddress instanceof InetSocketAddress &&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {// Warn a user about the fact that a non-root user can't receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn("A non-root user can't receive a broadcast packet if the socket " +"is not bound to a wildcard address; binding to a non-wildcard " +"address (" + localAddress + ") anyway as requested.");}boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {closeIfClosed();promise.setFailure(t);return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}promise.setSuccess();}上面的代碼最終調用了Channel的doBind函數,這里我們的Channel是NioServerSocketChannel,所以最終就是調用它的bind函數了,代碼如下
?
?
@Overrideprotected void doBind(SocketAddress localAddress) throws Exception {javaChannel().socket().bind(localAddress, config.getBacklog());}其實它最終也是調用了JDK的Channel的socket bind函數。
?
?
看到這里,你是否會覺得有點怪異,為什么沒有注冊accpt事件啊,一般的我們的server socket都是要注冊accpt事件到selector,用于監聽連接。如果你發現了這個問題,說明你是理解socket的編程的,^_^。實際上是前面在分析bind的時候我們漏掉了一個重要的函數,initAndRegister,下面再來看看它的定義:
?
final ChannelFuture initAndRegister() {final Channel channel = channelFactory().newChannel();try {init(channel);} catch (Throwable t) {channel.unsafe().closeForcibly();return channel.newFailedFuture(t);}ChannelPromise regPromise = channel.newPromise();group().register(channel, regPromise);if (regPromise.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regPromise;}在這里,我們看到了我們之前介紹event時說的register函數,它就是用于將Channel注冊到eventloop中去的。eventloop經過層層調用,最終調用了SingleThreadEventLoop類中的register函數,下面是它的定義:
@Overridepublic ChannelFuture register(final Channel channel, final ChannelPromise promise) {if (channel == null) {throw new NullPointerException("channel");}if (promise == null) {throw new NullPointerException("promise");}channel.unsafe().register(this, promise);return promise;}還是逃離不了unsafe對象的調用,前面說了那么多的unsafe,這個函數猜都可以猜測出執行過程了,這里就不細細的列舉代碼了。
?
?
還有一個init函數,這里需要說明一下,代碼如下:
?
@Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();if (handler() != null) {p.addLast(handler());}final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}它就是用來處理channel 的pipeline,并添加一個ServerBootstrapAcceptor的handler,繼續看看這個handler的定義,我們就會明白它的意圖。
?
?
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;@SuppressWarnings("unchecked")ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child);} catch (Throwable t) {child.unsafe().closeForcibly();logger.warn("Failed to register an accepted channel: " + child, t);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {final ChannelConfig config = ctx.channel().config();if (config.isAutoRead()) {// stop accept new connections for 1 second to allow the channel to recover// See https://github.com/netty/netty/issues/1328config.setAutoRead(false);ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {config.setAutoRead(true);}}, 1, TimeUnit.SECONDS);}// still let the exceptionCaught event flow through the pipeline to give the user// a chance to do something with itctx.fireExceptionCaught(cause);}}上面就是這個handler的全部代碼,它重寫了ChannelRead函數,它的目的其實是想將server端accept的channel注冊到ChildGroup的eventloop中,這樣就可以理解,服務端代碼workerGroup這個eventloop的作用了,它終于在這里體現出了它的作用了。
?
3、總結
這篇文章主要是分析了serverbootstrap的全過程,通過對這個的分析,我們清晰的看到了平時編寫socket服務端代碼時對bind,register事件,以及accept channel等的處理。
?
http://blog.csdn.net/pingnanlee/article/details/11973769
總結
以上是生活随笔為你收集整理的netty4.0.x源码分析—bootstrap的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: eclipse安装weblogic Se
- 下一篇: Arp协议和Arp欺骗