OkHttpClient源码分析(五)—— ConnectInterceptor和CallServerInterceptor
上一篇我們介紹了緩存攔截器CacheInterceptor,本篇將介紹剩下的兩個攔截器: ConnectInterceptor和CallServerInterceptor
ConnectInterceptor
該攔截器主要是負責建立可用的鏈接,主要作用是打開了與服務器的鏈接,正式開啟了網絡請求。 查看其intercept()方法:
@Override public Response intercept(Chain chain) throws IOException {RealInterceptorChain realChain = (RealInterceptorChain) chain;//從攔截器鏈中獲取StreamAllocation對象Request request = realChain.request();StreamAllocation streamAllocation = realChain.streamAllocation();//創建HttpCodec對象HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);//獲取realConnetionRealConnection connection = streamAllocation.connection();//執行下一個攔截器,返回responsereturn realChain.proceed(request, streamAllocation, httpCodec, connection);} 復制代碼可以看到intercept中的處理很簡單,主要有以下幾步操作:
從攔截器鏈中獲取StreamAllocation對象,在講解第一個攔截器RetryAndFollowUpInterceptor的時候,我們已經初步了解了StreamAllocation對象,在RetryAndFollowUpInterceptor中僅僅只是創建了StreamAllocation對象,并沒有進行使用,到了ConnectInterceptor中,StreamAllocation才被真正使用到,該攔截器的主要功能都交給了StreamAllocation處理;
執行StreamAllocation對象的 newStream() 方法創建HttpCodec,用于處理編碼Request和解碼Response;
接著通過調用StreamAllocation對象的 connection() 方法獲取到RealConnection對象,這個RealConnection對象是用來進行實際的網絡IO傳輸的。
調用攔截器鏈的**proceed()**方法,執行下一個攔截器返回response對象。
上面我們已經了解了ConnectInterceptor攔截器的intercept()方法的整體流程,主要的邏輯是在StreamAllocation對象中,我們先看下它的 newStream() 方法:
public HttpCodec newStream(OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {...try {//創建RealConnection對象RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);//創建HttpCodec對象HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);synchronized (connectionPool) {codec = resultCodec;//返回HttpCodec對象return resultCodec;}} catch (IOException e) {throw new RouteException(e);}} 復制代碼newStream()方法中,主要是創建了RealConnection對象(用于進行實際的網絡IO傳輸)和HttpCodec對象(用于處理編碼Request和解碼Response),并將HttpCodec對象返回。
findHealthyConnection()方法用于創建RealConnection對象:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)throws IOException {while (true) {//while循環//獲取RealConnection對象RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,connectionRetryEnabled);//同步代碼塊判斷RealConnection對象的successCount是否為0synchronized (connectionPool) {if (candidate.successCount == 0) {//如果為0則返回return candidate;}}//對鏈接池中不健康的鏈接做銷毀處理if (!candidate.isHealthy(doExtensiveHealthChecks)) {noNewStreams();continue;}return candidate;}} 復制代碼以上代碼主要做的事情有:
我們看下findConnection()方法做了哪些操作:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,boolean connectionRetryEnabled) throws IOException {...RealConnection result = null;...synchronized (connectionPool) {...releasedConnection = this.connection;toClose = releaseIfNoNewStreams();if (this.connection != null) {//如果不為 null,則復用,賦值給 resultresult = this.connection;releasedConnection = null;}...//如果result為 null,說明上面找不到可以復用的if (result == null) {//從連接池中獲取,調用其get()方法Internal.instance.get(connectionPool, address, this, null);if (connection != null) {//找到對應的 RealConnection對象//更改標志位,賦值給 resultfoundPooledConnection = true;result = connection;} else {selectedRoute = route;}}}...if (result != null) {//已經找到 RealConnection對象,直接返回return result;}...//連接池中找不到,new一個result = new RealConnection(connectionPool, selectedRoute);......//發起請求result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);...//存進連接池中,調用其put()方法Internal.instance.put(connectionPool, result);...return result;} 復制代碼以上代碼主要做的事情有:
ConnectionPool 連接池介紹
剛才我們說到從連接池中取出RealConnection對象時調用了Internal的get()方法,存進去的時候調用了其put()方法。其中Internal是一個抽象類,里面定義了一個靜態變量instance:
public abstract class Internal {...public static Internal instance;... } 復制代碼instance的實例化是在OkHttpClient的靜態代碼塊中:
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {...static {Internal.instance = new Internal() {...@Override public RealConnection get(ConnectionPool pool, Address address,StreamAllocation streamAllocation, Route route) {return pool.get(address, streamAllocation, route);}...@Override public void put(ConnectionPool pool, RealConnection connection) {pool.put(connection);}};}... } 復制代碼這里我們可以看到實際上 Internal 的 get()方法和put()方法是調用了 ConnectionPool 的get()方法和put()方法,這里我們簡單看下ConnectionPool的這兩個方法:
private final Deque<RealConnection> connections = new ArrayDeque<>();@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {assert (Thread.holdsLock(this));for (RealConnection connection : connections) {if (connection.isEligible(address, route)) {streamAllocation.acquire(connection, true);return connection;}}return null;} 復制代碼在get()方法中,通過遍歷connections(用于存放RealConnection的ArrayDeque隊列),調用RealConnection的isEligible()方法判斷其是否可用,如果可用就會調用streamAllocation的acquire()方法,并返回connection。
我們看下調用StreamAllocation的acquire()方法到底做了什么操作:
public void acquire(RealConnection connection, boolean reportedAcquired) {assert (Thread.holdsLock(connectionPool));if (this.connection != null) throw new IllegalStateException();//賦值給全局變量this.connection = connection;this.reportedAcquired = reportedAcquired;//創建StreamAllocationReference對象并添加到allocations集合中connection.allocations.add(new StreamAllocationReference(this, callStackTrace));} 復制代碼先是從連接池中獲取的RealConnection對象賦值給StreamAllocation的成員變量connection;
創建StreamAllocationReference對象(StreamAllocation對象的弱引用), 并添加到RealConnection的allocations集合中,到時可以通過allocations集合的大小來判斷網絡連接次數是否超過OkHttp指定的連接次數。
接著我們查看ConnectionPool 的put()方法:
void put(RealConnection connection) {assert (Thread.holdsLock(this));if (!cleanupRunning) {cleanupRunning = true;executor.execute(cleanupRunnable);}connections.add(connection);} 復制代碼put()方法在將連接添加到連接池之前,會先執行清理任務,通過判斷cleanupRunning是否在執行,如果當前清理任務沒有執行,則更改cleanupRunning標識,并執行清理任務cleanupRunnable。
我們看下清理任務cleanupRunnable中到底做了哪些操作:
private final Runnable cleanupRunnable = new Runnable() {@Override public void run() {while (true) {//對連接池進行清理,返回進行下次清理的間隔時間。long waitNanos = cleanup(System.nanoTime());if (waitNanos == -1) return;if (waitNanos > 0) {long waitMillis = waitNanos / 1000000L;waitNanos -= (waitMillis * 1000000L);synchronized (ConnectionPool.this) {try {//進行等待ConnectionPool.this.wait(waitMillis, (int) waitNanos);} catch (InterruptedException ignored) {}}}}}}; 復制代碼可以看到run()方法里面是一個while死循環,其中調用了cleanup()方法進行清理操作,同時會返回進行下次清理的間隔時間,如果返回的時間間隔為-1,則會結束循環,如果不是-1,則會調用wait()方法進行等待,等待完成后又會繼續循環執行,具體的清理操作在cleanup()方法中:
long cleanup(long now) {//正在使用的連接數int inUseConnectionCount = 0;//空閑的連接數int idleConnectionCount = 0;//空閑時間最長的連接RealConnection longestIdleConnection = null;//最大的空閑時間,初始化為 Long 的最小值,用于記錄所有空閑連接中空閑最久的時間long longestIdleDurationNs = Long.MIN_VALUE;synchronized (this) {//for循環遍歷connections隊列for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {RealConnection connection = i.next();//如果遍歷到的連接正在使用,則跳過,continue繼續遍歷下一個if (pruneAndGetAllocationCount(connection, now) > 0) {inUseConnectionCount++;continue;}//當前連接處于空閑,空閑連接數++idleConnectionCount++;//計算空閑時間long idleDurationNs = now - connection.idleAtNanos;//空閑時間如果超過最大空閑時間if (idleDurationNs > longestIdleDurationNs) {//重新賦值最大空閑時間longestIdleDurationNs = idleDurationNs;//賦值空閑最久的連接longestIdleConnection = connection;}}if (longestIdleDurationNs >= this.keepAliveDurationNs|| idleConnectionCount > this.maxIdleConnections) {//如果最大空閑時間超過空閑保活時間或空閑連接數超過最大空閑連接數限制//則移除該連接connections.remove(longestIdleConnection);} else if (idleConnectionCount > 0) {//如果存在空閑連接//計算出線程清理的時間即(保活時間-最大空閑時間),并返回return keepAliveDurationNs - longestIdleDurationNs;} else if (inUseConnectionCount > 0) {//沒有空閑連接,返回keepAliveDurationNsreturn keepAliveDurationNs;} else {//連接池中沒有連接存在,返回-1cleanupRunning = false;return -1;}}//關閉空閑時間最長的連接closeQuietly(longestIdleConnection.socket());return 0;} 復制代碼cleanup()方法通過for循環遍歷connections隊列,記錄最大空閑時間和空閑時間最長的連接;如果存在超過空閑保活時間或空閑連接數超過最大空閑連接數限制的連接,則從connections中移除,最后執行關閉該連接的操作。
主要是通過pruneAndGetAllocationCount()方法判斷連接是否處于空閑狀態:
private int pruneAndGetAllocationCount(RealConnection connection, long now) {List<Reference<StreamAllocation>> references = connection.allocations;for (int i = 0; i < references.size(); ) {Reference<StreamAllocation> reference = references.get(i);if (reference.get() != null) {i++;continue;}...references.remove(i);connection.noNewStreams = true;...if (references.isEmpty()) {connection.idleAtNanos = now - keepAliveDurationNs;return 0;}}return references.size();} 復制代碼該方法通過for循環遍歷RealConnection的allocations集合,如果當前遍歷到的StreamAllocation被使用就遍歷下一個,否則就將其移除,如果移除后列表為空,則返回0,所以如果方法的返回值為0則說明當前連接處于空閑狀態,如果返回值大于0則說明連接正在使用。
CallServerInterceptor
接下來講解最后一個攔截器CallServerInterceptor了,查看intercept()方法:
@Override public Response intercept(Chain chain) throws IOException {RealInterceptorChain realChain = (RealInterceptorChain) chain;//相關對象的獲取 HttpCodec httpCodec = realChain.httpStream();StreamAllocation streamAllocation = realChain.streamAllocation();RealConnection connection = (RealConnection) realChain.connection();Request request = realChain.request();...//寫入請求頭httpCodec.writeRequestHeaders(request);Response.Builder responseBuilder = null;if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {//判斷是否有請求體if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {//詢問服務器是否愿意接收請求體httpCodec.flushRequest();//刷新請求realChain.eventListener().responseHeadersStart(realChain.call());responseBuilder = httpCodec.readResponseHeaders(true);}if (responseBuilder == null) {//服務器愿意接收請求體//寫入請求體...} else if (!connection.isMultiplexed()) {streamAllocation.noNewStreams();}}//結束請求httpCodec.finishRequest();if (responseBuilder == null) {realChain.eventListener().responseHeadersStart(realChain.call());//根據服務器返回的數據構建 responseBuilder對象responseBuilder = httpCodec.readResponseHeaders(false);}//構建 response對象Response response = responseBuilder.request(request).handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build();...//設置 response的 bodyresponse = response.newBuilder().body(httpCodec.openResponseBody(response)).build();//如果請求頭中 Connection對應的值為 close,則關閉連接if ("close".equalsIgnoreCase(response.request().header("Connection"))|| "close".equalsIgnoreCase(response.header("Connection"))) {streamAllocation.noNewStreams();}...return response;} 復制代碼以上代碼具體的流程:
??好了,到這里OkHttpClient源碼分析就結束了,相信看完本套源碼解析會加深你對OkHttpClient的認識,同時也學到了其巧妙的代碼設計思路,在閱讀源碼的過程中,我們的編碼能力也逐步提升,如果想要寫更加優質的代碼,閱讀源碼是一件很有幫助的事。
總結
以上是生活随笔為你收集整理的OkHttpClient源码分析(五)—— ConnectInterceptor和CallServerInterceptor的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: easyui datagrid 表头固定
- 下一篇: laravel Collection m