Java8通过CompletableFuture实现异步回调
前言:
java5为我们提供了Callable
和Future
,使我们可以很容易的完成异步任务结果的获取,但是通过Future的get获取异步任务结果会导致主线程的阻塞,这样在某些场景下是非常消耗CPU资源的,进而Java8为我们提供了CompletableFuture
,使我们可以轻松完成异步任务的回调。
1 什么是CompletableFuture?
CompletableFuture
是Java 8 中新增的一个类,它是对Future接口的扩展。从下方的类继承关系图中我们看到其不仅实现了Future接口,还有CompletionStage接口,当Future需要显示地完成时,可以使用CompletionStage接口去支持完成时触发的函数和操作,当2个以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。
CompletableFuture
主要作用就是简化我们异步编程的复杂性,支持函数式编程,可以通过回调的方式处理计算结果。
2 为什么会有CompletableFuture ?
在java5中,JDK为我们提供了Callable和Future,使我们可以很容易的完成异步任务结果的获取,但是通过Future的get获取异步任务结果会导致主线程的阻塞,这样在某些场景下是非常消耗CPU资源的,进而Java8为我们提供了CompletableFuture,使我们无需阻塞等待,而是通过回调的方式去处理结果,并且还支持流式处理、组合异步任务等操作。
如果不熟悉Callable
和Future
的,可以看小编之前更新的这篇文章Java从源码看异步任务计算FutureTask
3 CompletableFuture 简单使用
下面我们就CompletableFuture 的使用进行简单分类:
创建任务:
- supplyAsync/runAsync
异步回调:
- thenApply/thenAccept/thenRun
- thenApplyAsync/thenAcceptAsync/thenRunAsync
- exceptionally
- handle/whenComplete
组合处理:
- thenCombine / thenAcceptBoth / runAfterBoth
- applyToEither / acceptEither / runAfterEither
- thenCompose
- allOf / anyOf
具体内容请参照以下案例:
public static void main(String[] args) throws Exception { // 1.带返回值的异步任务(不指定线程池,默认ForkJoinPool.commonPool(),单核ThreadPerTaskExecutor) CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> { return 1 + 1; }); System.out.println("cf1 result: " + cf1.get()); // 2.无返回值的异步任务(不指定线程池,默认ForkJoinPool.commonPool(),单核ThreadPerTaskExecutor) CompletableFuture cf2 = CompletableFuture.runAsync(() -> { int a = 1 + 1; }); System.out.println("cf2 result: " + cf2.get()); // 3.指定线程池的带返回值的异步任务,runAsync同理 CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> { return 1 + 1; }, Executors.newCachedThreadPool()); System.out.println("cf3 result: " + cf3.get()); // 4.回调,任务执行完成后执行的动作 CompletableFuture<Integer> cf4 = cf1.thenApply((result) -> { System.out.println("cf4回调拿到cf1的结果 result : " + result); return result + 1; }); System.out.println("cf4 result: " + cf4.get()); // 5.异步回调(将回调任务提交到线程池),任务执行完成后执行的动作后异步执行 CompletableFuture<Integer> cf5 = cf1.thenApplyAsync((result) -> { System.out.println("cf5回调拿到cf1的结果 result : " + result); return result + 1; }); System.out.println("cf5 result: " + cf5.get()); // 6.回调(同thenApply但无返回结果),任务执行完成后执行的动作 CompletableFuture cf6 = cf1.thenAccept((result) -> { System.out.println("cf6回调拿到cf1的结果 result : " + result); }); System.out.println("cf6 result: " + cf6.get()); // 7.回调(同thenAccept但无入参),任务执行完成后执行的动作 CompletableFuture cf7 = cf1.thenRun(() -> { }); System.out.println("cf7 result: " + cf7.get()); // 8.异常回调,任务执行出现异常后执行的动作 CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("出现异常"); }); CompletableFuture<Integer> cf8 = cf.exceptionally((result) -> { return -1; }); System.out.println("cf8 result: " + cf8.get()); // 9.当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法 // 如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致; // 如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。 CompletableFuture<Integer> cf9 = cf1.handle((a, b) -> { if (b != null) { b.printStackTrace(); } return a; }); System.out.println("cf9 result: " + cf9.get()); // 10 与handle类似,无返回值 try { CompletableFuture<Integer> cf10 = cf.whenComplete((a, b) -> { if (b != null) { b.printStackTrace(); } }); System.out.println("cf10 result: " + cf10.get()); } catch (Exception e) { System.out.println("cf10 出现异常!!!"); } // 11 组合处理(两个都完成,然后执行)有入参,有返回值 CompletableFuture<Integer> cf11 = cf1.thenCombine(cf3, (r1, r2) -> { return r1 + r2; }); System.out.println("cf11 result: " + cf11.get()); // 12 组合处理(两个都完成,然后执行)有入参,无返回值 CompletableFuture cf12 = cf1.thenAcceptBoth(cf3, (r1, r2) -> { }); System.out.println("cf12 result: " + cf12.get()); // 13 组合处理(两个都完成,然后执行)无入参,无返回值 CompletableFuture cf13 = cf1.runAfterBoth(cf3, () -> { }); System.out.println("cf13 result: " + cf13.get()); // 14 组合处理(有一个完成,然后执行)有入参,有返回值 CompletableFuture<Integer> cf14 = cf1.applyToEither(cf3, (r) -> { return r; }); System.out.println("cf14 result: " + cf14.get()); // 15 组合处理(有一个完成,然后执行)有入参,无返回值 CompletableFuture cf15 = cf1.acceptEither(cf3, (r) -> { }); System.out.println("cf15 result: " + cf15.get()); // 16 组合处理(有一个完成,然后执行)无入参,无返回值 CompletableFuture cf16 = cf1.runAfterEither(cf3, () -> { }); System.out.println("cf16 result: " + cf16.get()); // 17 方法执行后返回一个新的CompletableFuture CompletableFuture<Integer> cf17 = cf1.thenCompose((r) -> { return CompletableFuture.supplyAsync(() -> { return 1 + 1; }); }); System.out.println("cf17 result: " + cf17.get()); // 18 多个任务都执行成功才会继续执行 CompletableFuture.allOf(cf1,cf2,cf3).whenComplete((r, t) -> { System.out.println(r); }); // 18 多个任务任意一个执行成功就会继续执行 CompletableFuture.anyOf(cf1,cf2,cf3).whenComplete((r, t) -> { System.out.println(r); }); }
4 CompletableFuture 源码分析
首先我们可以从注释中看到,它对CompletionStage
、Future
接口扩展的一些描述,这些也是它的一些重点。
除了直接操作状态和结果的相关方法外,CompletableFuture
还实现了CompletionStage
接口的如下策略:
- (1)为非异步方法的依赖完成提供的操作,可以由完成当前
CompletableFuture
的线程执行,也可以由完成方法的任何其他调用方执行。 - (2)所有没有显式Executor参数的异步方法都使用
ForkJoinPool.commonPool()
执行(除非它不支持至少两个并行级别,在这种情况下,将创建一个新线程来运行每个任务)。为了简化监视、调试和跟踪,所有生成的异步任务都是CompletableFuture的实例,异步完成任务。
不了解ForkJoinPool
的可以阅读小编之前更新的这篇文章一文带你了解Java中的ForkJoin。
- (3)所有
CompletionStage
方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法重写的影响。
CompletableFuture实现了Future接口的如下策略:
- 因为(与FutureTask不同)这个类对导致它完成的计算没有直接控制权,所以取消被视为另一种形式的异常完成,所以cancel操作被视为是另一种异常完成形式(new CancellationException()具有相同的效果。)。方法
isCompletedExceptionally()
可以用来确定一个CompletableFuture是否以任何异常的方式完成。 - 如果异常完成时出现
CompletionException
,方法get()和get(long,TimeUnit)会抛出一个ExecutionException
,其原因与相应CompletionException中的原因相同。为了简化在大多数上下文中的使用,该类还定义了join()和getNow()方法,在这些情况下直接抛出CompletionException。
4.1 创建异步任务
我们先看一下CompletableFuture
是如何创建异步任务的,我们可以看到起创建异步任务的核心实现是两个入参,一个入参是Executor,另一个入参是Supplier(函数式编程接口)。其中也提供了一个入参的重载,一个入参的重载方法会获取默认的Executor,当系统是单核的会使用ThreadPerTaskExecutor,多核时使用ForkJoinPool.commonPool()
。
注意:这里默认ForkJoinPool.commonPool()线程池,如果所有异步任务都使用该线程池话,出现问题不容易定位,如果长时间占用该线程池可能影响其他业务的正常操作,stream的并行流也是使用的该线程池。
其中还封装了静态内部类AsyncSupply
,该类代表这个异步任务,实现了Runnable
,重写了run方法。
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1); public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); e.execute(new AsyncSupply<U>(d, f)); return d; } /** * 静态内部类,继承了ForkJoinTask<Void>、实现了Runnable、AsynchronousCompletionTask */ static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { CompletableFuture<T> dep; Supplier<T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { this.dep = dep; this.fn = fn; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) {} public final boolean exec() { run(); return true; } public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); } } }
Supplier类是一个函数式的接口,@FunctionalInterface
注解就是函数式编程的标记。
package java.util.function; @FunctionalInterface public interface Supplier<T> { T get(); }
4.2 异步任务回调
异步任务回调,我们以thenApply/thenApplyAsync
为例来看一下其实现原理,方法名含有Async的会传入asyncPool。uniApplyStage
方法通过判断e是否有值,来区分是从哪个方法进来的。thenApply
不会传入 Executor,它优先让当前线程来执行后续 stage 的任务。
- 当发现前一个 stage 已经执行完毕时,直接让当前线程来执行后续 stage 的 task。
- 当发现前一个 stage 还没执行完毕时,则把当前 stage 包装成一个 UniApply 对象,放到前一个 stage 的栈中。执行前一个 stage 的线程,执行完毕后,接着执行后续 stage 的 task。
thenApplyAsync
会传入一个 Executor,它总是让 Executor 线程池里面的线程来执行后续 stage 的任务。
- 把当前 stage 包装成一个 UniApply 对象,放到前一个 stage 的栈中,直接让 Executor 来执行。
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn); } private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); // Async直接进入,不是Async执行uniApply尝试获取结果 if (e != null || !d.uniApply(this, f, null)) { UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; } final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) { Object r; Throwable x; // 判断当前CompletableFuture是否已完成,如果没完成则返回false;如果完成了则执行下面的逻辑。 if (a == null || (r = a.result) == null || f == null) return false; tryComplete: if (result == null) { // 判断任务结果是否是AltResult类型 if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } try { // 判断当前任务是否可以执行 if (c != null && !c.claim()) return false; // 获取任务结果 @SuppressWarnings("unchecked") S s = (S) r; // 执行 completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true; } static final class UniApply<T,V> extends UniCompletion<T,V> { Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } } final void push(UniCompletion<?,?> c) { if (c != null) { while (result == null && !tryPushStack(c)) lazySetNext(c, null); // clear on failure } } final boolean completeValue(T t) { return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t); }
4.3 异步任务组合
我们再thenCombine
方法为例看一下CompletableFuture
是如何处理组合任务的,我们可以看到thenCombine的源码与thenApply的源码基本上是一直的,只不过组合的时候不仅仅是判断一个,需要集合具体场景,判断多个CompletableFuture
。
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn); } private <U,V> CompletableFuture<V> biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f) { CompletableFuture<U> b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); if (e != null || !d.biApply(this, b, f, null)) { BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); bipush(b, c); c.tryFire(SYNC); } return d; } final <R,S> boolean biApply(CompletableFuture<R> a, CompletableFuture<S> b, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c) { Object r, s; Throwable x; // 此处不止要判断a还得判断b if (a == null || (r = a.result) == null || b == null || (s = b.result) == null || f == null) return false; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } // 这里不止判断a的结果r还要判断b的结果s if (s instanceof AltResult) { if ((x = ((AltResult)s).ex) != null) { completeThrowable(x, s); break tryComplete; } s = null; } // 最后将rr, ss传入 try { if (c != null && !c.claim()) return false; @SuppressWarnings("unchecked") R rr = (R) r; @SuppressWarnings("unchecked") S ss = (S) s; completeValue(f.apply(rr, ss)); } catch (Throwable ex) { completeThrowable(ex); } } return true; } static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn) { super(executor, dep, src, snd); this.fn = fn; } // tryFire方法也同样的多可个b final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; CompletableFuture<U> b; if ((d = dep) == null || !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) return null; dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } }
上一篇:java 方法与数组基础使用详解
栏 目:JAVA代码
下一篇:Springboot 集成 lombok.jar过程解析
本文标题:Java8通过CompletableFuture实现异步回调
本文地址:http://www.codeinn.net/misctech/222807.html