netty系列之:netty初探
文章目錄
- 簡介
- netty介紹
- netty的第一個服務器
- netty的第一個客戶端
- 運行服務器和客戶端
- 總結
簡介
我們常用瀏覽器來訪問web頁面得到相關的信息,通常來說使用的都是HTTP或者HTTPS協議,這些協議的本質上都是IO,客戶端的請求就是In,服務器的返回就是Out。但是在目前的協議框架中,并不能完全滿足我們所有的需求。比如使用HTTP下載大文件,可能需要長連接等待等。
我們也知道IO方式有多種多樣的,包括同步IO,異步IO,阻塞IO和非阻塞IO等。不同的IO方式其性能也是不同的,而netty就是一個基于異步事件驅動的NIO框架。
本系列文章將會探討netty的詳細使用,通過原理+例子的具體結合,讓大家了解和認識netty的魅力。
netty介紹
netty是一個優秀的NIO框架,大家對IO的第一映像應該是比較復雜,尤其是跟各種HTTP、TCP、UDP協議打交道,使用起來非常復雜。但是netty提供了對這些協議的友好封裝,通過netty可以快速而且簡潔的進行IO編程。netty易于開發、性能優秀同時兼具穩定性和靈活性。如果你希望開發高性能的服務,那么使用netty總是沒錯的。
netty的最新版本是4.1.66.Final,事實上這個版本是官方推薦的最穩定的版本,netty還有5.x的版本,但是官方并不推薦。
如果要在項目中使用,則可以引入下面的代碼:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.66.Final</version></dependency>下面我們將會從一個最簡單的例子,體驗netty的魅力。
netty的第一個服務器
什么叫做服務器?能夠對外提供服務的程序就可以被稱為是服務器。建立服務器是所有對外服務的第一步,怎么使用netty建立一個服務器呢?服務器主要負責處理各種服務端的請求,netty提供了一個ChannelInboundHandlerAdapter的類來處理這類請求,我們只需要繼承這個類即可。
在NIO中每個channel都是客戶端和服務器端溝通的通道。ChannelInboundHandlerAdapter定義了在這個channel上可能出現一些事件和情況,如下圖所示:
如上圖所示,channel上可以出現很多事件,比如建立連接,關閉連接,讀取數據,讀取完成,注冊,取消注冊等。這些方法都是可以被重寫的,我們只需要新建一個類,繼承ChannelInboundHandlerAdapter即可。
這里我們新建一個FirstServerHandler類,并重寫channelRead和exceptionCaught兩個方法,第一個方法是從channel中讀取消息,第二個方法是對異常進行處理。
public class FirstServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 對消息進行處理ByteBuf in = (ByteBuf) msg;try {log.info("收到消息:{}",in.toString(io.netty.util.CharsetUtil.US_ASCII));}finally {ReferenceCountUtil.release(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 異常處理log.error("出現異常",cause);ctx.close();} }上面例子中,我們收到消息后調用release()方法將其釋放,并不進行實際的處理。調用release方法是在消息使用完成之后常用的做法。上面代碼將msg進行了ByteBuf的強制轉換,如果并不想進行轉換的話,可以直接這樣使用:
try {// 消息處理} finally {ReferenceCountUtil.release(msg);}在異常處理方法中,我們打印出異常信息,并關閉異常的上下文。
有了Handler,我們需要新建一個Server類用來使用Handler創建channel和接收消息。接下來我們看一下netty的消息處理流程。
在netty中,對IO進行處理是使用多線程的event loop來實現的。netty中的EventLoopGroup就是這些event loop的抽象類。
我們來觀察一下EventLoopGroup的類結構。
可以看出EventLoopGroup繼承自EventExecutorGroup,而EventExecutorGroup繼承自JDK自帶的ScheduledExecutorService。
所以EventLoopGroup本質是是一個線程池服務,之所以叫做Group,是因為它里面包含了很多個EventLoop,可以通過調用next方法對EventLoop進行遍歷。
EventLoop是用來處理注冊到該EventLoop的channel中的IO信息,一個EventLoop就是一個Executor,通過不斷的提交任務進行執行。當然,一個EventLoop可以注冊多個channel,不過一般情況下并不這樣處理。
EventLoopGroup將多個EventLoop組成了一個Group,通過其中的next方法,可以對Group中的EventLoop進行遍歷。另外EventLoopGroup提供了一些register方法,將Channel注冊到當前的EventLoop中。
從上圖可以看到,register的返回結果是一個ChannelFuture,Future大家都很清楚,可以用來獲得異步任務的執行結果,同樣的ChannelFuture也是一個異步的結果承載器,可以通過調用sync方法來阻塞Future直到獲得執行結果。
可以看到,register方法還可以傳入一個ChannelPromise對象,ChannelPromise它同時是ChannelFuture和Promise的子類,Promise又是Future的子類,它是一個特殊的可以控制Future狀態的Future。
EventLoopGroup有很多子類的實現,這里我們使用NioEventLoopGroup,Nio使用Selector對channel進行選擇。還有一個特性是NioEventLoopGroup可以添加子EventLoopGroup。
對于NIO服務器程序來說,我們需要兩個Group,一個group叫做bossGroup,主要用來監控連接,一個group叫做worker group,用來處理被boss accept的連接,這些連接需要被注冊到worker group中才能進行處理。
將這兩個group傳給ServerBootstrap,就可以從ServerBootstrap啟動服務了,相應的代碼如下:
//建立兩個EventloopGroup用來處理連接和消息EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new FirstServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);// 綁定端口并開始接收連接ChannelFuture f = b.bind(port).sync();我們最開始創建的FirstServerHandler最作為childHandler的處理器在初始化Channel的時候就被添加進去了。
這樣,當有新建立的channel時,FirstServerHandler就會被用來處理該channel的數據。
上例中,我們還指定了一些ChannelOption,用于對channel的一些屬性進行設定。
最后,我們綁定了對應的端口,并啟動服務器。
netty的第一個客戶端
上面我們已經寫好了服務器,并將其啟動,現在還需要一個客戶端和其進行交互。
如果不想寫代碼的話,可以直接telnet localhost 8000和server端進行交互即可,但是這里我們希望使用netty的API來構建一個client和Server進行交互。
構建netty客戶端的流程和構建netty server端的流程基本一致。首先也需要創建一個Handler用來處理具體的消息,同樣,這里我們也繼承ChannelInboundHandlerAdapter。
上一節講到了ChannelInboundHandlerAdapter里面有很多方法,可以根據自己業務的需要進行重寫,這里我們希望當Channel active的時候向server發送一個消息。那么就需要重寫channelActive方法,同時也希望對異常進行一些處理,所以還需要重寫exceptionCaught方法。如果你想在channel讀取消息的時候進行處理,那么可以重寫channelRead方法。
創建的FirstClientHandler代碼如下:
@Slf4j public class FirstClientHandler extends ChannelInboundHandlerAdapter {private ByteBuf content;private ChannelHandlerContext ctx;@Overridepublic void channelActive(ChannelHandlerContext ctx) {this.ctx = ctx;content = ctx.alloc().directBuffer(256).writeBytes("Hello flydean.com".getBytes(StandardCharsets.UTF_8));// 發送消息sayHello();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 異常處理log.error("出現異常",cause);ctx.close();}private void sayHello() {// 向服務器輸出消息ctx.writeAndFlush(content.retain());} }上面的代碼中,我們首先從ChannelHandlerContext申請了一個ByteBuff,然后調用它的writeBytes方法,寫入要傳輸的數據。最后調用ctx的writeAndFlush方法,向服務器輸出消息。
接下來就是啟動客戶端服務了,在服務端我們建了兩個NioEventLoopGroup,是兼顧了channel的選擇和channel中消息的讀取兩部分。對于客戶端來說,并不存在這個問題,這里只需要一個NioEventLoopGroup即可。
服務器端使用ServerBootstrap來啟動服務,客戶端使用的是Bootstrap,其啟動的業務邏輯基本和服務器啟動一致:
EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new FirstClientHandler());}});// 連接服務器ChannelFuture f = b.connect(HOST, PORT).sync();運行服務器和客戶端
有了上述的準備工作,我們就可以運行了。首先運行服務器,再運行客戶端。
如果沒有問題的話,應該會輸出下面的內容:
[nioEventLoopGroup-3-1] INFO com.flydean01.FirstServerHandler - 收到消息:Hello flydean.com總結
一個完整的服務器,客戶端的例子就完成了。我們總結一下netty的工作流程,對于服務器端,首先建立handler用于對消息的實際處理,然后使用ServerBootstrap對EventLoop進行分組,并綁定端口啟動。對于客戶端來說,同樣需要建立handler對消息進行處理,然后調用Bootstrap對EventLoop進行分組,并綁定端口啟動。
有了上面的討論就可以開發屬于自己的NIO服務了。是不是很簡單? 后續文章將會對netty的架構和背后的原理進行深入討論,敬請期待。
本文的例子可以參考:learn-netty4
本文已收錄于 http://www.flydean.com/01-netty-startup/
最通俗的解讀,最深刻的干貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!
歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!
總結
以上是生活随笔為你收集整理的netty系列之:netty初探的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: NumPy之:多维数组中的线性代数
- 下一篇: netty系列之:netty中的Byte