烦恼一般都是想太多了。

0%

Retrofit-OkHttp请求过程-源码阅读

本来是想要看一下封装一下下载的,结果一不小心就走进了读源码的境地。所以就这样吧。

文前

  • Retrofit 是一个类型安全的 Http 客户端,事实上其是对 OkHttp 的封装,让我们可以用注解,配置的形式来进行 Http 请求。
  • OkHttp 才是一个真正的 Http Client 实现。
  • Okio 这是一个 IO 库,为了补充 java.io java.nio 的不足。

示例

Retrofit 的简单执行过程如下:

  1. Retrofit 根据接口构造一个 Call
  2. Call 调用 enqueur 或者 execute 方法,然后开始请求。
  3. 返回结果。

而我们常用的方式跟下面差不多:

Retrofit retrofit = new Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
String REQ_URL = "https://storage.qoo-app.com/apk/com.qooapp.qoohelper/com.qooapp.qoohelper-20191119170001.apk";
Apis apis = retrofit.create(Apis.class);
Call<ResponseBody> call = apis.download(REQ_URL);
call.enqueue(new Callback<ResponseBody>() {
@Override
public void onResponse(Call<ResponseBody> call, Response<ResponseBody> response) {

}

@Override
public void onFailure(Call<ResponseBody> call, Throwable t) {

}
});

这样我们在 onResponse 方法中就能获取到下载回来的内容了。

Retrofit

create()

Retrofit 会根据我们定义的 API 接口来生成代理类,调用每个接口中的方法,就会生成一个 ServiceMethod。

public <T> T create(final Class<T> service) {
Utils.validateServiceInterface(service);
if (validateEagerly) {
eagerlyValidateMethods(service);
}
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
private final Object[] emptyArgs = new Object[0];

@Override public @Nullable Object invoke(Object proxy, Method method,
@Nullable Object[] args) throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
}
});
}

这个方法,返回的是一代理类,凡调用其代理方法的,都会调用到其 InvocationHandler 上去。而我们的 InvocationHandler 就会 API 接口中定义的方法,来生成对应的 ServiceMethod。

loadServiceMethod

ServiceMethod<?> loadServiceMethod(Method method) {
ServiceMethod<?> result = serviceMethodCache.get(method);
if (result != null) return result;

synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = ServiceMethod.parseAnnotations(this, method);
serviceMethodCache.put(method, result);
}
}
return result;
}

其会经历两个阶段的注解分析:

  1. ServiceMethod.parseAnnotations(Retrofit retrofit, Method method)
  2. HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory)。

我们逐步来看这两个方法。

ServiceMethod

parseAnnotations

这个方法比较简单。

static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);

Type returnType = method.getGenericReturnType();
if (Utils.hasUnresolvableType(returnType)) {
throw methodError(method,
"Method return type must not include a type variable or wildcard: %s", returnType);
}
if (returnType == void.class) {
throw methodError(method, "Service methods cannot return void.");
}

return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
}

主要干两个事情:

  1. 根据我们调用的方法上的注解,参数上的注解来建立相关的请求信息 RequestFactory。
  2. 检查我们的 API 中设置的返回类型是不是无法解析的,且不能为 void。
private final Method method;
private final HttpUrl baseUrl;
final String httpMethod;
private final @Nullable String relativeUrl;
private final @Nullable Headers headers;
private final @Nullable MediaType contentType;
private final boolean hasBody;
private final boolean isFormEncoded;
private final boolean isMultipart;
private final ParameterHandler<?>[] parameterHandlers;
final boolean isKotlinSuspendFunction;

HttpServiceMethod

HttpServiceMethod 级承自 ServiceMethod 的一个实现。

parseAnnotations

根据 方法,RequestFactory 来建立 HttpServiceMethod。在不考虑kotin 支持的情况下,其过程如下:

  1. 根据注解,返回类型来建立 CallAdapter。
  2. 检查返回的泛型参数不能是:okhttp3.Response.class 和 Response.class,如果请求的方法是 HEAD ,那么返回的泛型参数只能是 Void。
  3. 最后建立一个 CallAdapted(继承自 HttpServiceMethod)。

createCallAdapter

根据返回类型和注解来建立。

private static <ResponseT, ReturnT> CallAdapter<ResponseT, ReturnT> createCallAdapter(
Retrofit retrofit, Method method, Type returnType, Annotation[] annotations) {
try {
//noinspection unchecked
return (CallAdapter<ResponseT, ReturnT>) retrofit.callAdapter(returnType, annotations);
} catch (RuntimeException e) { // Wide exception range because factories are user code.
throw methodError(method, e, "Unable to create call adapter for %s", returnType);
}

它的左右,就是对我们的请求做一个适配,比如指定不同的 executor ,我们指定的 RxJava2CallAdapter则会将我们的请求转换为一个 RxJava 订阅。

createResponseConverter

这个就是找到一个将返回的字符串转换为 Java 实体类的 Converter,比如我们的 GsonConverter,默认情况下,我们的 converterFactories 是 BuiltInConverters,不过对于我们来说,我们增加了一个 GsonConverterFactory。

private static <ResponseT> Converter<ResponseBody, ResponseT> createResponseConverter(
Retrofit retrofit, Method method, Type responseType) {
Annotation[] annotations = method.getAnnotations();
try {
return retrofit.responseBodyConverter(responseType, annotations);
} catch (RuntimeException e) { // Wide exception range because factories are user code.
throw methodError(method, e, "Unable to create converter for %s", responseType);
}
}

CallAdapted

OK,现在我们有了 CallAdapter,RequestFactory,callFactory,responseConverter 来,就可以建立一个适配过后的请求了:

static final class CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT> {
private final CallAdapter<ResponseT, ReturnT> callAdapter;

CallAdapted(RequestFactory requestFactory, okhttp3.Call.Factory callFactory,
Converter<ResponseBody, ResponseT> responseConverter,
CallAdapter<ResponseT, ReturnT> callAdapter) {
super(requestFactory, callFactory, responseConverter);
this.callAdapter = callAdapter;
}

@Override protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
return callAdapter.adapt(call);
}
}

invoke

当建立了 ServiceMethod 后,就会用反射的时候进行调用了。

对于我们的 HttpServiceMethod 来讲,其只是建立一个 OkHttpCall ,然后简单的调用一下 adapt 方法进行适配而已,从这里,就是 Retrofit 与 OkHttp 开始交互的地方了:

@Override final @Nullable ReturnT invoke(Object[] args) {
Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter);
return adapt(call, args);
}

static final class CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT> {
private final CallAdapter<ResponseT, ReturnT> callAdapter;

CallAdapted(RequestFactory requestFactory, okhttp3.Call.Factory callFactory,
Converter<ResponseBody, ResponseT> responseConverter,
CallAdapter<ResponseT, ReturnT> callAdapter) {
super(requestFactory, callFactory, responseConverter);
this.callAdapter = callAdapter;
}

@Override protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
return callAdapter.adapt(call);
}
}

最终还是走到了 calladapter.adapt 上去。

DefaultCallAdapterFactory

在建立 Retrofit 实力的时候,会为 Retrofit 所使用的 OkHttpClient 设置默认的:callFactory(这里就是OkHttpClient),callAdapterFactories,converterFactories,callbackExecutor。

对于默认的 DefaultCallAdapterFactory 会将我们的 OkHttpCall 给适配成一个 ExecutorCallbackCall:

@Override public @Nullable CallAdapter<?, ?> get(
Type returnType, Annotation[] annotations, Retrofit retrofit) {
if (getRawType(returnType) != Call.class) {
return null;
}
if (!(returnType instanceof ParameterizedType)) {
throw new IllegalArgumentException(
"Call return type must be parameterized as Call<Foo> or Call<? extends Foo>");
}
final Type responseType = Utils.getParameterUpperBound(0, (ParameterizedType) returnType);

final Executor executor = Utils.isAnnotationPresent(annotations, SkipCallbackExecutor.class)
? null
: callbackExecutor;

return new CallAdapter<Object, Call<?>>() {
@Override public Type responseType() {
return responseType;
}

@Override public Call<Object> adapt(Call<Object> call) {
return executor == null
? call
: new ExecutorCallbackCall<>(executor, call);
}
};
}

OkHttpCall

这个类是对 okhttp3.Call 的封装。

OK,我们调用 API 的方法,会建立一个 OkHttpCall 然后被 calldapter 给 adapter 一下,默认一下,得到的是一个 ExecutorCallbackCall。其实现了 enqueue 和 execute 两个方法:

@Override public void enqueue(final Callback<T> callback) {
checkNotNull(callback, "callback == null");

delegate.enqueue(new Callback<T>() {
@Override public void onResponse(Call<T> call, final Response<T> response) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
if (delegate.isCanceled()) {
// Emulate OkHttp's behavior of throwing/delivering an IOException on cancellation.
callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled"));
} else {
callback.onResponse(ExecutorCallbackCall.this, response);
}
}
});
}

@Override public void onFailure(Call<T> call, final Throwable t) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
callback.onFailure(ExecutorCallbackCall.this, t);
}
});
}
});
}

@Override public boolean isExecuted() {
return delegate.isExecuted();
}

@Override public Response<T> execute() throws IOException {
return delegate.execute();
}

本质上,执行的是 OkHttpCall 的 enqueue 方法,然后在 callbackExecutor 内进行回调

enqueue

这个方法,简而言之,就是通过参数来建立一个 okhttp3.Call 。

@Override public void enqueue(final Callback<T> callback) {
checkNotNull(callback, "callback == null");

okhttp3.Call call;
Throwable failure;

synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;

call = rawCall;
failure = creationFailure;
if (call == null && failure == null) {
try {
call = rawCall = createRawCall();
} catch (Throwable t) {
throwIfFatal(t);
failure = creationFailure = t;
}
}
}

if (failure != null) {
callback.onFailure(this, failure);
return;
}

if (canceled) {
call.cancel();
}

call.enqueue(new okhttp3.Callback() {
@Override public void onResponse(okhttp3.Call call, okhttp3.Response rawResponse) {
Response<T> response;
try {
response = parseResponse(rawResponse);
} catch (Throwable e) {
throwIfFatal(e);
callFailure(e);
return;
}

try {
callback.onResponse(OkHttpCall.this, response);
} catch (Throwable t) {
throwIfFatal(t);
t.printStackTrace(); // TODO this is not great
}
}

@Override public void onFailure(okhttp3.Call call, IOException e) {
callFailure(e);
}

private void callFailure(Throwable e) {
try {
callback.onFailure(OkHttpCall.this, e);
} catch (Throwable t) {
throwIfFatal(t);
t.printStackTrace(); // TODO this is not great
}
}
});
}

createRawCall

private okhttp3.Call createRawCall() throws IOException {
okhttp3.Call call = callFactory.newCall(requestFactory.create(args));
if (call == null) {
throw new NullPointerException("Call.Factory returned null.");
}
return call;
}

我们知道,我们默认的是 callFactory 其实就是 OkHttpClient。

okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
callFactory = new OkHttpClient();
}

那么终于走到了 okhttp了。

OkHttpClient

newRealCall

RealCall 是对 okhttp3.Call 的实现。

@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}

RealCall.enqueue

将我们建立的 RealCall 放入 dispatcher 队列中,由 dispatcher 进行分发。

@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

Dispatcher.enqueue

针对异步调用,将我们的回调 responseCallback 封装到一个 AsyncCall 中,以便能在 Executor 内进行执行。

void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}

private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));

List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();

if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}

for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}

return isRunning;
}

AsyncCall.executeOn

在 ExecutorService 进行制定,ExecutorService 反过来回调 AsyncCall.execute 方法。

  void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}

@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}

RealCall.getResponseWithInterceptorChain

于是,在这里,我们就开始进行请求的逻辑了。首先,建立一个 InterceptorChain,然后让我们的 originalRequest 在这个 Chain 上遍历进行处理,得出最终的返回。

Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());

return chain.proceed(originalRequest);
}

重点在于 RealInterceptorChain 中,每次进行处理的时候,都会往后执行拦截器。

RealInterceptorChain.process

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();

calls++;

// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}

// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}

// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}

// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}

if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}

return response;
}

关键处在于:

RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

在执行当前的拦截器的时候,还会获得下一个拦截器的信息(事实上应该描述为拦截器 chain 本身,止不过下一个需要处理的拦截器的索引 index 不同。)

所以,我们按序添加的拦截器,会按序进行执行,但是却倒序返回,最终,我们最后一个添加的 CallServerInterceptor 第一个先进行执行。

更确切的应该是说:

每个拦截器会做他该做的事情,比如打开套接字连接,然后就等待后一个拦截器进行处理后返回。

Interceptors

我们会添加几个默认的 Interceptor 。

RetryAndFollowUpInterceptor

构造一个 StreamAllocation 流分配器。这个对象和三个概念相关:

  • Connections 物理套接字。
  • Streams HTTP请求响应对,其建立在 Connections 之上。每个Connections 都有其资源限制,定义了同时可进行多少 Streams。 HTTP/1.x 一个连接只能携带一个流,而 HTTP/2.x 可以多个流。
  • Calls 多个 Streams 的逻辑序列,典型的是一个初始化的请求和后续的请求。我们会偏向于在一个 Connections 上对一个 Call 的所有流进行传输。
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
// 下一个拦截器
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// RealCall
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();

StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}

Response response;
boolean releaseConnection = true;
try {
// 先执行下一个拦截器
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}

// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}

Request followUp;
try {
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}

if (followUp == null) {
streamAllocation.release();
return response;
}

closeQuietly(response.body());

if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}

if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}

request = followUp;
priorResponse = response;
}
}

BridgeInterceptor

从一个用户请求构造一个网络请求,然后调用网络进行访问,最终从网络响应构造一个用户的响应数据。

@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();

RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}

Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}

CacheInterceptor

缓存请求和响应。

@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();

CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
cache.trackResponse(strategy);
}

if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}

Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}

if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

return response;
}

ConnectInterceptor

从连接池找到一个连接,然后分配一个 Stream (HTTP连接)。

intercept

@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

newStream

从 连接池查找 Connection。如果找不到,就会进行新的连接。是使用 java.net.Socket 来进行连接的。

public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();

try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}


private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");

// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}

if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);

if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}

// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}

synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");

if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}

if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}

// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}

// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}

// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());

Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;

// Pool the connection.
Internal.instance.put(connectionPool, result);

// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);

eventListener.connectionAcquired(call, result);
return result;
}

RealConnection.newCodec

public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}

从连接池上找到一个 Connection,然后在上面打开一个 HTTP 连接 Stream。

source/sink 就代表了, HTTP 的读写端点。

NetworkInterceptors

这个由我们在初始化 OkHttpClient 的时候添加,如:

new OkHttpClient().addInterceptor();

CallServerInterceptor

那么现在,套接字连接打开, HTTP 连接建立,就该发送数据了。

这里我们需要关注一点就行了,那就是我们的 Response 会有一个 body,这个 body 是从 HttpCodec 来的。

intercept

@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();

realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);

Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}

if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}

httpCodec.finishRequest();

if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}

Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);

response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

code = response.code();
}

realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);

if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}

Http1CodeC

这个类有几个内部类,内部类都可以访问 Http1CodeC 的 source/sink,所以会看到很多 ResponseBody 没有 source/sink 的情况。就是因为内部类可以访问外部类的成员。

openResponseBody

@Override public ResponseBody openResponseBody(Response response) throws IOException {
streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
String contentType = response.header("Content-Type");

if (!HttpHeaders.hasBody(response)) {
Source source = newFixedLengthSource(0);
return new RealResponseBody(contentType, 0, Okio.buffer(source));
}

if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
Source source = newChunkedSource(response.request().url());
return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}

long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
Source source = newFixedLengthSource(contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}

return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}

最终数据又会逐个的通过拦截器进行返回,然后返回到我们的 CallBack.

Retrofit.BuiltInConverters

Retrofit 默认的 BuiltInConverters 几个 Converter。

converterFactories.add(new BuiltInConverters());
converterFactories.addAll(this.converterFactories);
converterFactories.addAll(platform.defaultConverterFactories());

根据方法上是否有 Streaming 注解,来返回不同的 Converter。

@Override public @Nullable Converter<ResponseBody, ?> responseBodyConverter(
Type type, Annotation[] annotations, Retrofit retrofit) {
if (type == ResponseBody.class) {
return Utils.isAnnotationPresent(annotations, Streaming.class)
? StreamingResponseBodyConverter.INSTANCE
: BufferingResponseBodyConverter.INSTANCE;
}
if (type == Void.class) {
return VoidResponseBodyConverter.INSTANCE;
}
if (checkForKotlinUnit) {
try {
if (type == Unit.class) {
return UnitResponseBodyConverter.INSTANCE;
}
} catch (NoClassDefFoundError ignored) {
checkForKotlinUnit = false;
}
}
return null;
}

StreamingResponseBodyConverter

当y在 Streaming 注解的时候返回。

static final class StreamingResponseBodyConverter
implements Converter<ResponseBody, ResponseBody> {
static final StreamingResponseBodyConverter INSTANCE = new StreamingResponseBodyConverter();

@Override public ResponseBody convert(ResponseBody value) {
return value;
}
}

BufferingResponseBodyConverter

static final class BufferingResponseBodyConverter
implements Converter<ResponseBody, ResponseBody> {
static final BufferingResponseBodyConverter INSTANCE = new BufferingResponseBodyConverter();

@Override public ResponseBody convert(ResponseBody value) throws IOException {
try {
// Buffer the entire body to avoid future I/O.
return Utils.buffer(value);
} finally {
value.close();
}
}
}

可以看到,当有 Streaming 注解的时候, Retrofit 不会尝试将所有的 Body 进行缓存。

OkHttpCall.parseResponse

流程回到我们的 OkHttpCall,这个时候,我们的 OkHttp3 返回的是一个 okhttp3.Response,我们需要将他转换为我们想要的格式,而我们的 BuiltInConverters 就派上来用场了。

Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();

// Remove the body's source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();

int code = rawResponse.code();
if (code < 200 || code >= 300) {
try {
// Buffer the entire body to avoid future I/O.
ResponseBody bufferedBody = Utils.buffer(rawBody);
return Response.error(bufferedBody, rawResponse);
} finally {
rawBody.close();
}
}

if (code == 204 || code == 205) {
rawBody.close();
return Response.success(null, rawResponse);
}

ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
try {
T body = responseConverter.convert(catchingBody);
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
// If the underlying source threw an exception, propagate that rather than indicating it was
// a runtime exception.
catchingBody.throwIfCaught();
throw e;
}
}

ResponseBody

事实上,对于 ResponseBody 数据是没有独到内存中的,还是内核内存中,只有当我们进行读取的时候,才会从套接字上拿数据出来。