netty 管道和handler的加载和处理流程
一、pom引入包,此處版本為4.1.52.Final
<dependency><groupId>io.netty</groupId><artifactId>netty-transport-native-kqueue</artifactId><scope>provided</scope></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-transport-native-epoll</artifactId><scope>provided</scope></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-common</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-codec</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-buffer</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-transport</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-resolver-dns</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-handler</artifactId></dependency>二、簡單的實體對象利用NETTY進行序列化和數據傳輸的示例,服務端代碼
1.啟動類
/**
 * Starts the server on {@code port} and blocks the calling thread until the
 * server channel is closed.
 *
 * <p>Two NIO event loop groups are created: the "boss" group accepts incoming
 * connections and the "work" group serves the accepted channels, whose
 * pipelines are set up by {@link PojoServerIntitlizer}. Both groups are always
 * shut down before this method returns.
 *
 * @return {@code true} if the server was bound and later closed normally;
 *         {@code false} if the calling thread was interrupted while waiting
 */
public boolean bind() {
    EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("bossThread", false));
    EventLoopGroup workEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("workThread", false));
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossEventLoopGroup, workEventLoopGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new PojoServerIntitlizer());
    try {
        log.debug(" will start server");
        // sync() returns once the bind has completed (or rethrows the bind failure).
        ChannelFuture bindFuture = bootstrap.bind(port).sync();
        log.debug(" server is started");
        // Block here for the whole lifetime of the server channel.
        bindFuture.channel().closeFuture().sync();
        log.debug(" server is closed");
        return true;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        log.error(" server thread interrupted", e);
        return false;
    } finally {
        bossEventLoopGroup.shutdownGracefully();
        workEventLoopGroup.shutdownGracefully();
        log.debug(" release event loop group");
    }
}
2.管道中的handler初始化類
/**
 * Installs the server-side handlers on every newly accepted channel.
 *
 * <p>Outbound writes travel from the tail towards the head, so a written POJO
 * reaches {@code PojoJsonEncoder} first and {@code PojoEncoder} second.
 * Inbound bytes travel from the head towards the tail: {@code PojoDecoder}
 * first, then {@code PojoServerHandler}.
 */
@Slf4j
public class PojoServerIntitlizer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new PojoEncoder())
                .addLast(new PojoJsonEncoder())
                .addLast(new PojoDecoder())
                .addLast(new PojoServerHandler());
        log.debug(" initChannel handler names:{}", ch.pipeline().names());
    }
}
initChannel handler names:[PojoClientIntitlizer#0, PojoEncoder#0, PojoJsonEncoder#0, PojoDecoder#0, PojoClientHandler#0, DefaultChannelPipeline$TailContext#0]
3.POJO對象轉換JSON編碼類
/**
 * Outbound encoder that turns any written object into its JSON string form.
 *
 * <p>The type parameter is {@code Object}, so every outbound message is
 * accepted; this matches what the previous raw-type declaration resolved to.
 */
@Slf4j
public class PojoJsonEncoder extends MessageToMessageEncoder<Object> {

    /**
     * Serializes {@code msg} to JSON and emits the resulting string.
     *
     * @param ctx the handler context this encoder belongs to
     * @param msg the outbound message to serialize
     * @param out receives the JSON string, which is then written to the next
     *            outbound handler
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
        String msgStr = JSONUtil.toJsonStr(msg);
        log.debug(" encode json data. msgStr:{}", msgStr);
        out.add(msgStr);
    }
}
4.POJO轉換為JSON后將字符串轉為帶長度的字節流編碼類
/**
 * Outbound encoder that frames a string as a 4-byte length followed by its
 * UTF-8 bytes.
 *
 * <p>Declared for {@code String} so that the base class only hands strings to
 * {@link #encode}; any other outbound message is passed along unchanged
 * instead of failing on a cast.
 */
@Slf4j
public class PojoEncoder extends MessageToByteEncoder<String> {

    /**
     * Writes the length header and the UTF-8 payload of {@code msg} into {@code out}.
     *
     * @param ctx the handler context this encoder belongs to
     * @param msg the string to frame
     * @param out the buffer that receives the length (int) followed by the payload bytes
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] msgByte = msg.getBytes(StandardCharsets.UTF_8);
        int msgLen = msgByte.length;
        log.debug(" encode byte data. msgLen:{}", msgLen);
        out.writeInt(msgLen);
        out.writeBytes(msgByte);
    }
}
5.輸入字節流轉換為POJO對象類
/**
 * Inbound decoder for frames laid out as a 4-byte length followed by that many
 * bytes of UTF-8 JSON, which is converted to {@code NettyConstant.pojoCls}.
 */
@Slf4j
public class PojoDecoder extends ByteToMessageDecoder {
    /** Size in bytes of the length header that precedes every payload. */
    private static final int HEAD_LEN = 4;

    /**
     * Decodes at most one complete frame from {@code byteBuf}.
     *
     * <p>Nothing is consumed until the header and the whole payload are
     * available, so the base class keeps accumulating bytes and calls again.
     *
     * @param channelHandlerContext the handler context this decoder belongs to
     * @param byteBuf the accumulated inbound bytes
     * @param list receives the decoded object
     * @throws Exception if the length header is not positive or the payload cannot be converted
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        log.debug(" decode data. read len:{}", byteBuf.readableBytes());
        if (byteBuf.readableBytes() < HEAD_LEN) {
            return;
        }
        // Peek at the header without moving the reader index.
        int dataLen = byteBuf.getInt(byteBuf.readerIndex());
        if (dataLen <= 0) {
            // The stream cannot be re-synchronised after a bad header: drop what is
            // buffered and report it, rather than leaving the bytes unread, which
            // would stall decoding forever.
            byteBuf.skipBytes(byteBuf.readableBytes());
            throw new io.netty.handler.codec.CorruptedFrameException("invalid frame length: " + dataLen);
        }
        // Subtract on the left-hand side so a huge dataLen cannot overflow the comparison.
        if (byteBuf.readableBytes() - HEAD_LEN < dataLen) {
            return;
        }
        byteBuf.skipBytes(HEAD_LEN);
        // Read exactly one frame; bytes of any following frame stay in the buffer
        // for the next decode call.
        byte[] data = new byte[dataLen];
        byteBuf.readBytes(data);
        Object obj = JSONUtil.toBean(new String(data, "UTF-8"), NettyConstant.pojoCls);
        list.add(obj);
    }
}
6.業務讀處理類
@Slf4j public class PojoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception { // super.channelActive(ctx);log.debug(" channelRegistered name:{},localAddress:{},remoteAddress:{} threadName:{}",ctx.name(),ctx.channel().localAddress(),ctx.channel().remoteAddress(),Thread.currentThread().getName());TelnetMsgDto telnetMsgDto = TelnetMsgDto.builder().nick("游客").sendTime(DateUtil.currentSeconds()).content("歡迎你 \r\n").type(PojoContentType.POJO_CONTENT_TYPE_MSG).build();ctx.writeAndFlush(telnetMsgDto);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // super.channelRead(ctx, msg);TelnetMsgDto telnetMsgDto = (TelnetMsgDto) msg;log.debug(" channelRead telnetMsgDto:{}",telnetMsgDto);TelnetMsgDto sendMsgDtO = null;switch (((TelnetMsgDto) msg).getType()){case POJO_CONTENT_TYPE_LOGIN:sendMsgDtO = TelnetMsgDto.builder().nick(telnetMsgDto.getNick()).sendTime(DateUtil.currentSeconds()).content(" 歡迎你 \r\n").type(PojoContentType.POJO_CONTENT_TYPE_MSG).build();break;case POJO_CONTENT_TYPE_MSG:sendMsgDtO = TelnetMsgDto.builder().nick(telnetMsgDto.getNick()).sendTime(DateUtil.currentSeconds()).content(" 回復:" + telnetMsgDto.getContent() + "\r\n").type(PojoContentType.POJO_CONTENT_TYPE_MSG).build();break;case POJO_CONTENT_TYPE_LOGOUT:sendMsgDtO = TelnetMsgDto.builder().nick(telnetMsgDto.getNick()).sendTime(DateUtil.currentSeconds()).content(" bye byte:" + telnetMsgDto.getNick() + "\r\n").type(PojoContentType.POJO_CONTENT_TYPE_MSG).build();break;default:throw new IllegalStateException("Unexpected value: " + ((TelnetMsgDto) msg).getType());}if (ObjectUtil.isNotNull(sendMsgDtO)){ctx.writeAndFlush(sendMsgDtO);}}三、客戶端類
1.啟動類
@Slf4j public class PojoClientServer {private int port;private String host;private Channel channel;public PojoClientServer(int port, String host) {this.port = port;this.host = host;}public ChannelFuture sendData(TelnetMsgDto telnetMsgDto){ChannelFuture channelFuture = this.channel.writeAndFlush(telnetMsgDto);return channelFuture;}public boolean connect(){EventLoopGroup workEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("clientThread",false));Bootstrap bootstrap = new Bootstrap();bootstrap.group(workEventLoopGroup).channel(NioSocketChannel.class).handler(new PojoClientIntitlizer());try {log.debug(" will start connect server");this.channel = bootstrap.connect(host,port).sync().channel();log.debug(" client is START");String nick = "jack";this.sendData(TelnetMsgDto.buildLoginMsg(nick));String sendData[] = {"hello","world","qiutian","happy","bye"}; // BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));ChannelFuture sendFuture = null;int i = 0;while (true){ // String inputData = bufferedReader.readLine() ;if (i >= sendData.length){sendFuture = this.sendData(TelnetMsgDto.buildLogoutMsg(nick));break;}String inputData = sendData[i] ;i++;sendFuture = this.sendData(TelnetMsgDto.buildNormalMsg(nick,inputData));log.debug(" send data inputData:{},i:{}",inputData,i);if (ObjectUtil.isNotNull(sendFuture)){sendFuture.sync();}TimeUnit.SECONDS.sleep(3);/* if (StrUtil.contains(inputData,"bye")){log.debug(" end.");this.channel.close().sync();break;}*/}TimeUnit.SECONDS.sleep(3);log.debug(" client is closed");this.channel.close().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workEventLoopGroup.shutdownGracefully();log.debug(" release event loop group");}return true;} }2.HANDLER初始化加入管道類
/**
 * Installs the client-side handlers on the channel.
 *
 * <p>Outbound writes travel from the tail towards the head, so a written POJO
 * reaches {@code PojoJsonEncoder} first and {@code PojoEncoder} second.
 * Inbound bytes travel from the head towards the tail: {@code PojoDecoder}
 * first, then {@code PojoClientHandler}.
 */
@Slf4j
public class PojoClientIntitlizer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new PojoEncoder())
                .addLast(new PojoJsonEncoder())
                .addLast(new PojoDecoder())
                .addLast(new PojoClientHandler());
        log.debug(" initChannel handler names:{}", ch.pipeline().names());
    }
}
3.客戶端讀處理類
@Slf4j public class PojoClientHandler extends SimpleChannelInboundHandler<TelnetMsgDto> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TelnetMsgDto msg) throws Exception {log.debug(" channelRead0 msg:{}",msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // super.exceptionCaught(ctx, cause);cause.printStackTrace();ctx.close();} }四、客戶端發送數據的處理類流程
1.客戶端向channel寫入數據
/**
 * Writes and flushes one message on the connected channel.
 *
 * @param telnetMsgDto the message to send
 * @return the future of the write operation
 */
public ChannelFuture sendData(TelnetMsgDto telnetMsgDto) {
    return this.channel.writeAndFlush(telnetMsgDto);
}
2.channel.writeAndFlush會調用pipeline.writeAndFlush,然后會調用到tail.writeAndFlush(msg)
io.netty.channel.DefaultChannelPipelinepublic final ChannelFuture writeAndFlush(Object msg) {return tail.writeAndFlush(msg);}3.這里的tail為管道默認生成的最后一個HANDLER,實際上為AbstractChannelHandlerContext
io.netty.channel.AbstractChannelHandlerContext private void write(Object msg, boolean flush, ChannelPromise promise) {final AbstractChannelHandlerContext next = findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {final WriteTask task = WriteTask.newInstance(next, m, promise, flush);if (!safeExecute(executor, task, promise, m, !flush)) {// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes// and put it back in the Recycler for re-use later.//// See https://github.com/netty/netty/issues/8343.task.cancel();}}},注意,這里有一個找輸出bound的過程。它是從HANDLER處理鏈中的尾部向前找,找到一個僅僅可以處理輸出bound的HANDLE進行處理。所以是我們初始化管理處理器時,outbound的執行順序是依次從尾部向頭部找,逐個執行。
private AbstractChannelHandlerContext findContextOutbound(int mask) {AbstractChannelHandlerContext ctx = this;EventExecutor currentExecutor = executor();do {ctx = ctx.prev;} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));return ctx;}這里面的executor實際上就是一個NioEventLoop,next為ChannelHandlerContext(PojoJsonEncoder#0, [id: 0x08b5bc88, L:/127.0.0.1:52977 - R:/127.0.0.1:7110])
4.由于發送數據的線程不是NioEventLoop線程,為了避免數據競爭,寫操作都會作為任務提交到NioEventLoop進行處理。這里構造一個WriteTask異步任務。
5.現在我們看到異步執行了WriteTask任務
WriteTask的執行函數 @Overridepublic void run() {try {decrementPendingOutboundBytes();if (size >= 0) {ctx.invokeWrite(msg, promise);} else {ctx.invokeWriteAndFlush(msg, promise);}} finally {recycle();}}6.接著執行了AbstractChannelHandlerContext.invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) {try {((ChannelOutboundHandler) handler()).write(this, msg, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}}7.這就是調用處理HANDLER的write方法,由于PojoJsonEncoder繼承于MessageToMessageEncoder,所以要在MessageToMessageEncoder的write方法里面再調用encode進行消息轉換。
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {CodecOutputList out = null;try {if (acceptOutboundMessage(msg)) {out = CodecOutputList.newInstance();I cast = (I) msg;encode(ctx, cast, out);} else {ctx.write(msg, promise);}} catch (EncoderException e) {throw e;} catch (Throwable t) {throw new EncoderException(t);} finally {if (out != null) {try {final int sizeMinusOne = out.size() - 1;if (sizeMinusOne == 0) {ctx.write(out.getUnsafe(0), promise);} else if (sizeMinusOne > 0) {if (promise == ctx.voidPromise()) {writeVoidPromise(ctx, out);} else {writePromiseCombiner(ctx, out, promise);}}} finally {out.recycle();}}}}8.消息轉換完畢后放到一個名為out 的list里面,然后在上述的write方法中發現out不為空,則再次觸發ctx.write(out.getUnsafe(0), promise);注意這里跟第一次tail的write方法一樣,又會從當前(PojoJsonEncoder)的HANDLER中向前找下一個可支持outBound的HANDLER類,所以就會找到PojoEncoder
9.接著會調用到next.write方法,由于此次是在同一個NioEventLoop,所以直接調用,?PojoEncoder繼承于MessageToByteEncoder,所以跟上面那個MessageToMessageEncoder類似,會先調用此類的write方法,方法內部調用encode將消息體轉換為字節流。
MessageToByteEncoderpublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ByteBuf buf = null;try {if (acceptOutboundMessage(msg)) {@SuppressWarnings("unchecked")I cast = (I) msg;buf = allocateBuffer(ctx, cast, preferDirect);try {encode(ctx, cast, buf);} finally {ReferenceCountUtil.release(cast);}if (buf.isReadable()) {ctx.write(buf, promise);} else {buf.release();ctx.write(Unpooled.EMPTY_BUFFER, promise);}buf = null;} else {ctx.write(msg, promise);}} catch (EncoderException e) {throw e;} catch (Throwable e) {throw new EncoderException(e);} finally {if (buf != null) {buf.release();}}}10.轉換完后,會再次調用ctx.write(buf, promise),這就是一個責任鏈模式,這次找到的next為head節點,ChannelHandlerContext(DefaultChannelPipeline$HeadContext#0, [id: 0x7cb49772, L:/127.0.0.1:50203 - R:/127.0.0.1:7110])
11.由于在同一個NioEventLoop,所以直接調用write,最終調用到HeadContext.write,這里就是終點了,不會再調用ctx.write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise);}這個unsafe為NioSocketChannel內部的Unsafe,這里面就是NIO的SOCKET封裝類了,這個write只是寫入緩沖區,當調用flush時才會真正進行網絡發送。
12.unsafe為NioSocketChannelUnsafe,這個類內部有一個ChannelOutboundBuffer outboundBuffer的成員變量,作為待發送數據的緩存區。write方法將數據保存到此緩存區。
protected abstract class AbstractUnsafe implements Unsafe {private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);@Overridepublic final void write(Object msg, ChannelPromise promise) {ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;outboundBuffer.addMessage(msg, size, promise);}調用堆棧
13.ctx.flush函數的調用流程,我們簡單描述:最終會調用到NioSocketChannel.doWrite方法,這個方法里面會調用java的java.nio.channels.SocketChannel[connected local=/127.0.0.1:50203 remote=/127.0.0.1:7110]寫入數據,這時才真正發送。
NioSocketChannel protected void doWrite(ChannelOutboundBuffer in) throws Exception {SocketChannel ch = javaChannel();int writeSpinCount = config().getWriteSpinCount();do {int nioBufferCnt = in.nioBufferCount();switch (nioBufferCnt) {case 1: {ByteBuffer buffer = nioBuffers[0];int attemptedBytes = buffer.remaining();final int localWrittenBytes = ch.write(buffer);if (localWrittenBytes <= 0) {incompleteWrite(true);return;}break;}}}} while (writeSpinCount > 0);}?14.至此,數據寫入發送流程就結束了。
15.注意:ctx.write和ctx.channel().write是有區別的。ctx.write是從當前handler向前找下一個可處理寫事件的outbound handler,然后調用其write函數;ctx.channel().write則是調用pipeline.write,這個方法會從整個管道的尾部(tail)開始向前找可處理寫事件的handler,經過的handler更多,所以會比前一個更耗時。
五、客戶端接收讀取數據的流程。
1.讀取流程由NioEventLoop的run方法驅動:它會循環檢測網絡IO的select事件,并調用processSelectedKeysOptimized處理就緒的selectionKey。注意,這里的selectionKey有一個attachment附加對象,就是我們的NioSocketChannel
private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} }}2.調用NioEventLoop.processSelectedKey檢測socket的連接,可寫,可讀事件,當在SOCKETCHANNEL發現數據可讀時,會調用unsafe.read.
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}unsafe跟上面寫時的對象一樣為NioSocketChannel的內部safe,封裝了原始的socketChannel
3.unsafe.read函數會從網絡讀取數據,然后調用pipeline.fireChannelRead(byteBuf),向管道發送讀事件。
AbstractNioByteChannel @Overridepublic final void read() {final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {byteBuf = allocHandle.allocate(allocator);allocHandle.lastBytesRead(doReadBytes(byteBuf));allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();} }}4.pipeline.fireChannelRead(byteBuf),直接調用頭節點(head)的channelRead方法。
DefaultChannelPipeline public final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;}5.head節點的fireChannelRead方法,從head節點向尾部找一個支持inbound的HANDLER進行處理,剛好跟寫入相反
AbstractChannelHandlerContextpublic ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);return this;} private AbstractChannelHandlerContext findContextInbound(int mask) {AbstractChannelHandlerContext ctx = this;EventExecutor currentExecutor = executor();do {ctx = ctx.next;} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));return ctx;}6.找到的第一個handler為ChannelHandlerContext(PojoDecoder#0, [id: 0x7cb49772, L:/127.0.0.1:50203 - R:/127.0.0.1:7110])
7.由于PojoDecoder繼承于ByteToMessageDecoder類,所以會調用ByteToMessageDecoder.
channelRead,在此方法內嘗試解碼轉換。 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {CodecOutputList out = CodecOutputList.newInstance();try {first = cumulation == null;cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {int size = out.size();firedChannelRead |= out.insertSinceRecycled();fireChannelRead(ctx, out, size);} finally {out.recycle();}}} else {ctx.fireChannelRead(msg);}}8.轉換成功后,會調用fireChannelRead,對out這個list列表進行逐個循環調用,ctx.fireChannelRead(msgs.getUnsafe(i));,通知給下一個inBound的handler.
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {for (int i = 0; i < numElements; i ++) {ctx.fireChannelRead(msgs.getUnsafe(i));}}9.這次找到的下一個handler為PojoClientHandler,由于此類繼承SimpleChannelInboundHandler,
最終調到其channelRead0方法。這次可以看到輸出的整個對象。
10.可以看到對象的數據了,至此讀數據流程結束。
總結
以上是生活随笔為你收集整理的netty 管道和handler的加载和处理流程的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: redis 主从哨兵模式搭建
 - 下一篇: nio的epoll和selector实现