netty 5 alph1源码分析(服务端创建过程)
研究了netty的服務端創建過程。至于netty的優勢,可以參照網絡其他文章。《Netty系列之Netty 服務端創建》是?李林鋒撰寫的netty源碼分析的一篇好文,絕對是技術干貨。但拋開技術來說,也存在一些瑕疵。
缺點如下
代碼銜接不連貫,上下不連貫。
代碼片段是截圖,對閱讀代理不便(可能和閱讀習慣有關)
本篇主要內容,參照《Netty系列之Netty 服務端創建》,梳理出自己喜歡的閱讀風格。
1.整體邏輯圖
整體將服務端創建分為2部分:(1)綁定端口,提供服務過程;(2)輪詢網絡請求
1.1 綁定端口序列圖
1.2 類圖
類圖僅僅涵蓋了綁定過程中比較重要的幾個組件
1.3 代碼分析
step 2 doBind?綁定本地端口,啟動服務
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | private?ChannelFuture?doBind(final?SocketAddress?localAddress)?{ ????final?ChannelFuture?regFuture?=?initAndRegister();//1 ????final?Channel?channel?=?regFuture.channel(); ????if?(regFuture.cause()?!=?null)?{ ????????return?regFuture; ????} ????final?ChannelPromise?promise; ????if?(regFuture.isDone())?{ ????????promise?=?channel.newPromise(); ????????doBind0(regFuture,?channel,?localAddress,?promise);//2 ????}?else?{ ????????//?Registration?future?is?almost?always?fulfilled?already,?but?just?in?case?it's?not. ????????promise?=?new?DefaultChannelPromise(channel,?GlobalEventExecutor.INSTANCE); ????????regFuture.addListener(new?ChannelFutureListener()?{ ????????????@Override ????????????public?void?operationComplete(ChannelFuture?future)?throws?Exception?{ ????????????????doBind0(regFuture,?channel,?localAddress,?promise);//2 ????????????} ????????}); ????} ????return?promise; } |
step3 initAndRegister
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | final?ChannelFuture?initAndRegister()?{ ????Channel?channel; ????try?{ ????????channel?=?createChannel(); ????}?catch?(Throwable?t)?{ ????????return?VoidChannel.INSTANCE.newFailedFuture(t); ????} ????try?{ ????????init(channel); ????}?catch?(Throwable?t)?{ ????????channel.unsafe().closeForcibly(); ????????return?channel.newFailedFuture(t); ????} //注冊NioServerSocketChannel到Reactor線程的多路復用器上 ????ChannelPromise?regFuture?=?channel.newPromise(); ????channel.unsafe().register(regFuture); ????if?(regFuture.cause()?!=?null)?{ ????????if?(channel.isRegistered())?{ ????????????channel.close(); ????????}?else?{ ????????????channel.unsafe().closeForcibly(); ????????} ????} ????return?regFuture; } |
createChannel由子類ServerBootstrap實現,創建新的NioServerSocketChannel,并完成Channel初始化,以及注冊。
4.ServerBootstrap.createChannel
| 1 2 3 4 | Channel?createChannel()?{ ????EventLoop?eventLoop?=?group().next(); ????return?channelFactory().newChannel(eventLoop,?childGroup); } |
它有兩個參數,參數1是從父類的NIO線程池中順序獲取一個NioEventLoop,它就是服務端用于監聽和接收客戶端連接的Reactor線程。第二個參數就是所謂的workerGroup線程池,它就是處理IO讀寫的Reactor線程組。
5.ServerBootstrap.init
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | void?init(Channel?channel)?throws?Exception?{ //設置Socket參數和NioServerSocketChannel的附加屬性 ????final?Map<ChannelOption<?>,?Object>?options?=?options(); ????synchronized?(options)?{ ????????channel.config().setOptions(options); ????} ????final?Map<AttributeKey<?>,?Object>?attrs?=?attrs(); ????synchronized?(attrs)?{ ????????for?(Entry<AttributeKey<?>,?Object>?e:?attrs.entrySet())?{ ????????????@SuppressWarnings("unchecked") ????????????AttributeKey<Object>?key?=?(AttributeKey<Object>)?e.getKey(); ????????????channel.attr(key).set(e.getValue()); ????????} ????} //將AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中 ????ChannelPipeline?p?=?channel.pipeline(); ????if?(handler()?!=?null)?{ ????????p.addLast(handler()); ????} ????final?ChannelHandler?currentChildHandler?=?childHandler; ????final?Entry<ChannelOption<?>,?Object>[]?currentChildOptions; ????final?Entry<AttributeKey<?>,?Object>[]?currentChildAttrs; ????synchronized?(childOptions)?{ ????????currentChildOptions?=?childOptions.entrySet().toArray(newOptionArray(childOptions.size())); ????} ????synchronized?(childAttrs)?{ ????????currentChildAttrs?=?childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); ????} //將用于服務端注冊的Handler?ServerBootstrapAcceptor添加到ChannelPipeline中 ????p.addLast(new?ChannelInitializer<Channel>()?{ ????????@Override ????????public?void?initChannel(Channel?ch)?throws?Exception?{ ????????????ch.pipeline().addLast(new?ServerBootstrapAcceptor(currentChildHandler,?currentChildOptions, ????????????????????currentChildAttrs)); ????????} ????}); } |
到此處,Netty服務端監聽的相關資源已經初始化完畢。
6.AbstractChannel.AbstractUnsafe.register
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public?final?void?register(final?ChannelPromise?promise)?{ //首先判斷是否是NioEventLoop自身發起的操作,如果是,則不存在并發操作,直接執行Channel注冊; ????if?(eventLoop.inEventLoop())?{ ????????register0(promise); ????}?else?{//如果由其它線程發起,則封裝成一個Task放入消息隊列中異步執行。 ????????try?{ ????????????eventLoop.execute(new?Runnable()?{ ????????????????@Override ????????????????public?void?run()?{ ????????????????????register0(promise); ????????????????} ????????????}); ????????}?catch?(Throwable?t)?{ ????????????logger.warn( ????????????????????"Force-closing?a?channel?whose?registration?task?was?not?accepted?by?an?event?loop:?{}", ????????????????????AbstractChannel.this,?t); ????????????closeForcibly(); ????????????closeFuture.setClosed(); ????????????promise.setFailure(t); ????????} ????} } |
7.register0
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | private?void?register0(ChannelPromise?promise)?{ ????try?{ ????????//?check?if?the?channel?is?still?open?as?it?could?be?closed?in?the?mean?time?when?the?register ????????//?call?was?outside?of?the?eventLoop ????????if?(!ensureOpen(promise))?{ ????????????return; ????????} ????????doRegister(); ????????registered?=?true; ????????promise.setSuccess(); ????????pipeline.fireChannelRegistered(); ????????? ????????if?(isActive())?{//完成綁定時,不會調用該代碼段 ????????????pipeline.fireChannelActive(); ????????} ????}?catch?(Throwable?t)?{ ????????//?Close?the?channel?directly?to?avoid?FD?leak. ????????closeForcibly(); ????????closeFuture.setClosed(); ????????if?(!promise.tryFailure(t))?{ ????????????logger.warn( ????????????????????"Tried?to?fail?the?registration?promise,?but?it?is?complete?already.?"?+ ????????????????????????????"Swallowing?the?cause?of?the?registration?failure:",?t); ????????} ????} } |
觸發事件
8.doRegister
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | protected?void?doRegister()?throws?Exception?{ ????boolean?selected?=?false; ????for?(;;)?{ ????????try?{ ????????//將NioServerSocketChannel注冊到NioEventLoop的Selector上 ????????????selectionKey?=?javaChannel().register(eventLoop().selector,?0,?this); ????????????return; ????????}?catch?(CancelledKeyException?e)?{ ????????????if?(!selected)?{ ????????????????//?Force?the?Selector?to?select?now?as?the?"canceled"?SelectionKey?may?still?be ????????????????//?cached?and?not?removed?because?no?Select.select(..)?operation?was?called?yet. ????????????????eventLoop().selectNow(); ????????????????selected?=?true; ????????????}?else?{ ????????????????//?We?forced?a?select?operation?on?the?selector?before?but?the?SelectionKey?is?still?cached ????????????????//?for?whatever?reason.?JDK?bug?? ????????????????throw?e; ????????????} ????????} ????} } |
大伙兒可能會很詫異,應該注冊OP_ACCEPT(16)到多路復用器上,怎么注冊0呢?0表示只注冊,不監聽任何網絡操作。這樣做的原因如下:
注冊方法是多態的,它既可以被NioServerSocketChannel用來監聽客戶端的連接接入,也可以用來注冊SocketChannel,用來監聽網絡讀或者寫操作;
通過SelectionKey的interestOps(int ops)方法可以方便的修改監聽操作位。所以,此處注冊需要獲取SelectionKey并給AbstractNioChannel的成員變量selectionKey賦值。
本文轉自 randy_shandong 51CTO博客,原文鏈接:http://blog.51cto.com/dba10g/1863497,如需轉載請自行聯系原作者
總結
以上是生活随笔為你收集整理的netty 5 alph1源码分析(服务端创建过程)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 制作liveusb实现centos6.2
- 下一篇: redis演练(5) redis持久化