并发编程-执行异步任务

异步编程的目标是:提交一个任务给线程池,在任务执行期间,提交者可以执行其他的逻辑,当提交的任务执行完成,通知提交者来获取执行结果

jdk并发包中的异步编程是通过Future 接口实现的:

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

泛型V是任务执行结果的类型。

我们通过如下方式提交任务给线程池:

1
2
3
ThreadPool.submit(new Callable<String>()->{
return "zpd";
});

submit接收一个Callable类型的任务,但是我们知道,线程池一般都是通过execut方法来执行任务,且execute只接受Runnable类型的任务,Callable任务又是怎么执行的?

那我们来看一下submit方法的源码:

1
2
3
4
5
6
7
//AbstractExecutorService
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

可以看到Callable又被封装成了FutureTask对象再执行,FutureTask实现了RunnableFuture接口

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

原来FutureTask就是Runnable和Future的合体,意味着Callable被封装成RunnableFuture之后,即可以直接丢给execut方法执行,又能使用Future接口的方法实现异步功能。

1
2
3
4
5
6
7
8
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
// 执行过程
private Callable<V> callable;
// 执行结果或者异常
private Object outcome;
...
}

FutureTask类的两个关键对象,其中一个就是对提交的Callable对象的引用。

这里我整理一下:最终被线程池执行的对象是FutureTask,它本身是一个Runnable,且持有一个Callable对象,因此线程池执行它的时候,一定是执行它的run方法,而run方法内部肯定调用了Callable对象的call方法,因为提交的任务逻辑就是call方法。

submit方法对Runnable类型的任务也做了适配。看源码:

1
2
3
4
5
6
7
//AbstractExecutorService
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

我们在传入Runnable型任务的时候,由于其执行体没有返回值,因此还需要传入另一个参数来代表执行完成的返回结果,这样在将Runnable封装成FutureTask时,可以使用适配器将Runnable任务和 result 转换成一个Callable,再去构建 FutureTask对象

贴出两个newTaskFor方法:

1
2
3
4
5
6
7
//AbstractExecutorService
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

我看到网上一些资料对比callable与runnable的区别:

1、callable 有返回值,runnable不支持返回值

2、callable 可以抛出异常,runnable则不支持

我认为这样对比的意义不大,因为这两个接口本来既不是一个层级的,Runnable是可以直接被线程执行的,而Callable需要通过再封装成Runnable,并在封装层的run方法调用中才能执行,call方法可以有返回值,可以抛异常也只是针对封装层的FutureTask对象而言,返回的结果给了FutureTask,异常也抛给了FutureTask,用户想要获取返回值或者异常,需要主动的写代码获取。

因此Future异步任务的执行过程,并不是真正的异步,最主要的问题是没有实现回调,只能称为同步非阻塞。所以在java8中又新增了一个真正的异步函数,CompletableFuture

Java 8 中新增加了一个类:CompletableFuture,它提供了非常强大的 Future 的扩展功能,最重要的是实现了回调的功能。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CallableFutureTest {
public static void main(String[] args) {
System.out.println("start");
/**
* 异步非阻塞
*/
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("sleep done");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("done");
}
}

CompletableFuture.runAsync()方法提供了异步执行无返回值任务的功能。

1
2
3
4
5
6
ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// do something
return "result";
}, executorService);

CompletableFuture.supplyAsync()方法提供了异步执行有返回值任务的功能。

CompletableFuture源码中有四个静态方法用来执行异步任务:

1
2
3
4
5
6
7
8
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}

public static CompletableFuture<Void> runAsync(Runnable runnable){..}

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..}

前面两个可以看到是带返回值的方法,后面两个是不带返回值的方法。同时支持传入自定义的线程池,如果不传入线程池的话默认是使用 ForkJoinPool.commonPool()作为它的线程池执行异步代码。

合并两个异步任务

如果有两个任务需要异步执行,且后面需要对这两个任务的结果进行合并处理,CompletableFuture 也支持这种处理:

1
2
3
4
5
6
7
8
9
10
11
ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task1";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Task2";
}, executorService);
CompletableFuture<String> future = future1.thenCombineAsync(future2, (task1, task2) -> {
return task1 + task2; // return "Task1Task2" String
});

通过 CompletableFuture.thenCombineAsync()方法获取两个任务的结果然后进行相应的操作。

下一个依赖上一个的结果

如果第二个任务依赖第一个任务的结果:

1
2
3
4
5
6
7
8
9
10
ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task1";
}, executorService);
CompletableFuture<String> future = future1.thenComposeAsync(task1 -> {
return CompletableFuture.supplyAsync(() -> {
return task1 + "Task2"; // return "Task1Task2" String
});
}, executorService);

CompletableFuture.thenComposeAsync()支持将第一个任务的结果传入第二个任务中。

常用 API 介绍

  1. 拿到上一个任务的结果做后续操作,上一个任务完成后的动作
1
2
3
4
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

上面四个方法表示在当前阶段任务完成之后下一步要做什么。whenComplete 表示在当前线程内继续做下一步,带 Async 后缀的表示使用新线程去执行。

  1. 拿到上一个任务的结果做后续操作,使用 handler 来处理逻辑,可以返回与第一阶段处理的返回类型不一样的返回类型。

    1
    2
    3
    public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

    Handler 与 whenComplete 的区别是 handler 是可以返回一个新的 CompletableFuture 类型的。

    1
    2
    3
    4
    5
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    return "hahaha";
    }).handle((r, e) -> {
    return 1;
    });
  2. 拿到上一个任务的结果做后续操作, thenApply方法

    1
    2
    3
    public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

    注意到 thenApply 方法的参数中是没有 Throwable,这就意味着如有有异常就会立即失败,不能在处理逻辑内处理。且 thenApply 返回的也是新的 CompletableFuture。 这就是它与前面两个的区别。

  3. 拿到上一个任务的结果做后续操作,可以不返回任何值,thenAccept方法

    1
    2
    3
    public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

    看这里的示例:

    1
    2
    3
    4
    5
    6
    7
    CompletableFuture.supplyAsync(() -> {
    return "result";
    }).thenAccept(r -> {
    System.out.println(r);
    }).thenAccept(r -> {
    System.out.println(r);
    });

    执行完毕是不会返回任何值的。

CompletableFuture 的特性提现在执行完 runAsync 或者 supplyAsync 之后的操作上。CompletableFuture 能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

另外当你依赖 CompletableFuture 的计算结果才能进行下一步的时候,无需手动判断当前计算是否完成,可以通过 CompletableFuture 的事件监听自动去完成。