Netty详解(五):Netty TCP粘包 拆包
1. 概述
無論是服務(wù)端還是客戶端,我們讀取或者發(fā)送消息的時(shí)候,都需要考慮TCP底層的粘包和拆包機(jī)制。下面我們來通過Netty來詳解TCP底層的粘包和拆包機(jī)制。
2. TCP底層的粘包和拆包機(jī)制
TCP是一個(gè)“流”協(xié)議,所謂流,就是沒有界限的一串?dāng)?shù)據(jù)。大家可以想想河里的水流,它們是連城有一片的,期間沒有界限。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,他會根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分,所以在業(yè)務(wù)上認(rèn)為,一個(gè)完整的包可能會被TCP拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包封裝成一個(gè)大的的數(shù)據(jù)包進(jìn)行發(fā)送,這就是所謂的TCP的粘包和拆包機(jī)制。
2.1 TCP粘包和拆包問題說明
假設(shè)客戶端分別發(fā)送了兩個(gè)數(shù)據(jù)包D1 和D2給服務(wù)商廈 ,由于服務(wù)端一次讀取到的字節(jié)數(shù)是不確定的,故可能存在4種情況:
2.2 TCP粘包/拆包發(fā)生的原因
問題產(chǎn)生的原因有三個(gè),分別如下:
- MSS:TCP傳輸層(傳輸幀)最大報(bào)文段長度。Maxitum Segment Size最大分段大小。為了達(dá)到最佳的傳輸效能TCP協(xié)議在建立連接的時(shí)候通常要協(xié)商雙方的MSS值,這個(gè)值TCP協(xié)議在實(shí)現(xiàn)的時(shí)候往往用MTU值代替,值往往為1460.IPV6中通常是1440
- MTU:Maxitum Transmission Unit最大傳輸單元。這個(gè)最大傳輸單元實(shí)際上和鏈路層協(xié)議有著密切的關(guān)系,EthernetII 幀的結(jié)構(gòu)DMAC+SMAC+Type+Data+CRC。由于以太網(wǎng)傳輸限制,每個(gè)以太網(wǎng)幀都有最小的大小64bytes,最大不能超過1518bytes,對于小于或大于這個(gè)限制的以太網(wǎng)幀我們都可以視之為錯誤的數(shù)據(jù)幀,一般的以太網(wǎng)轉(zhuǎn)發(fā)設(shè)備會丟棄這些數(shù)據(jù)幀。
2.3 粘包問題的解決策略
底層的TCP無法理解上層的業(yè)務(wù)數(shù)據(jù),需要在上層的應(yīng)用協(xié)議棧調(diào)來來解決。
3. Netty提供半包解碼器來解決TCP粘包/拆包問題
3.1 LineBasedFrameDecoder
LineBasedFrameDecoder 是依次遍歷ByteBuf中的可讀字節(jié),判斷看是否有\(zhòng)n 或 \r\n,如果有,就以此位置為結(jié)束位置,以換行符為結(jié)束標(biāo)志的解碼器。它支持?jǐn)y帶結(jié)束符或者不攜帶結(jié)束符兩種解碼方式,同時(shí)支持配置單行的最大長度。如果連續(xù)讀取到最大長度后仍然沒有發(fā)現(xiàn)換行符,就會拋出異常,同時(shí)忽略掉之前讀到的異常碼流。
StringDecoder的功能非常簡單,就是將接受到的對象轉(zhuǎn)換成字符串,然后繼續(xù)調(diào)用后面的Handler。LineBasedFrameDecoder+StringDecoder組合就是按行切換的文本解碼器,它被設(shè)計(jì)用來支持TCP的粘包和拆包。
在ChannelInitializer類中添加LineBasedFrameDecoder+StringDecoder
EventLoopGroup group=new NioEventLoopGroup();try{Bootstrap b=new Bootstrap();//Channel需要設(shè)置為NioSocketChannel,然后為其添加Handlerb.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(new ChannelInitializer<SocketChannel>(){//為了簡單直接創(chuàng)建匿名內(nèi)部類,實(shí)現(xiàn)initChannel方法//其作用是當(dāng)創(chuàng)建NioSocketChannel成功之后,在進(jìn)行初始化時(shí),//將它的ChannelHandler設(shè)置到ChannelPipeline中,用于處理網(wǎng)絡(luò)I/O事件@Overridepublic void initChannel(SocketChannel ch) throws Exception{ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new TimeClientHandler());}});//發(fā)起異步連接,然后調(diào)用同步方法等待連接成功ChannelFuture f=b.connect(host,port).sync();//當(dāng)客戶端連接關(guān)閉之后,客戶端主函數(shù)退出,退出前釋放NIO線程組的資源f.channel().closeFuture().sync();}finally{}TimeServerHandler.java 關(guān)鍵代碼
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {System.out.println("channelRead start");ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("The time server receive order : " + body);String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());ctx.write(resp);System.out.println("channelRead end");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("channelReadComplete start");ctx.flush();System.out.println("channelReadComplete end");}TimeClientHandler.java 關(guān)鍵代碼
/*** Creates a client-side handler.*/public TimeClientHandler() {req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();}@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf message = null;for (int i = 0; i < 100; i++) {message = Unpooled.buffer(req.length);message.writeBytes(req);ctx.writeAndFlush(message);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {String body = (String) msg;System.out.println("Now is : " + body + " ; the counter is : "+ ++counter);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 釋放資源logger.warning("Unexpected exception from downstream : "+ cause.getMessage());ctx.close();}3.2 DelimiterBasedFrameDecoder
以分隔符作為碼流結(jié)束標(biāo)識的消息的解碼。示例:Echo服務(wù),以$_作為分隔符。
EchoServer.java關(guān)鍵代碼
@Override public void initChannel(SocketChannel ch) throws Exception{//創(chuàng)建分隔符緩沖對象ByteBuf,以$_為分隔符ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());//1024表示單條消息的最大長度,當(dāng)達(dá)到該長度后仍然沒有查找到分隔符//就拋出TooLongFrameException異常//第二個(gè)參數(shù)是分隔符緩沖對象new DelimiterBasedFrameDecoder(1024,delimiter)); //后續(xù)的ChannelHandler接收到的msg對象將會是完整的消息包ch.pipeline().addLast(new StringDecoder()); //將ByteBuf解碼成字符串對象 ch.pipeline().addLast(new EchoServerHandler()); //接收到的msg消息就是解碼后的字符串對象 }DelimiterBasedFrameDecoder 有多個(gè)構(gòu)造方法,這里我們傳遞兩個(gè)參數(shù):第一個(gè)1024表示單條最大長度,當(dāng)達(dá)到該長度之后仍然沒有找到分隔符,就拋出TooLongFrameException異常,防止由于異常流缺失分隔符導(dǎo)致的內(nèi)存溢出,這就是Netty解碼器的可靠性保證。第二個(gè)參數(shù)就是分隔符緩沖對象。
EchoServerHandler.java 關(guān)鍵代碼
@Override public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{String body=(String)msg;System.out.println("This is " + ++counter + " times receive client : [" + body + "]");body+="$_"; //$_已被過濾掉了,所以這里要拼接上ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());ctx.writeAndFlush(echo); }由于DelimiterBasedFrameDecoder自動對請求消息進(jìn)行了編碼,后續(xù)的ChannelHandler接受到的msg對象就是個(gè)完整的消息包;第二個(gè)ChannelHandler是StringDecoder,它將ByteBuffer解碼成字符串對象;第三個(gè)EchoServerHandler接受到的msg消息就是解碼后的字符串對象。
EchoClient.java 關(guān)鍵代碼
@Override public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{String body=(String)msg;System.out.println("This is " + ++counter + " times receive client : [" + body + "]");body+="$_"; //$_已被過濾掉了,所以這里要拼接上ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());ctx.writeAndFlush(echo); }EchoClientHandler.java 關(guān)鍵代碼
@Override public void channelActive(ChannelHandlerContext ctx){for(int i=0;i<10;i++){ctx.writeAndFlush(Unpooled.copiedBuffer(ECHOREQ.getBytes()));} }3.3 FixedLengthFrameDecoder
FixedLengthFrameDecoder是固定長度解碼器,它能夠按照指定的長度對消息進(jìn)行自動解碼,按照指定的長度對消息進(jìn)行自動解碼,開發(fā)者不需要考慮TCP的粘包/拆包問題。
EchoServer.java 關(guān)鍵代碼
@Override public void initChannel (SocketChannel ch) throws Exception{ch.pipeline().addLast(new FixedLengthFrameDecoder(20));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new EchoServerHandler())); }在服務(wù)端的ChannelPipeline中新增FixedLengthFrameDecoder,長度設(shè)置為20,然后再依次增加字符串解碼器和EchoHandler。
EchoServerHandler.java 關(guān)鍵代碼
@Override public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{System.out.println("Receive client : [" + msg + "]"); }利用FixedLengthFrameDecoder解碼器,無論一次接收到多少數(shù)據(jù)報(bào),他都會按照構(gòu)造函數(shù)中設(shè)置的固定長度間解碼,如果是半包消息,FixedLengthFrameDecoder會緩存半包消息并等待下一個(gè)包到達(dá)后進(jìn)行拼包,直到讀取到一個(gè)完整的包。
4. 總結(jié)
DelimiterBasedFrameDecoder 用于對使用分隔符結(jié)尾的消息間自動解碼,FixedLengthFrameDecoder用于對固定長度的消息進(jìn)行自動解碼。有了上述兩種解碼器,再結(jié)合其他的解碼器,如字符串解碼器等,可以輕松完成對很多消息的自動解碼,而且不需要考慮TCP粘包和拆包問題。
總結(jié)
以上是生活随笔為你收集整理的Netty详解(五):Netty TCP粘包 拆包的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty详解(三):Netty 入门应
- 下一篇: Netty详解(六):Netty 编解码