CompletableFuture
4个静态方法
-
runAsync(无返回值)
-
public static CompletableFuture runAsync(Runnable runnable)
-
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
-
-
supplyAsync(有返回值)
-
public static CompletableFuture supplyAsync(Supplier supplier)
-
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
-
其中,Executor executor 参数,在没有指定时,默认使用的是 ForkJoinPool.commonPool() 作为线程池执行异步代码
代码示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
CompletableFuture.runAsync(() -> {
System.out.println("不指定线程池参数,当前线程:" + Thread.currentThread().getName());
});
CompletableFuture.runAsync(() -> {
System.out.println("指定线程池参数,当前线程:" + Thread.currentThread().getName());
}, executor);
executor.shutdown();
复制代码
输出结果:
指定线程池参数,当前线程:pool-1-thread-1
不指定线程池参数,当前线程:ForkJoinPool.commonPool-worker-3
复制代码
常用方法
1. 获取结果和触发计算
获取结果:
- public T get() // 阻塞等待计算完成获取结果
- public T join() // 阻塞等待计算完成获取结果
- public T get(long timeout, TimeUnit unit) // 等待指定时间,到时间后没获取到结果则抛出异常
- public T getNow(T valueIfAbsent) // 立即获取结果,计算完返回结果,没计算完返回设置的值valueIfAbsent
代码示例:
Integer result = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 123;
}).join();
System.out.println(result);
CompletableFuture<Integer> futureResult = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 123;
});
System.out.println(futureResult.get());
System.out.println(futureResult.getNow(456));
System.out.println(futureResult.get(2, TimeUnit.SECONDS));
复制代码
主动触发计算:
- public boolean complete(T value)
代码示例:
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 11;
});
// 当调用 get() 被阻塞时, complete方法就是结束阻塞并 get() 获取设置的 complete 里面的值
System.out.println(future.complete(666)); // 输出 true
System.out.println(future.get()); // 输出666
}
复制代码
2. 对计算结果进行处理
- thenApply
- handle
当一个线程依赖另一个线程时,用 handle 方法来把这两个线程串行化
public static void main(String[] args) throws InterruptedException {
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(11);
return 11;
}).handle((r, e) -> {
System.out.println(22);
return r + 1;
}).handle((r, e) -> {
System.out.println(33);
return r + 1;
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
TimeUnit.SECONDS.sleep(2);
}
复制代码
3. 对计算结果进行消费
- thenRun
任务A执行完,再执行B,B不需要A的结果
CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
}).thenRun(() -> System.out.println("finish"));
复制代码
- thenAccept
任务A执行完,再执行B,B需要A的结果,但是任务B无返回值
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return 1;
}).thenAccept(System.out::println);
复制代码
- thenApply
任务A执行完,再执行B,B需要A的结果,同时任务B有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return 1;
}).thenApply(a -> {
// 1
System.out.println(a);
return a + 1;
}).thenApply(b -> {
// 2
System.out.println(b);
return b + 2;
});
// 4
System.out.println(future.get());
复制代码
4. 对计算速度进行选用(谁快用谁)
- applyToEither
示例代码:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("线程1:" + Thread.currentThread().getName() + " 开始计算");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println("线程2:" + Thread.currentThread().getName() + " 开始计算");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
}), f -> {
System.out.println("谁计算快用谁");
return f + 1;
});
System.out.println(future.get());
复制代码
输出结果:
线程1:ForkJoinPool.commonPool-worker-3 开始计算
线程2:ForkJoinPool.commonPool-worker-5 开始计算
谁计算快用谁
11
复制代码
5. 对计算结果进行合并
- thenCombine
使用场景: 请求多个接口的数据,合并后进行返回
示例代码:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("线程1:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println("线程2:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
}), (a, b) -> {
System.out.println("对计算结果进行合并");
return a + b;
});
System.out.println(future.get());
复制代码
输出结果:
线程1:ForkJoinPool.commonPool-worker-3
线程2:ForkJoinPool.commonPool-worker-7
对计算结果进行合并
30
复制代码
示例
多平台商品比价,两种方式执行耗时比较
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class CompletableFutureNetMallDemo {
static List<NetMall> list = List.of(new NetMall("jd"),
new NetMall("pdd"),
new NetMall("tmall"),
new NetMall("taobao"),
new NetMall("dangdang"),
new NetMall("shop1"),
new NetMall("shop2"),
new NetMall("shop3"),
new NetMall("shop4"),
new NetMall("shop5"));
public static void main(String[] args) {
var start = System.currentTimeMillis();
var firstList = getPriceByStep(list, "Java");
for (String s : firstList) {
System.out.println(s);
}
System.out.println("非同步操作执行耗时:" + (System.currentTimeMillis() - start) + " 毫秒");
var start1 = System.currentTimeMillis();
var secondList = getPriceAsync(list, "Java");
for (String s : secondList) {
System.out.println(s);
}
System.out.println("CompletableFuture异步操作执行耗时:" + (System.currentTimeMillis() - start1) + " 毫秒");
}
/**
* 非同步操作
*
* @param list 平台列表
* @param productName 商品名称
* @return 计算结果
*/
public static List<String> getPriceByStep(List<NetMall> list, String productName) {
return list.stream()
.map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
/**
* CompletableFuture 异步操作
*
* @param list 平台列表
* @param productName 商品名称
* @return 计算结果
*/
public static List<String> getPriceAsync(List<NetMall> list, String productName) {
return list.stream()
.map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
}
}
class NetMall {
private final String mallName;
public String getMallName() {
return mallName;
}
public NetMall(String mallName) {
this.mallName = mallName;
}
/**
* 计算价格
*
* @param productName 商品名称
* @return 随机数价格
*/
public double calcPrice(String productName) {
// 返回随机数
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
复制代码
近期评论