JAVA8之CompletableFuture(组合式异步编程)与Future
分类:
IT文章
•
2022-08-02 00:17:15
Future
Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。要使用Future,通只需要将耗时操作封装在一个Callable对象中,再将它提交给ExecutorService。
ExecutorService(线程池)体系结构:
一、线程池: 提供一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁的额外开销,提高了响应的速度。
二、线程池的体系结构:
java.util.concurrent.Executor 负责线程的使用和调度的根接口
|--ExecutorService 子接口: 线程池的主要接口
|--ThreadPoolExecutor 线程池的实现类
|--ScheduledExceutorService 子接口: 负责线程的调度
|--ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService
三、工具类 : Executors
ExecutorService newFixedThreadPool() : 创建固定大小的线程池
ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
ExecutorService newSingleThreadExecutor() : 创建单个线程池。 线程池中只有一个线程
ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务
示例代码:
package completableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureTest {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> f = es.submit(() -> {
Thread.sleep(10000);
// 结果
return 100;
});
// do something
Integer result = 0;
try {
result = f.get(1l, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(result);
}
}
View Code
Future接口的局限性:Future很难直接表述多个Future 结果之间的依赖性
CompletableFuture
创建CompletableFuture
a.实例化生成 CompletableFuture
此时这个future和Callable没有任何联系,没有线程池也不是异步工作。如果现在客户端代码调用ask().get()它将永远阻塞。通过complete方法触发完成
示例代码如下:
package completableFuture;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
CompletableFuture<Double> future = new CompletableFuture<Double>();
new Thread(() -> {
try {
Thread.sleep(1000l);
double price = 10d;
future.complete(price);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}).start();
Double result = future.join();
Long end = System.currentTimeMillis();
System.out.println((end - start) + " ms:" + result);
}
}
View Code
b.使用工厂方法创建 CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
supplyAsync
与runAsync
不同在与前者异步返回一个结果,后者是void. Executor表示是用自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
作为它的线程池.线程数是Runtime.getRuntime().availableProcessors(). 其中Supplier
是一个函数式接口(函数描述符为 () -> T).
结果执行完成时的处理
public CompletableFuture<T> whenComplete( <? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
BiConsumer的函数描述符为(T,U) -> void ,无返回值.方法以Async结尾,意味着Action使用相同的线程执行(如果使用相同的线程池,也可能会被同一个线程选中执行).
多个异步任务进行流水线操作
a.结果转换(thenApply):输入是上一个阶段计算后的结果,返回值是经过转化后结果 .Function函数描述符: T -> R
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
b.消费结果(thenAccept):只是针对结果进行消费,入参是Consumer,没有返回值. Consumer函数描述符: T -> void
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
c.结果合并(thenCombine):需要上一阶段的返回值,并且other代表的CompletionStage也要返回值之后,把这两个返回值,进行转换后返回指定类型的值
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
代码示例如下:
package completableFuture;
import java.util.concurrent.CompletableFuture;
public class Test {
public static void main(String[] args) {
testThenApply();
testThenCompose();
testThenCombine();
testThenAcceptBoth();
}
private static void testThenApply() {
// 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> {
System.out.println(resultA + " resultB");
return resultA + " resultB";
});
}
private static void testThenCompose() {
//
CompletableFuture.supplyAsync(() -> "resultA").thenCompose(resultA -> {
return CompletableFuture.supplyAsync(() -> resultA + "resultB");
});
}
private static void testThenCombine() {
// thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> result = future1.thenCombine(future2, (f1, f2) -> f1 + f2);
System.out.println(result.join());
}
private static void testThenAcceptBoth() {
// 接受任务的处理结果,并消费处理,有返回结果
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
future1.thenAcceptBoth(future2, (f1, f2) -> {
System.out.println("accept:" + f1 + f2);
});
}
}
View Code