一、Future 模式

自 Java 1.5 起,Java 提供了 CallableFuture 接口,允许我们在任务执行完毕后获取执行结果。

Future 接口是构建异步应用的基础,也是多线程开发中常见的设计模式。

当我们调用一个函数方法时,如果该函数执行缓慢,调用者通常需要进行等待。但在某些场景下,我们并不急于立即获取结果。此时,我们可以让被调用者立即返回,在后台慢慢处理请求;对于调用者而言,则可以先处理其他任务,待真正需要数据时再去尝试获取结果。

1. Callable 与 Runnable

java.lang.Runnable 是一个接口,其中只声明了一个 run() 方法。该方法的返回值为 void,因此任务执行完毕后无法返回任何结果。

public interface Runnable {
    public abstract void run();
}

Callable 位于 java.util.concurrent 包下,它也是一个接口,其中声明了一个 call() 方法。这是一个泛型接口,call() 函数返回的类型即为泛型参数 V 的类型。

public interface Callable<V> {
    V call() throws Exception;
}

2. Future 与 Callable 的使用

Future 用于对具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成以及获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

如何使用 FutureCallable 呢?一般情况下是配合 ExecutorService 来使用的。在 ExecutorService 接口中声明了若干个 submit 方法的重载版本:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

以下是 Future + Callable 的使用示例。这里演示了直接使用 FutureTask 配合 Thread 的方式,同时也注释展示了配合 ExecutorService 的方式:

import java.util.Random;
import java.util.concurrent.*;

/**
 * @program: callable
 * @description: testfuture
 * @author: Mr.Wang
 * @create: 2018-08-12 12:11
 **/
public class Testfuture {
    public static void main(String[] args) {
        // 第一种方式:直接使用 FutureTask 配合 Thread
        FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt();
            }
        });
        new Thread(task).start();

        // 第二种方式:配合 ExecutorService (示例代码已注释)
        // ExecutorService executor = Executors.newSingleThreadExecutor();
        // FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
        //     @Override
        //     public Integer call() throws Exception {
        //         return new Random().nextInt();
        //     }
        // });
        // executor.submit(task);

        try {
            System.out.println("result: " + task.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

运行结果示例:

result: 297483790

或者:

result: -358490809

3. Future 接口的局限性

了解了 Future 的使用后,我们需要谈谈它的局限性。Future 很难直接表述多个 Future 结果之间的依赖性。在开发中,我们经常需要达成以下目的:

  • 将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果)。
  • 等待 Future 集合中的所有任务都完成。
  • 仅等待 Future 集合中最快结束的任务完成,并返回它的结果。

二、CompletableFuture

CompletableFuture 类实现了 CompletionStageFuture 接口,因此你可以像使用 Future 那样使用它,同时提供了更强大的功能。

下面通过例子来逐步解释 CompletableFuture 的使用。

创建 CompletableFuture 对象

说明:以 Async 结尾的方法都是可以异步执行的。如果指定了线程池,会在指定的线程池中执行;如果没有指定,默认会在 ForkJoinPool.commonPool() 中执行。下文很多方法类似,不再做特别说明。

提供了四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象。方法的参数类型都是函数式接口,所以可以使用 Lambda 表达式实现异步任务。

  • runAsync 方法:以 Runnable 函数式接口类型为参数,因此 CompletableFuture 的计算结果为空。
  • supplyAsync 方法:以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

1. 变换结果

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

这些方法的输入是上一个阶段计算后的结果,返回值是经过转化后的结果。

示例:

import java.util.concurrent.CompletableFuture;

/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args) {
        String result = CompletableFuture.supplyAsync(() -> {
            return "Hello ";
        }).thenApplyAsync(v -> v + "world").join();
        System.out.println(result);
    }
}

结果:

Hello world

2. 消费结果

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

这些方法只是针对结果进行消费,入参是 Consumer,没有返回值。

示例:

import java.util.concurrent.CompletableFuture;

/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            return "Hello ";
        }).thenAccept(v -> {
            System.out.println("consumer: " + v);
        });
    }
}

结果:

consumer: Hello 

3. 合并两个 CompletionStage 的结果

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

需要上一阶段的返回值,并且 other 代表的 CompletionStage 也要返回值之后,把这两个返回值进行转换后返回指定类型的值。

说明:同样,也存在对两个 CompletionStage 结果进行消耗的一组方法,例如 thenAcceptBoth,这里不再进行示例。

示例:

import java.util.concurrent.CompletableFuture;

/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args) {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> {
            return s1 + " " + s2;
        }).join();
        System.out.println(result);
    }
}

结果:

Hello world

4. 竞争执行:谁快用谁

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);

两种渠道完成同一个事情,就可以调用这个方法,找一个最快的结果进行处理,最终有返回值。

示例:

import java.util.concurrent.CompletableFuture;

/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args) {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hi Boy";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hi Girl";
        }), (s) -> {
            return s;
        }).join();
        System.out.println(result);
    }
}

结果:

Hi Boy

5. 异常处理

运行时出现了异常,可以通过 exceptionally 进行补偿。

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

示例:

import java.util.concurrent.CompletableFuture;

/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args) {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (true) {
                throw new RuntimeException("exception test!");
            }
            return "Hi Boy";
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "Hello world!";
        }).join();
        System.out.println(result);
    }
}

结果:

java.lang.RuntimeException: exception test!
Hello world!

三、总结

通过以上示例,基本上就对 CompletableFuture 的使用比较清楚了。后续可以进一步探讨 CompletableFuture 的实现原理。

说明:本文基于 Java 8+ 版本特性编写。Future 接口自 Java 1.5 引入,CompletableFuture 自 Java 8 引入。示例代码中的时间戳为原文创作时间(2018 年),不影响代码逻辑的适用性。