JUC线程池服务ExecutorService接口实现源码分析

之前的一篇文章 JUC线程池ThreadPoolExecutor源码分析 深入分析了JUC线程池的源码实现,特别对 Executor#execute() 接口的实现做了行级别的源码分析。这篇文章主要分析一下线程池扩展服务 ExecutorService 接口的实现源码,同时会重点分析 Future 的底层实现。 ThreadPoolExecutor 和其抽象父类 AbstractExecutorService 的源码从JDK8到JDK11基本没有变化,本文编写的时候使用的是JDK11,由于 ExecutorService 接口的定义在JDK[8,11]都没有变化,本文的分析适用于这个JDK版本范围的任意版本。最近尝试找 Hexo 可以渲染 Asciidoc 的插件,但是没有找到,于是就先移植了 Asciidoc 中的五种 Tip

ExecutorService接口简介

ExecutorService 接口是线程池扩展功能服务接口,它的定义如下:

public interface ExecutorService extends Executor {
    
    // 停止线程池
    void shutdown();
    
    // 立即停止线程池,返回尚未执行的任务列表
    List shutdownNow();
   
    // 线程池是否停止
    boolean isShutdown();
    
    // 线程池是否终结
    boolean isTerminated();
    
    // 等待线程池终结
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
     
    // 提交Callable类型任务
     Future submit(Callable task);
    
    // 提交Runnable类型任务,预先知道返回值
     Future submit(Runnable task, T result);
    
    // 提交Runnable类型任务,对返回值无感知
    Future submit(Runnable task);
    
    // 永久阻塞 - 提交和执行一个任务列表的所有任务
     List<Future> invokeAll(Collection<? extends Callable> tasks)
        throws InterruptedException;
    
    // 带超时阻塞 - 提交和执行一个任务列表的所有任务
     List<Future> invokeAll(Collection<? extends Callable> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    
    // 永久阻塞 - 提交和执行一个任务列表的某一个任务
     T invokeAny(Collection<? extends Callable> tasks)
        throws InterruptedException, ExecutionException;
     
    // 带超时阻塞 - 提交和执行一个任务列表的某一个任务
      T invokeAny(Collection<? extends Callable> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService 继承自 Executor ,主要提供了线程池的关闭、状态查询查询、可获取返回值的任务提交、整个任务列表或者执行任务列表中任意一个任务(返回执行最快的任务的结果)等功能。

Future实现的通俗原理

ExecutorService 接口的扩展方法都是返回 Future 相关的实例。 java.util.concurrent.Future (中文翻译就是未来,还是挺有意思的), 代表着一次异步计算的结果 ,它提供了检查计算是否已经完成、等待计算完成、获取计算结果等一系列方法。笔者之前强调过:线程池 ThreadPoolExecutor 的顶级接口 Executor 只提供了一个无状态的返回值类型为 voidexecute(Runnable command) 方法,无法感知异步任务执行的完成时间和获取任务计算结果。如果我们需要感知异步任务执行的返回值或者计算结果,就必须提供带返回值的接口方法去承载计算结果的操作。这些方法上一节已经介绍过,而 Future 就是一个担任了承载计算结果(包括结果值、状态、阻塞等待获取结果操作等)的工具。这里举一个模拟 Future 实现过程的例子,例子是伪代码和真实代码的混合实现,不需要太较真。

首先,假设我们定义了一个动作函数式接口 Action

// 带泛型的动作接口,可以返回一个泛型结果
public interface Action{

    V doAction();
}

我们可以尝试实现一下 Action 接口:

// 假设1个动作做的是一个十分复杂的运算,返回一个BigDecimal类型的结果
Action action1 = () -> {
       // 模拟随机耗时
       sleep(x秒);
       return BigDecimal.valueOf(result);
};

// 假设1个动作做的是制作一个面包的过程,返回一个Bread面包实例
Action action2 = () -> {
       // 模拟随机耗时
       sleep(x秒);
       return new Bread();
};

由于 Action 没有实现 Runnable 接口,上面的两个动作无法通过 Executor#execute() 方法提交异步任务,所以我们需要添加一个适配器 ActionAdapter

public class ActionAdapter implements Runnable {

    private Action action;

    private ActionAdapter(Action action) {
        this.action = action;
    }

    public static  ActionAdapter newActionAdapter(Action action) {
        return new ActionAdapter(action);
    }

    @Override
    public void run() {
        action.doAction();
    }
}

这里只做了简单粗暴的适配,虽然可以提交到线程池中执行,但是功能太过简陋。很多时候,我们还需要添加任务执行状态判断和获取结果的功能,于是新增一个接口 ActionFuture

public interface ActionFuture extends Runnable{
    
    V get() throws Exception;
    
    boolean isDone();
}

然后 ActionAdapter 实现 ActionFuture 接口,内部添加简单的状态控制:

public class ActionAdapter implements Runnable, ActionFuture {

    private static final int NEW = 0;
    private static final int DONE = 1;
    private int state;
    private final Action action;
    private Object result;

    private ActionAdapter(Action action) {
        this.action = action;
        this.state = NEW;
    }

    public static  ActionAdapter newActionAdapter(Action action) {
        return new ActionAdapter(action);
    }

    @Override
    public void run() {
        try {
            result = action.doAction();
        } catch (Throwable e) {
            result = e;
        } finally {
            state = DONE;
        }
    }

    @Override
    public V get() throws Exception{
        while (state < DONE){
            // 这个等待方法没有实现,只是表明逻辑
            currentThreadWaitForResult();
        }
        if (result instanceof Throwable){
            throw new ExecutionException((Throwable) result);
        }else {
            return (V) result;
        }
    }

    @Override
    public boolean isDone() {
        return state == DONE;
    }
}

这里有个技巧是用 Object 类型的对象存放 Action 执行的结果或者抛出的异常实例,这样可以在 ActionFuture#get() 方法中进行判断和处理。最后一步,依赖 Executor#execute() 新增一个提交异步任务的方法:

public class ActionPool {

    private final Executor executor;

    public ActionPool(Executor executor) {
        this.executor = executor;
    }

    public  ActionFuture submit(Action action) {
        ActionFuture actionFuture = ActionAdapter.newActionAdapter(action);
        executor.execute(actionFuture);
        return actionFuture;
    }
    
    public static void main(String[] args) throws Exception{
        ActionPool pool = new ActionPool(Executors.newSingleThreadExecutor());
        Action action1 = () -> {
            // 模拟随机耗时
            sleep(x秒);
            return BigDecimal.valueOf(result);
        };
        pool.submit(action1);
        Action action2 = () -> {
            // 模拟随机耗时
            sleep(x秒);
            return new Bread();
        };
        pool.submit(action2);
    }
}

上面例子提到的虚拟核心组件,在 JUC 包中有对应的实现(当时,JUC包对逻辑和状态控制会比虚拟例子更加严谨),对应关系如下:

虚拟组件 JUC中的组件
Action Callable
ActionFuture RunnableFuture
ActionAdapter FutureTask
ActionPool ExecutorService(ThreadPoolExecutor)

其中大部分实现逻辑都由 FutureTaskThreadPoolExecutor 的抽象父类 AbstractExecutorService 承担,下面会重点分析这两个类核心功能的源码实现。

Tip

实际上,Future的实现使用的是Promise模式,具体可以查阅相关的资料。

FutureTask源码实现

提供回调的 Runnable 类型任务实际最终都会包装为 FutureTask 再提交到线程池中执行,而 FutureTaskRunnableFutureCallable 三者的桥梁。先看 FutureTask 的类继承关系:

利用接口可以多继承的特性, RunnableFuture 接口继承自 RunnableFuture 接口:

public interface RunnableFuture extends Runnable, Future {
   
    void run();
}

@FunctionalInterface
public interface Runnable {
    
    public abstract void run();
}    

public interface Future {
    
    // 取消,mayInterruptIfRunning用于控制是否中断,实际上这个方法并不能终止已经提交的任务,后面会详细说明
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 是否取消
    boolean isCancelled();
    
    // 是否完成,包括正常和异常的情况
    boolean isDone();
    
    // 永久阻塞获取结果,响应中断
    V get() throws InterruptedException, ExecutionException;
    
    // 带超时的阻塞获取结果,响应中断
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask 实现了 RunnableFuture 接口,本质就是实现 RunnableFuture 接口的方法。先看 FutureTask 的重要属性:

// 状态
private volatile int state;
// 初始化状态
private static final int NEW          = 0;
// 完成中状态
private static final int COMPLETING   = 1;
// 正常情况下的完成状态
private static final int NORMAL       = 2;
// 异常情况下的完成状态
private static final int EXCEPTIONAL  = 3;
// 取消状态
private static final int CANCELLED    = 4;
// 中断中状态
private static final int INTERRUPTING = 5;
// 已中断状态
private static final int INTERRUPTED  = 6;

// 底层的Callable实现,执行完毕后需要置为null
private Callable callable;

// 输出结果,如果是正常执行完成,get()方法会返回此结果,如果是异常执行完成,get()方法会抛出outcome包装为ExecutionException的异常
private Object outcome; 

// 真正的执行Callable对象的线程实例,运行期间通过CAS操作此线程实例
private volatile Thread runner;

// 等待线程集合,Treiber Stack实现
private volatile WaitNode waiters;

// 下面是变量句柄,底层是基于Unsafe实现,通过相对顶层的操作原语,如CAS等
private static final VarHandle STATE;
private static final VarHandle RUNNER;
private static final VarHandle WAITERS;
static {
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();
        STATE = l.findVarHandle(FutureTask.class, "state", int.class);
        RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
        WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class ensureLoaded = LockSupport.class;
}
// ... 省略其他代码

上面的主要属性中,有两点比较复杂,但却是最重要的:

  1. FutureTask 生命周期的状态管理或者跃迁。
  2. 等待(获取结果)线程集合 WaitNode 基于 Treiber Stack 实现,需要彻底弄清楚 Treiber Stack 的工作原理。

FutureTask的状态管理

FutureTask 的内建状态包括了七种,也就是属性 state 有七种可选状态值,总结成表格如下:

状态 状态值 描述
NEW 0 初始化状态, FutureTask 实例创建时候在构造函数中标记为此状态
COMPLETING 1 完成中状态,这个是中间状态,执行完成后设置 outcome 之前标记为此状态
NORMAL 2 正常执行完成,通过调用 get() 方法能够获取正确的计算结果
EXCEPTIONAL 3 异常执行完成,通过调用 get() 方法会抛出包装后的 ExecutionException 异常
CANCELLED 4 取消状态
INTERRUPTING 5 中断中状态,执行线程实例 Thread#interrupt() 之前会标记为此状态
INTERRUPTED 6 中断完成状态

这些状态之间的跃迁流程图如下:

每一种状态跃迁都是由于调用或者触发了某个方法,下文的一个小节会分析这些方法的实现。

等待线程集合数据结构Treiber Stack的原理

Treiber Stack ,中文翻译是 驱动栈 ,听起来比较怪。实际上, Treiber Stack 算法是 R. Kent Treiber 在其1986年的论文 Systems Programming: Coping with Parallelism 中首次提出,这种算法提供了一种 可扩展的无锁栈 ,基于细粒度的并发原语 CAS(Compare And Swap) 实现。笔者并没有花时间去研读 Treiber 的论文,因为在 Doug Lea 大神参与编写的《Java Concurrency in Practice(Java并发编程实战)》中的第 15.4.1 小节中有简单分析非阻塞算法中的非阻塞栈。

在实现相同功能的前提下,非阻塞算法通常比基于锁的算法更加复杂。创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性。下面的 ConcurrentStack 是基于Java语言实现的 Treiber 算法:

public class ConcurrentStack {

    private AtomicReference<Node> top = new AtomicReference();

    public void push(E item) {
        Node newHead = new Node(item);
        Node oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node oldHead;
        Node newHead;
        do {
            oldHead = top.get();
            if (null == oldHead) {
                return null;
            }
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    private static class Node {

        final E item;
        Node next;

        Node(E item) {
            this.item = item;
        }
    }
}

ConcurrentStack 是一个栈,它是由 Node 元素构成的一个链表,其中栈顶作为根节点,并且每个元素都包含了一个值以及指向下一个元素的链接。 push() 方法创建一个新的节点,该节点的 next 域指向了当前的栈顶,然后通过 CAS 把这个新节点放入栈顶。如果在开始插入节点时,位于栈顶的节点没有发生变化,那么 CAS 就会成功,如果栈顶节点发生变化(例如由于其他线程在当前线程开始之前插入或者移除了元素),那么 CAS 就会失败,而 push() 方法会根据栈的当前状态来更新节点(其实就是 while 循环会进入下一轮),并且再次尝试。无论哪种情况,在 CAS 执行完成之后,栈仍然回处于一致的状态。这里通过一个图来模拟一下 push() 方法的流程:

pop() 方法可以简单理解为 push() 方法的逆向操作,具体流程是:

  1. 创建一个引用 newHead 指向当前 top 的下一个节点,也就是 top.nexttop 所在引用称为 oldHead
  2. 通过 CAS 更新 top 的值,伪代码是 CAS(expect=oldHead,update=newHead) ,如果更新成功,那么 top 就指向 top.next ,也就是 newHead

Warning

这里可以看出Treiber Stack算法有个比较大的问题是有可能产生无效的节点,所以FutureTask也存在可能产生无效的等待节点的问题。

FutureTask方法源码分析

先看 FutureTask 提供的非阻塞栈节点的实现:

// 等待获取结果的线程节点(集合),实际上是一个单链表,实现了一个非阻塞栈
static final class WaitNode {
    // 记录等待线程实例
    volatile Thread thread;
    // 指向下一个节点的引用
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

和我们上面分析 Treiber Stack 时候使用的单链表如出一辙。接着看 FutureTask 的构造函数:

// 适配使用Callable类型任务的场景
public FutureTask(Callable callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       
}

// 适配使用Runnable类型任务和已经提供了最终计算结果的场景
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;      
}

// Executors中
public static  Callable callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter(task, result);
}

// Runnable和Callable的适配器,设计十分巧妙,实际上run()方法委托给传入的Runnable实例执行,实现了Callable的call()方法,使用的是外部传入的值作为返回结果
private static final class RunnableAdapter implements Callable {
    private final Runnable task;
    private final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
    public String toString() {
        return super.toString() + "[Wrapped task = " + task + "]";
    }
}

主要是针对两种不同场景的任务类型进行适配,构造函数中直接设置状态 state = NEW(0) 。因为 FutureTask 是最终的任务包装类,它的核心功能都在其实现的 Runnable#run() 方法中,这里重点分析一下 run() 方法:

// FutureTask实现的Runnable#run()方法
public void run() {
    // 如果状态不为NEW(0)或者CAS(null,当前线程实例)更新runner-真正的执行Callable对象的线程实例失败,那么直接返回,不执行任务
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    try {
        // 获取Callable任务实例赋值到临时变量c
        Callable c = callable;
        // 判断任务不能为空,二次校验状态必须为NEW(0)
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 调用任务实例Callable#call()方法,正常情况下的执行完毕,没有抛出异常,则记录执行结果
                result = c.call();
                // 记录正常执行完毕
                ran = true;
            } catch (Throwable ex) {
                // 异常情况下的执行完毕,执行结果记录为null
                result = null;
                // 记录异常执行完毕
                ran = false;
                // 设置异常实例
                setException(ex);
            }
            // 正常执行完毕设置结果
            if (ran)
                set(result);
        }
    } finally {
        // runner更新为null,防止并发执行run()方法
        runner = null;
        // 记录新的状态值,因为run()方法执行的时候,状态值有可能被其他方法更新了
        int s = state;
        if (s >= INTERRUPTING)
            // 处理run()方法执行期间调用了cancel(true)方法的情况
            handlePossibleCancellationInterrupt(s);
    }
}

// 异常执行挖鼻的情况下,设置异常实例
protected void setException(Throwable t) {
    // CAS更新状态state,由NEW(0)更新为COMPLETING(1)
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        // 设置异常实例到outcome属性中
        outcome = t;
        // 设置最终状态state = EXCEPTIONAL(3),意味着任务最终异常执行完毕
        STATE.setRelease(this, EXCEPTIONAL); // final state
        // 完成后的通知方法
        finishCompletion();
    }
}

// 完成任务后的通知方法,最要作用是移除和唤醒所有的等待结果线程,调用钩子方法done()和设置任务实例callable为null
private void finishCompletion() {
    // 遍历栈,终止条件是下一个元素为null
    for (WaitNode q; (q = waiters) != null;) {
        // CAS设置栈顶为null
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            // 遍历栈中的所有节点,唤醒节点中的线程,这是一个十分常规的遍历单链表的方法,注意几点:
            // 1. 使用LockSupport.unpark()唤醒线程,因为后面会分析,线程阻塞等待的时候使用的是LockSupport.park()方法
            // 2. 断开链表节点的时候后继节点需要置为null,这样游离节点才能更容易被JVM回收
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; 
                q = next;
            }
            break;
        }
    }
    // 回调钩子方法done(),这个可以通过子类进行扩展
    done();
    // 置任务实例callable为null,从而减少JVM memory footprint(这个东西有兴趣可以自行扩展阅读)
    callable = null;        // to reduce footprint
}

// 正常执行完毕的情况下设置执行结果
protected void set(V v) {
    // CAS更新状态state,由NEW(0)更新为COMPLETING(1)
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        // 最终执行结果值更新到outcome中
        outcome = v;
        // 设置最终状态state = NORMAL(2),意味着任务最终正常执行完毕
        STATE.setRelease(this, NORMAL); 
        // 完成后的通知方法
        finishCompletion();
    }
}

// 处理run()方法执行期间调用了cancel(true)方法的情况
// 这里还没分析cancel()方法,但是可以提前告知:它会先把状态更新为INTERRUPTING,再进行线程中断,最后更新状态为INTERRUPTED
// 所以如果发现当前状态为INTERRUPTING,当前线程需要让出CPU控制权等待到状态更变为INTERRUPTED即可,这个时间应该十分短暂
private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); 

}

// 钩子方法,可以通过子类扩展此方法,方法回调的时机是任务已经执行完毕,阻塞获取结果的线程被唤醒之后
protected void done() { 
}

run() 方法的执行流程比较直观,这里提供一个简单的流程图:

FutureTask 还提供了一个能够重置状态(准确来说是保持状态)的 runAndReset() 方法,这个方法专门提供给 ScheduledThreadPoolExecutor 使用:

// 执行任务并且重置状态
// 由于没有执行set()方法设置执行结果,这个方法除了执行过程中抛出异常或者主动取消会到导致state由NEW更变为其他值,正常执行完毕一个任务之后,state是保持为NEW不变
protected boolean runAndReset() {
     // 如果状态不为NEW(0)或者CAS(null,当前线程实例)更新runner-真正的执行Callable对象的线程实例失败,那么直接返回false,不执行任务
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable c = callable;
        if (c != null && s == NEW) {
            try {
                // 这里会忽略执行结果,只记录是否正常执行
                c.call(); 
                ran = true;
            } catch (Throwable ex) {
                // 记录执行异常结果
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 正常情况下的执行完毕,ran会更新为true,state此时也保持为NEW,这个时候方法返回true
    return ran && s == NEW;
}

runAndReset() 方法保证了在任务正常执行完成之后返回 true ,此时 FutureTask 的状态 state 保持为 NEW ,由于没有调用 set() 方法,也就是没有调用 finishCompletion() 方法,它内部持有的 Callable 任务引用不会置为 null ,等待获取结果的线程集合也不会解除阻塞。这种设计方案专门针对可以周期性重复执行的任务。异常执行情况和取消的情况导致的最终结果和 run() 方法是一致的。接下来分析一下获取执行结果的 get() 方法:

// 获取执行结果 - 永久阻塞
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果状态小于等于COMPLETING(1),也就是COMPLETING(1)和NEW(0),那么就需要等待任务完成
    if (s <= COMPLETING)
        // 注意这里调用awaitDone方法的参数为永久阻塞参数,也就是没有超时期限,返回最新的状态值
        s = awaitDone(false, 0L);
    // 根据状态值报告结果
    return report(s);
}

// 获取执行结果 - 带超时的阻塞
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // 如果状态小于等于COMPLETING(1),也就是COMPLETING(1)和NEW(0),那么就需要等待任务完成
    // 注意这里调用awaitDone方法的参数为带超时上限的阻塞参数
    // 如果超过了指定的等待期限(注意会把时间转化为纳秒),返回的最新状态依然为COMPLETING(1)或者NEW(0),那么抛出TimeoutException异常
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout)))  COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // 状态值等于COMPLETING(1),说明任务执行到达尾声,在执行set()或者setException(),只需让出CPU控制权等待完成即可等待下一轮循环重试即可
            Thread.yield();
        else if (Thread.interrupted()) {
            // 如果线程被中断,则清除其中断状态,并且断开超时或中断的等待节点的链接
            removeWaiter(q);
            // 抛出InterruptedException异常
            throw new InterruptedException();
        }
        else if (q == null) {
            // 等待节点尚未初始化,如果设置了超时期限并且超时时间小于等于0,则直接返回状态并且终止等待,说明已经超时了
            // 这里的逻辑属于先行校验,如果命中了就不用进行超时阻塞
            if (timed && nanos = nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            // 如果状态为NEW(0),则进行超时阻塞,阻塞的是当前的线程
            if (state = CANCELLED)
        throw new CancellationException();
    // 其他情况,实际上只剩下状态值为EXCEPTIONAL(3),则基于outcome强转为Throwable类型,则包装成ExecutionException抛出
    throw new ExecutionException((Throwable)x);
}

上面的方法中, removeWaiter() 方法相对复杂,它涉及到单链表移除中间节点、考虑多种竞态情况进行重试等设计,需要花大量心思去理解。接着看 cancel() 方法:

public boolean cancel(boolean mayInterruptIfRunning) {
    // 状态必须为NEW(0)
    // 如果mayInterruptIfRunning为true,则把状态通过CAS更新为INTERRUPTING(5)
    // 如果mayInterruptIfRunning为false,则把状态通过CAS更新为CANCELLED(4)
    // 如果状态不为NEW(0)或者CAS更新失败,直接返回false,说明任务已经执行到set()或setException(),无法取消
    if (!(state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {   
        // mayInterruptIfRunning为true,调用执行任务的线程实例的Thread#interrupt()进行中断,更新最终状态为INTERRUPTED(6)
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                STATE.setRelease(this, INTERRUPTED);
            }
        }
    } finally {
        // 完成后的通知方法
        finishCompletion();
    }
    return true;
}

cancel() 方法只能够中断状态为 NEW(0) 的线程,并且由于线程只在某些特殊情况下(例如阻塞在同步代码块或者同步方法中阻塞在 Object#wait() 方法、主动判断线程的中断状态等等)才能响应中断,所以需要思考这个方法是否可以达到预想的目的。最后看剩下的状态判断方法:

// 判断是否取消状态,包括CANCELLED(4)、INTERRUPTING(5)、INTERRUPTED(6)三种状态
public boolean isCancelled() {
    return state >= CANCELLED;
}

// 判断是否已经完成,这里只是简单判断状态值不为NEW(0),原因是所有的中间状态都是十分短暂的
public boolean isDone() {
    return state != NEW;
}

AbstractExecutorService源码实现

AbstractExecutorService 虽然只是 ThreadPoolExecutor 的抽象父类,但是它已经实现了 ExecutorService 接口中除了 shutdown()shutdownNow()isShutdown()isTerminated()awaitTermination() 五个方法之外的其他所有方法(这五个方法在 ThreadPoolExecutor 实现,因为它们是和线程池的状态相关的)。它的源码体积比较小,下面全量贴出分析:

public abstract class AbstractExecutorService implements ExecutorService {

    // 静态工厂方法,通过Runnable和具体的返回结果创建FutureTask实例
    protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
        return new FutureTask(runnable, value);
    }

    // 静态工厂方法,通过Callable实例创建FutureTask实例
    protected  RunnableFuture newTaskFor(Callable callable) {
        return new FutureTask(callable);
    }
    
    // 提交Runnable类型任务
    public Future submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 适配任务为FutureTask实例,注意最终计算结果已经提前设置为null
        RunnableFuture ftask = newTaskFor(task, null);
        // 提交到线程池
        execute(ftask);
        return ftask;
    }
    
    // 提交Runnable类型任务,同时传入最终计算结果
    public  Future submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // 适配任务为FutureTask实例
        RunnableFuture ftask = newTaskFor(task, result);
        // 提交到线程池
        execute(ftask);
        return ftask;
    }
    
    // 提交Callable类型任务
    public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
         // 适配任务为FutureTask实例
        RunnableFuture ftask = newTaskFor(task);
        // 提交到线程池
        execute(ftask);
        return ftask;
    }
    
    // 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回)
    private  T doInvokeAny(Collection<? extends Callable> tasks, boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future> futures = new ArrayList(ntasks);
        // 通过当前的线程池实例构建ExecutorCompletionService实例
        ExecutorCompletionService ecs = new ExecutorCompletionService(this);
        try {
            ExecutionException ee = null;
            // 计算deadline
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable> it = tasks.iterator();
            // 提交任务列表的第一个任务实例
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;
            for (;;) {
                // 这里获取上一轮(或者第一个任务)任务执行的Future实例
                Future f = ecs.poll();
                // 如果拿到Future实例为null说明上一轮的任务尚未执行完毕
                if (f == null) {
                    // 如果任务队列中还有任务任务,则添加到线程池中执行
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 活跃计算任务为0,说明至少有一个任务成功返回了Future实例
                    else if (active == 0)
                        break;
                    else if (timed) {
                        // 允许超时的模式下用超时阻塞获取Future实例
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    // 非超时的模式下永久阻塞获取Future实例
                    else
                        f = ecs.take();
                }
                // 获取到的Future实例不为null,说明已经有至少一个任务执行完毕
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // 取消所有任务,确保至少有一个任务完成,即使取消所有任务,由于状态管理,成功的任务不受干扰
            cancelAll(futures);
        }
    }
    
    // 永久阻塞 - 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回)
    public  T invokeAny(Collection<? extends Callable> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
    
    // 带超时阻塞 - 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回)
    public  T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    
    // 永久阻塞 - 执行任务列表中的所有任务
    public  List<Future> invokeAll(Collection<? extends Callable> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future> futures = new ArrayList(tasks.size());
        try {
            // 遍历任务列表进行FutureTask并且提交到线程池,FutureTask实例添加到futures列表中
            for (Callable t : tasks) {
                RunnableFuture f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            // 遍历futures列表调用get()方法获取结果,注意会忽略所有的CancellationException、ExecutionException
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future f = futures.get(i);
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException | ExecutionException ignore) {}
                }
            }
            return futures;
        } catch (Throwable t) {
            // 只要出现非CancellationException或者ExecutionException异常,则取消所有任务,尚未执行或者尚未执行完毕的任务有可能受到影响
            cancelAll(futures);
            throw t;
        }
    }

    // 带超时阻塞 - 执行任务列表中的所有任务
    public  List<Future> invokeAll(Collection<? extends Callable> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        // 转换超时时间单位为纳秒
        final long nanos = unit.toNanos(timeout);
        // 计算deadline
        final long deadline = System.nanoTime() + nanos;
        ArrayList<Future> futures = new ArrayList(tasks.size());
        int j = 0;
        timedOut: try {
            // 遍历任务列表进行FutureTask,FutureTask实例添加到futures列表中
            for (Callable t : tasks)
                futures.add(newTaskFor(t));
            final int size = futures.size();
            // 遍历futures列表,进行一次超时先验,如果已经超时,则直接跳出,无须执行任务
            for (int i = 0; i < size; i++) {
                // 这里有个特殊处理,第一个任务只要timeout不为0,必定会进行提交第二个任务起才判断deadline
                if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
                    break timedOut;
                // 提交FutureTask到线程池
                execute((Runnable)futures.get(i));
            }
            // j记录了超时的那个任务的Future的索引值,遍历futures列表进行超时阻塞的get()方法调用
            for (; j < size; j++) {
                Future f = futures.get(j);
                if (!f.isDone()) {
                    try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
                    catch (CancellationException | ExecutionException ignore) {}
                    catch (TimeoutException timedOut) {
                        break timedOut;
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
        // 所有任务完成之前发现已经超时,则取消超时任务索引之后的所有任务,已经完成的不受影响
        cancelAll(futures, j);
        return futures;
    }
    
    // 取消所有的Future实例
    private static  void cancelAll(ArrayList<Future> futures) {
        cancelAll(futures, 0);
    }

    // 遍历所有的Future实例调用其cancel方法,因为参数为true,所以会响应中断
    // j参数是决定遍历的起点,0表示整个列表遍历
    private static  void cancelAll(ArrayList<Future> futures, int j) {
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
}

整个类的源码并不复杂,注意到 CallableRunnable 的任务最重都会包装为适配器 FutureTask 的实例,然后通过 execute() 方法提交包装好的 FutureTask 任务实例,返回值是 Future 或者 Future 的集合时候,实际上是 RunnableFuture 或者 RunnableFuture 的集合,只因为 RunnableFutureFuture 的子接口,这种设计遵循了设计模式原则里面的 依赖倒置原则 。这里小结一下分析过的几个方法的特征:

方法 特征
submit(Runnable task) 异步执行,执行结果无感知,通过 get() 方法虽然返回 null 但是可以确定执行完毕的时刻
submit(Runnable task, T result) 异步执行,预先传入执行结果,最终通过 get() 方法返回的就是初始传入的结果
submit(Callable task) 异步执行,最终通过 get() 方法返回的是 Callable#call() 的结果
invokeAny(Collection<? extends Callable> tasks) 异步执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回),永久阻塞同步返回结果
invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) 功能同上,获取结果的时候是超时阻塞获取
invokeAll(Collection<? extends Callable> tasks) 异步执行任务列表中的所有任务,必须等待所有 Future#get() 永久阻塞方法都返回了结果才返回 Future 列表
invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) 异步执行任务列表中的所有任务,只要其中一个任务 Future#get() 超时阻塞方法超时就会取消该任务索引之后的所有任务并且返回 Future 列表

小结

ExecutorService 提供了一系列便捷的异步任务提交方法,它使用到多种技术:

  • 相对底层的 CAS 原语。
  • 基于 CAS 实现的无锁并发栈。
  • 依赖于线程池实现的 execute() 方法进行异步任务提交。
  • 使用适配器模式设计 FutureTask 适配 FutrueRunnableCallable ,提供了状态的生命周期管理。

下一篇文章将会分析一下调度线程池 ScheduledThreadPoolExecutor 的底层实现和源码。

(本文完 c-7-d e-a-20190727)