深入理解java线程池 一

本文为博主原创,未经允许不得转载

  在多线程和高并发场景中,需要创建大量的线程来进行业务处理,我们通常创建线程有两种方法,一种是通过继承Thread类,另一种是实现Runnable的接口,但是我们创建这两种线程在运行结束后都会被虚拟机销毁,如果数量多的话,频繁的创建和销毁线程会大大浪费时间和效率,更重要的是浪费内存,线程执行完毕后变为死亡状态,线程对象变为垃圾,这个需要依靠虚拟机进行监督和回收,影响系统的性能。这种问题使用线程池便可以很好的解决。通过线程池线程,销毁及回收等交由线程池进行管理,就可以避免以上的问题。

我们在使用过程中经常会直接使用newSingleThreadExecutor(),newCachedThreadPool(),newFixedThreadPool(int Threads)等已经封装好的线程池,但这些都是通过ThreadPoolExecutor类中通过构造函数传入不同的参数封装的对象,所以想要了解线程池,我们就要认真研究一下线程池中最重要的ThreadPoolExecutor类。

ThreadPoolExecutor类最重要的构造函数:

 public ThreadPoolExecutor(int corePoolSize,
     int maximumPoolSize,
     long keepAliveTime,
     TimeUnit unit,
     BlockingQueue<Runnable> workQueue,
     ThreadFactory threadFactory,
     RejectedExecutionHandler handler) 

函数的参数含义如下:

corePoolSize:核心池大小,指定了线程池中的线程数量。

maximumPoolSize:最大池大小,指定了线程池中的最大线程数量。

keepAliveTime:存活时间,当线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即超过corePoolSize的空闲线程,在多长时间内会被销毁。

unit:keepAliveTime的单位。

workQueue:任务队列,被提交单尚未被执行的任务。

threadFactory:线程工厂,用于创建线程,一般用于默认的即可。

handler:拒绝策略。当任务太多来不及处理时,如何拒绝任务。

核心池大小,最大池大小和存活时间共同管理这线程的创建与销毁。核心池大小是目标大小;线程池的实现试图维护线程池的大小,即是没有任务执行,池的大小也等于核心池的大小,并且在工作队列充满前,线程池都不会创建更多的线程。最大池的大小是可同时活动的线程数的上限。如果一个线程已经闲置的时间超过了存活时间,它将被线程池回收。

构造函数的参数中大部分都很简单,只有参数workQueue和handler需要进行详细说明,下面对这两个参数进行详细的说明:

参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象,根据队列功能分类,在ThreadPoolExecutor类的构造函数中可以使用以下几种BlockingQueue接口。

1.直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,则提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲线程,则尝试创建新的线程,如果进程数量已经达到最大值,则执行拒绝策略。因此,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。

2.有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue类实现。ArrayBlockingQueue类的构造函数必须带一个容量参数,表示该队列的最大容量:

public ArrayBlockingQueue(int capacity)

当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入。则在总线程数不大于maximumPoolSize的前提下,创建新的进程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙,否则要确保核心线程数维持在corePoolSize。

3.*的任务队列:*任务队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,否则*的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize时,线程就不会继续增加了。若后续任由新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,*队列会保持快速增长,直到耗尽系统内存。

4.优先任务队列:优先任务队列是带有执行优先级的任务队列。它通过PriorityBlockingQueue类实现,可以控制任务的执行先后顺序。他是一个特殊的*队列。无论是有界队列ArrayBlockingQueue类,还是未指定大小的*队列LinkedBlockingQueue类都是按照先进先出算法处理任务的。而PriorityBlockingQueue类则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)。

拒绝策略:

ThreadPoolExecutor类的最后一个参数指定了拒绝策略。也就是当任务数量超过系统实际承载能力时,就要用到拒绝策略了。拒绝策略可以说是系统超负荷运行时的补救措施,通常由于压力太大而引起的,也就是线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列中也已经排满了,再也放不下新任务了。这时,我们就需要有一套机制合理的处理这个问题。

jdk在ThreadPoolExecutor类中定义了四种内置的拒绝策略,其均实现RejectedExecutionHandler接口。其四种拒绝策略为:

1.AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

2.CallRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

3.DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

4.DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧。

线程池的主要作用是为了线程复用,也就是避免了线程的频繁创建。但是,最开始的那些线程从何而来呢?答案就是ThreadFactory。ThreadFactory是一个接口,它只有一个用来创建线程的方法:

Thread newThread(Runnable r);

当线程池需要新建线程时,就会调用这个方法。

对于核心的几个线程池,无论是newFixedThreadPool()方法,newSingleThreadExecutor()方法,还是newCacheThreadPool()方法,虽然看起来创建的线程有着完全不同的功能特点,但其内部实现均使用了ThreadPoolExecutor类,下面给出这三个线程池的实现方式

  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool() 方法的实现,它返回了一个corePoolSize和maximumPoolSize大小一样的,并且使用了LinkedBlockingQueue任务队列的线程池。因为对固定大小的线程池而言,不存在线程数量的动态变化,因此corePoreSize和maximumPoolSize相等。同时,它使用*队列存放无法立即执行的任务,当任务提交非常频繁的时候,该队列可能迅速膨胀,从而耗尽系统资源。

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor()方法返回的单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1。它的特点在于工作线程数目被限制为1,操作一个*的工作队列,所以他能保证了所有任务都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCacheThreadPool()方法返回corePoolSize为0,maximumPoolSize无穷大的线程池,这意味着在没有任务时,该线程池内无线程,而当任务被提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,而SynchronousQueue队列时一种直接提交的队列,它总会迫使线程池增加新的线程执行任务。当任务执行完毕后,由于corePoolSize为0,因此空闲线程又会在指定时间内60秒内被回收。它是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过60秒,则被终止并移除缓存,长时间闲置时,这种线程池,不会消耗什么资源,其内部使用SynchronousQueue作为工作队列,*线程池,可以进行自动线程回收。

在使用自定义线程池时,要根据应用的具体情况,选择合适的并发队列作为任务的缓冲。当线程资源紧张时,不同的并发队列对系统行为和性能的影响也不相同。

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolDefinedTest {
    public static void main(String[] args) {
        LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(100);
        ThreadFactory threadFactory = new ThreadFactory() {
            //  int i = 0;  用并发安全的包装类
            AtomicInteger atomicInteger = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                //创建线程任务传进来
                Thread thread = new Thread(r);
                // 给线程起个名字
                thread.setName("MyThread" + atomicInteger.getAndIncrement());
                return thread;
            }
        };
        
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 1, TimeUnit.SECONDS, blockingQueue, threadFactory);
        for (int i = 0; i < 5; i++) {
           pool.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       method();
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }
           });
    
        }
    }
    
    private static void method() throws InterruptedException {
        System.out.println("ThreadName" + Thread.currentThread().getName() + "进来了");
        Thread.sleep(2000);
        System.out.println("ThreadName" + Thread.currentThread().getName() + "出去了");
    }
}

通过探究ThreadPoolExecutor类中封装的线程池的构造函数,可以有效的理解创建线程池时的各个参数的作用,从而选择适合我们业务场景所需要的线程池类型。线程池涵盖的内容很多很丰富,我们需要不断通过学习和实践,增强我们对线程,线程池的理解,希望通过本篇文章对你能有所帮助。