java channel源码_Netty 4.0 源码分析(三):Channel和ChannelPipeline
Client和server通過Channel連接,然后通過ByteBuf進行傳輸。每個Channel有自己的Pipeline,Pipeline上面可以添加和定義Handler和Event。
Channel類
1?package?io.netty.channel;2?import?io.netty.buffer.ByteBuf;
3?import?io.netty.buffer.MessageBuf;
4?import?io.netty.channel.socket.DatagramChannel;
5?import?io.netty.channel.socket.ServerSocketChannel;
6?import?io.netty.channel.socket.SocketChannel;
7?import?io.netty.util.AttributeMap;
8?import?java.net.InetSocketAddress;
9?import?java.net.SocketAddress;
10?import?java.nio.channels.SelectionKey;
11?public?interface?Channel?extends?AttributeMap,?ChannelOutboundInvoker,?ChannelFutureFactory,?Comparable?{
12?????Integer?id();
13?????EventLoop?eventLoop();
14?????Channel?parent();
15?????ChannelConfig?config();
16?????ChannelPipeline?pipeline();
17?????boolean?isOpen();
18?????boolean?isRegistered();
19?????boolean?isActive();
20?????ChannelMetadata?metadata();
21?????ByteBuf?outboundByteBuffer();
22??????MessageBuf?outboundMessageBuffer();
23?????SocketAddress?localAddress();
24?????SocketAddress?remoteAddress();
25?????ChannelFuture?closeFuture();
26?????Unsafe?unsafe();
27?????interface?Unsafe?{
28?????????ChannelHandlerContext?directOutboundContext();
29?????????ChannelFuture?voidFuture();
30?????????SocketAddress?localAddress();
31?????????SocketAddress?remoteAddress();
32?????????void?register(EventLoop?eventLoop,?ChannelFuture?future);
33?????????void?bind(SocketAddress?localAddress,?ChannelFuture?future);
34?????????void?connect(SocketAddress?remoteAddress,?SocketAddress?localAddress,?ChannelFuture?future);
35?????????void?disconnect(ChannelFuture?future);
36?????????void?close(ChannelFuture?future);
37?????????void?closeForcibly();
38?????????void?deregister(ChannelFuture?future);
39?????????void?flush(ChannelFuture?future);
40?????????void?flushNow();
41?????????void?suspendRead();
42?????????void?resumeRead();
43?????}
44?}
45
Channel類UML圖
Netty 4.0中,定義了Channel接口,這個接口用于連接網絡的socket傳輸,或者具有I/O操作的組件連接。這里的I/O操作有,read,write,bind,connect
Channel接口為用戶提供了:
1.Channel的當前狀態,比如:Channel是否open,或者Channel是否已經連接。
2.Channel的參數,比如:接受的buffer大小。
3.Channel支持的I/O操作,比如:read,write,connect,bind。
4.注冊在Channel上的ChannelPipeline,ChannelPipeline用于處理所有的I/O事件和請求。
Channel類的幾個重要方法
ChannelFuture closeFuture();
所有在Netty中的I/O操作都是異步的,這就意味著任何的I/O調用都會立即返回,但是無法保證所有被調用的I/O操作到最后能夠成功執行完成。closeFuture() 返回一個ChannelFuture對象, 并且告訴I/O的調用者,這個I/O調用的最后狀態是succeeded,failed或者canceled。
void register(EventLoop eventLoop, ChannelFuture future);
在Channel中注冊EventLoop和對應的ChannelFuture。
void deregister(ChannelFuture future);
在Channel中取消ChannelFuture的注冊。
在Channel的層次結構中,Channel子類的實現取決于傳輸的具體實現。比如SocketChannel,能夠被ServerSocketChannel接受,并且SocketChannel中的getParent()方法會返回ServerSocketChannel。開發者可以實現Channel接口,共享Socket連接,比如SSH。
ChannelPipeLine接口
1?public?interface?ChannelPipeline?extends?ChannelInboundInvoker,?ChannelOutboundInvoker?{
2?????MessageBuf?inboundMessageBuffer();
3?????ByteBuf?inboundByteBuffer();
4?????MessageBuf?outboundMessageBuffer();
5?????ByteBuf?outboundByteBuffer();
6?????ChannelPipeline?addFirst(String?name,?ChannelHandler?handler);
7?????ChannelPipeline?addFirst(EventExecutorGroup?group,?String?name,?ChannelHandler?handler);
8?????ChannelPipeline?addLast(String?name,?ChannelHandler?handler);
9?????ChannelPipeline?addLast(EventExecutorGroup?group,?String?name,?ChannelHandler?handler);
10?????ChannelPipeline?addBefore(String?baseName,?String?name,?ChannelHandler?handler);
11?????ChannelPipeline?addBefore(EventExecutorGroup?group,?String?baseName,?String?name,?ChannelHandler?handler);
12?????ChannelPipeline?addAfter(String?baseName,?String?name,?ChannelHandler?handler);
13?????ChannelPipeline?addAfter(EventExecutorGroup?group,?String?baseName,?String?name,?ChannelHandler?handler);
14?????ChannelPipeline?addFirst(ChannelHandler
?handlers);
15?????ChannelPipeline?addFirst(EventExecutorGroup?group,?ChannelHandler
?handlers);
16?????ChannelPipeline?addLast(ChannelHandler
?handlers);
17?????ChannelPipeline?addLast(EventExecutorGroup?group,?ChannelHandler
?handlers);
18?????void?remove(ChannelHandler?handler);
19?????ChannelHandler?remove(String?name);
20??????T?remove(Class?handlerType);
21?????ChannelHandler?removeFirst();
22?????ChannelHandler?removeLast();
23?????void?replace(ChannelHandler?oldHandler,?String?newName,?ChannelHandler?newHandler);
24?????ChannelHandler?replace(String?oldName,?String?newName,?ChannelHandler?newHandler);
25??????T?replace(Class?oldHandlerType,?String?newName,?ChannelHandler?newHandler);
26?????ChannelHandler?first();
27?????ChannelHandlerContext?firstContext();
28?????ChannelHandler?last();
29?????ChannelHandlerContext?lastContext();
30?????ChannelHandler?get(String?name);
31??????T?get(Class?handlerType);
32?????ChannelHandlerContext?context(ChannelHandler?handler);
33?????ChannelHandlerContext?context(String?name);
34?????ChannelHandlerContext?context(Class?extends?ChannelHandler>?handlerType);
35?????Channel?channel();
36?????List?names();
37?????Map?toMap();
38?}
ChannelPipeline類UML圖
ChannelHandler接口用于處理和攔截Channel接口中的ChannelEvents。Netty中的ChannelPipeline概念使用了Intecepting Filter Patter模式來實現,這樣做的好處是允許用戶可以完全控制Event的處理,并且控制ChannelHandlers在ChannelPipeline的交互。
當一個全新的Channel創建的時候,都必須創建一個ChannelPipeline,并使Channel和ChannelPipeline想關聯。這種關聯關系式永久性的,這意味著,一旦一個ChannelPipeleine和Channel關聯了,那么這個Channel就在也無法關聯其他ChannelPipeline,也無法取消與當前ChannelPipeline的關聯。
【官方推薦】使用Channel接口中的pipeleine()方法來創建一個ChannelPipelien,而不要用new去實例一個ChannePipeline類
ChannelPipeline pipeline = Channel.pipeline();
Pipeline中的事件流
ChannelPipeline中的事件流
圖中顯示了一個典型的ChannelHandler或者ChannelPipeline對于ChannelEvent的處理流程。ChannelHandler接口有兩個子類,分別是ChannelUpstreamHandler(ChannelInboundHandler)和ChannelDownstreamHandler(ChannelOutBoundstreamHandler)。這兩個之類用于處理每一個ChannelEvent,然后由ChannelHandlerContext.sendUpstream(ChannelEvent)和ChannelHandlerContext.sendDownstream(ChannelEvent)將每一個ChannelEvent轉發到最近的handler。根據upstream和downstream的不同,每個Event的處理也會有所不同。
如事件流圖中的左邊顯示,Upstream Handlers會從低至上的處理一個Upstream Event。Inbound的數據有圖中底部的Netty Internal I/O threads生成。通過調用InputStream.readByte(byte[])方法,可以從一個遠程的服務器讀取inbound data。如果一個upstream event達到upstream handler的頂部,那么這個upstream event最終將被丟棄掉。
如事件流圖中的右邊顯示,Dpstream Handlers會從低至上的處理一個Upstream Event。Downstream Handler會生成和傳輸outbount數據流,比如一個寫操作。當一個Downstream Event達到Downstream Handler的底部,那么與之相關的Channal中的I/O thread對對其進行處理。Channel中的I/Othread會執行真正的操作,例如OutputStream.write(byte[])。
假設我們創建了這么一個ChannelPipeline,
ChannelPipelien p = Channel.pipeline();
p.addLast(“1”, new UpstreamHandlerA());
p.addList(“2”, new UpstreamHandlerB());
p.addList(“3”, new DownstreamHandlerA());
p.addList(“4”, new DownstreamHandlerB());
p.addList(“5”, new UpstreamHandlerX());
在ChannelPipeline的棧中,upstream的執行順序是1,2,而downstream的執行順序是4,3。
生產Pipeline
在一條Pipeline中,一般會有一個或者多個ChannelHandler用于接收I/O事件(read)或者請求I/O操作(write,close)。一個典型的服務器會有如下的一個ChannelPipeline用于處理不同的ChannelHandler。
ChannelPipelien p = Channel.pipeline();
p.addLast(“decoder”, new MyProtocalDecoder());
p.addList(“encoder”, new MyProtocalEncoder());
p.addList(“executor”, new ExectionHandler());
p.addList(“handler”, new MyBusinessLogicHandler());
1.Protocal Decoder –將二進制數據(如ByteBuf)裝換成Java對象。
2.Protocal Encoder –將Java對象裝換成二進制數據。
3.ExecutionHandler –使用一個線程模型。
4.BusinessLogicHandler –執行一個具體的業務邏輯(如數據庫訪問)
由于ChannelPipeline是線程安全的,所以ChannelHandler可以再任何時候從ChannelPipeline中被添加或者刪除。例如,可以插入一個Handler用于處理被加密過的敏感數據信息,在處理之后,刪除掉這個Handler。
備注:因為筆者開始寫Netty源碼分析的時候,Netty 4.0還是處于Alpha階段,之后的API可能還會有改動,筆者將會及時更改。使用開源已經有好幾年的時間了,一直沒有時間和精力來具體研究某個開源項目的具體實現,這次是第一次寫開源項目的源碼分析,如果文中有錯誤的地方,歡迎讀者可以留言指出。對于轉載的讀者,請注明文章的出處。
希望和廣大的開發者/開源愛好者進行交流,歡迎大家的留言和討論。
-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;
posted on 2012-11-25 14:53 Chan Chen 閱讀(8944) 評論(0) ?編輯 ?收藏 所屬分類: Netty
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的java channel源码_Netty 4.0 源码分析(三):Channel和ChannelPipeline的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Qt实现界面的窗口的局部动态添加并布局
- 下一篇: html5 coverflow,使用Fa