OkHttp是一个一个处理网络请求的开源项目,是安卓端最火热的轻量级框架,由移动支付Square公司贡献
Call
接口,准备处理的一个请求,可以被取消,它代表了一个请求/响应,不能被处理两次
public interface Call {
// 返回原始请求
Request request();
// 立刻调用请求,然后阻塞直到收到响应或错误
Response execute() throws IOException;
// 调度让请求在未来被调用
void enqueue(Callback responseCallback);
// 取消请求
void cancel();
boolean isExecuted();
boolean isCanceled();
// 产生Call的工厂
interface Factory {
Call newCall(Request request);
}
}
OkHttpClient
Call
的工厂,Call
能够被用来发送请求和接收响应
创建一个OkHttpClient
实例并被所有的HTTP
调用重用可以获得最好的表现,因为每个client
都持有自己的连接池和线程池,重用连接和线程可以减少时延和节省内存,为每一个请求创建client
浪费空闲池资源
public final OkHttpClient client = new OkHttpClient();
构造默认的OkHttpCilent
public final OkHttpClient client = new OkHttpClient.Builder().addInterceptor(new HttpLoggingInterceptor()).cache(new Cache(cacheDir, cacheSize)).build();
用Builder()
自定义
OkHttpClient eagerClient = client.newBuilder().readTimeout(500, TimeUnit.MILLISECONDS).build();
用newBuilder()
自定义,但是重用原来的OkHttpClient
public class OkHttpClient implements Cloneable, Call.Factory {
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
}
实现了Call.Factory
接口,调用newCall()
可以产生一个Call
,实际返回的是实现类RealCall
RealCall
protected RealCall(OkHttpClient client, Request originalRequest) {
}
构造函数接收两个参数OkHttpClient
和Request
Request request() {
return originalRequest;
}
所以request()
返回的就是这个原始请求
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain(false);
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
同步请求,一个请求只能执行一次,否则会抛异常,通过Dispatcher
来分发请求,执行请求后,通过getResponseWithInterceptorChain()
获取结果
private Response getResponseWithInterceptorChain() throws IOException {
// 除了构造client时指定的拦截器外 还添加了几个内部的
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!retryAndFollowUpInterceptor.isForWebSocket()) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(retryAndFollowUpInterceptor.isForWebSocket()));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
这个函数以及Chain
的实现版本之间应该有过重构
异步请求
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}
将一个AsyncCall
放进队列,里面重写了execute()
,在getResponseWithInterceptorChain()
获得响应后调用回调
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
Dispatcher
分配器
// 准备好的异步调用,将要被执行
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// 正在执行中的异步调用
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// 正在执行中的同步调用
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
用这三个队列管理调用,可以指定最大请求数
Interceptor
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
interface Chain {
Request request();
Response proceed(Request request) throws IOException;
Connection connection();
}
}
拦截器,观察、修改请求和响应,典型的用来添加删除修改头部信息
RealInterceptorChain
RealInterceptorChain
是Chain
的实现类
public Response proceed(Request request, StreamAllocation streamAllocation, HttpStream httpStream, Connection connection) throws IOException {
......
// 调用拦截链中的下一个拦截器
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpStream, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
......
return response;
}
最重要的proceed()
方法的实现,OkHttpClient
里有一个拦截器列表,会先用列表里这个拦截器进行拦截,传给它一个index+1
的Chain
,否则调用,之前getResponse()
做HTTP
请求getResponse()
里和HttpEngine
的功能逻辑拆分到了新的内部拦截器中
RetryAndFollowUpInterceptor
尝试恢复错误和之后的重定向BridgeInterceptor
首先从用户请求构造网络请求,然后处理网络调用,最后从网络响应构造用户响应CacheInterceptor
处理缓存ConnectInterceptor
打开服务器连接CallServerInterceptor
链中的最终的拦截器,它向服务器发送请求Chain
携带了请求的信息,还提供了获取响应的逻辑
拦截器是在构造的时候的自己添加上去的,一般是这样
OkHttpClient client = new OkHttpClient().newBuilder().addInterceptor(new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
// 处理request
Response response = chain.proceed(request);
// 处理response
return response;
}
}).build();
通过关键方法Chain.proceed()
获得Response
,再看看上面的代码,发现在获得响应之前会依次执行拦截器
RetryAndFollowUpInterceptor
public Response intercept(Chain chain) throws IOException {
// 在一个循环里处理首个请求和后续请求
while (true) {
......
// 交由下一个拦截器处理
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
......
// 查看后续请求
Request followUp = followUpRequest(response);
......
}
}
followUpRequest()
用来指出为了回应响应应该发什么请求,可能是添加认证头、重定向或者处理客户端请求超时
BridgeInterceptor
public Response intercept(Chain chain) throws IOException {
// 构造网络请求头
// 添加Content-Type/Content-Length/Host/Connection/Accept-Encoding等字段
// 添加User-Agent字段
// 添加Cookie字段
......
Response networkResponse = chain.proceed(requestBuilder.build());
// 获得网络响应 处理后返回
// 处理Cookiew
// Gzip解压
......
}
CacheInterceptor
public Response intercept(Chain chain) throws IOException {
// 从缓存中获取请求,处理异常状况
......
networkResponse = chain.proceed(networkRequest);
// 获得网络响应
// 判断是使用网络响应还是缓存响应
// 可能要更新缓存
......
}
ConnectInterceptor
public Response intercept(Chain chain) throws IOException {
StreamAllocation streamAllocation = realChain.streamAllocation();
......
RealConnection connection = streamAllocation.connection();
......
return realChain.proceed(request, streamAllocation, httpStream, connection);
}
调用了StreamAllocation.newStream()
:
首先通过findHealthyConnection()
查找健康的连接,然后获得HttpStream
findConnection()
查找连接,首先看是否有存在的连接,然后查找连接池,都没有的话就创建一个并加入连接池,然后connect()
连接的调用路径:RealConnection.connect()
->Platform.connectSocket()
—>Socket.connect()
Platform
显然是与底层具体实现有关,在构造的时候会通过findPlatform()
决定底层实现
CallServerInterceptor
public Response intercept(Chain chain) throws IOException {
// 写入请求头
httpStream.writeRequestHeaders(request);
// 写入请求体
......
// 读取响应
Response response = httpStream.readResponseHeaders().request(request).handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build();
// 读取响应体并返回
......
}
StreamAllocation
这个类用来协调三个实体的关系:
- 连接(
Connections
):到远程服务器的物理连接,可能建立起来很慢所以有必要能够在连接中取消 - 流(
Streams
):在连接上的逻辑HTTP
请求响应对,每个连接有它自己的分配限制,定义了同时能承载多少个流,HTTP/1.X
可以承载一个流,SPDY
和HTTP/2
可以承载多个 - 调用(
Calls
):一个流的逻辑序列,典型的是它的初始请求和后续请求,我们倾向于保持所有一个单独调用的流在一个连接上,为了更好的表现
newStream()
里面做了判断,可以返回Http1xStream
或Http2xStream
,它们都实现了HttpStream
接口
public interface HttpStream {
Sink createRequestBody(Request request, long contentLength) throws IOException;
void writeRequestHeaders(Request request) throws IOException;
void writeRequestBody(RetryableSink requestBody) throws IOException;
void finishRequest() throws IOException;
Response.Builder readResponseHeaders() throws IOException;
ResponseBody openResponseBody(Response response) throws IOException;
void setHttpEngine(HttpEngine httpEngine);
void cancel();
}
参考文章:HTTP/1和HTTP/2
总结
OkHttpClient
通过newCall()
产生Call
同步调用execute()
(异步调用就是个线程池),会依次调用拦截链上的拦截器,其中包括一些内置的拦截器:
- 自定义拦截器
- 重试与后续请求
- 构造请求和响应
- 缓存
- 连接:这里调用了
Socket
的连接,并且获得了请求和响应的缓冲区 - 请求:通过
HttpStream
写入请求读取响应
之前版本的实现
之前的版本在处理完拦截器之后,会调用getResponse()
方法,实际上包括了新版本的一部分拦截器功能
Response getResponse(Request request, boolean forWebSocket) throws IOException {
// 获得请求体,然后获得一些元数据构造请求头
RequestBody body = request.body();
if (body != null) {
Request.Builder requestBuilder = request.newBuilder();
// 获得Content-Type字段
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
// 获得Content-Length字段或者Transfer-Encoding字段
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
request = requestBuilder.build();
}
// 构建HTTP引擎,重试和重定向需要重新构造
engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);
int followUpCount = 0;
while (true) {
// 检测取消标志
if (canceled) {
engine.releaseStreamAllocation();
throw new IOException("Canceled");
}
boolean releaseConnection = true;
try {
engine.sendRequest();
engine.readResponse();
releaseConnection = false;
} catch (RequestException e) {
throw e.getCause();
} catch (RouteException e) {
// 试图恢复路由错误
HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
throw e.getLastConnectException();
} catch (IOException e) {
// 视图恢复IO错误
HttpEngine retryEngine = engine.recover(e, null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
throw e;
} finally {
// 清理工作
if (releaseConnection) {
StreamAllocation streamAllocation = engine.close();
streamAllocation.release();
}
}
Response response = engine.getResponse();
Request followUp = engine.followUpRequest();
if (followUp == null) {
if (!forWebSocket) {
engine.releaseStreamAllocation();
}
return response;
}
StreamAllocation streamAllocation = engine.close();
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (!engine.sameConnection(followUp.url())) {
streamAllocation.release();
streamAllocation = null;
}
request = followUp;
engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null, response);
}
}
其中Content-Length
和Transfer-Encoding
参见HTTP 协议中的 Transfer-Encoding
实际的工作通过HttpEngine
完成,现在完全由拦截器完成
HttpEngine
HttpEngine
处理一个单独的请求响应对,按照下面的生命周期:
创建用sendRequest()
发送请求消息,一旦发送了请求再修改请求头就会出错,如果有请求体的话可以写入用readResponse()
读取响应消息,之后响应头和响应体可以被读取,所有的响应都有一个响应提输入流,尽管在一些实例中是空的
sendRequest()
方法
public void sendRequest() throws RequestException, RouteException, IOException {
if (cacheStrategy != null) {
return;
}
if (httpStream != null) {
throw new IllegalStateException();
}
Request request = networkRequest(userRequest);
// 缓存相关处理
InternalCache responseCache = Internal.instance.internalCache(client);
Response cacheCandidate = responseCache != null ? responseCache.get(request) : null;
long now = System.currentTimeMillis();
cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
networkRequest = cacheStrategy.networkRequest;
cacheResponse = cacheStrategy.cacheResponse;
if (responseCache != null) {
responseCache.trackResponse(cacheStrategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body());
}
if (networkRequest == null && cacheResponse == null) {
userResponse = new Response.Builder().request(userRequest).priorResponse(stripBody(priorResponse)).protocol(Protocol.HTTP_1_1).code(504).message("Unsatisfiable Request (only-if-cached)").body(EMPTY_BODY).build();
return;
}
if (networkRequest == null) {
userResponse = cacheResponse.newBuilder().request(userRequest).priorResponse(stripBody(priorResponse)).cacheResponse(stripBody(cacheResponse)).build();
userResponse = unzip(userResponse);
return;
}
boolean success = false;
try {
httpStream = connect();
httpStream.setHttpEngine(this);
long contentLength = OkHeaders.contentLength(request);
if (bufferRequestBody) {
// 请求体必须在传输之前缓冲
if (contentLength > Integer.MAX_VALUE) {
throw new IllegalStateException("Use setFixedLengthStreamingMode() or " + "setChunkedStreamingMode() for requests larger than 2 GiB.");
}
if (contentLength != -1) {
// 已知请求体的长度
httpStream.writeRequestHeaders(networkRequest);
requestBodyOut = new RetryableSink((int) contentLength);
} else {
// 缓存未知长度的请求体,直到整个请求体都准备好不会写入请求头
requestBodyOut = new RetryableSink();
}
} else {
httpStream.writeRequestHeaders(networkRequest);
requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
}
success = true;
} finally {
if (!success && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
}
sendRequest()
中调用networkRequest()
构造请求
如果request
中不存在这些字段Host
/Connection
/Accept-Encoding
/User-Agent
则添加默认值
另外会添加Cookie
字段如果对应url的
cookie`不为空的话
通过connect()
发起连接
private HttpStream connect() throws RouteException, RequestException, IOException {
boolean doExtensiveHealthChecks = !networkRequest.method().equals("GET");
return streamAllocation.newStream(client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis(), client.retryOnConnectionFailure(), doExtensiveHealthChecks);
}
调用了StreamAllocation.newStream()
,首先通过findHealthyConnection()
查找健康的连接,然后获得HttpStream
findConnection()
查找连接,首先看是否有存在的连接,然后查找连接池,都没有的话就创建一个并加入连接池
创建连接会生成一个RealConnection
的实例,它实现Connection
接口
连接的调用路径:RealConnection.connect()
->Platform.connectSocket()
—>Socket.connect()
Platform
显然是与底层具体实现有关,在构造的时候会通过findPlatform()
决定底层实现
readResponse()
方法
public void readResponse() throws IOException {
// 响应已经准备好了
if (userResponse != null) {
return;
}
if (networkRequest == null && cacheResponse == null) {
throw new IllegalStateException("call sendRequest() first!");
}
// 没有响应
if (networkRequest == null) {
return;
}
Response networkResponse;
if (forWebSocket) {
httpStream.writeRequestHeaders(networkRequest);
networkResponse = readNetworkResponse();
} else if (!callerWritesRequestBody) {
networkResponse = new NetworkInterceptorChain(0, networkRequest).proceed(networkRequest);
} else {
// 向Buffer中写入请求体
if (bufferedRequestBody != null && bufferedRequestBody.buffer().size() > 0) {
bufferedRequestBody.emit();
}
// 向Socket写入请求头 如果还没有的话
if (sentRequestMillis == -1) {
if (OkHeaders.contentLength(networkRequest) == -1 && requestBodyOut instanceof RetryableSink) {
long contentLength = ((RetryableSink) requestBodyOut).contentLength();
networkRequest = networkRequest.newBuilder().header("Content-Length", Long.toString(contentLength)).build();
}
httpStream.writeRequestHeaders(networkRequest);
}
// 向Socket写入请求体
if (requestBodyOut != null) {
if (bufferedRequestBody != null) {
// This also closes the wrapped requestBodyOut.
bufferedRequestBody.close();
} else {
requestBodyOut.close();
}
if (requestBodyOut instanceof RetryableSink) {
httpStream.writeRequestBody((RetryableSink) requestBodyOut);
}
}
// 读取响应
networkResponse = readNetworkResponse();
}
// 接受请求头 主要是处理Cookie
receiveHeaders(networkResponse.headers());
// 如果我们有缓存起来的响应,那么要做一个条件判断
if (cacheResponse != null) {
if (validate(cacheResponse, networkResponse)) {
// 使用网络响应
userResponse = cacheResponse.newBuilder().request(userRequest).priorResponse(stripBody(priorResponse)).headers(combine(cacheResponse.headers(), networkResponse.headers())).cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build();
// 关闭响应体 释放
networkResponse.body().close();
releaseStreamAllocation();
// 更新缓存列表
InternalCache responseCache = Internal.instance.internalCache(client);
responseCache.trackConditionalCacheHit();
responseCache.update(cacheResponse, stripBody(userResponse));
userResponse = unzip(userResponse);
return;
} else {
closeQuietly(cacheResponse.body());
}
}
// 网络响应
userResponse = networkResponse.newBuilder().request(userRequest).priorResponse(stripBody(priorResponse)).cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build();
if (hasBody(userResponse)) {
// 缓存处理
maybeCache();
userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));
}
}