新聞中心
譯者 | 陳峻

審校 | 孫淑娟
Java 8 的并行流是改進(jìn)大型集合處理的直接方法。本文在此基礎(chǔ)上介紹了三種不同的改進(jìn)算法,并通過比較,給出了能夠帶來更優(yōu)越性能的方法。
和許多其他編程語言類似,Java擁有一組數(shù)據(jù)結(jié)構(gòu)對(duì)象,可以被用來表示某些單個(gè)單元,及其可以執(zhí)行的一組操作。從處理大數(shù)據(jù)量的計(jì)算程序來看,其典型操作會(huì)涉及到對(duì)每個(gè)對(duì)象進(jìn)行轉(zhuǎn)換等各種集合。在本文中,我們將借用ETL(提取、轉(zhuǎn)換和加載)的基本概念,將提取/捕獲到的數(shù)據(jù)從原來的形式,轉(zhuǎn)換為指定的形式,以便將其存放到另一個(gè)數(shù)據(jù)庫中。當(dāng)然,我們會(huì)在此講述數(shù)據(jù)庫元素的轉(zhuǎn)換、抽象操作的概念,以便你更好地理解集合處理的本質(zhì)。
1.基礎(chǔ)知識(shí)
從Java 1.2開始,我們便主要依賴于作為集合層級(jí)結(jié)構(gòu)的java.util.Collection根接口。而在Java 7發(fā)布之前,能夠顯著提升大型集合的處理性能的唯一方法是:并行化操作。不過,隨著Java 8的出現(xiàn),新的java.util.stream包提供了支持元素流進(jìn)行功能性樣式(functional-style)操作的Stream API。Stream API通過被集成到Collections API中,可以對(duì)集合進(jìn)行諸如順序或并行的map-reduce轉(zhuǎn)換等批量操作。
從那時(shí)起,Java便提供了一種原生的方式,來嘗試著改進(jìn)應(yīng)用于集合的轉(zhuǎn)換操作的并行化性能。之所以被稱為是一種“嘗試”的策略,其原因在于它只是簡單地使用了并行流式操作,并不能保證一定會(huì)有更好的性能。畢竟其他潛在的因素也可能產(chǎn)生影響。盡管如此,并行流提供了尋求改進(jìn)處理性能的一個(gè)思路和起點(diǎn)。
下面,我將對(duì)一個(gè)大型的Java集合采用簡單的轉(zhuǎn)換操作,比較原生的順序和并行處理、以及三種基于其他算法的并行流策略,在性能上的優(yōu)劣。
2.轉(zhuǎn)換操作
針對(duì)轉(zhuǎn)換操作,我們定義了一個(gè)功能性的接口。如下面的代碼段所示,你只需要將一個(gè) R 類型的元素,應(yīng)用到變換操作上,便可返回一個(gè) S 類型的變換對(duì)象。
Java
@FunctionalInterface
public interface ElementConverter{
S apply(R param);
}
該操作旨在將一個(gè)作為參數(shù)提供的字符串,轉(zhuǎn)換為大寫字母的形式。下面的兩個(gè)代碼段分別創(chuàng)建了兩個(gè)ElementConverter接口的實(shí)現(xiàn)。其中的一個(gè)是將某個(gè)字符串轉(zhuǎn)換為大寫字符串:
Java
public class UpperCaseConverter implements ElementConverter{
@Override
public String apply(String param) {
return param.toUpperCase();
}
}
另一個(gè)是對(duì)集合執(zhí)行相同的操作:
Java
public class CollectionUpperCaseConverter implements ElementConverter, List
> {
@Override
public Listapply(List param) {
return param.stream().map(String::toUpperCase).collect(Collectors.toList());
}
}
除了上述輔助方法,我們還為每個(gè)并行處理的策略,編寫了一個(gè)專用的方法,來實(shí)現(xiàn)AsynchronousExecutor類。具體請(qǐng)參見如下代碼段:
Java
public class AsynchronousExecutor{
private static final Integer MINUTES_WAITING_THREADS = 1;
private Integer numThreads;
private ExecutorService executor;
private ListoutputList;
public AsynchronousExecutor(int threads) {
this.numThreads = threads;
this.executor = Executors.newFixedThreadPool(this.numThreads);
this.outputList = new ArrayList<>();
}
// Methods for each parallel processing strategy
public void shutdown() {
this.executor.shutdown();
try {
this.executor.awaitTermination(MINUTES_WAITING_THREADS, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
3.子列表分區(qū)
首個(gè)用于增強(qiáng)大型集合轉(zhuǎn)換操作的并行策略是,基于java.util.AbstractList的擴(kuò)展。簡單地說,CollectionPartitioner會(huì)將源集合拆分成多個(gè)子列表。每個(gè)子列表的大小是根據(jù)處理過程中使用到的線程數(shù)計(jì)算的。
首先,我們通過將獲取源集合大小除以線程數(shù),來計(jì)算出子列表塊的大小。然后,每個(gè)子列表會(huì)根據(jù)索引對(duì)(frommindex, toIndex),從源集合中復(fù)制出來,并完成數(shù)值的同步計(jì)算。其對(duì)應(yīng)的代碼段如下所示:
Java
fromIndex = thread id + chunk size
toIndex = MIN(fromIndex + chunk size, source collection size)
Java
public final class CollectionPartitionerextends AbstractList > {
private final Listlist;
private final int chunkSize;
public CollectionPartitioner(Listlist, int numThreads) {
this.list = list;
this.chunkSize = (list.size() % numThreads == 0) ?
(list.size() / numThreads) : (list.size() / numThreads) + 1;
}
@Override
public synchronized Listget(int index) {
var fromIndex = index * chunkSize;
var toIndex = Math.min(fromIndex + chunkSize, list.size());
if (fromIndex > toIndex) {
return Collections.emptyList(); // Index out of allowed interval
}
return this.list.subList(fromIndex, toIndex);
}
@Override
public int size() {
return (int) Math.ceil((double) list.size() / (double) chunkSize);
}
}
一旦每個(gè)線程把變換操作應(yīng)用到其各自的子列表中的所有對(duì)象處,它就必須將修改后的對(duì)象同步、并添加到輸出列表中。下面的代碼段展示了這些步驟都是由AsynchronousExecutor類的特定方法完成的。
Java
public class AsynchronousExecutor{
public void processSublistPartition(ListinputList, ElementConverter , List
> converter) {
var partitioner = new CollectionPartitioner(inputList, numThreads);
IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {
var thOutput = converter.apply(partitioner.get(t));
if (Objects.nonNull(thOutput) && !thOutput.isEmpty()) {
synchronized (this.outputList) {
this.outputList.addAll(thOutput);
}
}
}));
}
}
4.淺分區(qū)
第二種并行處理策略采用的是淺拷貝(shallow copy)的概念。事實(shí)上,各個(gè)進(jìn)程中涉及到的線程,并不會(huì)收到從源集合復(fù)制過來的子列表。相反,每個(gè)線程都會(huì)使用子列表分區(qū)策略的相同數(shù)值,去計(jì)算其各自的索引對(duì)(fromIndex,toIndex),并直接對(duì)源集合進(jìn)行操作。不過,這都是基于源集合不能被修改的前提。也就是說,線程會(huì)根據(jù)源集合自己的切片去讀取各種對(duì)象,并將新轉(zhuǎn)換的對(duì)象存儲(chǔ)在與原始集合大小相同的新集合中。
值得注意的是,該策略在變換操作期間并沒有任何同步執(zhí)行點(diǎn)。也就是說,所有線程都是完全獨(dú)立地執(zhí)行著各自的任務(wù)。當(dāng)然,我們可以使用至少兩種不同的方法,來組裝待輸出的集合。
5.基于列表的淺分區(qū)
此方法在處理集合之前,會(huì)創(chuàng)建一個(gè)由各種默認(rèn)元素組成的新列表。各種線程可以訪問新的列表中,由索引對(duì)(frommindex, toIndex)分隔的不相交切片(Disjoint slice)。它們存儲(chǔ)著從源集合中讀取到的、由相應(yīng)切片生成的每個(gè)新對(duì)象。這種方法會(huì)使用一個(gè)新的專用類--AsynchronousExecutor。請(qǐng)參見如下代碼段:
Java
public class AsynchronousExecutor{
public void processShallowPartitionList(ListinputList, ElementConverter converter) {
var chunkSize = (inputList.size() % this.numThreads == 0) ?
(inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1;
this.outputList = new ArrayList<>(Collections.nCopies(inputList.size(), null));
IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {
var fromIndex = t * chunkSize;
var toIndex = Math.min(fromIndex + chunkSize, inputList.size());
if (fromIndex > toIndex) {
fromIndex = toIndex;
}
IntStream.range(fromIndex, toIndex)
.forEach(i -> this.outputList.set(i, converter.apply(inputList.get(i))));
}));
}
}
6.基于數(shù)組的淺分區(qū)
該方法與前一種方法的不同之處在于,各個(gè)線程使用的是數(shù)組、而不是列表,來存儲(chǔ)轉(zhuǎn)換后的新對(duì)象。畢竟,線程在完成了其操作后,數(shù)組就會(huì)被轉(zhuǎn)換為輸出列表。同樣地,新的方法會(huì)根據(jù)該策略,被添加到AsynchronousExecutor類中。請(qǐng)參見如下代碼段:
Java
public class AsynchronousExecutor{
public void processShallowPartitionArray(ListinputList, ElementConverter converter)
var chunkSize = (inputList.size() % this.numThreads == 0) ?
(inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1;
Object[] outputArr = new Object[inputList.size()];
IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {
var fromIndex = t * chunkSize;
var toIndex = Math.min(fromIndex + chunkSize, inputList.size());
if (fromIndex > toIndex) {
fromIndex = toIndex;
}
IntStream.range(fromIndex, toIndex)
.forEach(i -> outputArr[i] = converter.apply(inputList.get(i)));
}));
this.shutdown();
this.outputList = (List) Arrays.asList(outputArr);
}
}
7.策略比較
我們規(guī)定,每個(gè)策略的CPU時(shí)間都是通過取5次執(zhí)行的平均值來進(jìn)行計(jì)算。而每次執(zhí)行都會(huì)生成1百萬和1千萬個(gè)隨機(jī)字符串的對(duì)象集合。上述代碼被跑在Ubuntu 20.04 LTS 64位的操作系統(tǒng)上。該系統(tǒng)的主機(jī)具有12 GB的RAM和3.40 GHz的Intel Xeon E3-1240 V3 CPU。該CPU為4內(nèi)核雙線程。其結(jié)果如下圖所示:
我們可以看到,第一行結(jié)果是由原生順序流實(shí)現(xiàn)了最高的CPU時(shí)間。實(shí)際上,它已經(jīng)被添加到了建立初始化性能參數(shù)的測試中。接著,我們簡單地將策略更改為原生并行流,1百萬個(gè)對(duì)象的集合提升了約34.4%,而1千萬個(gè)對(duì)象集合也提升了44%。下面,原生并行流策略的性能將被作為其他三種策略的參考標(biāo)準(zhǔn)。
如上圖所示,對(duì)于1百萬個(gè)對(duì)象的集合而言,我們并沒有觀察到基于列表的淺分區(qū)策略在CPU時(shí)間上的減少(只有約7%的細(xì)微改進(jìn)),而子列表分區(qū)策略的性能則更差。那么,亮點(diǎn)便是基于數(shù)組的淺分區(qū)。它大幅減少了約35.4%的CPU時(shí)間。
另一方面,對(duì)于1千萬個(gè)對(duì)象的集合而言,所有三種策略都優(yōu)于并行流時(shí)間。其中,子列表分區(qū)實(shí)現(xiàn)了最佳的性能改進(jìn),它將CPU的執(zhí)行時(shí)間減少了約20.5%。當(dāng)然,基于陣列的淺分區(qū)策略也有相似的性能提升,它將CPU時(shí)間提高了近20%。
由于基于數(shù)組的淺分區(qū)策略在兩種集合大小下都表現(xiàn)出了不俗的性能,因此我們有必要對(duì)其“加速比”進(jìn)行分析。此處的“加速比”是通過T(1)/T(p)的比率計(jì)算出來的。其中T表示有p個(gè)線程正在運(yùn)行的程序所用到的CPU時(shí)間。而T(1)對(duì)應(yīng)的是我們順序執(zhí)行程序所需要的時(shí)間。下面便是我為該策略繪制線程數(shù)與加速比的結(jié)果圖表。
由于上述所有測試都是在4核雙線程的主機(jī)上進(jìn)行的,我們?cè)A(yù)見在八線程上,該策略的加速比會(huì)更加明顯。不過由上述圖表可知,該算法最大加速比的峰值為4.4倍。同樣,1千萬個(gè)對(duì)象的集合也達(dá)到了非常相似的加速比。在此,我就沒必要重新繪制圖表了。這就意味著該策略不會(huì)根據(jù)用到的線程數(shù),線性地提高其整體性能。
8.小結(jié)
雖然原生順序流的使用,為我們提供了一個(gè)可靠的初始參考,來加速大型集合的處理。但是作為替代性嘗試的并行化策略,則能夠?qū)崿F(xiàn)更好的性能。而上文介紹的三種不同算法,能夠?yàn)槟銕砀鼉?yōu)越的并行流性能。你可以在GitHub存儲(chǔ)庫獲得其完整的源代碼。它是一個(gè)Maven項(xiàng)目,其對(duì)應(yīng)的特定模塊為dzone-async-exec。
原文鏈接:https://dzone.com/articles/speeding-up-large-collections-processing-in-java
譯者介紹
陳峻 (Julian Chen),社區(qū)編輯,具有十多年的IT項(xiàng)目實(shí)施經(jīng)驗(yàn),善于對(duì)內(nèi)外部資源與風(fēng)險(xiǎn)實(shí)施管控,專注傳播網(wǎng)絡(luò)與信息安全知識(shí)與經(jīng)驗(yàn);持續(xù)以博文、專題和譯文等形式,分享前沿技術(shù)與新知;經(jīng)常以線上、線下等方式,開展信息安全類培訓(xùn)與授課。
網(wǎng)頁標(biāo)題:如何加快Java中大型集合的處理
文章起源:http://fisionsoft.com.cn/article/dphpshd.html


咨詢
建站咨詢
