JUC线程池服务ExecutorService接口实现源码分析
之前的一篇文章 JUC线程池ThreadPoolExecutor源码分析 深入分析了JUC线程池的源码实现,特别对 Executor#execute()
接口的实现做了行级别的源码分析。这篇文章主要分析一下线程池扩展服务 ExecutorService
接口的实现源码,同时会重点分析 Future
的底层实现。 ThreadPoolExecutor
和其抽象父类 AbstractExecutorService
的源码从JDK8到JDK11基本没有变化,本文编写的时候使用的是JDK11,由于 ExecutorService
接口的定义在JDK[8,11]都没有变化,本文的分析适用于这个JDK版本范围的任意版本。最近尝试找 Hexo
可以渲染 Asciidoc
的插件,但是没有找到,于是就先移植了 Asciidoc
中的五种 Tip
。
ExecutorService接口简介
ExecutorService
接口是线程池扩展功能服务接口,它的定义如下:
public interface ExecutorService extends Executor { // 停止线程池 void shutdown(); // 立即停止线程池,返回尚未执行的任务列表 List shutdownNow(); // 线程池是否停止 boolean isShutdown(); // 线程池是否终结 boolean isTerminated(); // 等待线程池终结 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交Callable类型任务 Future submit(Callable task); // 提交Runnable类型任务,预先知道返回值 Future submit(Runnable task, T result); // 提交Runnable类型任务,对返回值无感知 Future submit(Runnable task); // 永久阻塞 - 提交和执行一个任务列表的所有任务 List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException; // 带超时阻塞 - 提交和执行一个任务列表的所有任务 List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 永久阻塞 - 提交和执行一个任务列表的某一个任务 T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException; // 带超时阻塞 - 提交和执行一个任务列表的某一个任务 T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
ExecutorService
继承自 Executor
,主要提供了线程池的关闭、状态查询查询、可获取返回值的任务提交、整个任务列表或者执行任务列表中任意一个任务(返回执行最快的任务的结果)等功能。
Future实现的通俗原理
ExecutorService
接口的扩展方法都是返回 Future
相关的实例。 java.util.concurrent.Future
(中文翻译就是未来,还是挺有意思的), 代表着一次异步计算的结果 ,它提供了检查计算是否已经完成、等待计算完成、获取计算结果等一系列方法。笔者之前强调过:线程池 ThreadPoolExecutor
的顶级接口 Executor
只提供了一个无状态的返回值类型为 void
的 execute(Runnable command)
方法,无法感知异步任务执行的完成时间和获取任务计算结果。如果我们需要感知异步任务执行的返回值或者计算结果,就必须提供带返回值的接口方法去承载计算结果的操作。这些方法上一节已经介绍过,而 Future
就是一个担任了承载计算结果(包括结果值、状态、阻塞等待获取结果操作等)的工具。这里举一个模拟 Future
实现过程的例子,例子是伪代码和真实代码的混合实现,不需要太较真。
首先,假设我们定义了一个动作函数式接口 Action
:
// 带泛型的动作接口,可以返回一个泛型结果 public interface Action{ V doAction(); }
我们可以尝试实现一下 Action
接口:
// 假设1个动作做的是一个十分复杂的运算,返回一个BigDecimal类型的结果 Action action1 = () -> { // 模拟随机耗时 sleep(x秒); return BigDecimal.valueOf(result); }; // 假设1个动作做的是制作一个面包的过程,返回一个Bread面包实例 Action action2 = () -> { // 模拟随机耗时 sleep(x秒); return new Bread(); };
由于 Action
没有实现 Runnable
接口,上面的两个动作无法通过 Executor#execute()
方法提交异步任务,所以我们需要添加一个适配器 ActionAdapter
:
public class ActionAdapter implements Runnable { private Action action; private ActionAdapter(Action action) { this.action = action; } public static ActionAdapter newActionAdapter(Action action) { return new ActionAdapter(action); } @Override public void run() { action.doAction(); } }
这里只做了简单粗暴的适配,虽然可以提交到线程池中执行,但是功能太过简陋。很多时候,我们还需要添加任务执行状态判断和获取结果的功能,于是新增一个接口 ActionFuture
:
public interface ActionFuture extends Runnable{ V get() throws Exception; boolean isDone(); }
然后 ActionAdapter
实现 ActionFuture
接口,内部添加简单的状态控制:
public class ActionAdapter implements Runnable, ActionFuture { private static final int NEW = 0; private static final int DONE = 1; private int state; private final Action action; private Object result; private ActionAdapter(Action action) { this.action = action; this.state = NEW; } public static ActionAdapter newActionAdapter(Action action) { return new ActionAdapter(action); } @Override public void run() { try { result = action.doAction(); } catch (Throwable e) { result = e; } finally { state = DONE; } } @Override public V get() throws Exception{ while (state < DONE){ // 这个等待方法没有实现,只是表明逻辑 currentThreadWaitForResult(); } if (result instanceof Throwable){ throw new ExecutionException((Throwable) result); }else { return (V) result; } } @Override public boolean isDone() { return state == DONE; } }
这里有个技巧是用 Object
类型的对象存放 Action
执行的结果或者抛出的异常实例,这样可以在 ActionFuture#get()
方法中进行判断和处理。最后一步,依赖 Executor#execute()
新增一个提交异步任务的方法:
public class ActionPool { private final Executor executor; public ActionPool(Executor executor) { this.executor = executor; } public ActionFuture submit(Action action) { ActionFuture actionFuture = ActionAdapter.newActionAdapter(action); executor.execute(actionFuture); return actionFuture; } public static void main(String[] args) throws Exception{ ActionPool pool = new ActionPool(Executors.newSingleThreadExecutor()); Action action1 = () -> { // 模拟随机耗时 sleep(x秒); return BigDecimal.valueOf(result); }; pool.submit(action1); Action action2 = () -> { // 模拟随机耗时 sleep(x秒); return new Bread(); }; pool.submit(action2); } }
上面例子提到的虚拟核心组件,在 JUC
包中有对应的实现(当时,JUC包对逻辑和状态控制会比虚拟例子更加严谨),对应关系如下:
虚拟组件 | JUC中的组件 |
---|---|
Action |
Callable |
ActionFuture |
RunnableFuture |
ActionAdapter |
FutureTask |
ActionPool |
ExecutorService(ThreadPoolExecutor) |
其中大部分实现逻辑都由 FutureTask
和 ThreadPoolExecutor
的抽象父类 AbstractExecutorService
承担,下面会重点分析这两个类核心功能的源码实现。
Tip 实际上,Future的实现使用的是Promise模式,具体可以查阅相关的资料。 |
FutureTask源码实现
提供回调的 Runnable
类型任务实际最终都会包装为 FutureTask
再提交到线程池中执行,而 FutureTask
是 Runnable
、 Future
和 Callable
三者的桥梁。先看 FutureTask
的类继承关系:
利用接口可以多继承的特性, RunnableFuture
接口继承自 Runnable
和 Future
接口:
public interface RunnableFuture extends Runnable, Future { void run(); } @FunctionalInterface public interface Runnable { public abstract void run(); } public interface Future { // 取消,mayInterruptIfRunning用于控制是否中断,实际上这个方法并不能终止已经提交的任务,后面会详细说明 boolean cancel(boolean mayInterruptIfRunning); // 是否取消 boolean isCancelled(); // 是否完成,包括正常和异常的情况 boolean isDone(); // 永久阻塞获取结果,响应中断 V get() throws InterruptedException, ExecutionException; // 带超时的阻塞获取结果,响应中断 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
而 FutureTask
实现了 RunnableFuture
接口,本质就是实现 Runnable
和 Future
接口的方法。先看 FutureTask
的重要属性:
// 状态 private volatile int state; // 初始化状态 private static final int NEW = 0; // 完成中状态 private static final int COMPLETING = 1; // 正常情况下的完成状态 private static final int NORMAL = 2; // 异常情况下的完成状态 private static final int EXCEPTIONAL = 3; // 取消状态 private static final int CANCELLED = 4; // 中断中状态 private static final int INTERRUPTING = 5; // 已中断状态 private static final int INTERRUPTED = 6; // 底层的Callable实现,执行完毕后需要置为null private Callable callable; // 输出结果,如果是正常执行完成,get()方法会返回此结果,如果是异常执行完成,get()方法会抛出outcome包装为ExecutionException的异常 private Object outcome; // 真正的执行Callable对象的线程实例,运行期间通过CAS操作此线程实例 private volatile Thread runner; // 等待线程集合,Treiber Stack实现 private volatile WaitNode waiters; // 下面是变量句柄,底层是基于Unsafe实现,通过相对顶层的操作原语,如CAS等 private static final VarHandle STATE; private static final VarHandle RUNNER; private static final VarHandle WAITERS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(FutureTask.class, "state", int.class); RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class); WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class ensureLoaded = LockSupport.class; } // ... 省略其他代码
上面的主要属性中,有两点比较复杂,但却是最重要的:
-
FutureTask
生命周期的状态管理或者跃迁。 - 等待(获取结果)线程集合
WaitNode
基于Treiber Stack
实现,需要彻底弄清楚Treiber Stack
的工作原理。
FutureTask的状态管理
FutureTask
的内建状态包括了七种,也就是属性 state
有七种可选状态值,总结成表格如下:
状态 | 状态值 | 描述 |
---|---|---|
NEW |
0 | 初始化状态, FutureTask 实例创建时候在构造函数中标记为此状态 |
COMPLETING |
1 | 完成中状态,这个是中间状态,执行完成后设置 outcome 之前标记为此状态 |
NORMAL |
2 | 正常执行完成,通过调用 get() 方法能够获取正确的计算结果 |
EXCEPTIONAL |
3 | 异常执行完成,通过调用 get() 方法会抛出包装后的 ExecutionException 异常 |
CANCELLED |
4 | 取消状态 |
INTERRUPTING |
5 | 中断中状态,执行线程实例 Thread#interrupt() 之前会标记为此状态 |
INTERRUPTED |
6 | 中断完成状态 |
这些状态之间的跃迁流程图如下:
每一种状态跃迁都是由于调用或者触发了某个方法,下文的一个小节会分析这些方法的实现。
等待线程集合数据结构Treiber Stack的原理
Treiber Stack
,中文翻译是 驱动栈 ,听起来比较怪。实际上, Treiber Stack
算法是 R. Kent Treiber
在其1986年的论文 Systems Programming: Coping with Parallelism
中首次提出,这种算法提供了一种 可扩展的无锁栈 ,基于细粒度的并发原语 CAS(Compare And Swap)
实现。笔者并没有花时间去研读 Treiber
的论文,因为在 Doug Lea
大神参与编写的《Java Concurrency in Practice(Java并发编程实战)》中的第 15.4.1
小节中有简单分析非阻塞算法中的非阻塞栈。
在实现相同功能的前提下,非阻塞算法通常比基于锁的算法更加复杂。创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性。下面的 ConcurrentStack
是基于Java语言实现的 Treiber
算法:
public class ConcurrentStack { private AtomicReference<Node> top = new AtomicReference(); public void push(E item) { Node newHead = new Node(item); Node oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while (!top.compareAndSet(oldHead, newHead)); } public E pop() { Node oldHead; Node newHead; do { oldHead = top.get(); if (null == oldHead) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; } private static class Node { final E item; Node next; Node(E item) { this.item = item; } } }
ConcurrentStack
是一个栈,它是由 Node
元素构成的一个链表,其中栈顶作为根节点,并且每个元素都包含了一个值以及指向下一个元素的链接。 push()
方法创建一个新的节点,该节点的 next
域指向了当前的栈顶,然后通过 CAS
把这个新节点放入栈顶。如果在开始插入节点时,位于栈顶的节点没有发生变化,那么 CAS
就会成功,如果栈顶节点发生变化(例如由于其他线程在当前线程开始之前插入或者移除了元素),那么 CAS
就会失败,而 push()
方法会根据栈的当前状态来更新节点(其实就是 while
循环会进入下一轮),并且再次尝试。无论哪种情况,在 CAS
执行完成之后,栈仍然回处于一致的状态。这里通过一个图来模拟一下 push()
方法的流程:
而 pop()
方法可以简单理解为 push()
方法的逆向操作,具体流程是:
- 创建一个引用
newHead
指向当前top
的下一个节点,也就是top.next
,top
所在引用称为oldHead
。 - 通过
CAS
更新top
的值,伪代码是CAS(expect=oldHead,update=newHead)
,如果更新成功,那么top
就指向top.next
,也就是newHead
。
Warning 这里可以看出Treiber Stack算法有个比较大的问题是有可能产生无效的节点,所以FutureTask也存在可能产生无效的等待节点的问题。 |
FutureTask方法源码分析
先看 FutureTask
提供的非阻塞栈节点的实现:
// 等待获取结果的线程节点(集合),实际上是一个单链表,实现了一个非阻塞栈 static final class WaitNode { // 记录等待线程实例 volatile Thread thread; // 指向下一个节点的引用 volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
和我们上面分析 Treiber Stack
时候使用的单链表如出一辙。接着看 FutureTask
的构造函数:
// 适配使用Callable类型任务的场景 public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } // 适配使用Runnable类型任务和已经提供了最终计算结果的场景 public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; } // Executors中 public static Callable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter(task, result); } // Runnable和Callable的适配器,设计十分巧妙,实际上run()方法委托给传入的Runnable实例执行,实现了Callable的call()方法,使用的是外部传入的值作为返回结果 private static final class RunnableAdapter implements Callable { private final Runnable task; private final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } public String toString() { return super.toString() + "[Wrapped task = " + task + "]"; } }
主要是针对两种不同场景的任务类型进行适配,构造函数中直接设置状态 state = NEW(0)
。因为 FutureTask
是最终的任务包装类,它的核心功能都在其实现的 Runnable#run()
方法中,这里重点分析一下 run()
方法:
// FutureTask实现的Runnable#run()方法 public void run() { // 如果状态不为NEW(0)或者CAS(null,当前线程实例)更新runner-真正的执行Callable对象的线程实例失败,那么直接返回,不执行任务 if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { // 获取Callable任务实例赋值到临时变量c Callable c = callable; // 判断任务不能为空,二次校验状态必须为NEW(0) if (c != null && state == NEW) { V result; boolean ran; try { // 调用任务实例Callable#call()方法,正常情况下的执行完毕,没有抛出异常,则记录执行结果 result = c.call(); // 记录正常执行完毕 ran = true; } catch (Throwable ex) { // 异常情况下的执行完毕,执行结果记录为null result = null; // 记录异常执行完毕 ran = false; // 设置异常实例 setException(ex); } // 正常执行完毕设置结果 if (ran) set(result); } } finally { // runner更新为null,防止并发执行run()方法 runner = null; // 记录新的状态值,因为run()方法执行的时候,状态值有可能被其他方法更新了 int s = state; if (s >= INTERRUPTING) // 处理run()方法执行期间调用了cancel(true)方法的情况 handlePossibleCancellationInterrupt(s); } } // 异常执行挖鼻的情况下,设置异常实例 protected void setException(Throwable t) { // CAS更新状态state,由NEW(0)更新为COMPLETING(1) if (STATE.compareAndSet(this, NEW, COMPLETING)) { // 设置异常实例到outcome属性中 outcome = t; // 设置最终状态state = EXCEPTIONAL(3),意味着任务最终异常执行完毕 STATE.setRelease(this, EXCEPTIONAL); // final state // 完成后的通知方法 finishCompletion(); } } // 完成任务后的通知方法,最要作用是移除和唤醒所有的等待结果线程,调用钩子方法done()和设置任务实例callable为null private void finishCompletion() { // 遍历栈,终止条件是下一个元素为null for (WaitNode q; (q = waiters) != null;) { // CAS设置栈顶为null if (WAITERS.weakCompareAndSet(this, q, null)) { // 遍历栈中的所有节点,唤醒节点中的线程,这是一个十分常规的遍历单链表的方法,注意几点: // 1. 使用LockSupport.unpark()唤醒线程,因为后面会分析,线程阻塞等待的时候使用的是LockSupport.park()方法 // 2. 断开链表节点的时候后继节点需要置为null,这样游离节点才能更容易被JVM回收 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } // 回调钩子方法done(),这个可以通过子类进行扩展 done(); // 置任务实例callable为null,从而减少JVM memory footprint(这个东西有兴趣可以自行扩展阅读) callable = null; // to reduce footprint } // 正常执行完毕的情况下设置执行结果 protected void set(V v) { // CAS更新状态state,由NEW(0)更新为COMPLETING(1) if (STATE.compareAndSet(this, NEW, COMPLETING)) { // 最终执行结果值更新到outcome中 outcome = v; // 设置最终状态state = NORMAL(2),意味着任务最终正常执行完毕 STATE.setRelease(this, NORMAL); // 完成后的通知方法 finishCompletion(); } } // 处理run()方法执行期间调用了cancel(true)方法的情况 // 这里还没分析cancel()方法,但是可以提前告知:它会先把状态更新为INTERRUPTING,再进行线程中断,最后更新状态为INTERRUPTED // 所以如果发现当前状态为INTERRUPTING,当前线程需要让出CPU控制权等待到状态更变为INTERRUPTED即可,这个时间应该十分短暂 private void handlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); } // 钩子方法,可以通过子类扩展此方法,方法回调的时机是任务已经执行完毕,阻塞获取结果的线程被唤醒之后 protected void done() { }
run()
方法的执行流程比较直观,这里提供一个简单的流程图:
FutureTask
还提供了一个能够重置状态(准确来说是保持状态)的 runAndReset()
方法,这个方法专门提供给 ScheduledThreadPoolExecutor
使用:
// 执行任务并且重置状态 // 由于没有执行set()方法设置执行结果,这个方法除了执行过程中抛出异常或者主动取消会到导致state由NEW更变为其他值,正常执行完毕一个任务之后,state是保持为NEW不变 protected boolean runAndReset() { // 如果状态不为NEW(0)或者CAS(null,当前线程实例)更新runner-真正的执行Callable对象的线程实例失败,那么直接返回false,不执行任务 if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable c = callable; if (c != null && s == NEW) { try { // 这里会忽略执行结果,只记录是否正常执行 c.call(); ran = true; } catch (Throwable ex) { // 记录执行异常结果 setException(ex); } } } finally { runner = null; s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 正常情况下的执行完毕,ran会更新为true,state此时也保持为NEW,这个时候方法返回true return ran && s == NEW; }
runAndReset()
方法保证了在任务正常执行完成之后返回 true
,此时 FutureTask
的状态 state
保持为 NEW
,由于没有调用 set()
方法,也就是没有调用 finishCompletion()
方法,它内部持有的 Callable
任务引用不会置为 null
,等待获取结果的线程集合也不会解除阻塞。这种设计方案专门针对可以周期性重复执行的任务。异常执行情况和取消的情况导致的最终结果和 run()
方法是一致的。接下来分析一下获取执行结果的 get()
方法:
// 获取执行结果 - 永久阻塞 public V get() throws InterruptedException, ExecutionException { int s = state; // 如果状态小于等于COMPLETING(1),也就是COMPLETING(1)和NEW(0),那么就需要等待任务完成 if (s <= COMPLETING) // 注意这里调用awaitDone方法的参数为永久阻塞参数,也就是没有超时期限,返回最新的状态值 s = awaitDone(false, 0L); // 根据状态值报告结果 return report(s); } // 获取执行结果 - 带超时的阻塞 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; // 如果状态小于等于COMPLETING(1),也就是COMPLETING(1)和NEW(0),那么就需要等待任务完成 // 注意这里调用awaitDone方法的参数为带超时上限的阻塞参数 // 如果超过了指定的等待期限(注意会把时间转化为纳秒),返回的最新状态依然为COMPLETING(1)或者NEW(0),那么抛出TimeoutException异常 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 状态值等于COMPLETING(1),说明任务执行到达尾声,在执行set()或者setException(),只需让出CPU控制权等待完成即可等待下一轮循环重试即可 Thread.yield(); else if (Thread.interrupted()) { // 如果线程被中断,则清除其中断状态,并且断开超时或中断的等待节点的链接 removeWaiter(q); // 抛出InterruptedException异常 throw new InterruptedException(); } else if (q == null) { // 等待节点尚未初始化,如果设置了超时期限并且超时时间小于等于0,则直接返回状态并且终止等待,说明已经超时了 // 这里的逻辑属于先行校验,如果命中了就不用进行超时阻塞 if (timed && nanos = nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } // 如果状态为NEW(0),则进行超时阻塞,阻塞的是当前的线程 if (state = CANCELLED) throw new CancellationException(); // 其他情况,实际上只剩下状态值为EXCEPTIONAL(3),则基于outcome强转为Throwable类型,则包装成ExecutionException抛出 throw new ExecutionException((Throwable)x); }
上面的方法中, removeWaiter()
方法相对复杂,它涉及到单链表移除中间节点、考虑多种竞态情况进行重试等设计,需要花大量心思去理解。接着看 cancel()
方法:
public boolean cancel(boolean mayInterruptIfRunning) { // 状态必须为NEW(0) // 如果mayInterruptIfRunning为true,则把状态通过CAS更新为INTERRUPTING(5) // 如果mayInterruptIfRunning为false,则把状态通过CAS更新为CANCELLED(4) // 如果状态不为NEW(0)或者CAS更新失败,直接返回false,说明任务已经执行到set()或setException(),无法取消 if (!(state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // mayInterruptIfRunning为true,调用执行任务的线程实例的Thread#interrupt()进行中断,更新最终状态为INTERRUPTED(6) if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state STATE.setRelease(this, INTERRUPTED); } } } finally { // 完成后的通知方法 finishCompletion(); } return true; }
cancel()
方法只能够中断状态为 NEW(0)
的线程,并且由于线程只在某些特殊情况下(例如阻塞在同步代码块或者同步方法中阻塞在 Object#wait()
方法、主动判断线程的中断状态等等)才能响应中断,所以需要思考这个方法是否可以达到预想的目的。最后看剩下的状态判断方法:
// 判断是否取消状态,包括CANCELLED(4)、INTERRUPTING(5)、INTERRUPTED(6)三种状态 public boolean isCancelled() { return state >= CANCELLED; } // 判断是否已经完成,这里只是简单判断状态值不为NEW(0),原因是所有的中间状态都是十分短暂的 public boolean isDone() { return state != NEW; }
AbstractExecutorService源码实现
AbstractExecutorService
虽然只是 ThreadPoolExecutor
的抽象父类,但是它已经实现了 ExecutorService
接口中除了 shutdown()
、 shutdownNow()
、 isShutdown()
、 isTerminated()
和 awaitTermination()
五个方法之外的其他所有方法(这五个方法在 ThreadPoolExecutor
实现,因为它们是和线程池的状态相关的)。它的源码体积比较小,下面全量贴出分析:
public abstract class AbstractExecutorService implements ExecutorService { // 静态工厂方法,通过Runnable和具体的返回结果创建FutureTask实例 protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask(runnable, value); } // 静态工厂方法,通过Callable实例创建FutureTask实例 protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask(callable); } // 提交Runnable类型任务 public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); // 适配任务为FutureTask实例,注意最终计算结果已经提前设置为null RunnableFuture ftask = newTaskFor(task, null); // 提交到线程池 execute(ftask); return ftask; } // 提交Runnable类型任务,同时传入最终计算结果 public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 适配任务为FutureTask实例 RunnableFuture ftask = newTaskFor(task, result); // 提交到线程池 execute(ftask); return ftask; } // 提交Callable类型任务 public Future submit(Callable task) { if (task == null) throw new NullPointerException(); // 适配任务为FutureTask实例 RunnableFuture ftask = newTaskFor(task); // 提交到线程池 execute(ftask); return ftask; } // 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回) private T doInvokeAny(Collection<? extends Callable> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future> futures = new ArrayList(ntasks); // 通过当前的线程池实例构建ExecutorCompletionService实例 ExecutorCompletionService ecs = new ExecutorCompletionService(this); try { ExecutionException ee = null; // 计算deadline final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable> it = tasks.iterator(); // 提交任务列表的第一个任务实例 futures.add(ecs.submit(it.next())); --ntasks; int active = 1; for (;;) { // 这里获取上一轮(或者第一个任务)任务执行的Future实例 Future f = ecs.poll(); // 如果拿到Future实例为null说明上一轮的任务尚未执行完毕 if (f == null) { // 如果任务队列中还有任务任务,则添加到线程池中执行 if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } // 活跃计算任务为0,说明至少有一个任务成功返回了Future实例 else if (active == 0) break; else if (timed) { // 允许超时的模式下用超时阻塞获取Future实例 f = ecs.poll(nanos, NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } // 非超时的模式下永久阻塞获取Future实例 else f = ecs.take(); } // 获取到的Future实例不为null,说明已经有至少一个任务执行完毕 if (f != null) { --active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 取消所有任务,确保至少有一个任务完成,即使取消所有任务,由于状态管理,成功的任务不受干扰 cancelAll(futures); } } // 永久阻塞 - 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回) public T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } // 带超时阻塞 - 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回) public T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } // 永久阻塞 - 执行任务列表中的所有任务 public List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future> futures = new ArrayList(tasks.size()); try { // 遍历任务列表进行FutureTask并且提交到线程池,FutureTask实例添加到futures列表中 for (Callable t : tasks) { RunnableFuture f = newTaskFor(t); futures.add(f); execute(f); } // 遍历futures列表调用get()方法获取结果,注意会忽略所有的CancellationException、ExecutionException for (int i = 0, size = futures.size(); i < size; i++) { Future f = futures.get(i); if (!f.isDone()) { try { f.get(); } catch (CancellationException | ExecutionException ignore) {} } } return futures; } catch (Throwable t) { // 只要出现非CancellationException或者ExecutionException异常,则取消所有任务,尚未执行或者尚未执行完毕的任务有可能受到影响 cancelAll(futures); throw t; } } // 带超时阻塞 - 执行任务列表中的所有任务 public List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); // 转换超时时间单位为纳秒 final long nanos = unit.toNanos(timeout); // 计算deadline final long deadline = System.nanoTime() + nanos; ArrayList<Future> futures = new ArrayList(tasks.size()); int j = 0; timedOut: try { // 遍历任务列表进行FutureTask,FutureTask实例添加到futures列表中 for (Callable t : tasks) futures.add(newTaskFor(t)); final int size = futures.size(); // 遍历futures列表,进行一次超时先验,如果已经超时,则直接跳出,无须执行任务 for (int i = 0; i < size; i++) { // 这里有个特殊处理,第一个任务只要timeout不为0,必定会进行提交第二个任务起才判断deadline if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L) break timedOut; // 提交FutureTask到线程池 execute((Runnable)futures.get(i)); } // j记录了超时的那个任务的Future的索引值,遍历futures列表进行超时阻塞的get()方法调用 for (; j < size; j++) { Future f = futures.get(j); if (!f.isDone()) { try { f.get(deadline - System.nanoTime(), NANOSECONDS); } catch (CancellationException | ExecutionException ignore) {} catch (TimeoutException timedOut) { break timedOut; } } } return futures; } catch (Throwable t) { cancelAll(futures); throw t; } // 所有任务完成之前发现已经超时,则取消超时任务索引之后的所有任务,已经完成的不受影响 cancelAll(futures, j); return futures; } // 取消所有的Future实例 private static void cancelAll(ArrayList<Future> futures) { cancelAll(futures, 0); } // 遍历所有的Future实例调用其cancel方法,因为参数为true,所以会响应中断 // j参数是决定遍历的起点,0表示整个列表遍历 private static void cancelAll(ArrayList<Future> futures, int j) { for (int size = futures.size(); j < size; j++) futures.get(j).cancel(true); } }
整个类的源码并不复杂,注意到 Callable
和 Runnable
的任务最重都会包装为适配器 FutureTask
的实例,然后通过 execute()
方法提交包装好的 FutureTask
任务实例,返回值是 Future
或者 Future
的集合时候,实际上是 RunnableFuture
或者 RunnableFuture
的集合,只因为 RunnableFuture
是 Future
的子接口,这种设计遵循了设计模式原则里面的 依赖倒置原则 。这里小结一下分析过的几个方法的特征:
方法 | 特征 |
---|---|
submit(Runnable task) |
异步执行,执行结果无感知,通过 get() 方法虽然返回 null 但是可以确定执行完毕的时刻 |
submit(Runnable task, T result) |
异步执行,预先传入执行结果,最终通过 get() 方法返回的就是初始传入的结果 |
submit(Callable task) |
异步执行,最终通过 get() 方法返回的是 Callable#call() 的结果 |
invokeAny(Collection<? extends Callable> tasks) |
异步执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回),永久阻塞同步返回结果 |
invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) |
功能同上,获取结果的时候是超时阻塞获取 |
invokeAll(Collection<? extends Callable> tasks) |
异步执行任务列表中的所有任务,必须等待所有 Future#get() 永久阻塞方法都返回了结果才返回 Future 列表 |
invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) |
异步执行任务列表中的所有任务,只要其中一个任务 Future#get() 超时阻塞方法超时就会取消该任务索引之后的所有任务并且返回 Future 列表 |
小结
ExecutorService
提供了一系列便捷的异步任务提交方法,它使用到多种技术:
- 相对底层的
CAS
原语。 - 基于
CAS
实现的无锁并发栈。 - 依赖于线程池实现的
execute()
方法进行异步任务提交。 - 使用适配器模式设计
FutureTask
适配Futrue
、Runnable
和Callable
,提供了状态的生命周期管理。
下一篇文章将会分析一下调度线程池 ScheduledThreadPoolExecutor
的底层实现和源码。
(本文完 c-7-d e-a-20190727)