allOf 、anyOf
大约 3 分钟
allOf 、anyOf
初识
这两个方法的入参是一个completableFuture
组、allOf就是所有任务都完成时返回,但是是个Void的返回值。
anyOf是当入参的completableFuture
组中**有一个任务执行完毕就返回**,返回结果是第一个完成的任务的结果。
@Test
public void demo1() throws ExecutionException, InterruptedException {
final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("futureOne InterruptedException");
}
return "futureOneResult";
});
final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
System.out.println("futureTwo InterruptedException");
}
return "futureTwoResult";
});
CompletableFuture future = CompletableFuture.allOf(futureOne, futureTwo);
System.out.println(future.get());//null
CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);
System.out.println(completableFuture.get());//futureOneResult
}
allOf
allOf : 可以探测,所有的异步任务,是否全部执行完了。 举个例子,集齐七颗龙珠可以召唤神龙。 就可以分七个支线,每条支线找一颗龙珠。
不用CompletableFuture版本
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newCachedThreadPool();
long start = System.currentTimeMillis();
int count = 7;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
int num = i + 1;
pool.submit(() -> {
new Dragon(num).findPreciousness();
latch.countDown();
});
}
String mainThread = Thread.currentThread().getName();
System.out.println(mainThread + "收集龙珠,等待中……");
latch.await();
long end = System.currentTimeMillis() - start;
System.out.println(mainThread + "线程历经" + end + "毫秒,终于集齐七颗龙珠,开始召唤神龙!!");
pool.shutdown();
}
static class Dragon {
private int num;
Dragon(int num) {
this.num = num;
}
private void findPreciousness() {
int cost = RandomUtil.randomInt(1, num + 1) * 1000;
try {
Thread.sleep(cost);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "线程历经 " + cost + " 毫秒找到第 " + num + " 颗龙珠");
}
}
这个异步版本,用 CountDownLatch
来保证,所有异步任务都执行完。
集齐七颗龙珠的任务,交给线程池异步处理,
重点:当子任务全部完成时,主线程继续执行,否则阻塞。
再看下,用 CompletableFuture
的 allOf
方法实现的版本
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newCachedThreadPool();
long start = System.currentTimeMillis();
int count = 7;
CompletableFuture[] temp = new CompletableFuture[count];
for (int i = 0; i < count; i++) {
int num = i + 1;
temp[num - 1] = CompletableFuture.runAsync(() -> new Dragon(num).findPreciousness(), pool);
}
String mainThread = Thread.currentThread().getName();
System.out.println(mainThread + "收集龙珠,等待中……");
CompletableFuture.allOf(temp).join();
long end = System.currentTimeMillis() - start;
System.out.println(mainThread + "线程历经" + end + "毫秒,终于集齐七颗龙珠,开始召唤神龙!!");
pool.shutdown();
}
示例中 allOf
方法,实现了 CountDownLatch
的功能。
当然,如果你对 stream
写法很熟悉,那代码可以很简单。
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newCachedThreadPool();
CompletableFuture[] futures = IntStream.rangeClosed(1, 7)
.mapToObj(num -> new Dragon(num))
.map(dragon -> CompletableFuture.runAsync(dragon::findPreciousness, pool))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
pool.shutdown();
}
anyOf
anyOf : 所有的异步任务中,任意一个完成了,就可以被探测到。 举个例子,集齐七颗龙珠可以召唤神龙,这个任务比较难。 找到任意一颗,就摆两桌庆祝下,不管是哪个支线找到的。
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newCachedThreadPool();
CompletableFuture[] futures = IntStream.rangeClosed(1, 7)
.mapToObj(num -> new Dragon(num))
.map(dragon -> CompletableFuture.runAsync(dragon::findPreciousness, pool))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.anyOf(futures).join();
pool.shutdown();
只需要修改一行代码,就可以了。把 allOf
改为 anyOf
即可。
同样用 CountDownLatch
也可以实现,也是改一行
CountDownLatch latch = new CountDownLatch(1);
allOf 原理分析
执行流程
- 主线程提交任务到线程池。
- 指主线程调用
allOf
方法,生成 一个CompletableFuture
对象allOfFuture
- 线程池的工作线程,会执行异步任务,所有异步任务执行完,会给刚刚那个
allOfFuture
对象打个标识。 - 是主线程会判断那个标识,标识存在,说明所有异步任务已执行完。若标识不存在,主线程阻塞,等待所有异步任务执行结束时,主线程会被唤醒。