// ThreadPoolExecutor核心源码学习笔记

/*
 * 当一个任务通过execute(Runnable)提交后,假设当前运行的线程数目 = running_thread_num
 * 
 * 增加工作线程以及放入队列的策略
 * ① if running_thread_num < corePoolSize
 * 即使其他线程都是idle的,也会创建一个新的线程来处理请求。 
 * ② else if corePoolSize <= running_thread_num < maximumPoolSize and queue.isNotFull
 * 只有当队列满的时候才会创建新的线程,否则加入请求队列
 * ③ else if corePoolSize <= running_thread_num < maximumPoolSize and queue.isFull 创建新线程
 * ④ reject
 * 
 * 默认情况下,即使是核心线程都是当新的任务抵达的时候才会初始化创建和启动。
 * 但是可以通过调用prestartCoreThread和prestartAllCoreThreads来启动核心线程
 *
 * Keep-alive times
 * if running_thread_num > corePoolSize ---> 那么超过核心数目的线程idle时间超过keepAliveTime就会被终止回收。
 * 默认情况下,Keep-alive times只适用于回收核心线程数以外的线程,
 * 但是如果allowCoreThreadTimeOut参数设置为true,那么核心线程也可以被回收。
 *
 * 队列策略:
 * 1) 同步队列 SynchronousQueue 直接将任务提交给线程,不存储任务,如果没有线程可以执行任务,那么会失败,所以会新建一个线程
 * 这种策略需要将maximumPoolSizes设置为MAX,避免任务被reject,还需要注意的就是避免任务的提交速度超过任务的处理速度 
 * if maximumPoolSize == Integer.MAX_VALUE ---> 那么当队列满的时候,就会一直创建线程来处理请求
 *
 * 2) 无界队列 LinkedBlockingQueue 当所有核心线程都在忙碌的时候,新任务都在队列中等待,不会有超过corePoolSize的线程被创建(maximumPoolSize因此不起作用)
 * 适合彼此无关联的任务执行;可以平滑突发的请求,但如果任务抵达的速度持续超过处理的速度,队列会无限增长
 * if corePoolSize == maximumPoolSize ---> 即创建了固定大小的线程池
 *
 * 3) 有界队列 ArrayBlockingQueue 避免在使用太大maximumPoolSizes的时候资源被耗尽 ,但是很难给出一个合适的值
 * 如何调整队列大小和线程池大小是一个tradeoff 
 * ① 使用较长的队列和较小的线程池会降低CPU和OS切换上下文的开销,但是吞吐量也会较小
 * 如果任务经常阻塞【IO较多】,那么系统其实可以开启更多的线程去做事情
 * ② 使用较短的队列通常需要较大的线程池,这样会保持CPU一直在运转,但是也会导致上下文切换的开销过大。
 *
 * 拒绝策略 —— 当线程池关闭了,或者任务队列满了,线程池也都在忙碌中的时候,就会拒绝任务
 * 1) ThreadPoolExecutor.AbortPolicy 默认策略,抛出异常
 * 2)ThreadPoolExecutor.CallerRunsPolicy 调用execute的线程执行
 * 3)ThreadPoolExecutor.DiscardPolicy 直接丢弃
 * 4)ThreadPoolExecutor.DiscardOldestPolicy 丢弃最老的
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * Main pool control word: one atomic int that packs the run state
     * (high 3 bits) together with the worker count (low 29 bits).
     *
     * Run states:
     * 1) RUNNING:    accept new tasks and process queued tasks
     * 2) SHUTDOWN:   do not accept new tasks, but still process queued tasks
     * 3) STOP:       do not accept new tasks, do not process queued tasks,
     *                and interrupt in-progress tasks
     * 4) TIDYING:    all tasks have terminated and the worker count is 0;
     *                the thread that moves the pool to TIDYING runs terminated()
     * 5) TERMINATED: terminated() has completed
     *
     * Transitions:
     * RUNNING -> SHUTDOWN   : on shutdown() (also invoked from finalize())
     * (RUNNING or SHUTDOWN) -> STOP : on shutdownNow()
     * SHUTDOWN -> TIDYING   : when both queue and pool are empty
     * STOP -> TIDYING       : when pool is empty
     * TIDYING -> TERMINATED : when the terminated() hook method has completed
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Number of low-order bits holding the worker count: 32 - 3 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Mask of the worker-count bits (also the largest representable count):
    // CAPACITY =    0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // (the top 3 bits, so up to 2^3 = 8 distinct states could be encoded)
    // RUNNING =     1110 0000 0000 0000 0000 0000 0000 0000  (negative as an int)
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN =    0000 0000 0000 0000 0000 0000 0000 0000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP =        0010 0000 0000 0000 0000 0000 0000 0000
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING =     0100 0000 0000 0000 0000 0000 0000 0000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // TERMINATED =  0110 0000 0000 0000 0000 0000 0000 0000
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl

    /**
     * Extracts the run state from a ctl value by clearing the worker-count bits.
     * The mask used is ~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000.
     */
    private static int runStateOf(int c) {
        final int runStateMask = ~CAPACITY;
        return c & runStateMask;
    }

    /** Extracts the worker count from a ctl value (the low 29 bits). */
    private static int workerCountOf(int c) {
        final int workerCountMask = CAPACITY;
        return c & workerCountMask;
    }

    /** Combines a run state {@code rs} and a worker count {@code wc} into one ctl value. */
    private static int ctlOf(int rs, int wc) {
        return wc | rs;
    }

    /**
     * Tells whether ctl value {@code c} is numerically below state constant {@code s}.
     * The whole ctl value is compared; no masking is applied.
     */
    private static boolean runStateLessThan(int c, int s) {
        return s > c;
    }

    /**
     * Tells whether ctl value {@code c} is numerically at or above state constant {@code s}.
     * The whole ctl value is compared; no masking is applied.
     */
    private static boolean runStateAtLeast(int c, int s) {
        return !(c < s);
    }

    /** Tells whether ctl value {@code c} is below SHUTDOWN, i.e. the pool is still RUNNING. */
    private static boolean isRunning(int c) {
        return SHUTDOWN > c;
    }
    
    /**
     * Attempts a single CAS on ctl that raises the worker count by one.
     *
     * @param expect the ctl value the caller last observed
     * @return {@code true} if ctl still held {@code expect} and was updated
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        final int incremented = expect + 1;
        return ctl.compareAndSet(expect, incremented);
    }

    /**
     * Attempts a single CAS on ctl that lowers the worker count by one.
     *
     * @param expect the ctl value the caller last observed
     * @return {@code true} if ctl still held {@code expect} and was updated
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        final int decremented = expect - 1;
        return ctl.compareAndSet(expect, decremented);
    }

    /**
     * Unconditionally decrements the workerCount field of ctl, retrying the
     * CAS until it succeeds.
     * In this file it is called from addWorkerFailed (rollback),
     * processWorkerExit (abrupt worker termination) and getTask (pool shut
     * down / stopped).
     */
    private void decrementWorkerCount() {
        while (!compareAndDecrementWorkerCount(ctl.get())) {
            // CAS lost a race with a concurrent ctl update: re-read and retry
        }
    }

    /** Queue of submitted tasks: execute() offers into it, getTask() polls/takes from it. */
    private final BlockingQueue<Runnable> workQueue;

    /** Lock held while touching the workers set and the statistics fields below. */
    private final ReentrantLock mainLock = new ReentrantLock();

    /** Set of all worker threads in the pool; accessed only while holding mainLock. */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /** Wait condition that supports awaitTermination. */
    private final Condition termination = mainLock.newCondition();

    /** Pool statistics. */
    // Largest size the workers set has reached; updated in addWorker under mainLock.
    private int largestPoolSize;
    // Sum of completedTasks of exited workers; updated in processWorkerExit under mainLock.
    private long completedTaskCount;

    // Factory for worker threads (presumably what getThreadFactory() returns; accessor not shown here).
    private volatile ThreadFactory threadFactory;

    // Handler for rejected tasks (presumably used by reject(); not shown here).
    private volatile RejectedExecutionHandler handler;

    /**
     * Idle timeout, in nanoseconds, for workers waiting for a task.
     * It applies when (1) the current worker count is greater than
     * corePoolSize, or (2) allowCoreThreadTimeOut is true: a worker that
     * waits longer than this for a task becomes eligible to be reclaimed.
     */
    private volatile long keepAliveTime;

    /**
     * Whether core threads may be reclaimed when idle.
     *
     * If false, core threads stay alive even when idle.
     * If true, core threads use keepAliveTime as their idle wait time and
     * are reclaimed once that wait times out.
     */
    private volatile boolean allowCoreThreadTimeOut;

    // Bound on the worker count used when addWorker is called with core == true.
    private volatile int corePoolSize;

    // Bound on the worker count used when addWorker is called with core == false.
    private volatile int maximumPoolSize;

    // Default rejection handler (AbortPolicy).
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    /**
     * Wrapper around one pool thread. It is also a simple non-reentrant
     * exclusive lock built on AQS, with these state values:
     * -1 = just constructed (interrupts inhibited until runWorker unlocks it),
     *  0 = unlocked, 1 = locked (set by tryAcquire).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker runs in, obtained from the thread factory. */
        final Thread thread;
        /** Initial task to run; may be null. */
        Runnable firstTask;

        /** Per-worker count of completed tasks (incremented in runWorker). */
        volatile long completedTasks;

        /**
         * Creates a worker with the given first task and asks the thread
         * factory for the thread that will run it.
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // The new thread's Runnable is this worker itself
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates the main run loop to the enclosing pool's runWorker. */
        @Override
        public void run() {
            runWorker(this);
        }

        /** Any non-zero state (including the initial -1) counts as held. */
        @Override
        protected boolean isHeldExclusively() {
            final int state = getState();
            return state != 0;
        }

        /** AQS acquire hook: succeeds only by moving state 0 -> 1 via CAS. */
        @Override
        protected boolean tryAcquire(int unused) {
            if (!compareAndSetState(0, 1)) {
                return false;
            }
            // CAS won: record the calling thread as the exclusive owner
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }

        /** AQS release hook: clears the owner, then resets state to 0. */
        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock() {
            acquire(1);
        }

        public boolean tryLock() {
            return tryAcquire(1);
        }

        public void unlock() {
            release(1);
        }

        public boolean isLocked() {
            return isHeldExclusively();
        }

        /**
         * Interrupts this worker's thread, but only if the worker has left
         * its initial state (state >= 0), has a thread, and that thread is
         * not already interrupted. A SecurityException is ignored.
         */
        void interruptIfStarted() {
            if (getState() < 0) {
                return;
            }
            final Thread target = thread;
            if (target == null || target.isInterrupted()) {
                return;
            }
            try {
                target.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }

    /**
     * Interrupts workers that appear idle, while holding mainLock.
     * A worker is treated as idle when its thread is not already interrupted
     * and its worker lock can be acquired with tryLock().
     *
     * @param onlyOne if true, stop after examining the first worker of the
     *        iteration, whether or not it was interrupted
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock lock = this.mainLock;
        lock.lock();
        try {
            for (Worker worker : workers) {
                final Thread target = worker.thread;
                // tryLock() is only attempted when the thread is not yet interrupted
                final boolean idle = !target.isInterrupted() && worker.tryLock();
                if (idle) {
                    try {
                        target.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        worker.unlock();
                    }
                }
                if (onlyOne) {
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Tries to add and start one new worker.
     *
     * Phase 1 reserves a slot by CAS-incrementing the worker count in ctl;
     * phase 2 creates the Worker, registers it in the workers set under
     * mainLock and starts its thread. If the thread was not started, the
     * reservation is rolled back through addWorkerFailed.
     *
     * @param firstTask task the new worker runs first; may be null
     * @param core if true the worker count is bounded by corePoolSize,
     *        otherwise by maximumPoolSize
     * @return true if the worker thread was started
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // Refuse when (1) the pool is at least SHUTDOWN, and
            // (2) NOT (state is exactly SHUTDOWN, no first task, queue non-empty).
            if (rs >= SHUTDOWN &&
                // The one allowed case adds a thread to drain tasks still in the queue:
                // if rs > SHUTDOWN the pool is already stopping, so no thread may be added;
                // if rs == SHUTDOWN a thread may be added only when firstTask is null
                // and there are still unprocessed tasks in the queue.
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // Fail if the count reached the absolute capacity or the bound
                // selected by 'core'
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Try to CAS-increment the worker count; on success leave the
                // whole retry block
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // CAS failed: re-read ctl to find out why
                c = ctl.get();  // Re-read ctl
                // (1) The run state changed (the pool may have been shut down):
                // restart from the outer loop
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                // (2) Otherwise only the worker count changed concurrently
            }
        }

        // Whether the worker thread has been started
        boolean workerStarted = false;
        // Whether the worker has been added to the workers set
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Build a worker; its thread comes from the thread factory
            w = new Worker(firstTask);
            final Thread t = w.thread;
            // Proceed only if a thread was obtained
            if (t != null) {
                // Take the main lock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();

                        // Register the new worker in the set
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // Registered successfully: start the thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // The thread was not started: roll back the reserved worker count
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    /**
     * Rolls back a failed addWorker while holding mainLock: removes the
     * worker from the set (if one was created), decrements the worker
     * count, then calls tryTerminate().
     *
     * @param w the worker to remove, or null if none was created
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock lock = this.mainLock;
        lock.lock();
        try {
            // Undo the registration, if there is a worker to undo
            if (null != w) {
                workers.remove(w);
            }
            // Give back the slot reserved in ctl
            decrementWorkerCount();
            tryTerminate();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Cleans up after a worker that is leaving runWorker: folds its task
     * count into completedTaskCount, removes it from the workers set, calls
     * tryTerminate(), and possibly starts a replacement worker.
     *
     * @param w the exiting worker
     * @param completedAbruptly true if the worker's run loop did not finish
     *        normally; in that case the worker count was not yet adjusted
     *        and is decremented here
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) {
            decrementWorkerCount();
        }

        final ReentrantLock lock = this.mainLock;
        lock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            lock.unlock();
        }

        tryTerminate();

        final int c = ctl.get();
        if (!runStateLessThan(c, STOP)) {
            return; // ctl is at or above STOP: no replacement
        }
        if (!completedAbruptly) {
            // Minimum number of workers that must remain
            int required;
            if (allowCoreThreadTimeOut) {
                required = 0;
            } else {
                required = corePoolSize;
            }
            // Keep at least one worker while tasks are still queued
            if (required == 0 && !workQueue.isEmpty()) {
                required = 1;
            }
            if (workerCountOf(c) >= required) {
                return; // replacement not needed
            }
        }
        addWorker(null, false);
    }

    /**
     * Fetches the next task for a worker, blocking or timed-waiting on the
     * work queue as the current configuration requires.
     *
     * @return the next task, or null if the calling worker must exit; every
     *         null return is preceded by a decrement of the worker count
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // State order: SHUTDOWN < STOP < TIDYING < TERMINATED
            // SHUTDOWN is entered by shutdown(), STOP by shutdownNow().

            // Path 1: rs is STOP, TIDYING or TERMINATED.
            // shutdownNow() has been called, so no more tasks are handed out;
            // just give up this worker.

            // Path 2: rs is SHUTDOWN (or later) and the task queue is empty.
            // After shutdown() a worker is only given up once the queue has
            // been drained.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // Decrement the worker count
                decrementWorkerCount();
                return null;
            }

            // Current number of workers
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // Idle time matters for this worker when core threads may time
            // out, or when there are more workers than corePoolSize.
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // Give up this worker when
            // (1) there are more workers than maximumPoolSize, or idle time
            //     matters and the last poll timed out, and
            // (2) there is more than one worker or the task queue is empty.
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // Decrement the worker count and return null
                if (compareAndDecrementWorkerCount(c))
                    return null;
                // CAS failed because ctl changed; re-evaluate in the next iteration
                continue;
            }

            try {
                Runnable r = timed ?
                    // Wait for a task for at most keepAliveTime
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // Idle time does not matter: block until a task arrives
                    workQueue.take();
                if (r != null)
                    return r;
                // The timed wait elapsed without a task
                timedOut = true;
            } catch (InterruptedException retry) {
                // Interrupted while waiting: not a timeout, loop and recheck state
                timedOut = false;
            }
        }
    }

    /**
     * Main worker run loop: runs the worker's first task (if any) and then
     * keeps fetching tasks from getTask() until it returns null or a task
     * throws. Each task runs with the worker lock held and is bracketed by
     * beforeExecute/afterExecute. processWorkerExit is always called on the
     * way out.
     *
     * @param w the worker whose thread is executing this loop
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        // The first task has been captured; clear the field
        w.firstTask = null;
        w.unlock(); // allow interrupts

        // Stays true unless the loop below ends normally
        boolean completedAbruptly = true;
        try {
            // Run the pending task if there is one, otherwise fetch one from the queue
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.
                // This requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt

                // Case 1: the pool is at least STOP and this thread is not
                // interrupted yet -> interrupt it.
                // Case 2: the pool was below STOP, so Thread.interrupted()
                // cleared a pending interrupt flag; if the pool reached STOP
                // in the meantime, the flag that was just cleared must be
                // restored.
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    // Interrupt this worker thread
                    wt.interrupt();


                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        // Any other Throwable is rethrown wrapped in an Error
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // Reached when getTask() returned null (normal exit) and also when
            // a task threw (completedAbruptly is still true): retire this worker
            processWorkerExit(w, completedAbruptly);
        }
    }

    
    /**
     * Submits a task for execution. In order, it tries to:
     * 1) start a new core worker with the task as its first task,
     * 2) otherwise enqueue the task (rechecking the pool state afterwards),
     * 3) otherwise start a new non-core worker;
     * if that also fails the task is passed to reject(command).
     *
     * @param command the task to run
     * @throws NullPointerException if command is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        // Fewer workers than corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            // Done if a core worker was added for this task
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // The pool is running and the task was enqueued
        if (isRunning(c) && workQueue.offer(command)) {
            // Recheck the state after enqueueing
            int recheck = ctl.get();
            // No longer running -> try to remove the task -> if removed, reject it
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // No worker is left to drain the queue, so start one without a
                // first task. core == false selects the maximumPoolSize bound in
                // addWorker; with core == true the request would be refused
                // whenever corePoolSize is 0.
                addWorker(null, false);
        }
        // The pool is not running or the queue refused the task: try a worker
        // bounded by maximumPoolSize instead of corePoolSize
        else if (!addWorker(command, false))
            reject(command);
    }

    /**
     * Starts an orderly shutdown. Under mainLock it checks shutdown access,
     * advances the run state to SHUTDOWN, interrupts idle workers and runs
     * the onShutdown() hook; after releasing the lock it calls
     * tryTerminate().
     */
    public void shutdown() {
        final ReentrantLock lock = this.mainLock;
        lock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            // hook for ScheduledThreadPoolExecutor
            onShutdown();
        } finally {
            lock.unlock();
        }
        tryTerminate();
    }

    /**
     * Starts an immediate shutdown. Under mainLock it checks shutdown
     * access, advances the run state to STOP, interrupts the workers and
     * drains the queue; after releasing the lock it calls tryTerminate().
     *
     * @return the list produced by drainQueue()
     */
    public List<Runnable> shutdownNow() {
        final List<Runnable> drained;
        final ReentrantLock lock = this.mainLock;
        lock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            drained = drainQueue();
        } finally {
            lock.unlock();
        }
        tryTerminate();
        return drained;
    }
    
    /**
     * Waits, holding mainLock, on the termination condition until the pool
     * reaches TERMINATED or the timeout runs out.
     *
     * @param timeout the maximum time to wait
     * @param unit the unit of {@code timeout}
     * @return true if the pool is TERMINATED, false if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long remainingNanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.mainLock;
        lock.lock();
        try {
            while (!runStateAtLeast(ctl.get(), TERMINATED)) {
                if (remainingNanos <= 0) {
                    return false;
                }
                // awaitNanos returns the time still left to wait
                remainingNanos = termination.awaitNanos(remainingNanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Finalizer hook: invokes {@code shutdown} on this executor when the
     * object is finalized.
     */
    @Override
    protected void finalize() {
        this.shutdown();
    }
    
    /**
     * Starts one core thread that idly waits for work, instead of waiting
     * for a task submission to create it. Nothing is started when the
     * worker count has already reached corePoolSize.
     *
     * @return {@code true} if a thread was started
     */
    public boolean prestartCoreThread() {
        if (workerCountOf(ctl.get()) >= corePoolSize) {
            return false;
        }
        return addWorker(null, true);
    }

    /**
     * Like prestartCoreThread, but additionally tries to start one non-core
     * thread when there are no workers at all and the core bound does not
     * allow another one (e.g. corePoolSize is 0).
     */
    void ensurePrestart() {
        final int current = workerCountOf(ctl.get());
        if (current < corePoolSize) {
            addWorker(null, true);
            return;
        }
        if (current == 0) {
            addWorker(null, false);
        }
    }

    /**
     * Starts core threads one after another until addWorker refuses to add
     * another core worker, so that they idly wait for work.
     *
     * @return the number of threads started
     */
    public int prestartAllCoreThreads() {
        int started = 0;
        for (boolean added = addWorker(null, true);
             added;
             added = addWorker(null, true)) {
            started++;
        }
        return started;
    }

    /**
     * Reports the current value of the allowCoreThreadTimeOut flag.
     *
     * @return {@code true} if core threads are allowed to time out
     */
    public boolean allowsCoreThreadTimeOut() {
        return this.allowCoreThreadTimeOut;
    }

    /**
     * Sets whether core threads may time out. When the flag actually
     * changes to {@code true}, idle workers are interrupted.
     *
     * @param value the new setting
     * @throws IllegalArgumentException if {@code value} is true while
     *         keepAliveTime is not positive
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0) {
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        }
        if (value == allowCoreThreadTimeOut) {
            return; // nothing changes
        }
        allowCoreThreadTimeOut = value;
        if (value) {
            interruptIdleWorkers();
        }
    }

       interruptIdleWorkers();
    }
}