你了解Netty的编解码器吗?史上最通俗易懂的Netty解码器应用案例带你解开Netty解码器的神秘面纱
Netty解碼器也是非常重要的一個模塊, 服務端接收到客戶端發送過來的消息, 準確說是字節數組, Netty底層已經將它們讀取成ByteBuf了, 但是這些ByteBuf是沒有任何含義的,需要我們根據業務來對字節數組進行解碼。本文中我們將介紹Netty中常見的兩種解碼器DelimiterBasedFrameDecoder和FixedLengthFrameDecoder。
Netty解碼器
- 1. 前言
- 1.1 Netty解碼器
- 1.2 ByteToMessageDecoder
- 2. DelimiterBasedFrameDecoder解碼器應用
- 2.1 服務端代碼
- 2.2 客戶端代碼
- 2.3 運行結果
- 3. FixedLengthFrameDecoder解碼器應用
- 3.1 服務端代碼
- 3.2 telnet客戶端測試
1. 前言
TCP以流的方式進行傳輸數據,上層的應用協議為了對消息進行區分,通常會采用以下4種方式:
- 消息長度固定:累計讀取到長度總和為定長LEN的報文后,就認為讀取到了一個完整的消息;然后會將計數器重置,重新開始讀取下一個數據報;
- 回車換行符作為消息結束符:以回車換行符來標記一個數據報j結束;
- 特殊字符標識:自定義一個特殊字符,來作為數據報結束的標識;
- 消息頭定義:通過在消息頭種定義長度字段來標識消息的總長度。
Netty對上述4種應用做了統一的抽象,提供了4種解碼器來解決相應的問題。在本文中我們將詳細介紹DelimiterBasedFrameDecoder和FixedLengthFrameDecoder兩種解碼器,兩者分別可以完成上述第3和第1種功能。
1.1 Netty解碼器
在對這兩種解碼器介紹之前,我們簡單了解一下Netty解碼器的工作原理。下面首先給出一個簡單的示例:
上述圖片種展示是一個ToIntegerDecoder解碼器的工作過程,從字面上我們可以了解,該解碼器是將一個字節數組轉化為Integer類型數據。decoder 負責將“入站”數據從一種格式轉換到另一種格式,Netty的解碼器是一種 ChannelInboundHandler 的抽象實現。實踐中使用解碼器很簡單,就是將入站數據轉換格式后傳遞到 ChannelPipeline 中的下一個ChannelInboundHandler 進行處理;這樣的處理是很靈活的,我們可以將解碼器放在 ChannelPipeline 中,重用邏輯。
Netty 提供了豐富的解碼器抽象基類,我們可以很容易的實現這些基類來自定義解碼器。主要分兩類:
- 解碼字節到消息(ByteToMessageDecoder 和 ReplayingDecoder)
- 解碼消息到消息(MessageToMessageDecoder)
由于常用的幾種解碼器都是解碼字節到消息,那么下面簡單了解ByteToMessageDecoder。
1.2 ByteToMessageDecoder
ByteToMessageDecoder 是用于將字節轉為消息(或其他字節序列)。你不能確定遠端是否會一次發送完一個完整的“信息”,因此這個類會緩存入站的數據,直到準備好了用于處理。ByteToMessageDecoder抽象類中有兩個最重要的方法,如下表所示:
| Decode | 需要實現的唯一抽象方法。 通過具有輸入字節的ByteBuf和添加了已解碼消息的List來調用它。 反復調用decode(),直到列表返回時為空。 然后將List的內容傳遞到管道中的下一個處理程序。 |
| decodeLast | 所提供的默認實現只調用了decode()。當Channel變為非活動狀態時,此方法被調用一次。 |
下面我們依舊使用上面的ToIntegerDecoder作為示例。假設我們接收一個包含簡單整數的字節流,每個都單獨處理。在本例中,我們將從入站 ByteBuf 讀取每個整數并將其傳遞給 pipeline 中的下一個ChannelInboundHandler。“解碼”字節流成整數我們將擴展ByteToMessageDecoder,實現類為“ToIntegerDecoder”,如下圖所示。
每次從入站的 ByteBuf 讀取四個字節,解碼成整形,并添加到一個 List (本例是指 Integer),當不能再添加數據到 list 時,它所包含的內容就會被發送到下個 ChannelInboundHandler
讀者如果想要詳細了解解碼器的源碼設計可以閱讀博客。
2. DelimiterBasedFrameDecoder解碼器應用
DelimiterBasedFrameDecoder解碼器是通用的分隔符解碼器,可支持多個分隔符,每個分隔符可為一個或多個字符。如果定義了多個分隔符,并且可解碼出多個消息幀,則選擇產生最小幀長的結果。下面我們使用$_作為分隔符來演示。
2.1 服務端代碼
package netty.frame.delimiter;import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder;/*** created by LMR on 2020/5/20*/ public class EchoServer {public void bind(int port) throws Exception {// 配置服務端的NIO線程組EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new EchoServerHandler());}});// 綁定端口,同步等待成功ChannelFuture f = b.bind(port).sync();// 等待服務端監聽端口關閉f.channel().closeFuture().sync();} finally {// 優雅退出,釋放線程池資源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;new EchoServer().bind(port);} }在initChannel方法中,我們首先創建分隔符緩沖對象ByteBuf,然后使用該分隔符緩沖對象創建DelimiterBasedFrameDecoder編碼器對象,并加入到ChannelPipeline中。其中第二個參數白哦是消息的最大長度,如果超過這個長度還沒有找到分隔符,則認為消息出錯,報出異常,這是為了防止異常數據導致內存溢出,提高編碼器的可靠性,最后還添加了字符串解碼器和服務端處理類實例。
package netty.frame.delimiter;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /*** created by LMR on 2020/5/20*/ public class EchoServerHandler extends ChannelInboundHandlerAdapter {int counter = 0;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {String body = (String) msg;System.out.println("This is " + ++counter + " times receive client : ["+ body + "]");body += "$_";ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());ctx.writeAndFlush(echo);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();// 發生異常,關閉鏈路} }channelRead方法非常簡單,由于我們在ChannelPipeline中添加了多個編碼器,那個在這里接收到的消息直接就是完整的消息數據字符串,由于我們使用DelimiterBasedFrameDecoder解碼器過濾掉了分隔符,在這里我們重新添加分隔符,以便于客戶端識別,再發送給客戶端。
2.2 客戶端代碼
package netty.frame.delimiter;import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder;/*** created by LMR on 2020/5/20*/ public class EchoClient {public void connect(int port, String host) throws Exception {// 配置客戶端NIO線程組EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new EchoClientHandler());}});// 發起異步連接操作ChannelFuture f = b.connect(host, port).sync();// 當代客戶端鏈路關閉f.channel().closeFuture().sync();} finally {// 優雅退出,釋放NIO線程組group.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;new EchoClient().connect(port, "127.0.0.1");} }同樣在客戶端我們也添加相應的解碼器,然后創建客戶端處理類對象EchoClientHandler。
package netty.frame.delimiter;import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter;/*** created by LMR on 2020/5/20*/ public class EchoClientHandler extends ChannelInboundHandlerAdapter {private int counter;static final String ECHO_REQ = "This is a example by LMRZero.$_";@Overridepublic void channelActive(ChannelHandlerContext ctx) {for (int i = 0; i < 10; i++) {ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {System.out.println("This is " + ++counter + " times receive server : ["+ msg + "]");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }2.3 運行結果
服務端結果:
客戶端結果:
3. FixedLengthFrameDecoder解碼器應用
FixedLengthFrameDecoder是按照固定長度frameLength解碼出消息幀。在本節中我們使用一個應用實例對其用法進行介紹。
3.1 服務端代碼
我們在服務端ChannelPipeline中添加FixedLengthFrameDecoder,設置其長度為20,然后同樣添加字符串解碼器和服務端處理類實例。
package netty.frame.fixedLen;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder;/*** created by LMR on 2020/5/20*/ public class EchoServer {public void bind(int port) throws Exception {// 配置服務端的NIO線程組EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ch.pipeline().addLast(new FixedLengthFrameDecoder(20));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new EchoServerHandler());}});// 綁定端口,同步等待成功ChannelFuture f = b.bind(port).sync();// 等待服務端監聽端口關閉f.channel().closeFuture().sync();} finally {// 優雅退出,釋放線程池資源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;new EchoServer().bind(port);} }下面看看服務端處理類EchoServerHandler的實現代碼:
package netty.frame.fixedLen;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /*** created by LMR on 2020/5/20*/ public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {System.out.println("Receive client : [" + msg + "]");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();// 發生異常,關閉鏈路} }在本例中,我們在服務端接收到消息之后直接進行打印,不進行任何的操作。利用FixedLengthFrameDecoder解碼器,無論一次接收到多少數據報,都會按照固定長度來進行解碼。下面我們通過telnet命令行來測試服務端能否按照預期進行工作。
3.2 telnet客戶端測試
在這里我們通過telnet命令來對服務端進行測試,下面介紹具體步驟:
(1)啟動服務端
(2)開啟本地回顯功能,便于觀察
(3)打開命令行窗口,輸入telnet localhost 8080
(3)在命令行窗口輸入需要傳輸的數據內容:
(4)服務端查看結果
可以看出每次都是收到20個字節的數據。
————————————————————————————————————————
參考博客和書籍:
https://www.w3cschool.cn/essential_netty_in_action/essential_netty_in_action-x7mn28bx.html
https://blog.csdn.net/usagoole/article/details/87389182
https://www.cnblogs.com/yuanrw/p/9866356.html
https://blog.csdn.net/mascf/article/details/60478539
https://www.cnblogs.com/ZhuChangwu/p/11225158.html
《Netty 權威指南》
如果喜歡的話希望點贊收藏,關注我,將不間斷更新博客。
希望熱愛技術的小伙伴私聊,一起學習進步
來自于熱愛編程的小白
總結
以上是生活随笔為你收集整理的你了解Netty的编解码器吗?史上最通俗易懂的Netty解码器应用案例带你解开Netty解码器的神秘面纱的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java实现游戏道具购买_基于jsp的虚
- 下一篇: Expo大作战(三十一)--expo s