JDK8中CompletableFuture异步操作基本用法

CompletableFuture

4个静态方法

  • runAsync(无返回值)

    1. public static CompletableFuture runAsync(Runnable runnable)

    2. public static CompletableFuture runAsync(Runnable runnable, Executor executor)

  • supplyAsync(有返回值)

    1. public static CompletableFuture supplyAsync(Supplier supplier)

    2. public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

其中,Executor executor 参数,在没有指定时,默认使用的是 ForkJoinPool.commonPool() 作为线程池执行异步代码

代码示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 30L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

CompletableFuture.runAsync(() -> {
    System.out.println("不指定线程池参数,当前线程:" + Thread.currentThread().getName());
});

CompletableFuture.runAsync(() -> {
    System.out.println("指定线程池参数,当前线程:" + Thread.currentThread().getName());
}, executor);

executor.shutdown();
复制代码
输出结果:
指定线程池参数,当前线程:pool-1-thread-1
不指定线程池参数,当前线程:ForkJoinPool.commonPool-worker-3
复制代码

常用方法

1. 获取结果和触发计算

获取结果:

  • public T get() // 阻塞等待计算完成获取结果
  • public T join() // 阻塞等待计算完成获取结果
  • public T get(long timeout, TimeUnit unit) // 等待指定时间,到时间后没获取到结果则抛出异常
  • public T getNow(T valueIfAbsent) // 立即获取结果,计算完返回结果,没计算完返回设置的值valueIfAbsent

代码示例:

Integer result = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 123;
}).join();
System.out.println(result);

CompletableFuture<Integer> futureResult = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 123;
});
System.out.println(futureResult.get());
System.out.println(futureResult.getNow(456));
System.out.println(futureResult.get(2, TimeUnit.SECONDS));
复制代码

主动触发计算:

  • public boolean complete(T value)

代码示例:

public static void main(String[] args) throws InterruptedException, ExecutionException {

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 11;
    });

    // 当调用 get() 被阻塞时, complete方法就是结束阻塞并 get() 获取设置的 complete 里面的值
    System.out.println(future.complete(666));  // 输出 true
    System.out.println(future.get());  // 输出666
}
复制代码

2. 对计算结果进行处理

  • thenApply
  • handle

当一个线程依赖另一个线程时,用 handle 方法来把这两个线程串行化

public static void main(String[] args) throws InterruptedException {

    CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(11);
        return 11;
    }).handle((r, e) -> {
        System.out.println(22);
        return r + 1;
    }).handle((r, e) -> {
        System.out.println(33);
        return r + 1;
    }).exceptionally(e -> {
        e.printStackTrace();
        return null;
    });

    TimeUnit.SECONDS.sleep(2);
}
复制代码

3. 对计算结果进行消费

  • thenRun

任务A执行完,再执行B,B不需要A的结果

CompletableFuture.runAsync(() -> {
    System.out.println(Thread.currentThread().getName());
}).thenRun(() -> System.out.println("finish"));
复制代码
  • thenAccept

任务A执行完,再执行B,B需要A的结果,但是任务B无返回值

CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return 1;
}).thenAccept(System.out::println);
复制代码
  • thenApply

任务A执行完,再执行B,B需要A的结果,同时任务B有返回值

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return 1;
}).thenApply(a -> {
    // 1
    System.out.println(a);
    return a + 1;
}).thenApply(b -> {
    // 2
    System.out.println(b);
    return b + 2;
});
// 4
System.out.println(future.get());
复制代码

4. 对计算速度进行选用(谁快用谁)

  • applyToEither

示例代码:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("线程1:" + Thread.currentThread().getName() + " 开始计算");
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
    System.out.println("线程2:" + Thread.currentThread().getName() + " 开始计算");
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 20;
}), f -> {
    System.out.println("谁计算快用谁");
    return f + 1;
});

System.out.println(future.get());
复制代码

输出结果:

线程1:ForkJoinPool.commonPool-worker-3 开始计算
线程2:ForkJoinPool.commonPool-worker-5 开始计算
谁计算快用谁
11
复制代码

5. 对计算结果进行合并

  • thenCombine

使用场景: 请求多个接口的数据,合并后进行返回

示例代码:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("线程1:" + Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    System.out.println("线程2:" + Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 20;
}), (a, b) -> {
    System.out.println("对计算结果进行合并");
    return a + b;
});

System.out.println(future.get());
复制代码

输出结果:

线程1:ForkJoinPool.commonPool-worker-3
线程2:ForkJoinPool.commonPool-worker-7
对计算结果进行合并
30
复制代码

示例

多平台商品比价,两种方式执行耗时比较

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class CompletableFutureNetMallDemo {

    static List<NetMall> list = List.of(new NetMall("jd"),
            new NetMall("pdd"),
            new NetMall("tmall"),
            new NetMall("taobao"),
            new NetMall("dangdang"),
            new NetMall("shop1"),
            new NetMall("shop2"),
            new NetMall("shop3"),
            new NetMall("shop4"),
            new NetMall("shop5"));

    public static void main(String[] args) {
        var start = System.currentTimeMillis();
        var firstList = getPriceByStep(list, "Java");
        for (String s : firstList) {
            System.out.println(s);
        }
        System.out.println("非同步操作执行耗时:" + (System.currentTimeMillis() - start) + " 毫秒");

        var start1 = System.currentTimeMillis();
        var secondList = getPriceAsync(list, "Java");
        for (String s : secondList) {
            System.out.println(s);
        }
        System.out.println("CompletableFuture异步操作执行耗时:" + (System.currentTimeMillis() - start1) + " 毫秒");

    }

    /**
     * 非同步操作
     *
     * @param list        平台列表
     * @param productName 商品名称
     * @return 计算结果
     */
    public static List<String> getPriceByStep(List<NetMall> list, String productName) {
        return list.stream()
                .map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }

    /**
     * CompletableFuture 异步操作
     *
     * @param list        平台列表
     * @param productName 商品名称
     * @return 计算结果
     */
    public static List<String> getPriceAsync(List<NetMall> list, String productName) {
        return list.stream()
                .map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream().map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

}

class NetMall {
    private final String mallName;

    public String getMallName() {
        return mallName;
    }

    public NetMall(String mallName) {
        this.mallName = mallName;
    }

    /**
     * 计算价格
     *
     * @param productName 商品名称
     * @return 随机数价格
     */
    public double calcPrice(String productName) {
        // 返回随机数
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}
复制代码