Tomcat NIO
說起Tomcat的NIO,不得不提的就是Connector這個Tomcat組件。Connector是Tomcat的連接器,其主要任務(wù)是負責處理收到的請求,并創(chuàng)建一個Request和Response的對象,然后用一個線程用于處理請求,Connector會把Request和Response對象傳遞給該線程,該線程的具體的處理過程是Container容器的事了。
在tomcat啟動過程中,會初始化Connector,并調(diào)用Connector的startInternal()方法開啟Connector,開始監(jiān)聽、處理請求。
?
想了解Tomcat NIO的工作方式,就得先了解一下Connector的實現(xiàn)原理。下面從三個方面來了解一下Connector組件:Connector的數(shù)據(jù)結(jié)構(gòu)、Connector初始化以及Connector開啟。
?
Connector
Connector的數(shù)據(jù)結(jié)構(gòu)
先了解一下Connector的數(shù)據(jù)結(jié)構(gòu)。Connector的一個主要的屬性:ProtocolHandler protocolHandler(協(xié)議)
?
protocolHandler(協(xié)議)
- 維護服務(wù)器使用的協(xié)議,如http1.1等。ProtocolHandler是接口,實現(xiàn)類有Http11Nio2Protocol?、Http11Nio2Protocol等
- 維護服務(wù)提供的IO方式,負責EndPoint的初始化、啟動。目前有BIO、NIO、AIO等IO方式,來實現(xiàn)監(jiān)聽端口、讀寫socket數(shù)據(jù)的功能。通過EndPoint封裝實現(xiàn)不同的IO方式
- EndPoint監(jiān)聽到IO讀寫,交給Tomcat線程池中的一個線程來處理,SocketProcessor會根據(jù)protocolHandler采用的協(xié)議,調(diào)用協(xié)議的process方法處理請求。
- 維護adapter(適配器),可以將請求/響應(yīng)數(shù)據(jù)進行適配
?
protocolHandler會找到socket對應(yīng)的處理器(如Http11Processor),然后進行數(shù)據(jù)讀寫、適配,處理。請求由adapter最終會交給servlet處理
?
常說的BIO、NIO,主要的應(yīng)用就在protocolHandler中。protocolHandler負責維護Connector使用的協(xié)議以及IO方式。在protocolHandler中,不同的IO方式,會使用不同的EndPoint,具體采用哪種IO方式,取決于采用哪個EndPoint,每一個EndPoint的實現(xiàn)類,都封裝了一種IO策略。若采用NIO,則為NioEndpoint。
?
Connector初始化
?
創(chuàng)建Connector時,會拿到Tomcat目錄下conf/server.xml中Connector的協(xié)議配置,利用反射創(chuàng)建ProtocolHandler:
/*** Coyote Protocol handler class name.* Defaults to the Coyote HTTP/1.1 protocolHandler.*/ protected String protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";public Connector(String protocol) {//設(shè)置protocolHandlerClassName類名setProtocol(protocol);// Instantiate protocol handlerProtocolHandler p = null;try {//根據(jù)server.xml中<connector/>標簽的protocol屬性值,獲取到對應(yīng)的http協(xié)議類Class<?> clazz = Class.forName(protocolHandlerClassName);p = (ProtocolHandler) clazz.getConstructor().newInstance();} catch (Exception e) {log.error(sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"), e);} finally {this.protocolHandler = p;}if (Globals.STRICT_SERVLET_COMPLIANCE) {uriCharset = StandardCharsets.ISO_8859_1;} else {uriCharset = StandardCharsets.UTF_8;} }//設(shè)置protocolHandlerClassName類名 public void setProtocol(String protocol) {boolean aprConnector = AprLifecycleListener.isAprAvailable() &&AprLifecycleListener.getUseAprConnector();//若配置了protocol="HTTP/1.1"或者沒配,則默認是Http11NioProtocol或者Http11AprProtocolif ("HTTP/1.1".equals(protocol) || protocol == null) {if (aprConnector) {setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");} else {setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");}} else if ("AJP/1.3".equals(protocol)) {if (aprConnector) {setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");} else {setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");}} else {//直接取配置的類名setProtocolHandlerClassName(protocol);} }
以Tomcat8.5.20為例,這里默認是http1.1的NIO。
?
?
Connector.start()開啟
?
Connector初始化后,調(diào)用start方法開啟。主要涉及一下幾個方法:
?
Connector的startInternal()方法,會調(diào)用protocolHandler.start();
protocolHandler中會調(diào)用endpoint.start(),從而達到開啟endpoint、監(jiān)聽端口、讀寫Socket的目的:
以Tomcat8.5.20為例,這里默認是http1.1的NIO。Connector.start()開啟Connector初始化后,調(diào)用start方法開啟。主要涉及一下幾個方法:Connector的startInternal()方法,會調(diào)用protocolHandler.start(); protocolHandler中會調(diào)用endpoint.start(),從而達到開啟endpoint、監(jiān)聽端口、讀寫Socket的目的:
至此,Connector完成了開啟的過程,開啟監(jiān)聽端口、可以讀寫Socket了。
?
總結(jié)一下,關(guān)于Connector:
創(chuàng)建Connector時,會拿到Tomcat目錄下conf/server.xml中Connector的協(xié)議配置,利用反射創(chuàng)建ProtocolHandler。
ProtocolHandler負責維護Connector使用的協(xié)議以及IO方式,不同的IO方式如BIO、NIO、AIO封裝在EndPoint中
開啟Connector時,會開啟protocolHandler,從而達到EndPoint的開啟,開始監(jiān)聽端口、讀寫socket數(shù)據(jù)了
protocolHandler中將請求拿到的數(shù)據(jù)進行適配,通過adapter適配成Request和Response對象,最終交給Container去處理
?
下面重點就來了,NIO。
Tomcat NIO
?
Tomcat在處理客戶端請求時,讀寫socket數(shù)據(jù)是一種網(wǎng)絡(luò)IO操作。目前Tomcat有幾種IO方式,分別是BIO(同步阻塞),NIO(同步非阻塞)和AIO(異步非阻塞)。不同IO方式的讀寫機制,被封裝在了Endpoint中。BIO、AIO不再贅述。這里主要看NIO。
Tomcat NIO模型
當然要了解一下Tomcat NIO的模型了。Tomcat NIO是基于Java NIO實現(xiàn)的,其基本原理如下:
?
Tomcat NIO是對Java NIO的一種典型的應(yīng)用方式:通過JDK提供的同步非阻塞的IO方式,實現(xiàn)了IO多路復(fù)用,即一個線程管理多個客戶端的連接。了解Java NIO,可以看一下Java NIO。
Tomcat在NIO模式下,所有客戶端的請求先由一個接收線程接收,然后由若干個(一般為CPU的個數(shù))線程輪詢讀寫事件,最后將具體的讀寫操作交由線程池處理。
NioEndpoint
?
要了解Tomcat的NIO實現(xiàn),其實就是了解NioEndpoint的實現(xiàn)原理。
?
數(shù)據(jù)結(jié)構(gòu)
它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5個部分
?
- LimitLatch是連接控制器,它負責維護連接數(shù)的計算,nio模式下默認是10000,達到這個閾值后,就會拒絕連接請求。
- Acceptor負責接收連接,默認是1個線程來執(zhí)行,將請求的事件注冊到事件列表
- Poller來負責輪詢上述產(chǎn)生的事件。Poller線程數(shù)量是cpu的核數(shù)Math.min(2,Runtime.getRuntime().availableProcessors())。由Poller將就緒的事件生成SocketProcessor,然后交給Excutor去執(zhí)行。
- SocketProcessor繼承了SocketProcessorBase,實現(xiàn)了Runnable接口,可以提交給線程池Excutor來執(zhí)行。它里面的doRun()方法,封裝了讀寫Socket、完成Container調(diào)用的邏輯
- Excutor線程池是一個Tomcat線程池。用來執(zhí)行Poller創(chuàng)建的SocketProcessor。Excutor線程池的大小就是我們在Connector節(jié)點配置的maxThreads的值。
?
SocketProcessor被一個線程執(zhí)行的時候,會完成從socket中讀取http request,解析成HttpServletRequest對象,分派到相應(yīng)的servlet并完成邏輯,然后將response通過socket發(fā)回client。在從socket中讀數(shù)據(jù)和往socket中寫數(shù)據(jù)的過程,并沒有像典型的非阻塞的NIO的那樣,注冊O(shè)P_READ或OP_WRITE事件到主Selector,而是直接通過socket完成讀寫,這時是阻塞完成的,但是在timeout控制上,使用了NIO的Selector機制,但是這個Selector并不是Poller線程維護的主Selector,而是BlockPoller線程中維護的Selector,稱之為輔Selector,實現(xiàn)可見org.apache.coyote.http11.Http11InputBuffer#fill。
?
了解了NioEndPoint的數(shù)據(jù)結(jié)構(gòu)之后,可以看一下它們的關(guān)系圖
NioEndpoint組件關(guān)系圖
以上過程就以同步非阻塞的方式完成了網(wǎng)絡(luò)IO。
?
其實是一個Reactor模型:
- 一個Acceptor(當然多個也行,不過一般場景一個夠了)負責accept事件,把接收到SocketChannel注冊到按某種算法從Reactor池中取出的一個Reactor上,注冊的事件為讀,寫等,之后這個Socket Channel的所有IO事件都和Acceptor沒關(guān)系,都由被注冊到的那個Reactor來負責。
- 每個Acceptor和每個Reactor都各自持有一個Selector
- 當然每個Acceptor和Reactor都是一個線程
?
這里的Poller池其實就是一個Reactor池,可以是多個線程。
?
?
NioEndPoint實現(xiàn)
?
工作原理簡單了解了一下,接下來看一下具體的代碼實現(xiàn)吧。先上一個NioEndpoint的UML圖:
NioEndPoint啟動
?
AbstractEndpoint里實現(xiàn)了一些EndPoint的抽象的通用的方法,其中主要的一個入口方法是org.apache.tomcat.util.net.AbstractEndpoint#start方法
NioEndPoint啟動AbstractEndpoint里實現(xiàn)了一些EndPoint的抽象的通用的方法,其中主要的一個入口方法是org.apache.tomcat.util.net.AbstractEndpoint#start方法
其中,bind()方法和startInternal()方法,由其子類具體實現(xiàn)。
bind()方法用于初始化endpoint,綁定監(jiān)聽端口等、設(shè)置最大線程數(shù)、ssl等。
startInternal()方法在EndPoint初始化完畢后,創(chuàng)建pollers輪詢線程以及acceptors線程并開啟。
其中,bind()方法和startInternal()方法,由其子類具體實現(xiàn)。 bind()方法用于初始化endpoint,綁定監(jiān)聽端口等、設(shè)置最大線程數(shù)、ssl等。 startInternal()方法在EndPoint初始化完畢后,創(chuàng)建pollers輪詢線程以及acceptors線程并開啟。
NioEndPoint時序圖
看完了開啟EndPoint的過程,再來詳細看一下NioEndpoint處理的的時序圖:
通過上面的時序圖,結(jié)合代碼來詳細了解一下Acceptor和Poller的工作方式。
?
Acceptor接收請求
?
NioEndPoint中的Acceptor方法實現(xiàn)了Runnable接口,主要干的活就是上述圖中的3,4,5,6,7
?
@Override public void run() { int errorDelay = 0; // 循環(huán),直到收到一個關(guān)閉的命令 while (running) { // 如果EndPoint被暫停,則循環(huán)sleep while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //如果達到了最大連接數(shù),則等待 countUpOrAwaitConnection(); SocketChannel socket = null; try { // 創(chuàng)建一個socketChannel,接收下一個從服務(wù)器進來的連接 socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // 成功接收,重置error delay errorDelay = 0; // 如果處于EndPoint處于running狀態(tài)并且沒有沒暫停,Configure the socket if (running && !paused) { // setSocketOptions()將把socket傳遞給適當?shù)奶幚砥鳌H绻晒?#xff0c;會關(guān)閉socket。 // 否則,在這里關(guān)閉socket if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; }
看的出來,Acceptor使用serverSock.accept()阻塞的監(jiān)聽端口,如果有連接進來,拿到了socket,并且EndPoint處于正常運行狀態(tài),則調(diào)用NioEndPoint的setSocketOptions方法,一頓操作。
至于setSocketOptions做了什么,概括來說就是根據(jù)socket構(gòu)建一個NioChannel,然后把這個的NioChannel注冊到Poller的事件列表里面,等待poller輪詢。
?
看下setSocketOptions的代碼:
/*** 處理指定的連接* @param socket The socket channel* @return * 如果socket配置正確,并且可能會繼續(xù)處理,返回true * 如果socket需要立即關(guān)閉,則返回false*/ protected boolean setSocketOptions(SocketChannel socket) {// Process the connectiontry {//非阻塞模式socket.configureBlocking(false);Socket sock = socket.socket();socketProperties.setProperties(sock);//從緩存中拿一個nioChannel 若沒有,則創(chuàng)建一個。將socket傳進去NioChannel channel = nioChannels.pop();if (channel == null) {SocketBufferHandler bufhandler = new SocketBufferHandler(socketProperties.getAppReadBufSize(),socketProperties.getAppWriteBufSize(),socketProperties.getDirectBuffer());if (isSSLEnabled()) {channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);} else {channel = new NioChannel(socket, bufhandler);}} else {channel.setIOChannel(socket);channel.reset();}//從pollers數(shù)組中獲取一個Poller對象,注冊這個nioChannelgetPoller0().register(channel);} catch (Throwable t) {ExceptionUtils.handleThrowable(t);try {log.error("",t);} catch (Throwable tt) {ExceptionUtils.handleThrowable(tt);}// Tell to close the socketreturn false;}return true; }
顯然,下面的重點就是register這個方法了。這個方法是NioEndPoint中的Poller實現(xiàn)的,主要干的事就是在Poller注冊新創(chuàng)建的套接字。
/*** 使用輪詢器注冊新創(chuàng)建的socket** @param socket 新創(chuàng)建的socket*/ public void register(final NioChannel socket) {socket.setPoller(this);//創(chuàng)建一個NioSocketWrapper,包裝一下socket。然后一頓設(shè)置。NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);socket.setSocketWrapper(ka);ka.setPoller(this);ka.setReadTimeout(getSocketProperties().getSoTimeout());ka.setWriteTimeout(getSocketProperties().getSoTimeout());ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());ka.setSecure(isSSLEnabled());ka.setReadTimeout(getConnectionTimeout());ka.setWriteTimeout(getConnectionTimeout());//從緩存中取出一個PollerEvent對象,若沒有則創(chuàng)建一個。將socket和NioSocketWrapper設(shè)置進去PollerEvent r = eventCache.pop();ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);else r.reset(socket,ka,OP_REGISTER);//添到到該Poller的事件列表addEvent(r); }
總結(jié)一下,從Acceptor接收到請求,它做了這么些工作:
- 如果達到了最大連接數(shù),則等待。否則,阻塞監(jiān)聽端口。
- 監(jiān)聽到有連接,則創(chuàng)建一個socketChannel。若服務(wù)正常運行,則把socket傳遞給適當?shù)奶幚砥鳌H绻晒?#xff0c;會關(guān)閉socket。
?
在這里,適當?shù)奶幚硎侵刚{(diào)用NioEndPoint的setSocketOptions方法,處理指定的連接:
- 將socket設(shè)置為非阻塞
- 從緩存中拿一個nioChannel ?若沒有,則創(chuàng)建一個。將socket傳進去。
- 從pollers數(shù)組中獲取一個Poller對象,把nioChannel注冊到該Poller中。
?
其中最后一步注冊的過程,是調(diào)用Poller的register()方法:
- 創(chuàng)建一個NioSocketWrapper,包裝socket。然后配置相關(guān)屬性,設(shè)置感興趣的操作為SelectionKey.OP_READ
- PollerEvent。PollerEvent可以是從緩存中取出來的,若沒有則創(chuàng)建一個。初始化或者重置此Event對象,設(shè)置感興趣的操作為OP_REGISTER (Poller輪詢時會用到)
- 將新的PollerEvent添加到這個Poller的事件列表events,等待Poller線程輪詢。
?
Poller輪詢
?
其實上面已經(jīng)提到了Poller將一個事件注冊到事件隊列的過程。接下來便是Poller線程如何處理這些事件了,這就是Poller線程的工作機制。
Poller作為一個線程,實現(xiàn)了Runnable接口的run方法,在run方法中會輪詢事件隊列events,將每個PollerEvent中的SocketChannel感興趣的事件注冊到Selector中,然后將PollerEvent從隊列里移除。之后就是SocketChanel通過Selector調(diào)度來進行非阻塞的讀寫數(shù)據(jù)了。
看下Poller.run()代碼:
/*** The background thread that adds sockets to the Poller, checks the* poller for triggered events and hands the associated socket off to an* appropriate processor as events occur.*/ @Override public void run() {// 循環(huán)直到 destroy() 被調(diào)用while (true) {boolean hasEvents = false;try {if (!close) {//將events隊列,將每個事件中的通道感興趣的事件注冊到Selector中hasEvents = events();if (wakeupCounter.getAndSet(-1) > 0) {//如果走到了這里,代表已經(jīng)有就緒的IO通道//調(diào)用非阻塞的select方法,直接返回就緒通道的數(shù)量keyCount = selector.selectNow();} else {//阻塞等待操作系統(tǒng)返回 數(shù)據(jù)已經(jīng)就緒的通道,然后被喚醒keyCount = selector.select(selectorTimeout);}wakeupCounter.set(0);}if (close) {events();timeout(0, false);try {selector.close();} catch (IOException ioe) {log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);}break;}} catch (Throwable x) {ExceptionUtils.handleThrowable(x);log.error("",x);continue;}//如果上面select方法超時,或者被喚醒,先將events隊列中的通道注冊到Selector上。if ( keyCount == 0 ) hasEvents = (hasEvents | events());Iterator<SelectionKey> iterator =keyCount > 0 ? selector.selectedKeys().iterator() : null;// 遍歷已就緒的通道,并調(diào)用processKey來處理該Socket的IO。while (iterator != null && iterator.hasNext()) {SelectionKey sk = iterator.next();NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();// 如果其它線程已調(diào)用,則Attachment可能為空if (attachment == null) {iterator.remove();} else {iterator.remove();//創(chuàng)建一個SocketProcessor,放入Tomcat線程池去執(zhí)行processKey(sk, attachment);}}//while//process timeoutstimeout(keyCount,hasEvents);}//whilegetStopLatch().countDown(); }
讀取已就緒通道的部分,是常見的Java NIO的用法,Selector調(diào)用selectedKeys(),獲取IO數(shù)據(jù)已經(jīng)就緒的通道,遍歷并調(diào)用processKey方法來處理每一個通道就緒的事件。而processKey方法會創(chuàng)建一個SocketProcessor,然后丟到Tomcat線程池中去執(zhí)行。
其中需要注意的一個點是,events()方法,用來處理PollerEvent事件,執(zhí)行PollerEvent.run(),然后將PollerEvent重置再次放入緩存中,以便對象復(fù)用。
/*** Processes events in the event queue of the Poller.** @return <code>true</code> if some events were processed,* <code>false</code> if queue was empty*/ public boolean events() {boolean result = false;PollerEvent pe = null;while ( (pe = events.poll()) != null ) {result = true;try {//把SocketChannel感興趣的事件注冊到Selector中pe.run();pe.reset();if (running && !paused) {eventCache.push(pe);}} catch ( Throwable x ) {log.error("",x);}}return result; }?
可以看出,PollerEvent.run()方法才是重點:
public void run() {//Acceptor調(diào)用Poller.register()方法時,創(chuàng)建的PollerEvent感興趣的事件為OP_REGISTER,因此走這個分支if (interestOps == OP_REGISTER) {try {//將SocketChannel的讀事件注冊到Poller線程的Selector中,使用Selector來調(diào)度IO。socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);} catch (Exception x) {log.error(sm.getString("endpoint.nio.registerFail"), x);}} else {final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());try {if (key == null) {// The key was cancelled (e.g. due to socket closure)// and removed from the selector while it was being// processed. Count down the connections at this point// since it won't have been counted down when the socket// closed.socket.socketWrapper.getEndpoint().countDownConnection();} else {final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();if (socketWrapper != null) {//we are registering the key to start with, reset the fairness counter.int ops = key.interestOps() | interestOps;socketWrapper.interestOps(ops);key.interestOps(ops);} else {socket.getPoller().cancelledKey(key);}}} catch (CancelledKeyException ckx) {try {socket.getPoller().cancelledKey(key);} catch (Exception ignore) {}}} }
至此,可以看出Poller線程的作用
- 將Acceptor接收到的請求注冊到Poller的事件隊列中
- Poller輪詢事件隊列中,處理到達的事件,將PollerEvent中的通道注冊到Poller的Selector中
- 輪詢已就緒的通道,對每個就緒通道創(chuàng)建一個SocketProcessor,交個Tomcat線程池去處理
?
剩下的事情,就是SocketProcessor怎么適配客戶端發(fā)來請求的數(shù)據(jù)、然后怎樣交給Tomcat容器去處理了。
?
SocketProcessor處理請求
?
簡單提一下SocketProcessor的處理過程,不是這篇文章的重點。通過上面可以知道,具體處理一個請求,是在SocketProcessor通過線程池去執(zhí)行的。執(zhí)行一次請求的時序圖
SocketProcessor中通過Http11ConnectionHandler,取到Htpp11Processor,Htpp11Processor調(diào)用prepareRequest方法,準備好請求數(shù)據(jù)。然后調(diào)用CoyoteAdapter的service方法進行request和response的適配,之后交給容器進行處理。
?
在CoyoteAdapter的service方法中,主要干了2件事:
- org.apache.coyote.Request -> org.apache.catalina.connector.Request extends HttpServletRequest,org.apache.coyote.Response -> org.apache.catalina.connector. Response extends HttpServletResponse
- 將請求交給StandardEngineValue處理
?
將請求交給Tomcat容器處理后,后將請求一層一層傳遞到Engin、Host、Context、Wrapper,最終經(jīng)過一系列Filter,來到了Servlet,執(zhí)行我們自己具體的代碼邏輯。其中,容器之間數(shù)據(jù)的傳遞用到了管道流的機制。這里就不在贅述,以后有時間專門寫一篇Tomcat容器的工作原理。
?
參考文章:
《Tomcat內(nèi)核設(shè)計剖析》
深度解讀Tomcat中的NIO模型
轉(zhuǎn)載于:https://www.cnblogs.com/z941030/p/8796479.html
總結(jié)
以上是生活随笔為你收集整理的Tomcat NIO的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: JavaScript常用内置对象之Arr
- 下一篇: JS--对象
