处置 子线程的返回值

处理 子线程的返回值
package com.jimmy.Thread.ConcurrentTools;

import static java.lang.System.out;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class TaskWithResult implements Callable<String>
{
  private int id;

  public TaskWithResult(int id) throws InterruptedException
  {

    this.id = id;
  }

  @Override
  public String call() throws Exception
  {
    TimeUnit.SECONDS.sleep(2);
    return "result of TaskWithResult " + id;
  }
}

public class CallableTest
{
  public static void main(String[] args) throws Exception
  {
//    test1();
    test3();
  }
  
  /**
   * 用 Join()方法等待所有子线程执行完,再收集执行结果
   * @throws Exception
   */
  public static void test2() throws Exception
  {
    List<Thread> list = new ArrayList<Thread>();
    ExecutorService exec = Executors.newCachedThreadPool();
    ArrayList<FutureTask<String>> results = new ArrayList<FutureTask<String>>(); // Future
    // 相当于是用来存放Executor执行的结果的一种容器
    for (int i = 0; i < 10; i++) {
      Callable<String> callable = new TaskWithResult(i);
      FutureTask<String> futureTask = new FutureTask<String>(callable);
      Thread thread = new Thread(futureTask);
      thread.start();
      results.add(futureTask);
      list.add(thread);
    }
    long time0 = System.currentTimeMillis();
    for(Thread thread:list){
      thread.join();
    }
    out.println("共耗时:" + (System.currentTimeMillis()-time0));
    
    String string = null;
    for (FutureTask<String> fs : results) {
      if(fs.isDone()){
        try {
          string = fs.get();
        } catch (Exception e) {
          e.printStackTrace();
        }
        out.println(string);
      }else{
        out.println("is not done!");
      }
    }
    exec.shutdown();
  }

  /**
   * 以BlockingQueue阻塞队列显式的接受子线程的返回值,操控灵活
   * @throws Exception
   */
  public static void test1() throws Exception
  {
    ExecutorService pool = Executors.newCachedThreadPool();
    BlockingQueue<Future<String>> blockingQueue = new ArrayBlockingQueue<Future<String>>(10);
    CompletionService<String> service = new ExecutorCompletionService<String>(pool,blockingQueue);
                                                                         // 相当于是用来存放Executor执行的结果的一种容器
    for (int i = 0; i < 10; i++) {
      Callable<String> callable = new TaskWithResult(i);
      service.submit(callable);
    }
    String string = null;
    int count = 0;
    while(true){
      Future<String> future = blockingQueue.take();
      string = future.get();
      count ++;
      out.println(future.isDone() + "..value:===" + string);
      if(count == 10){
        break;
      }
    }
    pool.shutdown();
  }
  
  /**
   * 此种方式获取子线程的值是最为方便
   * @throws Exception
   */
  public static void test3() throws Exception
  {
    ExecutorService pool = Executors.newCachedThreadPool();
    CompletionService<String> service = new ExecutorCompletionService<String>(pool);
                                                                         // 相当于是用来存放Executor执行的结果的一种容器
    for (int i = 0; i < 10; i++) {
      Callable<String> callable = new TaskWithResult(i);
      service.submit(callable);
    }
    
    String string = null;
    for (int i = 0; i < 10; i++) {
      Future<String> future = service.take();//阻塞模式循环获取队列的值
      string = future.get();
      out.println(string);
    }
    pool.shutdown();
  }
}