简介
CompletableFuture 是 java.io.conrrent 库在 JDK8 中新增的并发工具,同传统的 Future 相比,支持流式计算、函数式编程、聚合计算、完成通知和自定义异常处理等特性。
CompletableFuture 实现了 CompletionStage 和 Future。CompletionStage 是对 Future 的扩展,增强了流式处理、异步回调、组合处理的能力,使得在处理多任务的协同工作时更加顺利。
CompletableFuture 和 FutureTask 属于 Future 接口的实现类,都可以获取线程的执行结果。

CompletableFuture创建
构造函数
最简单的方式是通过构造函数直接 new 一个 CompletableFutrue 实例。
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
Object join = completableFuture.join();
System.out.println(join);
// 无输出
但需要注意的是,新创建的 CompletableFuture 没有计算结果时,当前线程执行 join 方法,会一直阻塞。
我们可以通过 complete 方法给当前线程设置结果。
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
completableFuture.complete("create");
Object join = completableFuture.join();
System.out.println(join);
// 输出create
或者交由另外的线程设置计算结果,这样就实现了线程间协作。
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
new Thread(() -> {
try {
System.out.println("开始等待");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束等待");
completableFuture.complete("create");
}).start();
System.out.println("获取值");
Object join = completableFuture.join();
Thread.sleep(1000);
System.out.println(join);
输出结果
获取值
开始等待
结束等待
create
completedFuture创建
CompletableFuture.completedFuture 设置一个计算结果,这样 CompletableFuture 相当于已经执行过了 complete 了。当然,一般情况下这个使用较少。
/**
* Returns a new CompletableFuture that is already completed with
* the given value.
*
* @param value the value
* @param <U> the type of the value
* @return the completed CompletableFuture
*/
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
这里来看一下案例
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("completedFuture");
String join1 = completedFuture.join();
log.info("result=>{}",join1);
completedFuture = CompletableFuture.completedFuture(null);
log.info("result=>{}",completedFuture.join());
输出结果
result=>completedFuture
result=>null
runAsync创建
CompletableFuture.runAsync 要求传入 Runnable 类型的参数,因此没有返回值。所以 runAsync 适合使用在不需要返回值的计算场景中。
CompletableFuture.runAsync 有两个方法签名。其中第二种会允许传入一个线程池,当没有传入自定义线程池时,runAsync 默认会使用内部的 ForkJoinPool 线程池。
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
简单案例:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
getThreadName(0);
});
log.info("result=>{}", future.join());
// 输出结果null。
注意这里是会阻塞调用 CompletableFuture.join 的线程来等待结果返回。
supplyAsync创建
CompletableFuture.supplyAsycn 方法与 CompletableFuture.runAsync 类似,区别在于 supplyAsync 接收一个 Supplier 类型的参数,会生成一个返回值,所以 supplyAsync 适用于需要返回值的计算场景中。
简单案例
@Test
public void supplyAsync() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
getThreadName(0);
return "success";
});
log.info("result=>{}", future.join());
}
输出结果
[ForkJoinPool.commonPool-worker-1] INFO top.zsmile.test.basic.threads.CompletableFutureTest - ThreadName:ForkJoinPool.commonPool-worker-1,value:0
[main] INFO top.zsmile.test.basic.threads.CompletableFutureTest - result=>success
从输出结果可以看出,CompletableFuture 生成的异步线程会打印执行线程信息,并将结果返回给主线程打印。
结果获取
CompletableFutrue 有4类获取结果的方法:
public T get() throws InterruptedException, ExecutionException。获取结果。如果出现异常则抛出。public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。获取结果,并允许传入超时字段,如果时间内没有获取到计算结果,则抛出TimeoutException异常,此时结果值为null。注意:就算抛出异常,但任务也不会中断,而是继续执行public T join()。获取结果,且不会抛出异常。public T getNow(T valueIfAbsent)。立即获取结果,如果CompletableFuture的结果未计算完成,则返回传入结果。注意:就算抛出异常,但任务也不会中断,而是继续执行
验证一下。
@Test
public void getResult() throws ExecutionException, InterruptedException {
// get
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
getThreadName(0);
return "success";
});
log.info("result=>{}", future.get());
// get
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
getThreadName(0);
try {
int i = 1;
while (i <= 5) {
Thread.sleep(1000);
log.debug("sleep1 1000");
i++;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
});
try {
log.info("result1=>{}", future1.get(2000, TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
log.error("timeout1 => {}", e.getMessage());
}
// join
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
getThreadName(0);
return "success";
});
log.info("result2=>{}", future2.join());
// getNow
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
getThreadName(0);
try {
int i = 1;
while (i <= 5) {
Thread.sleep(1000);
log.debug("sleep2 1000");
i++;
}
log.info("over");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
});
log.info("result3=>{}", future3.getNow("6666"));
Thread.sleep(10000);
log.info("result3=>{}", future3.getNow("6666"));
}
输出结果
11:12:26.581 [ForkJoinPool.commonPool-worker-1] INFO top.zsmile.test.basic.threads.CompletableFutureTest - ThreadName:ForkJoinPool.commonPool-worker-1,value:0
11:12:26.585 [main] INFO top.zsmile.test.basic.threads.CompletableFutureTest - result=>success
11:12:26.586 [ForkJoinPool.commonPool-worker-1] INFO top.zsmile.test.basic.threads.CompletableFutureTest - ThreadName:ForkJoinPool.commonPool-worker-1,value:0
11:12:27.601 [ForkJoinPool.commonPool-worker-1] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep1 1000
11:12:28.592 [main] ERROR top.zsmile.test.basic.threads.CompletableFutureTest - timeout1 => null
11:12:28.592 [ForkJoinPool.commonPool-worker-2] INFO top.zsmile.test.basic.threads.CompletableFutureTest - ThreadName:ForkJoinPool.commonPool-worker-2,value:0
11:12:28.592 [main] INFO top.zsmile.test.basic.threads.CompletableFutureTest - result2=>success
11:12:28.592 [main] INFO top.zsmile.test.basic.threads.CompletableFutureTest - result3=>6666
11:12:28.592 [ForkJoinPool.commonPool-worker-2] INFO top.zsmile.test.basic.threads.CompletableFutureTest - ThreadName:ForkJoinPool.commonPool-worker-2,value:0
11:12:28.607 [ForkJoinPool.commonPool-worker-1] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep1 1000
11:12:29.603 [ForkJoinPool.commonPool-worker-2] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep2 1000
11:12:29.618 [ForkJoinPool.commonPool-worker-1] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep1 1000
11:12:30.614 [ForkJoinPool.commonPool-worker-2] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep2 1000
11:12:30.630 [ForkJoinPool.commonPool-worker-1] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep1 1000
11:12:31.615 [ForkJoinPool.commonPool-worker-2] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep2 1000
11:12:31.631 [ForkJoinPool.commonPool-worker-1] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep1 1000
11:12:32.623 [ForkJoinPool.commonPool-worker-2] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep2 1000
11:12:33.630 [ForkJoinPool.commonPool-worker-2] DEBUG top.zsmile.test.basic.threads.CompletableFutureTest - sleep2 1000
11:12:33.630 [ForkJoinPool.commonPool-worker-2] INFO top.zsmile.test.basic.threads.CompletableFutureTest - over
11:12:38.605 [main] INFO top.zsmile.test.basic.threads.CompletableFutureTest - result3=>success
join和get的区别
另外 join 和 get 相同之处都是阻塞获取结果,那么这个异常抛出的差别是什么呢?
@Test
public void getOrJoin() {
try {
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> 1 / 0);
log.info("join result => {}", integerCompletableFuture.join());
} catch (Exception e) {
e.printStackTrace();
}
try {
CompletableFuture<Integer> integerCompletableFuture2 = CompletableFuture.supplyAsync(() -> 1 / 0);
log.info("get result => {}", integerCompletableFuture2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
输出结果
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
...
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
...
这里 join 抛出 CompletionException 异常,而 get 抛出 ExecutionException 异常,既然都抛出了异常,那么他们的差别在于哪里呢?
join抛出的是unchecked异常,即RuntimeException异常,这种异常在检查期间不会抛出,也不会强制要求开发者进行捕获。并且会将异常包装成CancellationException和CompletionException异常。get抛出的是经过检查的异常,InterruptedException和ExecutionException。需要开发捕获处理。
流式计算
CompletableFuture 与 Future 最大的不同在于其对流式计算的支持,多个任务之间,可以关联,形成计算流。并对不同任务的计算结果进行处理。
CompletableFuture 流式方法的大致可以分为两类 带Async 和 不带Async的。带 Async 的会单独提交到线程池中。