处置 子线程的返回值
处理 子线程的返回值
package com.jimmy.Thread.ConcurrentTools; import static java.lang.System.out; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) throws InterruptedException { this.id = id; } @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(2); return "result of TaskWithResult " + id; } } public class CallableTest { public static void main(String[] args) throws Exception { // test1(); test3(); } /** * 用 Join()方法等待所有子线程执行完,再收集执行结果 * @throws Exception */ public static void test2() throws Exception { List<Thread> list = new ArrayList<Thread>(); ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<FutureTask<String>> results = new ArrayList<FutureTask<String>>(); // Future // 相当于是用来存放Executor执行的结果的一种容器 for (int i = 0; i < 10; i++) { Callable<String> callable = new TaskWithResult(i); FutureTask<String> futureTask = new FutureTask<String>(callable); Thread thread = new Thread(futureTask); thread.start(); results.add(futureTask); list.add(thread); } long time0 = System.currentTimeMillis(); for(Thread thread:list){ thread.join(); } out.println("共耗时:" + (System.currentTimeMillis()-time0)); String string = null; for (FutureTask<String> fs : results) { if(fs.isDone()){ try { string = fs.get(); } catch (Exception e) { e.printStackTrace(); } out.println(string); }else{ out.println("is not done!"); } } exec.shutdown(); } /** * 以BlockingQueue阻塞队列显式的接受子线程的返回值,操控灵活 * @throws Exception */ public static void test1() throws Exception { ExecutorService pool = Executors.newCachedThreadPool(); BlockingQueue<Future<String>> blockingQueue = new ArrayBlockingQueue<Future<String>>(10); CompletionService<String> service = new ExecutorCompletionService<String>(pool,blockingQueue); // 相当于是用来存放Executor执行的结果的一种容器 for (int i = 0; i < 10; i++) { Callable<String> callable = new TaskWithResult(i); service.submit(callable); } String string = null; int count = 0; while(true){ Future<String> future = blockingQueue.take(); string = future.get(); count ++; out.println(future.isDone() + "..value:===" + string); if(count == 10){ break; } } pool.shutdown(); } /** * 此种方式获取子线程的值是最为方便 * @throws Exception */ public static void test3() throws Exception { ExecutorService pool = Executors.newCachedThreadPool(); CompletionService<String> service = new ExecutorCompletionService<String>(pool); // 相当于是用来存放Executor执行的结果的一种容器 for (int i = 0; i < 10; i++) { Callable<String> callable = new TaskWithResult(i); service.submit(callable); } String string = null; for (int i = 0; i < 10; i++) { Future<String> future = service.take();//阻塞模式循环获取队列的值 string = future.get(); out.println(string); } pool.shutdown(); } }