/** * 工作线程。 */ private final class Worker implements Runnable { /** * 在每一个任务的执行前后都会获取和释放runLock。 * 该锁只要是为了防止中断正在执行任务的work线程 */ private final ReentrantLock runLock = new ReentrantLock(); /** * Initial task to run before entering run loop. * 1、Possibly null. */ private Runnable firstTask; /** * 每个work线程完成的任务总量 * accumulated into completedTaskCount upon termination. */ volatile long completedTasks; Thread thread; /** * 该work中的线程是不是确实正在执行了run() */ volatile boolean hasRun = false; Worker(Runnable firstTask) { this.firstTask = firstTask; } /* * true:已经有线程持有了该锁 */ boolean isActive() { return runLock.isLocked(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock();//获取锁runLock try { /* * 如果pool状态为STOP或TERMINATED,确保线程被打断; * 如果不是,确保线程不要被打断 */ if ((runState >= STOP || (Thread.interrupted() && runState >= STOP)) && hasRun) thread.interrupt(); /* * 确保afterExecute会被执行仅仅当任务完成了(try)或抛出了异常(catch) */ boolean ran = false; beforeExecute(thread, task);//执行任务的run()方法之前要执行的操作 try { task.run();//执行线程的run()方法 ran = true; afterExecute(task, null);//执行任务的run()方法之后要执行的操作 ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock();//释放锁runLock } } /** * Main run loop * 运行当前任务task,运行结束后,尝试获取队列中的其他任务, * 如果最后通过各种方式都获取不到,就回收该线程,如果获取到了,就用该线程继续执行接下来的任务 * 最后,当获取不到任何任务去执行时,就将该线程从works线程集合中删除掉 */ public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task);//运行该任务 task = null; } } finally { workerDone(this);//将该线程从works集合中删除 } } }
/** * 获取下一个worker线程将要运行的任务 * Gets the next task for a worker thread to run. */ Runnable getTask() { for (;;) {//无限循环 try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll();//处理queue中的任务 //下面的runState==RUNNING else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //从队头获取任务,如果没有任务,等待keepAliveTime的时间 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else //从队头获取任务,如果没有任务,阻塞等待 r = workQueue.take(); if (r != null) return r; if (workerCanExit()) {//允许回收获取任务失败的线程 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers();//中断闲置的work线程 return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
/** * 检测一个获取任务失败的work线程是否可以退出了。 * 出现下面三种情况,work线程就会死亡。 * 1、如果pool的状态为STOP或TERMINATED * 2、队列为空 * 3、允许回收核心线程并且池中的线程数大于1和corePoolSize的最大值 */ private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; }
/** * 在一个task入队之后重新检查state。 * 当一个task入队后,pool的state发生了变化,该方法就会被调用。 * 如果一个task入队的同时,shutdownNow方法发生了调用,该方法就必须从队列中移除并回绝 * 否则该方法会保证至少有一个线程来处理入队的task */ private void ensureQueuedTaskHandled(Runnable command) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean reject = false; Thread t = null; try { int state = runState; if (state != RUNNING && workQueue.remove(command)) reject = true; else if (state < STOP && poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) t = addThread(null); } finally { mainLock.unlock(); } if (reject) reject(command); }
说明:该方法的其他方法与addIfUnderCorePoolSize(Runnable firstTask)一样。
void reject(Runnable command) { handler.rejectedExecution(command, this); }