ThreadPoolExecutor源码解析(1)

叁叁肆2018-12-18 13:24

此文已由作者赵计刚授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


ThreadPoolExecutor使用方式、工作机理以及参数的详细介绍,请参照《第十二章 ThreadPoolExecutor使用与工作机理 》

1、源代码主要掌握两个部分

  • 线程池的创建:构造器
  • 提交任务到线程池去执行:execute()

 

2、构造器

2.1、一些属性:

    /**
     * runState provides the main lifecyle control, taking on values:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TERMINATED
     *    When both queue and pool are empty
     * STOP -> TERMINATED
     *    When pool is empty
     */
    volatile int runState;
    static final int RUNNING    = 0;//接收新的任务,会处理队列中的任务
    static final int SHUTDOWN   = 1;//不接收新的任务,但是会处理队列中的任务
    static final int STOP       = 2;//不接收新的任务,也不会处理队列中的任务,而且还会中断正在执行的任务
    static final int TERMINATED = 3;//STOP+中止所有线程

    private final BlockingQueue<Runnable> workQueue;//队列

    /**
     * 对poolSize, corePoolSize, maximumPoolSize, runState, and workers set上锁
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * 支持awaitTermination的等待条件
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * pool中的所有工作线程集合;仅仅在持有mainLock的时候才允许被访问
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    private volatile long  keepAliveTime;

    /**
     * false(默认):当核心线程处于闲置状态时,也会存活
     * true:核心线程使用keepAliveTime来决定自己的存活状态
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size,仅仅在持有mainLock的时候才允许被更新, 
     * 因为是volatile允许并发读(即使是在更新的过程中)
     */
    private volatile int   corePoolSize;

    /**
     * Maximum pool size, 其他同上
     */
    private volatile int   maximumPoolSize;

    /**
     * Current pool size, 其他同上
     */
    private volatile int   poolSize;

    /**
     * 回绝处理器
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * 所有的线程都通过这个线程工厂的addThread方法来创建。
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Tracks largest attained pool size.
     */
    private int largestPoolSize;

    /**
     * 已经完成的任务数.仅仅在工作线程被终结的时候这个数字才会被更新 
     */
    private long completedTaskCount;

    /**
     * 默认的回绝处理器(回绝任务并抛出异常)
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

说明:因为属性不多,这里列出了全部属性。

 

2.2、构造器:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        /*
         * 检查参数
         */
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        /*
         * 初始化值
         */
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);//转成纳秒
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

说明:4个构造器(1个5参+2个6参+1个7参)

注意:默认情况下,构造器只会初始化参数,不会提前构建好线程

建议:构造器参数众多,建议使用构建器模式,关于构建器模式的实际使用范例,请参照《第二章 Google guava cache源码解析1--构建缓存器

构造器中默认线程工厂的创建:Executors中的方法

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    /**
     * 默认的线程工厂
     */
    static class DefaultThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量
        final ThreadGroup group;//线程组
        final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量
        final String namePrefix;

        /*
         * 创建默认的线程工厂
         */
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :
                                 Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        /*
         * 创建一个新的线程
         */
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),//新线程的名字
                                  0);
            /*
             * 将后台线程设置为应用线程
             */
            if (t.isDaemon())
                t.setDaemon(false);
            /*
             * 将线程的优先级全部设置为NORM_PRIORITY
             */
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

说明,其中的newThread()方法会在第三部分用到。

 

3、提交任务的线程池去执行execute(Runnable command)

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /**
         * 这一块儿就是整个工作机理的部分(代码比较精致)
         * 1、addIfUnderCorePoolSize
         * 1)如果当前线程数poolSize<核心线程数corePoolSize并且pool的状态为RUNNING,
         * 1.1)先获取锁
         * 1.2)根据传入的任务firstTask创建一个Work对象,在该对象中编写了run()方法,在该run()方法中会真正的去执行firstTask的run()
         * 说明:关于Work对象run部分的内容,查看Work内部类的run()方法上边的注释以及与其相关方法的注释
         * 1.3)通过线程工厂与上边创建出来的work对象w创建新的线程t,将w加入工作线程集合,
         * 然后启动线程t,之后就会自动执行w中的run(),w中的run()又会调用firstTask的run(),即处理真正的业务逻辑
         * 
         * 2、如果poolSize>=corePoolSize或者上边的执行失败了
         * 1)如果pool的状态处于RUNNING,将该任务入队(offer(command))
         * 如果入队后,pool的状态不是RUNNING了或者池中的线程数为0了,下边的逻辑具体去查看注释
         * 2)addIfUnderMaximumPoolSize(同addIfUnderCorePoolSize)
         * 如果增加线程也不成功,则回绝任务。
         * 
         */
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

 

3.1、addIfUnderCorePoolSize(Runnable firstTask)

    /**
     * 创建并且启动一个新的线程来处理任务
     * 1、其第一个任务就是传入的firstTask参数
     * 2、该方法仅仅用于当前线程数小于核心线程数并且pool没有被关掉的时候
     */
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();//获取锁
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);//创建新线程
        } finally {
            mainLock.unlock();//释放锁
        }
        return t != null;
    }

addThread(Runnable firstTask)

    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);//构造一个work
        Thread t = threadFactory.newThread(w);//创建线程
        boolean workerStarted = false;
        if (t != null) {//
            if (t.isAlive()) //如果t线程已经启动了,而且还没有死亡
                throw new IllegalThreadStateException();
            w.thread = t;
            workers.add(w);//将w工作线程加入workers线程池
            int nt = ++poolSize;//当前的池数量+1
            if (nt > largestPoolSize)
                largestPoolSize = nt;
            try {
                t.start();//启动线程
                workerStarted = true;
            }
            finally {
                if (!workerStarted)//启动线程没有成功
                    workers.remove(w);//将w从workers集合中删除
            }
        }
        return t;
    }

newThread(Runnable r)

该方法在构建上边的默认线程工厂部分已经说过了。


免费领取验证码、内容安全、短信发送、直播点播体验包及云服务器等套餐

更多网易技术、产品、运营经验分享请点击