在 JUC 包中有许多支持 Executor 线程池执行的 Task,它们提供了:执行、回调、延时、定时、分治、流式调用等功能,为多线程的易用性、健壮性提供保证。
Task接口
Runnable 和 Callable
Runnable
线程的执行单元,其并不返回执行结果,接口定义如下:
public interface Runnable {
//在接口定义中 public abstract 关键字为默认可忽略
public abstract void run();
}
Callable
Callable 是一个支持泛型的接口,其在执行完毕后支持返回结果并且可能抛出异常。
public interface Callable<V> {
V call() throws Exception;
}
Future
当需要获取线程的执行结果时,需要用到 Future。Callable 用于产生结果,Future 用于获取结果。Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get();
V get(long timeout, TimeUnit unit);
}
ThreadPoolExecutor Task
RunnableFuture
RunnableFuture 合并了 Runnable 和 Future 这两个接口,
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
我们可以看到 run 方法的定义完全就是多余的,我能想到它唯一的用途就是在 Javadoc 增加了新的注释。
FutureTask
FutureTask 实现了 RunnableFuture 接口,间接实现了 Runnable 接口和 Future 接口,所以 FutureTask 以一个适配器的形式将线程池需要实现的接口 ExecutorService 统一起来,而不是实现多个不同版本的方法。
另外,FutureTask 清晰的划分了任务执行的状态,方便调用后的逻辑处理。状态转换关系如下图。
在 FutureTask 被创建时,其状态被初始化为 New,其也被称之为初始态。New 表示新的任务或者还没被执行完的任务。除 New 之外,其他状态被分成两种类别,中间态和最终态。Completing 和 Interrupting 是中间态,Normal、Exceptional、Cancelled 和 Interrupted 为最终态。
- Completing(中间态):任务执行完成或者执行任务时发生异常,此时任务执行结果或者异常原因还没有保存到 outcome 字段时任务所处的状态。
- Normal(最终态):任务执行完成并且任务执行结果保存到 outcome 字段,状态从 COMPLETING 转换到 NORMAL。
- Exceptional(最终态):任务执行异常并且异常原因保存到 outcome 字段中,状态从 COMPLETING 转换到 EXCEPTIONAL。
- Cancel(最终态):任务未开始执行或已经开始执行但是还没有执行完成时,用户调用了 cancel(false) 方法取消任务且不中断任务执行线程。状态从 NEW 转化为 CANCELLED 状态。
- Interrupting(中间态):任务未开始执行或已经执行但还未执行完成时,用户调用了 cancel(true) 方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态从 NEW 转化为 INTERRUPTING。
- Interrupted(最终态):调用 interrupt() 中断任务执行线程之后状态会从 INTERRUPTING 转换到 INTERRUPTED。
需要注意的是,FutureTask 与 Runnable/Callable 的调用方式并不同(虽然看起来差不多)。FutureTask 主动封装了 Callable 中的 call 方法,并对其捕获了异常记录在 outcome 成员变量中。当然对于使用 get()
获得经过 report()
返回的数据,可以正常捕获异常。需要捕获异常可通过线程池的后处理模式 afterExecute 和线程的异常捕获机制 UncaughtExceptionHandler 两种方式。
ScheduledThreadPoolExecutor Task
RunnableScheduledFuture
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
boolean isPeriodic();
}
ScheduledFutureTask
精华实现都体现在 ScheduledThreadPoolExecutor 中阻塞队列的实现,该 Task 只是提供基础的支持。
ForkJoinPool Task
ForkJoin 框架包含 ForkJoinTask、ForkJoinWorkerThread、ForkJoinPool 和若干 ForkJoinTask 的子类,它的核心在于分治和工作窍取,最大程度利用线程池中的工作线程。
ForkJoinTask
ForkJoinTask 可以理解为类线程但比线程轻量的实体,在 ForkJoinPool 中运行的少量 ForkJoinWorkerThread 可以持有大量的 ForkJoinTask 和它的子任务。ForkJoinTask 同时也是一个轻量的 Future,使用时应避免较长阻塞。
ForkJoinTask 实现了 future,也可以序列化,但它不是一个 Runnable 或 Callable。
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
运行状态
//volatie修饰的任务状态值,由ForkJoinPool或工作线程修改.
volatile int status;
static final int DONE_MASK = 0xf0000000;//用于屏蔽完成状态位.
static final int NORMAL = 0xf0000000;//表示正常完成,是负值.
static final int CANCELLED = 0xc0000000;//表示被取消,负值,且小于NORMAL
static final int EXCEPTIONAL = 0x80000000;//异常完成,负值,且小于CANCELLED
static final int SIGNAL = 0x00010000;//用于signal,必须不小于1<<16,默认为1<<16.
static final int SMASK = 0x0000ffff;//后十六位的task标签
执行和异常处理
doExec()
方法调用了留给子类实现的方法。
//final修饰,运行ForkJoinTask的核心方法.
final int doExec() {
int s; boolean completed;
//仅未完成的任务会运行,其他情况会忽略.
if ((s = status) >= 0) {
try {
//调用exec,留给子类实现
completed = exec();
} catch (Throwable rex) {
//发生异常,用setExceptionalCompletion设置结果
return setExceptionalCompletion(rex);
}
if (completed)
//正常完成,调用setCompletion,参数为normal,并将返回值作为结果s.
s = setCompletion(NORMAL);
}
//返回s
return s;
}
正常完成调用 setCompletion
//标记当前task的completion状态,同时根据情况唤醒等待该task的线程.
private int setCompletion(int completion) {
for (int s;;) {
//开启一个循环,如果当前task的status已经是各种完成(小于0),则直接返回status,这个status可能是某一次循环前被其他线程完成.
if ((s = status) < 0)
return s;
//尝试将原来的status设置为它与completion按位或的结果.
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if ((s >>> 16) != 0)
//此处体现了SIGNAL的标记作用,很明显,只要task完成(包含取消或异常),或completion传入的值不小于1<<16,
//就可以起到唤醒其他线程的作用.
synchronized (this) { notifyAll(); }
//cas成功,返回参数中的completion.
return completion;
}
}
}
发生异常调用 setExceptionalCompletion
//记录异常并且在符合条件时传播异常行为
private int setExceptionalCompletion(Throwable ex) {
//首先记录异常信息到结果
int s = recordExceptionalCompletion(ex);
if ((s & DONE_MASK) == EXCEPTIONAL)
//status去除非完成态标志位(只保留前4位),等于EXCEPTIONAL.内部传播异常
internalPropagateException(ex);
return s;
}
//internalPropagateException方法是一个空方法,留给子类实现,可用于completer之间的异常传递
void internalPropagateException(Throwable ex) {}
//记录异常完成
final int recordExceptionalCompletion(Throwable ex) {
int s;
if ((s = status) >= 0) {
//只能是异常态的status可以记录.
//hash值禁止重写,不使用子类的hashcode函数.
int h = System.identityHashCode(this);
final ReentrantLock lock = exceptionTableLock;
//异常锁,加锁
lock.lock();
try {
//抹除脏异常,后面叙述
expungeStaleExceptions();
//异常表数组.ExceptionNode后面叙述.
ExceptionNode[] t = exceptionTable;//exceptionTable是一个全局的静态常量,后面叙述
//用hash值和数组长度进行与运算求一个初始的索引
int i = h & (t.length - 1);
for (ExceptionNode e = t[i]; ; e = e.next) {
//找到空的索引位,就创建一个新的ExceptionNode,保存this,异常对象并退出循环
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);//(1)
break;
}
if (e.get() == this) //已设置在相同的索引位置的链表中,退出循环.//2
break;
//否则e指向t[i]的next,进入下个循环,直到发现判断包装this这个ForkJoinTask的ExceptionNode已经出现在t[i]这个链表并break(2),
//或者直到e是null,意味着t[i]出发开始的链表并无包装this的ExceptionNode,则将构建一个新的ExceptionNode并置换t[i],
//将原t[i]置为它的next(1).整个遍历判断和置换过程处在锁中进行.
}
} finally {
lock.unlock();
}
//记录成功,将当前task设置为异常完成.
s = setCompletion(EXCEPTIONAL);
}
return s;
}
//exceptionTable声明
private static final ExceptionNode[] exceptionTable;//全局异常node表
private static final ReentrantLock exceptionTableLock;//上面用到的锁,就是一个普通的可重入锁.
private static final ReferenceQueue<Object> exceptionTableRefQueue;//变量表引用队列,后面详述.
private static final int EXCEPTION_MAP_CAPACITY = 32;//异常表的固定容量,不大,只有32而且是全局的.
//初始化在一个静态代码块.
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];//容量
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinTask.class;
STATUS = U.objectFieldOffset
(k.getDeclaredField("status"));
} catch (Exception e) {
throw new Error(e);
}
}
//先来看ExceptionNode内部类的实现
//签名,实现了一个ForkJoinTask的弱引用.
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
final Throwable ex;
ExceptionNode next;
final long thrower; // use id not ref to avoid weak cycles
final int hashCode; // store task hashCode before weak ref disappears
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
super(task, exceptionTableRefQueue);//指向弱引用的构造函数,保存引用为task,队列为全局的exceptionTableRefQueue.
this.ex = ex;//抛出的异常的引用
this.next = next;//数组中的ExceptionNode以链表形式存在,前面分析过,先入者为后入者的next
this.thrower = Thread.currentThread().getId();//保存抛出异常的线程id(严格来说是创建了this的线程)
this.hashCode = System.identityHashCode(task);//哈希码保存关联task的哈希值.
}
}
//清除掉异常表中的脏数据,仅在持有全局锁时才可使用.前面看到在记录新的异常信息时要进行一次清除尝试
private static void expungeStaleExceptions() {
//循环条件,全局exceptionTableRefQueue队列不为空,前面说过ExceptionNode是弱引用,当它被回收时会被放入此队列.
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
//从队首依次取出元素.
if (x instanceof ExceptionNode) {
//计算在全局exceptionTable中的索引.
int hashCode = ((ExceptionNode)x).hashCode;
ExceptionNode[] t = exceptionTable;
int i = hashCode & (t.length - 1);
//取出node
ExceptionNode e = t[i];
ExceptionNode pred = null;
//不停遍历,直到e是null为止.
while (e != null) {
//e的next
ExceptionNode next = e.next;//2
//x是队首出队的元素.它与e相等说明找到
if (e == x) {
//e是一个链表的元素,pred表示它是否有前置元素
if (pred == null)
//无前置元素,说明e在链表首部,直接将首部元素指向next即可.
t[i] = next;
else
//有前置元素,说明循环过若干次,将当前e出链表
pred.next = next;
//在链表中发现x即break掉内循环,继续从exceptionTableRefQueue的队首弹出新的元素.
break;
}
//只要发现当前e不是x,准备下一次循环,pred指向e.e指向next,进行下一个元素的比较.
pred = e;
e = next;
}
}
}
}
等待任务完成
//内部等待任务完成,直到完成或超时.
final void internalWait(long timeout) {
int s;
//status小于0代表已完成,直接忽略wait.
//未完成,则试着加上SIGNAL的标记,令完成任务的线程唤醒这个等待.
if ((s = status) >= 0 &&
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
//加锁,只有一个线程可以进入.
synchronized (this) {
//再次判断未完成.等待timeout,且忽略扰动异常.
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
else
//已完成则响醒其他等待者.
notifyAll();
}
}
}
同时,它的使用方法目前只有一个 ForkJoinPool::awaitJoin,在该方法中使用循环的方式进行 internalWait,满足了每次按截止时间或周期进行等待,同时也顺便解决了虚假唤醒。
继续看 externalAwaitDone 函数,它体现了ForkJoin框架的一个核心:外部帮助。
//外部线程等待一个common池中的任务完成.
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ?
//当前task是一个CountedCompleter,尝试使用common ForkJoinPool去外部帮助完成,并将完成状态返回.
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
//当前task不是CountedCompleter,则调用common pool尝试外部弹出该任务并进行执行,
//status赋值doExec函数的结果,若弹出失败(其他线程先行弹出)赋0.
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
//检查上一步的结果,即外部使用common池弹出并执行的结果(不是CountedCompleter的情况),或外部尝试帮助CountedCompleter完成的结果
//status大于0表示尝试帮助完成失败.
//扰动标识,初值false
boolean interrupted = false;
do {
//循环尝试,先给status标记SIGNAL标识,便于后续唤醒操作.
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
//CAS成功,进同步块发现double check未完成,则等待.
wait(0L);
} catch (InterruptedException ie) {
//若在等待过程中发生了扰动,不停止等待,标记扰动.
interrupted = true;
}
}
else
//进同步块发现已完成,则唤醒所有等待线程.
notifyAll();
}
}
} while ((s = status) >= 0);//循环条件,task未完成.
if (interrupted)
//循环结束,若循环中间曾有扰动,则中断当前线程.
Thread.currentThread().interrupt();
}
//返回status
return s;
}
externalAwaitDone 的逻辑不复杂,在当前 task 为 ForkJoinPool.common 的情况下可以在外部进行等待和尝试帮助完成。方法会首先根据 ForkJoinTask 的类型进行尝试帮助,并返回当前的 status,若发现未完成,则进入下面的等待唤醒逻辑。该方法的调用者为非worker线程。
相似的方法:externalInterruptibleAwaitDone
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
//不同于externalAwaitDone,入口处发现当前线程已中断,则立即抛出中断异常.
if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0 &&
(s = ((this instanceof CountedCompleter) ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
0)) >= 0) {
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
//wait时也不catch中断异常,发生即抛出.
wait(0L);
else
notifyAll();
}
}
}
}
return s;
}
externalInterruptibleAwaitDone 的逻辑与 externalAwaitDone 相似,只是对中断异常的态度为抛,后者为 catch。
它们的使用点,externalAwaitDone 为 doJoin 或 doInvoke 方法调用,externalInterruptibleAwaitDone 为 get 方法调用。很明显,join 操作不可扰动,get 则可以扰动。
Get
//get方法还有get(long time)的变种.
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
//当前线程是ForkJoinWorkerThread则调用前面提过的doJoin方法.
//否则调用前述externalInterruptibleAwaitDone
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
//异常处理,取消的任务,抛出CancellationException.
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
//异常处理,调用getThrowableException获取异常,封进ExecutionException.
throw new ExecutionException(ex);
//无异常处理,返回原始结果.
return getRawResult();
}
//getRawResult默认为一个抽象实现,在ForkJoinTask中,并未保存该结果的字段.
public abstract V getRawResult();
//getThrowableException方法
private Throwable getThrowableException() {
//不是异常标识,直接返回null,从方法名的字面意思看,要返回一个可抛出的异常.
if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
//系统哈希码来定位ExceptionNode
int h = System.identityHashCode(this);
ExceptionNode e;
final ReentrantLock lock = exceptionTableLock;
//加异常表全局锁
lock.lock();
try {
//先清理已被回收的异常node,前面已述.
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
e = t[h & (t.length - 1)];
//循环找出this匹配的异常node
while (e != null && e.get() != this)
e = e.next;
} finally {
lock.unlock();
}
Throwable ex;
//前面找不出异常node或异常node中存放的异常为null,则返回null
if (e == null || (ex = e.ex) == null)
return null;
if (e.thrower != Thread.currentThread().getId()) {
//不是当前线程抛出的异常.
Class<? extends Throwable> ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;//该异常的无参构造器
Constructor<?>[] cs = ec.getConstructors();//该异常类公有构造器
for (int i = 0; i < cs.length; ++i) {
Constructor<?> c = cs[i];
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
//构建器参数列表长度0说明存在无参构造器,存放.
noArgCtor = c;
else if (ps.length == 1 && ps[0] == Throwable.class) {
//发现有参构造器且参数长度1且第一个参数类型是Throwable,说明可以存放cause.
//反射将前面取出的ex作为参数,反射调用该构造器创建一个要抛出的Throwable.
Throwable wx = (Throwable)c.newInstance(ex);
//反射失败,异常会被catch,返回ex,否则返回wx.
return (wx == null) ? ex : wx;
}
}
if (noArgCtor != null) {
//在尝试了寻找有参无参构造器,并发现只存在无参构造器的情况,用无参构造器初始化异常.
Throwable wx = (Throwable)(noArgCtor.newInstance());
if (wx != null) {
//将ex设置为它的cause并返回它的实例.
wx.initCause(ex);
return wx;
}
}
} catch (Exception ignore) {
//此方法不可抛出异常,一定要成功返回.
}
}
//有参无参均未成功,返回找到的异常.
return ex;
}
Join
//join公有方法
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
//调用doJoin方法阻塞等待的结果不是NORMAL,说明有异常或取消.报告异常.
reportException(s);
//等于NORMAL,正常执行完毕,返回原始结果.
return getRawResult();
}
//报告异常,可在前一步判断执行status是否为异常态,然后获取并重抛异常.
private void reportException(int s) {
//参数s必须用DONE_MASK处理掉前4位以后的位.
if (s == CANCELLED)
//传入的状态码等于取消,抛出取消异常.
throw new CancellationException();
if (s == EXCEPTIONAL)
//使用前面的getThrowableException方法获取异常并重新抛出.
rethrow(getThrowableException());
}
//invoke公有方法.
public final V invoke() {
int s;
//先尝试执行
if ((s = doInvoke() & DONE_MASK) != NORMAL)
//doInvoke方法的结果status只保留完成态位表示非NORMAL,则报告异常.
reportException(s);
//正常完成,返回原始结果.
return getRawResult();
}
doJoin&doInvoke
//join的核心方法
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//已完成,返回status,未完成再尝试后续
if ((s = status) < 0) {
return s;
}
//未完成,当前线程是ForkJoinWorkerThread
else {
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
//当前task出队然后执行,执行的结果是完成则返回状态
if ((w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0) {
return s;
}
//否则使用当线程池所在的ForkJoinPool的awaitJoin方法等待.
else {
return wt.pool.awaitJoin(w, this, 0L);
}
}
//当前线程不是ForkJoinWorkerThread,调用前面说的externalAwaitDone方法.
else {
return externalAwaitDone();
}
}
}
//invoke的核心方法
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
//先尝试本线程执行,不成功才走后续流程
if (s = doExec()) < 0) {
return s;
} else {
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
return wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this, 0L);
} else {
return externalAwaitDone();
}
}
}
ForkJoinTask 是个抽象类,且它并未保存任务的完成结果,也不负责这个结果的处理,但声明并约束了返回结果的抽象方法 getRawResult 供子类实现。因此,ForkJoinTask 的自身关注任务的完成/异常/未完成,子类关注这个结果的处理。
每当获取到任务的执行状态时,ForkJoinTask 可根据 status 来判断是否是异常/正常完成,并进入相应的处理逻辑,最终使用子类实现的方法完成一个闭环。如果理解为将 ForkJoinTask 和子类的有关代码合并起来,在结果/完成状态/异常信息这一块,相当于同时有三个部分在合作。
status:
它同时表示了未完成/正常完成/取消/异常完成等状态,也同时告诉有关等待线程是否要唤醒其他线程(每个线程等待前会设置SIGNAL),同时留出了后面 16 位对付其他情况。
result:
在 ForkJoinTask 见不到它,也没有相应的字段,子类也未必需要提供这个 result 字段。CountedCompleter 就没有提供这个 result,它的 getRawResult 会固定返回 null。但是 CountedCompleter 可以继承子类并实现这个 result 的保存与返回。在 JAVA8 中,stream api 中的并行流也会保存每一步的计算结果,并对结果进行合并。
异常:
在 ForkJoinTask 中已经完成了所有异常处理流程和执行流程的定义,重点在于异常的存放,它是由 ForkJoinTask 的类变量进行存放的,结构为数组+链表,且元素利用了弱引用,借 gc 帮助清除掉已经被回收的 ExceptionNode,显然在 gc 之前必须得到使用。而异常随时可以发生并进行 record 入列,但相应的能消费掉这个异常的只有相应的外部的
get
、join
、invoke
等方法或者内部扩展了exec()
等方式,得到其他线程执行的 task 异常结果的情况。巧妙的是,只有外部调用者调用(get
、invoke
、join
)时,这个异常信息才足够重要,需要 rethrow 出去并保存关键的堆栈信息;而内部线程在访问一些非自身执行的任务时,往往只需要 status 判断是否异常即可,在exec()
中 fork 新任务的,也往往必须立即 join 这些新的子任务,这就保证了能够及时得到子任务中的异常堆栈(即使拿不到堆栈也知道它失败了)。
Task的运行过程
Task 可由三种方式启动
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
//Fork方法,将当前任务入池.
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
//如果当前线程是ForkJoinWorkerThread,将任务压入该线程的任务队列.
((ForkJoinWorkerThread)t).workQueue.push(this);
else
//否则调用common池的externalPush方法入队.
ForkJoinPool.common.externalPush(this);
return this;
}
对一个task,先 fork()
压队,再 join()
等待执行结果,这是一个 ForkJoinTask 的执行周期闭环(但不要简单理解为生命周期,前面提到过,任务可以被重新初始化,而且重新初始化时还会清空 ExceptionNode 数组上的已回收成员)。
CountedCompleter
RecursiveAction
RecursiveTask
public class Main {
public static void main(String[] args) {
//创建ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
//异步提交RecursiveTask任务
ForkJoinTask<Integer> forkJoinTask = pool.submit(new CalculatedRecursiveTask(0, 10));
try {
//根据返回类型获取返回值
Integer result = forkJoinTask.get();
System.out.println("结果为:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
private static class CalculatedRecursiveTask extends RecursiveTask<Integer> {
private int start;
private int end;
public CalculatedRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
//判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
if ((end - start) <= 5) {
return IntStream.rangeClosed(start, end).sum();
} else {
//任务分割
int middle = (end + start) / 2;
CalculatedRecursiveTask task1 = new CalculatedRecursiveTask(start, middle);
CalculatedRecursiveTask task2 = new CalculatedRecursiveTask(middle + 1, end);
//执行
task1.fork();
task2.fork();
//等待返回结果
return task1.join() + task2.join();
}
}
}
}
CompletableFuture
CompletionStage
CompletionStage 表示一个可能异步运行的“阶段”,在该阶段内要执行相应的行为,而这些运算会在另一个 CompletionStage 完成后开始,它自身完成后又可触发另一个依赖的 CompletionStage。CompletionStage 只是一个接口定义。
方法宏观横向分类
Consumer(消费型)
用上一个阶段的结果作为指定函数的参数执行函数产生新的结果。这一类接口方法名中基本都有 appl y字样,接口的参数是 (Bi)Function 类型。
Function(函数型)
用上一个阶段的结果作为指定操作的参数执行指定的操作,但不对阶段结果产生影响。这一类接口方法名中基本都有 accept 字样,接口的参数就是 (Bi)Consumer 类型。
Runnable(接续型)
不依据上一个阶段的执行结果,只要上一个阶段完成(但一般要求正常完成),就执行指定的操作,且不对阶段的结果产生影响。这一类接口方法名中基本都有 run 字样,接口的参数是 Runnable 类型。
Compose(组合型)
它以依赖阶段本身作为参数而不是阶段产生的结果进行产出型(或函数型)操作。
Usage
CompletableFuture 实现了 CompletableStage 和 Future 的所有接口,这意味着它即满足 CompletableStage 的阶段执行,也提供了 Future 中获取该执行结果的方法。
public class CompletableFuture<T>
implements Future<T>, CompletionStage<T>
其执行可大概概括为以下三个部分:Create(创建)、Process Control(过程控制)、Exception Handle(异常处理)。
Create 阶段
有返回值的调用链
静态生成(completedFuture)
CompletableFuture<U> completedFuture(U value);
Lambda Supplier生成(supplyAsync)
CompletableFuture<U> supplyAsync(Supplier<U> supplier);
CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
也可以当作 Callable 的特性使用。
执行后生成调用链
执行无返回值异步任务(runAsync)
CompletableFuture<Void> runAsync(Runnable runnable);
CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
聚合后生成调用链
全部 CompletableFuture 执行完成后 Trigger(allOf)
CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
任一 CompletableFuture 执行完成后,返回其返回值(anyOf)
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
Process Control 阶段
因为几乎都是对一些关键字的组合,因此在这里介绍关键字的含义。
阶段性依赖
- Then 方法,在上一个 future 执行完成后触发。
- Both 方法,完成指定的两个 future 后触发。
- Either 方法,完成指定的两个 future 中任一个触发。
状态性依赖
依赖任务执行返回的状态。
- Complete 方法:接收计算结果或异常并进行进一步处理,无返回值。
- Handle 方法:接收计算结果或异常并进行进一步处理,有返回值。
- Exceptionally 方法:当任务抛出异常后,可以选择使用 handle、complete 来进行处理,不过两者有些不同。handle、complete 是用来处理上一个任务的结果,如果有异常情况,就处理异常。而 exceptionally 可以放在 CompletableFuture 处理的最后,作为兜底逻辑来处理未知异常。
异步状态标识
- Async 方法会在指定线程池进行任务的执行
- Non-Async 方法会在当前线程进行任务的执行
操作流程
- Apply 方法:将上一个 CompletableFuture 执行结果作为入参,再次进行转换或者计算,重新返回一个新的值。
- Accept 方法:将上一个 CompletableFuture 执行结果作为入参,但是不返回结果。
- Compose 方法:将上一个 CompletableFuture 执行结果作为入参,并返回一个新的 CompletableFuture 对象。
- Run 方法:执行 Runnable 方法,在上一个 CompletableFuture 任务执行完成后,再执行另外一个接口,不需要上一个任务的结果,也不需要返回值,只需要在上一个任务执行完成后执行即可。
- Combine 方法:组合两个 CompletableFuture 的执行结果,并通过 BiFunction 将两个返回值合并后返回一个全新的值作为出参。
Exception Handle 阶段
handle 方法能同时接受执行结果和异常信息,在 BiFunction 进行处理。
CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
exceptionally 方法可以在上一个阶段以异常完成时进行处理,它可以根据上一个阶段的异常产出新的 CompletableFuture。
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
Get Result 阶段
get()
获取结果。getNow()
立即返回结果,如果还没有结果,则返回默认值,也就是该方法的入参。join()
不带超时时间的等待任务完成。
Completion
整个调用过程通过调用链的方式实现,并将调用方式分成了四种调用方式:
- zero-input source actions
- single-input source actions(UniCompletion)
- two-input source actions
- projected(BiCompletion used by either (not both) of two inputs)
- shared(CoCompletion used by the second of two sources)
- Signallers(Signallers is unblock waiters)
Completion 继承了 ForkJoinTask 支持异步执行,并且利用 ForkJoinTask Tag 并没有增加开销。
具体继承关系如下:
- ForkJoinTask
<V>
- Completion
- CoCompletion
- Signaller
- UniCompletion
<T, V>
- BiCompletion
<T, U, V>
- BiAccept、BiApply、BiRelay、BiRun
- OrAccept、OrApply、OrRelay、OrRun
- UniAccept
<T>
、UniApply<T, V>
、UniRelay<T>
、UniRun<T>
- UniCompose
<T, V>
- UniExceptionally
<T>
- UniHandle
<T, V>
- UniWhenComplete
<T>
- BiCompletion
- Completion
下面挑几个来进行讲解,由于 Completion 流程使用了策略模式,所以大概流程基本相似。
UniCompletion 调用逻辑
以 thenApply(对前一任务有依赖性)为例
1. 解除外观模式
将为了方便调用的外观模式 thenApply、thenApplyAsync,聚合为更集中的 Apply 操作:uniApplyStage。
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
2. 将同步异步分离
需要清楚的是,无论是同步还是异步,所做的操作都是相同的,只不过由不同的线程执行而已。
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
//参数合法性校验
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
//同步异步分离,妙用基础语法
//1. 存在线程池时,直接执行 if 语法块内逻辑
//2. 不存在线程池时,执行 !d.uniApply(this, f, null),即同步逻辑
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
//对于Completion操作来说,push有三种:push、bipush、orpush
//push Pushes the given completion (if it exists) unless done.
//bipush Pushes completion to this and b unless both done.
//orpush Pushes completion to this and b unless either done.
push(c);
c.tryFire(SYNC);
}
return d;
}
3. 异步执行逻辑
首先创建 UniApply 任务,根据 UniApply > UniCompletion > Completion 的继承关系,创建一个 Completion 操作。再压到 stack 中,尝试以同步的方式触发一下。
3.1 压栈操作
final void push(UniCompletion<?,?> c) {
if (c != null) {
//以 CAS 方式尝试入栈
while (result == null && !tryPushStack(c))
//在 CAS 失败后,清除 c.next 的引用
lazySetNext(c, null); // clear on failure
}
}
final boolean tryPushStack(Completion c) {
Completion h = stack;
//将当前 Completion 的 next 设置为栈顶
lazySetNext(c, h);
//CAS替换栈顶,成功返回true,失败返回false
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
这里我们需要注意的是,压栈操作是进行了反向压栈。举个例子:
命令的执行:
A -> B -> C
(A) -> A1 -> A2 -> A3 -> A4
(B) -> B1 -> B2 -> B3 -> B4
(C) -> C1 -> C2 -> C3 -> C4
压栈顺序:
C -> B -> A
(A) -> A4 -> A3 -> A2 -> A1
(B) -> B4 -> B3 -> B2 -> B1
(C) -> C4 -> C3 -> C2 -> C1
因为这种压栈机制,所以并不是依赖栈的顺序执行任务,而是直接在 Future 中定义了 depenence 和 source 维护相关的调用依赖关系。但要避免过深的递归调用,我们还需要将树形逆向的压栈方式修改为正向链式的方式,由 PostCompletion 解决这个问题。因此我们可将调用顺序理解为:在任务可直接执行时直接调用;否则等重排序好进行顺序传播触发。
反向压栈有问题吗?答案是没有。因为栈中的每一个 Completion 在执行上互不影响,它们的顺序只影响到 cleanStack 和 postComplete 的处理顺序。CompletableFuture 和它的栈元素产生的 CompletableFuture 彼此间有顺序要求,但对同一个 CompletableFuture 的栈内的 Completion 元素彼此间没有顺序要求,决定他们顺序的是对源 CompletionFuture 调用 orApply,thenApply 等等方法的顺序,后续运行也完全独立。只不过在源 CompletableFuture 进行 postComplete 时,执行的顺序将会与原本的”先来先出“相反。
3.2 尝试触发
tryFire 函数是在 UniApply 中实现的。
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
//dependence 为空,说明已经调用过了
//如果是异步模式(mode = 1),就不判断任务是否结束
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
//摘除引用
dep = null; src = null; fn = null;
//触发后处理
return d.postFire(a, mode);
}
}
//绑定一个依赖源
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
//当前future不为空且stack不为空
if (a != null && a.stack != null) {
//NESTED = -1(以嵌套的方式调用tryFire)或 result = null(src的任务未执行完毕)
//才会 cleanStack
if (mode < 0 || a.result == null)
a.cleanStack(); //清理 Stack 直到有结果产生
//否则将所有元素出栈并执行存活元素
else
a.postComplete();
}
if (result != null && stack != null) {
if (mode < 0)
return this;
else
postComplete();
}
return null;
}
//清除Stack中执行完毕的节点(node=null)
final void cleanStack() {
for (Completion p = null, q = stack; q != null;) {
//遍历Stack
Completion s = q.next;
//q是Live状态,跳过
if (q.isLive()) {
p = q;
q = s;
}
//q不是Live状态,将stack中对q的引用摘除掉
//此时分为两种情况:
//1. p本身就是过期的节点时(=null),直接将下一节点设为栈顶
//2. p不为null时摘掉q引用即可
else if (p == null) {
//说明p节点本身就是执行过的节点
casStack(q, s);
q = stack;
}
else {
p.next = s;
if (p.isLive()) //再次判断p结点是否还或者(在这期间是否有别的线程改动了)
q = s;
else {
p = null; //restart
q = stack;
}
}
}
}
//将树形的调用链调整为链式调用链, 在每次任务创建初和任务执行后反复进行触发
//解决了轮询和阻塞带来的对性能方面的影响
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
//当f的stack为空时,使f重新指向当前的CompletableFuture,继续后面的结点
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
//从头遍历stack
//将栈顶的下一个元素设置为栈顶,将旧栈顶弹出
//此时新栈顶为t,旧栈顶为h
if (f.casStack(h, t = h.next)) {
//如果栈顶不为空且栈顶不是当前future,压入栈---该行为是将树形的调用链调整为链式调用链
if (t != null) {
if (f != this) {
pushStack(h);
//继续下一个节点
continue;
}
//如果是当前CompletableFuture, 解除头节点与栈的联系
h.next = null;
}
//调用头节点的tryFire()方法,该方法可看作Completion的钩子方法,执行完逻辑后,
//会向后传播的
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
3.3 逻辑执行
对于不同的 Completion 来说,异同都主要集中在这一步操作。
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
//只有上一个任务执行完才会触发下面的逻辑
if (a == null || (r = a.result) == null || f == null)
return false;
//设置result或者throwable
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
//这里不同的实现可能不同,有些不需要返回值的会被直接设置成 null
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
//主要异步调用逻辑
final boolean claim() {
Executor e = executor;
//如果当前任务可以被执行,返回true,否则,返回false; 保证任务只被执行一次
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // disable
e.execute(this);
}
return false;
}
BiCompletion 调用逻辑
final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
if (c != null) {
Object r;
//a的result还没准备好,c压入栈
while ((r = result) == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
//b的result也还没准备好
if (b != null && b != this && b.result == null) {
//根据a的result决定是否构建CoCompletion
//如果a未结束,则构建一个CoCompletion
//CoCompletion最后调用的也是BiCompletion的tryFire
Completion q = (r != null) ? c : new CoCompletion(c);
while (b.result == null && !b.tryPushStack(q))
lazySetNext(q, null); // clear on failure
}
}
}
CoCompletion 调用逻辑
以 applyToEither(对前一任务有依赖性)为例。
与 UniCompletion 基本相同,区别在于 orpush 和 BiCompletion。
final void orpush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
if (c != null) {
// a和b的result都没好,才会考虑入栈
while ((b == null || b.result == null) && result == null) {
if (tryPushStack(c)) { // 先入a的栈
// 入a的栈成功,b的result还没好
if (b != null && b != this && b.result == null) {
Completion q = new CoCompletion(c); // a还未结束,用c构建CoCompletion
// 再次判断,a和b的result都没好,才会考虑入栈
while (result == null && b.result == null && !b.tryPushStack(q))
lazySetNext(q, null); // 失败置空q的next域
}
break;
}
lazySetNext(c, null); // 失败置空c的next域
}
}
}
static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
Function<? super T,? extends V> fn;
OrApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
CompletableFuture<U> snd,
Function<? super T,? extends V> fn) {
super(executor, dep, src, snd); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null ||
!d.orApply(a = src, b = snd, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; snd = null; fn = null;
return d.postFire(a, b, mode);
}
}
Signaller 调用逻辑
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
get
和 join
方法都会在获取不到结果是按条件轮循 watingGet 方法。
private Object waitingGet(boolean interruptible) {
Signaller q = null;//信号器
boolean queued = false;//是否入队
int spins = -1;//自旋次数
Object r;//结果引用
//循环条件是只等待result,内部有根据扰动决定的break
while ((r = result) == null) {
//自旋次数只有第一次进来是负值,后续只能是0或其他正数。
if (spins < 0)
//自旋次数,多处理器下初始化为16,否则为0,即不自旋。设置值后此次循环结束。
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0;
//第二次循环时才会判断自旋次数。只要spins大于0就继续循环,直到达到0为止再执行下面的else代码。
else if (spins > 0) {
//仅当下一个种子数不小于0时,减小一次自旋次数。nextSecondarySeed是Thread类中使用@Contended注解标识的变量,
//这与传说中的伪共享有关。
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
//停止自旋后的第一轮循环,result依旧是null,则对q进行初始化,关于Signaller后续再讲。
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
//初始化q后的下一轮循环(停止自旋后的第二轮),queued是false,将上一轮循环初始化的q压入栈。
else if (!queued)
queued = tryPushStack(q);
//停止自旋后的若干次循环(上一步可能压栈失败,则下一轮自旋会再次压栈,直到成功)后,判断是否可扰动。
else if (interruptible && q.interruptControl < 0) {
//扰动信号匹配,将q的有关字段全部置空,顺带清一下栈,返回null。
q.thread = null;
//这个清栈的过程,细看上面的解释还有有关的源码,可能会发出一个疑问,cleanStack只能清除isLive判断false的Completion,
//但目前的实现,基本上都只能在dep为null,base为null等仅当dep执行完成的情况发生,而dep完成的情况是当前CompletableFuture的
//result不是null,而方法运行到此,很明显result必然是null,那么还有必要清栈吗?
//答案是必要的,首先将来也许能出现存活或死亡状态与source的result无关的Completion,那么此处清一下栈也是帮助后面的工作。
//其次,刚才压入栈的q在thread指向null时即已死亡,它也必须要进行清除。
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
//q关联的线程存在,即q存活,且依旧没有执行完毕,使用ForkJoinPool的阻塞管理机制,q的策略进行阻塞。
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
//阻塞是可以扰动的,此时会将q的扰动控制信号设置为-1,则下一次循环时将可能进入上一个else if。
q.interruptControl = -1;
}
}
}
//前面的循环没有break,能执行到此,只有result获得非null值的情况。
if (q != null) {
//若q不是null,说明没有在自旋阶段获取到result,需要对它进行禁用。
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
//可扰动且有扰动信号,则说明扰动后未能进入上面带有cleanStack的那个else if,
//可能是恰好在这次循环开始时获取到了非空result,从而退出循环,也可能是参数interruptible为假,
//在外部扰动了当前线程后,依旧等到了result。
//只要发生了扰动,就将结果置null,外面调用者如果是join,可以报出扰动。
r = null; // report interruption
else
//如果不可扰动,则中断当前线程(创建q的线程)。
Thread.currentThread().interrupt();
}
}
//当前future已经有结果,进行postComplete逻辑并返回r。
postComplete();
return r;
}
根据该方法的注释,waitingGet方法只会有两个结果,null(可扰动并且扰动了)和原始的 result。而 get 方法可扰动,也即可返回 null,join 方法不可扰动,只能等待结束或抛出异常。
waitingGet 方法中出现了 Signaller。Signaller 是一个 Completion 的直接子类,同时实现了 ForkJoinPool 的内部接口 ManagedBlocker,这使得它可以在当 ForkJoinPool 出现大量线程阻塞堆积时避免饥饿。Signaller 的作用是持有和释放一个线程,并提供相应的阻塞策略。前面提到的 waitingGet 方法创建了一个 Signaller(interruptible, 0L, 0L)
,类似的,可以看到 timedGet 方法使用 Signaller(true, nanos, d == 0L ? 1L : d)
来进行阻塞的管理,管理的方法依赖 ForkJoinPool 内部的。
static final class Signaller extends Completion
implements ForkJoinPool.ManagedBlocker {
long nanos; // 计时的情况下,要等待的时间。
final long deadline; // 计时的情况下指定不为0的值
volatile int interruptControl; // 大于0代表可扰动,小于0代表已扰动。
volatile Thread thread;//持有的线程
Signaller(boolean interruptible, long nanos, long deadline) {
this.thread = Thread.currentThread();
this.interruptControl = interruptible ? 1 : 0;//不可扰动,赋0
this.nanos = nanos;
this.deadline = deadline;
}
final CompletableFuture<?> tryFire(int ignore) {//ignore无用
Thread w; //Signaller自持有创建者线程,tryFire只是单纯唤醒创建它的线程。
if ((w = thread) != null) {
thread = null;//释放引用
LockSupport.unpark(w);//解除停顿。
}
//返回null,当action已执行并进行postComlete调用时,f依旧指向当前CompletableFuture引用并解除停顿。
return null;
}
public boolean isReleasable() {
//线程是空,允许释放。这可能是某一次调用本方法或tryFire方法造成。
if (thread == null)
return true;
if (Thread.interrupted()) {
//如果调用isReleasable方法的线程被扰动了,则置扰动信号为-1
int i = interruptControl;
interruptControl = -1;
if (i > 0)
//原扰动信号是”可扰动“,则是本次调用置为”已扰动“,返回true。
return true;
}
//未定时(deadline是0)的情况只能在上面释放,定时的情况,本次计算nanos(deadline-System.nanoTime())
//或上次计算的nanos不大于0时,说明可以释放。
if (deadline != 0L &&
(nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
//只要可释放,将创建者线程的引用释放。下次调用直接返回true,线程运行结束销毁后可被gc回收。
thread = null;
return true;
}
//仍持有创建者线程,调用此方法的线程未扰动或当前扰动不是第一次,未定时或不满足定时设置的一律返回false。
return false;
}
public boolean block() {
//block方法
if (isReleasable())
//判断可释放,直接return true。
return true;
//判断deadline是0,说明不计时,默认park。
else if (deadline == 0L)
LockSupport.park(this);
else if (nanos > 0L)
//计时情况,park指定nanos。
LockSupport.parkNanos(this, nanos);
//睡醒后再次返回isReleasable的结果。
return isReleasable();
}
//创建者线程引用被释放即代表死亡。
final boolean isLive() { return thread != null; }
}
ForkJoinPool.managedBlock(q) 来实现,而这用到了被 Signaller 实现的 ForkJoinPool.ManagedBlocker。
//ForkJoinPool的managedBlock方法。
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
ForkJoinPool p;
ForkJoinWorkerThread wt;
Thread t = Thread.currentThread();//调用此方法的线程,即前面的Signaller的创建者线程。
if ((t instanceof ForkJoinWorkerThread) &&
(p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
//调用managedBlock方法的线程是ForkJoinWorkerThread,则它可运行在ForkJoinPool中。此处要求内部持有pool的引用。
WorkQueue w = wt.workQueue;
//循环,只要判断blocker(即Signaller)不可释放。
while (!blocker.isReleasable()) {
//尝试用ForkJoinPool对当前线程的工作队列进行补偿。
//tryCompensate方法会尝试减少活跃数并可能创建或释放一个准备阻塞的worker线程,
//它会在发生竞态,脏数据,松弛或池终止时返回false。
//关于ForkJoinPool的详情单独准备文章。
if (p.tryCompensate(w)) {
try {
//补偿成功,不停地对线程池尝试先isReleasable再block,任何一个方法返回true则终止循环。
do {} while (!blocker.isReleasable() &&
!blocker.block());
} finally {
//出现任何异常,或循环终止时,控制信号加上一个活跃数单元,因为前面通过补偿才会进入循环,已减少了一个单元。
U.getAndAddLong(p, CTL, AC_UNIT);
}
break;
}
}
}
else {
//当前线程不是ForkJoinWorkerThread或不持有ForkJoinPool的引用。连续先尝试isReleasable再尝试block,直到有一者返回true为止。
do {} while (!blocker.isReleasable() &&
!blocker.block());
}
}
特别说明:allOf & anyOf
allOf 和 anyOf 使用分治法来解决对任务完成的结果进行触发的功能。
allOf
allOf 方法,当所有 cfs 列表中的成员进入完成态后完成(使用空结果),或有任何一个列表成员异常完成时完成(使用同一个异常)。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
int lo, int hi) {
//声明一个后续返回的dep
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (lo > hi) // empty
d.result = NIL;
else {
CompletableFuture<?> a, b;
//二分分治
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] : //只剩两个元素时,a作为偏左元素
//否则继续进行数组左部分元素拆分
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : //如果只剩下一个值,b=a
andTree(cfs, mid+1, hi))) == null) //只剩两个元素时,b作为偏右元素
//否则继续进行数组右部分拆分
throw new NullPointerException();
if (!d.biRelay(a, b)) {
//不满足完成条件,生成一个中继并压栈,再次尝试同步完成。若不满足条件,ab任何一个完成后都会再间接调用它的tryFire。
BiRelay<?,?> c = new BiRelay<>(d, a, b);
a.bipush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
anyOf
anyOf方法,返回一个 CompletableFuture 对象,任何一个 cfs 列表中的成员进入完成态(正常完成或异常),则它也一并完成,结果一致。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
//直接调用orTree
return orTree(cfs, 0, cfs.length - 1);
}
其与 allOf 的区别主要在后面 push 的处理逻辑上,一个是接收两者,一个是接收其一,它们本质是分治法拆分的子任务。