【Netty】Netty 异步任务模型 及 Future-Listener 机制
文章目錄
- 一、 Netty 模型
- 二、 異步模型
- 三、 Future-Listener 機制
- 四、 Future-Listener 機制代碼示例
一、 Netty 模型
以服務器端為例
1 . 線程池 : Netty 模型核心就是兩個線程池 , BossGroup 線程池 和 WorkerGroup 線程池 ;
① BossGroup 線程池 : 負責維護客戶端連接操作 ;
② WorkerGroup 線程池 : 負責與客戶端進行數據交互 ;
③ 線程池類型 : 上述兩個線程池 ( BossGroup / WorkerGroup ) 都是 NioEventLoopGroup 類型的 ;
④ 線程池中的線程 : NioEventLoopGroup 線程池中維護了多個 NioEventLoop 線程 ;
2 . 線程池中的線程 : NioEventLoopGroup 線程池中維護了若干 NioEventLoop 線程 , 這相當于主從反應器 ( Reactor ) 模型中的反應器 , 每個 NioEventLoop 中都有一個 選擇器 ( Selector ) , 用于監聽 Socket IO 事件 , 如 建立連接 , 數據讀寫 等 ;
3 . NioEventLoop 工作流程 :
NioEventLoop 中可以按照一定順序進行數據處理 , 如數據到來后 , 按照下面的流程執行一系列操作 ;
讀取數據 -> 數據解碼 -> 業務邏輯處理 -> 數據編碼 -> 數據發送
4 . NioEventLoop 中封裝內容 :
- 選擇器 Selector
- 任務隊列 TaskQueue
- 調度任務隊列 ScheduleTaskQueue
- NIO 通道 NioChannel
- 管道 ChannelPipeline
上面是 Netty 的模型的總體架構 , 下面重點介紹 Netty 模型中的異步模型 , Netty 中的每次綁定端口 , 連接遠程端口 , 讀寫數據都要涉及到異步操作 ;
二、 異步模型
1 . 異步操作概念 : 調用者調用一個異步操作后 , 并不能馬上知道該操作的返回值 , 該操作也不會馬上執行完成 , 該操作完成后 , 會通過回調機制 , 如 通知 , 注冊的回調函數等機制通知調用者 ;
2 . Netty 中的異步操作與 ChannelFuture 返回值 :
① 異步操作 : Netty 模型中凡是關于 IO 的操作 , 如綁定端口 ( Bind ) , 遠程連接 ( Connect ) , 讀取數據 ( Read ) , 寫出數據 ( Write ) 等操作都是異步操作 ;
② 異步操作返回值 : 上述 IO 操作返回值都是 ChannelFuture 類型實例 , ChannelFuture 是異步 IO 操作的返回結果 ;
③ 在服務器端綁定端口號時 , 調用 Bootstrap 的 bind 方法 , 會返回 ChannelFuture 對象 ;
④ 在客戶端調用 Bootstrap 的 connect 方法 , 也會返回 ChannelFuture 對象 ;
3 . Netty 中的異步操作機制 :
① Future-Listener 機制 : Future 表示當前不知道結果 , 在未來的某個時刻才知道結果 , Listener 表示監聽操作 , 監聽返回的結果 ;
② Netty 異步模型的兩個基礎 : Future ( ChannelFuture 未來知道結果 ) , Callback ( 監聽回調 ) ;
4 . 以客戶端寫出數據到服務器端為例 :
客戶端寫出數據 : 客戶端調用寫出數據方法 ChannelFuture writeAndFlush(Object msg) , 向服務器寫出數據 ;
操作耗時 : 假設在服務器中接收到該數據后 , 要執行一個非常耗時的操作才能返回結果 , 就是操作非常耗時 ;
客戶端不等待 : 客戶端這里寫出了數據 , 肯定不能阻塞等待寫出操作的結果 , 需要立刻執行下面的操作 , 因此該方法是異步的 ;
客戶端監聽 : writeAndFlush 方法返回一個 ChannelFuture 對象 , 如果客戶端需要該操作的返回結果 , 那么通過 ChannelFuture 可以監聽該寫出方法是否成功 ;
5 . 異步操作返回結果 :
① 返回結果 : Future 表示異步 IO 操作執行結果 , 通過該 Future 提供的 檢索 , 計算 等方法檢查異步操作是否執行完成 ;
② 常用接口 : ChannelFuture 繼承了 Future , 也是一個接口 , 可以為該接口對象注冊監聽器 , 當異步任務完成后會回調該監聽器方法 ;
public interface ChannelFuture extends Future<Void>6 . Future 鏈式操作 : 這里以讀取數據 , 處理后返回結果為例 ;
-
數據讀取操作 ;
-
對讀取的數據進行解碼處理 ;
-
執行業務邏輯
-
將數據編碼 ;
-
將編碼后的數據寫出 ;
上述 555 個步驟 , 每個數據處理操作 , 都有與之對應的 Handler 處理器 ;
異步機制 : 在 Handler 處理器中需要實現異步機制 , 一般使用 Callback 回調 , 或 Future 機制 ;
鏈式操作優勢 : 上述的鏈式操作 , 簡潔 , 高效 , 可以讓開發者快速開發高性能 , 高可靠性服務器 , 只關注業務邏輯 , 不用過多的將精力浪費在網絡基礎功能開發上 ;
這里的網絡基礎功能就是高可靠性 , 高性能的網絡傳輸模塊 ;
三、 Future-Listener 機制
1 . Future-Listener 機制 :
① Future 返回值 : 在 Netty 中執行 IO 操作 , 如 bind , read , write , connect 等方法 , 會立刻返回 ChannelFuture 對象 ;
② ChannelFuture 返回時狀態 : 調用 IO 方法后 , 立刻返回 ChannelFuture 對象 , 此時該操作未完成 ;
③ 注冊監聽器 : ChannelFuture 可以設置 ChannelFutureListener 監聽器 , 監聽該 IO 操作完成狀態 , 如果 IO 操作完成 , 那么會回調其 public void operationComplete(ChannelFuture future) throws Exception 接口實現方法 ;
④ IO 操作執行狀態判定 : 在 operationComplete 方法中通過 調用 ChannelFuture future 參數的如下方法 , 判定當前 IO 操作完成狀態 ;
- future.isDone() : IO 操作是否完成 ;
- future.isSuccess() : IO 操作是否成功 ; ( 常用 )
- future.isCancelled() : IO 操作是否被取消 ;
- future.cause() : IO 操作的失敗原因 ;
2 . IO 操作的同步與異步 :
① 同步 IO 操作 : BIO 中的同步 IO 操作 , 會阻塞當前的線程 , IO 操作返回前 , 處于阻塞狀態 , 不能執行其它操作 ;
② 異步 IO 操作 : 異步 IO 操作不會阻塞當前的線程 , 調用 IO 操作之后 , 可以立即執行其它操作 , 不會阻塞當前線程 , 該機制非常適用于高并發的場景 , 開發穩定 , 并發 , 高吞吐量 的服務器 ;
3 . 核心代碼示例 :
// 監聽綁定操作的結果 // 添加 ChannelFutureListener 監聽器, 監聽 bind 操作的結果 channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(future.isDone()){System.out.println("綁定端口完成");}if(future.isSuccess()){System.out.println("綁定端口成功");}else{System.out.println("綁定端口失敗");}if(future.isCancelled()){System.out.println("綁定端口取消");}System.out.println("失敗原因 : " + future.cause());} });四、 Future-Listener 機制代碼示例
1 . 代碼示例 : 這里以服務器程序為例 , 客戶端程序就不貼了 ;
package kim.hsl.netty;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Netty 案例服務器端*/ public class Server {public static void main(String[] args) {// 1. 創建 BossGroup 線程池 和 WorkerGroup 線程池, 其中維護 NioEventLoop 線程// NioEventLoop 線程中執行無限循環操作// BossGroup 線程池 : 負責客戶端的連接// 指定線程個數 : 客戶端個數很少, 不用很多線程維護, 這里指定線程池中線程個數為 1EventLoopGroup bossGroup = new NioEventLoopGroup(1);// WorkerGroup 線程池 : 負責客戶端連接的數據讀寫EventLoopGroup workerGroup = new NioEventLoopGroup();// 2. 服務器啟動對象, 需要為該對象配置各種參數ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup) // 設置 主從 線程組 , 分別對應 主 Reactor 和 從 Reactor.channel(NioServerSocketChannel.class) // 設置 NIO 網絡套接字通道類型.option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列維護的連接個數.childOption(ChannelOption.SO_KEEPALIVE, true) // 設置連接狀態行為, 保持連接狀態.childHandler( // 為 WorkerGroup 線程池對應的 NioEventLoop 設置對應的事件 處理器 Handlernew ChannelInitializer<SocketChannel>() {// 創建通道初始化對象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 該方法在服務器與客戶端連接建立成功后會回調// 為 管道 Pipeline 設置處理器 Hanedler// 這里暫時設置為 null , 執行不會失敗 , 服務器綁定端口會成功ch.pipeline().addLast(null);}});System.out.println("服務器準備完畢 ...");ChannelFuture channelFuture = null;try {// 綁定本地端口, 進行同步操作 , 并返回 ChannelFuturechannelFuture = bootstrap.bind(8888).sync();System.out.println("服務器開始監聽 8888 端口 ...");// ( 本次示例核心代碼 ) ----------------------------------------------------------// 監聽綁定操作的結果 ( 本次示例核心代碼 )// 添加 ChannelFutureListener 監聽器, 監聽 bind 操作的結果channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(future.isDone()){System.out.println("綁定端口完成");}if(future.isSuccess()){System.out.println("綁定端口成功");}else{System.out.println("綁定端口失敗");}if(future.isCancelled()){System.out.println("綁定端口取消");}System.out.println("失敗原因 : " + future.cause());}});// ( 本次示例核心代碼 ) ----------------------------------------------------------// 關閉通道 , 開始監聽操作channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 出現異常后, 優雅的關閉bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }2 . 執行結果 : 執行上述服務器 , 由此可見 綁定 bind 操作執行完成 , 并且執行成功 , 沒有失敗 , 因此失敗原因為 null ;
總結
以上是生活随笔為你收集整理的【Netty】Netty 异步任务模型 及 Future-Listener 机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Netty】 异步任务调度 ( Tas
- 下一篇: 【Netty】使用 Netty 开发 H