ThreadPoolExecutor源码解析(2)

叁叁肆2018-12-18 13:25

此文已由作者赵计刚授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


Work内部类:

/**
     * 工作线程。
     */
    private final class Worker implements Runnable {
        /**
         * 在每一个任务的执行前后都会获取和释放runLock。
         * 该锁只要是为了防止中断正在执行任务的work线程
         */
        private final ReentrantLock runLock = new ReentrantLock();

        /**
         * Initial task to run before entering run loop. 
         * 1、Possibly null.
         */
        private Runnable firstTask;

        /**
         * 每个work线程完成的任务总量
         * accumulated into completedTaskCount upon termination.
         */
        volatile long completedTasks;

        Thread thread;

        /**
         * 该work中的线程是不是确实正在执行了run()
         */
        volatile boolean hasRun = false;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
        }

        /*
         * true:已经有线程持有了该锁
         */
        boolean isActive() {
            return runLock.isLocked();
        }

        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();//获取锁runLock
            try {
                /*
                 * 如果pool状态为STOP或TERMINATED,确保线程被打断;
                 * 如果不是,确保线程不要被打断
                 */
                if ((runState >= STOP ||
                    (Thread.interrupted() && runState >= STOP)) &&
                    hasRun)
                    thread.interrupt();
                /*
                 * 确保afterExecute会被执行仅仅当任务完成了(try)或抛出了异常(catch)
                 */
                boolean ran = false;
                beforeExecute(thread, task);//执行任务的run()方法之前要执行的操作
                try {
                    task.run();//执行线程的run()方法
                    ran = true;
                    afterExecute(task, null);//执行任务的run()方法之后要执行的操作
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();//释放锁runLock
            }
        }

        /**
         * Main run loop
         * 运行当前任务task,运行结束后,尝试获取队列中的其他任务,
         * 如果最后通过各种方式都获取不到,就回收该线程,如果获取到了,就用该线程继续执行接下来的任务
         * 最后,当获取不到任何任务去执行时,就将该线程从works线程集合中删除掉
         */
        public void run() {
            try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);//运行该任务
                    task = null;
                }
            } finally {
                workerDone(this);//将该线程从works集合中删除
            }
        }
    }

说明:这里列出了该内部类的全部属性和常用方法。

 

getTask()

    /**
     * 获取下一个worker线程将要运行的任务
     * Gets the next task for a worker thread to run.  
     */
    Runnable getTask() {
        for (;;) {//无限循环
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();//处理queue中的任务
                //下面的runState==RUNNING
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    //从队头获取任务,如果没有任务,等待keepAliveTime的时间
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    //从队头获取任务,如果没有任务,阻塞等待
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {//允许回收获取任务失败的线程
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();//中断闲置的work线程
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

workerCanExit()

    /**
     * 检测一个获取任务失败的work线程是否可以退出了。
     * 出现下面三种情况,work线程就会死亡。
     * 1、如果pool的状态为STOP或TERMINATED
     * 2、队列为空
     * 3、允许回收核心线程并且池中的线程数大于1和corePoolSize的最大值
     */
    private boolean workerCanExit() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean canExit;
        try {
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
        } finally {
            mainLock.unlock();
        }
        return canExit;
    }

workerDone(Worker w)

    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);//从workers集合中删除该线程
            if (--poolSize == 0)//如果池中的线程数为0
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

 

3.2、ensureQueuedTaskHandled(Runnable command)

    /**
     * 在一个task入队之后重新检查state。
     * 当一个task入队后,pool的state发生了变化,该方法就会被调用。
     * 如果一个task入队的同时,shutdownNow方法发生了调用,该方法就必须从队列中移除并回绝
     * 否则该方法会保证至少有一个线程来处理入队的task
     */
    private void ensureQueuedTaskHandled(Runnable command) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean reject = false;
        Thread t = null;
        try {
            int state = runState;
            if (state != RUNNING && workQueue.remove(command))
                reject = true;
            else if (state < STOP &&
                     poolSize < Math.max(corePoolSize, 1) &&
                     !workQueue.isEmpty())
                t = addThread(null);
        } finally {
            mainLock.unlock();
        }
        if (reject)
            reject(command);
    }

 

3.3、addIfUnderMaximumPoolSize(Runnable firstTask)

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

说明:该方法的其他方法与addIfUnderCorePoolSize(Runnable firstTask)一样。

 

3.4、reject(Runnable command)

    void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
    public static class AbortPolicy implements RejectedExecutionHandler {
        
        public AbortPolicy() { }
        /** 直接抛异常 */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException();
        }
    }

 

说明:明白了上一章将的线程池机理,按着这个机理去看源代码是非常容易的事情。

总结:

  • 上一章的工作机理
  • 上一章的参数详细说明


免费领取验证码、内容安全、短信发送、直播点播体验包及云服务器等套餐

更多网易技术、产品、运营经验分享请点击