小編給大家分享一下Java8中CompletableFuture的使用方法,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
十載的元寶網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。網(wǎng)絡(luò)營銷推廣的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整元寶建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)公司從事“元寶網(wǎng)站設(shè)計(jì)”,“元寶網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
作為Java 8 Concurrency API改進(jìn)而引入,本文是CompletableFuture類的功能和用例的介紹。同時(shí)在Java 9 也有對(duì)CompletableFuture有一些改進(jìn),之后再進(jìn)入講解。
Future計(jì)算
Future異步計(jì)算很難操作,通常我們希望將任何計(jì)算邏輯視為一系列步驟。但是在異步計(jì)算的情況下,表示為回調(diào)的方法往往分散在代碼中或者深深地嵌套在彼此內(nèi)部。但是當(dāng)我們需要處理其中一個(gè)步驟中可能發(fā)生的錯(cuò)誤時(shí),情況可能會(huì)變得更復(fù)雜。
Futrue接口是Java 5中作為異步計(jì)算而新增的,但它沒有任何方法去進(jìn)行計(jì)算組合或者處理可能出現(xiàn)的錯(cuò)誤。
在Java 8中,引入了CompletableFuture類。與Future接口一起,它還實(shí)現(xiàn)了CompletionStage接口。此接口定義了可與其他Future組合成異步計(jì)算契約。
CompletableFuture同時(shí)是一個(gè)組合和一個(gè)框架,具有大約50種不同的構(gòu)成,結(jié)合,執(zhí)行異步計(jì)算步驟和處理錯(cuò)誤。
如此龐大的API可能會(huì)令人難以招架,下文將調(diào)一些重要的做重點(diǎn)介紹。
使用CompletableFuture作為Future實(shí)現(xiàn)
首先,CompletableFuture類實(shí)現(xiàn)Future接口,因此你可以將其用作Future實(shí)現(xiàn),但需要額外的完成實(shí)現(xiàn)邏輯。
例如,你可以使用無構(gòu)參構(gòu)造函數(shù)創(chuàng)建此類的實(shí)例,然后使用complete
方法完成。消費(fèi)者可以使用get方法來阻塞當(dāng)前線程,直到get()
結(jié)果。
在下面的示例中,我們有一個(gè)創(chuàng)建CompletableFuture實(shí)例的方法,然后在另一個(gè)線程中計(jì)算并立即返回Future。
計(jì)算完成后,該方法通過將結(jié)果提供給完整方法來完成Future:
public FuturecalculateAsync() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }
為了分離計(jì)算,我們使用了Executor API ,這種創(chuàng)建和完成CompletableFuture的方法可以與任何并發(fā)包(包括原始線程)一起使用。
請(qǐng)注意,該calculateAsync
方法返回一個(gè)Future
實(shí)例。
我們只是調(diào)用方法,接收Future實(shí)例并在我們準(zhǔn)備阻塞結(jié)果時(shí)調(diào)用它的get方法。
另請(qǐng)注意,get方法拋出一些已檢查的異常,即ExecutionException(封裝計(jì)算期間發(fā)生的異常)和InterruptedException(表示執(zhí)行方法的線程被中斷的異常):
FuturecompletableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);
如果你已經(jīng)知道計(jì)算的結(jié)果,也可以用變成同步的方式來返回結(jié)果。
FuturecompletableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);
作為在某些場景中,你可能希望取消Future任務(wù)的執(zhí)行。
假設(shè)我們沒有找到結(jié)果并決定完全取消異步執(zhí)行任務(wù)。這可以通過Future的取消方法完成。此方法mayInterruptIfRunning
,但在CompletableFuture的情況下,它沒有任何效果,因?yàn)橹袛嗖挥糜诳刂艭ompletableFuture的處理。
這是異步方法的修改版本:
public FuturecalculateAsyncWithCancellation() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.cancel(false); return null; }); return completableFuture; }
當(dāng)我們使用Future.get()方法阻塞結(jié)果時(shí),cancel()
表示取消執(zhí)行,它將拋出CancellationException:
Futurefuture = calculateAsyncWithCancellation(); future.get(); // CancellationException
API介紹
static方法說明
上面的代碼很簡單,下面介紹幾個(gè) static 方法,它們使用任務(wù)來實(shí)例化一個(gè) CompletableFuture 實(shí)例。
CompletableFuture.runAsync(Runnable runnable); CompletableFuture.runAsync(Runnable runnable, Executor executor); CompletableFuture.supplyAsync(Supplier supplier); CompletableFuture.supplyAsync(Supplier supplier, Executor executor)
runAsync 方法接收的是 Runnable 的實(shí)例,但是它沒有返回值
supplyAsync 方法是JDK8函數(shù)式接口,無參數(shù),會(huì)返回一個(gè)結(jié)果
這兩個(gè)方法是 executor 的升級(jí),表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在 ForkJoinPool.commonPool() 線程池中執(zhí)行的。
supplyAsync()使用
靜態(tài)方法runAsync
和supplyAsync
允許我們相應(yīng)地從Runnable和Supplier功能類型中創(chuàng)建CompletableFuture實(shí)例。
該Runnable的接口是在線程使用舊的接口,它不允許返回值。
Supplier接口是一個(gè)不具有參數(shù),并返回參數(shù)化類型的一個(gè)值的單個(gè)方法的通用功能接口。
這允許將Supplier的實(shí)例作為lambda表達(dá)式提供,該表達(dá)式執(zhí)行計(jì)算并返回結(jié)果:
CompletableFuturefuture = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());
thenRun()使用
在兩個(gè)任務(wù)任務(wù)A,任務(wù)B中,如果既不需要任務(wù)A的值也不想在任務(wù)B中引用,那么你可以將Runnable lambda 傳遞給thenRun()
方法。在下面的示例中,在調(diào)用future.get()方法之后,我們只需在控制臺(tái)中打印一行:
模板
CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
thenRun(Runnable runnable)
,任務(wù) A 執(zhí)行完執(zhí)行 B,并且 B 不需要 A 的結(jié)果。thenRun(Runnable runnable)
,任務(wù) A 執(zhí)行完執(zhí)行 B,會(huì)返回resultA
,但是 B 不需要 A 的結(jié)果。實(shí)戰(zhàn)
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();
thenAccept()使用
在兩個(gè)任務(wù)任務(wù)A,任務(wù)B中,如果你不需要在Future中有返回值,則可以用 thenAccept
方法接收將計(jì)算結(jié)果傳遞給它。最后的future.get()調(diào)用返回Void類型的實(shí)例。
模板
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
第一行中,runAsync
不會(huì)有返回值,第二個(gè)方法thenAccept
,接收到的resultA值為null,同時(shí)任務(wù)B也不會(huì)有返回結(jié)果
第二行中,supplyAsync
有返回值,同時(shí)任務(wù)B不會(huì)有返回結(jié)果。
實(shí)戰(zhàn)
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();
thenApply()使用
在兩個(gè)任務(wù)任務(wù)A,任務(wù)B中,任務(wù)B想要任務(wù)A計(jì)算的結(jié)果,可以用thenApply方法來接受一個(gè)函數(shù)實(shí)例,用它來處理結(jié)果,并返回一個(gè)Future函數(shù)的返回值:
模板
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB"); CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
實(shí)戰(zhàn)
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());
當(dāng)然,多個(gè)任務(wù)的情況下,如果任務(wù) B 后面還有任務(wù) C,往下繼續(xù)調(diào)用 .thenXxx() 即可。
thenCompose()使用
接下來會(huì)有一個(gè)很有趣的設(shè)計(jì)模式;
CompletableFuture API 的最佳場景是能夠在一系列計(jì)算步驟中組合CompletableFuture實(shí)例。
這種組合結(jié)果本身就是CompletableFuture,允許進(jìn)一步再續(xù)組合。這種方法在函數(shù)式語言中無處不在,通常被稱為monadic設(shè)計(jì)模式
。
簡單說,Monad就是一種設(shè)計(jì)模式,表示將一個(gè)運(yùn)算過程,通過函數(shù)拆解成互相連接的多個(gè)步驟。你只要提供下一步運(yùn)算所需的函數(shù),整個(gè)運(yùn)算就會(huì)自動(dòng)進(jìn)行下去。
在下面的示例中,我們使用thenCompose方法按順序組合兩個(gè)Futures。
請(qǐng)注意,此方法采用返回CompletableFuture實(shí)例的函數(shù)。該函數(shù)的參數(shù)是先前計(jì)算步驟的結(jié)果。這允許我們?cè)谙乱粋€(gè)CompletableFuture的lambda中使用這個(gè)值:
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());
該thenCompose方法連同thenApply一樣實(shí)現(xiàn)了結(jié)果的合并計(jì)算。但是他們的內(nèi)部形式是不一樣的,它們與Java 8中可用的Stream和Optional類的map和flatMap方法是有著類似的設(shè)計(jì)思路在里面的。
兩個(gè)方法都接收一個(gè)CompletableFuture并將其應(yīng)用于計(jì)算結(jié)果,但thenCompose(flatMap)方法接收一個(gè)函數(shù),該函數(shù)返回相同類型的另一個(gè)CompletableFuture對(duì)象。此功能結(jié)構(gòu)允許將這些類的實(shí)例繼續(xù)進(jìn)行組合計(jì)算。
thenCombine()
取兩個(gè)任務(wù)的結(jié)果
如果要執(zhí)行兩個(gè)獨(dú)立的任務(wù),并對(duì)其結(jié)果執(zhí)行某些操作,可以用Future的thenCombine方法:
模板
CompletableFuturecfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> "resultB"); cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
實(shí)戰(zhàn)
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());
更簡單的情況是,當(dāng)你想要使用兩個(gè)Future結(jié)果時(shí),但不需要將任何結(jié)果值進(jìn)行返回時(shí),可以用thenAcceptBoth
,它表示后續(xù)的處理不需要返回值,而 thenCombine 表示需要返回值:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));
thenApply()和thenCompose()之間的區(qū)別
在前面的部分中,我們展示了關(guān)于thenApply()和thenCompose()的示例。這兩個(gè)API都是使用的CompletableFuture調(diào)用,但這兩個(gè)API的使用是不同的。
thenApply()
此方法用于處理先前調(diào)用的結(jié)果。但是,要記住的一個(gè)關(guān)鍵點(diǎn)是返回類型是轉(zhuǎn)換泛型中的類型,是同一個(gè)CompletableFuture。
因此,當(dāng)我們想要轉(zhuǎn)換CompletableFuture 調(diào)用的結(jié)果時(shí),效果是這樣的 :
CompletableFuturefinalResult = compute().thenApply(s-> s + 1);
thenCompose()
該thenCompose()方法類似于thenApply()在都返回一個(gè)新的計(jì)算結(jié)果。但是,thenCompose()使用前一個(gè)Future作為參數(shù)。它會(huì)直接使結(jié)果變新的Future,而不是我們?cè)趖henApply()中到的嵌套Future,而是用來連接兩個(gè)CompletableFuture,是生成一個(gè)新的CompletableFuture:
CompletableFuturecomputeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture finalResult = compute().thenCompose(this::computeAnother);
因此,如果想要繼續(xù)嵌套鏈接CompletableFuture 方法,那么最好使用thenCompose()。
并行運(yùn)行多個(gè)任務(wù)
當(dāng)我們需要并行執(zhí)行多個(gè)任務(wù)時(shí),我們通常希望等待所有它們執(zhí)行,然后處理它們的組合結(jié)果。
該CompletableFuture.allOf
靜態(tài)方法允許等待所有的完成任務(wù):
API
public static CompletableFutureallOf(CompletableFuture>... cfs){...}
實(shí)戰(zhàn)
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());
請(qǐng)注意,CompletableFuture.allOf()的返回類型是CompletableFuture
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);
CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區(qū)別在于 join() 拋出的是 unchecked Exception。這使得它可以在Stream.map()方法中用作方法引用。
異常處理
說到這里,我們順便來說下 CompletableFuture 的異常處理。這里我們要介紹兩個(gè)方法:
public CompletableFutureexceptionally(Function fn); public CompletionStage handle(BiFunction super T, Throwable, ? extends U> fn);
看下代碼
CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");
上面的代碼中,任務(wù) A、B、C、D 依次執(zhí)行,如果任務(wù) A 拋出異常(當(dāng)然上面的代碼不會(huì)拋出異常),那么后面的任務(wù)都得不到執(zhí)行。如果任務(wù) C 拋出異常,那么任務(wù) D 得不到執(zhí)行。
那么我們?cè)趺刺幚懋惓D??看下面的代碼,我們?cè)谌蝿?wù) A 中拋出異常,并對(duì)其進(jìn)行處理:
CompletableFuturefuture = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); }) .exceptionally(ex -> "errorResultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD"); System.out.println(future.join());
上面的代碼中,任務(wù) A 拋出異常,然后通過 .exceptionally()
方法處理了異常,并返回新的結(jié)果,這個(gè)新的結(jié)果將傳遞給任務(wù) B。所以最終的輸出結(jié)果是:
errorResultA resultB resultC resultD
String name = null; // ... CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());
當(dāng)然,它們也可以都為 null,因?yàn)槿绻饔玫哪莻€(gè) CompletableFuture 實(shí)例沒有返回值的時(shí)候,s 就是 null。
Async后綴方法
CompletableFuture類中的API的大多數(shù)方法都有兩個(gè)帶有Async后綴的附加修飾。這些方法表示用于異步線程。
沒有Async后綴的方法使用調(diào)用線程運(yùn)行下一個(gè)執(zhí)行線程階段。不帶Async方法使用ForkJoinPool.commonPool()線程池的fork / join實(shí)現(xiàn)運(yùn)算任務(wù)。帶有Async方法使用傳遞式的Executor任務(wù)去運(yùn)行。
下面附帶一個(gè)案例,可以看到有thenApplyAsync方法。在程序內(nèi)部,線程被包裝到ForkJoinTask實(shí)例中。這樣可以進(jìn)一步并行化你的計(jì)算并更有效地使用系統(tǒng)資源。
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());
JDK 9 CompletableFuture API
在Java 9中, CompletableFuture API通過以下更改得到了進(jìn)一步增強(qiáng):
引入了新的實(shí)例API:
還有一些靜態(tài)實(shí)用方法:
最后,為了解決超時(shí)問題,Java 9又引入了兩個(gè)新功能:
結(jié)論
在本文中,我們描述了CompletableFuture類的方法和典型用例。
以上是Java8中CompletableFuture的使用方法的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!