新聞中心
本文轉載自微信公眾號「KK架構師」,作者wangkai 。轉載本文請聯(lián)系KK架構師公眾號。

創(chuàng)新互聯(lián)主營金堂縣網(wǎng)站建設的網(wǎng)絡公司,主營網(wǎng)站建設方案,app軟件開發(fā)公司,金堂縣h5重慶小程序開發(fā)搭建,金堂縣網(wǎng)站營銷推廣歡迎金堂縣等地區(qū)企業(yè)咨詢
本文大綱速看
一、異步編程
通常來說,程序都是順序執(zhí)行,同一時刻只會發(fā)生一件事情。如果一個函數(shù)依賴于另一個函數(shù)的結果,它只能等待那個函數(shù)結束才能繼續(xù)執(zhí)行,從用戶角度來說,整個程序才算執(zhí)行完畢。但現(xiàn)在的計算機普遍擁有多核 CPU,在那里干等著毫無意義,完全可以在另一個處理器內核上干其他工作,耗時長的任務結束之后會主動通知你。這就是異步編程的出發(fā)點:充分使用多核 CPU 的優(yōu)勢,最大程度提高程序性能。一句話來說:所謂異步編程,就是實現(xiàn)一個無需等待被調用函數(shù)的返回值而讓操作繼續(xù)運行的方法。
二、拋出一個問題:如何實現(xiàn)燒水泡茶的程序
最后我們會使用傳統(tǒng)方式和 Java8 異步編程方式分別實現(xiàn),來對比一下實現(xiàn)復雜度。
三、Java5 的 Future 實現(xiàn)的異步編程
Future 是 Java 5 添加的類,用來描述一個異步計算的結果。你可以使用 isDone() 方法檢查計算是否完成,或者使用 get() 方法阻塞住調用線程,直到計算完成返回結果,也可以使用 cancel() 方法停止任務的執(zhí)行。
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- ExecutorService es = Executors.newFixedThreadPool(5);
- Future
f = es.submit(() -> 100); - System.out.println(f.get());
- es.shutdown();
- }
雖然 Future 提供了異步執(zhí)行任務的能力,但是對于結果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。阻塞的方式顯然和我們異步編程的初衷相違背,輪詢的方式又會耗費無謂的 CPU 資源,而且也不能及時的獲取結果。
當然,很多其他的語言采用回調的方式來實現(xiàn)異步編程,比如 Node.js;Java 的一些框架,比如 Netty,Google Guava 也擴展了 Future 接口,提供了很多回調的機制,封裝了工具類,輔助異步編程開發(fā)。
Java 作為老牌編程語言,自然也不會落伍。在 Java 8 中,新增了一個包含 50 多個方法的類:CompletableFuture,提供了非常強大的 Future 擴展功能,可以幫助我們簡化異步編程的復雜性,提供函數(shù)式編程的能力。
四、CompletableFuture 類功能概覽
如下圖是 CompletableFuture 實現(xiàn)的接口:
它實現(xiàn)了 Future 接口,擁有 Future 所有的特性,比如可以使用 get() 方法獲取返回值等;還實現(xiàn)了 CompletionStage 接口,這個接口有超過 40 個方法,功能太豐富了,它主要是為了編排任務的工作流。
我們可以把工作流和工作流之間的關系分類為三種:串行關系,并行關系,匯聚關系。
串行關系
提供了如下的 api 來實現(xiàn)(先大致瀏覽一遍):
- CompletionStage
thenApply(fn); - CompletionStage
thenApplyAsync(fn); - CompletionStage
thenAccept(consumer); - CompletionStage
thenAcceptAsync(consumer); - CompletionStage
thenRun(action); - CompletionStage
thenRunAsync(action); - CompletionStage
thenCompose(fn); - CompletionStage
thenComposeAsync(fn);
并行關系
多線程異步執(zhí)行就是并行關系
匯聚關系
匯聚關系,又分為 AND 匯聚關系和 OR 匯聚關系:
AND 匯聚關系,就是所有依賴的任務都完成之后再執(zhí)行;OR 匯聚關系,就是依賴的任務中有一個執(zhí)行完成,就開始執(zhí)行。
AND 匯聚關系由這些接口表達:
- CompletionStage
thenCombine(other, fn); - CompletionStage
thenCombineAsync(other, fn); - CompletionStage
thenAcceptBoth(other, consumer); - CompletionStage
thenAcceptBothAsync(other, consumer); - CompletionStage
runAfterBoth(other, action); - CompletionStage
runAfterBothAsync(other, action);
OR 匯聚關系由這些接口來表達:
- CompletionStage applyToEither(other, fn);
- CompletionStage applyToEitherAsync(other, fn);
- CompletionStage acceptEither(other, consumer);
- CompletionStage acceptEitherAsync(other, consumer);
- CompletionStage runAfterEither(other, action);
- CompletionStage runAfterEitherAsync(other, action);
五、CompletableFuture 接口精講
1、提交執(zhí)行的靜態(tài)方法
方法名描述
| 方法名 | 描述 |
|---|---|
| runAsync(Runnable runnable) | 執(zhí)行異步代碼,使用 ForkJoinPool.commonPool() 作為它的線程池 |
| runAsync(Runnable runnable, Executor executor) | 執(zhí)行異步代碼,使用指定的線程池 |
| supplyAsync(Supplier supplier) | 異步執(zhí)行代碼,有返回值,使用 ForkJoinPool.commonPool() 作為它的線程池 |
| supplyAsync(Supplier supplier, Executor executor) | 異步執(zhí)行代碼,有返回值,使用指定的線程池執(zhí)行 |
上述四個方法,都是提交任務的,runAsync 方法需要傳入一個實現(xiàn)了 Runnable 接口的方法,supplyAsync 需要傳入一個實現(xiàn)了 Supplier 接口的方法,實現(xiàn) get 方法,返回一個值。
(1)run 和 supply 的區(qū)別
run 就是執(zhí)行一個方法,沒有返回值,supply 執(zhí)行一個方法,有返回值。
(2)一個參數(shù)和兩個參數(shù)的區(qū)別
第二個參數(shù)是線程池,如果沒有傳,則使用自帶的 ForkJoinPool.commonPool() 作為線程池,這個線程池默認創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數(shù))
2、串行關系 api
這些 api 之間主要是能否獲得前一個任務的返回值與自己是否有返回值的區(qū)別。
| api | 是否可獲得前一個任務的返回值 | 是否有返回值 |
|---|---|---|
| thenApply | 能 | 有 |
| thenAccept | 能 | 無 |
| thenRun | 不能 | 無 |
| thenCompose | 能 | 有 |
(1) thenApply 和 thenApplyAsync 使用
thenApply 和 thenApplyAsync 把兩個并行的任務串行化,另一個任務在獲得上一個任務的返回值之后,做一些加工和轉換。它也是有返回值的。
- public class BasicFuture4 {
- @Data
- @AllArgsConstructor
- @ToString
- static class Student {
- private String name;
- }
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture
future = CompletableFuture.supplyAsync(() -> "Jack") - .thenApply(s -> s + " Smith")
- .thenApply(String::toUpperCase)
- .thenApplyAsync(Student::new);
- System.out.println(future.get());
- }
- }
結果可以看到,輸入是一個字符串,拼接了一個字符串,轉換成大寫,new 了一個 Student 對象返回。
- BasicFuture4.Student(name=JACK SMITH)
和 thenApply 一起的還有 thenAccept 和 thenRun,thenAccept 能獲得到前一個任務的返回值,但是自身沒有返回值;thenRun 不能獲得前一個任務的返回值,自身也沒有返回值。
(2)thenApply 和 thenApplyAsync 的區(qū)別
這兩個方法的區(qū)別,在于誰去執(zhí)行任務。如果使用 thenApplyAsync,那么執(zhí)行的線程是從 ForkJoinPool.commonPool() 或者自己定義的線程池中取線程去執(zhí)行。如果使用 thenApply,又分兩種情況,如果 supplyAsync 方法執(zhí)行速度特別快,那么 thenApply 任務就使用主線程執(zhí)行,如果 supplyAsync 執(zhí)行速度特別慢,就是和 supplyAsync 執(zhí)行線程一樣。
可以使用下面的例子演示一下:
- package com.dsj361.future;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- /**
- * @Author wangkai
- */
- public class BasicFuture8 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- System.out.println("----------supplyAsync 執(zhí)行很快");
- CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName());
- return "1";
- }).thenApply(s -> {
- System.out.println(Thread.currentThread().getName());
- return "2";
- });
- System.out.println(future1.get());
- System.out.println("----------supplyAsync 執(zhí)行很慢");
- CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> { - try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- System.out.println(Thread.currentThread().getName());
- return "1";
- }).thenApply(s -> {
- System.out.println(Thread.currentThread().getName());
- return "2";
- });
- System.out.println(future2.get());
- }
- }
執(zhí)行結果:
- ----------supplyAsync 執(zhí)行很快
- ForkJoinPool.commonPool-worker-1
- main
- 2
- ----------supplyAsync 執(zhí)行很慢
- ForkJoinPool.commonPool-worker-1
- ForkJoinPool.commonPool-worker-1
- 2
(3)thenCompose 的使用
假設有兩個異步任務,第二個任務想要獲取第一個任務的返回值,并且做運算,我們可以用 thenCompose。此時使用 thenApply 也可以實現(xiàn),看一段代碼發(fā)現(xiàn)他們的區(qū)別:
- public class BasicFuture9 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture
future = getLastOne().thenCompose(BasicFuture9::getLastTwo); - System.out.println(future.get());
- CompletableFuture
> future2 = getLastOne().thenApply(s -> getLastTwo(s)); - System.out.println(future2.get().get());
- }
- public static CompletableFuture
getLastOne(){ - return CompletableFuture.supplyAsync(()-> "topOne");
- }
- public static CompletableFuture
getLastTwo(String s){ - return CompletableFuture.supplyAsync(()-> s + " topTwo");
- }
- }
可以看到使用 thenApply 的時候,需要使用兩個 get() 方法才能獲取到最終的返回值,使用 thenCompose 只要一個即可。
3、And 匯聚關系 Api
(1)thenCombine 的使用
加入我們要計算兩個異步方法返回值的和,就必須要等到兩個異步任務都計算完才能求和,此時可以用 thenCombine 來完成。
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture
thenComposeOne = CompletableFuture.supplyAsync(() -> 192); - CompletableFuture
thenComposeTwo = CompletableFuture.supplyAsync(() -> 196); - CompletableFuture
thenComposeCount = thenComposeOne - .thenCombine(thenComposeTwo, (s, y) -> s + y);
- thenComposeOne.thenAcceptBoth(thenComposeTwo,(s,y)-> System.out.println("thenAcceptBoth"));
- thenComposeOne.runAfterBoth(thenComposeTwo, () -> System.out.println("runAfterBoth"));
- System.out.println(thenComposeCount.get());
- }
可以看到 thenCombine 第二個參數(shù)是一個 Function 函數(shù),前面兩個異步任務都完成之后,使用這個函數(shù)來完成一些運算。
(2)thenAcceptBoth
接收前面兩個異步任務的結果,執(zhí)行一個回調函數(shù),但是這個回調函數(shù)沒有返回值。
(3)runAfterBoth
接收前面兩個異步任務的結果,但是回調函數(shù),不接收參數(shù),也不返回值。
4、Or 匯聚關系 Api
- public class BasicFuture11 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture
thenComposeOne = CompletableFuture.supplyAsync(() -> 192); - CompletableFuture
thenComposeTwo = CompletableFuture.supplyAsync(() -> 196); - CompletableFuture
thenComposeCount = thenComposeOne - .applyToEither(thenComposeTwo, s -> s + 1);
- thenComposeOne.acceptEither(thenComposeTwo,s -> {});
- thenComposeOne.runAfterEither(thenComposeTwo,()->{});
- System.out.println(thenComposeCount.get());
- }
- }
(1)applyToEither
任何一個執(zhí)行完就執(zhí)行回調方法,回調方法接收一個參數(shù),有返回值
(2)acceptEither
任何一個執(zhí)行完就執(zhí)行回調方法,回調方法接收一個參數(shù),無返回值
(3)runAfterEither
任何一個執(zhí)行完就執(zhí)行回調方法,回調方法不接收參數(shù),也無返回值
5、處理異常
上面我們講了如何把幾個異步任務編排起來,執(zhí)行一些串行或者匯聚操作。還有一個重要的地方,就是異常的處理。
先看下面的例子:
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture.supplyAsync(() -> {
- System.out.println("execute one ");
- return 100;
- })
- .thenApply(s -> 10 / 0)
- .thenRun(() -> System.out.println("thenRun"))
- .thenAccept(s -> System.out.println("thenAccept"));
- CompletableFuture.runAsync(() -> System.out.println("other"));
- }
結果:
- execute one
- other
可以發(fā)現(xiàn),只要鏈條上有一個任務發(fā)生了異常,這個鏈條下面的任務都不再執(zhí)行了。
但是 main 方法上的接下來的代碼還是會執(zhí)行的。
所以這個時候,需要合理的去處理異常來完成一些收尾的工作。
- public class BasicFuture12 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture.supplyAsync(() -> {
- System.out.println("execute one ");
- return 100;
- })
- .thenApply(s -> 10 / 0)
- .thenRun(() -> System.out.println("thenRun"))
- .thenAccept(s -> System.out.println("thenAccept"))
- .exceptionally(s -> {
- System.out.println("異常處理");
- return null;
- });
- CompletableFuture.runAsync(() -> System.out.println("other"));
- }
- }
可以使用 exceptionally 來處理異常。
使用 handle() 方法也可以處理異常。但是 handle() 方法的不同之處在于,即使沒有發(fā)生異常,也會執(zhí)行。
六、燒水泡茶程序的實現(xiàn)
1、使用 Thread 多線程和 CountDownLatch 來實現(xiàn)
- public class MakeTee {
- private static CountDownLatch countDownLatch = new CountDownLatch(2);
- static class HeatUpWater implements Runnable {
- private CountDownLatch countDownLatch;
- public HeatUpWater(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
- @Override
- public void run() {
- try {
- System.out.println("洗水壺");
- Thread.sleep(1000);
- System.out.println("燒開水");
- Thread.sleep(5000);
- countDownLatch.countDown();
- } catch (InterruptedException e) {
- }
- }
- }
- static class PrepareTee implements Runnable {
- private CountDownLatch countDownLatch;
- public PrepareTee(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
- @Override
- public void run() {
- try {
- System.out.println("洗茶壺");
- Thread.sleep(1000);
- System.out.println("洗茶杯");
- Thread.sleep(1000);
- System.out.println("拿茶葉");
- Thread.sleep(1000);
- countDownLatch.countDown();
- } catch (InterruptedException e) {
- }
- }
- }
- public static void main(String[] args) throws InterruptedException {
- new Thread(new HeatUpWater(countDownLatch) ).start();
- new Thread(new PrepareTee(countDownLatch)).start();
- countDownLatch.await();
- System.out.println("準備就緒,開始泡茶");
- }
- }
這里我們使用兩個線程,分別執(zhí)行燒水和泡茶的程序,使用 CountDownLatch 來協(xié)調兩個線程的進度,等到他們都執(zhí)行完成之后,再執(zhí)行泡茶的動作。
可以看到這種方法,多了很多不必要的代碼,new Thread,人工維護 CountDownLatch 的進度。
2、使用 CompletableFuture 來實現(xiàn)
- public class MakeTeeFuture {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture
future1 = CompletableFuture.runAsync(() -> { - try {
- System.out.println("洗水壺");
- Thread.sleep(1000);
- System.out.println("燒開水");
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- CompletableFuture
future2 = CompletableFuture.runAsync(() -> { - try {
- System.out.println("洗茶壺");
- Thread.sleep(1000);
- System.out.println("洗茶杯");
- Thread.sleep(1000);
- System.out.println("拿茶葉");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- CompletableFuture
finish = future1.runAfterBoth(future2, () -> { - System.out.println("準備完畢,開始泡茶");
- });
- System.out.println(finish.get());
- }
- }
這個程序極度簡單,無需手工維護線程,給任務分配線程的工作也不需要關注。
同時語義也更加清晰,future1.runAfterBoth(future2,......) 能夠清晰的表述“任務 3 要等到任務 1 和任務 2 都完成之后才能繼續(xù)開始”
然后代碼更加簡練并且專注于業(yè)務邏輯,幾乎所有的代碼都是業(yè)務邏輯相關的。
七、總結
本文介紹了異步編程的概念,以及 Java8 的 CompletableFuture 是如何優(yōu)雅的處理多個異步任務之間的協(xié)調工作的。CompletableFuture 能夠極大簡化我們對于異步任務編排的工作,F(xiàn)link 在提交任務時,也是使用這種異步任務的方式,去編排提交時和提交后對于任務狀態(tài)處理的一些工作的。
分享題目:Java8異步編程CompletableFuture全解析
網(wǎng)站鏈接:http://fisionsoft.com.cn/article/djeepei.html


咨詢
建站咨詢
