Analyzing Netty Components from netty-example, Continued
In the previous article we analyzed Netty's components through the Discard server example in netty-example. Today we use another simple example, the Echo client, to analyze the Netty components that did not appear in the previous example.
1. Connection handling and read/write handling
Echo client code:
/**
 * Sends one message when a connection is open and echoes back any received
 * data to the server. Simply put, the echo client initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

As the code above shows, the Discard server code and this Echo client code are largely the same; the difference is that the server uses ServerBootstrap while the client uses Bootstrap. Let's look at the connection process first.
How NioEventLoop processes a selected key:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

1.1 Connection flow
The finishConnect() method of AbstractNioChannel (in its AbstractNioUnsafe) is invoked:
@Override
public final void finishConnect() {
    // Note this method is invoked by the event loop only if the connection attempt was
    // neither cancelled nor timed out.
    assert eventLoop().inEventLoop();

    try {
        boolean wasActive = isActive();
        doFinishConnect();
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {
        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    } finally {
        // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
        // See https://github.com/netty/netty/issues/1770
        if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false);
        }
        connectPromise = null;
    }
}

fulfillConnectPromise() then triggers the channelActive event:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        // Closed via cancellation and the promise has been notified already.
        return;
    }

    // trySuccess() will return false if a user cancelled the connection attempt.
    boolean promiseSet = promise.trySuccess();

    // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    // because what happened is what happened.
    if (!wasActive && isActive()) {
        pipeline().fireChannelActive();
    }

    // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    if (!promiseSet) {
        close(voidPromise());
    }
}
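From application code, the connect promise completed by fulfillConnectPromise() can be observed by adding a listener to the ChannelFuture returned by Bootstrap.connect(); a cancelled attempt shows up as isCancelled(). A minimal sketch (the helper class name is made up):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

final class ConnectObserver {
    // Attaches a listener to the connect promise that fulfillConnectPromise()
    // completes. trySuccess() returning false (a cancelled attempt) surfaces
    // here as future.isCancelled().
    static ChannelFuture connect(Bootstrap b, String host, int port) {
        return b.connect(host, port).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                // channelActive has been (or is about to be) fired on the pipeline.
                System.out.println("connected: " + future.channel().remoteAddress());
            } else if (future.isCancelled()) {
                // A cancelled attempt closes the channel, as shown above.
                System.out.println("connect attempt was cancelled");
            } else {
                future.cause().printStackTrace();
            }
        });
    }
}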
1.2 Read flow
The read() method of AbstractNioByteChannel (in its NioByteUnsafe) is invoked.
A typical autoRead flow looks like this (a usage sketch follows the list):
1. When the socket connection is established, Netty fires an inbound channelActive event and then submits a read() request to itself (see DefaultChannelPipeline.fireChannelActive()).
2. On receiving that read() request, Netty reads messages from the socket.
3. Whenever a message is read, Netty fires channelRead().
4. When no more messages can be read, Netty fires channelReadComplete().
5. Netty then submits another read() request to keep reading from the socket.
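As a usage sketch of the flip side: if autoRead is disabled with ChannelOption.AUTO_READ set to false, step 5 no longer happens automatically and the application must request each read itself. The handler name below is made up for illustration:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Assumes autoRead was disabled at bootstrap time:
//     b.option(ChannelOption.AUTO_READ, false);
// With autoRead off, Netty no longer submits the follow-up read() request,
// so the application asks for each read explicitly.
public class ManualReadHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
        ctx.read(); // submit the first read() request ourselves
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.fireChannelReadComplete();
        ctx.read(); // request the next read only when we are ready
    }
}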
The read() implementation:

@Override
public final void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    try {
        boolean needReadPendingReset = true;
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                break;
            }

            allocHandle.incMessagesRead(1);
            if (needReadPendingReset) {
                needReadPendingReset = false;
                setReadPending(false);
            }
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (allocHandle.lastBytesRead() < 0) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}

Firing the channelRead event:
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
    AbstractChannelHandlerContext next = findContextInbound();
    next.invoker().invokeChannelRead(next, pipeline.touch(msg, next));
    return this;
}

When the read loop finishes, the read-complete event is fired:
@Override
public ChannelPipeline fireChannelReadComplete() {
    head.fireChannelReadComplete();
    if (channel.config().isAutoRead()) {
        read();
    }
    return this;
}

@Override
public ChannelHandlerContext fireChannelReadComplete() {
    AbstractChannelHandlerContext next = findContextInbound();
    next.invoker().invokeChannelReadComplete(next);
    return this;
}

1.3 Write flow
The write (flush) operation:
@SuppressWarnings("deprecation")
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}

The concrete write implementation, taking NioSocketChannel as an example:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            // All written so clear OP_WRITE
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;

        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                super.doWrite(in);
                return;
            case 1:
                // Only one ByteBuf so use non-gathering write
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }

        // Release the fully written buffers, and update the indexes of the partially written buffer.
        in.removeBytes(writtenBytes);

        if (!done) {
            // Did not write all buffers completely.
            incompleteWrite(setOpWrite);
            break;
        }
    }
}
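The spin loops above are bounded by config().getWriteSpinCount(). This is tunable through ChannelOption.WRITE_SPIN_COUNT; the sketch below assumes the usual default of 16:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

final class WriteSpinConfig {
    static Bootstrap configure(NioEventLoopGroup group) {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         // Upper bound of the write spin loop in doWrite(); a larger value
         // retries a slow socket longer before doWrite() gives up, sets
         // OP_WRITE via incompleteWrite(), and yields the event loop.
         .option(ChannelOption.WRITE_SPIN_COUNT, 16);
        return b;
    }
}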
2. ChannelInboundHandler and ChannelOutboundHandler
The Echo handler code is as follows:
/**
 * Handler implementation for the echo client. It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

Two important Netty components appear in the code above: ChannelInboundHandlerAdapter and ByteBuf. ByteBuf was covered in an earlier article; here we focus on ChannelInboundHandlerAdapter and its related classes.
ChannelInboundHandlerAdapter implements ChannelInboundHandler; its job is to forward each operation to the next ChannelHandler in the ChannelPipeline. Subclasses may override any of its methods to change this behavior. Note that messages are not released automatically after #channelRead(ChannelHandlerContext, Object) returns; if you need a ChannelInboundHandler implementation that releases received messages automatically, consider SimpleChannelInboundHandler (see the sketch below).
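For reference, a minimal SimpleChannelInboundHandler sketch (in Netty 4.x the method to override is channelRead0; the class name here is made up). Note that EchoClientHandler itself could not use this base class, because it writes msg back to the server and must not have it released:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class AutoReleaseHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        // SimpleChannelInboundHandler releases msg after this method returns;
        // call msg.retain() first if it must outlive this call.
        System.out.println("received " + msg.readableBytes() + " bytes");
    }
}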
ChannelOutboundHandlerAdapter implements ChannelOutboundHandler; each of its methods simply forwards the operation to the next handler through the ChannelHandlerContext.
ChannelInboundHandler handles inbound events, which are generated by external event sources, such as data received from a socket.
ChannelOutboundHandler intercepts the operations issued by your own application.
2.1 ChannelInboundHandler.channelActive()
Let's look at the source to see how Netty fires the inbound channelActive event (taking LoggingHandler as an example):
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "ACTIVE"));
    }
    ctx.fireChannelActive();
}

The fire operation works as follows:
@Override
public ChannelHandlerContext fireChannelActive() {
    AbstractChannelHandlerContext next = findContextInbound();
    next.invoker().invokeChannelActive(next);
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

The invokeChannelActive implementation:
@Override
public void invokeChannelActive(final ChannelHandlerContext ctx) {
    if (executor.inEventLoop()) {
        invokeChannelActiveNow(ctx);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                invokeChannelActiveNow(ctx);
            }
        });
    }
}

public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
    try {
        ((ChannelInboundHandler) ctx.handler()).channelActive(ctx);
    } catch (Throwable t) {
        notifyHandlerException(ctx, t);
    }
}

2.2 ChannelOutboundHandler.read()
The read flow:
@Override
public ChannelHandlerContext read() {
    AbstractChannelHandlerContext next = findContextOutbound();
    next.invoker().invokeRead(next);
    return this;
}

Finding the next outbound context:
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

Triggering the read operation:
@Override
public void invokeRead(final ChannelHandlerContext ctx) {
    if (executor.inEventLoop()) {
        invokeReadNow(ctx);
    } else {
        AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
        Runnable task = dctx.invokeReadTask;
        if (task == null) {
            dctx.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    invokeReadNow(ctx);
                }
            };
        }
        executor.execute(task);
    }
}

2.3 ChannelOutboundHandler.write()
Taking the implementing class LoggingHandler as an example:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "WRITE", msg));
    }
    ctx.write(msg, promise);
}

The concrete implementation:
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise);
    return promise;
}

Triggering the write operation:
@Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    if (!validatePromise(ctx, promise, true)) {
        // promise cancelled
        ReferenceCountUtil.release(msg);
        return;
    }

    if (executor.inEventLoop()) {
        invokeWriteNow(ctx, msg, promise);
    } else {
        safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);
    }
}

Invoking immediately when already on the event loop:
public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
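Since invokeWrite() hands the write off as a task when the caller is not on the event loop, application threads can call writeAndFlush() without extra synchronization; the actual I/O still happens on the channel's EventLoop. A minimal sketch (the helper class name is made up):

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.CharsetUtil;

public final class OffLoopWriter {
    // Writes from a caller thread that is usually NOT the channel's event loop.
    // invokeWrite() detects this and schedules a WriteTask on the event loop.
    public static void writeFromAnyThread(Channel channel, String text) {
        ByteBuf msg = channel.alloc().buffer().writeBytes(text.getBytes(CharsetUtil.UTF_8));
        channel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                future.cause().printStackTrace();
            }
        });
    }
}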
Summary:

In Netty, multiple handlers can be registered on a pipeline. ChannelInboundHandlers execute in the order in which they were registered, while ChannelOutboundHandlers execute in the reverse of their registration order; together this determines the path a request takes once it enters Netty. (Figure omitted.)
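This ordering can be demonstrated with Netty's EmbeddedChannel; the sketch below (class names made up) prints in-1, in-2 for an inbound message and out-2, out-1 for an outbound one:

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;

final class PipelineOrderDemo {
    // A duplex handler that logs both directions and passes events along.
    static class Tracer extends ChannelDuplexHandler {
        private final String name;
        Tracer(String name) { this.name = name; }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("in-" + name);   // inbound: head -> tail
            ctx.fireChannelRead(msg);
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            System.out.println("out-" + name);  // outbound: tail -> head
            ctx.write(msg, promise);
        }
    }

    public static void main(String[] args) {
        EmbeddedChannel ch = new EmbeddedChannel(new Tracer("1"), new Tracer("2"));
        ch.writeInbound("request");   // prints in-1, in-2
        ch.writeOutbound("response"); // prints out-2, out-1
    }
}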
References
[1] http://blog.csdn.net/u013252773/article/details/21195593
[2] http://stackoverflow.com/questions/22354135/in-netty4-why-read-and-write-both-in-outboundhandler
Reposted from: https://www.cnblogs.com/davidwang456/p/5046406.html