Netty实战六之ChannelHandler和ChannelPipeline
1、Channel的生命周期
Interface Channel定義了一組和ChannelInboundHandler API密切相關的簡單但功能強大的狀態模型,以下列出Channel的4個狀態。
ChannelUnregistered:Channel已經被創建,但還未注冊到EventLoop
ChannelRegistered:Channel已經被注冊到了EventLoop
ChannelActive:Channel處于活動狀態(已經連接到它的遠程節點)。它現在可以接收和發送數據了
ChannelInactive:Channel沒有連接到遠程節點
Channel的正常生命周期如下圖所示,當這些狀態發生改變時,將會生成對應的事件。這些事件將會被轉發給ChannelPipeline中的ChannelHandler,其可以隨后對它們做出響應。?
2、ChannelHandler的生命周期
下表列出了interface ChannelHandler定義的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被從ChannelPipeline中移除時會調用這些操作,這些方法中的每一個都接受一個ChannelHandlerContext參數。
handlerAdded:當把ChannelHandler添加到ChannelPipeline中時被調用
handlerRemoved:當從ChannelPipeline中移除ChannelHandler時被調用
exceptionCaught:當處理過程中在ChannelPipeline中有錯誤產生時被調用
Netty定義了下面兩個重要的ChannelHandler子接口:
·ChannelInboundHandler——處理入站數據以及各種狀態變化
·ChannelOutboundHandler——處理出站數據并且允許攔截所有的操作
3、ChannelInboundHandler接口
當某個ChannelInboundHandler的實現重寫channelRead()方法時,它將負責顯式地釋放與池化ByteBuf實例相關的內存,Netty為此提供了一個實用方法ReferenceCountUtil.release()。
Netty將使用WARN級別的日志消息記錄未釋放的資源,使得可以非常簡單地在代碼中發現違規的實例,但是以這種方式管理資源可能很繁瑣。一個更加簡單的方式是使用SimpleChannelInboundHandler。
由于SimpleChannelInboundHandler會自動釋放資源,所以你不應該存儲指向任何消息的引用供將來使用,因為這些引用都將會失效。
4、ChannelOutboundHandler接口
出站操作和數據將由ChannelOutboundHandler處理,它的方法將被Channel、ChannelPipeline以及ChannelHandlerContext調用。
ChannelOutboundHandler的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些復雜的方法來處理請求。例如,如果到遠程節點的寫入被暫停了,那么你可以推遲沖刷并在稍后繼續。
ChannelPromise與ChannelFuture : ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise參數,以便在操作完成時得到通知。ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(),從而使ChannelFuture不可變。
5、ChannelHandler適配器
你可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter類作為自己的ChannelHandler的起始點。這兩個適配器分別提供了ChannelInboundHandler和ChannelOutboundHandler的基本實現,通過擴展抽象類ChannelHandlerAdapter,他們獲得了他們共同的超接口ChannelHandler的方法。生成的類的層次結構如下圖。?ChannelHandlerAdapter還提供了使用方法isSharable(),如果其對應的實現被標注為Sharable,那么這個方法都將返回true,表示它可以被添加到多個ChannelPipeline中。
在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法體調用了其相關聯的ChannelHandlerContext上的等效方法,從而將事件轉發到了ChannelPipeline中的下一個ChannelHandler中。
6、資源管理
每當通過調用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法來處理數據時,你都需要確保沒有任何的資源泄露。你可能還記得前面的章節中所提到的,Netty使用引用技術來處理池化的ByteBuf。所以在完全使用完某個ByteBuf后,調整其引用計數是很重要的。
為了幫助你診斷潛在的(資源泄露)問題,Netty提供了class ResourceLeakDetector,它將對你應用程序的緩沖區分配做大約1%的采樣來檢測內存泄露。相關的開銷是非常小的。
Netty定義了4種泄露檢測級別。
DISABLED——禁用泄露檢測
SIMPLE——使用1%的默認采樣率檢測并報告任何發現的泄露
ADVANCED——使用默認的采樣率,報告所發現的任何的泄露以及對應的消息被訪問的位置
PARANOID——類似于ADVANCED,但是其將會對每次訪問都進行采樣,這對性能將會有很大的影響,應該只在調試階段使用
泄露檢測級別可以通過將下面的Java系統屬性設置為表中的一個值來定義:
java -Dio.netty.leakDetectionLevel = ADVANCED
如果帶著該JVM選項重新啟動你的應用程序,你將看到自己的應用程序最近被泄露的緩沖區被訪問的位置。
實現ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法時,應該如何使用這個診斷工具來防止泄露呢?讓我們看看你的channelRead()操作直接消費入站消息的情況,也就是說,他不會通過調用ChannelHandlerContext.fireChannelRead()方法將入站消息轉發給下一個ChannelInboundHandler。
消費入站消息的簡單方式: 由于消費入站數據是一項常規任務,所以Netty提供了一個特殊的被稱為SimpleChannelInboundHandler的ChannelInboundHandler實現,這個實現會在消息被channelRead0()方法消費之后自動釋放消息。
在出站方向這邊,如果你處理了write()操作并丟棄了一個消息,那么你也應該負責釋放它。以下代碼展示了一個丟棄所有的寫入數據的實現。
重要的是,不僅要釋放資源,還要通知ChannelPromise。否則可能會出現ChannelFutureListener收不到某個消息已經被處理了的通知的消息。
總之,如果一個消息被消費或者丟棄了,并且沒有傳遞給ChannelPipeline中的下一個ChannelOutboundHandler,那么用戶就有責任調用ReferenceCountUtil.release()。如果消息到達了實際的傳輸層,那么當它被寫入時或者Channel關閉時,都將被自動釋放。
7、ChannelPipeline接口
如果你認為ChannelPipeline是一個攔截流經Channel的入站和出站事件的ChannelHandler實例鏈,那么就很容易看出這些ChannelHandler之間的交互式如何組成一個應用程序數據和時間處理邏輯的核心的。
每一個新創建的Channel都將會被分配一個新的ChannelPipeline。這項關聯時永久的,Channel即不能附加另外一個ChannelPipeline,也不能分離其當前的,在Netty組件的生命周期中,這是一項固定的操作,不需要開發人員的任何干預。
根據事件的起源,事件將會被ChannelInboundHandler或者ChannelOutboundHandler處理,隨后,通過調用ChannelHandlerContext實現,它將被轉發給同一個超類型的下一個ChannelHandler。
ChannelHandlerContext:ChannelHandlerContext使得ChannelHandler能夠和它的ChannelPipeline以及其他的ChannelHandler交互,ChannelHandler可以通知其所屬的ChannelPipeline中的下一個ChannelHandler,甚至可以動態修改它所屬的ChannelPipeline。ChannelHandlerContext具有豐富的用于處理事件和執行I/O操作的API。
下圖展示了一個典型的同時具有入站和出站ChannelHandler的ChannelPipeline的布局,并且印證了我們之前的關于ChannelPipeline主要由一系列的ChannelHandler所組成的說法,ChannelPipeline還提供了通過ChannelPipeline本身傳播事件的方法。如果一個入站事件被觸發,它將被從ChannelPipeline的頭部開始一直被傳播到ChannelPipeline的尾端。如圖所示,一個出站I/O事件將從ChannelPipeline的最右邊開始,然后向左傳播。?在ChannelPipeline傳播事件時,它會測試ChannelPipeline中的下一個ChannelHandler的類型是否和事件的運動方向相匹配。如果不匹配,ChannelPipeline將跳過該ChannelHandler并前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。
8、修改ChannelPipeline
通過調用ChannelPipeline上的相關方法,ChannelHandler可以添加、刪除或者替換其他的ChannelHandler,從而實時地修改ChannelPipeline的布局。
ChannelPipeline pipeline = ...;FirstHandler firstHandler = new FirstHandler();//將該實例作為“handler1”添加到ChannelPipeline中pipeline.addLast("handler1",firstHandler);//將一個SecondHandler的實例作為“handler2”添加到ChannelPipeline的第一個槽中,這意味著它將被放置在已有的“handler1”之前 pipeline.addLast("handler2",new SecondHandler()); //將一個ThirdHandler的實例作為“handler3”添加到ChannelPipeline的最后一個槽中 pipeline.addLast("handler3",new ThirdHandler()); ... //通過名稱移除“handler3” pipeline.remove("handler3"); //通過引用移除FirstHandler pipeline.remove(firstHandler); //將SecondHandler(“handler2”)替換為FourthHandler:"handler4" pipeline.replace("handler2","handler4",new ForthHandler());ChannelHandler的執行和阻塞:通常ChannelPipeline中的每一個ChannelHandler都是通過它的EventLoop(I/O線程)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個線程,因為這會對整體的I/O處理產生負面的影響。但有時可能需要與那些使用阻塞API的遺留代碼進行交互,對于這個情況,ChannelPipeline有一些接受一個EventExecutorGroup的add()方法,如果一個事件被傳遞給一個自定義的EventExecutorGroup,它將被包含在這個EventExecutorGroup中的某個EventExecutor所處理,從而被從該Channel本身的EventLoop中移除,對于這種用例,Netty提供了一種叫DefaultEventExecutorGroup的默認實現。
——ChannelPipeline保存了與Channel相關聯的ChannelHandler
——ChannelPipeline可以根據需要、通過添加或者刪除ChannelHandler來動態修改
——ChannelPipeline有著豐富的API用以被調用、以響應入站和出站事件
——ChannelHandlerContext和ChannelHandler之間的關聯(綁定)是永遠不會改變的,所以緩存對它的引用是安全的
9、使用ChannelHandlerContext
以下代碼,將通過ChannelHandlerContext獲取到Channel的引用,調用Channel上的write()方法將會導致寫入事件從尾端到頭部地流經ChannelPipeline。?以下代碼展示了一個類似的例子,但是這一次是寫入ChannelPipeline。我們再次看到,(到ChannelPipeline的)引用是通過ChannelHandlerContext獲取的。
ChannelHandlerContext ctx = ..;//獲取到與ChannelHandlerContext相關聯的Channel的引用Channel channel = ctx.channel();//通過Channel寫入緩沖區channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));為什么會想要從ChannelPipeline中的某個特定點開始傳播事件呢?
——為了減少將事件傳經對它不感興趣的ChannelHandler所帶來的開銷
——為了避免將事件傳經那些可能會對它感興趣的ChannelHandler。
10、ChannelHandler和ChannelHandlerContext的高級用法
可以通過將ChannelHandler添加到ChannelPipeline中來實現動態的協議切換,緩存到ChannelHandlerContext的引用以供稍后使用,這可能會發生在任何的ChannelHandler方法之外,甚至來自于不同的線程。
以下代碼,緩存到ChannelHandlerContext的引用
public class WriteHandler extends ChannelHandlerAdapter{ private ChannelHandlerContext ctx;因為一個ChannelHandler可以從屬于多個ChannelPipeline,所以它也可以綁定到多個ChannelHandlerContext實例,對于這種用法(指在多個ChannelPipeline中共享同一個ChannelHandler),對應的ChannelHandler必須要使用@Sharable注解標注;否則,試圖將它添加到多個ChannelPipeline時將會觸發異常,顯而易見,為了安全地被用于多個并發的Channel(連接),這樣的ChannelHandler必須是線程安全的。
以下代碼,展示這種模式。
前面的ChannelHandler實現了符合所有的將其加入到多個ChannelPipeline的需求,即它使用了注解@Sharable標注,并且也不持有任何的狀態。
以下代碼,演示@Sharable的錯誤用法
這段代碼的問題在于它擁有狀態,即用于跟蹤方法調用次數的實例變量count。將這個類的一個實例添加到ChannelPipeline將極有可能在它被多個并發Channel訪問時導致問題。(可以將ChannelRead()方法變為同步方法)
總之,只應該在確定了你的ChannelHandler是線程安全的時才使用@Sharable注解。
為何要共享同一個ChannelHandler:在多個ChannelPipeline中安裝同一個ChannelHandler的一個常見的原因是用于收集跨越多個Channel的統計信息。
11、處理入站異常
異常處理是任何真實應用程序的重要組成部分,它也可以通過多種方式來實現,因此,Netty提供了幾種方式用于處理入站或者出站處理過程中所拋出的異常。
如果在處理入站事件的過程中有異常被拋出,那么它將從它在ChannelInboundHandler里被觸發的那一點開始流經ChannelPipeline。要想處理這種類型的入站異常,你需要在你的ChannelInboundHandler實現exceptionCaught方法。
以下代碼,展示了其關閉Channel并打印了異常的棧跟蹤信息
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter{因為異常將會繼續按照入站方向流動(就像所有入站事件一樣),所以實現了前面所示邏輯的ChannelInboundHandler通常位于ChannelPipeline的最后,這確保了所有的入站異常都總是會被處理,無論他們可能會發生在ChannelPipeline中的什么位置。
你應該如何響應異常,可能很大程序上取決于你的應用程序,你可能想要關閉Channel(和連接),也可能會嘗試進行恢復。如果你不實現任何處理入站異常的邏輯,那么Netty將會記錄該異常沒有被處理的事實。
——ChannelHandler.exceptionCaught()的默認實現是簡單地將當前異常轉發給ChannelPipeline中的下一個ChannelHandler
——如果異常到達了ChannelPipeline的尾端,它將會被記錄為未處理
——要想定義自定義的處理邏輯,你需要重寫exceptionCaught方法,然后你需要決定是否需要將該異常傳播出去
12、處理出站異常
——每個出站操作都將返回一個ChannelFuture。注冊到ChannelFuture的ChannelFutureListener將在操作完成時被通知該操作是成功了還是出錯了
——幾乎所有的ChannelOutboundHandler上的方法都會傳入一個ChannelPromise的實例,作為ChannelFuture的子類,ChannelPromise也可以被分配用于異步通知的監聽器,但是,ChannelPromise還具有提供立即通知的可寫方法。
以下代碼,添加channelFutureListener,它將打印棧跟蹤信息,并且隨后關閉Channel
ChannelFuture future = channel.wirte(someMessage);future.addListener(new ChannelFutureListener() {@Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()){ channelFuture.cause().printStackTrace(); channelFuture.channel().close(); } } });第二種方式是將ChannelFutrueListener添加到即將作為參數傳遞給ChannelOutboundHandler的方法的ChannelPromise。
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter{ChannelPromise的可寫方法:通過調用ChannelPromise上的setSuccess()和setFailure()方法,可以使一個操作的狀態在ChannelHandler的方法返回給其調用者時便即刻被感知到。
轉載于:https://www.cnblogs.com/UncleCatMySelf/p/9187276.html
總結
以上是生活随笔為你收集整理的Netty实战六之ChannelHandler和ChannelPipeline的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RSEM-Ebseq-差异表达分析-无参
- 下一篇: OpenCode:template