跳至主要內容

allOf 、anyOf

Jin大约 3 分钟

allOf 、anyOf

初识

官网APIopen in new window

这两个方法的入参是一个completableFuture组、allOf就是所有任务都完成时返回,但是是个Void的返回值。

anyOf是当入参的completableFuture组中**有一个任务执行完毕就返回**,返回结果是第一个完成的任务的结果。

    @Test
    public void demo1() throws ExecutionException, InterruptedException {
        final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                System.out.println("futureOne InterruptedException");
            }
            return "futureOneResult";
        });
        final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                System.out.println("futureTwo InterruptedException");
            }
            return "futureTwoResult";
        });
        CompletableFuture future = CompletableFuture.allOf(futureOne, futureTwo);
        System.out.println(future.get());//null
        CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);
        System.out.println(completableFuture.get());//futureOneResult
    }

allOf

allOf : 可以探测,所有的异步任务,是否全部执行完了。 举个例子,集齐七颗龙珠可以召唤神龙。 就可以分七个支线,每条支线找一颗龙珠。

不用CompletableFuture版本

 public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        long start = System.currentTimeMillis();
        int count = 7;
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            int num = i + 1;
            pool.submit(() -> {
                new Dragon(num).findPreciousness();
                latch.countDown();
            });
        }
        String mainThread = Thread.currentThread().getName();
        System.out.println(mainThread + "收集龙珠,等待中……");
        latch.await();
        long end = System.currentTimeMillis() - start;
        System.out.println(mainThread + "线程历经" + end + "毫秒,终于集齐七颗龙珠,开始召唤神龙!!");
        pool.shutdown();
    }

    static class Dragon {

        private int num;

        Dragon(int num) {
            this.num = num;
        }

        private void findPreciousness() {
            int cost = RandomUtil.randomInt(1, num + 1) * 1000;
            try {
                Thread.sleep(cost);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "线程历经 " + cost + " 毫秒找到第 " + num + " 颗龙珠");
        }
    }

这个异步版本,用 CountDownLatch 来保证,所有异步任务都执行完。

集齐七颗龙珠的任务,交给线程池异步处理,

重点:当子任务全部完成时,主线程继续执行,否则阻塞。

再看下,用 CompletableFutureallOf 方法实现的版本

public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        long start = System.currentTimeMillis();
        int count = 7;
        CompletableFuture[] temp = new CompletableFuture[count];
        for (int i = 0; i < count; i++) {
            int num = i + 1;
            temp[num - 1] = CompletableFuture.runAsync(() -> new Dragon(num).findPreciousness(), pool);
        }
        String mainThread = Thread.currentThread().getName();
        System.out.println(mainThread + "收集龙珠,等待中……");
        CompletableFuture.allOf(temp).join();
        long end = System.currentTimeMillis() - start;
        System.out.println(mainThread + "线程历经" + end + "毫秒,终于集齐七颗龙珠,开始召唤神龙!!");
        pool.shutdown();
    }

示例中 allOf 方法,实现了 CountDownLatch 的功能。

当然,如果你对 stream 写法很熟悉,那代码可以很简单。

 public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletableFuture[] futures = IntStream.rangeClosed(1, 7)
                .mapToObj(num -> new Dragon(num))
                .map(dragon -> CompletableFuture.runAsync(dragon::findPreciousness, pool))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
        pool.shutdown();
    }

anyOf

anyOf : 所有的异步任务中,任意一个完成了,就可以被探测到。 举个例子,集齐七颗龙珠可以召唤神龙,这个任务比较难。 找到任意一颗,就摆两桌庆祝下,不管是哪个支线找到的。

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletableFuture[] futures = IntStream.rangeClosed(1, 7)
                .mapToObj(num -> new Dragon(num))
                .map(dragon -> CompletableFuture.runAsync(dragon::findPreciousness, pool))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.anyOf(futures).join();
        pool.shutdown();

只需要修改一行代码,就可以了。把 allOf 改为 anyOf 即可。

同样用 CountDownLatch 也可以实现,也是改一行

CountDownLatch latch = new CountDownLatch(1);

allOf 原理分析

执行流程

  1. 主线程提交任务到线程池。
  2. 指主线程调用 allOf 方法,生成 一个 CompletableFuture 对象 allOfFuture
  3. 线程池的工作线程,会执行异步任务,所有异步任务执行完,会给刚刚那个 allOfFuture 对象打个标识。
  4. 是主线程会判断那个标识,标识存在,说明所有异步任务已执行完。若标识不存在,主线程阻塞,等待所有异步任务执行结束时,主线程会被唤醒。

参考

贡献者: Jin