背景
前段时间测试的同事反馈他们那边的自动化脚本最近老是跑出一个慢接口,接口响应时间平均在2s左右。这是一个详情页接口,看代码该接口要查询的东西确实太多了,前前后后七七八八有十几二十个查询业务吧。看pinpoint监控调用链查询业务主要是一个别的组提供的dubbo接口和自己项目的一个慢查询,其余十几个简单查询其实并不慢,对能优化的地方都改进后打算还是再对这个详情接口做异步查询。一个线程查dubbo接口,一个线程查慢查询,一个线程查剩下的杂七杂八数据。
线程池Executor
用线程池后代码看起来又觉得对一个接口这样子搞显得不太美观,像这样:
ExecutorService es = Executors.newFixedThreadPool(10);
Future<Integer> future = es.submit(() ->{
// 长时间的异步计算
// ……
// 然后返回结果
return 100;
});
future.get();
就这样等三个Future都获取到值后响应给客户端,这样对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源。
代码强迫症犯了,于是开始寻找其他更优雅的异步方式。
- @Async倒是优雅,但无奈我们Spring项目版本过低。
- RxJava太古老,代码还不如Future方便。
- Google的guava包扩展了Future:
ListenableFuture
、SettableFuture
以及辅助类Futures
等,但无奈我们项目没有引入guava。
直到发现了它,CompletableFuture
JDK亲儿子,1.8版本后加入的正统Java类库。
CompletableFuture API
在Java 8中, 新增加了一个包含50个方法左右的类:CompletableFuture
,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。
创建CompletableFuture对象
completedFuture
CompletableFuture.completedFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture(应该没卵用)
runAsync
创建异步执行的代码,类比Executor中的execute(() -> xxx),CompletableFuture的计算结果返回类型为Void
public static CompletableFuture<Void> runAsync(Runnable runnable)
带Executor
参数,可以自定义线程池,如果不传该参数默认使用的ForkJoinPool.commonPool()
作为它的线程池执行异步代码
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
supplyAsync
同上,类比Executor中的submit(() -> return xxx);
supplyAsync
方法以Supplier<U>
函数式接口类型为参数CompletableFuture的计算结果返回类型为U
,例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//长时间的计算任务
return "·00";
});
处理单个CompletableFuture
针对一些有返回值的逻辑处理代码,对其异步操作后只想对其返回
whenComplete
可以就简单理解成加强版的<T> Future<T> submit(Callable<T> task)
,增加了主动处理异常。
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);
- 当
CompletableFuture
的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action
。可以看到Action
的类型是BiConsumer<? super T,? super Throwable>
,它可以处理正常的计算结果,或者异常情况。 - 方法不以
Async
结尾,意味着Action
使用相同的线程执行,而Async
可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
例子
public class Main {
private static Random rand = new Random();
private static long t = System.currentTimeMillis();
static int getMoreData() {
System.out.println("begin to start compute");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("end to start compute. passed " + (System.currentTimeMillis() - t)/1000 + " seconds");
return rand.nextInt(1000);
}
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println(v);
System.out.println(e);
});
System.out.println(f.get());
System.in.read();
}
}
get()方法会阻塞代码,等待回调。
exceptionally
简单理解就是专门捕获CompletableFuture
异常的,一般放在最后。
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
Function就是专门接受异常的。
handle
兼有whenComplete
和转换
的两个功能。
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor);
相比whenComplete
将BiConsumer接口换成了BiFunction接口(有返回值,这个返回值就是转换的新类型)。相当于将CompletableFuture<T>
转换成CompletableFuture<U>
例子
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> f = future.handle((v,e) -> {
if (e == null) {
return v * 10;
}
});
System.out.println(f.get()); //"1000"
thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
thenApplay
方法只是用来处理正常值,因此一旦有异常就会抛出。相比handle
,就是把BiFuntion
换成了Funtion
,没有了异常的单独处理,handle
有对异常的处理。
例子
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> f = future.thenApply(i -> i * 10);
System.out.println(f.get()); //"1000"
需要注意的是,这些转换并不是马上执行的,也不会阻塞,而是在前一个stage完成后继续执行。
thenAccept
纯消费
的接口
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
看它的参数类型也就明白了,它们是函数式接口Consumer,相比thenApply
这个接口只有输入,没有返回值。
例子
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<Void> f = future.thenAccept(i -> System.out.println(i));
System.out.println(f.get());
thenRun
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
例子
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<Void> f = future.thenRun(() -> System.out.println("finished"));
System.out.println(f.get());
同上,相比thenAccept
这个接口连输入值也没了。
至此,你可以根据方法的参数的类型来加速你的记忆。
Runnable
类型的参数会忽略计算的结果,Consumer
是纯消费计算结果,BiConsumer
会组合另外一个CompletionStage
纯消费,Function
会对计算结果做转换,BiFunction
会组合另外一个CompletionStage
的计算结果做转换。
处理两个CompletableFuture
runAfterBoth
相比thenRun
可以处理两个CompletionStage
的结果
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
例子
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("A"));
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("B"));
CompletableFuture<Void> f = future.runAfterBoth(future2, () -> System.out.println("success"));
System.out.println(f.get()); // null
thenAcceptBoth
相比thenAccept
可以处理两个CompletionStage
的结果
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor);
当两个CompletionStage
都正常完成计算的时候,就会执行提供的action
,它用来组合另外一个异步的结果。该接口的返回的CompletableFuture
的类型是Void
例子
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(() -> {
return 66.66;
});
CompletableFuture<Void> f = future.thenAcceptBoth(future2, (x, y) -> System.out.println(x * y)); // 6666.0
System.out.println(f.get()); // null
thenCompose
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor);
这个可以和thenAcceptBoth
对比记忆,thenAcceptBoth
是将两个CompletionStage
并行计算后拿到两者的返回结果进行消费;而thenCompose
是先拿到第一个CompletionStage
的返回值然后计算返回第二个CompletionStage
的返回值。
例子
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> f = future.thenCompose( i -> {
return CompletableFuture.supplyAsync(() -> {
return (i * 10) + "";
});
});
System.out.println(f.get()); //1000
thenCombine
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
thenCombine
和上面的thenCompose
相比,是将两个CompletionStage
并行执行的,这个在用异步解决“速度”问题时用处更大。和thenAcceptBoth
相比是有返回自定义类型,thenCombine
是BiFunction
,而thenAcceptBoth
是BiConsumer
。搞清楚了这些BI
接口就一目了然了。
例子
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "abc";
});
CompletableFuture<String> f = future.thenCombine(future2, (x,y) -> y + "-" + x);
System.out.println(f.get()); //abc-100
acceptEither
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
acceptEither
方法是当任意一个CompletionStage
完成的时候,然后对首先完成的执行Consumer
。这个方法返回CompletableFuture<Void>
。
applyToEither
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor);
applyToEither
方法是当任意一个CompletionStage
完成的时候,然后对首先完成的执行Function
。它的返回值会当作新的CompletableFuture<U>
的计算结果。
例子
Random rand = new Random();
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
});
CompletableFuture<String> f = future.applyToEither(future2,i -> i.toString());
这个例子有时会输出100,有时候会输出200,哪个Future先完成就会根据它的结果计算。
处理多个CompletableFuture
allOf
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
allOf
方法是当所有的CompletableFuture
都执行完后执行计算。返回CompletableFuture<Void>
,没有任何Function
或Customer
,执行get()
,join()
方法时等待所有任务全部执行。
anyOf
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
相比allOf
,anyOf
是有返回值的,返回CompletableFuture<Object>
,获取第一个执行完成的返回对象。
例子
Random rand = new Random();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
//CompletableFuture<Void> f = CompletableFuture.allOf(future1,future2);
CompletableFuture<Object> f = CompletableFuture.anyOf(future1,future2);
System.out.println(f.get());
参考资料