Java 线程池源码分析 (下篇)

ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法
   首先看一下构造方法
 /**
     * 使用给定的初始化参数创建一个新的线程池
     *
     * @param corePoolSize 线程池中保持的线程数量,即使这些线程是空闲的;除非设置了allowCoreThreadTimeOut
     * @param maximumPoolSize 线程池中允许的最大线程数
     * @param keepAliveTime 当线程池中的线程数大于corePoolSize时,这些多余空闲线程等待新任务的最大时间
     * @param unit keepAliveTime的时间单位
     * @param workQueue 用来在线程执行前持有线程的队列。这个队列只持有通过executor方法提交的runnable任务
     * @param threadFactory 线程池创建新线程的线程工厂
     * @param handler 线程池关闭或饱和时的处理策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
除了构造方法中的这几个初始化参数外还有一个比较重要的属性需要理解

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 这里 COUNT_BITS 设置为 29(32-3),为什么减去三呢?这是因为线程池有5种状态,需要用到三位,高三位用来表示线程池的状态
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111
// 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1 大约是5亿
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 我们说了,线程池的状态存放在高 3 位中
// 111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 对CAPACITY 按位去反高三位全部为1在进行按位与运算,就将地位全部变为0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 按位与允许将高三位全部变为0就得到了当前线程池中的线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
//按位或运算就得到了ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
在介绍完基本属性后接下来要看主要方法executor(Runnable),它处理过程分为如下三步:
  1.  如果小于corePoolSize的线程在运行,尝试使用给定的任务作为工作线程的第一个任务去去创建一个工作线程。 调用addWorker方法以原子方式检查runState和workeCount,从而防止虚假报警,在返回false时会在不应该的时候添加线程
  2. 如果任务可以排队成功,那么我们仍然要双重检查是否应该添加一个新的线程,因为自上次检查以来可能现有线程已经死亡或自进入这个方法以来线程池已经关闭。所以需要双重检查,如果有关闭了可以对队列进行回滚,如果线程池没有线程可以开启一个新线程
  3. 如果任务不能排队,则尝试创建一个新的线程,如果创建新线程失败就知道线程已经关闭或已经饱和了,因此执行拒绝策略
public void execute(Runnable command) {
         //如果任务为null则抛出NPE 
        if (command == null)
            throw new NullPointerException();
        //获取ctl
        int c = ctl.get();
        //如果线程池的线程数小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            //添加新工作线程,如果添加成功则返回
            if (addWorker(command, true))
                return;
             //添加失败再次获取ctl,进行重复检查   
            c = ctl.get();
        }
        //如果线程池还是运行的,则对当前任务进行排队;如果排队成功
        if (isRunning(c) && workQueue.offer(command)) {
            //再次获取ctl
            int recheck = ctl.get();
            //如果线程池已经关闭则将任务从队列中删除,如果删除成功对该任务执行饱和策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)//如果没有运行线程,则创建一个新线程
                addWorker(null, false);
        }//执行到此表示,线程池已经关闭或排队失败;则尝试创建新的工作线程,如果添加新的工作线程则执行饱和策略
        else if (!addWorker(command, false))
            reject(command);
    }
 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
       

        //运行在Worker中的线程,真正执行任务的线程
        final Thread thread;
        //初始化的创建的第一个任务,可以为null
        Runnable firstTask;
        //用于存放此线程完全的任务数
        volatile long completedTasks;

       // 构造方法
        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

       
        public void run() {
            runWorker(this);
        }



final void runWorker(Worker w) {
        //获取当前线程
        Thread wt = Thread.currentThread();
        //获取worker中的firstTask
        Runnable task = w.firstTask;
        //将firstTask设置为null,因为这个任务是创建worker时候指定的,稍后就会执行这个任务
        w.firstTask = null;
        //对worker进行解锁,允许中断
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
             //如果任务不为null或者getTask()中可以去到任务
            while (task != null || (task = getTask()) != null) {
               //对工作线程加锁不允许中断
                w.lock();
                 //如果线程池状态大于等于 STOP,那么意味着该线程也要中断 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //执行beforeExecute,这是一个扩展点
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    //执行afterExecute,这里也是一个扩展点
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 置空 task,准备 getTask 获取下一个任务
                    task = null;
                    //完成的任务数自增
                    w.completedTasks++;
                    // 释放掉 worker 的独占锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
         // 如果到这里,需要执行线程关闭:
         // 1. 说明 getTask 返回 null,没有任务要执行了
         // 2. 任务执行过程中发生了异常
            processWorkerExit(w, completedAbruptly);
        }
    }

     private Runnable getTask() {
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            //获取运行状态
            int rs = runStateOf(c);

            // 如果线程池已经关闭 同时队列为空,则减少工作线程数
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //工作线程数
            int wc = workerCountOf(c);

            //是否需要计时处理,如果设置了allowCoreThreadTimeOut或当前工作线程数量大于corePoolSize 则需要计时处理
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //(wc > maximumPoolSize || (timed && timedOut)) 工作线程大于线程池的最大值 或需要计时且已经超时
            //(wc > 1 || workQueue.isEmpty()) 工作线程数大于1或任务队列为空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //工作线程数量减少 返回null
                if (compareAndDecrementWorkerCount(c))
                    return null;
            //如果CAS更新数量失败则继续
                continue;
            }

            try {
                //如果需要计时,则使用超时poll否则阻塞获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                 //r 为null则是因为超时导致的   
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }



     private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 如果是意外结束,则工作线程自减
            decrementWorkerCount();
        //加锁处理        
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //将当前工作线程执行的任务数加到线程池的执行的任务数上
            completedTaskCount += w.completedTasks;
            //将当前工作线程从工作hashSet中删除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //尝试结束线程池
        tryTerminate();
        //获取ctl
        int c = ctl.get();
        //如果线程池运行状态小于STOP 即RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
                //如果不是意外结束
            if (!completedAbruptly) {
                //如果设置了allowCoreThreadTimeOut 则线程池最少持有线程则为0否则为corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //如果min==0但是队列不为空则min=1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                 //如果工作线程大于或等于min返回   
                if (workerCountOf(c) >= min)
                    return; 
            }
            //此时表示工作线程小于线程池最小线程数则添加一个新的工作线程,不指定firstTask
            addWorker(null, false);
        }
    }

相关阅读:Java 线程池源码分析 (上篇)

本文来自网易实践者社区,经作者张伟授权发布。