Java 并发编程中的 Executor 框架与线程池

Java 5 开始引入 Conccurent 软件包,提供完备的并发能力,对线程池有了更好的支持。其中,Executor 框架是最值得称道的。

Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。

一、创建线程池

Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

public static ExecutorService newFixedThreadPool(int nThreads)

创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()

创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()

创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

 1 Executor executor = Executors.newFixedThreadPool(10);  
 2 Runnable task = new Runnable() {  
 3 
 4     @Override  
 5 
 6     public void run() {  
 7 
 8         System.out.println("task over");  
 9 
10     }  
11 
12 };  
13 
14 executor.execute(task);  

或者

1 executor = Executors.newScheduledThreadPool(10);  
2 ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;  
3 scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS);  

 

二、ExecutorService与生命周期

ExecutorService扩展了Executor并添加了一些生命周期管理的方法。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止 。Executor创建时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。

如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。

 1 ExecutorService executorService = (ExecutorService) executor;  
 2 
 3 while (!executorService.isShutdown()) {  
 4 
 5     try {  
 6 
 7         executorService.execute(task);  
 8 
 9     } catch (RejectedExecutionException ignored) {  
10 
11           
12 
13     }  
14 
15 }  
16 
17 executorService.shutdown();  

 三、使用Callable,Future返回结果

Future代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask实现了Future和Runable。Callable代表一个有返回值得操作。

 1 Callable func = new Callable(){  
 2 
 3     public Integer call() throws Exception {  
 4 
 5         System.out.println("inside callable");  
 6 
 7         Thread.sleep(1000);  
 8 
 9         return new Integer(8);  
10 
11     }         
12 
13 };        
14 
15 FutureTask futureTask  = new FutureTask(func);  
16 
17 Thread newThread = new Thread(futureTask);  
18 
19 newThread.start();  
20 
21   
22 
23 try {  
24 
25     System.out.println("blocking here");  
26 
27     Integer result = futureTask.get();  
28 
29     System.out.println(result);  
30 
31 } catch (InterruptedException ignored) {  
32 
33 } catch (ExecutionException ignored) {  
34 
35 }  

 ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

例子:并行计算数组的和。

  1 package executorservice;  
  2 
  3   
  4 
  5 import java.util.ArrayList;  
  6 
  7 import java.util.List;  
  8 
  9 import java.util.concurrent.Callable;  
 10 
 11 import java.util.concurrent.ExecutionException;  
 12 
 13 import java.util.concurrent.ExecutorService;  
 14 
 15 import java.util.concurrent.Executors;  
 16 
 17 import java.util.concurrent.Future;  
 18 
 19 import java.util.concurrent.FutureTask;  
 20 
 21   
 22 
 23 public class ConcurrentCalculator {  
 24 
 25   
 26 
 27     private ExecutorService exec;  
 28 
 29     private int cpuCoreNumber;  
 30 
 31     private List> tasks = new ArrayList>();  
 32 
 33   
 34 
 35     // 内部类  
 36 
 37     class SumCalculator implements Callable {  
 38 
 39         private int[] numbers;  
 40 
 41         private int start;  
 42 
 43         private int end;  
 44 
 45   
 46 
 47         public SumCalculator(final int[] numbers, int start, int end) {  
 48 
 49             this.numbers = numbers;  
 50 
 51             this.start = start;  
 52 
 53             this.end = end;  
 54 
 55         }  
 56 
 57   
 58 
 59         public Long call() throws Exception {  
 60 
 61             Long sum = 0l;  
 62 
 63             for (int i = start; i < end; i++) {  
 64 
 65                 sum += numbers[i];  
 66 
 67             }  
 68 
 69             return sum;  
 70 
 71         }  
 72 
 73     }  
 74 
 75   
 76 
 77     public ConcurrentCalculator() {  
 78 
 79         cpuCoreNumber = Runtime.getRuntime().availableProcessors();  
 80 
 81         exec = Executors.newFixedThreadPool(cpuCoreNumber);  
 82 
 83     }  
 84 
 85   
 86 
 87     public Long sum(final int[] numbers) {  
 88 
 89         // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor  
 90 
 91         for (int i = 0; i < cpuCoreNumber; i++) {  
 92 
 93             int increment = numbers.length / cpuCoreNumber + 1;  
 94 
 95             int start = increment * i;  
 96 
 97             int end = increment * i + increment;  
 98 
 99             if (end > numbers.length)  
100 
101                 end = numbers.length;  
102 
103             SumCalculator subCalc = new SumCalculator(numbers, start, end);  
104 
105             FutureTask task = new FutureTask(subCalc);  
106 
107             tasks.add(task);  
108 
109             if (!exec.isShutdown()) {  
110 
111                 exec.submit(task);  
112 
113             }  
114 
115         }  
116 
117         return getResult();  
118 
119     }  
120 
121   
122 
123       
124 
125     public Long getResult() {  
126 
127         Long result = 0l;  
128 
129         for (Future task : tasks) {  
130 
131             try {  
132 
133                 // 如果计算未完成则阻塞  
134 
135                 Long subSum = task.get();  
136 
137                 result += subSum;  
138 
139             } catch (InterruptedException e) {  
140 
141                 e.printStackTrace();  
142 
143             } catch (ExecutionException e) {  
144 
145                 e.printStackTrace();  
146 
147             }  
148 
149         }  
150 
151         return result;  
152 
153     }  
154 
155   
156 
157     public void close() {  
158 
159         exec.shutdown();  
160 
161     }  
162 
163 }  

 Main

1 int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };  
2 
3 ConcurrentCalculator calc = new ConcurrentCalculator();  
4 
5 Long sum = calc.sum(numbers);  
6 
7 System.out.println(sum);  
8 
9 calc.close();  
 四、CompletionService

在刚在的例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意字任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使CompletionService。生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService的take方法是,会返回按完成顺序放回任务的结果,CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞。修改刚才的例子使用CompletionService:

  1 public class ConcurrentCalculator2 {  
  2 
  3   
  4 
  5     private ExecutorService exec;  
  6 
  7     private CompletionService completionService;  
  8 
  9   
 10 
 11   
 12 
 13     private int cpuCoreNumber;  
 14 
 15   
 16 
 17     // 内部类  
 18 
 19     class SumCalculator implements Callable {  
 20 
 21         ......  
 22 
 23     }  
 24 
 25   
 26 
 27     public ConcurrentCalculator2() {  
 28 
 29         cpuCoreNumber = Runtime.getRuntime().availableProcessors();  
 30 
 31         exec = Executors.newFixedThreadPool(cpuCoreNumber);  
 32 
 33         completionService = new ExecutorCompletionService(exec);  
 34 
 35   
 36 
 37   
 38 
 39     }  
 40 
 41   
 42 
 43     public Long sum(final int[] numbers) {  
 44 
 45         // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor  
 46 
 47         for (int i = 0; i < cpuCoreNumber; i++) {  
 48 
 49             int increment = numbers.length / cpuCoreNumber + 1;  
 50 
 51             int start = increment * i;  
 52 
 53             int end = increment * i + increment;  
 54 
 55             if (end > numbers.length)  
 56 
 57                 end = numbers.length;  
 58 
 59             SumCalculator subCalc = new SumCalculator(numbers, start, end);   
 60 
 61             if (!exec.isShutdown()) {  
 62 
 63                 completionService.submit(subCalc);  
 64 
 65   
 66 
 67   
 68 
 69             }  
 70 
 71               
 72 
 73         }  
 74 
 75         return getResult();  
 76 
 77     }  
 78 
 79   
 80 
 81       
 82 
 83     public Long getResult() {  
 84 
 85         Long result = 0l;  
 86 
 87         for (int i = 0; i < cpuCoreNumber; i++) {              
 88 
 89             try {  
 90 
 91                 Long subSum = completionService.take().get();  
 92 
 93                 result += subSum;             
 94 
 95             } catch (InterruptedException e) {  
 96 
 97                 e.printStackTrace();  
 98 
 99             } catch (ExecutionException e) {  
100 
101                 e.printStackTrace();  
102 
103             }  
104 
105         }  
106 
107         return result;  
108 
109     }  
110 
111   
112 
113     public void close() {  
114 
115         exec.shutdown();  
116 
117     }  
118 
119 }