OkHttp3通过Interceptor链来执行HTTP请求,整体的执行过程大体如下:
这些Interceptor中每一个的职责,这里不再赘述。
在OkHttp3中,StreamAllocation
是用来建立执行HTTP请求所需网络设施的组件,如其名字所显示的那样,分配Stream。但它具体做的事情根据是否设置了代理,以及请求的类型,如HTTP、HTTPS或HTTP/2的不同而有所不同。代理相关的处理,包括TCP连接的建立,在 OkHttp3中的代理与路由 一文中有详细的说明。
在整个HTTP请求的执行过程中,StreamAllocation
对象分配的比较早,在RetryAndFollowUpInterceptor.intercept(Chain chain)中就完成了:
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
StreamAllocation
的对象构造过程没有什么特别的:
public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.routeSelector = new RouteSelector(address, routeDatabase());
this.callStackTrace = callStackTrace;
}
在OkHttp3中,okhttp3.internal.http.RealInterceptorChain
将Interceptor连接成执行链。RetryAndFollowUpInterceptor
借助于RealInterceptorChain
将创建的StreamAllocation
对象传递给后面执行的Interceptor。而在RealInterceptorChain
中,StreamAllocation
对象并没有被真正用到。紧跟在RetryAndFollowUpInterceptor
之后执行的 okhttp3.internal.http.BridgeInterceptor
和 okhttp3.internal.cache.CacheInterceptor
,它们的职责分别是补足用户创建的请求中缺少的必须的请求头和处理缓存,也没有真正用到StreamAllocation
对象。
在OkHttp3的HTTP请求执行过程中,okhttp3.internal.connection.ConnectInterceptor
和okhttp3.internal.http.CallServerInterceptor
是与网络交互的关键。
CallServerInterceptor
负责将HTTP请求写入网络IO流,并从网络IO流中读取服务器返回的数据。而ConnectInterceptor
则负责为CallServerInterceptor
建立可用的连接。此处 可用的 含义主要为,可以直接写入HTTP请求的数据:
后面我们更详细地来看一下这个过程。
ConnectInterceptor
的代码看上去比较简单:
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
ConnectInterceptor
从RealInterceptorChain
获取前面的Interceptor传过来的StreamAllocation
对象,执行 streamAllocation.newStream()
完成前述所有的连接建立工作,并将这个过程中创建的用于网络IO的RealConnection对象,以及对于与服务器交互最为关键的HttpCodec等对象传递给后面的Interceptor,也就是CallServerInterceptor
。
在具体地分析 streamAllocation.newStream()
的执行过程之前,我们先来看一下OkHttp3的连接池的设计实现。
OkHttp3将客户端与服务器之间的连接抽象为Connection/RealConnection,为了管理这些连接的复用而设计了ConnectionPool。共享相同Address
的请求可以复用连接,ConnectionPool实现了哪些连接保持打开状态以备后用的策略。
借助于ConnectionPool的成员变量声明来一窥ConnectionPool究竟是什么:
/**
* Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
* share the same {@link Address} may share a {@link Connection}. This class implements the policy
* of which connections to keep open for future use.
*/
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;
ConnectionPool
的核心是RealConnection
的容器,且是顺序容器,而不是关联容器。ConnectionPool
用双端队列Deque<RealConnection>
来保存它所管理的所有RealConnection
。
ConnectionPool
还会对连接池中最大的空闲连接数及连接的保活时间进行控制,maxIdleConnections
和keepAliveDurationNs
成员分别体现对最大空闲连接数及连接保活时间的控制。这种控制通过匿名的Runnable cleanupRunnable
在线程池executor
中执行,并在向连接池中添加新的RealConnection
触发。
OkHttp3的用户可以自行创建ConnectionPool,对最大空闲连接数及连接的保活时间进行配置,并在OkHttpClient创建期间,将其传给OkHttpClient.Builder,在OkHttpClient中启用它。没有定制连接池的情况下,则在OkHttpClient.Builder构造过程中以默认参数创建:
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
ConnectionPool的默认构造过程如下:
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
在默认情况下,ConnectionPool
最多保存 5个 处于空闲状态的连接,且连接的默认保活时间为 5分钟。
OkHttp内部的组件可以通过put()方法向ConnectionPool
中添加RealConnection
:
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
在向ConnectionPool
中添加RealConnection
时,若发现cleanupRunnable还没有运行会触发它的运行。
cleanupRunnable的职责本就是清理无效的RealConnection
,只要ConnectionPool
中存在RealConnection
,则这种清理的需求总是存在的,因而这里会去启动cleanupRunnable。
根据需要启动了cleanupRunnable之后,将RealConnection
添加进双端队列connections。
这里先启动 cleanupRunnable
,后向 connections
中添加RealConnection
。有没有可能发生:
启动cleanupRunnable
之后,向connections
中添加RealConnection
之前,执行 put() 的线程被抢占,cleanupRunnable
的线程被执行,它发现connections
中没有任何RealConnection
,于是从容地退出而导致后面添加的RealConnection
永远不会得得清理。
这样的情况呢?答案是 不会。为什么呢?put()
执行之前总是会用ConnectionPool
对象锁来保护,而在ConnectionPool.cleanup()
中,遍历connections
也总是会先对ConnectionPool
对象加锁保护的。即使执行 put() 的线程被抢占,cleanupRunnable
的线程也会由于拿不到ConnectionPool
对象锁而等待 put() 执行结束。
OkHttp内部的组件可以通过 get()
方法从ConnectionPool
中获取RealConnection
:
/** Returns a recycled connection to {@code address}, or null if no such connection exists. */
RealConnection get(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.allocations.size() < connection.allocationLimit
&& address.equals(connection.route().address)
&& !connection.noNewStreams) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
get()
方法遍历 connections
中的所有 RealConnection
寻找同时满足如下三个条件的RealConnection
:
RealConnection
的allocations的数量小于allocationLimit。每个allocation代表在该RealConnection
上正在执行的一个请求。这个条件用于控制相同连接上,同一时间执行的并发请求的个数。对于HTTP/2连接而言,allocationLimit限制是在连接建立阶段由双方协商的。对于HTTP或HTTPS连接而言,这个值则总是1。从RealConnection.establishProtocol()
可以清晰地看到这一点:
if (protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // Framed connection timeouts are set per-stream.
Http2Connection http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.build();
http2Connection.start();
// Only assign the framed connection once the preface has been sent successfully.
this.allocationLimit = http2Connection.maxConcurrentStreams();
this.http2Connection = http2Connection;
} else {
this.allocationLimit = 1;
}
RealConnection
的 address
与传入的 Address
参数相等。RealConnection
的 address
描述建立连接所需的配置信息,包括对端的信息等,不难理解只有所有相关配置相等时 RealConnection
才是真正能复用的。具体看一下Address
相等性比较的依据:@Override public boolean equals(Object other) {
if (other instanceof Address) {
Address that = (Address) other;
return this.url.equals(that.url)
&& this.dns.equals(that.dns)
&& this.proxyAuthenticator.equals(that.proxyAuthenticator)
&& this.protocols.equals(that.protocols)
&& this.connectionSpecs.equals(that.connectionSpecs)
&& this.proxySelector.equals(that.proxySelector)
&& equal(this.proxy, that.proxy)
&& equal(this.sslSocketFactory, that.sslSocketFactory)
&& equal(this.hostnameVerifier, that.hostnameVerifier)
&& equal(this.certificatePinner, that.certificatePinner);
}
return false;
}
这种相等性的条件给人感觉还是蛮苛刻的,特别是对url的对比。 这难免会让我们有些担心,对 Address
如此苛刻的相等性比较,又有多大的机会能复用连接呢? 我们的担心其实是多余的。只有在 StreamAllocation.findConnection()
中,会通过Internal.instance
调用 ConnectionPool.get()
来获取 RealConnection
: private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
if (pooledConnection != null) {
this.connection = pooledConnection;
return pooledConnection;
}
selectedRoute = route;
}
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
synchronized (connectionPool) {
route = selectedRoute;
refusedStreamCount = 0;
}
}
RealConnection newConnection = new RealConnection(selectedRoute);
synchronized (connectionPool) {
acquire(newConnection);
Internal.instance.put(connectionPool, newConnection);
this.connection = newConnection;
if (canceled) throw new IOException("Canceled");
}
Internal.instance的实现在OkHttpClient 中:
static {
Internal.instance = new Internal() {
@Override public void addLenient(Headers.Builder builder, String line) {
builder.addLenient(line);
}
@Override public void addLenient(Headers.Builder builder, String name, String value) {
builder.addLenient(name, value);
}
@Override public void setCache(OkHttpClient.Builder builder, InternalCache internalCache) {
builder.setInternalCache(internalCache);
}
@Override public boolean connectionBecameIdle(
ConnectionPool pool, RealConnection connection) {
return pool.connectionBecameIdle(connection);
}
@Override public RealConnection get(
ConnectionPool pool, Address address, StreamAllocation streamAllocation) {
return pool.get(address, streamAllocation);
}
@Override public void put(ConnectionPool pool, RealConnection connection) {
pool.put(connection);
}
@Override public RouteDatabase routeDatabase(ConnectionPool connectionPool) {
return connectionPool.routeDatabase;
}
@Override public StreamAllocation callEngineGetStreamAllocation(Call call) {
return ((RealCall) call).streamAllocation();
}
可见 ConnectionPool.get()
的 Address
参数来自于StreamAllocation
。StreamAllocation
的Address
在构造时由外部传入。构造了StreamAllocation
对象的RetryAndFollowUpInterceptor
,其构造Address
的过程是这样的:
private Address createAddress(HttpUrl url) {
SSLSocketFactory sslSocketFactory = null;
HostnameVerifier hostnameVerifier = null;
CertificatePinner certificatePinner = null;
if (url.isHttps()) {
sslSocketFactory = client.sslSocketFactory();
hostnameVerifier = client.hostnameVerifier();
certificatePinner = client.certificatePinner();
}
return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
}
Address
除了 uriHost
和 uriPort
外的所有构造参数均来自于OkHttpClient,而Address
的url
字段正是根据这两个参数构造的:
public Address(String uriHost, int uriPort, Dns dns, SocketFactory socketFactory,
SSLSocketFactory sslSocketFactory, HostnameVerifier hostnameVerifier,
CertificatePinner certificatePinner, Authenticator proxyAuthenticator, Proxy proxy,
List<Protocol> protocols, List<ConnectionSpec> connectionSpecs, ProxySelector proxySelector) {
this.url = new HttpUrl.Builder()
.scheme(sslSocketFactory != null ? "https" : "http")
.host(uriHost)
.port(uriPort)
.build();
可见 Address
的 url
字段仅包含HTTP请求url的 schema + host + port 这三部分的信息,而不包含 path 和 query 等信息。ConnectionPool
主要是根据服务器的地址来决定复用的。
RealConnection
还有可分配的Stream。对于HTTP或HTTPS而言,不能同时在相同的连接上执行多个请求。即使对于HTTP/2而言,StreamID的空间也是有限的,同一个连接上的StreamID总有分配完的时候,而在StreamID被分配完了之后,该连接就不能再被使用了。OkHttp内部对ConnectionPool
的访问总是通过Internal.instance来进行。整个OkHttp中也只有StreamAllocation
存取了 ConnectionPool
,也就是我们前面列出的StreamAllocation.findConnection()
方法,相关的组件之间的关系大体如下图:
相关阅读:
网易云新用户大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者韩鹏飞授权发布。