Future和CompletableFuture解析与使用
一、Future 模式
自 Java 1.5 起,Java 提供了 Callable 和 Future 接口,允许我们在任务执行完毕后获取执行结果。
Future 接口是构建异步应用的基础,也是多线程开发中常见的设计模式。
当我们调用一个函数方法时,如果该函数执行缓慢,调用者通常需要进行等待。但在某些场景下,我们并不急于立即获取结果。此时,我们可以让被调用者立即返回,在后台慢慢处理请求;对于调用者而言,则可以先处理其他任务,待真正需要数据时再去尝试获取结果。

1. Callable 与 Runnable
java.lang.Runnable 是一个接口,其中只声明了一个 run() 方法。该方法的返回值为 void,因此任务执行完毕后无法返回任何结果。
public interface Runnable {
public abstract void run();
}Callable 位于 java.util.concurrent 包下,它也是一个接口,其中声明了一个 call() 方法。这是一个泛型接口,call() 函数返回的类型即为泛型参数 V 的类型。
public interface Callable<V> {
V call() throws Exception;
}2. Future 与 Callable 的使用
Future 用于对具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成以及获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}如何使用 Future 和 Callable 呢?一般情况下是配合 ExecutorService 来使用的。在 ExecutorService 接口中声明了若干个 submit 方法的重载版本:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);以下是 Future + Callable 的使用示例。这里演示了直接使用 FutureTask 配合 Thread 的方式,同时也注释展示了配合 ExecutorService 的方式:
import java.util.Random;
import java.util.concurrent.*;
/**
* @program: callable
* @description: testfuture
* @author: Mr.Wang
* @create: 2018-08-12 12:11
**/
public class Testfuture {
public static void main(String[] args) {
// 第一种方式:直接使用 FutureTask 配合 Thread
FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Random().nextInt();
}
});
new Thread(task).start();
// 第二种方式:配合 ExecutorService (示例代码已注释)
// ExecutorService executor = Executors.newSingleThreadExecutor();
// FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
// @Override
// public Integer call() throws Exception {
// return new Random().nextInt();
// }
// });
// executor.submit(task);
try {
System.out.println("result: " + task.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}运行结果示例:
result: 297483790或者:
result: -3584908093. Future 接口的局限性
了解了 Future 的使用后,我们需要谈谈它的局限性。Future 很难直接表述多个 Future 结果之间的依赖性。在开发中,我们经常需要达成以下目的:
- 将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果)。
- 等待
Future集合中的所有任务都完成。 - 仅等待
Future集合中最快结束的任务完成,并返回它的结果。
二、CompletableFuture
CompletableFuture 类实现了 CompletionStage 和 Future 接口,因此你可以像使用 Future 那样使用它,同时提供了更强大的功能。
下面通过例子来逐步解释 CompletableFuture 的使用。
创建 CompletableFuture 对象
说明:以Async结尾的方法都是可以异步执行的。如果指定了线程池,会在指定的线程池中执行;如果没有指定,默认会在ForkJoinPool.commonPool()中执行。下文很多方法类似,不再做特别说明。
提供了四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象。方法的参数类型都是函数式接口,所以可以使用 Lambda 表达式实现异步任务。
runAsync方法:以Runnable函数式接口类型为参数,因此CompletableFuture的计算结果为空。supplyAsync方法:以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)1. 变换结果
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);这些方法的输入是上一个阶段计算后的结果,返回值是经过转化后的结果。
示例:
import java.util.concurrent.CompletableFuture;
/**
* @program: callable
* @description: test
* @author: Mr.Wang
* @create: 2018-08-12 12:36
**/
public class TestCompleteFuture {
public static void main(String[] args) {
String result = CompletableFuture.supplyAsync(() -> {
return "Hello ";
}).thenApplyAsync(v -> v + "world").join();
System.out.println(result);
}
}结果:
Hello world2. 消费结果
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);这些方法只是针对结果进行消费,入参是 Consumer,没有返回值。
示例:
import java.util.concurrent.CompletableFuture;
/**
* @program: callable
* @description: test
* @author: Mr.Wang
* @create: 2018-08-12 12:36
**/
public class TestCompleteFuture {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
return "Hello ";
}).thenAccept(v -> {
System.out.println("consumer: " + v);
});
}
}结果:
consumer: Hello 3. 合并两个 CompletionStage 的结果
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);需要上一阶段的返回值,并且 other 代表的 CompletionStage 也要返回值之后,把这两个返回值进行转换后返回指定类型的值。
说明:同样,也存在对两个CompletionStage结果进行消耗的一组方法,例如thenAcceptBoth,这里不再进行示例。
示例:
import java.util.concurrent.CompletableFuture;
/**
* @program: callable
* @description: test
* @author: Mr.Wang
* @create: 2018-08-12 12:36
**/
public class TestCompleteFuture {
public static void main(String[] args) {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> {
return s1 + " " + s2;
}).join();
System.out.println(result);
}
}结果:
Hello world4. 竞争执行:谁快用谁
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);两种渠道完成同一个事情,就可以调用这个方法,找一个最快的结果进行处理,最终有返回值。
示例:
import java.util.concurrent.CompletableFuture;
/**
* @program: callable
* @description: test
* @author: Mr.Wang
* @create: 2018-08-12 12:36
**/
public class TestCompleteFuture {
public static void main(String[] args) {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hi Boy";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hi Girl";
}), (s) -> {
return s;
}).join();
System.out.println(result);
}
}结果:
Hi Boy5. 异常处理
运行时出现了异常,可以通过 exceptionally 进行补偿。
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);示例:
import java.util.concurrent.CompletableFuture;
/**
* @program: callable
* @description: test
* @author: Mr.Wang
* @create: 2018-08-12 12:36
**/
public class TestCompleteFuture {
public static void main(String[] args) {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (true) {
throw new RuntimeException("exception test!");
}
return "Hi Boy";
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "Hello world!";
}).join();
System.out.println(result);
}
}结果:
java.lang.RuntimeException: exception test!
Hello world!三、总结
通过以上示例,基本上就对 CompletableFuture 的使用比较清楚了。后续可以进一步探讨 CompletableFuture 的实现原理。
说明:本文基于 Java 8+ 版本特性编写。Future接口自 Java 1.5 引入,CompletableFuture自 Java 8 引入。示例代码中的时间戳为原文创作时间(2018 年),不影响代码逻辑的适用性。
版权声明:本文为原创文章,版权归 戴老师的博客 所有,转载请联系博主获得授权。
本文地址:https://1diff.fun/archives/future-he-completablefuture-jie-xi-yu-shi-yong.html
如果对本文有什么问题或疑问都可以在评论区留言,我看到后会尽量解答。