湖畔镇

OkHttp源码分析

OkHttp是一个一个处理网络请求的开源项目,是安卓端最火热的轻量级框架,由移动支付Square公司贡献

Call

接口,准备处理的一个请求,可以被取消,它代表了一个请求/响应,不能被处理两次

public interface Call {
    //  返回原始请求
    Request request();

    //  立刻调用请求,然后阻塞直到收到响应或错误
    Response execute() throws IOException;

    //  调度让请求在未来被调用
    void enqueue(Callback responseCallback);

    //  取消请求
    void cancel();

    boolean isExecuted();

    boolean isCanceled();

    //  产生Call的工厂
    interface Factory {
        Call newCall(Request request);
    }
}

OkHttpClient

Call的工厂,Call能够被用来发送请求和接收响应

创建一个OkHttpClient实例并被所有的HTTP调用重用可以获得最好的表现,因为每个client都持有自己的连接池和线程池,重用连接和线程可以减少时延和节省内存,为每一个请求创建client浪费空闲池资源

public final OkHttpClient client = new OkHttpClient();

构造默认的OkHttpCilent

public final OkHttpClient client = new OkHttpClient.Builder().addInterceptor(new HttpLoggingInterceptor()).cache(new Cache(cacheDir, cacheSize)).build();

Builder()自定义

OkHttpClient eagerClient = client.newBuilder().readTimeout(500, TimeUnit.MILLISECONDS).build();

newBuilder()自定义,但是重用原来的OkHttpClient

public class OkHttpClient implements Cloneable, Call.Factory {

    @Override public Call newCall(Request request) {
        return new RealCall(this, request);
    }

}

实现了Call.Factory接口,调用newCall()可以产生一个Call,实际返回的是实现类RealCall

RealCall

protected RealCall(OkHttpClient client, Request originalRequest) {

}

构造函数接收两个参数OkHttpClientRequest

Request request() {
    return originalRequest;
}

所以request()返回的就是这个原始请求

@Override public Response execute() throws IOException {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    try {
        client.dispatcher().executed(this);
        Response result = getResponseWithInterceptorChain(false);
        if (result == null) throw new IOException("Canceled");
        return result;
    } finally {
        client.dispatcher().finished(this);
    }
}

同步请求,一个请求只能执行一次,否则会抛异常,通过Dispatcher来分发请求,执行请求后,通过getResponseWithInterceptorChain()获取结果

private Response getResponseWithInterceptorChain() throws IOException {

    //  除了构造client时指定的拦截器外 还添加了几个内部的
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!retryAndFollowUpInterceptor.isForWebSocket()) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(retryAndFollowUpInterceptor.isForWebSocket()));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

这个函数以及Chain的实现版本之间应该有过重构

异步请求

void enqueue(Callback responseCallback, boolean forWebSocket) {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
 }

将一个AsyncCall放进队列,里面重写了execute(),在getResponseWithInterceptorChain()获得响应后调用回调

@Override protected void execute() {
    boolean signalledCallback = false;
    try {
        Response response = getResponseWithInterceptorChain(forWebSocket);
        if (canceled) {
            signalledCallback = true;
            responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
            signalledCallback = true;
            responseCallback.onResponse(RealCall.this, response);
        }
    } catch (IOException e) {
        if (signalledCallback) {
            logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
        } else {
            responseCallback.onFailure(RealCall.this, e);
        }
    } finally {
        client.dispatcher().finished(this);
    }
}

Dispatcher

分配器

//  准备好的异步调用,将要被执行
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

//  正在执行中的异步调用
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

//  正在执行中的同步调用
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

用这三个队列管理调用,可以指定最大请求数

Interceptor

public interface Interceptor {
    Response intercept(Chain chain) throws IOException;

    interface Chain {
        Request request();

        Response proceed(Request request) throws IOException;

        Connection connection();
    }
}

拦截器,观察、修改请求和响应,典型的用来添加删除修改头部信息

RealInterceptorChain

RealInterceptorChainChain的实现类

public Response proceed(Request request, StreamAllocation streamAllocation, HttpStream httpStream, Connection connection) throws IOException {

    ......

    //  调用拦截链中的下一个拦截器
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpStream, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    ......

    return response;
}

最重要的proceed()方法的实现,OkHttpClient里有一个拦截器列表,会先用列表里这个拦截器进行拦截,传给它一个index+1Chain否则调用getResponse()HTTP请求,之前getResponse()里和HttpEngine的功能逻辑拆分到了新的内部拦截器中

RetryAndFollowUpInterceptor尝试恢复错误和之后的重定向
BridgeInterceptor首先从用户请求构造网络请求,然后处理网络调用,最后从网络响应构造用户响应
CacheInterceptor处理缓存
ConnectInterceptor打开服务器连接
CallServerInterceptor链中的最终的拦截器,它向服务器发送请求
Chain携带了请求的信息,还提供了获取响应的逻辑

拦截器是在构造的时候的自己添加上去的,一般是这样

OkHttpClient client = new OkHttpClient().newBuilder().addInterceptor(new Interceptor() {
    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();

        //  处理request
        Response response = chain.proceed(request);

        //  处理response
        return response;
    }
}).build();

通过关键方法Chain.proceed()获得Response,再看看上面的代码,发现在获得响应之前会依次执行拦截器

RetryAndFollowUpInterceptor

public Response intercept(Chain chain) throws IOException {

    //  在一个循环里处理首个请求和后续请求
    while (true) {

        ......

        //  交由下一个拦截器处理
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);

        ......

        //  查看后续请求
        Request followUp = followUpRequest(response);

        ......
    }

}

followUpRequest()用来指出为了回应响应应该发什么请求,可能是添加认证头、重定向或者处理客户端请求超时

BridgeInterceptor

public Response intercept(Chain chain) throws IOException {

    //  构造网络请求头
    //  添加Content-Type/Content-Length/Host/Connection/Accept-Encoding等字段
    //  添加User-Agent字段
    //  添加Cookie字段
    ......

    Response networkResponse = chain.proceed(requestBuilder.build());

    //  获得网络响应 处理后返回
    //  处理Cookiew
    //  Gzip解压
    ......
}

CacheInterceptor

public Response intercept(Chain chain) throws IOException {

    //  从缓存中获取请求,处理异常状况
    ......

    networkResponse = chain.proceed(networkRequest);

    //  获得网络响应
    //  判断是使用网络响应还是缓存响应
    //  可能要更新缓存
    ......
}

ConnectInterceptor

public Response intercept(Chain chain) throws IOException {
    StreamAllocation streamAllocation = realChain.streamAllocation();
    ......
    RealConnection connection = streamAllocation.connection();
    ......
    return realChain.proceed(request, streamAllocation, httpStream, connection);
  }

调用了StreamAllocation.newStream():

首先通过findHealthyConnection()查找健康的连接,然后获得HttpStream
findConnection()查找连接,首先看是否有存在的连接,然后查找连接池,都没有的话就创建一个并加入连接池,然后connect()
连接的调用路径:RealConnection.connect()->Platform.connectSocket()—>Socket.connect()
Platform显然是与底层具体实现有关,在构造的时候会通过findPlatform()决定底层实现

CallServerInterceptor

public Response intercept(Chain chain) throws IOException {

    //  写入请求头
    httpStream.writeRequestHeaders(request);

    //  写入请求体
    ......

    //  读取响应
    Response response = httpStream.readResponseHeaders().request(request).handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build();

    //  读取响应体并返回
    ......
}

StreamAllocation

这个类用来协调三个实体的关系:

  • 连接(Connections):到远程服务器的物理连接,可能建立起来很慢所以有必要能够在连接中取消
  • 流(Streams):在连接上的逻辑HTTP请求响应对,每个连接有它自己的分配限制,定义了同时能承载多少个流,HTTP/1.X可以承载一个流,SPDYHTTP/2可以承载多个
  • 调用(Calls):一个流的逻辑序列,典型的是它的初始请求和后续请求,我们倾向于保持所有一个单独调用的流在一个连接上,为了更好的表现

newStream()里面做了判断,可以返回Http1xStreamHttp2xStream,它们都实现了HttpStream接口

public interface HttpStream {

    Sink createRequestBody(Request request, long contentLength) throws IOException;

    void writeRequestHeaders(Request request) throws IOException;

    void writeRequestBody(RetryableSink requestBody) throws IOException;

    void finishRequest() throws IOException;

    Response.Builder readResponseHeaders() throws IOException;

    ResponseBody openResponseBody(Response response) throws IOException;

    void setHttpEngine(HttpEngine httpEngine);

    void cancel();
}

参考文章:HTTP/1和HTTP/2

总结

OkHttpClient通过newCall()产生Call

同步调用execute()(异步调用就是个线程池),会依次调用拦截链上的拦截器,其中包括一些内置的拦截器:

  • 自定义拦截器
  • 重试与后续请求
  • 构造请求和响应
  • 缓存
  • 连接:这里调用了Socket的连接,并且获得了请求和响应的缓冲区
  • 请求:通过HttpStream写入请求读取响应

之前版本的实现

之前的版本在处理完拦截器之后,会调用getResponse()方法,实际上包括了新版本的一部分拦截器功能

Response getResponse(Request request, boolean forWebSocket) throws IOException {

    //  获得请求体,然后获得一些元数据构造请求头
    RequestBody body = request.body();
    if (body != null) {
        Request.Builder requestBuilder = request.newBuilder();

        //  获得Content-Type字段
        MediaType contentType = body.contentType();
        if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
        }

        //  获得Content-Length字段或者Transfer-Encoding字段
        long contentLength = body.contentLength();
        if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
        } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
        }

        request = requestBuilder.build();
    }

    //  构建HTTP引擎,重试和重定向需要重新构造
    engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);

    int followUpCount = 0;
    while (true) {

        //  检测取消标志
        if (canceled) {
            engine.releaseStreamAllocation();
            throw new IOException("Canceled");
        }

        boolean releaseConnection = true;
        try {
            engine.sendRequest();
            engine.readResponse();
            releaseConnection = false;
        } catch (RequestException e) {
            throw e.getCause();
        } catch (RouteException e) {
            //  试图恢复路由错误
            HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);
            if (retryEngine != null) {
                releaseConnection = false;
                engine = retryEngine;
                continue;
            }
            throw e.getLastConnectException();
        } catch (IOException e) {
            //  视图恢复IO错误
            HttpEngine retryEngine = engine.recover(e, null);
            if (retryEngine != null) {
                releaseConnection = false;
                engine = retryEngine;
                continue;
            }
            throw e;
        } finally {
            //  清理工作
            if (releaseConnection) {
                StreamAllocation streamAllocation = engine.close();
                streamAllocation.release();
            }
        }

        Response response = engine.getResponse();
        Request followUp = engine.followUpRequest();

        if (followUp == null) {
            if (!forWebSocket) {
                engine.releaseStreamAllocation();
            }
            return response;
        }

        StreamAllocation streamAllocation = engine.close();

        if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
        }

        if (!engine.sameConnection(followUp.url())) {
            streamAllocation.release();
            streamAllocation = null;
        }

        request = followUp;
        engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null, response);
    }
}

其中Content-LengthTransfer-Encoding参见HTTP 协议中的 Transfer-Encoding

实际的工作通过HttpEngine完成,现在完全由拦截器完成

HttpEngine

HttpEngine处理一个单独的请求响应对,按照下面的生命周期:

  1. 创建
  2. sendRequest()发送请求消息,一旦发送了请求再修改请求头就会出错,如果有请求体的话可以写入
  3. readResponse()读取响应消息,之后响应头和响应体可以被读取,所有的响应都有一个响应提输入流,尽管在一些实例中是空的

sendRequest()方法

public void sendRequest() throws RequestException, RouteException, IOException {

    if (cacheStrategy != null) {
        return;
    }

    if (httpStream != null) {
        throw new IllegalStateException(); 
    }

    Request request = networkRequest(userRequest);

    //  缓存相关处理
    InternalCache responseCache = Internal.instance.internalCache(client);
    Response cacheCandidate = responseCache != null ? responseCache.get(request) : null;

    long now = System.currentTimeMillis();
    cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
    networkRequest = cacheStrategy.networkRequest;
    cacheResponse = cacheStrategy.cacheResponse;

    if (responseCache != null) {
        responseCache.trackResponse(cacheStrategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
        closeQuietly(cacheCandidate.body());
    }

    if (networkRequest == null && cacheResponse == null) {
        userResponse = new Response.Builder().request(userRequest).priorResponse(stripBody(priorResponse)).protocol(Protocol.HTTP_1_1).code(504).message("Unsatisfiable Request (only-if-cached)").body(EMPTY_BODY).build();
    return;
    }

    if (networkRequest == null) {
        userResponse = cacheResponse.newBuilder().request(userRequest).priorResponse(stripBody(priorResponse)).cacheResponse(stripBody(cacheResponse)).build();
        userResponse = unzip(userResponse);
        return;
    }

    boolean success = false;

    try {
        httpStream = connect();
        httpStream.setHttpEngine(this);

        long contentLength = OkHeaders.contentLength(request);
        if (bufferRequestBody) {
            //  请求体必须在传输之前缓冲
            if (contentLength > Integer.MAX_VALUE) {
                throw new IllegalStateException("Use setFixedLengthStreamingMode() or " + "setChunkedStreamingMode() for requests larger than 2 GiB.");
            }

            if (contentLength != -1) {
                //  已知请求体的长度
                httpStream.writeRequestHeaders(networkRequest);
                requestBodyOut = new RetryableSink((int) contentLength);
            } else {
                //  缓存未知长度的请求体,直到整个请求体都准备好不会写入请求头
                requestBodyOut = new RetryableSink();
            }
        } else {
          httpStream.writeRequestHeaders(networkRequest);
          requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
        }

        success = true;
    } finally {
        if (!success && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
        }
    }
}

sendRequest()中调用networkRequest()构造请求
如果request中不存在这些字段Host/Connection/Accept-Encoding/User-Agent则添加默认值
另外会添加Cookie字段如果对应url的cookie`不为空的话

通过connect()发起连接

private HttpStream connect() throws RouteException, RequestException, IOException {
    boolean doExtensiveHealthChecks = !networkRequest.method().equals("GET");
    return streamAllocation.newStream(client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis(), client.retryOnConnectionFailure(), doExtensiveHealthChecks);
}

调用了StreamAllocation.newStream(),首先通过findHealthyConnection()查找健康的连接,然后获得HttpStream

findConnection()查找连接,首先看是否有存在的连接,然后查找连接池,都没有的话就创建一个并加入连接池

创建连接会生成一个RealConnection的实例,它实现Connection接口

连接的调用路径:RealConnection.connect()->Platform.connectSocket()—>Socket.connect()

Platform显然是与底层具体实现有关,在构造的时候会通过findPlatform()决定底层实现

readResponse()方法

public void readResponse() throws IOException {

    //  响应已经准备好了
    if (userResponse != null) {
      return;
    }

    if (networkRequest == null && cacheResponse == null) {
      throw new IllegalStateException("call sendRequest() first!");
    }

    //  没有响应
    if (networkRequest == null) {
      return;
    }

    Response networkResponse;

    if (forWebSocket) {
        httpStream.writeRequestHeaders(networkRequest);
        networkResponse = readNetworkResponse();
    } else if (!callerWritesRequestBody) {
        networkResponse = new NetworkInterceptorChain(0, networkRequest).proceed(networkRequest);
    } else {

        //  向Buffer中写入请求体
        if (bufferedRequestBody != null && bufferedRequestBody.buffer().size() > 0) {
            bufferedRequestBody.emit();
        }

        //  向Socket写入请求头 如果还没有的话
        if (sentRequestMillis == -1) {
            if (OkHeaders.contentLength(networkRequest) == -1 && requestBodyOut instanceof RetryableSink) {
                long contentLength = ((RetryableSink) requestBodyOut).contentLength();
                networkRequest = networkRequest.newBuilder().header("Content-Length", Long.toString(contentLength)).build();
            }
            httpStream.writeRequestHeaders(networkRequest);
        }

        //  向Socket写入请求体 
        if (requestBodyOut != null) {
            if (bufferedRequestBody != null) {
              // This also closes the wrapped requestBodyOut.
                bufferedRequestBody.close();
            } else {
                requestBodyOut.close();
            }
            if (requestBodyOut instanceof RetryableSink) {
                httpStream.writeRequestBody((RetryableSink) requestBodyOut);
            }
        }

        //  读取响应
        networkResponse = readNetworkResponse();
    }

    //  接受请求头 主要是处理Cookie
    receiveHeaders(networkResponse.headers());

    //  如果我们有缓存起来的响应,那么要做一个条件判断
    if (cacheResponse != null) {

        if (validate(cacheResponse, networkResponse)) {
            //  使用网络响应
            userResponse = cacheResponse.newBuilder().request(userRequest).priorResponse(stripBody(priorResponse)).headers(combine(cacheResponse.headers(), networkResponse.headers())).cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build();

            //  关闭响应体 释放
            networkResponse.body().close();
            releaseStreamAllocation();

            //  更新缓存列表
            InternalCache responseCache = Internal.instance.internalCache(client);
            responseCache.trackConditionalCacheHit();
            responseCache.update(cacheResponse, stripBody(userResponse));
            userResponse = unzip(userResponse);
            return;
        } else {
            closeQuietly(cacheResponse.body());
        }
    }

    //  网络响应
    userResponse = networkResponse.newBuilder().request(userRequest).priorResponse(stripBody(priorResponse)).cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build();

    if (hasBody(userResponse)) {
        //  缓存处理
        maybeCache();
        userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));
    }
}
分享