/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
static CompletableFuture asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
e.execute(new AsyncRun(d, f));
return d;
}
/** The encoding of the null value. */
static final AltResult NIL = new AltResult(null);
/** Completes with the null value, unless already completed. */
final boolean completeNull() {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
NIL);
}
可以看到,对于任务返回值为 null 的执⾏结果,被封装为 new AltResult(null) 对象。⽽且,还是 调⽤的 CAS 本地⽅法实现了原⼦操作。 为什么需要对 null 值进⾏单独封装呢?观察 get() ⽅法的源码:
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
原来原因是便于使⽤ null 值区分异步任务是否执⾏完毕。 如果你对 CAS 不太了解的话,可以查阅 compareAndSwapObject ⽅法的四个参数的含义。该⽅法的参 数 RESULT 是什么呢?查看代码如下:
RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
原来,RESULT 是获取 CompletableFuture 对象中 result 字段的偏移地址。这个 result 字段⼜是啥 呢?就是任务执⾏完毕后的结果值。代码如下:
// Either the result or boxed AltResult
volatile Object result;
static CompletableFuture asyncSupplyStage(Executor e,
Supplier f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
e.execute(new AsyncSupply(d, f));
return d;
}
你以为 Completion 栈内元素会依次调⽤,不会的。从代码中来看,当回调⽅法 t 不等于 null,有两种 情况:
情况 1:如果当前迭代到的 CompletableFuture 对象是 this (也就是 CompletableFuture 链表头), 会令 h.next = null ,因为 h.next 也就是 t 通过 CAS 的⽅式压到了 this 对象的 stack 栈顶。
情况 2:如果当前迭代到的 CompletableFuture 对象 f 不是 this (不是链表头)的话,会将回调函数 h 压⼊ this (链表头)的 stack 中。然后从链表头再次迭代遍历。这样下去,对象 f 中的回调⽅法栈假设 为 3-2-1,从 f 的栈顶推出再压⼊ this 的栈顶,顺序就变为了 1-2-3。这时候,情况就变成了第 1 种。
这样,当回调⽅法 t = h.next 等于 null 或者 f 等于 this 时,都会对栈顶的回调⽅法进⾏调⽤。
简单来说,就是将拥有多个回调⽅法的 CompletableFuture 对象的多余的回调⽅法移到到 this 对象的 栈内。
回调⽅法执⾏结束要么返回下⼀个 CompletableFuture 对象,要么返回 null 然后⼿动设置为 f = this, 再次从头遍历。
private CompletableFuture uniWhenCompleteStage(
Executor e, BiConsumer super T, ? super Throwable> f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
if (e != null || !d.uniWhenComplete(this, f, null)) {
UniWhenComplete c = new UniWhenComplete(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
public CompletableFuture whenComplete(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
public CompletableFuture thenApply(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)
public CompletableFuture thenAccept(Consumer super T> action)
public CompletableFuture thenAcceptAsync(Consumer super T> action)
public CompletableFuture thenAcceptAsync(Consumer super T> action, Executor executor)
public CompletableFuture thenCompose(Function super T,? extends CompletionStage> fn)
public CompletableFuture thenComposeAsync(Function super T,? extends CompletionStage> fn)
public CompletableFuture thenComposeAsync(Function super T,? extends CompletionStage> fn, Executor executor)
public CompletableFuture thenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
public CompletableFuture thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
public CompletableFuture thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn, Executor executor)
public CompletableFuture applyToEither(CompletionStage extends T> other, Function super T,U> fn)
public CompletableFuture applyToEitherAsync(CompletionStage extends T> other, Function super T,U> fn)
public CompletableFuture applyToEitherAsync(CompletionStage extends T> other, Function super T,U> fn, Executor executor)