【Netty】Netty 核心组件 ( Future | Channel | Selector | ChannelHandler )
文章目錄
- 一、 Future / ChannelFuture 異步操作監(jiān)聽組件
- 二、 Channel 通道組件
- 三、 Selector 選擇器組件
- 四、 ChannelHandler 通道處理器組件
一、 Future / ChannelFuture 異步操作監(jiān)聽組件
1 . Netty 中的 IO 操作 : Netty 中的 IO 操作 , 如 數(shù)據(jù)讀取 Read , 數(shù)據(jù)寫出 Write , 接受客戶端連接 Accept , 連接服務(wù)器 Connect 等 444 種 IO 操作 ;
2 . 異步操作 : 這些 IO 操作都是異步的 , 調(diào)用相應(yīng)的 IO 方法后 , 相應(yīng)的操作異步執(zhí)行 , 調(diào)用 IO 方法的代碼位置不產(chǎn)生阻塞 ;
3 . 獲取執(zhí)行結(jié)果 : IO 方法調(diào)用后 , 不能立刻得到執(zhí)行的結(jié)果 , 只返回一個 Future 對象 , 通過該 Future 對象可以異步監(jiān)聽 IO 操作的結(jié)果 ;
4 . 注冊監(jiān)聽 : 為 Future 對象添加 ChannelFutureListener 監(jiān)聽器 , 當異步 IO 操作執(zhí)行完畢后 , 會回調(diào)監(jiān)聽器的 operationComplete 方法 ;
// 監(jiān)聽綁定操作的結(jié)果 // 添加 ChannelFutureListener 監(jiān)聽器, 監(jiān)聽 bind 操作的結(jié)果 channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(future.isDone()){System.out.println("綁定端口完成");}if(future.isSuccess()){System.out.println("綁定端口成功");}else{System.out.println("綁定端口失敗");}if(future.isCancelled()){System.out.println("綁定端口取消");}System.out.println("失敗原因 : " + future.cause());} });5 . 獲取通道 : 通過調(diào)用 ChannelFuture 對象的 channel 方法可以獲取到當前 IO 操作對應(yīng)的通道 ;
// 獲取并關(guān)閉通道 , 開始監(jiān)聽操作 channelFuture.channel().closeFuture().sync();6 . 等待異步操作完成 : 調(diào)用 ChannelFuture 對象的 sync 方法 , 可以等待該異步操作完成后 , 在執(zhí)行之后的操作 , 相當于將異步操作變成了同步操作 ;
// 綁定本地端口, 進行同步操作 , 并返回 ChannelFuture ChannelFuture channelFuture = channelFuture = bootstrap.bind(8888).sync();二、 Channel 通道組件
1 . 注意與 NIO 通道區(qū)分 : 該 Channel 組件不是 NIO 中的通道 , 是 Netty 中的 io.netty.channel 包的類 ;
2 . Channel 通道組件作用 : 執(zhí)行 IO 操作 , 獲取通道狀態(tài) , 獲取通道配置參數(shù) ;
① 執(zhí)行 Netty 中的 IO 操作 , 如數(shù)據(jù)寫出 , 讀取 , 連接 , 接受連接 等操作 ;
② Channel 通道組件獲取通道狀態(tài) ;
- isOpen : 通道是否打開 ;
- isRegistered : 是否注冊 ;
- isWritable : 是否可寫 ;
③ Channel 通道組件獲取網(wǎng)絡(luò)配置參數(shù) , 如發(fā)送和接收的緩沖區(qū)等 ;
3 . Channel 通道組件提供的異步操作 :
① 提供的異步 IO 操作 : Channel 通道提供的 IO 操作都是異步的 , 如 數(shù)據(jù)讀取 Read , 數(shù)據(jù)寫出 Write , 接受客戶端連接 Accept , 連接服務(wù)器 Connect ;
② 異步操作結(jié)果獲取 : IO 操作調(diào)用后 , 立刻返回 ChannelFuture 對象 , 此時不知道是否執(zhí)行成功 , 也不知道執(zhí)行結(jié)果 , 可以給 ChannelFuture 對象設(shè)置監(jiān)聽器 , 獲取執(zhí)行結(jié)果 ;
4 . Channel 通道關(guān)聯(lián)處理器 : Channel 中的 IO 操作可以關(guān)聯(lián) Handler 處理器 ;
5 . 常用的 Channel 組件類型 : 可以根據(jù) 協(xié)議 , 阻塞類型 , 選擇合適的 Channel 組件 ;
| NioServerSocketChannel | 異步 | TCP | 服務(wù)器 |
| NioSocketChannel | 異步 | TCP | 客戶端 |
| NioDatagramChannel | 異步 | UDP | 服務(wù)器 / 客戶端 |
| NioSctpChannel | 異步 | SCTP | 客戶端 |
| NioSctpServerChannel | 異步 | SCTP | 服務(wù)器 |
其中 SCTP 協(xié)議包含 UDP , TCP , 文件 IO 等相關(guān)協(xié)議和操作 ;
6 . Channel 類型設(shè)置 : 調(diào)用 ServerBootstrap 的 channel 方法 , 可以設(shè)置通道類型 ;
ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) // 核心代碼 ---------------------------------------------------------.channel(NioServerSocketChannel.class) // 設(shè)置 NIO 網(wǎng)絡(luò)套接字通道類型// 核心代碼 ---------------------------------------------------------.option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .handler(null).childHandler(null);7 . 獲取 Channel : BootStrap 啟動后 , 返回 ChannelFuture 對象 , 調(diào)用 ChannelFuture 對象的 channel() 方法即可獲取對應(yīng)的通道 ;
// 1. 綁定本地端口, 進行同步操作 , 并返回 ChannelFuture ChannelFuture channelFuture = bootstrap.bind(8888).sync(); // 2. 獲取通道 Channel Channel = channelFuture.channel();三、 Selector 選擇器組件
1 . Netty 中的 Selector 選擇器組件 :
① 實現(xiàn)多路復用 : Selector 選擇器是 Netty 中實現(xiàn) 多路 IO 復用的最重要的手段 ;
② 在 NioEventLoop 線程中維護 : 選擇器 Selector 在 NioEventLoopGroup 線程池中的 NioEventLoop 線程中維護 ;
③ 單線程監(jiān)聽多通道 : 借助 Selector 選擇器 , 可以實現(xiàn) 一個 NioEventLoop 線程 , 監(jiān)聽多個客戶端連接對應(yīng)的 Channel 通道事件 ;
2 . 選擇器 Selector 運行機制 :
① 注冊通道 : 注冊 Channel 通道到 Selector 選擇器 ;
② 監(jiān)聽事件 : 選擇器 Selector 調(diào)用 select 方法 , 監(jiān)聽該 Selector 注冊的通道 , 查看是否有 IO 事件觸發(fā) ;
③ 可觸發(fā)的 IO 事件列舉 : 數(shù)據(jù)讀取 Read , 數(shù)據(jù)寫出 Write , 接受客戶端連接 Accept , 連接服務(wù)器 Connect 等 444 種可觸發(fā)的 IO 事件 ;
使用上述 Selector 選擇器監(jiān)聽 Channel 通道事件機制 , 可以在單個 NioEventLoop 線程中 , 實現(xiàn)了多個客戶端 IO 操作的管理 ;
四、 ChannelHandler 通道處理器組件
1 . ChannelHandler 通道處理器組件 :
① ChannelHandler 作用 : 其實現(xiàn)類主要作用是 處理 或 攔截 IO 事件 , 將其轉(zhuǎn)給對應(yīng)的 ChannelPipeline 管道進行業(yè)務(wù)邏輯的處理 ;
② ChannelHandler 使用 : ChannelHandler 是接口 , 不能直接使用 , 使用的時候 , 需要使用 ChannelHandler 接口的 實現(xiàn)類 , 常用的類下面會介紹 ;
2 . 入站 和 出站 概念 :
① 入站 : 從管道讀取數(shù)據(jù) , 相當于有數(shù)據(jù)進來 ;
② 出站 : 向管道輸出數(shù)據(jù) , 相當于寫出數(shù)據(jù) ;
3 . 常用的 ChannelHandler 類列舉 :
-
ChannelInboundHandler : 處理數(shù)據(jù)入站事件 , 即其它設(shè)備向本設(shè)備發(fā)送數(shù)據(jù) ;
-
ChannelOutboundHandler : 處理數(shù)據(jù)出站事件 , 即本設(shè)備寫出數(shù)據(jù)到其它設(shè)備 ;
-
ChannelDuplexHandler ( 不推薦使用 ) : 該類繼承了 ChannelInboundHandler , 實現(xiàn)了 ChannelOutboundHandler 接口 , 因此該類既可以處理數(shù)據(jù)入站 , 又可以處理數(shù)據(jù)出站 ; 但是一般情況下不使用該類 , 容易產(chǎn)生混淆 ;
-
ChannelInboundHandlerAdapter : 入站 IO 事件處理器適配器 ;
-
ChannelOutboundHandlerAdapter : 出站 IO 事件處理器適配器 ;
4 . ChannelInboundHandlerAdapter 常用方法 :
① 通道就緒 : 通道就緒后回調(diào)該函數(shù) ;
public void channelActive(ChannelHandlerContext ctx) throws Exception② 數(shù)據(jù)讀取 : 當有數(shù)據(jù)入站時 , 回調(diào)該函數(shù) ;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception③ 數(shù)據(jù)讀取完畢 : 當數(shù)據(jù)讀取完畢后 , 回調(diào)該函數(shù) ;
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception④ 異常回調(diào) : 發(fā)生異常時 , 回調(diào)該函數(shù) ;
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception5 . 代碼示例 : 這是之前服務(wù)器端的 ChannelInboundHandlerAdapter 子類示例 , 用于處理服務(wù)器端的業(yè)務(wù)邏輯 ;
package kim.hsl.netty;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoop; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil;import java.util.concurrent.TimeUnit;/*** Handler 處理者, 是 NioEventLoop 線程中處理業(yè)務(wù)邏輯的類** 繼承 : 該業(yè)務(wù)邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類* 才可以設(shè)置給 NioEventLoop 線程** 規(guī)范 : 該 Handler 類中需要按照業(yè)務(wù)邏輯處理規(guī)范進行開發(fā)*/ public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 讀取數(shù)據(jù) : 在服務(wù)器端讀取客戶端發(fā)送的數(shù)據(jù)* @param ctx* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 客戶端地址信息* 管道 ( Pipeline ) : 注重業(yè)務(wù)邏輯處理 , 可以關(guān)聯(lián)很多 Handler* 通道 ( Channel ) : 注重數(shù)據(jù)讀寫* @param msg* 客戶端上傳的數(shù)據(jù)* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 查看 ChannelHandlerContext 中封裝的內(nèi)容System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);// 從 ChannelHandlerContext ctx 中獲取通道Channel channel = ctx.channel();// 獲取通道對應(yīng)的事件循環(huán)EventLoop eventLoop = channel.eventLoop();// 在 Runnable 中用戶自定義耗時操作, 異步執(zhí)行該操作, 該操作不能阻塞在此處執(zhí)行// schedule(Runnable command, long delay, TimeUnit unit)// Runnable command 參數(shù) : 異步任務(wù)// long delay 參數(shù) : 延遲執(zhí)行時間// TimeUnit unit參數(shù) : 延遲時間單位, 秒, 毫秒, 分鐘eventLoop.schedule(new Runnable() {@Overridepublic void run() {//執(zhí)行耗時操作}}, 100, TimeUnit.MILLISECONDS);// 將客戶端上傳的數(shù)據(jù)轉(zhuǎn)為 ByteBuffer// 這里注意該類是 Netty 中的 io.netty.buffer.ByteBuf 類// 不是 NIO 中的 ByteBuffer// io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBufferByteBuf byteBuf = (ByteBuf) msg;// 將 ByteBuf 緩沖區(qū)數(shù)據(jù)轉(zhuǎn)為字符串, 打印出來System.out.println(ctx.channel().remoteAddress() + " 接收到客戶端發(fā)送的數(shù)據(jù) : " + byteBuf.toString(CharsetUtil.UTF_8));}/*** 服務(wù)器端讀取數(shù)據(jù)完畢后回調(diào)的方法* @param ctx* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 客戶端地址信息* * 管道 ( Pipeline ) : 注重業(yè)務(wù)邏輯處理 , 可以關(guān)聯(lián)很多 Handler* * 通道 ( Channel ) : 注重數(shù)據(jù)讀寫* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 數(shù)據(jù)編碼 : 將字符串編碼, 存儲到 io.netty.buffer.ByteBuf 緩沖區(qū)中ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8);// 寫出并刷新操作 : 寫出數(shù)據(jù)到通道的緩沖區(qū) ( write ), 并執(zhí)行刷新操作 ( flush )ctx.writeAndFlush(byteBuf);}/*** 異常處理 , 上面的方法中都拋出了 Exception 異常, 在該方法中進行異常處理* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("通道異常, 關(guān)閉通道");//如果出現(xiàn)異常, 就關(guān)閉該通道ctx.close();} }總結(jié)
以上是生活随笔為你收集整理的【Netty】Netty 核心组件 ( Future | Channel | Selector | ChannelHandler )的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Netty】Netty 核心组件 (
- 下一篇: 【Netty】Netty 核心组件 (