Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析
Java中使用线程池技术一般都是使用Executors
这个工厂类,它提供了非常简单方法来创建各种类型的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
核心的接口其实是Executor
,它只有一个execute
方法抽象为对任务(Runnable接口)的执行, ExecutorService
接口在Executor
的基础上提供了对任务执行的生命周期的管理,主要是submit
和shutdown
方法, AbstractExecutorService
对ExecutorService
一些方法做了默认的实现,主要是submit和invoke方法,而真正的任务执行 的Executor接口execute
方法是由子类实现,就是ThreadPoolExecutor
,它实现了基于线程池的任务执行框架,所以要了解 JDK的线程池,那么就得先看这个类。
再看execute
方法之前需要先介几个变量或类。
ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
这个变量是整个类的核心,AtomicInteger
保证了对这个变量的操作是原子的,通过巧妙的操作,ThreadPoolExecutor用这一个变量保存了两个内容:
- 所有有效线程的数量
- 各个线程的状态(runState)
低29位存线程数,高3位存runState
,这样runState有5个值:
- RUNNING:-536870912
- SHUTDOWN:0
- STOP:536870912
- TIDYING:1073741824
- TERMINATED:1610612736
线程池中各个状态间的转换比较复杂,主要记住下面内容就可以了:
- RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
- SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;
- STOP状态:不再接受新任务,不处理队列中的任务
围绕ctl变量有一些操作,了解这些方法是看懂后面一些晦涩代码的基础:
/** * 这个方法用于取出runState的值 因为CAPACITY值为:00011111111111111111111111111111 * ~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000 * 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值 * * @param c * 该参数为存储runState和workerCount的int值 * @return runState的值 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /** * 这个方法用于取出workerCount的值 * 因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了 * 保留参数的低29位,也就是workerCount的值 * * @param c * ctl, 存储runState和workerCount的int值 * @return workerCount的值 */ private static int workerCountOf(int c) { return c & CAPACITY; } /** * 将runState和workerCount存到同一个int中 * “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111 * * @param rs * runState移位过后的值,负责填充返回值的高3位 * @param wc * workerCount移位过后的值,负责填充返回值的低29位 * @return 两者或运算过后的值 */ private static int ctlOf(int rs, int wc) { return rs | wc; } // 只有RUNNING状态会小于0 private static boolean isRunning(int c) { return c < SHUTDOWN; }
corePoolSize
核心线程池大小,活动线程小于corePoolSize则直接创建,大于等于则先加到workQueue中,队列满了才创建新的线程。
keepAliveTime
线程从队列中获取任务的超时时间,也就是说如果线程空闲超过这个时间就会终止。
Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable ...
内部类Worker
是对任务的封装,所有submit的Runnable都被封装成了Worker,它本身也是一个Runnable, 然后利用AQS框架(关于AQS可以看我这篇文章)实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,可以看后面shutdown
和shutdownNow
方法的分析。
// state只有0和1,互斥 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true;// 成功获得锁 } // 线程进入等待队列 return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; }
之所以不用ReentrantLock是为了避免任务执行的代码中修改线程池的变量,如setCorePoolSize
,因为ReentrantLock是可重入的。
execute
execute
方法主要三个步骤:
- 活动线程小于corePoolSize的时候创建新的线程;
- 活动线程大于corePoolSize时都是先加入到任务队列当中;
- 任务队列满了再去启动新的线程,如果线程数达到最大值就拒绝任务。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 活动线程数 < corePoolSize if (workerCountOf(c) < corePoolSize) { // 直接启动新的线程。第二个参数true:addWorker中会重新检查workerCount是否小于corePoolSize if (addWorker(command, true)) // 添加成功返回 return; c = ctl.get(); } // 活动线程数 >= corePoolSize // runState为RUNNING && 队列未满 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // double check // 非RUNNING状态 则从workQueue中移除任务并拒绝 if (!isRunning(recheck) && remove(command)) reject(command);// 采用线程池指定的策略拒绝任务 // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败 else if (workerCountOf(recheck) == 0) // 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); // 两种情况: // 1.非RUNNING状态拒绝新的任务 // 2.队列满了启动新的线程失败(workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }
注释比较清楚了就不再解释了,其中比较难理解的应该是addWorker(null, false);
这一行,这要结合addWorker一起来看。 主要目的是防止HUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
addWorker
这个方法理解起来比较费劲。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);// 当前线程池状态 // Check if queue empty only if necessary. // 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 满足下列调价则直接返回false,线程创建失败: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束 // rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务 // rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null, // false),任务为null && 队列为空 // 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么? // 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务 // 但是此时队列不为空,那么还得创建线程把任务给执行完才行。 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 走到这的情形: // 1.线程池状态为RUNNING // 2.SHUTDOWN状态,但队列中还有任务需要执行 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))// 原子操作递增workCount break retry;// 操作成功跳出的重试的循环 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)// 如果线程池的状态发生变化则重试 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // wokerCount递增成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 并发的访问线程池workers对象必须加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将新启动的线程添加到线程池中 workers.add(w); // 更新largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行 // 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。 if (workerAdded) { t.start();// 最终执行的是ThreadPoolExecutor的runWoker方法 workerStarted = true; } } } finally { // 线程启动失败,则从wokers中移除w并递减wokerCount if (!workerStarted) // 递减wokerCount会触发tryTerminate方法 addWorkerFailed(w); } return workerStarted; }