Socket粘包问题终极解决方案—Netty版(2W字)!
作者 | 王磊
來源 | Java中文社群(ID:javacn666)
轉載請聯系授權(微信ID:GG_Stone)
上一篇我們寫了《Socket粘包問題的3種解決方案》,但沒想到評論區竟然炸了。介于大家的熱情討論,以及不同的反饋意見,本文就來做一個擴展和延伸,試圖找到問題的最優解,以及消息通訊的最優解決方案。
在正式開始之前,我們先對上篇評論中的幾個典型問題做一個簡單的回復,不感興趣的朋友可直接劃過。
問題一:TCP存在粘包問題嗎?
先說答案:TCP 本身并沒有粘包和半包一說,因為 TCP 本質上只是一個傳輸控制協議(Transmission Control Protocol,TCP),它是一種面向連接的、可靠的、基于字節流的傳輸層通信協議,由 IETF 的 RFC 793 定義。
所謂的協議本質上是一個約定,就好比 Java 編程約定使用駝峰命名法一樣,約定的意義是為了讓通訊雙方,能夠正常的進行消息互換的,那粘包和半包問題又是如何產生的呢?
這是因為在 TCP 的交互中,數據是以字節流的形式進行傳輸的,而“流”的傳輸是沒有邊界的,因為沒有邊界所以就不能區分消息的歸屬,從而就會產生粘包和半包問題(粘包和半包的定義,詳見上一篇)。所以說 TCP 協議本身并不存在粘包和半包問題,只是在使用中如果不能有效的確定流的邊界就會產生粘包和半包問題。
問題二:分隔符是最優解決方案?
坦白的說,經過評論區大家的耐心“開導”,我也意識到了以結束符作為最終的解決方案存在一定的局限性,比如當一條消息中間如果出現了結束符就會造成半包的問題,所以如果是復雜的字符串要對內容進行編碼和解碼處理,這樣才能保證結束符的正確性。
問題三:Socket 高效嗎?
這個問題的答案是否定的,其實上文在開頭已經描述了應用場景:「傳統的 Socket 編程」,學習它的意義就在于理解更早期更底層的一些知識,當然作為補充本文會提供更加高效的消息通訊方案——Netty 通訊。
聊完了以上問題,接下來咱們先來補充一下上篇文章中提到的,將消息分為消息頭和消息體的代碼實現。
一、封裝消息頭和消息體
在開始寫服務器端和客戶端之前,咱們先來編寫一個消息的封裝類,使用它可以將消息封裝成消息頭和消息體,如下圖所示:消息頭中存儲消息體的長度,從而確定了消息的邊界,便解決粘包和半包問題。
1.消息封裝類
消息的封裝類中提供了兩個方法:一個是將消息轉換成消息頭 + 消息體的方法,另一個是讀取消息頭的方法,具體實現代碼如下:
/***?消息封裝類*/ class?SocketPacket?{//?消息頭存儲的長度(占?8?字節)static?final?int?HEAD_SIZE?=?8;/***?將協議封裝為:協議頭?+?協議體*?@param?context?消息體(String?類型)*?@return?byte[]*/public?byte[]?toBytes(String?context)?{//?協議體?byte?數組byte[]?bodyByte?=?context.getBytes();int?bodyByteLength?=?bodyByte.length;//?最終封裝對象byte[]?result?=?new?byte[HEAD_SIZE?+?bodyByteLength];//?借助?NumberFormat?將?int?轉換為?byte[]NumberFormat?numberFormat?=?NumberFormat.getNumberInstance();numberFormat.setMinimumIntegerDigits(HEAD_SIZE);numberFormat.setGroupingUsed(false);//?協議頭?byte?數組byte[]?headByte?=?numberFormat.format(bodyByteLength).getBytes();//?封裝協議頭System.arraycopy(headByte,?0,?result,?0,?HEAD_SIZE);//?封裝協議體System.arraycopy(bodyByte,?0,?result,?HEAD_SIZE,?bodyByteLength);return?result;}/***?獲取消息頭的內容(也就是消息體的長度)*?@param?inputStream*?@return*/public?int?getHeader(InputStream?inputStream)?throws?IOException?{int?result?=?0;byte[]?bytes?=?new?byte[HEAD_SIZE];inputStream.read(bytes,?0,?HEAD_SIZE);//?得到消息體的字節長度result?=?Integer.valueOf(new?String(bytes));return?result;} }2.編寫客戶端
接下來我們來定義客戶端,在客戶端中我們添加一組待發送的消息,隨機給服務器端發送一個消息,實現代碼如下:
/***?客戶端*/ class?MySocketClient?{public?static?void?main(String[]?args)?throws?IOException?{//?啟動?Socket?并嘗試連接服務器Socket?socket?=?new?Socket("127.0.0.1",?9093);//?發送消息合集(隨機發送一條消息)final?String[]?message?=?{"Hi,Java.",?"Hi,SQL~",?"關注公眾號|Java中文社群."};//?創建協議封裝對象SocketPacket?socketPacket?=?new?SocketPacket();try?(OutputStream?outputStream?=?socket.getOutputStream())?{//?給服務器端發送?10?次消息for?(int?i?=?0;?i?<?10;?i++)?{//?隨機發送一條消息String?msg?=?message[new?Random().nextInt(message.length)];//?將內容封裝為:協議頭+協議體byte[]?bytes?=?socketPacket.toBytes(msg);//?發送消息outputStream.write(bytes,?0,?bytes.length);outputStream.flush();}}} }3.編寫服務器端
服務器端我們使用線程池來處理每個客戶端的業務請求,實現代碼如下:
/***?服務器端*/ class?MySocketServer?{public?static?void?main(String[]?args)?throws?IOException?{//?創建?Socket?服務器端ServerSocket?serverSocket?=?new?ServerSocket(9093);//?獲取客戶端連接Socket?clientSocket?=?serverSocket.accept();//?使用線程池處理更多的客戶端ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(100,?150,?100,TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(1000));threadPool.submit(()?->?{//?客戶端消息處理processMessage(clientSocket);});}/***?客戶端消息處理*?@param?clientSocket*/private?static?void?processMessage(Socket?clientSocket)?{//?Socket?封裝對象SocketPacket?socketPacket?=?new?SocketPacket();//?獲取客戶端發送的消息對象try?(InputStream?inputStream?=?clientSocket.getInputStream())?{while?(true)?{//?獲取消息頭(也就是消息體的長度)int?bodyLength?=?socketPacket.getHeader(inputStream);//?消息體?byte?數組byte[]?bodyByte?=?new?byte[bodyLength];//?每次實際讀取字節數int?readCount?=?0;//?消息體賦值下標int?bodyIndex?=?0;//?循環接收消息頭中定義的長度while?(bodyIndex?<=?(bodyLength?-?1)?&&(readCount?=?inputStream.read(bodyByte,?bodyIndex,?bodyLength))?!=?-1)?{bodyIndex?+=?readCount;}bodyIndex?=?0;//?成功接收到客戶端的消息并打印System.out.println("接收到客戶端的信息:"?+?new?String(bodyByte));}}?catch?(IOException?ioException)?{System.out.println(ioException.getMessage());}} }以上程序的執行結果如下:從上述結果可以看出,消息通訊正常,客戶端和服務器端的交互中并沒有出現粘包和半包的問題。
二、使用 Netty 實現高效通訊
以上的內容都是針對傳統 Socket 編程的,但要實現更加高效的通訊和連接對象的復用就要使用 NIO(Non-Blocking IO,非阻塞 IO)或者 AIO(Asynchronous IO,異步非阻塞 IO)了。
傳統的 Socket 編程是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO ?的區別如下:
BIO 來自傳統的 java.io 包,它是基于流模型實現的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動作完成之前,線程會一直阻塞在那里,它們之間的調用是可靠的線性順序。它的優點就是代碼比較簡單、直觀;缺點就是 IO 的效率和擴展性很低,容易成為應用性能瓶頸。
NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以構建多路復用的、同步非阻塞 IO 程序,同時提供了更接近操作系統底層高性能的數據操作方式。
AIO 是 Java 1.7 之后引入的包,是 NIO 的升級版本,提供了異步非堵塞的 IO 操作方式,因此人們叫它 AIO(Asynchronous IO),異步 IO 是基于事件和回調機制實現的,也就是應用操作之后會直接返回,不會堵塞在那里,當后臺處理完成,操作系統會通知相應的線程進行后續的操作。
PS:AIO 可以看作是 NIO 的升級,它也叫 NIO 2。
傳統 Socket 的通訊流程:NIO 的通訊流程:
使用 Netty 替代傳統 NIO 編程
NIO 的設計思路雖然很好,但它的代碼編寫比較麻煩,比如 Buffer 的使用和 Selector 的編寫等。并且在面對斷線重連、包丟失和粘包等復雜問題時手動處理的成本都很大,因此我們通常會使用 Netty 框架來替代傳統的 NIO。
Netty 是什么?
Netty 是一個異步、事件驅動的用來做高性能、高可靠性的網絡應用框架,使用它可以快速輕松地開發網絡應用程序,極大的簡化了網絡編程的復雜度。
Netty 主要優點有以下幾個:
框架設計優雅,底層模型隨意切換適應不同的網絡協議要求;
提供很多標準的協議、安全、編碼解碼的支持;
簡化了 NIO 使用中的諸多不便;
社區非常活躍,很多開源框架中都使用了 Netty 框架,如 Dubbo、RocketMQ、Spark 等。
Netty 主要包含以下 3 個部分,如下圖所示:
這 3 個部分的功能介紹如下。
1. Core 核心層
Core 核心層是 Netty 最精華的內容,它提供了底層網絡通信的通用抽象和實現,包括可擴展的事件模型、通用的通信 API、支持零拷貝的 ByteBuf 等。
2. Protocol Support 協議支持層
協議支持層基本上覆蓋了主流協議的編解碼實現,如 HTTP、SSL、Protobuf、壓縮、大文件傳輸、WebSocket、文本、二進制等主流協議,此外 Netty 還支持自定義應用層協議。Netty 豐富的協議支持降低了用戶的開發成本,基于 Netty 我們可以快速開發 HTTP、WebSocket 等服務。
3. Transport Service 傳輸服務層
傳輸服務層提供了網絡傳輸能力的定義和實現方法。它支持 Socket、HTTP 隧道、虛擬機管道等傳輸方式。Netty 對 TCP、UDP 等數據傳輸做了抽象和封裝,用戶可以更聚焦在業務邏輯實現上,而不必關系底層數據傳輸的細節。
Netty 使用
對 Netty 有了大概的認識之后,接下來我們用 Netty 來編寫一個基礎的通訊服務器,它包含兩個端:服務器端和客戶端,客戶端負責發送消息,服務器端負責接收并打印消息,具體的實現步驟如下。
1.添加 Netty 框架
首先我們需要先添加 Netty 框架的支持,如果是 Maven 項目添加如下配置即可:
<!--?添加?Netty?框架?--> <!--?https://mvnrepository.com/artifact/io.netty/netty-all?--> <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.56.Final</version> </dependency>Netty 版本說明
Netty 的 3.x 和 4.x 為主流的穩定版本,而最新的 5.x 已經是放棄的測試版了,因此推薦使用 Netty 4.x 的最新穩定版。
2. 服務器端實現代碼
按照官方的推薦,這里將服務器端的代碼分為以下 3 個部分:
MyNettyServer:服務器端的核心業務代碼;
ServerInitializer:服務器端通道(Channel)初始化;
ServerHandler:服務器端接收到信息之后的處理邏輯。
PS:Channel 字面意思為“通道”,它是網絡通信的載體。Channel 提供了基本的 API 用于網絡 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己實現的 Channel 是以 JDK NIO Channel 為基礎的,相比較于 JDK NIO,Netty 的 Channel 提供了更高層次的抽象,同時屏蔽了底層 Socket 的復雜性,賦予了 Channel 更加強大的功能,你在使用 Netty 時基本不需要再與 Java Socket 類直接打交道。
服務器端的實現代碼如下:
//?定義服務器的端口號 static?final?int?PORT?=?8007;/***?服務器端*/ static?class?MyNettyServer?{public?static?void?main(String[]?args)?{//?創建一個線程組,用來負責接收客戶端連接EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();//?創建另一個線程組,用來負責?I/O?的讀寫EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{//?創建一個?Server?實例(可理解為?Netty?的入門類)ServerBootstrap?b?=?new?ServerBootstrap();//?將兩個線程池設置到?Server?實例b.group(bossGroup,?workerGroup)//?設置?Netty?通道的類型為?NioServerSocket(非阻塞?I/O?Socket?服務器).channel(NioServerSocketChannel.class)//?設置建立連接之后的執行器(ServerInitializer?是我創建的一個自定義類).childHandler(new?ServerInitializer());//?綁定端口并且進行同步ChannelFuture?future?=?b.bind(PORT).sync();//?對關閉通道進行監聽future.channel().closeFuture().sync();}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{//?資源關閉bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }/***?服務端通道初始化*/ static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?服務器端連接之后的執行器(自定義的類)private?static?final?ServerHandler?SERVER_HANDLER?=?new?ServerHandler();/***?初始化通道的具體執行方法*/@Overridepublic?void?initChannel(SocketChannel?ch)?{//?通道?Channel?設置ChannelPipeline?pipeline?=?ch.pipeline();//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?服務器端連接之后的執行器,接收到消息之后的業務處理pipeline.addLast(SERVER_HANDLER);} }/***?服務器端接收到消息之后的業務處理類*/ static?class?ServerHandler?extends?SimpleChannelInboundHandler<String>?{/***?讀取到客戶端的消息*/@Overridepublic?void?channelRead0(ChannelHandlerContext?ctx,?String?request)?{if?(!request.isEmpty())?{System.out.println("接到客戶端的消息:"?+?request);}}/***?數據讀取完畢*/@Overridepublic?void?channelReadComplete(ChannelHandlerContext?ctx)?{ctx.flush();}/***?異常處理,打印異常并關閉通道*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();} }3.客戶端實現代碼
客戶端的代碼實現也是分為以下 3 個部分:
MyNettyClient:客戶端核心業務代碼;
ClientInitializer:客戶端通道初始化;
ClientHandler:接收到消息之后的處理邏輯。
客戶端的實現代碼如下:
/***?客戶端*/ static?class?MyNettyClient?{public?static?void?main(String[]?args)?{//?創建事件循環線程組(客戶端的線程組只有一個)EventLoopGroup?group?=?new?NioEventLoopGroup();try?{//?Netty?客戶端啟動對象Bootstrap?b?=?new?Bootstrap();//?設置啟動參數b.group(group)//?設置通道類型.channel(NioSocketChannel.class)//?設置啟動執行器(負責啟動事件的業務執行,ClientInitializer?為自定義的類).handler(new?ClientInitializer());//?連接服務器端并同步通道Channel?ch?=?b.connect("127.0.0.1",?8007).sync().channel();//?發送消息ChannelFuture?lastWriteFuture?=?null;//?給服務器端發送?10?條消息for?(int?i?=?0;?i?<?10;?i++)?{//?發送給服務器消息lastWriteFuture?=?ch.writeAndFlush("Hi,Java.");}//?在關閉通道之前,同步刷新所有的消息if?(lastWriteFuture?!=?null)?{lastWriteFuture.sync();}}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{//?釋放資源group.shutdownGracefully();}} }/***?客戶端通道初始化類*/ static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?客戶端連接成功之后業務處理private?static?final?ClientHandler?CLIENT_HANDLER?=?new?ClientHandler();/***?初始化客戶端通道*/@Overridepublic?void?initChannel(SocketChannel?ch)?{ChannelPipeline?pipeline?=?ch.pipeline();//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?客戶端連接成功之后的業務處理pipeline.addLast(CLIENT_HANDLER);} }/***?客戶端連接成功之后的業務處理*/ static?class?ClientHandler?extends?SimpleChannelInboundHandler<String>?{/***?讀取到服務器端的消息*/@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?{System.err.println("接到服務器的消息:"?+?msg);}/***?異常處理,打印異常并關閉通道*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();} }從以上代碼可以看出,我們代碼實現的功能是,客戶端給服務器端發送 10 條消息。
編寫完上述代碼之后,我們就可以啟動服務器端和客戶端了,啟動之后,它們的執行結果如下:從上述結果中可以看出,雖然客戶端和服務器端實現了通信,但在 Netty 的使用中依然存在粘包的問題,服務器端一次收到了 10 條消息,而不是每次只收到一條消息,因此接下來我們要解決掉 Netty 中的粘包問題。
三、解決 Netty 粘包問題
在 Netty 中,解決粘包問題的常用方案有以下 3 種:
設置固定大小的消息長度,如果長度不足則使用空字符彌補,它的缺點比較明顯,比較消耗網絡流量,因此不建議使用;
使用分隔符來確定消息的邊界,從而避免粘包和半包問題的產生;
將消息分為消息頭和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息之后才算是讀到了一個完整的消息。
接下來我們分別來看后兩種推薦的解決方案。
1.使用分隔符解決粘包問題
在 Netty 中提供了 DelimiterBasedFrameDecoder 類用來以特殊符號作為消息的結束符,從而解決粘包和半包的問題。
它的核心實現代碼是在初始化通道(Channel)時,通過設置 DelimiterBasedFrameDecoder 來分隔消息,需要在客戶端和服務器端都進行設置,具體實現代碼如下。
服務器端核心實現代碼如下:
/***?服務端通道初始化*/ static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?服務器端連接之后的執行器(自定義的類)private?static?final?ServerHandler?SERVER_HANDLER?=?new?ServerHandler();/***?初始化通道的具體執行方法*/@Overridepublic?void?initChannel(SocketChannel?ch)?{//?通道?Channel?設置ChannelPipeline?pipeline?=?ch.pipeline();// 19 行:設置結尾分隔符【核心代碼】(參數1:為消息的最大長度,可自定義;參數2:分隔符[此處以換行符為分隔符])pipeline.addLast(new?DelimiterBasedFrameDecoder(1024,?Delimiters.lineDelimiter()));//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?服務器端連接之后的執行器,接收到消息之后的業務處理pipeline.addLast(SERVER_HANDLER);} }核心代碼為第 19 行,代碼中已經備注了方法的含義,這里就不再贅述。
客戶端的核心實現代碼如下:
/***?客戶端通道初始化類*/ static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?客戶端連接成功之后業務處理private?static?final?ClientHandler?CLIENT_HANDLER?=?new?ClientHandler();/***?初始化客戶端通道*/@Overridepublic?void?initChannel(SocketChannel?ch)?{ChannelPipeline?pipeline?=?ch.pipeline();// 17 行:設置結尾分隔符【核心代碼】(參數1:為消息的最大長度,可自定義;參數2:分隔符[此處以換行符為分隔符])pipeline.addLast(new?DelimiterBasedFrameDecoder(1024,?Delimiters.lineDelimiter()));//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?客戶端連接成功之后的業務處理pipeline.addLast(CLIENT_HANDLER);} }完整的服務器端和客戶端的實現代碼如下:
import?io.netty.bootstrap.Bootstrap; import?io.netty.bootstrap.ServerBootstrap; import?io.netty.channel.*; import?io.netty.channel.nio.NioEventLoopGroup; import?io.netty.channel.socket.SocketChannel; import?io.netty.channel.socket.nio.NioServerSocketChannel; import?io.netty.channel.socket.nio.NioSocketChannel; import?io.netty.handler.codec.DelimiterBasedFrameDecoder; import?io.netty.handler.codec.Delimiters; import?io.netty.handler.codec.string.StringDecoder; import?io.netty.handler.codec.string.StringEncoder;public?class?NettyExample?{//?定義服務器的端口號static?final?int?PORT?=?8007;/***?服務器端*/static?class?MyNettyServer?{public?static?void?main(String[]?args)?{//?創建一個線程組,用來負責接收客戶端連接EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();//?創建另一個線程組,用來負責?I/O?的讀寫EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{//?創建一個?Server?實例(可理解為?Netty?的入門類)ServerBootstrap?b?=?new?ServerBootstrap();//?將兩個線程池設置到?Server?實例b.group(bossGroup,?workerGroup)//?設置?Netty?通道的類型為?NioServerSocket(非阻塞?I/O?Socket?服務器).channel(NioServerSocketChannel.class)//?設置建立連接之后的執行器(ServerInitializer?是我創建的一個自定義類).childHandler(new?ServerInitializer());//?綁定端口并且進行同步ChannelFuture?future?=?b.bind(PORT).sync();//?對關閉通道進行監聽future.channel().closeFuture().sync();}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{//?資源關閉bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}/***?服務端通道初始化*/static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?服務器端連接之后的執行器(自定義的類)private?static?final?ServerHandler?SERVER_HANDLER?=?new?ServerHandler();/***?初始化通道的具體執行方法*/@Overridepublic?void?initChannel(SocketChannel?ch)?{//?通道?Channel?設置ChannelPipeline?pipeline?=?ch.pipeline();//?設置結尾分隔符pipeline.addLast(new?DelimiterBasedFrameDecoder(1024,?Delimiters.lineDelimiter()));//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?服務器端連接之后的執行器,接收到消息之后的業務處理pipeline.addLast(SERVER_HANDLER);}}/***?服務器端接收到消息之后的業務處理類*/static?class?ServerHandler?extends?SimpleChannelInboundHandler<String>?{/***?讀取到客戶端的消息*/@Overridepublic?void?channelRead0(ChannelHandlerContext?ctx,?String?request)?{if?(!request.isEmpty())?{System.out.println("接到客戶端的消息:"?+?request);}}/***?數據讀取完畢*/@Overridepublic?void?channelReadComplete(ChannelHandlerContext?ctx)?{ctx.flush();}/***?異常處理,打印異常并關閉通道*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}}/***?客戶端*/static?class?MyNettyClient?{public?static?void?main(String[]?args)?{//?創建事件循環線程組(客戶端的線程組只有一個)EventLoopGroup?group?=?new?NioEventLoopGroup();try?{//?Netty?客戶端啟動對象Bootstrap?b?=?new?Bootstrap();//?設置啟動參數b.group(group)//?設置通道類型.channel(NioSocketChannel.class)//?設置啟動執行器(負責啟動事件的業務執行,ClientInitializer?為自定義的類).handler(new?ClientInitializer());//?連接服務器端并同步通道Channel?ch?=?b.connect("127.0.0.1",?PORT).sync().channel();//?發送消息ChannelFuture?lastWriteFuture?=?null;//?給服務器端發送?10?條消息for?(int?i?=?0;?i?<?10;?i++)?{//?發送給服務器消息lastWriteFuture?=?ch.writeAndFlush("Hi,Java.\n");}//?在關閉通道之前,同步刷新所有的消息if?(lastWriteFuture?!=?null)?{lastWriteFuture.sync();}}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{//?釋放資源group.shutdownGracefully();}}}/***?客戶端通道初始化類*/static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?客戶端連接成功之后業務處理private?static?final?ClientHandler?CLIENT_HANDLER?=?new?ClientHandler();/***?初始化客戶端通道*/@Overridepublic?void?initChannel(SocketChannel?ch)?{ChannelPipeline?pipeline?=?ch.pipeline();//?設置結尾分隔符pipeline.addLast(new?DelimiterBasedFrameDecoder(1024,?Delimiters.lineDelimiter()));//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?客戶端連接成功之后的業務處理pipeline.addLast(CLIENT_HANDLER);}}/***?客戶端連接成功之后的業務處理*/static?class?ClientHandler?extends?SimpleChannelInboundHandler<String>?{/***?讀取到服務器端的消息*/@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?{System.err.println("接到服務器的消息:"?+?msg);}/***?異常處理,打印異常并關閉通道*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}} }最終的執行結果如下圖所示:從上述結果中可以看出,Netty 可以正常使用了,它已經不存在粘包和半包問題了。
2.封裝消息解決粘包問題
此解決方案的核心是將消息分為消息頭 + 消息體,在消息頭中保存消息體的長度,從而確定一條消息的邊界,這樣就避免了粘包和半包問題了,它的實現過程如下圖所示:在 Netty 中可以通過 LengthFieldPrepender(編碼)和 LengthFieldBasedFrameDecoder(解碼)兩個類實現消息的封裝。和上一個解決方案類似,我們需要分別在服務器端和客戶端通過設置通道(Channel)來解決粘包問題。
服務器端的核心代碼如下:
/***?服務端通道初始化*/ static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?服務器端連接之后的執行器(自定義的類)private?static?final?NettyExample.ServerHandler?SERVER_HANDLER?=?new?NettyExample.ServerHandler();/***?初始化通道的具體執行方法*/@Overridepublic?void?initChannel(SocketChannel?ch)?{//?通道?Channel?設置ChannelPipeline?pipeline?=?ch.pipeline();// 18 行:消息解碼:讀取消息頭和消息體pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));// 20?行:消息編碼:將消息封裝為消息頭和消息體,在消息前添加消息體的長度pipeline.addLast(new?LengthFieldPrepender(4));//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?服務器端連接之后的執行器,接收到消息之后的業務處理pipeline.addLast(SERVER_HANDLER);} }其中核心代碼是 18 行和 20 行,通過 LengthFieldPrepender 實現編碼(將消息打包成消息頭 + 消息體),通過 LengthFieldBasedFrameDecoder 實現解碼(從封裝的消息中取出消息的內容)。
LengthFieldBasedFrameDecoder 的參數說明如下:
參數 1:maxFrameLength - 發送的數據包最大長度;
參數 2:lengthFieldOffset - 長度域偏移量,指的是長度域位于整個數據包字節數組中的下標;
參數 3:lengthFieldLength - 長度域自己的字節數長度;
參數 4:lengthAdjustment – 長度域的偏移量矯正。如果長度域的值,除了包含有效數據域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長;
參數 5:initialBytesToStrip – 丟棄的起始字節數。丟棄處于有效數據前面的字節數量。比如前面有 4 個節點的長度域,則它的值為 4。
LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:數據包最大長度為 1024,長度域占首部的四個字節,在讀數據的時候去掉首部四個字節(即長度域)。
客戶端的核心實現代碼如下:
/***?客戶端通道初始化類*/ static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?客戶端連接成功之后業務處理private?static?final?NettyExample.ClientHandler?CLIENT_HANDLER?=?new?NettyExample.ClientHandler();/***?初始化客戶端通道*/@Overridepublic?void?initChannel(SocketChannel?ch)?{ChannelPipeline?pipeline?=?ch.pipeline();//?消息解碼:讀取消息頭和消息體pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));//?消息編碼:將消息封裝為消息頭和消息體,在響應字節數據前面添加消息體長度pipeline.addLast(new?LengthFieldPrepender(4));//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?客戶端連接成功之后的業務處理pipeline.addLast(CLIENT_HANDLER);} }完整的服務器端和客戶端的實現代碼如下:
import?io.netty.bootstrap.Bootstrap; import?io.netty.bootstrap.ServerBootstrap; import?io.netty.channel.*; import?io.netty.channel.nio.NioEventLoopGroup; import?io.netty.channel.socket.SocketChannel; import?io.netty.channel.socket.nio.NioServerSocketChannel; import?io.netty.channel.socket.nio.NioSocketChannel; import?io.netty.handler.codec.LengthFieldBasedFrameDecoder; import?io.netty.handler.codec.LengthFieldPrepender; import?io.netty.handler.codec.string.StringDecoder; import?io.netty.handler.codec.string.StringEncoder;/***?通過封裝?Netty?來解決粘包*/ public?class?NettyExample?{//?定義服務器的端口號static?final?int?PORT?=?8007;/***?服務器端*/static?class?MyNettyServer?{public?static?void?main(String[]?args)?{//?創建一個線程組,用來負責接收客戶端連接EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();//?創建另一個線程組,用來負責?I/O?的讀寫EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{//?創建一個?Server?實例(可理解為?Netty?的入門類)ServerBootstrap?b?=?new?ServerBootstrap();//?將兩個線程池設置到?Server?實例b.group(bossGroup,?workerGroup)//?設置?Netty?通道的類型為?NioServerSocket(非阻塞?I/O?Socket?服務器).channel(NioServerSocketChannel.class)//?設置建立連接之后的執行器(ServerInitializer?是我創建的一個自定義類).childHandler(new?NettyExample.ServerInitializer());//?綁定端口并且進行同步ChannelFuture?future?=?b.bind(PORT).sync();//?對關閉通道進行監聽future.channel().closeFuture().sync();}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{//?資源關閉bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}/***?服務端通道初始化*/static?class?ServerInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?服務器端連接之后的執行器(自定義的類)private?static?final?NettyExample.ServerHandler?SERVER_HANDLER?=?new?NettyExample.ServerHandler();/***?初始化通道的具體執行方法*/@Overridepublic?void?initChannel(SocketChannel?ch)?{//?通道?Channel?設置ChannelPipeline?pipeline?=?ch.pipeline();//?消息解碼:讀取消息頭和消息體pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));//?消息編碼:將消息封裝為消息頭和消息體,在響應字節數據前面添加消息體長度pipeline.addLast(new?LengthFieldPrepender(4));//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?服務器端連接之后的執行器,接收到消息之后的業務處理pipeline.addLast(SERVER_HANDLER);}}/***?服務器端接收到消息之后的業務處理類*/static?class?ServerHandler?extends?SimpleChannelInboundHandler<String>?{/***?讀取到客戶端的消息*/@Overridepublic?void?channelRead0(ChannelHandlerContext?ctx,?String?request)?{if?(!request.isEmpty())?{System.out.println("接到客戶端的消息:"?+?request);}}/***?數據讀取完畢*/@Overridepublic?void?channelReadComplete(ChannelHandlerContext?ctx)?{ctx.flush();}/***?異常處理,打印異常并關閉通道*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}}/***?客戶端*/static?class?MyNettyClient?{public?static?void?main(String[]?args)?{//?創建事件循環線程組(客戶端的線程組只有一個)EventLoopGroup?group?=?new?NioEventLoopGroup();try?{//?Netty?客戶端啟動對象Bootstrap?b?=?new?Bootstrap();//?設置啟動參數b.group(group)//?設置通道類型.channel(NioSocketChannel.class)//?設置啟動執行器(負責啟動事件的業務執行,ClientInitializer?為自定義的類).handler(new?NettyExample.ClientInitializer());//?連接服務器端并同步通道Channel?ch?=?b.connect("127.0.0.1",?PORT).sync().channel();//?發送消息ChannelFuture?lastWriteFuture?=?null;//?給服務器端發送?10?條消息for?(int?i?=?0;?i?<?10;?i++)?{//?發送給服務器消息lastWriteFuture?=?ch.writeAndFlush("Hi,Java.\n");}//?在關閉通道之前,同步刷新所有的消息if?(lastWriteFuture?!=?null)?{lastWriteFuture.sync();}}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{//?釋放資源group.shutdownGracefully();}}}/***?客戶端通道初始化類*/static?class?ClientInitializer?extends?ChannelInitializer<SocketChannel>?{//?字符串編碼器和解碼器private?static?final?StringDecoder?DECODER?=?new?StringDecoder();private?static?final?StringEncoder?ENCODER?=?new?StringEncoder();//?客戶端連接成功之后業務處理private?static?final?NettyExample.ClientHandler?CLIENT_HANDLER?=?new?NettyExample.ClientHandler();/***?初始化客戶端通道*/@Overridepublic?void?initChannel(SocketChannel?ch)?{ChannelPipeline?pipeline?=?ch.pipeline();//?消息解碼:讀取消息頭和消息體pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));//?消息編碼:將消息封裝為消息頭和消息體,在響應字節數據前面添加消息體長度pipeline.addLast(new?LengthFieldPrepender(4));//?設置(字符串)編碼器和解碼器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);//?客戶端連接成功之后的業務處理pipeline.addLast(CLIENT_HANDLER);}}/***?客戶端連接成功之后的業務處理*/static?class?ClientHandler?extends?SimpleChannelInboundHandler<String>?{/***?讀取到服務器端的消息*/@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?{System.err.println("接到服務器的消息:"?+?msg);}/***?異常處理,打印異常并關閉通道*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}} }以上程序的執行結果為:
四、總結
本文提供了傳統 Socket 通訊將消息分為消息頭和消息體的具體代碼實現,然而傳統的 Socket 在性能和復用性上表現一般,為了更加高效的實現通訊,我們可以使用 Netty 框架來替代傳統的 Socket 和 NIO 編程,但 Netty 在使用時依然會出現粘包的問題,于是我們提供了兩種最常見的解決方案:通過分隔符或將封裝消息的解決方案,其中最后一種解決方案的使用更加廣泛。
參考 & 鳴謝
《Netty 核心原理剖析與 RPC 實踐》
往期推薦Socket粘包問題的3種解決方案,最后一種最完美!
文件寫入的6種方法,這種方法性能最好
SpringBoot集成Google開源圖片處理框架,賊好用!
關注我,每天陪你進步一點點!
總結
以上是生活随笔為你收集整理的Socket粘包问题终极解决方案—Netty版(2W字)!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何系统学习python
- 下一篇: 报告老板:这次的缓存事故是这样的...