【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )
文章目錄
- 一、 任務隊列 TaskQueue
- 二、 處理器 Handler 同步異步操作
- 三、 異步任務 ( 用戶自定義任務 )
- 四、 異步任務 ( 用戶自定義定時任務 )
- 五、 異步任務 ( 其它線程向本線程調度任務 )
一、 任務隊列 TaskQueue
任務隊列 TaskQueue 的任務 Task 應用場景 :
① 自定義任務 : 自己開發的任務 , 然后將該任務提交到任務隊列中 ;
② 自定義定時任務 : 自己開發的任務 , 然后將該任務提交到任務隊列中 , 同時可以指定任務的執行時間 ;
③ 其它線程調度任務 : 上面的任務都是在當前的 NioEventLoop ( 反應器 Reactor 線程 ) 中的任務隊列中排隊執行 , 在其它線程中也可以調度本線程的 Channel 通道與該線程對應的客戶端進行數據讀寫 ;
二、 處理器 Handler 同步異步操作
在之前的 Netty 服務器與客戶端項目中 , 用戶自定義的 Handler 處理器 , 該處理器繼承了 ChannelInboundHandlerAdapter 類 , 在重寫的 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 方法中 , 執行的業務邏輯要注意以下兩點 :
- 同步操作 : 如果在該業務邏輯中只執行一個短時間的操作 , 那么可以直接執行 ;
- 異步操作 : 如果在該業務邏輯中執行訪問數據庫 , 訪問網絡 , 讀寫本地文件 , 執行一系列復雜計算等耗時操作 , 肯定不能在該方法中處理 , 這樣會阻塞整個線程 ; 正確的做法是將耗時的操作放入任務隊列 TaskQueue , 異步執行 ;
在 ChannelInboundHandlerAdapter 的 channelRead 方法執行時 , 客戶端與服務器端的反應器 Reactor 線程 NioEventLoop 是處于阻塞狀態的 , 此時服務器端與客戶端同時都處于阻塞狀態 , 這樣肯定不行 , 因為 NioEventLoop 需要為多個客戶端服務 , 不能因為與單一客戶端交互而產生阻塞 ;
三、 異步任務 ( 用戶自定義任務 )
1 . 用戶自定義任務流程 :
① 獲取通道 : 首先獲取 通道 Channel ;
② 獲取線程 : 獲取通道對應的 EventLoop 線程 , 就是 NioEventLoop , 該 NioEventLoop 中封裝了任務隊列 TaskQueue ;
③ 任務入隊 : 向任務隊列 TaskQueue 中放入異步任務 Runnable , 調用 NioEventLoop 線程的 execute 方法 , 即可將上述 Runnable 異步任務放入任務隊列 TaskQueue ;
2 . 多任務執行 : 如果用戶連續向任務隊列中放入了多個任務 , NioEventLoop 會按照順序先后執行這些任務 , 注意任務隊列中的任務 是先后執行 , 不是同時執行 ;
順序執行任務 ( 不是并發 ) : 任務隊列任務執行機制是順序執行的 ; 先執行第一個 , 執行完畢后 , 從任務隊列中獲取第二個任務 , 執行完畢之后 , 依次從任務隊列中取出任務執行 , 前一個任務執行完畢后 , 才從任務隊列中取出下一個任務執行 ;
3 . 代碼示例 : 監聽到客戶端上傳數據后 , channelRead 回調 , 執行 獲取通道 -> 獲取線程 -> 異步任務調度 流程 ;
/*** Handler 處理者, 是 NioEventLoop 線程中處理業務邏輯的類** 繼承 : 該業務邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類* 才可以設置給 NioEventLoop 線程** 規范 : 該 Handler 類中需要按照業務邏輯處理規范進行開發*/ public class ServerHandr extends ChannelInboundHandlerAdapter {/*** 讀取數據 : 在服務器端讀取客戶端發送的數據* @param ctx* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 客戶端地址信息* 管道 ( Pipeline ) : 注重業務邏輯處理 , 可以關聯很多 Handler* 通道 ( Channel ) : 注重數據讀寫* @param msg* 客戶端上傳的數據* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 1 . 從 ChannelHandlerContext ctx 中獲取通道Channel channel = ctx.channel();// 2 . 獲取通道對應的事件循環EventLoop eventLoop = channel.eventLoop();// 3 . 在 Runnable 中用戶自定義耗時操作, 異步執行該操作, 該操作不能阻塞在此處執行eventLoop.execute(new Runnable() {@Overridepublic void run() {//執行耗時操作}});} }四、 異步任務 ( 用戶自定義定時任務 )
1 . 用戶自定義定時任務 與 用戶自定義任務流程基本類似 , 有以下兩個不同之處 :
① 調度方法 :
- 定時異步任務使用 schedule 方法進行調度 ;
- 普通異步任務使用 execute 方法進行調度 ;
② 任務隊列 :
- 定時異步任務提交到 ScheduleTaskQueue 任務隊列中 ;
- 普通異步任務提交到 TaskQueue 任務隊列中 ;
2 . 用戶自定義定時任務流程 :
① 獲取通道 : 首先獲取 通道 Channel ;
② 獲取線程 : 獲取通道對應的 EventLoop 線程 , 就是 NioEventLoop , 該 NioEventLoop 中封裝了任務隊列 TaskQueue ;
③ 任務入隊 : 向任務隊列 ScheduleTaskQueue 中放入異步任務 Runnable , 調用 NioEventLoop 線程的 schedule 方法 , 即可將上述 Runnable 異步任務放入任務隊列 ScheduleTaskQueue ;
3 . 代碼示例 : 監聽到客戶端上傳數據后 , channelRead 回調 , 執行 獲取通道 -> 獲取線程 -> 異步任務調度 流程 ;
/*** Handler 處理者, 是 NioEventLoop 線程中處理業務邏輯的類** 繼承 : 該業務邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類* 才可以設置給 NioEventLoop 線程** 規范 : 該 Handler 類中需要按照業務邏輯處理規范進行開發*/ public class ServerHandr extends ChannelInboundHandlerAdapter {/*** 讀取數據 : 在服務器端讀取客戶端發送的數據* @param ctx* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 客戶端地址信息* 管道 ( Pipeline ) : 注重業務邏輯處理 , 可以關聯很多 Handler* 通道 ( Channel ) : 注重數據讀寫* @param msg* 客戶端上傳的數據* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 1 . 從 ChannelHandlerContext ctx 中獲取通道Channel channel = ctx.channel();// 2 . 獲取通道對應的事件循環EventLoop eventLoop = channel.eventLoop();// 3 . 在 Runnable 中用戶自定義耗時操作, 異步執行該操作, 該操作不能阻塞在此處執行// schedule(Runnable command, long delay, TimeUnit unit)// Runnable command 參數 : 異步任務// long delay 參數 : 延遲執行時間// TimeUnit unit參數 : 延遲時間單位, 秒, 毫秒, 分鐘eventLoop.schedule(new Runnable() {@Overridepublic void run() {//執行耗時操作}}, 100, TimeUnit.MILLISECONDS);} }五、 異步任務 ( 其它線程向本線程調度任務 )
1 . 獲取通道 Channel 即可調度異步任務 : 由上面的任務調度流程可知 , 只要獲取到了本 NioEventLoop 線程對應的 Channel 通道 , 就可以獲取該 NioEventLoop 線程的 EventLoop 事件調度器 , 向 ScheduleTaskQueue 或 TaskQueue 任務隊列中加入異步任務 ;
2 . Channel 通道獲取與管理 :
① Channel 通道獲取 : 在服務器啟動設置 ServerBootstrap 中 , 會設置 ChannelInitializer , 在與客戶端的連接建立成功后 , 會回調 initChannel 方法 , 此時就會得到該客戶端連接對應的通道 SocketChannel ;
② Channel 通道管理 : 在服務器中使用 Map 集合管理該 Channel 通道 , 需要時根據用戶標識信息 , 獲取該通道 , 向該客戶端通道對應的 NioEventLoop 線程中調度任務 ;
3 . 代碼示例 : 這里只展示一下 ChannelInitializer 的回調位置 , 不再詳細描述怎么維護集合的過程了 , 自己定義 Map 集合維護 ;
// 服務器啟動對象, 需要為該對象配置各種參數 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) // 設置 主從 線程組 , 分別對應 主 Reactor 和 從 Reactor.channel(NioServerSocketChannel.class) // 設置 NIO 網絡套接字通道類型.option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列維護的連接個數.childOption(ChannelOption.SO_KEEPALIVE, true) // 設置連接狀態行為, 保持連接狀態.childHandler( // 為 WorkerGroup 線程池對應的 NioEventLoop 設置對應的事件 處理器 Handlernew ChannelInitializer<SocketChannel>() {// 創建通道初始化對象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 該方法在服務器與客戶端連接建立成功后會回調// 為 管道 Pipeline 設置處理器 Hanedlerch.pipeline().addLast(new ServerHandr());}});總結
以上是生活随笔為你收集整理的【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Netty】Netty 入门案例分析
- 下一篇: 【Netty】Netty 异步任务模型