Java异步编程优雅之美-CompletableFuture

in 笔记 with 0 comment

背景

前段时间测试的同事反馈他们那边的自动化脚本最近老是跑出一个慢接口,接口响应时间平均在2s左右。这是一个详情页接口,看代码该接口要查询的东西确实太多了,前前后后七七八八有十几二十个查询业务吧。看pinpoint监控调用链查询业务主要是一个别的组提供的dubbo接口和自己项目的一个慢查询,其余十几个简单查询其实并不慢,对能优化的地方都改进后打算还是再对这个详情接口做异步查询。一个线程查dubbo接口,一个线程查慢查询,一个线程查剩下的杂七杂八数据。

线程池Executor

用线程池后代码看起来又觉得对一个接口这样子搞显得不太美观,像这样:

ExecutorService es = Executors.newFixedThreadPool(10);
Future<Integer> future = es.submit(() ->{
	// 长时间的异步计算
	// ……
	// 然后返回结果
	return 100;
});
future.get();

就这样等三个Future都获取到值后响应给客户端,这样对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源。

代码强迫症犯了,于是开始寻找其他更优雅的异步方式。

直到发现了它,CompletableFuture JDK亲儿子,1.8版本后加入的正统Java类库。

CompletableFuture API

在Java 8中, 新增加了一个包含50个方法左右的类:CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。

创建CompletableFuture对象

completedFuture

CompletableFuture.completedFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture(应该没卵用)

runAsync

创建异步执行的代码,类比Executor中的execute(() -> xxx),CompletableFuture的计算结果返回类型为Void

public static CompletableFuture<Void> runAsync(Runnable runnable)

Executor参数,可以自定义线程池,如果不传该参数默认使用的ForkJoinPool.commonPool()作为它的线程池执行异步代码

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

supplyAsync

同上,类比Executor中的submit(() -> return xxx);
supplyAsync方法以Supplier<U>函数式接口类型为参数CompletableFuture的计算结果返回类型为U,例如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //长时间的计算任务
    return "·00";
});

处理单个CompletableFuture

针对一些有返回值的逻辑处理代码,对其异步操作后只想对其返回

whenComplete

可以就简单理解成加强版的<T> Future<T> submit(Callable<T> task),增加了主动处理异常。

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

例子

public class Main {
    private static Random rand = new Random();
    private static long t = System.currentTimeMillis();
    static int getMoreData() {
        System.out.println("begin to start compute");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("end to start compute. passed " + (System.currentTimeMillis() - t)/1000 + " seconds");
        return rand.nextInt(1000);
    }
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        });
        System.out.println(f.get());
        System.in.read();
    }
}

get()方法会阻塞代码,等待回调。

exceptionally

简单理解就是专门捕获CompletableFuture异常的,一般放在最后。

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);

Function就是专门接受异常的。

handle

兼有whenComplete转换的两个功能。

public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor);

相比whenComplete将BiConsumer接口换成了BiFunction接口(有返回值,这个返回值就是转换的新类型)。相当于将CompletableFuture<T>转换成CompletableFuture<U>

例子

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100;
});
CompletableFuture<String> f =  future.handle((v,e) -> {
    if (e == null) {
	return v * 10;
    }
});
System.out.println(f.get()); //"1000"

thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

thenApplay方法只是用来处理正常值,因此一旦有异常就会抛出。相比handle,就是把BiFuntion换成了Funtion,没有了异常的单独处理,handle有对异常的处理。

例子

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100;
});
CompletableFuture<String> f =  future.thenApply(i -> i * 10);
System.out.println(f.get()); //"1000"

需要注意的是,这些转换并不是马上执行的,也不会阻塞,而是在前一个stage完成后继续执行。

thenAccept

纯消费的接口

public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

看它的参数类型也就明白了,它们是函数式接口Consumer,相比thenApply这个接口只有输入,没有返回值。

例子

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100;
});
CompletableFuture<Void> f =  future.thenAccept(i -> System.out.println(i));
System.out.println(f.get());

thenRun

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

例子

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100;
});
CompletableFuture<Void> f =  future.thenRun(() -> System.out.println("finished"));
System.out.println(f.get());

同上,相比thenAccept这个接口连输入值也没了。

至此,你可以根据方法的参数的类型来加速你的记忆。Runnable类型的参数会忽略计算的结果,Consumer是纯消费计算结果,BiConsumer会组合另外一个CompletionStage纯消费,Function会对计算结果做转换,BiFunction会组合另外一个CompletionStage的计算结果做转换。

处理两个CompletableFuture

runAfterBoth

相比thenRun可以处理两个CompletionStage的结果

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

例子

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("A"));
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("B"));
CompletableFuture<Void> f = future.runAfterBoth(future2, () -> System.out.println("success"));
System.out.println(f.get()); // null

thenAcceptBoth

相比thenAccept可以处理两个CompletionStage的结果

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor);

当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另外一个异步的结果。该接口的返回的CompletableFuture的类型是Void

例子

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100;
});
CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(() -> {
    return 66.66;
});
CompletableFuture<Void> f = future.thenAcceptBoth(future2, (x, y) -> System.out.println(x * y)); // 6666.0
System.out.println(f.get()); // null

thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor);

这个可以和thenAcceptBoth对比记忆,thenAcceptBoth是将两个CompletionStage并行计算后拿到两者的返回结果进行消费;而thenCompose是先拿到第一个CompletionStage的返回值然后计算返回第二个CompletionStage的返回值。

例子

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100;
});
CompletableFuture<String> f =  future.thenCompose( i -> {
    return CompletableFuture.supplyAsync(() -> {
        return (i * 10) + "";
    });
});
System.out.println(f.get()); //1000

thenCombine

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

thenCombine和上面的thenCompose相比,是将两个CompletionStage并行执行的,这个在用异步解决“速度”问题时用处更大。和thenAcceptBoth相比是有返回自定义类型,thenCombineBiFunction,而thenAcceptBothBiConsumer。搞清楚了这些BI接口就一目了然了。

例子

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "abc";
});
CompletableFuture<String> f =  future.thenCombine(future2, (x,y) -> y + "-" + x);
System.out.println(f.get()); //abc-100

acceptEither

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);

acceptEither方法是当任意一个CompletionStage完成的时候,然后对首先完成的执行Consumer。这个方法返回CompletableFuture<Void>

applyToEither

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor);

applyToEither方法是当任意一个CompletionStage完成的时候,然后对首先完成的执行Function。它的返回值会当作新的CompletableFuture<U>的计算结果。

例子

Random rand = new Random();
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(10000 + rand.nextInt(1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(10000 + rand.nextInt(1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 200;
});
CompletableFuture<String> f =  future.applyToEither(future2,i -> i.toString());

这个例子有时会输出100,有时候会输出200,哪个Future先完成就会根据它的结果计算。

处理多个CompletableFuture

allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

allOf方法是当所有的CompletableFuture都执行完后执行计算。返回CompletableFuture<Void>,没有任何FunctionCustomer,执行get(),join()方法时等待所有任务全部执行。

anyOf

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

相比allOfanyOf是有返回值的,返回CompletableFuture<Object>,获取第一个执行完成的返回对象。

例子

Random rand = new Random();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(10000 + rand.nextInt(1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(10000 + rand.nextInt(1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
});
//CompletableFuture<Void> f =  CompletableFuture.allOf(future1,future2);
CompletableFuture<Object> f =  CompletableFuture.anyOf(future1,future2);
System.out.println(f.get());

参考资料