@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool
/** * Returns an immutable list of interceptors that observe the full span of each call: from before * the connection is established (if any) until after the response source is selected (either the * origin server, cache, or both). */ @get:JvmName("interceptors") val interceptors: List<Interceptor> = builder.interceptors.toImmutableList()
/** * Returns an immutable list of interceptors that observe a single network request and response. * These interceptors must call [Interceptor.Chain.proceed] exactly once: it is an error for * a network interceptor to short-circuit or repeat a network request. */ @get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> = builder.networkInterceptors.toImmutableList()
@get:JvmName("eventListenerFactory") val eventListenerFactory: EventListener.Factory = builder.eventListenerFactory
@get:JvmName("retryOnConnectionFailure") val retryOnConnectionFailure: Boolean = builder.retryOnConnectionFailure
@get:JvmName("authenticator") val authenticator: Authenticator = builder.authenticator
@get:JvmName("followRedirects") val followRedirects: Boolean = builder.followRedirects
@get:JvmName("followSslRedirects") val followSslRedirects: Boolean = builder.followSslRedirects
@get:JvmName("cookieJar") val cookieJar: CookieJar = builder.cookieJar
@get:JvmName("cache") val cache: Cache? = builder.cache
@get:JvmName("dns") val dns: Dns = builder.dns
@get:JvmName("proxy") val proxy: Proxy? = builder.proxy
@get:JvmName("proxySelector") val proxySelector: ProxySelector = when { // Defer calls to ProxySelector.getDefault() because it can throw a SecurityException. builder.proxy != null -> NullProxySelector else -> builder.proxySelector ?: ProxySelector.getDefault() ?: NullProxySelector }
@get:JvmName("proxyAuthenticator") val proxyAuthenticator: Authenticator = builder.proxyAuthenticator
@get:JvmName("socketFactory") val socketFactory: SocketFactory = builder.socketFactory
@get:JvmName("x509TrustManager") val x509TrustManager: X509TrustManager?
@get:JvmName("connectionSpecs") val connectionSpecs: List<ConnectionSpec> = builder.connectionSpecs
@get:JvmName("protocols") val protocols: List<Protocol> = builder.protocols
@get:JvmName("hostnameVerifier") val hostnameVerifier: HostnameVerifier = builder.hostnameVerifier
@get:JvmName("certificatePinner") val certificatePinner: CertificatePinner
@get:JvmName("certificateChainCleaner") val certificateChainCleaner: CertificateChainCleaner?
/** * Default call timeout (in milliseconds). By default there is no timeout for complete calls, but * there is for the connect, write, and read actions within a call. */ @get:JvmName("callTimeoutMillis") val callTimeoutMillis: Int = builder.callTimeout
/** Default connect timeout (in milliseconds). The default is 10 seconds. */ @get:JvmName("connectTimeoutMillis") val connectTimeoutMillis: Int = builder.connectTimeout
/** Default read timeout (in milliseconds). The default is 10 seconds. */ @get:JvmName("readTimeoutMillis") val readTimeoutMillis: Int = builder.readTimeout
/** Default write timeout (in milliseconds). The default is 10 seconds. */ @get:JvmName("writeTimeoutMillis") val writeTimeoutMillis: Int = builder.writeTimeout
/** Web socket and HTTP/2 ping interval (in milliseconds). By default pings are not sent. */ @get:JvmName("pingIntervalMillis") val pingIntervalMillis: Int = builder.pingInterval
/** * Minimum outbound web socket message size (in bytes) that will be compressed. * The default is 1024 bytes. */ @get:JvmName("minWebSocketMessageToCompress") val minWebSocketMessageToCompress: Long = builder.minWebSocketMessageToCompress
val routeDatabase: RouteDatabase = builder.routeDatabase ?: RouteDatabase()
publicenum Type { /** * Represents a direct connection, or the absence of a proxy. */ DIRECT, /** * Represents proxy for high level protocols such as HTTP or FTP. */ HTTP, /** * Represents a SOCKS (V4 or V5) proxy. */ SOCKS };
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } promoteAndExecute() }
val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break// Max capacity. if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue// Host max capacity.
@Throws(IOException::class) overridefunintercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain var request = chain.request val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response var closeActiveExchange = true try { if (call.isCanceled()) { throw IOException("Canceled") }
try { response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } newExchangeFinder = false continue } catch (e: IOException) { // An attempt to communicate with a server failed. The request may have been sent. if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue }
// Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() }
val exchange = call.interceptorScopedExchange val followUp = followUpRequest(response, exchange)
if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response }
val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response }
response.body?.closeQuietly()
if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") }
val next = copy(index = index + 1, request = request) val interceptor = interceptors[index] @Suppress("USELESS_ELVIS") val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")
synchronized(this) { check(!responseBodyOpen) { "cannot make a new request because the previous response is still open: " + "please call response.close()" } check(!requestBodyOpen) }
@Throws(IOException::class) overridefunintercept(chain: Interceptor.Chain): Response { val userRequest = chain.request() val requestBuilder = userRequest.newBuilder()
val body = userRequest.body if (body != null) { val contentType = body.contentType() if (contentType != null) { requestBuilder.header("Content-Type", contentType.toString()) }
val contentLength = body.contentLength() if (contentLength != -1L) { requestBuilder.header("Content-Length", contentLength.toString()) requestBuilder.removeHeader("Transfer-Encoding") } else { requestBuilder.header("Transfer-Encoding", "chunked") requestBuilder.removeHeader("Content-Length") } }
if (userRequest.header("Host") == null) { requestBuilder.header("Host", userRequest.url.toHostHeader()) }
if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive") }
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing // the transfer stream. var transparentGzip = false if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true requestBuilder.header("Accept-Encoding", "gzip") }
val cookies = cookieJar.loadForRequest(userRequest.url) if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) }
if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", userAgent) }
val networkResponse = chain.proceed(requestBuilder.build())
overridefunintercept(chain: Interceptor.Chain): Response { val call = chain.call() val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() val networkRequest = strategy.networkRequest val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy) val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
if (cacheCandidate != null && cacheResponse == null) { // The cache candidate wasn't applicable. Close it. cacheCandidate.body?.closeQuietly() }
// If we're forbidden from using the network and the cache is insufficient, fail. if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build().also { listener.satisfactionFailure(call, it) } }
// If we don't need the network, we're done. if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } }
var networkResponse: Response? = null try { networkResponse = chain.proceed(networkRequest) } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } }
// If we have a cache response too, then we're doing a conditional get. if (cacheResponse != null) { if (networkResponse?.code == HTTP_NOT_MODIFIED) { val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache!!.trackConditionalCacheHit() cache.update(cacheResponse, response) return response.also { listener.cacheHit(call, it) } } else { cacheResponse.body?.closeQuietly() } }
val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build()
if (cache != null) { if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // Offer this request to the cache. val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { // This will log a conditional cache miss only. listener.cacheMiss(call) } } }
if (HttpMethod.invalidatesCache(networkRequest.method)) { try { cache.remove(networkRequest) } catch (_: IOException) { // The cache cannot be written. } } }
// Confirm that the connection is good. if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate }
// If it isn't, take it out of the pool. candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes // would happen if we made a new connection and it immediately is detected as unhealthy. if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true if (routesSelectionLeft) continue
// Attempt to reuse the connection from the call. val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } }
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here // because we already acquired it. if (call.connection != null) { check(toClose == null) return callConnection }
// The call's connection was released. toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) }
// We need a new connection. Give it fresh stats. refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0
// Attempt to get a connection from the pool. if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
// Nothing in the pool. Figure out what route we'll try next. val routes: List<Route>? val route: Route if (nextRouteToTry != null) { // Use a route from a preceding coalesced connection. routes = null route = nextRouteToTry!! nextRouteToTry = null } elseif (routeSelection != null && routeSelection!!.hasNext()) { // Use a route from an existing route selection. routes = null route = routeSelection!!.next() } else { // Compute a new route selection. This is a blocking operation! var localRouteSelector = routeSelector if (localRouteSelector == null) { localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) this.routeSelector = localRouteSelector } val localRouteSelection = localRouteSelector.next() routeSelection = localRouteSelection routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. We have a better chance of matching thanks to connection coalescing. if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
route = localRouteSelection.next() }
// Connect. Tell the call about the connecting call so async cancels work. val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3 // different lookups in the connection pool! if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { val result = call.connection!! nextRouteToTry = route newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result }
// If this connection is not accepting new exchanges, we're done. if (calls.size >= allocationLimit || noNewExchanges) returnfalse
// If the non-host fields of the address don't overlap, we're done. if (!this.route.address.equalsNonHost(address)) returnfalse
// If the host exactly matches, we're done: this connection can carry the address. if (address.url.host == this.route().address.url.host) { returntrue// This connection is a perfect match. }
// At this point we don't have a hostname match. But we still be able to carry the request if // our connection coalescing requirements are met. See also: // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 1. This connection must be HTTP/2. if (http2Connection == null) returnfalse
// 2. The routes must share an IP address. if (routes == null || !routeMatchesAny(routes)) returnfalse
// 3. This connection's server certificate's must cover the new host. if (address.hostnameVerifier !== OkHostnameVerifier) returnfalse if (!supportsUrl(address.url)) returnfalse
// 4. Certificate pinning must match the host. try { address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates) } catch (_: SSLPeerUnverifiedException) { returnfalse }
returntrue// The caller's address can be carried by this connection. }
这个函数并不复杂,我们一点点看:
这个函数接受两个参数 address 和 routes
address 有两个最关键的成员变量 uriHost 和 uriPort
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
classAddress( uriHost: String, uriPort: Int,
@get:JvmName("dns") val dns: Dns, @get:JvmName("socketFactory") val socketFactory: SocketFactory, @get:JvmName("sslSocketFactory") val sslSocketFactory: SSLSocketFactory?, @get:JvmName("hostnameVerifier") val hostnameVerifier: HostnameVerifier?, @get:JvmName("certificatePinner") val certificatePinner: CertificatePinner?, @get:JvmName("proxyAuthenticator") val proxyAuthenticator: Authenticator, @get:JvmName("proxy") val proxy: Proxy?,
protocols: List<Protocol>, connectionSpecs: List<ConnectionSpec>, @get:JvmName("proxySelector") val proxySelector: ProxySelector )
classRoute( @get:JvmName("address") val address: Address, /** * Returns the [Proxy] of this route. * * **Warning:** This may disagree with [Address.proxy] when it is null. When * the address's proxy is null, the proxy selector is used. */ @get:JvmName("proxy") val proxy: Proxy, @get:JvmName("socketAddress") val socketAddress: InetSocketAddress )
var routeException: RouteException? = null val connectionSpecs = route.address.connectionSpecs val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
if (route.address.sslSocketFactory == null) { if (ConnectionSpec.CLEARTEXT !in connectionSpecs) { throw RouteException(UnknownServiceException( "CLEARTEXT communication not enabled for client")) } val host = route.address.url.host if (!Platform.get().isCleartextTrafficPermitted(host)) { throw RouteException(UnknownServiceException( "CLEARTEXT communication to $host not permitted by network security policy")) } } else { if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) { throw RouteException(UnknownServiceException( "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS")) } }
while (true) { try { if (route.requiresTunnel()) { connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener) if (rawSocket == null) { // We were unable to connect the tunnel but properly closed down our resources. break } } else { connectSocket(connectTimeout, readTimeout, call, eventListener) } establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener) eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol) break } catch (e: IOException) { socket?.closeQuietly() rawSocket?.closeQuietly() socket = null rawSocket = null source = null sink = null handshake = null protocol = null http2Connection = null allocationLimit = 1
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0 // More details: // https://github.com/square/okhttp/issues/3245 // https://android-review.googlesource.com/#/c/271775/ try { source = rawSocket.source().buffer() sink = rawSocket.sink().buffer() } catch (npe: NullPointerException) { if (npe.message == NPE_THROW_WITH_NULL) { throw IOException(npe) } } }
@Throws(IOException::class) overridefunintercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body val sentRequestMillis = System.currentTimeMillis()
var invokeStartEvent = true var responseBuilder: Response.Builder? = null var sendRequestException: IOException? = null try { exchange.writeRequestHeaders(request)
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { exchange.flushRequest() responseBuilder = exchange.readResponseHeaders(expectContinue = true) exchange.responseHeadersStart() invokeStartEvent = false } if (responseBuilder == null) { if (requestBody.isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() requestBody.writeTo(bufferedRequestBody) } else { // Write the request body if the "Expect: 100-continue" expectation was met. val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } } else { exchange.noRequestBody() if (!exchange.connection.isMultiplexed) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection() } } } else { exchange.noRequestBody() }
if (requestBody == null || !requestBody.isDuplex()) { exchange.finishRequest() } } catch (e: IOException) { if (e is ConnectionShutdownException) { throw e // No request was sent so there's no response to read. } if (!exchange.hasFailure) { throw e // Don't attempt to read the response; we failed to send the request. } sendRequestException = e }
try { if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! if (invokeStartEvent) { exchange.responseHeadersStart() invokeStartEvent = false } } var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code