//任务运行状态
private volatile int state;
//预定义多7种状态
private static final int NEW = 0; //任务新建和执行中
private static final int COMPLETING = 1; //任务将要执行完毕中
private static final int NORMAL = 2; //任务正常执行结束
private static final int EXCEPTIONAL = 3; //任务异常
private static final int CANCELLED = 4; //任务取消
private static final int INTERRUPTING = 5; //任务线程即将被中断
private static final int INTERRUPTED = 6; //任务线程已中断
//可能的状态转换
//NEW -> COMPLETING -> NORMAL
// NEW -> COMPLETING -> EXCEPTIONAL
// NEW -> CANCELLED
//NEW -> INTERRUPTING -> INTERRUPTED
private Callable callable;//被提交的任务
private Object outcome; //任务执行结果或者任务异常
private volatile Thread runner;//执行任务的线程
private volatile WaitNode waiters;//等待节点,关联等待线程
run
public void run() {
//如果运行状态不是NEW 或者CAS设置运行线程为当前线程失败则返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//当前被提交的任务
Callable c = callable;
//如果任务不为null同时运行状态是NEW
if (c != null && state == NEW) {
V result;
//运行正常标记
boolean ran;
try {
//执行提交的任务,并将结果赋值给result,同时将运行正常标记设置为true
result = c.call();
ran = true;
} catch (Throwable ex) {
//如果执行提交任务异常,则将结果设置为null,运行正常标记设置为false,同时设置异常
result = null;
ran = false;
setException(ex);
}
//如果运行正常,则将设置结果
if (ran)
set(result);
}
} finally {
//退出前将当前运行线程设置为null
runner = null;
//重置runner后必须重新读取state防止有泄漏的中断
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
// CAS将运行状态从NEW更新为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//更新成功将结果赋值给outcome
outcome = v;
//将状态更新为NORMAL,终态
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
//结束处理,删除或唤醒等待线程等
finishCompletion();
}
}
//删除或唤醒所有等待线程,待用done()同时将callable重置为null
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
//CAS将等待节点更新为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//获取等待节点的线程
Thread t = q.thread;
//如果等待节点电池不为null,则将其设置为null并唤醒线程
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
//获取等待线程的直接后继节点
WaitNode next = q.next;
//如果直接后继为null则表示所有等待线程都处理完,跳出循环
if (next == null)
break;
//将节点从链表中删除
q.next = null; // unlink to help gc
//继续处理后继节点
q = next;
}
//跳出循环
break;
}
}
//调用done
done();
//将callable重置为null
callable = null;
}
protected void setException(Throwable t) {
// CAS 将运行状态从NEW 更新为 COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//将异常赋值给outcome
outcome = t;
//将运行状态更新为终态 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
//结束处理
finishCompletion();
}
}
//等待状态从INTERRUPTING变为终态
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}
get
public V get() throws InterruptedException, ExecutionException {
//运行状态
int s = state;
//如果还没有结束
if (s <= COMPLETING)
//等待结束
s = awaitDone(false, 0L);
//返回结果
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//如果有超时限制,则计算deadLine
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
//入队标记
boolean queued = false;
//循环
for (;;) {
//如果当前线程已经中断,则将该节点从等待队列中删除,并抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//运行状态
int s = state;
if (s > COMPLETING) { //任务可能已经完成或者被取消了
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 可能任务线程被阻塞了,主线程让出CPU
Thread.yield();
else if (q == null) // 等待线程节点为空,则初始化新节点并关联当前线程
q = new WaitNode();
else if (!queued) //如果没有入队过,等待线程入队列,成功则queued=true
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {//限制超时
nanos = deadline - System.nanoTime();
//如果超时移除等待节点
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//没有超时则线程挂起
LockSupport.parkNanos(this, nanos);
}
else 线程挂起
LockSupport.park(this);
}
}
private void removeWaiter(WaitNode node) {
//如果要删除的节点不为null,则将其节点线程设置为null
if (node != null) {
node.thread = null;
retry:
for (;;) {
//q初始化为等待节点
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
//q的直接后继
s = q.next;
//如果q的线程不为null,表示q不是需要删除的节点,则将q赋值给pred
if (q.thread != null)
pred = q;
else if (pred != null) { //如果pred不为null则表示pred是有效节点,
//需要删除的后继节点作为pred的直接后继,进而将需要删除的节点从队列中删除
pred.next = s;
if (pred.thread == null) // 竞态检查
continue retry;
}//如果头节点是需要删除的节点则CAS更新
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
public class ExecutorCompletionService<V> implements CompletionService<V> {
//具体执行任务的线程池
private final Executor executor;
private final AbstractExecutorService aes;
//任务执行完成的阻塞队列
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
//FutureTask执行结束的时候会被调用,将task添加到阻塞队列
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
//如果aes为null则将task包装成一个新的FutureTask,否则调用aes newTaskFor方法进行包装
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public Future<V> submit(Callable<V> task) {
//如果提交的任务为null则抛出NPE
if (task == null) throw new NullPointerException();
//将当前任务包装为一个FutureTask
RunnableFuture<V> f = newTaskFor(task);
//再将FutureTask包装为QueueingFuture 后执行
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
}
public T invokeAny(Collection> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
private T doInvokeAny(Collection> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
//如果提交的任务集合为null则抛NPE
if (tasks == null)
throw new NullPointerException();
//提交的任务数量
int ntasks = tasks.size();
//如果提交的任务数量为0则抛一个IllegalArgumentException
if (ntasks == 0)
throw new IllegalArgumentException();
//创建一个结果集合列表
ArrayList> futures = new ArrayList>(ntasks);
//创建一个ExecutorCompletionService
ExecutorCompletionService ecs =
new ExecutorCompletionService(this);
try {
ExecutionException ee = null;
//如果有限制超时时间则计算出deadLine
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator> it = tasks.iterator();
//执行第一个任务
futures.add(ecs.submit(it.next()));
//任务数自减
--ntasks;
//活动任务数为1
int active = 1;
for (;;) {
//获取执行完成结果
Future f = ecs.poll();
//如果没有结果,则表示任务尚未执行完
if (f == null) {
//剩余任务数大于0,则继续执行
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)//如果活动任务数为0,且没有获取到成功成功结果,则表示所有任务都失败了
break;
else if (timed) {// 如果限制超时时间
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else //如果没有要提交的任务,线程池中任务还没有结束阻塞等待结果
f = ecs.take();
}
//如果获取到了结果,活动任务数自减
if (f != null) {
--active;
try {
//返回结果
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//退出前取消所有任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
相关阅读:Java 线程池源码分析 (下篇)
本文来自网易实践者社区,经作者张伟授权发布。