相关Java 5.0+ 并发包的探讨-2 section -补充-1
我们现在来看看CompletionService :
package ExecutorDemos; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorBasicDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { //创建ExecutorCompletionService实例 ExecutorService service=Executors.newFixedThreadPool(20); CompletionService<Object> serv = new ExecutorCompletionService<Object>(service); //创建50个工作 for (int index = 0; index < 50; index++) { final int Position = index; Callable<Object> dosomethings = new Callable<Object>() { public Object call() throws Exception { long value=(long) (Math.random() * 10000); System.out.println("Current sleep time is:"+value+", Position is "+Position); Thread.sleep(value); return value; } }; serv.submit(dosomethings); } Thread.sleep(1000); for (int index = 0; index < 50; index++) { //按顺序取出来 Future<Object> task = serv.take(); Object value = task.get(); System.out.println(value); } service.shutdown(); } }
执行的结果是:
你发现结果是按顺序取出结果的,而且可以看出它与Future有关系。在上一篇中,我们看过一个例子:
package ExecutorDemos; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class FutureTaskTest { private static AtomicInteger Count = new AtomicInteger(0); @SuppressWarnings("unchecked") public static void main(String args[]) { ExecutorService es = Executors.newFixedThreadPool(10); List<Future<Integer>> tasks = new ArrayList<Future<Integer>>(); for (int i = 0; i <= 10; i++) { final int iCount=i; FutureTask<Integer> futureTask = new FutureTask<Integer>( new Callable() { @SuppressWarnings("static-access") public Integer call() throws Exception { Thread.currentThread().sleep( (new Random()).nextInt(1000)); System.out.println("Postion is "+iCount); return Count.getAndIncrement(); } }); tasks.add(futureTask); es.submit(futureTask); } Integer result = new Integer(0); try { for (Future<Integer> task : tasks) { result += (Integer) task.get(); } es.shutdown(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
其执行结果是:
从结果看,取数据的时候并不能保证结果数据按照其原始顺序提取!我们对比一下上面这两段代码,发现貌似只是按顺序提取数据的那个代码多了个ExecutorCompletionService,其他部分基本相同,这样我们可以解释ExecutorCompletionService它的作用了,那就是
异步的执行任务,并按原始任务的顺序取得结果! 从ExecutorCompletionService源代码来看:
其构造函数中
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
实例化了一个堵塞队列,那么用这个干什么用哪?接下来看看submit的实现吧:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
呵呵,上面的submit实现解释Runnable和Callable是如何转化成Future的,另外还使用了Executor.execute(new QueueingFuture(f))! 而QueueingFuture是ExecutorCompletionService的一个私有内部类,我们看看它的
实现:
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } //completionQueue是使用的外部类的属性,即ExecutorCompletionService中的属性 protected void done() { completionQueue.add(task); } private final Future<V> task; }
Executor.execute方法会自动调用done方法并将Future加入到堵塞队列中,知道了吧?就是在这里把外部的Runnable或者Callable转化过来的Future和堵塞队列建立的联系(加入队列)completionQueue的类型是LinkedBlockingQueue, LinkedBlockingQueue表明它是一个Queue,这就意味着它的元素是按先进先出(FIFO)的次序进行存储的。以特定次序插入的元素会以相同的次序被取出--但根据插入保证,任何从空队列中取出元素的尝试都会堵塞调用线程直到该元素可被取出时为止。同样地,任何向一个已满队列中插入元素的尝试将会堵塞调用线程直到该队列的存储空间有空余时为止。
对于take()方法的实现:
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
可以明确看到take()时,是从堵塞队列中take(),这可以解释为什么ExecutorCompletionService可以保证结果安装原始顺序输出的了!对于堵塞队列,我将在后面的文章中讨论到.我们在了解其基本原理的基础上,才能很好的使用它,不是吗?
综上,如果您的应用对结果数据的顺序没有要求,你就可以使用FutureTask和ExecutorService! 如果有要求,最好使用CompletionService!