异步编程CompletableFuture中两个任务并行执行

这是我参与8月更文挑战的第6天,活动详情查看: 8月更文挑战

当我们多个异步任务执行完成后,有时候需要让多个任务之间串联执行,有时候需要将多个任务的执行结果进行合并处理,这就需要等多个任务执行完成后在进行后续的操作,为了方便我们对于这样情况的处理,在CompletableFuture中也提供了一些方法给我们实现,实现多个任务的合并执行。

thenApply()使用

提供了三个方法来供我们使用,这方法的参数和前面两篇文章的参数含义都差不多,在这里就不拿出来单独分析,放在一起来看下如何使用他们以及他们的实现原理及区别,主要用途是待上个任务执行完成后,执行后续的任务。老规矩,先贴源码。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}
复制代码

从上面三个方法可以看出,内部执行都调用了uniApplyStage这个方法,只是对于线程池的参数有了区别,这线程池的区别跟异步任务创建的线程池原理一样,在这里就不详细分析了,在下面我们主要分析如何使用他们,以及他们的区别。在方法中都会传入Function,用于声明后续要执行的业务代码逻辑,在Function中有个T参数,这个代表上个任务执行成功后返回的结果,fn代表当前任务的结果数据类型,最终都会映射到CompletableFuture中的结果数据类型。我们接下来看如何使用这个方法。

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 123;
        }, executor);
        CompletableFuture<Integer> fun = future.thenApplyAsync(integer -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("上个任务执行结果:" + integer);
            return 3333;
        },executor);
        System.out.println("第二个任务:" + fun.get());
复制代码

执行结果:

pool-1-thread-1
pool-1-thread-2
上个任务执行结果:123
第二个任务:3333
复制代码

执行的结果可以看出,第二个任务是新开的一个线程来执行自己的任务。这个肯定要问thenApply和thenApplyAsync的区别,从函数名可以看出第二个是异步执行的,至于是不是这样的结果,我们从执行的结果来看

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 123;
        }, executor);
        CompletableFuture<Integer> fun = future.thenApply(integer -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("上个任务执行结果:" + integer);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 3333;
        });
        System.out.println("第二个任务:" + fun.get());
复制代码

执行结果:

pool-1-thread-1
main
上个任务执行结果:123
第二个任务:3333
复制代码

执行结果可看出采用thenApply执行的任务,采用的是主线程来执行任务。

handle()使用

由于我们在运行代码时,难免会出现些异常信息需要抛出,从thenApply的三个方法中,都没有异常相关的处理逻辑,那如果我们要对异常进行处理,这个时候又该怎么办?别急,这些点CompletableFuture作者都替我们想到,在类中提供了handle()来供我们使用,handle()的使用效果与thenApply()的效用类似,只是在返回的方法中多个上步任务抛出的异常信息,在handle() 我们既可以处理正常的结果,也可以捕获异常后,对异常信息处理做一些异常处理操作,handle也提供了三个方法,在这里我就贴一下具体的哪三个方法,就不在举例怎么使用,使用方法和thenApply一样

thenAccept()使用

上面的两组方法,thenApply()和handle()都需要第二个任务执行完成后,放回具体的执行结果,但是我们在执行后,不需要返回的最终结果,研究CompletableFuture的原理一路走来,作者肯定给想到了这点,给我们提供方法来调用,不然在说好用呢,最起码一点就是考虑的场景比较丰富,能够满足我们各种复杂的业务场景,从现在我们看到的只是其中的一部分,好了,废话不多说,我们继续往下看。
先上源码,看一眼在说后面

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}
复制代码

看到这三个方法的源码,应该一眼就知道怎么使用,我在这里就不举例怎么使用,与上面的差别就是无返回值,很多眼尖的童鞋一眼就看出这个方法无法处理上个任务发生异常的情况,这肯定作者会想到的,大家不用急,马上就贴对异常处理的方法

thenRun()使用

public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}
复制代码

看了这三个方法的源码,应该都懂了咋用,那就接着往下分析其走。

上面的方法都是串行执行,等上个任务执行完成后,在执行后面的任务,那有没有提供两人合并执行的方法呢?那答案肯定是必须的!!!

两任务合并执行

两任务合并执行,在执行的过程中,有些任务需要串行执行,有些任务需要并行执行,任务的串行执行上面的函数已经能够满足我们的需求,剩下的就需要解决我们的并行执行问题,这并行执行又会分为两种场景:

  • 一种:两个并行执行完成后或者到两个任务的执行结果,进行合并处理,最后有返回值
  • 另一种:两个并行任务执行万抽,谁先执行完,就以谁的结果为准,然后完成后续的业务处理,最后有返回值

两任务全部执行完成

thenCombine()使用

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("1:" + Thread.currentThread().getName());
            return 123;
        }, executor);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("2:" + Thread.currentThread().getName());
            return 123;
        }, executor);
        CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (f1, f2) -> {
            System.out.println("3:" + Thread.currentThread().getName());
            System.out.println("第1个任务结果:" + f1);
            System.out.println("第2个任务结果:" + f2);
            return 3;
        },executor);
        System.out.println("最后执行结果:" + future3.get());
复制代码

**
执行结果:

1:pool-1-thread-1
2:pool-1-thread-2
3:pool-1-thread-31个任务结果:1232个任务结果:123
最后执行结果:3
复制代码

从上面打印的结果可以看出,我们刚刚执行的步骤,相当于构建了三个异步异步,只不过第三个异步任务在等第一、二异步任务执行完成后在执行最后的任务。
好了,如果你认真仔细的看了我上面的文章,肯定就会猜到我想下面要写那些方法了,那肯定就是无返回值和接受任务异常的方法了。

thenAcceptBoth()使用

thenAcceptBoth()使用与thenCombine()使用方法类似,当两个任务执行完,获取两个任务的结果进行后续处理,只不过thenAcceptBoth()没有返回值

runAfterBoth()使用

上面两个方法是需要获取第一二个任务执行完成后返回的结果进行后续的操作处理,那肯定就会存在当两个任务执行完成后,不要前面两个任务的结果,并且不会有值返回

好了,上面两个任务并行处理的函数都已经贴完了,但是没有找到当前面两个任务执行异常后,怎么获取到异常信息,反正我是没有找到,估计作者等到后面的提供吧!!!接下来我们就看两个任务任意一个完成后触发后续操作。

两任务任意执行完成

看了两任务全部执行完成的API,想必都能够猜到任意任务执行完成后的方法怎么使用了,下面我就贴贴具体的方法,实在是太困了。

applyToEither()使用

两个任务执行完成,谁先执行完,就用谁的执行结果,并且对后续操作完成后有结果返回

acceptEither()使用

acceptEither()与applyToEither差不多,只是它没有返回值

runAfterEither()使用

两个任务执行完成,谁先执行完,就执行后面的操作,不需要前面两个任务的执行结果

好了,到这里两个任务讲解都差不多了,这没有过多的去分析里面的源码怎么实现逻辑,更多的是找寻针对不同的场景用怎样的方法,主要是因为小弟才疏学浅,对里面的源码分析不清楚,怕误人子弟,就不进行分析源码了,还有就是知识不是所有都要去精通实现原理,更多情况下是能够用就行,待需要深入研究的时候再去研究具体的实现。
在这里又到了这篇博文的结尾了,肯定有人疑惑,这两个任务怎么够我使用,万一有多个任务我该怎么办呢,莫慌,下一遍文章就来讲解多任务的执行相关操作。哈哈哈,老规矩哦,请听下回分解。