自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

詳解 CompletableFuture 實踐

開發(fā)
CompletableFuture繼承了CompletionStage接口和Future接口,在原有Future的基礎(chǔ)上增加了異步回調(diào)、流式處理以及任務(wù)組合,成為JDK8多任務(wù)協(xié)同場景下一個有效利器。

CompletableFuture繼承了CompletionStage接口和Future接口,在原有Future的基礎(chǔ)上增加了異步回調(diào)、流式處理以及任務(wù)組合,成為JDK8多任務(wù)協(xié)同場景下一個有效利器。所以筆者今天就以此文演示一下CompletableFuture基礎(chǔ)實踐案例。

CompletableFuture基本設(shè)計

因為本文著重講解CompletableFuture的使用,所以這里我們就簡單的從類的繼承關(guān)系了解一下CompletableFuture的基本理念,結(jié)合CompletableFuture源碼注釋的說法,它是一個針對異步流程化的工具類,即它支持某個異步Future任務(wù)完成之后按照指定編排的順序觸發(fā)下一個依賴動作:

A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion. When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.

舉個例子,例如我現(xiàn)在在瀏覽器上某個網(wǎng)站的商家的產(chǎn)品,我希望瀏覽器能夠做到以下幾點:

  • 我針對性點選擇3個商家。
  • 3個門店在我勾選點擊查看信息后,分別開始查詢各自的產(chǎn)品數(shù)據(jù)。
  • 3家數(shù)據(jù)都完成數(shù)據(jù)加載后,網(wǎng)站歸并這些數(shù)據(jù),通過一個網(wǎng)頁渲染給我查看。

通過CompletableFuture,我們就可以完成通過CompletableFuture將上述商家的查詢?nèi)蝿?wù)進(jìn)行異步提交:

對此我們也可以從CompletableFuture的繼承關(guān)系了解其設(shè)計理念:

  • 它繼承Future接口使之具備阻塞獲取異步回調(diào)的能力。
  • 繼承CompletionStage接口,它永遠(yuǎn)thenApply等方法,通過這個繼承關(guān)系,使CompletableFuture具備異步任務(wù)順序編排的能力,即當(dāng)前異步任務(wù)一處理完成就執(zhí)行thenApply給定的異步邏輯,使得我們可以清晰明了的編排異步任務(wù):
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
 //......
}

提交有返回值的異步任務(wù)

通過supplyAsync提交我們的異步任務(wù),然后通過get方法等待異步任務(wù)完成并獲取返回結(jié)果。

public static void main(String[] args) throws Exception {
        //提交一個CompletableFuture任務(wù)
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("work complete! cost:" +(System.currentTimeMillis() - start)  + " ms");
            return 1;
        });


        System.out.println("main thread working");

        //通過get方法阻塞獲取任務(wù)執(zhí)行結(jié)果
        System.out.println("supplyAsync result: " + task.get());

        System.out.println("main thread finish");
    }

輸出結(jié)果如下,可以看出CompletableFuture的get方法會阻塞主線程工作,直到得到返回值為止。

main thread working
work complete! cost:1001 ms
supplyAsync result: 1
main thread finish

對此我們不妨來看看get方法是如何做到阻塞主線程并等待異步線程任務(wù)執(zhí)行完成的。從下面這段源碼我們可以看到get方法的執(zhí)行步驟:

  • 調(diào)用reportGet查看異步任務(wù)是否將結(jié)果賦值給result。
  • 如果不為null直接返回。
  • 若為null則調(diào)用waitingGet等待任務(wù)返回。
public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }

查看reportGet方法可以看到邏輯也很簡單,如果r為空則直接拋中斷異常,如果r存在異常則直接將異常拋出,如果有結(jié)果則將結(jié)果返回。

private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        //如果結(jié)果為null直接拋出終端異常
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
         //如果結(jié)果有異常則將異常拋出
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        //如果r正常則直接將結(jié)果返回出去
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

waitingGet源碼相對復(fù)雜一些,整體步驟我們可以拆解為while循環(huán)內(nèi)部和while循環(huán)外部,我們先來看看while循環(huán)內(nèi)部的執(zhí)行流程:

  1. while循環(huán)從任務(wù)中獲取result,如果result為空,則進(jìn)入循環(huán)。
  2. 如果spins小于0,說明剛剛進(jìn)入循環(huán)內(nèi)部,可以自旋等待一下任務(wù)的獲取,設(shè)置好spins(spins的值從SPINS來,如果多核的情況下值為256),進(jìn)入下一次循環(huán)。
  3. 進(jìn)入循環(huán)發(fā)現(xiàn)spins大于0,則隨機(jī)生成一個數(shù),如果這個數(shù)大于等于0則--spins,進(jìn)入下次循環(huán)。
  4. 不斷執(zhí)行步驟3的操作,知道spins等于0。
  5. 此時判斷來到q==null,說明任務(wù)自旋等待了一段時間還是沒有結(jié)果,我們需要將其掛起,首先將線程封裝成一個Signaller,進(jìn)入下一次循環(huán)。
  6. 循環(huán)會判斷if (!queued),將要阻塞的任務(wù)放到棧中,進(jìn)入下一次循環(huán)。
  7. 循環(huán)下一次會來到if (q.thread != null && result == null),說明q線程不為空且沒有結(jié)果,我們需要將其打斷,調(diào)用ForkJoinPool.managedBlock(q)將其打斷,直至有結(jié)果后才結(jié)束循環(huán)。

while循環(huán)外操作就簡單了,來到循環(huán)尾部時,result已經(jīng)有值了,代碼執(zhí)行postComplete完成任務(wù),并將結(jié)果返回。

private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        //如果result為空則進(jìn)入循環(huán)
        while ((r = result) == null) {
        //如果spins小于0,說明剛剛進(jìn)入循環(huán)內(nèi)部,可以自旋等待一下任務(wù)的獲取,設(shè)置好spins(spins的值從SPINS來,如果多核的情況下值為256),自此,第一次循環(huán)步驟結(jié)束
            if (spins < 0)
                spins = SPINS;

   //這一步的操作是自旋等待任務(wù)結(jié)果,所以代碼進(jìn)入循環(huán)發(fā)現(xiàn)spins大于0,則隨機(jī)生成一個數(shù),如果這個數(shù)大于等于0則--spins,進(jìn)入下次循環(huán),直到循環(huán)spins變?yōu)?
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            //此時判斷來到q==null,說明任務(wù)自旋等待了一段時間還是沒有結(jié)果,我們需要將其掛起,首先將線程封裝成一個Signaller,結(jié)束本次循環(huán)
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
   //上一步我們將任務(wù)封裝成Signaller,這里就將其存入棧中,然后結(jié)束循環(huán)
            else if (!queued)
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            //循環(huán)來到這說明q線程不為空且沒有結(jié)果,我們需要將其打斷,調(diào)用`ForkJoinPool.managedBlock(q)`將其打斷,直至有結(jié)果后才結(jié)束循環(huán)
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        //結(jié)束循環(huán),調(diào)用postComplete結(jié)束任務(wù)并返回結(jié)果r
        postComplete();
        return r;
    }

提交無返回值的異步任務(wù)

通過runAsync提交一個無返回值的異步任務(wù),這里我們?yōu)榱藢崿F(xiàn)任務(wù)執(zhí)行完成再關(guān)閉主線程用了個get阻塞等待任務(wù)完成。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> supplyAsync = CompletableFuture.runAsync(() -> {
            long start = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + "開始工作了,執(zhí)行時間:" + start);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "結(jié)束工作了,總執(zhí)行時間:" + (System.currentTimeMillis() - start));
        });

        System.out.println("主線程開始運行");
        //get阻塞主線程等待任務(wù)結(jié)束
        supplyAsync.get();
        System.out.println("主線程運行結(jié)束");
    }

輸出結(jié)果:

主線程開始運行
ForkJoinPool.commonPool-worker-1開始工作了,執(zhí)行時間:1651251489755
ForkJoinPool.commonPool-worker-1結(jié)束工作了,總執(zhí)行時間:1010
主線程運行結(jié)束

將異步任務(wù)提交給自己的線程池處理

查看supplyAsync方法的源碼我們發(fā)現(xiàn),我們提交的任務(wù)默認(rèn)情況下會交給asyncPool這個線程池處理。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

查看asyncPool 我們可以看到如果服務(wù)器是多核的情況下返回的是一個commonPool,commonPool默認(rèn)線程池數(shù)為CPU核心數(shù)。

private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

所以如果某些情況下我們希望將任務(wù)提交到我們自己的線程池中,就建議通過supplyAsync的第二個參數(shù)告知CompletableFuture自己要用自定義線程池。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        //使用第二個參數(shù)告知CompletableFuture使用的線程池
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();
            System.out.println(Thread.currentThread() + "開始工作了,執(zhí)行時間:" + start);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印當(dāng)前執(zhí)行任務(wù)的線程
            System.out.println(Thread.currentThread() + "結(jié)束工作了,總執(zhí)行時間:" + (System.currentTimeMillis() - start));
            return 1;
        }, executorService);

        System.out.println("主線程開始運行");
        System.out.println("輸出結(jié)果 " + supplyAsync.get());
        System.out.println("主線程運行結(jié)束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

從輸出結(jié)果也可以看出這里使用的線程池是我們自定義的線程池:

主線程開始運行
Thread[pool-1-thread-1,5,main]開始工作了,執(zhí)行時間:1651251851358
Thread[pool-1-thread-1,5,main]結(jié)束工作了,總執(zhí)行時間:2005
輸出結(jié)果 1
主線程運行結(jié)束

thenApply和thenApplyAsync

thenApply 適用那些需要順序執(zhí)行的異步任務(wù),例如我們希望將第一個任務(wù)的返回值交給第二個異步任務(wù),就可以使用thenApply將兩個任務(wù)組合起來。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + "結(jié)束工作了");
            return 100;
        }, executorService);

        //將兩個任務(wù)組合起來
        CompletableFuture<String> task2 = task1.thenApply((data) -> {
            System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "第一個線程的結(jié)果為 " + data;
        });



        System.out.println("獲取組合任務(wù)結(jié)果");
        System.out.println("組合任務(wù)處理結(jié)果為: " + task2.get());
        System.out.println("獲取組合任務(wù)結(jié)果結(jié)束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

輸出結(jié)果可以看到,任務(wù)1執(zhí)行完成后任務(wù)2接著執(zhí)行了。

Thread[pool-1-thread-1,5,main]開始工作了
獲取組合任務(wù)結(jié)果
Thread[pool-1-thread-1,5,main]結(jié)束工作了
第二個線程:Thread[pool-1-thread-1,5,main]開始工作了
組合任務(wù)處理結(jié)果為: 第一個線程的結(jié)果為 100
獲取組合任務(wù)結(jié)果結(jié)束

thenApplyAsync與thenApply不同的是,在第一個異步任務(wù)有指定線程池的情況下,第二個異步任務(wù)會被提交到其他線程池中,所以這里我們可以說明一個規(guī)律,帶有Async關(guān)鍵字的方法支持組合任務(wù)時,將任務(wù)提交到不同的線程池中。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread()+"開始工作了");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread()+"結(jié)束工作了");
            return 100;
        },executorService);

        CompletableFuture<String> task2 = task1.thenApplyAsync((data) -> {
            System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "第一個線程的結(jié)果為 " + data;
        });



        System.out.println("獲取任務(wù)結(jié)果開始");
        System.out.println("任務(wù)的結(jié)果 "+task2.get());
        System.out.println("獲取任務(wù)結(jié)果結(jié)束");

        executorService.shutdown();
        while (executorService.isTerminated()){

        }
    }

輸出結(jié)果:

Thread[pool-1-thread-1,5,main]開始工作了
獲取任務(wù)結(jié)果開始
Thread[pool-1-thread-1,5,main]結(jié)束工作了
第二個線程:Thread[ForkJoinPool.commonPool-worker-9,5,main]開始工作了
任務(wù)的結(jié)果 第一個線程的結(jié)果為 100
獲取任務(wù)結(jié)果結(jié)束

thenAccept和thenRun

thenAccept和thenRun都會在上一個任務(wù)執(zhí)行結(jié)束后才會繼續(xù)執(zhí)行。兩者唯一區(qū)別時:

  • thenAccept在上一個任務(wù)執(zhí)行結(jié)束后,將上一個任務(wù)返回結(jié)果作為入?yún)?,但無返回值。

  • thenRun會在上一個任務(wù)執(zhí)行結(jié)束后才開始處理,既沒有入?yún)⒁矝]有返回值。

以下便是筆者的使用示例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);


        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("task線程:" + Thread.currentThread().getName() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task線程:" + Thread.currentThread().getName() + "結(jié)束工作了");
            return 200;
        }, executorService);

        CompletableFuture<Integer> task2 = task.thenApply((data) -> {
            System.out.println("task2線程:" + Thread.currentThread().getName() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2線程:" + Thread.currentThread().getName() + "執(zhí)行結(jié)束");
            return data;
        });

        //thenAccept 收上一個任務(wù)的入?yún)?,但無返回值
        CompletableFuture<Void> task3 = task2.thenAccept((data) -> {
            System.out.println("task3線程:" + Thread.currentThread().getName() + ",該任務(wù)接收上一個任務(wù)的結(jié)果,但無返回值,收到上一個任務(wù)的結(jié)果值為 " + data);
        });

        //thenRun在上一個任務(wù)結(jié)束后執(zhí)行,既無入?yún)⒁矡o出參
        CompletableFuture<Void> task4 = task3.thenRun(() -> {
            System.out.println("task4在上一個任務(wù)結(jié)束后繼續(xù)執(zhí)行,無入?yún)?也無返回值");
        });


        System.out.println("嘗試獲取最終執(zhí)行結(jié)果");
        task4.get();
        System.out.println("執(zhí)行任務(wù)直至task4 ");
        System.out.println("任務(wù)全部執(zhí)行結(jié)束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

輸出結(jié)果:

task線程:pool-1-thread-1開始工作了
嘗試獲取最終執(zhí)行結(jié)果
task線程:pool-1-thread-1結(jié)束工作了
task2線程:pool-1-thread-1開始工作了
task2線程:pool-1-thread-1執(zhí)行結(jié)束
task3線程:pool-1-thread-1,該任務(wù)接收上一個任務(wù)的結(jié)果,但無返回值,收到上一個任務(wù)的結(jié)果值為 200
task4在上一個任務(wù)結(jié)束后繼續(xù)執(zhí)行,無入?yún)?也無返回值
執(zhí)行任務(wù)直至task4 
任務(wù)全部執(zhí)行結(jié)束

exceptionally

假如我們的任務(wù)1執(zhí)行過程中可能報錯,我們希望能夠從邏輯的角度處理掉,那么我們就可以在任務(wù)1后面接一個exceptionally方法,然后再接上任務(wù)2。這樣一來,任務(wù)1執(zhí)行報錯就會走到exceptionally,反之就會走到任務(wù)2的代碼段:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task1 開始工作了");
            //隨機(jī)生成被除數(shù),為0會拋出算術(shù)異常
            int num = RandomUtil.randomInt(0, 2);
            int result = 10 / num;
            System.out.println("task1 結(jié)束工作");
            return 200;
        });

        //假如task1報錯,任務(wù)會走到這個任務(wù)上
        CompletableFuture<Integer> exceptionally = task1.exceptionally((e) -> {
            System.out.println("上一個任務(wù)報錯了,錯誤信息" + e.getMessage());
            return -1;
        });

        CompletableFuture task2 = task1.thenAccept((param) -> {
            System.out.println("走到正常的結(jié)束分支了,task1執(zhí)行結(jié)果:" + param);
        });

        System.out.println("主線程開始運行");
//        調(diào)用錯誤捕獲的任務(wù)執(zhí)行結(jié)束也會自動走到正常結(jié)束的分支
        System.out.println("輸出結(jié)果 " + exceptionally.get());
        System.out.println("主線程運行結(jié)束");
    }

執(zhí)行正常的輸出結(jié)果:

task1 開始工作了
主線程開始運行
task1 結(jié)束工作
走到正常的結(jié)束分支了:200
輸出結(jié)果 200
主線程運行結(jié)束

執(zhí)行異常的輸出結(jié)果:

task1 開始工作了
主線程開始運行
上一個任務(wù)報錯了,錯誤信息java.lang.ArithmeticException: / by zero
輸出結(jié)果 -1
主線程運行結(jié)束

whenComplete

對于上面的例子,我們完全可以用whenComplete來簡化,whenComplete會接收兩個入?yún)?

  • 入?yún)?為上一個任務(wù)的返回值。
  • 入?yún)?比較特殊,如果上一個任務(wù)拋出異常,則第2個入?yún)⒉粸榭铡?/li>

所以上一個例子的代碼我們可以簡化成這樣,需要注意的是whenComplete返回結(jié)果是上一個任務(wù)的執(zhí)行結(jié)果,我們無法返回任務(wù)2的執(zhí)行結(jié)果。

public static void main(String[] args) {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務(wù)1開始工作");
            int num = RandomUtil.randomInt(0, 2);
            int result = 10 / num;
            System.out.println("任務(wù)1執(zhí)行結(jié)束,執(zhí)行結(jié)果:" + result);
            return result;
        });

        CompletableFuture<Integer> task2 = task.whenComplete((result, err) -> {
            System.out.println("任務(wù)2開始工作");

            if (err != null) {
                System.out.println("任務(wù)1執(zhí)行報錯,報錯原因:" + err.getMessage());
                return;
            }

            System.out.println("任務(wù)1正常結(jié)束,執(zhí)行結(jié)果:" + result);

        });


        try {
            System.out.println("task2拿到最終執(zhí)行結(jié)果 " + task2.get());
        } catch (Exception e) {

        }
        System.out.println("全流程結(jié)束");


    }

錯誤的輸出結(jié)果:

任務(wù)1開始工作
任務(wù)2開始工作
任務(wù)1執(zhí)行報錯,報錯原因:java.lang.ArithmeticException: / by zero
全流程結(jié)束

正確執(zhí)行的輸出結(jié)果:

任務(wù)1開始工作
任務(wù)1執(zhí)行結(jié)束,執(zhí)行結(jié)果:10
任務(wù)2開始工作
任務(wù)1正常結(jié)束,執(zhí)行結(jié)果:10
task2拿到最終執(zhí)行結(jié)果 10
全流程結(jié)束

handle

handle使用和whenComplete差不多,唯一的區(qū)別就是whenComplete返回的是上一個任務(wù)的結(jié)果,而handle可以返回自己的結(jié)果。

代碼如下所示:

public static void execute1() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Random random = new java.util.Random();
            int num = random.nextInt(10);
            if (num < 5) {
                throw new RuntimeException("報錯了 num:" + num);
            }
            System.out.println(Thread.currentThread() + "結(jié)束工作了");
            return num;
        });

        CompletableFuture<String> future2 = future.handle((result, err) -> {
            System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (err != null) {
                System.out.println(err.getMessage());
                ;return "fail";
            }
            return "sucdess";
        });


        System.out.println("拿第1個任務(wù)的結(jié)果");
        System.out.println("第1個任務(wù)的結(jié)果 " + future2.get());
        System.out.println("第1個任務(wù)結(jié)果結(jié)束");



        /**
         * 輸出結(jié)果
         * Thread[pool-1-thread-1,5,main]開始工作了
         * 拿第一個任務(wù)的結(jié)果
         * Thread[pool-1-thread-1,5,main]結(jié)束工作了
         * 第二個線程:Thread[pool-1-thread-1,5,main]開始工作了
         * 100
         * 第一個任務(wù)結(jié)果結(jié)束
         * 拿第2個任務(wù)的結(jié)果
         * 第二個任務(wù)的結(jié)果 第一個線程的結(jié)果為 100
         * 第2個任務(wù)結(jié)果結(jié)束
         */

    }

thenCombine / thenAcceptBoth / runAfterBoth

這幾個方法都是將兩個任務(wù)組合起來執(zhí)行的,只有兩個任務(wù)都順利完成了,才會執(zhí)行之后的方法,唯一的區(qū)別是:

(1) thenCombine 接收兩個任務(wù)的返回值,并返回自己的返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task結(jié)束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2結(jié)束工作");
            return num;
        });

        //通過thenCombine將兩個任務(wù)組合起來
        CompletableFuture<Integer> completableFuture = task1.thenCombine(task2, (result1, result2) -> {
            System.out.println("task1返回結(jié)果:" + result1 + "  task2返回結(jié)果:" + result2);
            return result1 + result2;
        });


        System.out.println(completableFuture.get());


    }

輸出結(jié)果如下:

task開始工作
task2開始工作
task結(jié)束工作
task2結(jié)束工作
task1返回結(jié)果:30  task2返回結(jié)果:1
31

(2) thenAcceptBoth 接收兩個參數(shù)返回值,但沒有返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task結(jié)束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2結(jié)束工作");
            return num;
        });

        //通過 thenAcceptBoth 將兩個任務(wù)組合起來,獲取前兩個任務(wù)處理結(jié)果,但自己不返回結(jié)果
        CompletableFuture<Void> completableFuture = task1.thenAcceptBoth(task2, (result1, result2) -> {
            System.out.println("task1返回結(jié)果:" + result1 + "  task2返回結(jié)果:" + result2);

        });


        completableFuture.get();


    }

輸出結(jié)果:

task開始工作
task2開始工作
task結(jié)束工作
task2結(jié)束工作
task1返回結(jié)果:66  task2返回結(jié)果:10

(3) runAfterBoth 既不能接收入?yún)?,也無返回值,待前兩個任務(wù)執(zhí)行完成后才能執(zhí)行。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task結(jié)束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2結(jié)束工作");
            return num;
        });

        //通過 runAfterBoth 將兩個任務(wù)組合起來,待前兩個組合任務(wù)完成后執(zhí)行,無入?yún)ⅰo出參
        CompletableFuture<Void> completableFuture = task1.runAfterBoth(task2,()-> {
            System.out.println("task1、task2處理完成" );

        });


        completableFuture.get();


    }

輸出結(jié)果:

task開始工作
task2開始工作
task結(jié)束工作
task2結(jié)束工作
task1、task2處理完成

applyToEither / acceptEither / runAfterEither

這種組合模式只要有一個異步任務(wù)成功,就會觸發(fā)后續(xù)的方法,比如我們組合任務(wù)1和任務(wù)2,如果任務(wù)1執(zhí)行完成就直接執(zhí)行任務(wù)3,無視任務(wù)2。反之任務(wù)2先完成直接執(zhí)行任務(wù)3,無視任務(wù)1。

和上一個組合模式一樣,依次規(guī)律也是:

(1) 接收入?yún)?,含返回值?/p>

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);

        CompletableFuture<String> completableFuture = task.applyToEither(task2, (result) -> {
            if (result == 1) {
                System.out.println("task1先完成任務(wù)");
                return "task1";
            }
            System.out.println("task2先完成任務(wù)");
            return "task2";
        });


        System.out.println("最先完成任務(wù)的是:" + completableFuture.get());


    }

(2) 接收入?yún)?,無返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);

        CompletableFuture<Void> completableFuture = task.acceptEither(task2, (result) -> {
            System.out.println("result:" + result);
            if (result == 1) {
                System.out.println("task1先完成任務(wù)");
                return;
            }
            System.out.println("task2先完成任務(wù)");
        });


        completableFuture.get();


    }

(3) 無入?yún)?,無返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("task1開始工作");
            try {
                TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1結(jié)束工作");
            return 1;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync( () -> {
            System.out.println("task2 開始工作");
            try {
                TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2 結(jié)束工作");
            return 2;
        });

        CompletableFuture<Void> completableFuture = task.runAfterEither(task2, () -> {
            System.out.println("有一個任務(wù)完成了");
        });


        completableFuture.get();


    }

輸出結(jié)果:

task1開始工作
task2 開始工作
task1結(jié)束工作
有一個任務(wù)完成了

thenCompose

thenCompose方法會在某個任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果作為方法入?yún)⑷缓髨?zhí)行指定的方法,該方法會返回一個新的CompletableFuture實例,例如我們希望任務(wù)1執(zhí)行完成后執(zhí)行任務(wù)2,任務(wù)2執(zhí)行完成后返回執(zhí)行任務(wù)3,最終結(jié)果是從任務(wù)3中獲取。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
            System.out.println("task1開始工作");
            int num=RandomUtil.randomInt(0,5);
            try {
                TimeUnit.SECONDS.sleep(num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1結(jié)束工作,處理結(jié)果:"+num);
            return num;
        });


        CompletableFuture<String> task2= task1.thenCompose((r)->{

            System.out.println("task2 開始工作");
            int num=RandomUtil.randomInt(0,5);
            try {
                TimeUnit.SECONDS.sleep(num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2 結(jié)束工作");


            return CompletableFuture.supplyAsync(()->{
                System.out.println("task3 開始工作,收到任務(wù)1的執(zhí)行結(jié)果:"+r);
                return "task3 finished";
            });
        });

        System.out.println("執(zhí)行結(jié)果->"+task2.get());


    }

輸出結(jié)果:

task1開始工作
task1結(jié)束工作,處理結(jié)果:1
task2 開始工作
task2 結(jié)束工作
task3 開始工作,收到任務(wù)1的執(zhí)行結(jié)果:1
執(zhí)行結(jié)果->task3 finished

allOf / anyOf

allOf返回的CompletableFuture是所有任務(wù)都執(zhí)行完成后才會執(zhí)行,只要有一個任務(wù)執(zhí)行異常,則返回的CompletableFuture執(zhí)行g(shù)et方法時會拋出異常。

public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 模擬異步任務(wù)1
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 模擬異步任務(wù)2
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "World";
        });

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        allFutures.thenRun(() -> {
            // 所有異步任務(wù)完成后打印它們的結(jié)果
            String result1 = future1.join();
            String result2 = future2.join();
            System.out.println(result1 + " " + result2);
        });

        // 等待所有異步任務(wù)完成
        allFutures.join();
    }

輸出結(jié)果:

Hello World

而anyOf則是只要有一個任務(wù)完成就可以觸發(fā)后續(xù)方法,并且可以返回先完成任務(wù)的返回值,這一點和上述applyToEither 例子差不多。

public class Main {

    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 模擬異步任務(wù)1
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 模擬異步任務(wù)2
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "World";
        });

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

        anyFuture.thenAccept(result -> {
            // 任何一個異步任務(wù)完成后打印它的結(jié)果
            System.out.println(result);
        });

        // 等待任何一個異步任務(wù)完成
        anyFuture.join();
    }
}
責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2017-12-21 15:48:11

JavaCompletable

2021-06-06 16:56:49

異步編程Completable

2024-08-30 09:53:17

Java 8編程集成

2024-03-06 08:13:33

FutureJDKCallable

2022-05-13 12:34:16

美團(tuán)開發(fā)實踐

2024-01-11 12:14:31

Async線程池任務(wù)

2021-08-30 19:00:46

靜態(tài)CompletableCountDownLa

2012-11-19 10:35:18

阿里云云計算

2024-12-26 12:59:39

2024-05-21 09:55:43

AspectOrientedAOP

2024-11-21 14:42:31

2020-03-17 09:21:20

MariaDBSpider存儲

2024-10-28 13:31:33

性能@Async應(yīng)用

2024-08-06 09:43:54

Java 8工具編程

2021-09-27 13:01:52

線程阻塞排查

2021-03-16 15:12:57

CompletableFuture機(jī)制java

2015-06-16 11:06:42

JavaCompletable

2021-02-21 14:35:29

Java 8異步編程

2020-11-05 10:40:18

ActiveMQ

2022-09-13 08:00:00

協(xié)議緩存區(qū)編程語言系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號