ConnectionPool
中对于 RealConnection
的清理在put()方法中触发,执行 cleanupRunnable
来完成清理动作:
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
cleanupRunnable
每执行一次清理动作,都会等待一段时间再次执行,而具体等待的时长由cleanup()
方法决定,直到cleanup()
方法返回-1退出。cleanup()
方法定义如下:
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
*
* <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
* -1 if no further cleanups are required.
*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on
* {@code connection}. Allocations are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
cleanup()
方法遍历connections
,并从中找到处于空闲状态时间最长的一个RealConnection
,然后根据查找结果的不同,分为以下几种情况处理:
RealConnection
,且该RealConnection
处于空闲状态的时间超出了设置的保活时间,或者当前ConnectionPool
中处于空闲状态的连接数超出了设置的最大空闲连接数,将该RealConnection
从connections
中移除,并关闭该RealConnection
关联的底层socket,然后返回0,以此请求cleanupRunnable
立即再次检查所有的连接。RealConnection
,但该RealConnection
处于空闲状态的时间尚未超出设置的保活时间,且当前ConnectionPool
中处于空闲状态的连接数尚未超出设置的最大空闲连接数,则返回保活时间与该RealConnection
处于空闲状态的时间之间的差值,请求cleanupRunnable
等待这么长一段时间之后再次检查所有的连接。cleanupRunnable
等待这么长一段时间之后再次检查所有的连接。cleanupRunning
置为false,并返回 -1,请求 cleanupRunnable
退出。cleanup()
通过 pruneAndGetAllocationCount()
检查正在使用一个特定连接的请求个数,并以此来判断一个连接是否处于空闲状态。后者通遍历 connection.allocations
并检查每个元素的StreamAllocation
的状态,若StreamAllocation
为空,则认为是发现了一个leak,它会更新连接的空闲时间为当前时间减去保活时间并返回0,以此请求 cleanup()
立即关闭、清理掉该 leak 的连接。
OkHttp的用户可以自己创建 ConnectionPool
对象,这个类也提供了一些用户接口以方便用户获取空闲状态的连接数、总的连接数,以及手动清除空闲状态的连接:
/** Returns the number of idle connections in the pool. */
public synchronized int idleConnectionCount() {
int total = 0;
for (RealConnection connection : connections) {
if (connection.allocations.isEmpty()) total++;
}
return total;
}
/**
* Returns total number of connections in the pool. Note that prior to OkHttp 2.7 this included
* only idle connections and HTTP/2 connections. Since OkHttp 2.7 this includes all connections,
* both active and inactive. Use {@link #idleConnectionCount()} to count connections not currently
* in use.
*/
public synchronized int connectionCount() {
return connections.size();
}
......
/** Close and remove all idle connections in the pool. */
public void evictAll() {
List<RealConnection> evictedConnections = new ArrayList<>();
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
if (connection.allocations.isEmpty()) {
connection.noNewStreams = true;
evictedConnections.add(connection);
i.remove();
}
}
}
for (RealConnection connection : evictedConnections) {
closeQuietly(connection.socket());
}
}
回到新建流的过程,连接建立的各种细节处理都在这里。 StreamAllocation.newStream()
完成新建流的动作:
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec;
if (resultConnection.http2Connection != null) {
resultCodec = new Http2Codec(client, this, resultConnection.http2Connection);
} else {
resultConnection.socket().setSoTimeout(readTimeout);
resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
resultCodec = new Http1Codec(
client, this, resultConnection.source, resultConnection.sink);
}
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
所谓的流,是封装了底层的IO,可以直接用来收发数据的组件,它会将请求的数据序列化之后发送到网络,并将接收的数据反序列化为应用程序方便操作的格式。在 OkHttp3 中,这样的组件被抽象为HttpCodec
。HttpCodec
的定义如下 (okhttp/okhttp/src/main/java/okhttp3/internal/http/HttpCodec.java):
/** Encodes HTTP requests and decodes HTTP responses. */
public interface HttpCodec {
/**
* The timeout to use while discarding a stream of input data. Since this is used for connection
* reuse, this timeout should be significantly less than the time it takes to establish a new
* connection.
*/
int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
/** Returns an output stream where the request body can be streamed. */
Sink createRequestBody(Request request, long contentLength);
/** This should update the HTTP engine's sentRequestMillis field. */
void writeRequestHeaders(Request request) throws IOException;
/** Flush the request to the underlying socket. */
void finishRequest() throws IOException;
/** Read and return response headers. */
Response.Builder readResponseHeaders() throws IOException;
/** Returns a stream that reads the response body. */
ResponseBody openResponseBody(Response response) throws IOException;
/**
* Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
* That may happen later by the connection pool thread.
*/
void cancel();
}
HttpCodec
提供了这样的一些操作:
StreamAllocation.newStream()
主要做的事情正是创建HttpCodec
。StreamAllocation.newStream()
根据 OkHttpClient
中的设置,连接超时、读超时、写超时及连接失败是否重试,调用 findHealthyConnection()
完成 连接,即RealConnection
的创建。然后根据HTTP协议的版本创建Http1Codec或Http2Codec。
findHealthyConnection()
根据目标服务器地址查找一个连接,如果它是可用的就直接返回,如果不可用则会重复查找直到找到一个可用的为止。在连接已被破坏而不可用时,还会释放连接:
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
连接是否可用的标准如下 (okhttp/okhttp/src/main/java/okhttp3/internal/connection/RealConnection.java):
/** Returns true if this connection is ready to host new streams. */
public boolean isHealthy(boolean doExtensiveChecks) {
if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
return false;
}
if (http2Connection != null) {
return true; // TODO: check framedConnection.shutdown.
}
if (doExtensiveChecks) {
try {
int readTimeout = socket.getSoTimeout();
try {
socket.setSoTimeout(1);
if (source.exhausted()) {
return false; // Stream is exhausted; socket is closed.
}
return true;
} finally {
socket.setSoTimeout(readTimeout);
}
} catch (SocketTimeoutException ignored) {
// Read timed out; socket is good.
} catch (IOException e) {
return false; // Couldn't read; socket is closed.
}
}
return true;
}
首先要可以进行IO,此外对于HTTP/2,只要http2Connection
存在即可。如我们前面在ConnectInterceptor
中看到的,如果HTTP请求的method不是 "GET" ,doExtensiveChecks
为true时,需要做额外的检查。
findHealthyConnection()
通过 findConnection()
查找一个连接:
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
if (pooledConnection != null) {
this.connection = pooledConnection;
return pooledConnection;
}
selectedRoute = route;
}
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
synchronized (connectionPool) {
route = selectedRoute;
refusedStreamCount = 0;
}
}
RealConnection newConnection = new RealConnection(selectedRoute);
synchronized (connectionPool) {
acquire(newConnection);
Internal.instance.put(connectionPool, newConnection);
this.connection = newConnection;
if (canceled) throw new IOException("Canceled");
}
newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
connectionRetryEnabled);
routeDatabase().connected(newConnection.route());
return newConnection;
}
findConnection()
返回一个用于流执行底层IO的连接。这个方法优先复用已经创建的连接;在没有可复用连接的情况下新建一个。
在同一次 newStream()
的执行过程中,有没有可能两次执行 findConnection()
,第一次connection
字段为空,第二次不为空?这个地方对connection
字段的检查,看起来有点多余。执行 findConnection()
时,connection
不为空的话,意味着 codec
不为空,而在方法的开始处已经有对codec
字段的状态做过检查。真的是这样的吗?
答案当然是否定的。同一次 newStream()
的执行过程中,没有可能两次执行findConnection()
,第一次connection
字段为空,第二次不为空,然而一个HTTP请求的执行过程,又不是一定只调用一次newStream()
。
newStream()
的直接调用者是ConnectInterceptor
,所有的Interceptor用RealInterceptorChain
链起来,在Interceptor链中,ConnectInterceptor
和RetryAndFollowUpInterceptor
隔着 CacheInterceptor
和 BridgeInterceptor
。然而newStream()
如果出错的话,则是会通过抛出Exception
返回到RetryAndFollowUpInterceptor
来处理错误的。
RetryAndFollowUpInterceptor
中会尝试基于相同的 StreamAllocation
对象来恢复对HTTP请求的处理。RetryAndFollowUpInterceptor
通过 hasMoreRoutes()
等方法,来检查StreamAllocation
对象的状态,通过 streamFailed(IOException e)
、release()
、streamFinished(boolean noNewStreams, HttpCodec codec)
等方法来reset StreamAllocation
对象的一些状态。
回到StreamAllocation
的 findConnection()
方法。没有连接存在,且连接池中也没有找到所需的连接时,它会新建一个连接。通过如下的步骤新建连接:
Route
。RealConnection
对象。public RealConnection(Route route) {
this.route = route;
}
StreamAllocation
对象的引用保存进RealConnection
的allocations。如我们前面在分析ConnectionPool时所见的那样,这主要是为了追踪RealConnection
。/**
* Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
* {@link #release} on the same connection.
*/
public void acquire(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
RealConnection
保存进连接池。RealConnection
的引用。Route
的状态。在OkHttp中,ConnectionSpec用于描述传输HTTP流量的socket连接的配置。对于https请求,这些配置主要包括协商安全连接时要使用的TLS版本号和密码套件,是否支持TLS扩展等;对于http请求则几乎不包含什么信息。
OkHttp有预定义几组ConnectionSpec (okhttp/okhttp/src/main/java/okhttp3/ConnectionSpec.java):
/** A modern TLS connection with extensions like SNI and ALPN available. */
public static final ConnectionSpec MODERN_TLS = new Builder(true)
.cipherSuites(APPROVED_CIPHER_SUITES)
.tlsVersions(TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
.supportsTlsExtensions(true)
.build();
/** A backwards-compatible fallback connection for interop with obsolete servers. */
public static final ConnectionSpec COMPATIBLE_TLS = new Builder(MODERN_TLS)
.tlsVersions(TlsVersion.TLS_1_0)
.supportsTlsExtensions(true)
.build();
/** Unencrypted, unauthenticated connections for {@code http:} URLs. */
public static final ConnectionSpec CLEARTEXT = new Builder(false).build();
预定义的这些ConnectionSpec
被组织为默认ConnectionSpec
集合 (okhttp/okhttp/src/main/java/okhttp3/OkHttpClient.java):
public class OkHttpClient implements Cloneable, Call.Factory {
private static final List<Protocol> DEFAULT_PROTOCOLS = Util.immutableList(
Protocol.HTTP_2, Protocol.HTTP_1_1);
private static final List<ConnectionSpec> DEFAULT_CONNECTION_SPECS = Util.immutableList(
ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS, ConnectionSpec.CLEARTEXT);
OkHttp中由OkHttpClient管理ConnectionSpec
集合 。OkHttp的用户可以在构造OkHttpClient
的过程中提供自己的ConnectionSpec
集合。默认情况下OkHttpClient
会使用前面看到的默认ConnectionSpec
集合。
在RetryAndFollowUpInterceptor
中创建Address
时,ConnectionSpec
集合被从OkHttpClient
获取,并由Address
引用。
OkHttp还提供了ConnectionSpecSelector
,用以从ConnectionSpec
集合中选择与SSLSocket匹配的ConnectionSpec
,并对SSLSocket做配置的操作。
在StreamAllocation
的findConnection()中,ConnectionSpec
集合被从Address
中取出来,以用于连接建立过程。
回到连接建立的过程。RealConnection.connect()
执行连接建立的过程(okhttp/okhttp/src/main/java/okhttp3/internal/connection/RealConnection.java):
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) {
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
while (protocol == null) {
try {
if (route.requiresTunnel()) {
buildTunneledConnection(connectTimeout, readTimeout, writeTimeout,
connectionSpecSelector);
} else {
buildConnection(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector);
}
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
}
这里的执行过程大体如下:
protocol
标识,它表示在整个连接建立,及可能的协议协商过程中选择的所要使用的协议。ConnectionSpec
集合connectionSpecs
构造ConnectionSpecSelector
。ConnectionSpec
集合中必须要包含ConnectionSpec.CLEARTEXT
。这也就是说,OkHttp的用户可以通过为OkHttpClient
设置不包含ConnectionSpec.CLEARTEXT
的ConnectionSpec
集合来禁用所有的明文请求。android.security.NetworkSecurityPolicy
执行 (okhttp/okhttp/src/main/java/okhttp3/internal/platform/AndroidPlatform.java):@Override public boolean isCleartextTrafficPermitted(String hostname) {
try {
Class<?> networkPolicyClass = Class.forName("android.security.NetworkSecurityPolicy");
Method getInstanceMethod = networkPolicyClass.getMethod("getInstance");
Object networkSecurityPolicy = getInstanceMethod.invoke(null);
Method isCleartextTrafficPermittedMethod = networkPolicyClass
.getMethod("isCleartextTrafficPermitted", String.class);
return (boolean) isCleartextTrafficPermittedMethod.invoke(networkSecurityPolicy, hostname);
} catch (ClassNotFoundException | NoSuchMethodException e) {
return super.isCleartextTrafficPermitted(hostname);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw new AssertionError();
}
}
平台的这种安全策略并不是每个Android版本都有的。Android 6.0之后存在这种控制。buildTunneledConnection()
和 buildConnection()
。是否需要建立隧道连接的依据为 (okhttp/okhttp/src/main/java/okhttp3/Route.java):/**
* Returns true if this route tunnels HTTPS through an HTTP proxy. See <a
* href="http://www.ietf.org/rfc/rfc2817.txt">RFC 2817, Section 5.2</a>.
*/
public boolean requiresTunnel() {
return address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP;
}
即对于设置了HTTP代理,且安全的连接 (SSL) 需要请求代理服务器建立一个到目标HTTP服务器的隧道连接,客户端与HTTP代理建立TCP连接,以此请求HTTP代理服务在客户端与HTTP服务器之间进行数据的盲转发。网易云新用户大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者韩鹏飞授权发布。