OkHttp 源码分析

摘要

本文主要是分析Android主流网络请求框架OkHttp源码,看看每天都在打交道的伙计到底在干什么。处理了网络中最根本的HTTP问题,如对HTTP请求报文和响应报文的处理。允许自定义拦截器实现特殊的需求,如日志打印,增加请求头等。默认六个拦截器处理网络请求,缓存复用,连接复用,数据转化,重试恢复机制。

请求

OkHttp的异步请求主要了调用Call对象的enqueue函数,而Call接口唯一的实现类是RealCall。所以异步请求主要是调用了RealCallenqueue函数,而同步请求则是RealCall对象的execute函数。

RealCall的enqueue与execute函数

下面先分析异步请求:

1
2
3
4
5
6
7
//RealCall
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }

callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}

RealCall调用Dispatcher的enqueue函数

1
2
3
4
5
6
7
8
9
10
11
//Dispatcher
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}

readyAsyncCalls是一个队列,用于保存即将执行的AsyncCall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()

val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//maxRequests默认最大网络请求数 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//maxRequestsPerHost默认最大目的主机请求数5
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0//待运行+正在运行主机数
}

for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//调用AsyncCall
asyncCall.executeOn(executorService)
}

return isRunning
}

runningAsyncCalls 是一个队列,保存正在执行AsyncCallpromoteAndExecute()函数主要将readyAsyncCalls队列中待运行的请求添加到runningAsyncCalls队列中,并调用其executeOn函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()

var success = false
try {
//将AsyncCall添加到线程池中
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}

executeOn函数主要是将AsyncCall添加到线程池中。此时看看AsyncCallrun函数如何实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//在这里返回了响应数据
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}

run函数,通过getResponseWithInterceptorChain()函数返回了响应数据,可见在该函数里面进行了网络请求。

再回到同步请求中,即调用realCallexecute函数。

1
2
3
4
5
6
7
8
9
10
11
12
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }

timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}

可见,同步请求直接调用了getResponseWithInterceptorChain()函数,而不是交给线程池去执行。因此同步请求 的时候记得切换到子线程执行。

getResponseWithInterceptorChain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)

val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)

var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}

getResponseWithInterceptorChain()函数先是将我们自定义的拦截器和默认的拦截器添加到集合中,并创建RealInterceptorChain对象chain,即所谓的拦截链。然后调用chain.proceed(originalRequest)返回响应数据。每一个拦截器都会实现自己特定的功能,并在自己的重载的intercept函数中调用chain.proceed(request)函数去调用下一个拦截器的intercept函数。这里request已经不再是原始的originalRequest,而且经过当前拦截器改造,假如需要的话。先不管每个拦截器分做了什么,大概就知道了拦截链的调用图。

未命名绘图

拦截链

OkHttp中,最重要的就是拦截链中每个拦截器的工作。例如发起网络请求的CallServerInterceptor,网络连接的ConnectInterceptor,缓存策略的CacheInterceptor,桥接模式的BridgeInterceptor,重试恢复的RetryAndFollowUpInterceptor。以及我们自定义的Interceptor,可以用来打印日志等。从它们在拦截链的不同位置,以及所做的事情,可以掌握更好的自定义的Interceptor。

网络请求:CallServerInterceptor

那么,我们假装所有拦截器都会调用下一个拦截器,直到最后的CallServerInterceptor拦截器。作为拦截链最后一个拦截器,只重写了intercept函数,并进行真正的网络请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
 @Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()

exchange.writeRequestHeaders(request)

var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
//除了head+get,其他返回false,因为其他需要请求体
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
//客户端请求头携带"Expect: 100-continue"表示告诉服务器,客户端有一个期望值(例如请求体很大),希望得到服务器能否妥善处理该请求,响应报文状态码:100能417不能
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
//responseBuilder=null表示服务器能处理
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
//Http 2.0全双工,发送请求,并将请求体写到底层TCP发送缓存
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
//服务器能处理`"Expect: 100-continue"`期望,将请求体写到底层TCP发送缓存
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
//非100-continue情况下,防止HTTP/1.1的使用
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}

if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
//开始获取响应头
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
//服务器响应:Expect:100-continue
if (code == 100) {
//重新读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
//响应头读取结束
exchange.responseHeadersEnd(response)

response = if (forWebSocket && code == 101) {
// 表示服务器支持协议升级,切换到websocket或者HTTP 2。
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response)) //获取响应数据真正地方
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()//关闭连接
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}

整块代码看下来,主要是为了区别下面几种情况:

  • Expect:100-continue: 客户端请求报文携带该请求头字段,表示期望服务器能处理客户端的某个期望,例如大报文。如果能满足该条件,服务器则返回状态码是100的响应报文。

    OKHttp的处理是,如果满足该条件,直接发送请求,开始等待服务器响应,若服务器支持,则直接将请求报文数据写到发送缓存中。

  • 全双工 HTTP2.0:因为目前HTTP中只有2.0版本支持全双工,全双工的过程中,请求报文与响应报文的传输会有交差的情况,HTTP/1.1 半双工不存在这种情况。

    OKHttp的处理是,如果满足该条件,直接发送请求,开始等待服务器响应,若服务器支持,则直接将请求报文数据写到发送缓存中。

  • 一般情况:直接调用exchange.noRequestBody()进行请求。

最后通过根据响应头判断是否需要协议升级,否则通过exchange.openResponseBody(response)获取响应数据部分。

所以大概流程就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//发送请求
exchange.flushRequest()
//写请求数据部分
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
//结束请求
exchange.finishRequest()
//开始读取响应头
exchange.responseHeadersStart()
//读取响应头
exchange.readResponseHeaders(expectContinue = false)!!
//结束读取响应头
exchange.responseHeadersEnd(response)
//读取响应体
exchange.openResponseBody(response)

而在正常获得响应数据后,回调Callback对象的onResponse函数。到此,整个过程也就结束了。

连接复用:ConnectInterceptor

ConnectInterceptor连接拦截器的intercept函数非常简单,但却涉及到OkHttp的一个重要概念,连接复用。众所周知,HTTP是基于运输层的TCP协议(HTTP/3将基于UDP协议),意味着TCP三次握手建立连接和4次挥手断开连接,以及拥塞机制的慢启动阶段,都会影响到APP性能和数据响应速度。通过TCP连接的复用,能有效提高效率和降低时延。

1
2
3
4
5
6
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}

intercept函数中最重要的代码realChain.call.initExchange(chain),创建或复用一条TCP连接进行HTTP请求或响应。这里有条调用链:

1
2
RealCall.initExchange->ExchangeFinder.find->ExchangeFinder.findHealthyConnection
->ExchangeFinder.findConnection

findConnection函数中,查找有用的连接步骤:

  1. 复用当前请求RealCall的连接。如果当前RealCall携带的连接connection不为null,且端口号与主机与当前主机请求一致。判断是否有其他请求call在该connection与服务器交互,没有的情况下通知连接池释放该连接,并返回Socket,并复用该connection
  2. 在连接池中寻找。遍历连接池中所有的connection,通过connection.isEligible来判断是否可以复用该connectionisEligible函数会检查很多东西,例如域名、域名、代理、DNS等等是否一致。甚至HTTP/2的connect的一些要求,例如证书。
  3. 创建新的连接。如果在前两步没有找到合适的connection,只能创建新的connection了。在创建新的连接前,还会在创建路由后,再尝试在连接池中寻找合适的connection。否则会为当前请求call创建新的connection并添加在连接池。

连接池:RealConnectionPool

RealConnectionPool内部有一个ConcurrentLinkedQueue<RealConnection类型的connections,用于持有所有连接。在创建OkHttpClient时,会创建默认的最大空闲连接数为5,存活时间为5分钟的连接池。

缓存策略:CacheInterceptor

缓存拦截器通常情况下只支持GET方法请求的缓存,每次请求都会先根据URL计算出key,然后在DiskLruCache中获取缓存快照(上次请求缓存的数据),假如有的话。然后根据缓存快照和请求报文,生成缓存策略,即根据请求报文和响应报文的首部行中相关字段(Cache-ControlEtag等等)来生成策略。根据不同的缓存策略进行不同的操作。

  1. 通过url的md5+hex获取缓存快照

    通过RequestURL计算出的md5hex值作为key,来获取DiskLruCache中的缓存快照SnapShotSnapshot缓存了请求报文和响应报文的相关数据,例如请求报文的请求行和响应报文的状态行。通过SnapShot构建Entry对象,该对象主要存储了请求报文和响应报文的首部信息。最后通过EntrySnapShot的创建出缓存响应Response

    SnapShot通过snapshot.getSource(ENTRY_METADATA)获取首部元数据信息,和snapshot.getSource(ENTRY_BODY)获取缓存的响应报文数据部分。

  2. CacheStrategy.Factory计算缓存策略

    通过CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()生成的CacheStartegy对象含有两个属性networkRequestcacheResponse,当cacheResponse==null表示不使用缓存,从服务器拉取数据;当networkRequest==null表示强制使用缓存。同时为null,表示不满足only-if-cachedcompute函数主要是根据请求报文和响应报文的请求头来生成缓存策略。

    **cacheResponse==null**情况:

    • 本地没有缓存,即第一步得出的cacheCandidate==nll
    • HTTP中不支持缓存的报文或者请求报文和响应报文首部行设置了Cache-Control:no-store
    • 缓存的响应报文请求头都没设置ETag,Last-Modified,Date字段。

    **networkRequest==null**情况:

    • request设置了Cache-Control:only-if-cached
    • 使用协商缓存,且缓存未过期。
  3. 判断networkRequest==null的情况,表示只使用缓存。

  4. 发起网络请求,若服务器响应304且本地缓存不为null,则使用本地缓存,并更新本地缓存。

  5. 最后将请求报文和响应报文存放到缓存中,仅支持GET缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
 override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
//第一步:通过url的md5+hex获取缓存快照
val cacheCandidate = cache?.get(chain.request())

val now = System.currentTimeMillis()
//计算缓存策略
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse

cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}

//请求报文使用缓存,即请求头携带`Cache-Control:if-only-cached`
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}

// networkRequest==null表示不使用网络请求
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}

if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}

var networkResponse: Response? = null
try {
//发起网络请求,也就是会调用CallServerInterceptor的intercept函数
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}

//如果有本地缓存,且服务器返回304报文的,则使用响应报文
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()

networkResponse.body!!.close()

//更新缓存
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}

val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()

if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
//存放缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
// This will log a conditional cache miss only.
listener.cacheMiss(call)
}
}
}
//仅支持GET缓存
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}

return response
}

桥接模式:BridgeInterceptor

之所以将它称为桥接模式拦截器,是因为它将应用发起的请求转化成HTTP请求所需的内容,将网络响应数据转化成应用数据。其实也没那么神奇,就是已有的Request,判断请求报文是否添加一些头部信息,没有话就帮我们添加,如:Content-TypeContent-Length等等。

请求报文首部行:

  • Content-Type:如果在ReqeuestBody添加了ContentType,则帮我们添加到header
  • Content-Length:如果ReqeuestBodycontentLength没有设置,则添加Transfer-Encoding:chunked,否则添加到header
  • Transfer-Encoding:chunked,在没有ReqeuestBody情况下才会设置该请求头。
  • Host:域名
  • Connection:Keep-Alive
  • Accept-Encoding: gzip如果帮我们添加了该请求头,还要负责帮我们解压缩。
  • Cookie:XXX 假如有的话
  • User-Agent:okhttp/${OkHttp.VERSION}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()

val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}

val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
//Host:域名 请求头
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
//Connection:Keep-Alive HTTP1.1 短连接请求头
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}

val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}

val networkResponse = chain.proceed(requestBuilder.build())

cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

val responseBuilder = networkResponse.newBuilder()
.request(userRequest)

if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}

return responseBuilder.build()
}

重试恢复:RetryAndFollowUpInterceptor

该拦截器在客户端允许重试恢复机制下,主要根据网络请求抛出的一些异常类型或者响应报文的状态码,进行重试恢复请求。如果该异常或者状态码不支持恢复,或者重试次数达到了设置次数,则抛出异常,本次客户端发起的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
call.enterNetworkInterceptorExchange(request, newExchangeFinder)

var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}

try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}

//while是一个死循环
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}

val exchange = call.interceptorScopedExchange
//判读一些常见的网络错误或者重定向,例如400,然后尝试新建request的恢复连接
val followUp = followUpRequest(response, exchange)

if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}

val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}

response.body?.closeQuietly()

if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}

request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}

recover函数主要检测发生的异常是否属于可以恢复的类型。如果设置了client.retryOnConnectionFailure=false,直接抛出异常不会重新进行请求。主要是根据一些错误的类型,来判断该错误是否避免,如果不能避免就直接抛异常,可以的话重新进入下次while循环。

自定义拦截器

我们自定的拦截器有两种,一种是普通的应用拦截器,也就是拦截链最开始的地方。第二种是网络拦截器,在拦截链倒数第二,即发起网络前。前者处于拦截链的前端,可以多次调用拦截链后续的拦截器,但拦截不到它们处理的内容。后者只有发起网络才会被拦截到,例如命中缓存的话,并不会调用到。因此在自定义拦截器时,要根据具体的需求实现不同拦截器。

责任链模式

OkHttp的拦截链使用了责任链设计模式,将一个请求交给多个对象处理,解耦了请求的发送者和接收者。将对象构成链,并将请求沿着该链传递,直到某个处理对象终止该请求。例如缓存拦截器命中缓存后终止将请求交付给下个一个处理对象。

每个处理对象根据自己的特性,即处理能力,当请求满足自身条件,则进行处理而不继续传递。

总结:

通过分析OkHttp的源码,发现大部分的HTTP知识点,也就是OkHttp自己实现了一套从低到上的网络请求框架。以前总以为OkHttp应该很复杂,毕竟那么流行。却没想到如此的基础和稳定,学习了,不要惧怕源码,勇敢过去学。


【相关连接】

HTTP与HTTPS

运输层TCP

OkHttp使用

欢迎点赞+关注+评论三连击

Github】【掘金】【博客