把生命浪费在美好事物上

产品产品产品

358篇博客

Java 线程池源码分析 (上篇)

目录 
  • FutureTask
  • ExecutorCompletionService
  • AbstractExecutorService
  • ThreadPoolExecutor
FutureTask 
 FutureTask类 结构


 FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既是Runnable,也是Future。
 主要成员变量 
//任务运行状态
    private volatile int state;
    //预定义多7种状态
     private static final int NEW          = 0; //任务新建和执行中
     private static final int COMPLETING   = 1; //任务将要执行完毕中
     private static final int NORMAL       = 2; //任务正常执行结束
     private static final int EXCEPTIONAL  = 3; //任务异常
     private static final int CANCELLED    = 4; //任务取消
     private static final int INTERRUPTING = 5; //任务线程即将被中断
     private static final int INTERRUPTED  = 6; //任务线程已中断
     //可能的状态转换
     //NEW -> COMPLETING -> NORMAL
     // NEW -> COMPLETING -> EXCEPTIONAL
     // NEW -> CANCELLED
     //NEW -> INTERRUPTING -> INTERRUPTED

    private Callable callable;//被提交的任务
   
    private Object outcome; //任务执行结果或者任务异常
   
    private volatile Thread runner;//执行任务的线程
   
    private volatile WaitNode waiters;//等待节点,关联等待线程

  run

    public void run() {
       //如果运行状态不是NEW 或者CAS设置运行线程为当前线程失败则返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
          //当前被提交的任务
            Callable c = callable;
            //如果任务不为null同时运行状态是NEW
            if (c != null && state == NEW) {
                V result;
                //运行正常标记
                boolean ran;
                try {
                   //执行提交的任务,并将结果赋值给result,同时将运行正常标记设置为true
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                //如果执行提交任务异常,则将结果设置为null,运行正常标记设置为false,同时设置异常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                //如果运行正常,则将设置结果
                if (ran)
                    set(result);
            }
        } finally {
             //退出前将当前运行线程设置为null
            runner = null;
            //重置runner后必须重新读取state防止有泄漏的中断
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }



     protected void set(V v) {
       // CAS将运行状态从NEW更新为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
         //更新成功将结果赋值给outcome
            outcome = v;
            //将状态更新为NORMAL,终态
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 
            //结束处理,删除或唤醒等待线程等
            finishCompletion();
        }
    }



    //删除或唤醒所有等待线程,待用done()同时将callable重置为null
    private void finishCompletion() {
         
        for (WaitNode q; (q = waiters) != null;) {
            //CAS将等待节点更新为null
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    //获取等待节点的线程
                    Thread t = q.thread;
                    //如果等待节点电池不为null,则将其设置为null并唤醒线程
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    //获取等待线程的直接后继节点
                    WaitNode next = q.next;
                    //如果直接后继为null则表示所有等待线程都处理完,跳出循环
                    if (next == null)
                        break;
                    //将节点从链表中删除   
                    q.next = null; // unlink to help gc
                    //继续处理后继节点
                    q = next;
                }
                //跳出循环
                break;
            }
        }
       //调用done
        done();
        //将callable重置为null
        callable = null;        
    }


     protected void setException(Throwable t) {
        // CAS 将运行状态从NEW 更新为 COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
           //将异常赋值给outcome
            outcome = t;
            //将运行状态更新为终态 EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
            //结束处理
            finishCompletion();
        }
    }

    //等待状态从INTERRUPTING变为终态
     private void handlePossibleCancellationInterrupt(int s) {
       
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); 
    }

 get

public V get() throws InterruptedException, ExecutionException {
        //运行状态
        int s = state;
        //如果还没有结束
        if (s <= COMPLETING)
            //等待结束
            s = awaitDone(false, 0L);
        //返回结果
        return report(s);
    }



     private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        //如果有超时限制,则计算deadLine
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        //入队标记
        boolean queued = false;
        //循环
        for (;;) {
        //如果当前线程已经中断,则将该节点从等待队列中删除,并抛出异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            //运行状态
            int s = state;
            if (s > COMPLETING) { //任务可能已经完成或者被取消了
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)  // 可能任务线程被阻塞了,主线程让出CPU
                Thread.yield();
            else if (q == null) // 等待线程节点为空,则初始化新节点并关联当前线程
                q = new WaitNode();
            else if (!queued) //如果没有入队过,等待线程入队列,成功则queued=true
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {//限制超时
                nanos = deadline - System.nanoTime();
                //如果超时移除等待节点
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //没有超时则线程挂起
                LockSupport.parkNanos(this, nanos);
            }
            else  线程挂起
                LockSupport.park(this);
        }
    }


     private void removeWaiter(WaitNode node) {
         //如果要删除的节点不为null,则将其节点线程设置为null
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {
               //q初始化为等待节点          
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                  //q的直接后继
                    s = q.next;
                    //如果q的线程不为null,表示q不是需要删除的节点,则将q赋值给pred
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) { //如果pred不为null则表示pred是有效节点,
                    //需要删除的后继节点作为pred的直接后继,进而将需要删除的节点从队列中删除
                        pred.next = s;
                        if (pred.thread == null) // 竞态检查
                            continue retry;
                    }//如果头节点是需要删除的节点则CAS更新
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }
ExecutorCompletionService
public class ExecutorCompletionService<V> implements CompletionService<V> {
    //具体执行任务的线程池
    private final Executor executor;
    private final AbstractExecutorService aes;
    //任务执行完成的阻塞队列
    private final BlockingQueue<Future<V>> completionQueue;

   
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        //FutureTask执行结束的时候会被调用,将task添加到阻塞队列
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
   
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
         //如果aes为null则将task包装成一个新的FutureTask,否则调用aes newTaskFor方法进行包装
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

   
    
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

   
   

    public Future<V> submit(Callable<V> task) {
        //如果提交的任务为null则抛出NPE
        if (task == null) throw new NullPointerException();
        //将当前任务包装为一个FutureTask
        RunnableFuture<V> f = newTaskFor(task);
        //再将FutureTask包装为QueueingFuture 后执行
        executor.execute(new QueueingFuture(f));
        return f;
    }

   
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    

}

ExecutorCompletionService

AbstractExecutorService 抽象类实现了 ExecutorService 接口,然后在其基础上实现了 invokeAny 方法和 invokeAll 方法,这里的 newTaskFor 方法也比较有用,用于将任务包装成 FutureTask。
 invokeAny
 public  T invokeAny(Collection> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }



    private  T doInvokeAny(Collection> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        //如果提交的任务集合为null则抛NPE
        if (tasks == null)
            throw new NullPointerException();
       //提交的任务数量
        int ntasks = tasks.size();
        //如果提交的任务数量为0则抛一个IllegalArgumentException
        if (ntasks == 0)
            throw new IllegalArgumentException();
        //创建一个结果集合列表   
        ArrayList> futures = new ArrayList>(ntasks);
        //创建一个ExecutorCompletionService
        ExecutorCompletionService ecs =
            new ExecutorCompletionService(this);

        

        try {
           
            ExecutionException ee = null;
            //如果有限制超时时间则计算出deadLine
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator> it = tasks.iterator();

            //执行第一个任务
            futures.add(ecs.submit(it.next()));
            //任务数自减
            --ntasks;
            //活动任务数为1
            int active = 1;

            for (;;) {
                //获取执行完成结果
                Future f = ecs.poll();
                //如果没有结果,则表示任务尚未执行完
                if (f == null) {
                    //剩余任务数大于0,则继续执行
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)//如果活动任务数为0,且没有获取到成功成功结果,则表示所有任务都失败了
                        break;
                    else if (timed) {// 如果限制超时时间
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else //如果没有要提交的任务,线程池中任务还没有结束阻塞等待结果
                        f = ecs.take();
                }
                //如果获取到了结果,活动任务数自减
                if (f != null) {
                    --active;
                    try {
                        //返回结果
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
        //退出前取消所有任务
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

相关阅读:Java 线程池源码分析 (下篇)

本文来自网易实践者社区,经作者张伟授权发布。