新聞中心
本文將深入講解 ExecutorCompletionService 的使用以及源碼解析。

成都創(chuàng)新互聯(lián)主要業(yè)務(wù)有網(wǎng)站營(yíng)銷策劃、網(wǎng)站建設(shè)、成都網(wǎng)站建設(shè)、微信公眾號(hào)開發(fā)、微信小程序開發(fā)、H5開發(fā)、程序開發(fā)等業(yè)務(wù)。一次合作終身朋友,是我們奉行的宗旨;我們不僅僅把客戶當(dāng)客戶,還把客戶視為我們的合作伙伴,在開展業(yè)務(wù)的過程中,公司還積累了豐富的行業(yè)經(jīng)驗(yàn)、成都全網(wǎng)營(yíng)銷推廣資源和合作伙伴關(guān)系資源,并逐漸建立起規(guī)范的客戶服務(wù)和保障體系。
ExecutorCompletionService適用場(chǎng)景
ExecutorCompletionService在以下場(chǎng)景中特別有用:
- 并行任務(wù)處理:當(dāng)需要同時(shí)執(zhí)行多個(gè)任務(wù),并按照完成的順序獲取它們的結(jié)果時(shí),可以使用ExecutorCompletionService來簡(jiǎn)化任務(wù)提交和結(jié)果獲取的流程。
- 高性能計(jì)算:在需要進(jìn)行大規(guī)模計(jì)算或復(fù)雜計(jì)算的場(chǎng)景中,可以將任務(wù)拆分成多個(gè)子任務(wù),并使用ExecutorCompletionService來管理和獲取子任務(wù)的結(jié)果。
假設(shè)現(xiàn)在有一批需要進(jìn)行計(jì)算的任務(wù),為了提高整批任務(wù)的執(zhí)行效率,我們可以使用線程池來異步計(jì)算這些任務(wù)。通過向線程池中不斷提交任務(wù)并保留與每個(gè)任務(wù)關(guān)聯(lián)的Future對(duì)象。最后,我們可以遍歷這些Future對(duì)象,并通過調(diào)用 get() 方法獲取每個(gè)任務(wù)的計(jì)算結(jié)果。
Future的不足
Future 沒有辦法回調(diào),只能手動(dòng)去調(diào)用,當(dāng)通過 get() 方法獲取線程的返回值時(shí),會(huì)導(dǎo)致阻塞,也就是和當(dāng)前這個(gè) Future 關(guān)聯(lián)的計(jì)算任務(wù)執(zhí)行完成的時(shí)候才返回結(jié)果,新任務(wù)必須等待已完成任務(wù)的結(jié)果才能繼續(xù)進(jìn)行處理。
這樣會(huì)浪費(fèi)很多時(shí)間,因?yàn)槲覀儾恢滥膫€(gè)線程先執(zhí)行完了,只能挨個(gè)去獲取結(jié)果,這樣已經(jīng)完成的線程會(huì)因?yàn)榍懊嫖赐瓿傻木€程的耗時(shí)而無(wú)法提前進(jìn)行匯總,最好是誰(shuí)先執(zhí)行完成,誰(shuí)先返回。
而 ExecutorCompletionService 可以實(shí)現(xiàn)這樣的效果,節(jié)省獲取完成結(jié)果的時(shí)間,它的內(nèi)部有一個(gè)先進(jìn)先出的阻塞隊(duì)列,用于保存已經(jīng)執(zhí)行完成的 Future,通過調(diào)用它的 take() 方法或 poll() 方法可以獲取到一個(gè)已經(jīng)執(zhí)行完成的 Future,進(jìn)而通過調(diào)用 Future 接口實(shí)現(xiàn)類的 get() 方法獲取最終的結(jié)果。
CompletionService的目標(biāo)是任務(wù)誰(shuí)先完成誰(shuí)先獲取,即結(jié)果按照完成先后順序排序
ExecutorCompletionService使用
ExecutorCompletionService 提供了一種方便的方式來處理一組異步任務(wù),并按照完成的順序獲取它們的結(jié)果。它內(nèi)部使用了Executor框架來執(zhí)行任務(wù),并且內(nèi)部管理著一個(gè)已完成任務(wù)的阻塞隊(duì)列,在結(jié)果獲取上提供了更加靈活和高效的機(jī)制。
下面是一個(gè)簡(jiǎn)單的例子來演示ExecutorCompletionService的基本使用:
public class ExecutorCompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService = new ExecutorCompletionService<>(executor);
// 提交任務(wù)
for (int i = 0; i < 10; i++) {
final int taskId = i;
completionService.submit(() -> {
double sleepTime = Math.random() * 1000;
Thread.sleep((long) sleepTime); // 模擬耗時(shí)操作
return "Task " + taskId + " completed,cost time: " + sleepTime;
});
}
// 獲取結(jié)果
for (int i = 0; i < 10; i++) {
Future future = completionService.take();
String result = future.get();
System.out.println(result);
}
executor.shutdown();
}
} 輸出:
Task 2 completed,cost time: 170.01927312611775
Task 3 completed,cost time: 460.9622858036789
Task 1 completed,cost time: 563.24738180643
Task 0 completed,cost time: 595.938819219159
Task 5 completed,cost time: 480.4473056068137
Task 4 completed,cost time: 748.2343208613524
Task 6 completed,cost time: 370.4679098376097
Task 7 completed,cost time: 270.45945981324905
Task 9 completed,cost time: 336.5536570760892
Task 8 completed,cost time: 577.5774464801026在上述代碼中,我們創(chuàng)建了一個(gè)固定大小的線程池,并使用 ExecutorCompletionService 來提交和獲取任務(wù)的結(jié)果。通過調(diào)用completionService.submit()方法來提交任務(wù),并隨機(jī)指定睡眠時(shí)間,來模擬任務(wù)執(zhí)行的耗時(shí),然后通過completionService.take()方法來獲取已完成的任務(wù)結(jié)果。
可以看到是按照任務(wù)的執(zhí)行耗時(shí)順序去獲取結(jié)果的。
ExecutorCompletionService原理解析
ExecutorCompletionService 提供了兩個(gè)構(gòu)造函數(shù),一個(gè)可以指定阻塞隊(duì)列,另一個(gè)使用內(nèi)部默認(rèn)的阻塞隊(duì)列,兩個(gè)構(gòu)造函數(shù)都需要傳進(jìn)線程池參數(shù)。
圖片
提供了三個(gè)獲取方法,可以看到都是從隊(duì)列中獲取。
- take()/poll() 方法的工作都委托給內(nèi)部的已完成任務(wù)隊(duì)列 completionQueue。
- 如果隊(duì)列中有已完成的任務(wù), take() 方法就返回任務(wù)的結(jié)果,否則阻塞等待任務(wù)完成。
- poll() 與 take() 方法不同,poll() 有兩個(gè)版本:
無(wú)參的 poll() 方法:如果完成隊(duì)列中有數(shù)據(jù)就返回,否則返回null。
有參數(shù)的 poll() 方法:如果完成隊(duì)列中有數(shù)據(jù)就直接返回,否則等待指定的時(shí)間,到時(shí)間后如果還是沒有數(shù)據(jù)就返回null。
圖片
兩個(gè)提交任務(wù)方法,可以看到 submit() 方法最終會(huì)委托給內(nèi)部的 executor 去執(zhí)行任務(wù),提交任務(wù)的時(shí)候會(huì)將任務(wù)封裝成 QueueingFuture 對(duì)象。
圖片
ExecutorCompletionService內(nèi)部維護(hù)了 QueueingFuture 類,QueueingFuture 繼承了 FutureTask,并重寫了 done() 方法,
可以看到 done() 方法在任務(wù)完成的時(shí)候會(huì)將結(jié)果存進(jìn) 已完成任務(wù)隊(duì)列 completionQueue 中。
圖片
Futuretask 的 done() 方法是用來標(biāo)記一個(gè)任務(wù)已經(jīng)完成的方法。當(dāng)一個(gè) Futuretask 中的任務(wù)完成后,就會(huì)調(diào)用 done() 方法通知。
圖片
默認(rèn)是空方法,不會(huì)執(zhí)行任何動(dòng)作。
圖片
執(zhí)行流程
當(dāng)我們使用ExecutorCompletionService類時(shí),它能夠按照任務(wù)完成的順序獲取它們的結(jié)果,這是因?yàn)镋xecutorCompletionService類內(nèi)部結(jié)合了QueueingFuture類和done()方法的機(jī)制。以下是源碼流程步驟解釋:
- 提交任務(wù):
我們通過submit方法將任務(wù)提交給ExecutorCompletionService。在提交任務(wù)時(shí),ExecutorCompletionService會(huì)使用自定義的QueueingFuture類來包裝任務(wù),并將其交給底層線程池執(zhí)行。
- QueueingFuture類:
QueueingFuture類是ExecutorCompletionService的內(nèi)部類,繼承自FutureTask。它的構(gòu)造方法接收一個(gè)Callable對(duì)象作為參數(shù)。
在QueueingFuture類中,它重寫了done()方法。done()方法會(huì)在任務(wù)執(zhí)行完成后被調(diào)用。
任務(wù)執(zhí)行完成時(shí)的處理:
當(dāng)任務(wù)執(zhí)行完成后,在底層線程池的Worker線程中,會(huì)調(diào)用QueueingFuture的done()方法。
在done()方法中,QueueingFuture會(huì)首先調(diào)用父類FutureTask的done()方法,以觸發(fā)對(duì)計(jì)算結(jié)果的獲取。然后,它會(huì)將任務(wù)的結(jié)果存儲(chǔ)到一個(gè)內(nèi)部的BlockingQueue隊(duì)列中(即completionQueue)。
獲取任務(wù)結(jié)果:
當(dāng)我們調(diào)用take方法獲取任務(wù)結(jié)果時(shí),它會(huì)從completionQueue隊(duì)列中取出已完成的任務(wù)結(jié)果,并返回該結(jié)果。如果隊(duì)列為空,則會(huì)阻塞等待,直到有任務(wù)完成并返回結(jié)果。
take方法內(nèi)部會(huì)調(diào)用QueueingFuture的get()方法,從而觸發(fā)對(duì)應(yīng)任務(wù)的計(jì)算結(jié)果的獲取。
保證按順序獲取結(jié)果:
由于completionQueue是一個(gè)阻塞隊(duì)列,并且在done()方法中將任務(wù)結(jié)果按照完成的順序放入隊(duì)列中,因此我們可以通過按順序獲取隊(duì)列中的任務(wù)結(jié)果,來保證按照任務(wù)完成的順序獲取它們的結(jié)果。
通過以上源碼流程步驟,ExecutorCompletionService類能夠按照任務(wù)完成的順序獲取結(jié)果。它利用QueueingFuture類包裝任務(wù)并存儲(chǔ)結(jié)果到阻塞隊(duì)列中,在任務(wù)執(zhí)行完成后,按照完成的順序?qū)⒔Y(jié)果放入隊(duì)列,從而實(shí)現(xiàn)了按順序獲取結(jié)果的功能。
注意事項(xiàng)
在使用ExecutorCompletionService時(shí),需要注意以下事項(xiàng):
- 合理選擇線程池大?。焊鶕?jù)任務(wù)的數(shù)量和復(fù)雜性,合理選擇線程池的大小,以充分利用系統(tǒng)資源并避免資源浪費(fèi)。
- 及時(shí)處理異常:在任務(wù)執(zhí)行過程中,如果發(fā)生異常,需要及時(shí)處理和記錄異常信息,以保證程序的穩(wěn)定性和可靠性。
- 使用Future對(duì)象進(jìn)行任務(wù)取消和超時(shí)控制:通過使用Future對(duì)象的cancel方法,可以取消正在執(zhí)行的任務(wù)。同時(shí),可以通過調(diào)整 poll 方法的參數(shù)來設(shè)置超時(shí)時(shí)間,避免長(zhǎng)時(shí)間等待任務(wù)結(jié)果而導(dǎo)致阻塞。
總結(jié)
ExecutorCompletionService是一個(gè)強(qiáng)大且靈活的工具類,能夠簡(jiǎn)化異步任務(wù)的處理和結(jié)果獲取過程。通過使用ExecutorCompletionService,我們可以更加高效地處理一組異步任務(wù),并按照完成的順序獲取它們的結(jié)果。
本文介紹了ExecutorCompletionService的基本使用方法,并對(duì)其源碼進(jìn)行了解析。希望通過這篇博客能夠幫助讀者更好地理解和應(yīng)用ExecutorCompletionService。
新聞名稱:ExecutorCompletionService詳解,你學(xué)會(huì)了嗎?
本文鏈接:http://fisionsoft.com.cn/article/dpjisij.html


咨詢
建站咨詢
