高性能IO -Reactor模式的实现
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
在了解Reactor模式之前, 首先了解什么是NIO.
java.nio全稱java non-blocking IO 即非阻塞IO.這個(gè)地方要明白,非阻塞不等于異步。
非阻塞:當(dāng)一個(gè)線上讀取數(shù)據(jù),沒有數(shù)據(jù)時(shí)該線程可以干其他的事情。就是調(diào)用了之后立馬返回。
異步IO: 在一個(gè)IO操作中,用戶態(tài)的線程完全不用考慮數(shù)據(jù)的讀取過程,都交給操作系統(tǒng)完成,完成之后通知用戶線程即可。這才是真正的異步操作。
同步IO? 每個(gè)請(qǐng)求必須逐個(gè)的被處理,一個(gè)流程的處理會(huì)導(dǎo)致整個(gè)流程的暫時(shí)等待。
阻塞:? 某個(gè)請(qǐng)求發(fā)出后,該請(qǐng)求操作需要的條件不滿足,請(qǐng)求會(huì)一直阻塞,不會(huì)返回,直到條件滿足。
?
?其中java NIO 中的 Select 在Linux中基于epoll實(shí)現(xiàn)?;贗O多路復(fù)用。就是一個(gè)線程來管理多個(gè)IO.
epoll全稱eventpoll 是linux內(nèi)核針對(duì)IO多路復(fù)用的實(shí)現(xiàn)。在linux中,和epoll類似的由select和poll。
其中epoll監(jiān)聽的fd集合是一直在內(nèi)核存在的,有三個(gè)系統(tǒng)調(diào)用:epoll_create epoll_wait epoll_ctl 通過epoll_wait可以多次監(jiān)聽同一個(gè)fd結(jié)合,只返回可讀寫的那部分。
select只有一個(gè)系統(tǒng)調(diào)用,就是每次都需要將要監(jiān)聽的所有集合都傳給操作系統(tǒng),當(dāng)有事件發(fā)生時(shí)。操作系統(tǒng)在返回給你整個(gè)集合。
?
NIO核心包含三個(gè)部分: Channels Buffers Selectors.
Channel: 在NIO中,所有的IO過程都是從建立一個(gè)Channel開始的,數(shù)據(jù)可以從channel中讀取到Buffer中 也可以從Buffer中寫入到channel中。channel就好像BIO中的流。但是channel時(shí)雙向的,我感覺這樣更貼近于現(xiàn)實(shí),畢竟TCP連接是全雙工的。
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
channel分為這四種,分別對(duì)應(yīng)著文件,UDP TCP網(wǎng)絡(luò)IO.
Buffer buffer即為緩沖區(qū),也就是數(shù)據(jù)塊。
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
java基礎(chǔ)數(shù)據(jù)類型中除了boolean都有對(duì)應(yīng)的buffer實(shí)現(xiàn)。
Selector (選擇器): 他是NIO中的關(guān)鍵所在,我們?cè)诔绦蛑锌梢酝ㄟ^它來實(shí)現(xiàn)一個(gè)線程同時(shí)處理多個(gè)Channel 也就是多個(gè)連接。
如上圖,一個(gè)Selectot監(jiān)聽五個(gè)通道,在使用時(shí)首先需要將通道以及對(duì)應(yīng)感興趣的事件(Accept? ?read? writer等?)注冊(cè)到Selector上 。當(dāng)發(fā)生對(duì)應(yīng)的事件時(shí),操作系統(tǒng)回通知我們的程序。在Selector中可以讀取到對(duì)應(yīng)的Channel 根據(jù)事件類型做出相應(yīng)的操作。
零拷貝
java NIO中提供的FileChannel擁有transferTo和transferFrom兩個(gè)方法,可以直接把FileChannel中的數(shù)據(jù)拷貝到另一個(gè)Channel,或者把另一個(gè)Channel中的數(shù)據(jù)拷貝到FileChannel .在操作系統(tǒng)的支持下,通過這個(gè)方法傳輸數(shù)據(jù)不需要將原數(shù)據(jù)從內(nèi)核態(tài)拷貝到用戶態(tài),再?gòu)挠脩魬B(tài)拷貝到內(nèi)核態(tài)。
?
Reactor實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Echo服務(wù)器? 基于單個(gè)線程同時(shí)處理多個(gè)連接。這樣一個(gè)Selector同時(shí)完成Accept? Read Write事件的監(jiān)聽,同時(shí)業(yè)邏輯也和Selector在同一個(gè)線程中執(zhí)行。這里可以優(yōu)化一下將業(yè)務(wù)邏輯在新的線程中執(zhí)行。
public class EchoService {private final String ip;private final int port;public EchoService(String ip, int port) {this.ip = ip;this.port = port;}public void start(){try {Selector selector=Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(ip,port)).configureBlocking(false).register(selector, SelectionKey.OP_ACCEPT);while (true){selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iteratorKey=keys.iterator();while (iteratorKey.hasNext()){SelectionKey key=iteratorKey.next();if (key.isAcceptable()){ServerSocketChannel serverChannel= (ServerSocketChannel) key.channel();SocketChannel socketChannel=serverChannel.accept();socketChannel.configureBlocking(false).register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,ByteBuffer.allocate(1024));}if (key.isReadable()){SocketChannel sc= (SocketChannel) key.channel();ByteBuffer buffer= (ByteBuffer) key.attachment();buffer.clear();int readCount= sc.read(buffer);if (readCount<0){iteratorKey.remove();continue;}buffer.flip();sc.write(buffer);System.out.print(new String(buffer.array(),0,readCount));}iteratorKey.remove();}}} catch (Exception e) {e.printStackTrace();}finally {System.out.println("exit");}} }現(xiàn)在計(jì)算機(jī)的核數(shù)越來越多,僅僅用一個(gè)核心來處理IO連接有點(diǎn)讓費(fèi)系統(tǒng)資源,因此我們可以多見幾個(gè)Reactor? .其中住Reactor負(fù)責(zé)TCP的連接(Accept),連接之后分配到子Reactor來處理IO的讀寫事件。
并且每個(gè)子Reactor分別屬于一個(gè)獨(dú)立的線程,每個(gè)成功連接后的Channel的所有操作自始至終旨在一個(gè)線程處理。這樣保證了同一個(gè)請(qǐng)求的所有狀態(tài)和上下文在同一個(gè)線程中,方便監(jiān)控請(qǐng)求相應(yīng)狀態(tài)。
具體代碼實(shí)現(xiàn) EchoService為例:
https://github.com/WJ1020/reactor
public class EchoService {private static final Logger logger= LoggerFactory.getLogger(EchoService.class);private final String ip;private final int port;public EchoService(String ip, int port) {this.ip = ip;this.port = port;}public void start(){logger.info("echo service start......");try {Selector selector=Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(ip,port)).configureBlocking(false).register(selector,SelectionKey.OP_ACCEPT);int coreNum = Runtime.getRuntime().availableProcessors();Processor[] processors = new Processor[coreNum];for (int i = 0; i < processors.length; i++) {logger.info("creat processor :{}",i+1);processors[i] = new Processor();}int index=0;while (Status.running){selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iterator=keys.iterator();while (iterator.hasNext()){SelectionKey selectionKey=iterator.next();iterator.remove();if (selectionKey.isAcceptable()){ServerSocketChannel currServerSocketChannel= (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel=currServerSocketChannel.accept();socketChannel.configureBlocking(false);logger.info("Accept request from {}",socketChannel.getRemoteAddress());Processor processor=processors[(++index)%coreNum];processor.addChannel(socketChannel);}}}} catch (IOException e) {logger.error("io exception {}",e.getMessage());}}} public class Processor {private static final Logger logger= LoggerFactory.getLogger(Processor.class);private static final ExecutorService service=Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors());private final Selector selector;private volatile boolean running=true;public Processor() throws IOException {this.selector= SelectorProvider.provider().openSelector();}public void addChannel(SocketChannel socketChannel){try {socketChannel.register(this.selector, SelectionKey.OP_READ);if (running){running=false;start();}wakeup();} catch (ClosedChannelException e) {logger.error("register channel error :{}",e.getMessage());}}private void wakeup(){this.selector.wakeup();}private void start(){service.submit(new ProcessorTask(selector));} } public class ProcessorTask implements Runnable {private final static Logger logger= LoggerFactory.getLogger(ProcessorTask.class);private Selector selector;ProcessorTask(Selector selector) {this.selector = selector;}@Overridepublic void run() {logger.info("{}\tsub reactor start listener",Thread.currentThread().getName());while (Status.running){try {selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iterator=keys.iterator();while (iterator.hasNext()){SelectionKey key=iterator.next();iterator.remove();if (key.isReadable()){ByteBuffer buffer= ByteBuffer.allocate(1024);SocketChannel socketChannel= (SocketChannel) key.channel();int count=socketChannel.read(buffer);if (count<0){socketChannel.close();key.cancel();logger.info("{}\t Read ended",socketChannel);}else if (count==0){logger.info("{}\t Message size is 0",socketChannel);}else {buffer.flip();socketChannel.write(buffer);logger.info("{}\t Read message{}",socketChannel,new String(buffer.array()));}}}} catch (IOException e) {logger.error("select error :{}",e.getMessage());}}} }在EchoService中 ,主Reactor接受到新的連接后,將channel注冊(cè)到subReactor的Selector中。每個(gè)子Reactor都有一個(gè)自己的Selector對(duì)象,并有獨(dú)立的一個(gè)線程處理。
轉(zhuǎn)載于:https://my.oschina.net/wang520/blog/3036562
總結(jié)
以上是生活随笔為你收集整理的高性能IO -Reactor模式的实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一句话,讲清楚java泛型的本质(非类型
- 下一篇: swoole-co-pool v1.0.