新聞中心
審校 | 梁策 孫淑娟

我們注重客戶提出的每個(gè)要求,我們充分考慮每一個(gè)細(xì)節(jié),我們積極的做好網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站設(shè)計(jì)服務(wù),我們努力開拓更好的視野,通過(guò)不懈的努力,創(chuàng)新互聯(lián)公司贏得了業(yè)內(nèi)的良好聲譽(yù),這一切,也不斷的激勵(lì)著我們更好的服務(wù)客戶。 主要業(yè)務(wù):網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)站設(shè)計(jì),小程序開發(fā),網(wǎng)站開發(fā),技術(shù)開發(fā)實(shí)力,DIV+CSS,PHP及ASP,ASP.Net,SQL數(shù)據(jù)庫(kù)的技術(shù)開發(fā)工程師。
作為開發(fā)人員,我們習(xí)慣于通過(guò)在public方法上添加@Transactional 注解來(lái)實(shí)現(xiàn)事務(wù)管理。大多數(shù)情況下,把事務(wù)的啟動(dòng)、提交或者回滾全部交給Spring框架操作非常便捷,但如果認(rèn)為這就是事務(wù)管理的全部,那就有失偏頗了。
Spring的確可負(fù)責(zé)事務(wù)管理的所有底層實(shí)現(xiàn)細(xì)節(jié),而且不管你用的是什么持久層框架,如Hibernate、MyBatis,即便是JDBC也都提供了統(tǒng)一的事務(wù)模型,確保數(shù)據(jù)訪問(wèn)方式的變更不會(huì)影響到代碼實(shí)現(xiàn)層面。事務(wù)管理的良好封裝,一方面提升了開發(fā)效率,但同時(shí)也要注意到其降低了開發(fā)者了解底層原理的動(dòng)機(jī)和意愿。捫心自問(wèn),我們真正了解在多線程環(huán)境中事務(wù)運(yùn)行的機(jī)制嗎?例如在一個(gè)事務(wù)里面是否可以支持多個(gè)線程同時(shí)進(jìn)行數(shù)據(jù)寫入?針對(duì)這個(gè)問(wèn)題,網(wǎng)上很多論壇給出了確定的答案,但也不乏反饋@Transaction失效的聲音。
究其背后的根源是Spring實(shí)現(xiàn)事務(wù)通過(guò)ThreadLocal把事務(wù)和當(dāng)前線程進(jìn)行了綁定。ThreadLocal作為本地線程變量載體,保存了當(dāng)前線程的變量,并確保所有變量是線程安全的。這些封閉隔離的變量中就包含了數(shù)據(jù)庫(kù)連接,Session管理的對(duì)象以及當(dāng)前事務(wù)運(yùn)行的其他必要信息,而開啟的新線程是獲取不到這些變量和對(duì)象的。不了解這些,事務(wù)內(nèi)部冒然啟用多線程,受限于業(yè)務(wù)場(chǎng)景,大多數(shù)情況下是不會(huì)有問(wèn)題的,但是作為嚴(yán)謹(jǐn)?shù)拈_發(fā)萬(wàn)不能忽視其潛在的風(fēng)險(xiǎn)。問(wèn)題主要集中在兩個(gè)方面:一方面導(dǎo)致事務(wù)失效,看似是提高了處理效率,但是一旦有異常相關(guān)數(shù)據(jù)將不會(huì)回滾,就會(huì)破壞業(yè)務(wù)的完整性。另一方面還會(huì)增加死鎖的概率,無(wú)計(jì)劃的并發(fā)處理,增加資源爭(zhēng)搶的概率,其后果就是死鎖,產(chǎn)生的異常進(jìn)一步破壞業(yè)務(wù)的完整性,得不償失。
難道就沒有提升事務(wù)內(nèi)處理性能的方法了?非也!雖然不能通過(guò)事務(wù)內(nèi),發(fā)起多線程處理。我們可以通過(guò)合理分塊后,再啟用多線程處理,通過(guò)類似分布式事務(wù)方式達(dá)到異曲同工的效果。
假設(shè)我們要并行處理一個(gè)大的對(duì)象列表,然后將它們存儲(chǔ)到數(shù)據(jù)庫(kù)中。我們先將這些對(duì)象分組,將每個(gè)塊傳遞給不同線程分別去調(diào)用加了事務(wù)的處理方法,最后將每個(gè)線程中處理的結(jié)果收集匯總。這樣通過(guò)事務(wù)的傳播機(jī)制既確保了業(yè)務(wù)的完整性,也通過(guò)并行處理提升了處理效率。下面通過(guò)具體的示例,逐步演示如何實(shí)現(xiàn)。
第一步:定義一個(gè)負(fù)責(zé)對(duì)象處理邏輯的服務(wù)接口。
/**
* Service interface defining the contract for object identifiers processing
*/
public interface ProcessingService {
/**
* Processes the list of objects identified by id and returns a an identifiers
* list of the successfully processed objects
*
* @param objectIds List of object identifiers
*
* @return identifiers list of the successfully processed objects
*/
List processObjects(List objectIds);
}
第二步:針對(duì)上述對(duì)象處理的接口的一個(gè)簡(jiǎn)單實(shí)現(xiàn)。
/**
* Service implementation for database related ids processing
*/
@Service("ProcessingDBService")
public class ProcessingDBService implements ProcessingService {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Transactional
@Override
public List processObjects(List objectIds) {
// Process and save to DB
logger.info("Running in thread " + Thread.currentThread().getName() + " with object ids " + objectIds.toString());
return objectIds.stream().collect(Collectors.toList());
}
}
第三步:也是最核心的一步,通過(guò)分塊然后進(jìn)行并行處理。當(dāng)然為了保持代碼的整潔性和隔離性,我們將在后續(xù)具體實(shí)現(xiàn)中使用Decorator修飾模式。
/**
* Service implementation for parallel chunk processing
*/
@Service
@Primary
@ConditionalOnProperty(prefix = "service", name = "parallel", havingValue = "true")
public class ProcessingServiceParallelRunDecorator implements ProcessingService {
private ProcessingService delegate;
public ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
this.delegate = delegate;
}
/**
* In a real scenario it should be an external configuration
*/
private int batchSize = 10;
@Override
public ListprocessObjects(List objectIds) {
List> chuncks = getBatches(objectIds, batchSize);
List> processedObjectIds = chuncks.parallelStream().map(delegate::processObjects)
.collect(Collectors.toList());
return flatList(processedObjectIds);
}
private List> getBatches(List collection, int batchSize) {
return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)
.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))
.collect(Collectors.toList());
}
private List flatList(List> listOfLists) {
return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);
}
最后,我們通過(guò)一個(gè)簡(jiǎn)單的單元測(cè)試驗(yàn)證一下執(zhí)行的結(jié)果。
private List> getBatches(List collection, int batchSize) {
return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)
.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))
.collect(Collectors.toList());
}
private List flatList(List> listOfLists) {
return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);
}
}通過(guò)輸出日志,我們看到如下的執(zhí)行結(jié)果:
ProcessingDBService: Running in thread ForkJoinPool.commonPool-worker-3 with object ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ProcessingDBService: Running in thread main with object ids [11, 12]
執(zhí)行結(jié)果也是符合預(yù)期目標(biāo)的。List對(duì)象分組后,除了主線程又通過(guò)ForkJoin啟動(dòng)另外線程進(jìn)行并行處理。ProcessingServiceParallelRunDecorator 的parallelStream().map的并行處理提升了處理性能,而ProcessingDBService中processObjects這個(gè)public方法上@Transactional的注解保證了業(yè)務(wù)完整性,問(wèn)題得以完美解決。
譯者介紹
胥磊,社區(qū)編輯,某頭部電商技術(shù)副總監(jiān),關(guān)注Java后端開發(fā),技術(shù)管理,架構(gòu)優(yōu)化,分布式開發(fā)等領(lǐng)域。
原文標(biāo)題:??Multi-Threading and Spring Transactions??,作者:Daniela Kolarova
名稱欄目:詳解多線程與Spring事務(wù)
文章位置:http://fisionsoft.com.cn/article/cogiopp.html


咨詢
建站咨詢
