多线程JDK工具篇-线程池原理

12.1 为什么要使用线程池

  一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。】

使用线程池主要有以下三个原因:

  1. 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
  2. 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
  3. 可以对线程做统一管理。

  

12.2 线程池的原理

Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的实现类。

我们先看看ThreadPoolExecutor类。

   public ThreadPoolExecutor(int corePoolSize,//核心线程,创建后就一直存在,空闲也不会被销毁
                              int maximumPoolSize,//线程数最大值
                              long keepAliveTime,//非核心线程存活时长,非核心线程空闲到一定程度就会被销毁。
                              TimeUnit unit,//keepAliveTime的单位
                              BlockingQueue<Runnable> workQueue,//线程池的等待执行的任务(Runnanble对象)队列
                              ThreadFactory threadFactory,//可选参数,线程工厂默认创建一个
                   RejectedExecutionHandler handler//可选参数,拒绝处理策略) 

  

workQueue,工作队列,有几种常见的队列

  1.ArrayBlockingQueue 

  数组阻塞队列,限定长度的队列(必须声明队列长度)。

  2.LinkedBlockingQueue

  链表阻塞队列,队列的默认长度默认值为Integer.MAX。

  3.SynchronousQueue

  同步队列。没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。

  4.DelayQueue

  延迟队列。队列中的元素需要实现Delayed()方法,线程需要等待延迟时间结束后才能获取队列的任务执行。

RejectedExecutionHandler handler拒绝处理策略

  线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :

  1. ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。

  2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。

  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。

  4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

ThreadPoolExecutor的状态

线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

故线程池也有自己的状态。ThreadPoolExecutor类中定义了一个volatile int变量runState来表示线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。

    • 线程池创建后处于RUNNING状态。

    • 调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。

    • 调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。

    • 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。

      ThreadPoolExecutor中有一个控制状态的属性叫ctl,它是一个AtomicInteger类型的变量。

    • 线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。

常见的4种线程池

1.FixedThreadPool 固定长度的线程池

可以用来控制并发数量,如果没有空闲的核心线程,任务会在队列中等待。核心线程数=最大线程数。采用LinkedBlockingQueue(队列为Integer.MAX,理论上无限大)。

   public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2.CachedThreadPool 缓存线程池

核心线程数为0,非核心线程数最大值为Integer.MAX,理论上不限。线程存活时间为60秒,使用SynchronousQueue。一个任务进来,队列将任务提交给线程执行,如果没有空闲的线程则创建新的线程执行,线程空闲存活时间为60秒。一定程度上复用了线程。

  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

3.SingleThreadPool 单线程线程池

 核心线程数为1,非核心线程数为0,采用LinkedBlocakingQueue,与FixedThreadPool一致。

   public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }  

4.ScheduledThreadPool 周期线程池

定长线程池,支持周期和定时任务,采用DelayQueue,有核心线程数和非核心线程数,线程数最大值也为Integer.MAX

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    } 

5. WorkStealingPool

工作窃取线程池。采用Fork/Join线程池实现的,空闲的线程会去窃取其他线程的工作队列(于队尾)。

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

 

 线程复用

将线程封装成工作线程,采用while循环一直去Queue获取task,调用task的run方法执行,直到线程中断或者Queue中的task为0。

   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//while循环
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行task实例的run方法,此时只是调用实例的方法,不会触发创建线程。
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  来一个例子

    public static void main(String[] args){
new MyThread().run();
System.out.println(Thread.currentThread().getName() +":main");
}

public static class MyThread extends Thread{
@Override
public void run(){
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":MyThread");
}
}

  输出如下,阻塞了,调用了当前的主线程来执行。

main:MyThread
main:main