Netty解决粘包和拆包问题的四种方案
在RPC框架中,粘包和拆包問題是必須解決一個問題,因為RPC框架中,各個微服務相互之間都是維系了一個TCP長連接,比如dubbo就是一個全雙工的長連接。由于微服務往對方發送信息的時候,所有的請求都是使用的同一個連接,這樣就會產生粘包和拆包的問題。本文首先會對粘包和拆包問題進行描述,然后介紹其常用的解決方案,最后會對Netty提供的幾種解決方案進行講解。這里說明一下,由于oschina將“jie ma qi”認定為敏感文字,因而本文統一使用“解碼一器”表示該含義
1. 粘包和拆包
產生粘包和拆包問題的主要原因是,操作系統在發送TCP數據的時候,底層會有一個緩沖區,例如1024個字節大小,如果一次請求發送的數據量比較小,沒達到緩沖區大小,TCP則會將多個請求合并為同一個請求進行發送,這就形成了粘包問題;如果一次請求發送的數據量比較大,超過了緩沖區大小,TCP就會將其拆分為多次發送,這就是拆包,也就是將一個大的包拆分為多個小包進行發送。如下圖展示了粘包和拆包的一個示意圖:
上圖中演示了粘包和拆包的三種情況:
- A和B兩個包都剛好滿足TCP緩沖區的大小,或者說其等待時間已經達到TCP等待時長,從而還是使用兩個獨立的包進行發送;
- A和B兩次請求間隔時間內較短,并且數據包較小,因而合并為同一個包發送給服務端;
- B包比較大,因而將其拆分為兩個包B_1和B_2進行發送,而這里由于拆分后的B_2比較小,其又與A包合并在一起發送。
2. 常見解決方案
對于粘包和拆包問題,常見的解決方案有四種:
- 客戶端在發送數據包的時候,每個包都固定長度,比如1024個字節大小,如果客戶端發送的數據長度不足1024個字節,則通過補充空格的方式補全到指定長度;
- 客戶端在每個包的末尾使用固定的分隔符,例如\r\n,如果一個包被拆分了,則等待下一個包發送過來之后找到其中的\r\n,然后對其拆分后的頭部部分與前一個包的剩余部分進行合并,這樣就得到了一個完整的包;
- 將消息分為頭部和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息之后才算是讀到了一個完整的消息;
- 通過自定義協議進行粘包和拆包的處理。
3. Netty提供的粘包拆包解決方案
3.1 FixedLengthFrameDecoder
對于使用固定長度的粘包和拆包場景,可以使用FixedLengthFrameDecoder,該解碼一器會每次讀取固定長度的消息,如果當前讀取到的消息不足指定長度,那么就會等待下一個消息到達后進行補足。其使用也比較簡單,只需要在構造函數中指定每個消息的長度即可。這里需要注意的是,FixedLengthFrameDecoder只是一個解碼一器,Netty也只提供了一個解碼一器,這是因為對于解碼是需要等待下一個包的進行補全的,代碼相對復雜,而對于編碼器,用戶可以自行編寫,因為編碼時只需要將不足指定長度的部分進行補全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder來進行粘包和拆包處理:
public?class?EchoServer?{public?void?bind(int?port)?throws?InterruptedException?{EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{ServerBootstrap?bootstrap?=?new?ServerBootstrap();bootstrap.group(bossGroup,?workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,?1024).handler(new?LoggingHandler(LogLevel.INFO)).childHandler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{//?這里將FixedLengthFrameDecoder添加到pipeline中,指定長度為20ch.pipeline().addLast(new?FixedLengthFrameDecoder(20));//?將前一步解碼得到的數據轉碼為字符串ch.pipeline().addLast(new?StringDecoder());//?這里FixedLengthFrameEncoder是我們自定義的,用于將長度不足20的消息進行補全空格ch.pipeline().addLast(new?FixedLengthFrameEncoder(20));//?最終的數據處理ch.pipeline().addLast(new?EchoServerHandler());}});ChannelFuture?future?=?bootstrap.bind(port).sync();future.channel().closeFuture().sync();}?finally?{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public?static?void?main(String[]?args)?throws?InterruptedException?{new?EchoServer().bind(8080);} }上面的pipeline中,對于入棧數據,這里主要添加了FixedLengthFrameDecoder和StringDecoder,前面一個用于處理固定長度的消息的粘包和拆包問題,第二個則是將處理之后的消息轉換為字符串。最后由EchoServerHandler處理最終得到的數據,處理完成后,將處理得到的數據交由FixedLengthFrameEncoder處理,該編碼器是我們自定義的實現,主要作用是將長度不足20的消息進行空格補全。下面是FixedLengthFrameEncoder的實現代碼:
public?class?FixedLengthFrameEncoder?extends?MessageToByteEncoder<String>?{private?int?length;public?FixedLengthFrameEncoder(int?length)?{this.length?=?length;}@Overrideprotected?void?encode(ChannelHandlerContext?ctx,?String?msg,?ByteBuf?out)throws?Exception?{//?對于超過指定長度的消息,這里直接拋出異常if?(msg.length()?>?length)?{throw?new?UnsupportedOperationException("message?length?is?too?large,?it's?limited?"?+?length);}//?如果長度不足,則進行補全if?(msg.length()?<?length)?{msg?=?addSpace(msg);}ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));}//?進行空格補全private?String?addSpace(String?msg)?{StringBuilder?builder?=?new?StringBuilder(msg);for?(int?i?=?0;?i?<?length?-?msg.length();?i++)?{builder.append("?");}return?builder.toString();} }這里FixedLengthFrameEncoder實現了decode()方法,在該方法中,主要是將消息長度不足20的消息進行空格補全。EchoServerHandler的作用主要是打印接收到的消息,然后發送響應給客戶端:
public?class?EchoServerHandler?extends?SimpleChannelInboundHandler<String>?{@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?throws?Exception?{System.out.println("server?receives?message:?"?+?msg.trim());ctx.writeAndFlush("hello?client!");} }對于客戶端,其實現方式基本與服務端的使用方式類似,只是在最后進行消息發送的時候與服務端的處理方式不同。如下是客戶端EchoClient的代碼:
public?class?EchoClient?{public?void?connect(String?host,?int?port)?throws?InterruptedException?{EventLoopGroup?group?=?new?NioEventLoopGroup();try?{Bootstrap?bootstrap?=?new?Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,?true).handler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{//?對服務端發送的消息進行粘包和拆包處理,由于服務端發送的消息已經進行了空格補全,//?并且長度為20,因而這里指定的長度也為20ch.pipeline().addLast(new?FixedLengthFrameDecoder(20));//?將粘包和拆包處理得到的消息轉換為字符串ch.pipeline().addLast(new?StringDecoder());//?對客戶端發送的消息進行空格補全,保證其長度為20ch.pipeline().addLast(new?FixedLengthFrameEncoder(20));//?客戶端發送消息給服務端,并且處理服務端響應的消息ch.pipeline().addLast(new?EchoClientHandler());}});ChannelFuture?future?=?bootstrap.connect(host,?port).sync();future.channel().closeFuture().sync();}?finally?{group.shutdownGracefully();}}public?static?void?main(String[]?args)?throws?InterruptedException?{new?EchoClient().connect("127.0.0.1",?8080);} }對于客戶端而言,其消息的處理流程其實與服務端是相似的,對于入站消息,需要對其進行粘包和拆包處理,然后將其轉碼為字符串,對于出站消息,則需要將長度不足20的消息進行空格補全。客戶端與服務端處理的主要區別在于最后的消息處理handler不一樣,也即這里的EchoClientHandler,如下是該handler的源碼:
public?class?EchoClientHandler?extends?SimpleChannelInboundHandler<String>?{@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?throws?Exception?{System.out.println("client?receives?message:?"?+?msg.trim());}@Overridepublic?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{ctx.writeAndFlush("hello?server!");} }這里客戶端的處理主要是重寫了channelActive()和channelRead0()兩個方法,這兩個方法的主要作用在于,channelActive()會在客戶端連接上服務器時執行,也就是說,其連上服務器之后就會往服務器發送消息。而channelRead0()主要是在服務器發送響應給客戶端時執行,這里主要是打印服務器的響應消息。對于服務端而言,前面我們我們可以看到,EchoServerHandler只重寫了channelRead0()方法,這是因為服務器只需要等待客戶端發送消息過來,然后在該方法中進行處理,處理完成后直接將響應發送給客戶端。如下是分別啟動服務端和客戶端之后控制臺打印的數據:
//?server server?receives?message:?hello?server! //?client client?receives?message:?hello?client!3.2 LineBasedFrameDecoder與DelimiterBasedFrameDecoder
對于通過分隔符進行粘包和拆包問題的處理,Netty提供了兩個編解碼的類,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。這里LineBasedFrameDecoder的作用主要是通過換行符,即\n或者\r\n對數據進行處理;而DelimiterBasedFrameDecoder的作用則是通過用戶指定的分隔符對數據進行粘包和拆包處理。同樣的,這兩個類都是解碼一器類,而對于數據的編碼,也即在每個數據包最后添加換行符或者指定分割符的部分需要用戶自行進行處理。這里以DelimiterBasedFrameDecoder為例進行講解,如下是EchoServer中使用該類的代碼片段,其余部分與前面的例子中的完全一致:
@Override protected?void?initChannel(SocketChannel?ch)?throws?Exception?{String?delimiter?=?"_$";//?將delimiter設置到DelimiterBasedFrameDecoder中,經過該解碼一器進行處理之后,源數據將會//?被按照_$進行分隔,這里1024指的是分隔的最大長度,即當讀取到1024個字節的數據之后,若還是未//?讀取到分隔符,則舍棄當前數據段,因為其很有可能是由于碼流紊亂造成的ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024,Unpooled.wrappedBuffer(delimiter.getBytes())));//?將分隔之后的字節數據轉換為字符串數據ch.pipeline().addLast(new?StringDecoder());//?這是我們自定義的一個編碼器,主要作用是在返回的響應數據最后添加分隔符ch.pipeline().addLast(new?DelimiterBasedFrameEncoder(delimiter));//?最終處理數據并且返回響應的handlerch.pipeline().addLast(new?EchoServerHandler()); }上面pipeline的設置中,添加的解碼一器主要有DelimiterBasedFrameDecoder和StringDecoder,經過這兩個處理器處理之后,接收到的字節流就會被分隔,并且轉換為字符串數據,最終交由EchoServerHandler處理。這里DelimiterBasedFrameEncoder是我們自定義的編碼器,其主要作用是在返回的響應數據之后添加分隔符。如下是該編碼器的源碼:
public?class?DelimiterBasedFrameEncoder?extends?MessageToByteEncoder<String>?{private?String?delimiter;public?DelimiterBasedFrameEncoder(String?delimiter)?{this.delimiter?=?delimiter;}@Overrideprotected?void?encode(ChannelHandlerContext?ctx,?String?msg,?ByteBuf?out)?throws?Exception?{//?在響應的數據后面添加分隔符ctx.writeAndFlush(Unpooled.wrappedBuffer((msg?+?delimiter).getBytes()));} }對于客戶端而言,這里的處理方式與服務端類似,其pipeline的添加方式如下:
@Override protected?void?initChannel(SocketChannel?ch)?throws?Exception?{String?delimiter?=?"_$";//?對服務端返回的消息通過_$進行分隔,并且每次查找的最大大小為1024字節ch.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024,?Unpooled.wrappedBuffer(delimiter.getBytes())));//?將分隔之后的字節數據轉換為字符串ch.pipeline().addLast(new?StringDecoder());//?對客戶端發送的數據進行編碼,這里主要是在客戶端發送的數據最后添加分隔符ch.pipeline().addLast(new?DelimiterBasedFrameEncoder(delimiter));//?客戶端發送數據給服務端,并且處理從服務端響應的數據ch.pipeline().addLast(new?EchoClientHandler()); }這里客戶端的處理方式與服務端基本一致,關于這里沒展示的代碼,其與示例一中的代碼完全一致,這里則不予展示。
3.3 LengthFieldBasedFrameDecoder與LengthFieldPrepender
這里LengthFieldBasedFrameDecoder與LengthFieldPrepender需要配合起來使用,其實本質上來講,這兩者一個是解碼,一個是編碼的關系。它們處理粘拆包的主要思想是在生成的數據包中添加一個長度字段,用于記錄當前數據包的長度。LengthFieldBasedFrameDecoder會按照參數指定的包長度偏移量數據對接收到的數據進行解碼,從而得到目標消息體數據;而LengthFieldPrepender則會在響應的數據前面添加指定的字節數據,這個字節數據中保存了當前消息體的整體字節數據長度。LengthFieldBasedFrameDecoder的解碼過程如下圖所示:
LengthFieldPrepender的編碼過程如下圖所示:
關于LengthFieldBasedFrameDecoder,這里需要對其構造函數參數進行介紹:
- maxFrameLength:指定了每個包所能傳遞的最大數據包大小;
- lengthFieldOffset:指定了長度字段在字節碼中的偏移量;
- lengthFieldLength:指定了長度字段所占用的字節長度;
- lengthAdjustment:對一些不僅包含有消息頭和消息體的數據進行消息頭的長度的調整,這樣就可以只得到消息體的數據,這里的lengthAdjustment指定的就是消息頭的長度;
- initialBytesToStrip:對于長度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長度字段占用的字節。
這里我們以json序列化為例對LengthFieldBasedFrameDecoder和LengthFieldPrepender的使用方式進行講解。如下是EchoServer的源碼:
public?class?EchoServer?{public?void?bind(int?port)?throws?InterruptedException?{EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{ServerBootstrap?bootstrap?=?new?ServerBootstrap();bootstrap.group(bossGroup,?workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,?1024).handler(new?LoggingHandler(LogLevel.INFO)).childHandler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{//?這里將LengthFieldBasedFrameDecoder添加到pipeline的首位,因為其需要對接收到的數據//?進行長度字段解碼,這里也會對數據進行粘包和拆包處理ch.pipeline().addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?2,?0,?2));//?LengthFieldPrepender是一個編碼器,主要是在響應字節數據前面添加字節長度字段ch.pipeline().addLast(new?LengthFieldPrepender(2));//?對經過粘包和拆包處理之后的數據進行json反序列化,從而得到User對象ch.pipeline().addLast(new?JsonDecoder());//?對響應數據進行編碼,主要是將User對象序列化為jsonch.pipeline().addLast(new?JsonEncoder());//?處理客戶端的請求的數據,并且進行響應ch.pipeline().addLast(new?EchoServerHandler());}});ChannelFuture?future?=?bootstrap.bind(port).sync();future.channel().closeFuture().sync();}?finally?{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public?static?void?main(String[]?args)?throws?InterruptedException?{new?EchoServer().bind(8080);} }這里EchoServer主要是在pipeline中添加了兩個編碼器和兩個解碼一器,編碼器主要是負責將響應的User對象序列化為json對象,然后在其字節數組前面添加一個長度字段的字節數組;解碼一器主要是對接收到的數據進行長度字段的解碼,然后將其反序列化為一個User對象。下面是JsonDecoder的源碼:
public?class?JsonDecoder?extends?MessageToMessageDecoder<ByteBuf>?{@Overrideprotected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?buf,?List<Object>?out)?throws?Exception?{byte[]?bytes?=?new?byte[buf.readableBytes()];buf.readBytes(bytes);User?user?=?JSON.parseObject(new?String(bytes,?CharsetUtil.UTF_8),?User.class);out.add(user);} }JsonDecoder首先從接收到的數據流中讀取字節數組,然后將其反序列化為一個User對象。下面我們看看JsonEncoder的源碼:
public?class?JsonEncoder?extends?MessageToByteEncoder<User>?{@Overrideprotected?void?encode(ChannelHandlerContext?ctx,?User?user,?ByteBuf?buf)throws?Exception?{String?json?=?JSON.toJSONString(user);ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));} }JsonEncoder將響應得到的User對象轉換為一個json對象,然后寫入響應中。對于EchoServerHandler,其主要作用就是接收客戶端數據,并且進行響應,如下是其源碼:
public?class?EchoServerHandler?extends?SimpleChannelInboundHandler<User>?{@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?User?user)?throws?Exception?{System.out.println("receive?from?client:?"?+?user);ctx.write(user);} }對于客戶端,其主要邏輯與服務端的基本類似,這里主要展示其pipeline的添加方式,以及最后發送請求,并且對服務器響應進行處理的過程:
@Override protected?void?initChannel(SocketChannel?ch)?throws?Exception?{ch.pipeline().addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?2,?0,?2));ch.pipeline().addLast(new?LengthFieldPrepender(2));ch.pipeline().addLast(new?JsonDecoder());ch.pipeline().addLast(new?JsonEncoder());ch.pipeline().addLast(new?EchoClientHandler()); } public?class?EchoClientHandler?extends?SimpleChannelInboundHandler<User>?{@Overridepublic?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{ctx.write(getUser());}private?User?getUser()?{User?user?=?new?User();user.setAge(27);user.setName("zhangxufeng");return?user;}@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?User?user)?throws?Exception?{System.out.println("receive?message?from?server:?"?+?user);} }這里客戶端首先會在連接上服務器時,往服務器發送一個User對象數據,然后在接收到服務器響應之后,會打印服務器響應的數據。
3.4 自定義粘包與拆包器
對于粘包與拆包問題,其實前面三種基本上已經能夠滿足大多數情形了,但是對于一些更加復雜的協議,可能有一些定制化的需求。對于這些場景,其實本質上,我們也不需要手動從頭開始寫一份粘包與拆包處理器,而是通過繼承LengthFieldBasedFrameDecoder和LengthFieldPrepender來實現粘包和拆包的處理。
如果用戶確實需要不通過繼承的方式實現自己的粘包和拆包處理器,這里可以通過實現MessageToByteEncoder和ByteToMessageDecoder來實現。這里MessageToByteEncoder的作用是將響應數據編碼為一個ByteBuf對象,而ByteToMessageDecoder則是將接收到的ByteBuf數據轉換為某個對象數據。通過實現這兩個抽象類,用戶就可以達到實現自定義粘包和拆包處理的目的。如下是這兩個類及其抽象方法的聲明:
public?abstract?class?ByteToMessageDecoder?extends?ChannelInboundHandlerAdapter?{protected?abstract?void?decode(ChannelHandlerContext?ctx,?ByteBuf?in,?List<Object>?out)?throws?Exception; } public?abstract?class?MessageToByteEncoder<I>?extends?ChannelOutboundHandlerAdapter?{protected?abstract?void?encode(ChannelHandlerContext?ctx,?I?msg,?ByteBuf?out)?throws?Exception; }4. 小結
本文首先對粘包和拆包的問題原理進行描述,幫助讀者理解粘包和拆包問題所在。然后對處理粘包和拆包的幾種常用解決方案進行講解。接著通過輔以示例的方式對Netty提供的幾種解決粘包和拆包問題的解決方案進行了詳細講解。
總結
以上是生活随笔為你收集整理的Netty解决粘包和拆包问题的四种方案的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Cache 缺陷,我好像有
- 下一篇: 又臭又长!流着泪我也要把它给改完!