Creating thread pools in Java

1, Creating a thread pool

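// CustomizableThreadFactory comes from Spring (org.springframework.scheduling.concurrent).
// Note: with an unbounded LinkedBlockingQueue the pool never grows past corePoolSize,
// so the maximumPoolSize of 200 has no effect here.
ExecutorService analyzePool = new ThreadPoolExecutor(100,
            200,
            0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new CustomizableThreadFactory("abc-"));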

2, What the Javadoc says

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default rejected execution handler.
     *
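     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     *        (Note: core threads are not created when the pool is constructed;
     *        each one is started as tasks arrive, unless prestartAllCoreThreads()
     *        is called. Once started, they are kept even when idle.)
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool (Note: the upper bound on the number of threads; threads beyond
     *        corePoolSize are only created once the work queue is full.)
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating. (Note: how long an idle
     *        thread beyond corePoolSize lives before it is terminated.)
     * @param unit the time unit for the {@code keepAliveTime} argument
     *        (Note: the unit of keepAliveTime, a TimeUnit enum value.)
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method. (Note: the queue that
     *        holds pending tasks, commonly new LinkedBlockingQueue<>().)
     * @param threadFactory the factory to use when the executor
     *        creates a new thread (Note: the factory that creates the pool's
     *        threads; mainly used to control thread names and similar attributes.)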
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
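
The corePoolSize description is easy to misread as "the pool starts that many threads up front". The following is a minimal sketch, not taken from the JDK sources, that reads getPoolSize() before and after prestartAllCoreThreads(). The class name CoreThreadStartDemo and the helper poolSizes are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreThreadStartDemo {

    // Returns {pool size right after construction, pool size after prestarting}.
    // Hypothetical helper, written only for this example.
    static int[] poolSizes(int corePoolSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, corePoolSize,
                0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        try {
            int afterConstruction = pool.getPoolSize(); // no worker threads exist yet
            pool.prestartAllCoreThreads();              // start every core thread now
            int afterPrestart = pool.getPoolSize();
            return new int[] {afterConstruction, afterPrestart};
        } finally {
            pool.shutdown(); // idle workers exit, so the JVM can terminate
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes(4);
        System.out.println("after construction: " + sizes[0]);
        System.out.println("after prestart:     " + sizes[1]);
    }
}
```

On a standard JDK this should report 0 threads after construction and 4 after prestarting.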

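Commonly used work queues:

ArrayBlockingQueue (bounded, backed by an array);
LinkedBlockingQueue (unbounded by default, optionally bounded);
SynchronousQueue (no capacity; each task is handed directly to a thread);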
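
The choice of queue decides whether the pool ever grows past corePoolSize, because threads beyond the core are only created when the queue refuses a task. The sketch below illustrates this under assumed values (corePoolSize 2, maximumPoolSize 4, six blocking tasks). QueueGrowthDemo and threadsCreated are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueGrowthDemo {

    // Submits six blocking tasks to a pool with corePoolSize 2 and
    // maximumPoolSize 4, then reports how many threads the pool created.
    static int threadsCreated(BlockingQueue<Runnable> queue) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 0, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 6; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("unbounded queue:   "
                + threadsCreated(new LinkedBlockingQueue<>()));
        System.out.println("bounded queue (2): "
                + threadsCreated(new ArrayBlockingQueue<>(2)));
    }
}
```

With the unbounded LinkedBlockingQueue the pool should stay at 2 threads, since the four extra tasks simply wait in the queue. With an ArrayBlockingQueue of capacity 2, two tasks are queued and the last two force the pool to grow to 4 threads.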

3, Built-in thread pool factories and how they are implemented:

Executors.newCachedThreadPool();

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

Executors.newFixedThreadPool(10);

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

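4, Thread factories

When the threadFactory argument is omitted, Java uses the default thread factory (Executors.defaultThreadFactory()):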

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

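The default names (pool-N-thread-M) are not very descriptive, which makes logs and thread dumps harder to read. You can write your own factory, or use the one from Spring: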

new CustomizableThreadFactory("abc-")
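
If Spring is not on the classpath, a small factory of your own can produce the same kind of readable names. The following is a minimal sketch; NamedThreadFactory is a hypothetical class written for illustration and is not part of the JDK or Spring.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that names threads "<prefix><n>", counting from 1.
class NamedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + counter.getAndIncrement());
        t.setDaemon(false);                      // same choice as the default factory
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedThreadFactory("abc-");
        System.out.println(factory.newThread(() -> { }).getName());
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

An instance can be passed as the last constructor argument of ThreadPoolExecutor, in the same position as CustomizableThreadFactory.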

5, Submitting tasks to the pool:

5.1, Submitting a Runnable

        ExecutorService executorService = new ThreadPoolExecutor(
                5, 10, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());

        IntStream.rangeClosed(1, 10).forEach(i -> {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " : " + i);
            });
        });

5.2, Submitting a Callable

        ExecutorService executorService = new ThreadPoolExecutor(
                5, 10, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());

        List<Future<Integer>> futures = new ArrayList<>();

        IntStream.rangeClosed(1, 10).forEach(i -> {
            Future<Integer> future = executorService.submit(() -> {
                System.out.println(Thread.currentThread());
                return i;
            });

            futures.add(future);
        });

        futures.forEach(future -> {
            try {
                System.out.println(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });

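5.3, A Thread can also be submitted, but this is not recommended. Thread implements Runnable, so the pool only calls its run() method on one of its own worker threads; the submitted Thread itself is never started.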
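
To see why submitting a Thread is misleading, the sketch below records which thread actually executes the body and what state the submitted Thread ends up in. ThreadAsTaskDemo and submitThread are hypothetical names used only for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ThreadAsTaskDemo {

    // Submits a Thread object to a pool and returns
    // {name of the thread that ran the body, state of the submitted Thread}.
    static String[] submitThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> ranOn = new AtomicReference<>();
        Thread task = new Thread(
                () -> ranOn.set(Thread.currentThread().getName()),
                "my-thread");
        try {
            pool.submit(task).get(); // wait until run() has been executed
            return new String[] {ranOn.get(), task.getState().name()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = submitThread();
        System.out.println("body ran on:      " + result[0]);
        System.out.println("submitted thread: " + result[1]);
    }
}
```

The body runs on a pool worker rather than on "my-thread", and the submitted Thread stays in state NEW, so its name, priority and daemon flag are all ignored.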


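5.4, Submitting a whole list of Callables in one call. invokeAll only accepts Callable tasks; a Runnable has to be wrapped with Executors.callable(...) first.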

        ExecutorService executorService = new ThreadPoolExecutor(
                5, 10, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());

        List<Callable<Integer>> callableList = new ArrayList<>();

        IntStream.rangeClosed(1, 10).forEach(i -> {

            Callable<Integer> callable = () -> {
                System.out.println(Thread.currentThread());
                return i;
            };
            callableList.add(callable);
        });

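        // invokeAll blocks until every task has completed and returns one Future per task.
        // It throws InterruptedException, so the caller must catch or declare it.
        List<Future<Integer>> results = executorService.invokeAll(callableList);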
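
The return value of invokeAll is what makes it useful: it yields one Future per task, in the same order as the input list. Below is a small self-contained sketch that collects those results. InvokeAllDemo and sumOfResults are hypothetical names, and the pool size of 4 is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {

    // Runs n tasks that return 1..n and adds up their results.
    static int sumOfResults(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(() -> value);
            }
            int sum = 0;
            // invokeAll returns only after all tasks are done,
            // so get() does not block here.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfResults(10));
    }
}
```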

6, Shutting down the pool

threadPool.shutdown();

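Without a call to shutdown(), the pool's core threads stay alive indefinitely, and because the default factory creates non-daemon threads they keep the JVM from exiting. shutdown() stops the pool from accepting new tasks but still runs the ones already submitted.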
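
shutdown() returns immediately and does not wait for running tasks. A common pattern, sketched here with the hypothetical names ShutdownDemo and shutdownGracefully, is to follow it with awaitTermination and fall back to shutdownNow() if the timeout expires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    // Returns true if the pool terminated within the given time budget.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // reject new tasks, let submitted ones finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt tasks that are still running
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        System.out.println("terminated: " + shutdownGracefully(pool, 5, TimeUnit.SECONDS));
    }
}
```

The timeout values are assumptions to be tuned per application; shutdownNow() can only stop tasks that respond to interruption.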