Java线程池源码解读(ThreadPoolExecutor)

达芬奇密码2018-07-03 12:40
一、简介

  线程池是我们经常使用的一种提高系统效率的工具,本文以ThreadPoolExecutor类为例,通过阅读源码,和大家分享一下线程池的实现原理。


二、源码阅读
1、继承关系
1)最顶层的是一个Executor接口,仅定义了一个方法:

2)随后是ExecutorService,继承自Executor的接口,定义了

3)随后是一个抽象类AbstractExecutorService,基本把ExecutorService中声明的方法都实现了,除了execute(Runnable command),可以看到常用的submit方法其实也是调用了execute方法,只是再外层包了一个RunnableFuture对象,因此后面我们重点讲execute方法


2、线程池状态
在介绍execute方法之前,先介绍线程池的几种状态:


每个线程池对象都会维护一个变量ctl,头3位表示线程池的状态,后29位表示当前的任务数,线程池状态有五种,分别是:

1)RUNNING
此时可以接收任务,并执行
2)SHUTDOWN
此时线程池不接收新的任务,但是会等待所有任务执行完毕;当调用shutdown(),则状态会从running变为shutdown
3)STOP
此时线程池不接收新的任务,并且尝试中断正在执行的任务;当调用shutdownNow(),则状态会从running/shutdown变为stop
4)TIDYING
所有任务都已经终止,任务数为0,则状态为变成tidying,并且会回调terminated()方法;当队列和pool都为空,则会变为tidying
5)TERMINATED
terminated()方法执行完毕,状态即为terminated;可以调用awaitTermination(),则当状态变为terminated之前会一直阻塞

3、构造方法
构造方法包括如下几个参数:
1)corePoolSize
核心池大小,表示pool中维持的idle的线程数的最大数量,当然如果设置了allowCoreThreadTimeout,那么也会被回收
2)maxiumPoolSize
表示pool中最多能同时存在多少个线程,大于等于corePoolSize
3)keepAliveTime和unit
表示大于corePoolSize的线程idle多久的情况下会被回收
4)workQueue
阻塞队列,用于存储那些尚未排上队的任务
5)threadFactory
生成线程的工厂
6)handler
当线程都满了,且队列也满了的情况下,会调用该方法来拒绝任务,有多种策略可以选择,或者自定义实现
注:可以看到,构造方法仅仅做了一些赋值操作,此时线程池中一个线程都还没有

4、execute方法
execute方法是供外面调用的,分为3步:
1)如果当前任务数小于核心池的大小,那么生成一个新的worker线程,并执行当前提交的任务
2)如果当前任务数超过了核心池的大小,或者addWorker失败了,则判断线程状态是否是running,并将任务放到queue中
 此时,可以看到做了一个double check,目的是:
 一是防止任务加入到queue之后,状态忽然变更(如变成shutdown),此时会reject;
 二是防止任务加入到队列之后,所有的工作线程都die了,此时会起一个新的worker线程
3)如果queue也满了,此时会尝试在maxiumPoolSize限定下,尝试起一个worker线程,如果失败了,则reject

5、Worker线程
在介绍addWorker方法之前,先了解Worker线程,Worker是ThreadPoolExecutor的一个内部类,本身是一个Runnable

可以看到,通过构造方法中传入的ThreadFactory,初始化一个Thread,调用start方法,即可让Worker运行起来,关键方法即是runWorker(this),该方法由ThreadPoolExecutor实现。

该方法先判断Worker初始化时的firstTask是否为null,如果不为null,则执行其run方法。
否则调用getTask()方法从queue中拿到task,再执行;
如果拿到了null,则线程会退出
执行之前会检查线程池状态,如果大于等于stop,则会中断执行;
执行之前会调用beforeExecute,执行之后会调用afterExecute,这两个方法默认为空实现,可以调用方自己去实现;
注意,在执行任务之前,会先调用w.lock()获取锁,这个可以用于判断worker是否处于idle状态
我们继续看getTask方法

首先是检查线程池状态,如果大于等于shutdown,并且,大于等于stop或者queue为空,那么会返回null,让当前worker停止
(这里有点绕,换一种说法就是,如果现在是shutdown状态,那么除非queue为空,否则不会返回null,如果是stop状态,则会直接返回null,和shutdown/shutdownNow方法就可以联系起来了)
然后,判断当前线程是否可以过期,如果当前线程数量超过核心池大小,或者设置了允许核心池线程idle回收,则可以过期
如果线程可以过期,则调用poll方法,从queue中取任务,否则会调用take方法,一直阻塞在这里,
线程的过期在下一个for循环中(此时timedOut=true),返回null来中止worker线程

6、addWorker方法

首先,依然是检查线程状态,并且判断线程数量是否超限,并且如果检查过程中线程状态发生了变更,需要进行重试
在正在启用一个worker线程之前,需要先拿到一个全局锁mainLock,同时拿到锁之前,依然需要double check一下线程池状态
如果一切要求都满足,则会调用start方法,正式启动worker线程

7、shutdown和shutdownNow

shutdown会关闭idle的worker,并等待所有任务执行完毕之后再结束线程池
shutdownNow则会立即尝试打断所有worker线程,并把所有没有执行的任务返回

8、几种常用的线程池(实际上就是ThreadPoolExecutor的几种不同的参数配置)
1)newSingleThreadExecutor

2)newCachedThreadPool

3)newFixedThreadPool


三、总结
  希望对大家有用

本文来自网易实践者社区,经作者曹佳俊授权发布。