自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

詳解多線程與Spring事務

譯文 精選
開發(fā) 架構(gòu)
Spring的確可負責事務管理的所有底層實現(xiàn)細節(jié),而且不管你用的是什么持久層框架,如Hibernate、MyBatis,即便是JDBC也都提供了統(tǒng)一的事務模型,確保數(shù)據(jù)訪問方式的變更不會影響到代碼實現(xiàn)層面。

譯者 | 胥磊

審校 | 梁策 孫淑娟

作為開發(fā)人員,我們習慣于通過在public方法上添加@Transactional 注解來實現(xiàn)事務管理。大多數(shù)情況下,把事務的啟動、提交或者回滾全部交給Spring框架操作非常便捷,但如果認為這就是事務管理的全部,那就有失偏頗了。

Spring的確可負責事務管理的所有底層實現(xiàn)細節(jié),而且不管你用的是什么持久層框架,如Hibernate、MyBatis,即便是JDBC也都提供了統(tǒng)一的事務模型,確保數(shù)據(jù)訪問方式的變更不會影響到代碼實現(xiàn)層面。事務管理的良好封裝,一方面提升了開發(fā)效率,但同時也要注意到其降低了開發(fā)者了解底層原理的動機和意愿。捫心自問,我們真正了解在多線程環(huán)境中事務運行的機制嗎?例如在一個事務里面是否可以支持多個線程同時進行數(shù)據(jù)寫入?針對這個問題,網(wǎng)上很多論壇給出了確定的答案,但也不乏反饋@Transaction失效的聲音。

究其背后的根源是Spring實現(xiàn)事務通過ThreadLocal把事務和當前線程進行了綁定。ThreadLocal作為本地線程變量載體,保存了當前線程的變量,并確保所有變量是線程安全的。這些封閉隔離的變量中就包含了數(shù)據(jù)庫連接,Session管理的對象以及當前事務運行的其他必要信息,而開啟的新線程是獲取不到這些變量和對象的。不了解這些,事務內(nèi)部冒然啟用多線程,受限于業(yè)務場景,大多數(shù)情況下是不會有問題的,但是作為嚴謹?shù)拈_發(fā)萬不能忽視其潛在的風險。問題主要集中在兩個方面:一方面導致事務失效,看似是提高了處理效率,但是一旦有異常相關(guān)數(shù)據(jù)將不會回滾,就會破壞業(yè)務的完整性。另一方面還會增加死鎖的概率,無計劃的并發(fā)處理,增加資源爭搶的概率,其后果就是死鎖,產(chǎn)生的異常進一步破壞業(yè)務的完整性,得不償失。

難道就沒有提升事務內(nèi)處理性能的方法了?非也!雖然不能通過事務內(nèi),發(fā)起多線程處理。我們可以通過合理分塊后,再啟用多線程處理,通過類似分布式事務方式達到異曲同工的效果。

假設(shè)我們要并行處理一個大的對象列表,然后將它們存儲到數(shù)據(jù)庫中。我們先將這些對象分組,將每個塊傳遞給不同線程分別去調(diào)用加了事務的處理方法,最后將每個線程中處理的結(jié)果收集匯總。這樣通過事務的傳播機制既確保了業(yè)務的完整性,也通過并行處理提升了處理效率。下面通過具體的示例,逐步演示如何實現(xiàn)。

第一步:定義一個負責對象處理邏輯的服務接口。

/**

* Service interface defining the contract for object identifiers processing

*/

public interface ProcessingService {

/**

* Processes the list of objects identified by id and returns a an identifiers

* list of the successfully processed objects

*

* @param objectIds List of object identifiers

*

* @return identifiers list of the successfully processed objects

*/

List processObjects(List objectIds);

}

第二步:針對上述對象處理的接口的一個簡單實現(xiàn)。

/**
* Service implementation for database related ids processing
*/
@Service("ProcessingDBService")
public class ProcessingDBService implements ProcessingService {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Transactional
@Override
public List processObjects(List objectIds) {
// Process and save to DB

logger.info("Running in thread " + Thread.currentThread().getName() + " with object ids " + objectIds.toString());

return objectIds.stream().collect(Collectors.toList());
}
}

第三步:也是最核心的一步,通過分塊然后進行并行處理。當然為了保持代碼的整潔性和隔離性,我們將在后續(xù)具體實現(xiàn)中使用Decorator修飾模式。

/**
* Service implementation for parallel chunk processing
*/
@Service
@Primary
@ConditionalOnProperty(prefix = "service", name = "parallel", havingValue = "true")
public class ProcessingServiceParallelRunDecorator implements ProcessingService {

private ProcessingService delegate;

public ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
this.delegate = delegate;
}

/**
* In a real scenario it should be an external configuration
*/
private int batchSize = 10;

@Override
public List<Integer> processObjects(List objectIds) {
List<List<Integer>> chuncks = getBatches(objectIds, batchSize);
List<List<Integer>> processedObjectIds = chuncks.parallelStream().map(delegate::processObjects)
.collect(Collectors.toList());

return flatList(processedObjectIds);
}
private List> getBatches(List collection, int batchSize) {

return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)

.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))

.collect(Collectors.toList());

}

private List flatList(List> listOfLists) {

return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);

}

最后,我們通過一個簡單的單元測試驗證一下執(zhí)行的結(jié)果。

private List> getBatches(List collection, int batchSize) {

return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)

.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))

.collect(Collectors.toList());

}

private List flatList(List> listOfLists) {

return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);

}
}

通過輸出日志,我們看到如下的執(zhí)行結(jié)果:

ProcessingDBService: Running in thread ForkJoinPool.commonPool-worker-3 with object ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ProcessingDBService: Running in thread main with object ids [11, 12]

執(zhí)行結(jié)果也是符合預期目標的。List對象分組后,除了主線程又通過ForkJoin啟動另外線程進行并行處理。ProcessingServiceParallelRunDecorator 的parallelStream().map的并行處理提升了處理性能,而ProcessingDBService中processObjects這個public方法上@Transactional的注解保證了業(yè)務完整性,問題得以完美解決。

譯者介紹

胥磊,51CTO社區(qū)編輯,某頭部電商技術(shù)副總監(jiān),關(guān)注Java后端開發(fā),技術(shù)管理,架構(gòu)優(yōu)化,分布式開發(fā)等領(lǐng)域。

原文標題:??Multi-Threading and Spring Transactions??,作者:Daniela Kolarova

責任編輯:武曉燕 來源: 51CTO
相關(guān)推薦

2023-10-08 08:29:31

2024-06-12 12:50:06

2024-11-13 19:03:14

2021-12-28 09:10:55

Java線程狀態(tài)

2023-10-18 15:19:56

2024-10-24 17:13:55

WinformUI多線程

2023-03-27 10:40:09

2009-06-08 17:56:00

SpringJDBC事務

2010-04-14 09:20:26

.NET多線程

2011-08-31 16:30:19

Lua多線程

2024-12-27 08:11:44

Python編程模式IO

2022-05-26 08:31:41

線程Java線程與進程

2022-09-29 09:19:04

線程池并發(fā)線程

2010-03-18 16:02:09

python 多線程

2015-07-08 09:56:25

Notificatio多線程

2018-11-16 15:35:10

Spring事務Java

2009-03-24 08:56:23

數(shù)據(jù)同步多線程Java

2017-05-27 20:59:30

Java多線程synchronize

2022-08-29 10:52:37

線程函數(shù)操作系統(tǒng)

2020-04-07 11:10:30

Python數(shù)據(jù)線程
點贊
收藏

51CTO技術(shù)棧公眾號