目录
前言
前面两篇文章介绍了OKHttp的《》和《》,大概的流程都介绍了,但上一篇只介绍了同步的,因此异步的也单独介绍一下,因为这个部分线程管理的思路很不错。
正文
OKHttp同步请求和异步请求唯一区别点是异步请求用了线程池管理,其的都是一样的,因此这里主要介绍的也就是Dispatcher类。
Dispatcher
Dispatcher是核心了,涉及异步和同步请求,代码比较少,也容易理解。
线程最大请求数
private int maxRequests = 64; # 修改最大请求数 public void setMaxRequests(int maxRequests) { if (maxRequests < 1) { throw new IllegalArgumentException("max < 1: " + maxRequests); } synchronized (this) { this.maxRequests = maxRequests; } promoteAndExecute(); } # 获取最大请求数 public synchronized int getMaxRequests() { return maxRequests; }
主机最大请求数
private int maxRequestsPerHost = 5; # 修改每个主机请求个数 public void setMaxRequestsPerHost(int maxRequestsPerHost) { if (maxRequestsPerHost < 1) { throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost); } synchronized (this) { this.maxRequestsPerHost = maxRequestsPerHost; } promoteAndExecute(); } # 获取每个主机最大请求数 public synchronized int getMaxRequestsPerHost() { return maxRequestsPerHost; }
线程池
private @Nullable ExecutorService executorService; # 消息池的创建 public synchronized ExecutorService executorService() { if (executorService == null) { /* * int corePoolSize 核心线程大小 * int maximumPoolSize 线程池最大容量大小 * long keepAliveTime 线程空闲时,线程存活的时间 * TimeUnit unit 时间单位 * BlockingQueue<Runnable> workQueue 任务队列。一个阻塞队列 * threadFactory : 新建线程工厂 */ executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
请求的队列
# 准备好的请求任务队列 private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); # 正在执行的异步请求任务队列 private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); # 同步请求任务队列[用于记录同步任务的] private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
异步请求Demo
OkHttpClient client = new OkHttpClient.Builder().build(); Request request = new Request.Builder().url("https://www.biumall.com/").build(); Call call = client.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(@NonNull Call call, @NonNull IOException e) { //失败 子线程中 } @Override public void onResponse(@NonNull Call call, @NonNull Response response) { //成功 子线程中,可以IO操作 } });
OkHttpClient,Request和Call的这里略过,上一篇介绍过,也就是初始化和赋值而已。
enqueue
# RealCall.java @Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); eventListener.callStart(this); //[重点]调用了Dispatcher client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
这里对responseCallback封装成AsyncCall,继承于NamedRunnable,而NamedRunnable就是对Runnable的封装。
enqueue
# Dispatcher.java void enqueue(AsyncCall call) { synchronized (this) { //添加入准备好的异步队列 readyAsyncCalls.add(call); } // 刷新和执行任务 promoteAndExecute(); }
promoteAndExecute
private boolean promoteAndExecute() { List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); //查看线程运行的任务是否大于限定值 if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity. //查看主机请求的任务是否大于限定值 if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity. // 如果没有达到上面条件,就从准备列表中移除。 i.remove(); //添加到临时的执行任务列表中 executableCalls.add(asyncCall); //正在执行任务列表也需要添加 runningAsyncCalls.add(asyncCall); } //判断是否有任务执行 isRunning = runningCallsCount() > 0; } //查看上面满足条件的需要执行的任务 for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); //调用AsyncCall的封装的executeOn方法,这里传入了线程池对象。 asyncCall.executeOn(executorService()); } return isRunning; }
executeOn
AsyncCall类定义在在RealCall.java中
void executeOn(ExecutorService executorService) { assert (!Thread.holdsLock(client.dispatcher())); boolean success = false; try { //使用线程池执行,其实就是执行NamedRunnable.run() executorService.execute(this); success = true; } catch (RejectedExecutionException e) { //略 } finally { //这里是失败了才走这,成功了,下面会有除了的。 if (!success) { client.dispatcher().finished(this); // This call is no longer running! } } }
run
NamedRunnable中封装了
@Override public final void run() { String oldName = Thread.currentThread().getName(); Thread.currentThread().setName(name); try { //调用的是AsyncCall的execute execute(); } finally { Thread.currentThread().setName(oldName); } }
execute
@Override protected void execute() { boolean signalledCallback = false; timeout.enter(); try { //[重点],这里进入了就跟同步一样的 // 把拦截器穿在一起,并返回处理的结果。 Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { e = timeoutExit(e); if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { eventListener.callFailed(RealCall.this, e); responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } }
参考文章
《
© 版权声明