CompletionService

 CompletionService原理:内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。

一般情况下,使用Runnable接口、Thread实现的线程我们都是无法返回结果的。但是如果对一些场合需要线程返回的结果。就要使用用Callable、Future、FutureTask、CompletionService这几个类。Callable只能在ExecutorService的线程池中跑,但有返回结果,也可以通过返回的Future对象查询执行状态。Future 本身也是一种设计模式,它是用来取得异步任务的结果

一、基本源码

所以来看看它们的源码信息1、Callable看看其源码:

public interface Callable<V> {

    V call() throws Exception;

}

它只有一个call方法,并且有一个返回V,是泛型。可以认为这里返回V就是线程返回的结果。

ExecutorService接口:线程池执行调度框架

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

2、Future

Future是我们最常见的

public interface Future<V> {

    //试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动     //,则此任务将永不运行。如果任务已经启动,则 

    //mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返    //回 true,则对 isCancelled() 

    //的后续调用将始终返回 true。 

    boolean cancel(boolean mayInterruptIfRunning);

    //如果在任务正常完成前将其取消,则返回 true。 

    boolean isCancelled();

   //如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。

    boolean isDone();

   //等待线程结果返回,会阻塞

    V get() throws InterruptedException, ExecutionException;

   //设置超时时间

    V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException;

}

3、FutureTask从源码看其继承关系如下:

CompletionService

CompletionService

其源码如下:

public class FutureTask<V> implements RunnableFuture<V> {

    //真正用来执行线程的类

    private final Sync sync;

    //构造方法1,从Callable来创建FutureTask

    public FutureTask(Callable<V> callable) {

        if (callable == null)

            throw new NullPointerException();

        sync = new Sync(callable);

    }

    //构造方法2,从Runnable来创建FutureTask,V就是线程执行返回结果

    public FutureTask(Runnable runnable, V result) {

        sync = new Sync(Executors.callable(runnable, result));

    }

    //和Futrue一样

    public boolean isCancelled() {

        return sync.innerIsCancelled();

    }

    //和Futrue一样

    public boolean isDone() {

        return sync.innerIsDone();

    }

    //和Futrue一样

    public boolean cancel(boolean mayInterruptIfRunning) {

        return sync.innerCancel(mayInterruptIfRunning);

    }

    //和Futrue一样

    public V get() throws InterruptedException, ExecutionException {

        return sync.innerGet();

    }

    //和Futrue一样

    public V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException {

        return sync.innerGet(unit.toNanos(timeout));

    }


    //线程结束后的操作

    protected void done() { }

    //设置结果

    protected void set(V v) {

        sync.innerSet(v);

    }

    //设置异常

    protected void setException(Throwable t) {

        sync.innerSetException(t);

    }

    //线程执行入口

    public void run() {

        sync.innerRun();

    }

    //重置

    protected boolean runAndReset() {

        return sync.innerRunAndReset();

    }

    //这个类才是真正执行、关闭线程的类

    private final class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = -7828117401763700385L;

        //线程运行状态

        private static final int RUNNING   = 1;

        private static final int RAN       = 2;

        private static final int CANCELLED = 4;


        private final Callable<V> callable;

        private V result;

        private Throwable exception;

        //线程实例

        private volatile Thread runner;

        //构造函数

        Sync(Callable<V> callable) {

            this.callable = callable;

        }

     。。。。

    }

}
View Code

 FutureTask类是Future 的一个实现,并实现了Runnable,所以可通过Excutor(线程池) 来执行,也可传递给Thread对象执行。如果在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。 Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。FutureTask类既可以使用new Thread(Runnable r)放到一个新线程中跑,也可以使用ExecutorService.submit(Runnable r)放到线程池中跑,而且两种方式都可以获取返回结果,但实质是一样的,即如果要有返回结果那么构造函数一定要注入一个Callable对象。 

二、应用实例 

1、Future实例 

package com.func.axc.futuretask;

 

import java.util.Random;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

 

/**

 * 功能概要:

 * 

 * @author linbingwen

 * @since  2016年6月8日 

 */

public class FutureTest {

 

    /**

     * @author linbingwen

     * @since  2016年6月8日 

     * @param args    

     */

    public static void main(String[] args) {

           System.out.println("main Thread begin at:"+ System.nanoTime());

            ExecutorService executor = Executors.newCachedThreadPool();

            HandleCallable task1 = new HandleCallable("1");

            HandleCallable task2 = new HandleCallable("2");

            HandleCallable task3 = new HandleCallable("3");

            Future<Integer> result1 = executor.submit(task1);

            Future<Integer> result2 = executor.submit(task2);

            Future<Integer> result3 = executor.submit(task3);

            executor.shutdown();

            try {

                Thread.sleep(1000);

            } catch (InterruptedException e1) {

                e1.printStackTrace();

            }

            try {

                System.out.println("task1运行结果:"+result1.get());

                System.out.println("task2运行结果:"+result2.get());

                System.out.println("task3运行结果:"+result3.get());

            } catch (InterruptedException e) {

                e.printStackTrace();

            } catch (ExecutionException e) {

                e.printStackTrace();

            }

            System.out.println("main Thread finish at:"+ System.nanoTime());

    }

 

}

 

class HandleCallable implements Callable<Integer>{

    private String name;

    public HandleCallable(String name) {

        this.name = name;

    }

    

    @Override

    public Integer call() throws Exception {

        System.out.println("task"+ name + "开始进行计算");

        Thread.sleep(3000);

        int sum = new Random().nextInt(300);

        int result = 0;

        for (int i = 0; i < sum; i++)

            result += i;

        return result;

    }

}
View Code

执行结果:

 CompletionService

 2、FutureTask方法

一、直接通过New Thread来启动线程 

package com.func.axc.futuretask;

 

import java.util.Random;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.FutureTask;

 

import org.springframework.scheduling.config.Task;

 

/**

 * 功能概要:

 * 

 * @author linbingwen

 * @since 2016年5月31日

 */

public class FutrueTaskTest {

 

    public static void main(String[] args) {

        //采用直接启动线程的方法

        System.out.println("main Thread begin at:"+ System.nanoTime());

        MyTask task1 = new MyTask("1");

        FutureTask<Integer> result1 = new FutureTask<Integer>(task1);

        Thread thread1 = new Thread(result1);

        thread1.start();

        

        MyTask task2 = new MyTask("2");

        FutureTask<Integer> result2 = new FutureTask<Integer>(task2);

        Thread thread2 = new Thread(result2);

        thread2.start();

 

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e1) {

            e1.printStackTrace();

        }

        

        try {

            System.out.println("task1返回结果:"  + result1.get());

            System.out.println("task2返回结果:"  + result2.get());

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

        }

 

        System.out.println("main Thread finish at:"+ System.nanoTime());

        

    }

}

 

class MyTask implements Callable<Integer> {

    private String name;

    

    public MyTask(String name) {

        this.name = name;

    }

    

    @Override

    public Integer call() throws Exception {

        System.out.println("task"+ name + "开始进行计算");

        Thread.sleep(3000);

        int sum = new Random().nextInt(300);

        int result = 0;

        for (int i = 0; i < sum; i++)

            result += i;

        return result;

    }

 

}
View Code

执行结果: 

CompletionService

方法二、通过线程池来启动线程 

package com.func.axc.futuretask;

import java.util.Random;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

 

/**

 * 功能概要:

 * 

 * @author linbingwen

 * @since 2016年5月31日

 */

public class FutrueTaskTest2 {

 

    public static void main(String[] args) {

        System.out.println("main Thread begin at:"+ System.nanoTime());

        ExecutorService executor = Executors.newCachedThreadPool();

        MyTask2 task1 = new MyTask2("1");

        MyTask2 task2 = new MyTask2("2");

        Future<Integer> result1 = executor.submit(task1);

        Future<Integer> result2 = executor.submit(task2);

        executor.shutdown();

 

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e1) {

            e1.printStackTrace();

        }

        

        try {

            System.out.println("task1返回结果:"  + result1.get());

            System.out.println("task2返回结果:"  + result2.get());

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

        }

 

        System.out.println("main Thread finish at:"+ System.nanoTime());

        

    }

}

 

class MyTask2 implements Callable<Integer> {

    private String name;

    

    public MyTask2(String name) {

        this.name = name;

    }

    

    @Override

    public Integer call() throws Exception {

        System.out.println("task"+ name + "开始进行计算");

        Thread.sleep(3000);

        int sum = new Random().nextInt(300);

        int result = 0;

        for (int i = 0; i < sum; i++)

            result += i;

        return result;

    }

 

}
View Code

执行结果: 

CompletionService

三、CompletionService 

这个光看其单词,就可以猜到它应该是一个线程执行完成后相关的服务,没错。

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以将每个任务的Future保存进一个集合,然后为了防止get时阻塞,循环这个集合不断地调用 timeout为零的get。幸运的是CompletionService帮你做了这件事情。
CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。
CompletionService的take返回的future是哪个先完成就先返回哪一个,而不是根据提交顺序。

CompletionService原理不是很难,它就是将一组线程的执行结果放入一个BlockingQueue当中。这里线程的执行结果放入到Blockqueue的顺序只和这个线程的执行时间有关。和它们的启动顺序无关。并且你无需自己在去写很多判断哪个线程是否执行完成,它里面会去帮你处理。

首先看看其源码: 

package java.util.concurrent;

 

public interface CompletionService<V> {

    //提交线程任务

    Future<V> submit(Callable<V> task);

    //提交线程任务

    Future<V> submit(Runnable task, V result);

   //阻塞等待

    Future<V> take() throws InterruptedException;

   //非阻塞等待

    Future<V> poll();

   //带时间的非阻塞等待

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

}

上面只是一个接口类,其实现类如下: 

package java.util.concurrent;

public class ExecutorCompletionService<V> implements CompletionService<V> {

    private final Executor executor;//线程池类

    private final AbstractExecutorService aes;

    private final BlockingQueue<Future<V>> completionQueue;//存放线程执行结果的阻塞队列

 

    //内部封装的一个用来执线程的FutureTask

    private class QueueingFuture extends FutureTask<Void> {

        QueueingFuture(RunnableFuture<V> task) {

            super(task, null);

            this.task = task;

        }

        protected void done() { completionQueue.add(task); }//线程执行完成后调用此函数将结果放入阻塞队列

        private final Future<V> task;

    }

 

    private RunnableFuture<V> newTaskFor(Callable<V> task) {

        if (aes == null)

            return new FutureTask<V>(task);

        else

            return aes.newTaskFor(task);

    }

 

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {

        if (aes == null)

            return new FutureTask<V>(task, result);

        else

            return aes.newTaskFor(task, result);

    }

 

     //构造函数,这里一般传入一个线程池对象executor的实现类

    public ExecutorCompletionService(Executor executor) {

        if (executor == null)

            throw new NullPointerException();

        this.executor = executor;

        this.aes = (executor instanceof AbstractExecutorService) ?

            (AbstractExecutorService) executor : null;

        this.completionQueue = new LinkedBlockingQueue<Future<V>>();//默认的是链表阻塞队列

    }

 

    //构造函数,可以自己设定阻塞队列

    public ExecutorCompletionService(Executor executor,

                                     BlockingQueue<Future<V>> completionQueue) {

        if (executor == null || completionQueue == null)

            throw new NullPointerException();

        this.executor = executor;

        this.aes = (executor instanceof AbstractExecutorService) ?

            (AbstractExecutorService) executor : null;

        this.completionQueue = completionQueue;

    }

    //提交线程任务,其实最终还是executor去提交

    public Future<V> submit(Callable<V> task) {

        if (task == null) throw new NullPointerException();

        RunnableFuture<V> f = newTaskFor(task);

        executor.execute(new QueueingFuture(f));

        return f;

    }

    //提交线程任务,其实最终还是executor去提交

    public Future<V> submit(Runnable task, V result) {

        if (task == null) throw new NullPointerException();

        RunnableFuture<V> f = newTaskFor(task, result);

        executor.execute(new QueueingFuture(f));

        return f;

    }

 

    public Future<V> take() throws InterruptedException {

        return completionQueue.take();

    }

 

    public Future<V> poll() {

        return completionQueue.poll();

    }

 

    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {

        return completionQueue.poll(timeout, unit);

    }

 

}
View Code

从源码中可以知道。最终还是线程还是提交到Executor当中去运行,所以构造函数中需要Executor参数来实例化。而每次有线程执行完成后往阻塞队列添加一个Future。

这是上面的RunnableFuture,这是每次往线程池是放入的线程。 

public interface RunnableFuture<V> extends Runnable, Future<V> {

    void run();

}

接下来以两个例子来说明其使用

1、与Future的区别使用:

自定义一个Callable 

class HandleFuture<Integer> implements Callable<Integer> {

    

    private Integer num;

    

    public HandleFuture(Integer num) {

        this.num = num;

    }

 

    @Override

    public Integer call() throws Exception {

        Thread.sleep(3*100);

        System.out.println(Thread.currentThread().getName());

        return num;

    }

    

}

 首先是Futuer 

    public static void FutureTest() throws InterruptedException, ExecutionException {

        System.out.println("main Thread begin:");

        ExecutorService executor = Executors.newCachedThreadPool();

        List<Future<Integer>> result = new ArrayList<Future<Integer>>();

        for (int i = 0;i<10;i++) {

            Future<Integer> submit = executor.submit(new HandleFuture(i));

            result.add(submit);

        }

        executor.shutdown();

        for (int i = 0;i<10;i++) {//一个一个等待返回结果

            System.out.println("返回结果:"+result.get(i).get());

        }

        System.out.println("main Thread end:");

    }

执行结果: 

CompletionService

从输出结果可以看出,我们只能一个一个阻塞的取出。这中间肯定会浪费一定的时间在等待上。如7返回了。但是前面1-6都没有返回。那么7就得等1-6输出才能输出。

接下来换成CompletionService来做: 

    public static void CompleTest() throws InterruptedException, ExecutionException {

        System.out.println("main Thread begin:");

        ExecutorService executor = Executors.newCachedThreadPool();

        // 构建完成服务

        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);

        for (int i = 0;i<10;i++) {

            completionService.submit(new HandleFuture(i));

        }

        for (int i = 0;i<10;i++) {//一个一个等待返回结果

            System.out.println("返回结果:"+completionService.take().get());

        }

        System.out.println("main Thread end:");

    }

输出结果:

 CompletionService

 可以看出,结果的输出和线程的放入顺序无关系。每一个线程执行成功后,立刻就输出。(与线程编号pool-1-thread-x无关,可忽略该输出)

https://blog.csdn.net/evankaka/article/details/51610635

https://blog.csdn.net/u010185262/article/details/56017175

相关推荐