施用CompletionService获取多线程返回值

使用CompletionService获取多线程返回值

 

当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种情况用future task不合适的,效率也不高的,实例如下:

 

实例一:用一个非complete Service完成的批量任务

 

public class NonCompleteServiceTest {
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Future<String>[] futures = new FutureTask[10];        
        /**
         * 产生一个随机数,模拟不同的任务的处理时间不同
         */
        for (int i = 0; i < 10; i++) {
            futures[i] = executorService.submit(new Callable<String>() {
                public String call(){
                    int rnt = new Random().nextInt(5);
                    
                    try {
                        Thread.sleep(rnt*1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("run rnt = "+rnt);
                    return String.valueOf(rnt*1000);
                }
            });
        }
        
        /**
         * 获取结果时,如果任务没有完成,则阻塞,在顺序获取结果时,
         * 可能别的任务已经完成,显然效率不高
         */
        for (int i = 0; i < futures.length; i++) {
            System.out.println(futures[i].get());
        }
        executorService.shutdown();
    }

}

 

CompletionService:

 

JDK:

public interface CompletionService<V>

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

 

 

public class CompleteServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    
    CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    
    /**
     * 产生一个随机数,模拟不同的任务的处理时间不同
     */
    for (int i = 0; i < 10; i++) {
        completionService.submit(new Callable<String>() {
            public String call(){
                int rnt = new Random().nextInt(5);
                
                try {
                    Thread.sleep(rnt*1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println("run rnt = "+rnt);
                return String.valueOf(rnt*1000);
            }
        });
    }
    
    /**
     * 获取结果时,总是先拿到队列上已经存在的对象,这样不用依次等待结果
     * 显然效率更高
     */
    for (int i = 0; i < 10; i++) {
        Future<String> future = completionService.take();
        System.out.println(future.get());
    }
    executorService.shutdown();
}
}

 实例来源:http://tomyz0223.iteye.com/blog/1040300

JDK:http://www.cjsdn.net/Doc/JDK50/java/util/concurrent/CompletionService.html