前言
網(wǎng)站設(shè)計(jì)制作過(guò)程拒絕使用模板建站;使用PHP+MYSQL原生開(kāi)發(fā)可交付網(wǎng)站源代碼;符合網(wǎng)站優(yōu)化排名的后臺(tái)管理系統(tǒng);成都做網(wǎng)站、網(wǎng)站建設(shè)收費(fèi)合理;免費(fèi)進(jìn)行網(wǎng)站備案等企業(yè)網(wǎng)站建設(shè)一條龍服務(wù).我們是一家持續(xù)穩(wěn)定運(yùn)營(yíng)了10年的成都創(chuàng)新互聯(lián)公司網(wǎng)站建設(shè)公司。
HTTP是我們交換數(shù)據(jù)和媒體流的現(xiàn)代應(yīng)用網(wǎng)絡(luò),有效利用HTTP可以使我們節(jié)省帶寬和更快地加載數(shù)據(jù),Square公司開(kāi)源的OkHttp網(wǎng)絡(luò)請(qǐng)求是有效率的HTTP客戶(hù)端。之前的知識(shí)面僅限于框架API的調(diào)用,接觸到實(shí)際的工作之后深知自己知識(shí)的不足,故而深挖框架源碼盡力吸取前輩的設(shè)計(jì)經(jīng)驗(yàn)。關(guān)于此框架的源碼解析網(wǎng)上的教程多不勝數(shù),此文名為源碼解析,實(shí)則是炒冷飯之作,如有錯(cuò)誤和不足之處還望各位看官指出。
攔截器
攔截器是OkHttp框架設(shè)計(jì)的精髓所在,攔截器所定義的是Request的所通過(guò)的責(zé)任鏈而不管Request的具體執(zhí)行過(guò)程,并且可以讓開(kāi)發(fā)人員自定義自己的攔截器功能并且插入到責(zé)任鏈中
用戶(hù)自定義的攔截器位于 OkHttpClient.addInterceptor() 添加到interceptors責(zé)任鏈中
RealCall.execute()執(zhí)行的時(shí)候調(diào)用RealCall.getResponseWithInterceptorChain()將 來(lái)自 OkHttpClient的interceptors以及默認(rèn)的攔截器一并加入到RealInterceptorChain責(zé)任鏈中并調(diào)用, 代碼并沒(méi)有對(duì)originalRequest進(jìn)行封裝, InterceptorChain和originalRequest一并流轉(zhuǎn)到 RealInterceptorChain類(lèi)中處理
CustomInterceptor RetryAndFollowUpInterceptor BridgeInterceptor CacheInterceptor ConnectInterceptor NetworkInterceptors CallServiceInterceptor
RealInterceptorChain.proceed()
EventListener.callStart()也是在RealCall.execute()嵌入到Request調(diào)用過(guò)程, EventListener.callEnd()位于StreamAllocation中調(diào)用
Request.Builder
url (String/URL/HttpUrl)
header
CacheControl
Tag (Use this API to attach timing, debugging, or other application data to a request so that you may read it in interceptors, event listeners, or callbacks.)
BridgeInterceptor
Bridges from application code to network code. First it builds a network request from a user request. Then it proceeds to call the network. Finally it builds a user response from the network response.
此攔截器是應(yīng)用碼到網(wǎng)絡(luò)碼的橋接。它會(huì)將用戶(hù)請(qǐng)求封裝成一個(gè)網(wǎng)絡(luò)請(qǐng)求并且執(zhí)行請(qǐng)求,同時(shí)它還完成從網(wǎng)絡(luò)響應(yīng)到用戶(hù)響應(yīng)的轉(zhuǎn)化. 最后Chain.proceed() 方法啟動(dòng)攔截器責(zé)任鏈, RealInterceptorChain中通過(guò)遞歸調(diào)用將網(wǎng)絡(luò)請(qǐng)求以及響應(yīng)的任務(wù)分別分配到各個(gè)攔截器中, 然后通過(guò)ResponseBuilder.build()方法將網(wǎng)絡(luò)響應(yīng)封裝, 然后遞歸調(diào)用責(zé)任鏈模式使得調(diào)用以及Response處理的過(guò)程可以一并寫(xiě)入BridgeInterceptor中
public?final?class?RealInterceptorChain?implements?Interceptor.Chain?{?public?Response?proceed(Request?request,?StreamAllocation?streamAllocation,? ?HttpCodec?httpCodec,?RealConnection?connection)?throws?IOException?{?if?(index?>=?interceptors.size())?throw?new?AssertionError(); ?calls++; ?...?//?Call?the?next?interceptor?in?the?chain. ?RealInterceptorChain?next?=?new?RealInterceptorChain(interceptors,? ?streamAllocation,?httpCodec,connection,?index?+?1,?request,?call,? ?eventListener,?connectTimeout,?readTimeout,writeTimeout); ?Interceptor?interceptor?=?interceptors.get(index); ?Response?response?=?interceptor.intercept(next); ?...?return?response; ?} }
CallServiceInterceptor;
Interceptor的邏輯均在intercept()方法中實(shí)現(xiàn), 在通過(guò)Chain實(shí)體類(lèi)獲取到請(qǐng)求主題之后,通過(guò)BufferedSink接口將請(qǐng)求轉(zhuǎn)發(fā)到Okio接口,在攔截過(guò)程中通過(guò)EventListener接口將攔截器處理狀態(tài)(主要是RequestBodyStart和RequestBodyEnd兩個(gè)狀態(tài))發(fā)送出去
public?final?class?CallServiceInterceptor?implements?Interceptor?{?@Override?public?Response?intercept(Chain?chain)?throws?IOException?{ ?Response.Builder?responseBuilder?=?null;?if?(HttpMethod.permitsRequestBody(request.method())?&&?request.body()?!=?null)?{?//?If?there's?a?"Expect:?100-continue"?header?on?the?request,?wait?for?a?"HTTP/1.1?100 ?//?Continue"?response?before?transmitting?the?request?body.?If?we?don't?get?that,?return ?//?what?we?did?get?(such?as?a?4xx?response)?without?ever?transmitting?the?request?body. ?if?("100-continue".equalsIgnoreCase(request.header("Expect")))?{ ?httpCodec.flushRequest(); ?realChain.eventListener().responseHeadersStart(realChain.call()); ?responseBuilder?=?httpCodec.readResponseHeaders(true); ?}?if?(responseBuilder?==?null)?{?//?Write?the?request?body?if?the?"Expect:?100-continue"?expectation?was?met. ?realChain.eventListener().requestBodyStart(realChain.call());?long?contentLength?=?request.body().contentLength(); ?CountingSink?requestBodyOut?=?new?CountingSink(httpCodec.createRequestBody(request,?contentLength)); ?BufferedSink?bufferedRequestBody?=?Okio.buffer(requestBodyOut); ?request.body().writeTo(bufferedRequestBody); ?bufferedRequestBody.close(); ?realChain.eventListener() ?.requestBodyEnd(realChain.call(),?requestBodyOut.successfulCount); ?}?else?if?(!connection.isMultiplexed())?{?//?If?the?"Expect:?100-continue"?expectation?wasn't?met,?prevent?the?HTTP/1?connection ?//?from?being?reused.?Otherwise?we're?still?obligated?to?transmit?the?request?body?to ?//?leave?the?connection?in?a?consistent?state. ?streamAllocation.noNewStreams(); ?} ?} ?} }
CacheInterceptor;
public?final?class?CacheInterceptor?implements?Interceptor?{?@Override?public?Response?intercept(Chain?chain)?throws?IOException?{ ?CacheStrategy?strategy?=?new?CacheStrategy.Factory(now,?chain.request(),?cacheCandidate).get(); ?Request?networkRequest?=?strategy.networkRequest; ?Response?cacheResponse?=?strategy.cacheResponse;?if?(cache?!=?null)?{?/** ?*?Track?an?HTTP?response?being?satisfied?with?{@code?cacheStrategy}. ?*?主要是跟蹤networkRequest次數(shù)以及對(duì)應(yīng)Cache的hitcount ?*/ ?cache.trackResponse(strategy); ?}?if?(cacheCandidate?!=?null?&&?cacheResponse?==?null)?{ ?closeQuietly(cacheCandidate.body());?//?The?cache?candidate?wasn't?applicable.?Close?it. ?}?//?If?we're?forbidden?from?using?the?network?and?the?cache?is?insufficient,?fail. ?if?(networkRequest?==?null?&&?cacheResponse?==?null)?{?return?new?Response.Builder() ?.request(chain.request()) ?.protocol(Protocol.HTTP_1_1) ?.code(504) ?.message("Unsatisfiable?Request?(only-if-cached)") ?.body(Util.EMPTY_RESPONSE) ?.sentRequestAtMillis(-1L) ?.receivedResponseAtMillis(System.currentTimeMillis()) ?.build(); ?}?//?If?we?don't?need?the?network,?we're?done. ?if?(networkRequest?==?null)?{?return?cacheResponse.newBuilder() ?.cacheResponse(stripBody(cacheResponse)) ?.build(); ?}?//在chain.proceed()調(diào)用下一個(gè)攔截器 ?Response?networkResponse?=?null;?try?{ ?networkResponse?=?chain.proceed(networkRequest); ?}?finally?{?//?If?we're?crashing?on?I/O?or?otherwise,?don't?leak?the?cache?body. ?if?(networkResponse?==?null?&&?cacheCandidate?!=?null)?{ ?closeQuietly(cacheCandidate.body()); ?} ?}?//處理response并返回 ?...?return?response; ?} }
OkHttpClient
OkHttpClient托管著所有HTTP調(diào)用, 每個(gè)Client均擁有自己的連接池和線(xiàn)程池
實(shí)現(xiàn)抽象類(lèi)Internal的方法,這是Internel抽象類(lèi)唯一的實(shí)現(xiàn),方法與CacheInterceptor控制Http的Header.Lenient區(qū)域和StreamAlloction從連接池中獲取連接有關(guān)
private?RealConnection?findConnection(int?connectTimeout,?int?readTimeout,?int?writeTimeout,?int?pingIntervalMillis,?boolean?connectionRetryEnabled)?throws?IOException?{ ?...?synchronized?(connectionPool)?{ ?...?if?(result?==?null)?{?//?Attempt?to?get?a?connection?from?the?pool. ?Internal.instance.get(connectionPool,?address,?this,?null);?if?(connection?!=?null)?{ ?foundPooledConnection?=?true; ?result?=?connection; ?}?else?{ ?selectedRoute?=?route; ?} ?} ?}?return?result;
RouteDatabase && RouteSeletor
RouteDatabase是記錄連接失敗的連接路徑的黑名單,從而OkHttp可以從失敗中學(xué)習(xí)并且傾向于選擇其他可用的路徑,RouteSeletor通過(guò)RouteDatabase.shouldPostpone(route)方法可獲知此路徑是否近期曾連接失敗,RouteSelector部分源碼如下:
public?final?class?RouteSelector?{?/** ?*?Clients?should?invoke?this?method?when?they?encounter?a?connectivity?failure?on?a?connection ?*?returned?by?this?route?selector. ?*?在StreamAllocation.streamFailed()中添加了routeSelector.connectFailed()邏輯 ?*/ ?public?void?connectFailed(Route?failedRoute,?IOException?failure)?{?if?(failedRoute.proxy().type()?!=?Proxy.Type.DIRECT?&&?address.proxySelector()?!=?null)?{?//?Tell?the?proxy?selector?when?we?fail?to?connect?on?a?fresh?connection. ?address.proxySelector().connectFailed( ?address.url().uri(),?failedRoute.proxy().address(),?failure); ?} ?routeDatabase.failed(failedRoute); ?} }
synchronized?void?enqueue(AsyncCall?call)?{?if?(runningAsyncCalls.size()?ExecutorSevice.execute(AsyncCall)執(zhí)行代碼位于AsyncCall內(nèi)部復(fù)寫(xiě)的execute()方法, 方法內(nèi)定義一些Callback回調(diào)節(jié)點(diǎn)運(yùn)行邏輯,包括用戶(hù)主動(dòng)取消執(zhí)行(使用retryAndFollowUpInterceptor)以及執(zhí)行請(qǐng)求成功或者失敗時(shí)的回調(diào)方法
final?class?AsyncCall?extends?NamedRunnable?{ ?...?@Override?protected?void?execute()?{?boolean?signalledCallback?=?false;?try?{ ?Response?response?=?getResponseWithInterceptorChain();?if?(retryAndFollowUpInterceptor.isCanceled())?{ ?signalledCallback?=?true; ?responseCallback.onFailure(RealCall.this,?new?IOException("Canceled")); ?}?else?{ ?signalledCallback?=?true; ?responseCallback.onResponse(RealCall.this,?response); ?} ?}?catch?(IOException?e)?{?if?(signalledCallback)?{?//?Do?not?signal?the?callback?twice! ?Platform.get().log(INFO,?"Callback?failure?for?"?+?toLoggableString(),?e); ?}?else?{ ?eventListener.callFailed(RealCall.this,?e); ?responseCallback.onFailure(RealCall.this,?e); ?} ?}?finally?{ ?client.dispatcher().finished(this); ?} ?} ?}
惰性初始模式(Created Lazily)成員
ExecutorService()
CacheControl
WebSocket
WebSocket 異步非堵塞的web socket接口 (通過(guò)Enqueue方法來(lái)實(shí)現(xiàn))
OkHttpClient 通過(guò)實(shí)現(xiàn) WebSocket.Factory.newWebSocket 接口實(shí)現(xiàn)工廠(chǎng)構(gòu)造, 通常是由 OkHttpClient來(lái)構(gòu)造
WebSocket生命周期:
Connecting狀態(tài): 每個(gè)websocket的初始狀態(tài), 此時(shí)Message可能位于入隊(duì)狀態(tài)但是還沒(méi)有被Dispatcher處理
Open狀態(tài): WebSocket已經(jīng)被服務(wù)器端接受并且Socket位于完全開(kāi)放狀態(tài), 所有Message入隊(duì)之后會(huì)即刻被處理
Closing狀態(tài): WebSocket進(jìn)入優(yōu)雅的關(guān)閉狀態(tài),WebSocket繼續(xù)處理已入隊(duì)的Message但拒絕新的Message入隊(duì)
Closed狀態(tài): WebSocket已完成收發(fā)Message的過(guò)程, 進(jìn)入完全關(guān)閉狀態(tài)
WebSocket受到網(wǎng)絡(luò)等各種因素影響, 可能會(huì)斷路而提前進(jìn)入關(guān)閉流程
Canceled狀態(tài): 被動(dòng)WebSocket失敗連接為非優(yōu)雅的過(guò)程, 而主動(dòng)則是優(yōu)雅短路過(guò)程
RealWebSocket
RealWebSocket管理著Request隊(duì)列內(nèi)容所占的空間大小以及關(guān)閉Socket之后留給優(yōu)雅關(guān)閉的時(shí)間,默認(rèn)為16M和60秒,在RealWebSocket.connect()方法中RealWebSocket對(duì)OkHttpClient以及Request封裝成Call的形式,然后通過(guò)Call.enqueue()方法定義調(diào)用成功和失敗時(shí)的Callback代碼
public?void?connect(OkHttpClient?client)?{ ?client?=?client.newBuilder() ?.eventListener(EventListener.NONE) ?.protocols(ONLY_HTTP1) ?.build();?final?Request?request?=?originalRequest.newBuilder() ?.header("Upgrade",?"websocket") ?.header("Connection",?"Upgrade") ?.header("Sec-WebSocket-Key",?key) ?.header("Sec-WebSocket-Version",?"13") ?.build(); ?call?=?Internal.instance.newWebSocketCall(client,?request); ?call.enqueue(new?Callback()?{?@Override?public?void?onResponse(Call?call,?Response?response)?{?try?{ ?checkResponse(response); ?}?catch?(ProtocolException?e)?{ ?failWebSocket(e,?response); ?closeQuietly(response);?return; ?}?//?Promote?the?HTTP?streams?into?web?socket?streams. ?StreamAllocation?streamAllocation?=?Internal.instance.streamAllocation(call); ?streamAllocation.noNewStreams();?//?Prevent?connection?pooling! ?Streams?streams?=?streamAllocation.connection().newWebSocketStreams(streamAllocation);?//?Process?all?web?socket?messages. ?try?{ ?listener.onOpen(RealWebSocket.this,?response); ?String?name?=?"OkHttp?WebSocket?"?+?request.url().redact(); ?initReaderAndWriter(name,?streams); ?streamAllocation.connection().socket().setSoTimeout(0); ?loopReader(); ?}?catch?(Exception?e)?{ ?failWebSocket(e,?null); ?} ?}?@Override?public?void?onFailure(Call?call,?IOException?e)?{ ?failWebSocket(e,?null); ?} ?}); ?}
當(dāng)Call請(qǐng)求被服務(wù)端響應(yīng)的時(shí)候就將HTTP流導(dǎo)入到Web Socket流中,并且調(diào)用WebSocketListener相對(duì)應(yīng)的狀態(tài)方法, WebSocketListener狀態(tài)如下:
onOpen()onMessage()onClosing()onClosed()onFailure()
WebSocket -> RealWebSocket
Connection -> RealConnection
Interceptor -> RealInterceptorChain
Call -> RealCall
ResponseBody -> RealResponseBody
Gzip壓縮機(jī)制
處理Gzip壓縮的代碼在BridgeInterceptor中,默認(rèn)情況下為gzip壓縮狀態(tài),可以從下面的源碼片段中獲知。如果header中沒(méi)有Accept-Encoding,默認(rèn)自動(dòng)添加 ,且標(biāo)記變量transparentGzip為true
//?If?we?add?an?"Accept-Encoding:?gzip"?header?field?we're?responsible?for?also?decompressing ?//?the?transfer?stream. ?boolean?transparentGzip?=?false;?if?(userRequest.header("Accept-Encoding")?==?null?&&?userRequest.header("Range")?==?null)?{ ?transparentGzip?=?true; ?requestBuilder.header("Accept-Encoding",?"gzip"); ?}
BridgeInterceptor解壓縮的過(guò)程調(diào)用了okio.GzipSource()方法并調(diào)用Okio.buffer()緩存解壓過(guò)程,源碼如下
if?(transparentGzip ?&&?"gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) ?&&?HttpHeaders.hasBody(networkResponse))?{ ?GzipSource?responseBody?=?new?GzipSource(networkResponse.body().source()); ?Headers?strippedHeaders?=?networkResponse.headers().newBuilder() ?.removeAll("Content-Encoding") ?.removeAll("Content-Length") ?.build(); ?responseBuilder.headers(strippedHeaders);?String?contentType?=?networkResponse.header("Content-Type"); ?responseBuilder.body(new?RealResponseBody(contentType,?-1L,?Okio.buffer(responseBody))); ?}
RealCall構(gòu)造方法
在RealCall構(gòu)造方法上面,早期版本的RealCall構(gòu)造方法中將EventListener.Factory以及EventListenerFactory.Create()分開(kāi)處理導(dǎo)致RealCall構(gòu)造方法非線(xiàn)程安全. 現(xiàn)在版本的RealCall的構(gòu)造函數(shù)使用OkHttpClient.eventListenerFactory().create()
早期版本如下:
final?class?RealCall?implements?Call?{ ?RealCall(OkHttpClient?client,?Request?originalRequest,?boolean?forWebSocket)?{ ?...?final?EventListener.Factory?eventListenerFactory?=?client.eventListenerFactory();?this.client?=?client;?this.originalRequest?=?originalRequest;?this.forWebSocket?=?forWebSocket;?//重試和跟進(jìn)攔截器 ?this.retryAndFollowUpInterceptor?=?new?RetryAndFollowUpInterceptor(client,?forWebSocket);?//?TODO(jwilson):?this?is?unsafe?publication?and?not?threadsafe.? ?//?這是不安全的發(fā)布,不是線(xiàn)程安全的。 ?this.eventListener?=?eventListenerFactory.create(this); ?} }
現(xiàn)在 OkHttp 3.11.0 的RealCall源代碼如下
final?class?RealCall?implements?Call?{?private?EventListener?eventListener; ?...?private?RealCall(OkHttpClient?client,?Request?originalRequest,?boolean?forWebSocket)?{?this.client?=?client;?this.originalRequest?=?originalRequest;?this.forWebSocket?=?forWebSocket;?this.retryAndFollowUpInterceptor?=?new?RetryAndFollowUpInterceptor(client,?forWebSocket); ?}?static?RealCall?newRealCall(OkHttpClient?client,?Request?originalRequest,?boolean?forWebSocket)?{?//?Safely?publish?the?Call?instance?to?the?EventListener. ?RealCall?call?=?new?RealCall(client,?originalRequest,?forWebSocket); ?call.eventListener?=?client.eventListenerFactory().create(call);?return?call; ?} }
ConnetionPool
連接池能夠復(fù)用http連接從而減少訪(fǎng)問(wèn)相同目標(biāo)主機(jī)情況下的網(wǎng)絡(luò)延遲,此類(lèi)實(shí)現(xiàn)管理連接開(kāi)閉的策略并使用與連接池一一對(duì)應(yīng)的后臺(tái)線(xiàn)程清理過(guò)期的連接。ConnectionPool提供對(duì)Deque
public?final?class?ConnectionPool?{ ?...?private?static?final?Executor?executor?=?new?ThreadPoolExecutor(0?/*?corePoolSize?*/, ?Integer.MAX_VALUE?/*?maximumPoolSize?*/,?60L?/*?keepAliveTime?*/,?TimeUnit.SECONDS,?new?SynchronousQueue(),?Util.threadFactory("OkHttp?ConnectionPool",?true));?/**?The?maximum?number?of?idle?connections?for?each?address.?*/ ?private?final?int?maxIdleConnections;?private?final?long?keepAliveDurationNs;?private?final?Runnable?cleanupRunnable?=?new?Runnable()?{?@Override?public?void?run()?{?while?(true)?{?long?waitNanos?=?cleanup(System.nanoTime());?if?(waitNanos?==?-1)?return;?if?(waitNanos?>?0)?{?long?waitMillis?=?waitNanos?/?1000000L; ?waitNanos?-=?(waitMillis?*?1000000L);?synchronized?(ConnectionPool.this)?{?try?{ ?ConnectionPool.this.wait(waitMillis,?(int)?waitNanos); ?}?catch?(InterruptedException?ignored)?{ ?} ?} ?} ?} ?} ?}; ?... }
cleanUpRunnable里面是一個(gè)while(true),一個(gè)循環(huán)包括:
調(diào)用一次cleanUp方法進(jìn)行清理并返回一個(gè)long
如果是-1則退出,否則調(diào)用wait方法等待這個(gè)long值的時(shí)間
okhttp是根據(jù)StreamAllocation引用計(jì)數(shù)是否為0來(lái)實(shí)現(xiàn)自動(dòng)回收連接的。cleanUpRunnable遍歷每一個(gè)RealConnection,通過(guò)引用數(shù)目確定哪些是空閑的,哪些是在使用中,同時(shí)找到空閑時(shí)間最長(zhǎng)的RealConnection。如果空閑數(shù)目超過(guò)最大空閑數(shù)或者空閑時(shí)間超過(guò)最大空閑時(shí)間,則清理掉這個(gè)RealConnection并返回0,表示需要立刻再次清理
public?final?class?ConnectionPool?{ ?...?void?put(RealConnection?connection)?{?assert?(Thread.holdsLock(this));?if?(!cleanupRunning)?{ ?cleanupRunning?=?true; ?executor.execute(cleanupRunnable); ?} ?connections.add(connection); ?} ?... }
我們?cè)趐ut操作前首先要調(diào)用executor.execute(cleanupRunnable)來(lái)清理閑置的線(xiàn)程。
RealConnection
RealConnection是socket物理連接的包裝,它里面維護(hù)了List