NIO和Reactor
本文參考Doug Lea的Scalable IO in Java.
網(wǎng)絡(luò)服務(wù)
隨著網(wǎng)絡(luò)服務(wù)的越來(lái)越多,我們對(duì)網(wǎng)絡(luò)服務(wù)的性能有了更高的要求,提供一個(gè)高性能,穩(wěn)定的web服務(wù)是一件很麻煩的事情,所以有了netty框架幫我們完成。
我們對(duì)各種各樣的網(wǎng)絡(luò)服務(wù)進(jìn)行抽象,得到最基本的業(yè)務(wù)流程:
1:讀取請(qǐng)求信息
2:對(duì)請(qǐng)求信息進(jìn)行解碼
3:進(jìn)行相關(guān)的業(yè)務(wù)處理
4:對(duì)處理結(jié)果進(jìn)行編碼
5:發(fā)送請(qǐng)求
看到這,netty的ChannelHandler就是干這幾件事情的。
傳統(tǒng)的網(wǎng)絡(luò)服務(wù)
在jdk1.4之前,我們只有BIO,所以當(dāng)時(shí)的網(wǎng)絡(luò)服務(wù)的架構(gòu)是這樣的。
每個(gè)線程處理一個(gè)請(qǐng)求, 由于線程個(gè)數(shù)和cpu個(gè)數(shù)的原因,這種設(shè)計(jì)性能是有上限的,所以就有了集群模式:tomcat集群。
/*** Created by gaoxing on 2015-01-20 .*/ public class ClasssicServer {public static void main(String[] args) {try {ServerSocket serverSocket=new ServerSocket(8888,10);System.out.println("server is start");while( ! Thread.interrupted()){new Thread(new Handler(serverSocket.accept())).start();}} catch (IOException e) {e.printStackTrace();}}static class Handler implements Runnable{final Socket socket;final static int MAX_SIZE=1024;public Handler(Socket socket){this.socket=socket;}@Overridepublic void run() {//TODO//在這里對(duì)socket中的數(shù)據(jù)進(jìn)行讀取和處理,然后把結(jié)果寫入到socket中去 }} }高性能的IO目標(biāo)和Reactor
1:高負(fù)載下可以穩(wěn)定的工作
2:提高資源的利用率
3:低延遲
這有就有了分而治之和事件驅(qū)動(dòng)的思想。這樣沒(méi)有thread就不用瞎跑了,cpu就不用不停的切換Thread . 這樣提出了Reactor模式
1:Reactor接收IO事件發(fā)送給該事件的處理器處理
2:處理器的操作是非阻塞的。
3:管理事件和處理器的綁定。
?
?
這是一個(gè)單線程版本,所有的請(qǐng)求都是一個(gè)線程處理,當(dāng)然Reactor不是無(wú)緣無(wú)故的提出來(lái)的,因?yàn)閖dk提供了nio包,nio使得Reator得于實(shí)現(xiàn)
1:channels: 通道就是數(shù)據(jù)源的抽象,通過(guò)它可以無(wú)阻塞的讀取數(shù)據(jù)
2:buffers? : 用來(lái)裝載數(shù)據(jù)的,可以把從channel讀取到buffer中,或者把buffer中的數(shù)據(jù)寫入到channel中
3:selector :?用來(lái)監(jiān)聽(tīng) 有IO事件,并告訴channel
4:selectionkeys:? IO事件和處理器綁定
/*** Created by gaoxing on 2015-01-20 .*/ public class Reactor implements Runnable {final Selector selector ;final ServerSocketChannel serverSocketChannel;public Reactor(int port) throws IOException {this.selector=Selector.open();this.serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(port));//一定是非阻塞的,阻塞的一個(gè)通道就只能處理一個(gè)請(qǐng)求了serverSocketChannel.configureBlocking(false);//把OP_ACCEPT事件和Acceptor綁定SelectionKey sk=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);sk.attach(new Acceptor());}@Overridepublic void run() {while(!Thread.interrupted()){try {selector.select();//該方法是阻塞的,只有IO事件來(lái)了才向下執(zhí)行Set<SelectionKey> selected=selector.selectedKeys();Iterator<SelectionKey> it=selected.iterator();while(it.hasNext()){dispatch(it.next());}selected.clear() } catch (IOException e) {e.printStackTrace();}}}private void dispatch(SelectionKey next) {Runnable runnable= (Runnable) next.attachment();if(runnable!=null){runnable.run();}}private class Acceptor implements Runnable{@Overridepublic void run() {SocketChannel channel= null;try {channel = serverSocketChannel.accept();if (channel!=null){new Handler(selector,channel);}} catch (IOException e) {e.printStackTrace();}}} }class Handler implements Runnable {final SelectionKey sk;final SocketChannel channel;final static int MAXSIZE=1024;ByteBuffer input=ByteBuffer.allocate(MAXSIZE);ByteBuffer output=ByteBuffer.allocate(MAXSIZE);static final int READING=0,SENDING=1;int state=READING;public Handler(Selector selector,SocketChannel channel) throws IOException {this.channel=channel;this.channel.configureBlocking(false);/*** 這個(gè)有個(gè)問(wèn)題,ppt上register是0,*/sk=this.channel.register(selector,SelectionKey.OP_READ);sk.attach(this);/*** 這里的作用是,設(shè)置key的狀態(tài)為可讀,然后讓后selector的selector返回。* 然后就可以處理OP_READ事件了*/sk.interestOps(SelectionKey.OP_READ);selector.wakeup();}@Overridepublic void run() {if (state==READING) read();if (state==SENDING) write();}void read(){state=SENDING;sk.interestOps(SelectionKey.OP_WRITE);}void write(){sk.cancel();} }
?上面代碼注解理解有誤:
sk=this.channel.register(selector,0); 這里是初始化一個(gè)SelectionKey 不監(jiān)聽(tīng)事件sk.interestOps(SelectionKey.OP_READ); 這里設(shè)置監(jiān)聽(tīng)的事件為OP_READ
多線程的Reactor模式
現(xiàn)在的CPU多核的,為了提高對(duì)硬件的使用效率需要考慮使用多線程的Reactor設(shè)計(jì)模式,Reactor主要用來(lái)處罰事件的,但是事件的處理會(huì)降低Reactor的性能,考慮把事件的處理放到別的線程上來(lái)做,有點(diǎn)想android的設(shè)計(jì)模式,UI線程用來(lái)接收用戶的事件,事件的處理放到線程去做來(lái)提高用戶的體驗(yàn)。多線程Reactor有兩種一種是Reactor線程只關(guān)注io事件,事件處理放到別的線程,一種對(duì)事件分類主Reactor只關(guān)注Accept事件,子Reactor關(guān)注read和write事件。事件處理放到線程去做,這也是netty的設(shè)計(jì)模式。
?
class HandlerPool implements Runnable {final SelectionKey sk;final SocketChannel channel;final static int MAXSIZE=1024;ByteBuffer input=ByteBuffer.allocate(MAXSIZE);ByteBuffer output=ByteBuffer.allocate(MAXSIZE);static ExecutorService executor = Executors.newFixedThreadPool(100);static final int READING=0,SENDING=1;int state=READING;public HandlerPool(Selector selector,SocketChannel channel) throws IOException {this.channel=channel;this.channel.configureBlocking(false);/*** 這個(gè)有個(gè)問(wèn)題,這里注冊(cè)的SelectionKey是不處理的,應(yīng)該他監(jiān)聽(tīng)的事件為0*/sk=this.channel.register(selector,0);sk.attach(this);/*** 這里的作用是,SelectionKey的監(jiān)聽(tīng)事件為OP_READ。interestOps對(duì)哪個(gè)事件感興趣*/sk.interestOps(SelectionKey.OP_READ);selector.wakeup();}@Overridepublic void run() {executor.execute(new Processer());}class Processer implements Runnable{@Overridepublic void run() {}} }這個(gè)就在Acceptor里面多實(shí)例化幾個(gè)Selector,它處理Read和Write事件。
?
大致架構(gòu)弄懂了。后面邊看netty源碼,邊學(xué)習(xí)nio
?
轉(zhuǎn)載于:https://www.cnblogs.com/gaoxing/p/4237789.html
總結(jié)
以上是生活随笔為你收集整理的NIO和Reactor的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。