Spring聲明式事務(wù)源碼詳解
維護公司之前的后臺管理系統(tǒng),在開發(fā)自測時發(fā)現(xiàn)mock接口失敗,數(shù)據(jù)庫仍插入成功。經(jīng)排查發(fā)現(xiàn)代碼中沒有指定具體事務(wù),在添加注解@Transactional后事務(wù)按預(yù)期生效回滾。為此,本文通過分析源碼來介紹下事務(wù)失效的根源。
1.事務(wù)的幾個小小例子
1.1 未添加事務(wù),異常未回滾
代碼未執(zhí)行前數(shù)據(jù)庫信息:
圖片
@Component
public class TransactionalTest {
@Resource
BasicPriceUploadRecordMapper basicPriceUploadRecordMapper ;
public void onAddTransactionToException() {
BasicPriceUploadRecord base = new BasicPriceUploadRecord();
base.setErrorMsg("未添加注解事務(wù)拋異常");
base.setId(1824040965380245002L);
basicPriceUploadRecordMapper.updateByPrimaryKeySelective(base);
throw new RuntimeException();
}
}
代碼執(zhí)行后數(shù)據(jù)庫信息:
圖片
雖然拋出異常,但數(shù)據(jù)庫仍修改成功了。
1.2 添加注解事務(wù),異?;貪L
代碼未執(zhí)行前數(shù)據(jù)庫信息:
圖片
@Component
public class TransactionalTest {
@Resource
BasicPriceUploadRecordMapper basicPriceUploadRecordMapper ;
/**
* 添加聲明式注解,出現(xiàn)異常情況
*/
@Transactional(transactionManager = "valuationTransactionManager", rollbackFor = Exception.class)
public void addTransactionToException() {
BasicPriceUploadRecord base = new BasicPriceUploadRecord();
base.setErrorMsg("添加注解事務(wù)拋異常");
base.setId(1824040965380245002L);
basicPriceUploadRecordMapper.updateByPrimaryKeySelective(base);
throw new RuntimeException();
}
}
代碼執(zhí)行后數(shù)據(jù)庫信息:
圖片
沒有修改成功,證明添加注解事務(wù)生效,進行了事務(wù)回滾操作。
通過上述簡單例子,我們了解下聲明式事務(wù)@Transactional的一些相關(guān)信息。
2. Transactional
2.1 注解定義
是Spring框架中用于聲明式事務(wù)管理的一個注解,主要通過 AOP(面向切面編程)來實現(xiàn)事務(wù)管理??梢员粦?yīng)用于接口定義、接口方法、類定義或類的public方法上。
2.2 常用屬性
value|transactionManager:指定事務(wù)管理器名稱
propagation :事務(wù)的傳播行為
isolation: 事務(wù)的隔離級別
timeout: 事務(wù)的超時時間
readOnly: 事務(wù)是否為只讀
rollbackFor:一個異常類數(shù)組,遇到這些異常時事務(wù)回滾
noRollbackFor:一個異常類數(shù)組,遇到這些異常時事務(wù)不回滾
事務(wù)的傳播行為:
required:當前存在事務(wù),則加入該事務(wù);如果當前沒有事務(wù),則創(chuàng)建一個新的事務(wù)
supports:當前存在事務(wù),則加入該事務(wù);如果當前沒有事務(wù),則以非事務(wù)方式執(zhí)行
mandatory:當前存在事務(wù),則加入該事務(wù);如果當前沒有事務(wù),則拋出異常
requires_new:創(chuàng)建一個新的事務(wù),并且掛起當前事務(wù)
not_supported:以非事務(wù)方式執(zhí)行,并且掛起當前事務(wù)
never:以非事務(wù)方式執(zhí)行,如果存在事務(wù),則拋出異常
nested:當前存在事務(wù),則在嵌套事務(wù)內(nèi)執(zhí)行;如果當前沒有事務(wù),則其效果與required相同
事務(wù)的隔離級別:在spring中,用于控制事務(wù)之間的并發(fā)訪問,以減少或避免事務(wù)之間的數(shù)據(jù)不一致問題。開發(fā)中基本都是 default 級別
default:使用底層數(shù)據(jù)庫默認的隔離級別
read_uncommitted:最低級別的隔離,事務(wù)可以看到其他未提交事務(wù)的數(shù)據(jù)
read_committed:事務(wù)只能看到其他已提交事務(wù)的數(shù)據(jù),可以避免臟讀
repeatable_read:事務(wù)在整個過程中多次讀取同一數(shù)據(jù)時,結(jié)果是一致的,可以避免不可重復(fù)讀
serializable:最高級別的隔離,事務(wù)之間完全隔離,可以避免幻讀
3. 源碼分析
以下源碼均基于Spring4.3.12版本。主要從 創(chuàng)建事務(wù)、開啟事務(wù)、提交事務(wù)、事務(wù)回滾 的維度來詳細分析聲明式事務(wù)。
3.1 事務(wù)簡易流程圖
圖片
3.2 代理類生成
在Spring框架中,當你配置了事務(wù)管理器并聲明了@Transactional注解時,Spring會在實例化bean時生成事務(wù)增強的代理類。創(chuàng)建代理類參考源碼路徑如下:
AbstractAutowireCapableBeanFactory.createBean=>
doCreateBean()=>
initializeBean()=>
applyBeanPostProcessorsAfterInitialization()=>
postProcessAfterInitialization()(BeanPostProcessor內(nèi)接口)=>
AbstractAutoProxyCreator.postProcessAfterInitialization()=>
wrapIfNecessary()=>
createProxy() 中 proxyFactory.setProxyTargetClass(true); //是否對類進行代理的設(shè)置,true為cglib代理
3.3 代理類中方法執(zhí)行入口
從TransactionInterceptor.invoke()方法開始分析 (獲取代理類,調(diào)用父類TransactionAspectSupport.invokeWithinTransaction()方法,該方法會將代理類的方法納入事務(wù)中)。
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 返回代理類的目標類
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
//事務(wù)中執(zhí)行被代理的方法
return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
}
3.4 主要核心邏輯
TransactionAspectSupport.invokeWithinTransaction()方法負責(zé)獲取事務(wù)屬性和事務(wù)管理器,然后針對聲明式事務(wù)和編程式事務(wù)區(qū)分處理流程(此處源碼忽略編程式事務(wù))。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
// 獲取事務(wù)屬性 TransactionDefinition對象(回顧規(guī)則,隔離級別,只讀等)
final TransactionAttribute txAttr = this.getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
// 根據(jù)事務(wù)屬性和方法,獲取對應(yīng)的事務(wù)管理器,(后續(xù)用于做事務(wù)的提交,回滾等操作),數(shù)據(jù)庫的一些信息,
final PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
// 獲取事務(wù)方法全路徑,
final String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
//響應(yīng)式編程事務(wù),大多數(shù)情況下都會執(zhí)行到 else中的語句;
// CallbackPreferringPlatformTransactionManager 可以通過回掉函數(shù)來處理事務(wù)的提交和回滾操作, 此處不考慮,此處源碼可以忽略
if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
// 此處省略,此處為編程式事務(wù) 處理邏輯
} else {
//創(chuàng)建事務(wù),事務(wù)屬性等信息會被保存進 TransactionInfo,便于后續(xù)流程中的提交和回滾操作,詳情見下文
TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 執(zhí)行目標的方法 (執(zhí)行具體的業(yè)務(wù)邏輯)
retVal = invocation.proceedWithInvocation();
} catch (Throwable var15) {
//異常處理
this.completeTransactionAfterThrowing(txInfo, var15);
throw var15;
} finally {
//清除當前節(jié)點的事務(wù)消息,將舊事務(wù)節(jié)點消息通過ThreadLoacl更新到當前線程(事務(wù)的掛起操作就是在這執(zhí)行)
this.cleanupTransactionInfo(txInfo);
}
//提交事務(wù)
this.commitTransactionAfterReturning(txInfo);
return retVal;
}
}
3.4.1 開啟事務(wù)
TransactionAspectSupport.createTransactionIfNecessary() 方法作用是檢查當前是否存在事務(wù),如果存在,則根據(jù)一定的規(guī)則創(chuàng)建一個新的事務(wù)。
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
//如果事務(wù)名稱不為空,則使用方法唯一標識。并使用 DelegatingTransactionAttribute 封裝 txAttr
if (txAttr != null && ((TransactionAttributerollbackOn)txAttr).getName() == null) {
txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 獲取事務(wù)狀態(tài)。內(nèi)部判斷是否開啟事務(wù)綁定線程與數(shù)據(jù)庫連接。詳情見下文
status = tm.getTransaction((TransactionDefinition)txAttr);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
}
}
//構(gòu)建事務(wù)消息,根據(jù)指定的屬性與狀態(tài)status 構(gòu)建一個 TransactionInfo。將已經(jīng)建立連接的事務(wù)所有信息,都記錄在ThreadLocal下的TransactionInfo 實例中,包括目標方法的所有狀態(tài)信息,如果事務(wù)執(zhí)行失敗,spring會根據(jù)TransactionInfo中的信息來進行回滾等后續(xù)操作
return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}
1)AbstractPlatformTransactionManager.getTransaction() 獲取當前事務(wù)對象。通過這個方法,可以獲取到關(guān)于事務(wù)的詳細信息,如事務(wù)的狀態(tài)、相關(guān)屬性等。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
//definition 中存儲的事務(wù)的注解信息,超時時間和隔離級別等
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 獲取當前事務(wù)
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 判斷當前線程是否存在事務(wù)
if (isExistingTransaction(transaction)) {
// 處理已經(jīng)存在的事務(wù)
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 事務(wù)超時設(shè)置驗證,超時時間小于-1 拋異常
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 如果當前線程不存在事務(wù)且 事務(wù)傳播行為是 MANDATORY(用當前事務(wù),如果當前沒有事務(wù),則拋出異常) 拋異常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//以下三種事務(wù)傳播行為 需要開啟新事務(wù)
else if (def.getPropagationBehavior() == TransactionDefinition.propagation_required ||
def.getPropagationBehavior() == TransactionDefinition.propagation_requires_new ||
def.getPropagationBehavior() == TransactionDefinition.propagation_nested) {
//掛起原事務(wù),因為這里不存在原事務(wù) 故設(shè)置為null。
//當一個事務(wù)方法內(nèi)部調(diào)用了另一個事務(wù)方法時,如果第二個事務(wù)方法需要獨立于第一個事務(wù)方法,那么可以使用 suspend 方法來掛起當前事務(wù),然后再開始一個新的事務(wù)
AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
try {
boolean newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//開啟事務(wù)
this.doBegin(transaction, (TransactionDefinition)definition);
//同步事務(wù)狀態(tài)及書屋屬性
this.prepareSynchronization(status, (TransactionDefinition)definition);
return status;
} catch (RuntimeException var7) {
this.resume((Object)null, suspendedResources);
throw var7;
}
}
else {
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);//0
//創(chuàng)建默認狀態(tài) 詳情見 下文
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
1.1AbstractPlatformTransactionManager.doGetTransaction() 方法可能用于執(zhí)行獲取事務(wù)的具體操作。它可能會根據(jù)一些條件或規(guī)則,去查找和獲取當前的事務(wù)對象,并進行相應(yīng)的處理。
protected Object doGetTransaction() {
DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
//是否允許在一個事務(wù)內(nèi)部開啟另一個事務(wù)。
txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
// this.dataSource數(shù)據(jù)源 配置
//判斷當前線程如果已經(jīng)記錄數(shù)據(jù)庫連接則使用原連接
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.dataSource);
//false 表示不是新創(chuàng)建連接
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
1.1.1this.dataSource() 是我們配置DataSourceTransactionManager時傳入的。
<bean id="valuationTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="valuationDataSource"/>
</bean>
1.1.2TransactionSynchronizationManager.getResource() 方法的作用主要是獲取與當前事務(wù)相關(guān)聯(lián)的資源。TransactionSynchronizationManager 持有一個ThreadLocal的實例,存在一個key為dataSource ,value為ConnectionHolder 的Map信息。
//ThreadLocal 存放 ConnectionHolder 信息,ConnectionHolder 可以理解為Connection(數(shù)據(jù)庫連接)的包裝類,其中最主要屬性為 Connection
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
// 獲取ConnectionHolder
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
//獲取連接信息
Object value = doGetResource(actualKey);
return value;
}
//具體執(zhí)行獲取連接信息操作
private static Object doGetResource(Object actualKey) {
//從 ThreadLoacl中獲取
Map<Object, Object> map = (Map)resources.get();
if (map == null) {
return null;
} else {
Object value = map.get(actualKey);
if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) {
map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
}
1.2AbstractPlatformTransactionManager.isExistingTransaction() 方法用于判斷是否存在正在進行的事務(wù)。它可以幫助我們確定當前的執(zhí)行環(huán)境是否處于事務(wù)中,以便進行相應(yīng)的處理。
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
return txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive();
}
1.3AbstractPlatformTransactionManager.suspend() 掛起事務(wù),對有無同步的事務(wù)采取不同方案,doSuspend()執(zhí)行掛起具體操作。
protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
//如果有同步的事務(wù),則優(yōu)先掛起同步事務(wù)
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List suspendedSynchronizations = this.doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
//執(zhí)行掛起操作
suspendedResources = this.doSuspend(transaction);
}
//重置事務(wù)名稱
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName((String)null);
//重置只讀狀態(tài)
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
//重置隔離級別
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null);
//重置事務(wù)激活狀態(tài)
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
//返回掛起的事務(wù)
return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException var8) {
this.doResumeSynchronization(suspendedSynchronizations);
throw var8;
}
} else if (transaction != null) {
Object suspendedResources = this.doSuspend(transaction);
return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources);
} else {
return null;
}
}
1.3.1 AbstractPlatformTransactionManager.doSuspend()執(zhí)行掛起操作只是將當前ConnectionHolder設(shè)置為null,返回原有事務(wù)消息,方便后續(xù)恢復(fù)原有事務(wù)消息,并將當前正在進行的事務(wù)信息進行重置。
protected Object doSuspend(Object transaction) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
txObject.setConnectionHolder((ConnectionHolder)null);
//接觸綁定
return TransactionSynchronizationManager.unbindResource(this.dataSource);
}
1.3.1.1TransactionSynchronizationManager.unbindResource()解除綁定操作,將現(xiàn)有的事務(wù)消息remove并返回上一級
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
//解綁操作,移除資源
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException("No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
} else {
return value;
}
}
1.4 AbstractPlatformTransactionManager.doBegin()數(shù)據(jù)庫連接獲取,當新事務(wù)時,則獲取新的數(shù)據(jù)庫連接,并為其設(shè)置隔離級別,是否只讀等屬性。
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
Connection con = null;
try {
//新事務(wù)開啟時將 ConnectionHolder 設(shè)置為null
if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//獲取新的數(shù)據(jù)庫連接
Connection newCon = this.dataSource.getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//設(shè)置事務(wù)隔離級別 和readOnly屬性
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
// 交給Spring控制事務(wù)提交
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
//設(shè)置當前線程的事務(wù)激活狀態(tài)
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
// 設(shè)置超時時間
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
1.5AbstractPlatformTransactionManager.prepareTransactionStatus()創(chuàng)建默認Status,如果不需要開始事務(wù) (比如SUPPORTS),則返回一個默認的狀態(tài)。
protected final DefaultTransactionStatus prepareTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) {
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
this.prepareSynchronization(status, definition);
return status;
}
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
//創(chuàng)建 DefaultTransactionStatus 對象
return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}
1.6AbstractPlatformTransactionManager.handleExistingTransaction()針對不同的傳播行為做不同的處理方法,比如掛起原事務(wù)開啟新事務(wù)等等。
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
//當傳播行為是 NEVER 時拋出異常
if (definition.getPropagationBehavior() == 5) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
} else {
AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;
boolean newSynchronization;
//當傳播方式為NOT_SUPPORTED 時掛起當前事務(wù),然后在無事務(wù)的狀態(tài)下運行
if (definition.getPropagationBehavior() == 4) {
//掛起事務(wù)
suspendedResources = this.suspend(transaction);
newSynchronization = this.getTransactionSynchronization() == 0;
//返回默認status
return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);
//當傳播方式為REQUIRES_NEW時,掛起當前事務(wù),然后啟動新事務(wù)
} else if (definition.getPropagationBehavior() == 3) {
//掛起原事務(wù)
suspendedResources = this.suspend(transaction);
try {
newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//啟動新的事務(wù)
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
} catch (Error|RuntimeException var7) {
this.resumeAfterBeginException(transaction, suspendedResources, var7);
throw var7;
}
} else {
boolean newSynchronization;
//當傳播方式為NESTED時,設(shè)置事務(wù)的保存點
//存在事務(wù),將該事務(wù)標注保存點,形成嵌套事務(wù)
//嵌套事務(wù)中的子事務(wù)出現(xiàn)異常不會影響到父事務(wù)保存點之前的操作
if (definition.getPropagationBehavior() == 6) {
if (!this.isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
} else {
if (this.useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);
//創(chuàng)建保存點,回滾時,只回滾到該保存點
status.createAndHoldSavepoint();
return status;
} else {
newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, (Object)null);
//如果不支持保存點,就啟動新的事務(wù)
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
}
}
} else {
newSynchronization = this.getTransactionSynchronization() != 2;
return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);
}
}
}
}
3.4.2 回滾事務(wù)
TransactionAspectSupport.completeTransactionAfterThrowing() 判斷事務(wù)是否存在,如不存在就不需要回滾,如果存在則在判斷是否滿足回滾條件。
protected void completeTransactionAfterThrowing(TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
//判斷是否存在事務(wù)
if (txInfo != null && txInfo.hasTransaction()) {
// 判斷是否滿足回滾條件。拋出的異常類型,和定義的回滾規(guī)則進行匹配
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 回滾處理
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
//省略代碼
} else {
try {
//不滿足回滾條件 出現(xiàn)異常
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
//省略代碼
}
}
}
1 AbstractPlatformTransactionManager.rollback()當在事務(wù)執(zhí)行過程中出現(xiàn)異常或其他需要回滾的情況時,就會調(diào)用這個方法,將事務(wù)進行回滾操作,撤銷之前所做的數(shù)據(jù)庫操作,以保證數(shù)據(jù)的一致性。
public final void rollback(TransactionStatus status) throws TransactionException {
//判斷事務(wù)是否已經(jīng)完成,回滾時拋出異常
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
// 執(zhí)行回滾操作。
this.processRollback(defStatus);
}
}
1.1AbstractPlatformTransactionManager.processRollback()方法主要用于處理事務(wù)的回滾操作。通過這個方法,可以確保事務(wù)在需要回滾時能夠正確地執(zhí)行回滾操作,保持數(shù)據(jù)的完整性。
private void processRollback(DefaultTransactionStatus status) {
try {
try {
//解綁線程和會話綁定關(guān)系
this.triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
//如果有保存點(當前事務(wù)為單獨的線程則會退到保存點)
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
//如果是新事務(wù)直接回滾。調(diào)用數(shù)據(jù)庫連接并調(diào)用rollback方法進行回滾。使用底層數(shù)據(jù)庫連接提供的API
this.doRollback(status);
} else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || !this.isGlobalRollbackOnParticipationFailure()) {
//如果當前事務(wù)不是獨立的事務(wù),則只能等待事務(wù)鏈執(zhí)行完成后再做回滾操作
this.doSetRollbackOnly(status);
}
}
}
//catch 等代碼
// 關(guān)閉會話,重置屬性
this.triggerAfterCompletion(status, 1);
} finally {
//清理并恢復(fù)掛起的事務(wù)
this.cleanupAfterCompletion(status);
}
}
3.4.3 提交事務(wù)
TransactionAspectSupport.commitTransactionAfterReturning() 基本上和回滾一樣,都是先判斷是否有事務(wù),在操作提交。
protected void commitTransactionAfterReturning(TransactionAspectSupport.TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
//提交事務(wù)
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
1) AbstractPlatformTransactionManager.commit() 創(chuàng)建默認Status prepareTransactionStatu,發(fā)現(xiàn)是否有回滾標記,然后進行回滾。如果判斷無需回滾就可以直接提交。
public final void commit(TransactionStatus status) throws TransactionException {
// 事務(wù)狀態(tài)已完成則拋異常
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
//發(fā)現(xiàn)回滾標記
if (defStatus.isLocalRollbackOnly()) {
//回滾
this.processRollback(defStatus);
} else if (!this.shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
//回滾
this.processRollback(defStatus);
if (status.isNewTransaction() || this.isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
} else {
// 提交操作
this.processCommit(defStatus);
}
}
}
1.1 AbstractPlatformTransactionManager.processCommit()處理事務(wù)的提交操作
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
this.prepareForCommit(status);
this.triggerBeforeCommit(status);
this.triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || this.isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
//釋放保存點信息
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
// 是一個新的事務(wù) 則提交。 獲取數(shù)據(jù)庫連接后使用數(shù)據(jù)庫API進行提交事務(wù)
this.doCommit(status);
}
if (globalRollbackOnly) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (TransactionException var20) {
if (this.isRollbackOnCommitFailure()) {
//提交異常回滾
this.doRollbackOnCommitException(status, var20);
} else {
this.triggerAfterCompletion(status, 2);
}
throw var20;
}
//省略其它異常攔截
try {
this.triggerAfterCommit(status);
} finally {
this.triggerAfterCompletion(status, 0);
}
} finally {
// 清理事務(wù)消息
this.cleanupAfterCompletion(status);
}
}
3.4.4 清除事務(wù)信息
AbstractPlatformTransactionManager.cleanupAfterCompletion() 這個方法主要用于在事務(wù)完成后進行清理工作。它可能會負責(zé)釋放資源、清理臨時數(shù)據(jù)等,以確保系統(tǒng)處于良好的狀態(tài)。
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
//將當前事務(wù)設(shè)置為完成狀態(tài)
status.setCompleted();
if (status.isNewSynchronization()) {
// 清空當前事務(wù)消息
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
//如果是新事務(wù) 則在事務(wù)完成之后做清理操作
this.doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
// 將原事務(wù)從掛起狀態(tài)恢復(fù)
this.resume(status.getTransaction(), (AbstractPlatformTransactionManager.SuspendedResourcesHolder)status.getSuspendedResources());
}
}
1AbstractPlatformTransactionManager.doCleanupAfterCompletion()在新事務(wù)完成后會調(diào)用resetConnectionAfterTransaction方法重置數(shù)據(jù)庫連接信息,并判單如果是新的數(shù)據(jù)庫連接則將其放回連接池。
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
if (txObject.isNewConnectionHolder()) {
// 將數(shù)據(jù)庫連接從當前線程中解除綁定
TransactionSynchronizationManager.unbindResource(this.dataSource);
}
Connection con = txObject.getConnectionHolder().getConnection();
try {
// 恢復(fù)數(shù)據(jù)庫連接的autoCommit狀態(tài)
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
// 負責(zé)重置數(shù)據(jù)庫連接信息,包括隔離級別、readOnly屬性等
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
// 如果是新的數(shù)據(jù)庫連接則將數(shù)據(jù)庫連接放回連接池
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}
2) AbstractPlatformTransactionManager.resume() 如果事務(wù)執(zhí)行前有事務(wù)掛起,那么當前事務(wù)執(zhí)行結(jié)束后需要將掛起的事務(wù)恢復(fù),掛起事務(wù)時保存了原事務(wù)信息,重置了當前事務(wù)信息,所以恢復(fù)操作就是將當前的事務(wù)信息設(shè)置為之前保存的原事務(wù)信息。
protected final void resume(Object transaction, AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder) throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
// 執(zhí)行 恢復(fù)掛起事務(wù) ,綁定資源bindResource
this.doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
this.doResumeSynchronization(suspendedSynchronizations);
}
}
}
3) TransactionAspectSupport.cleanupTransactionInfo()清除當前節(jié)點的事務(wù)消息,將舊事務(wù)節(jié)點信息通過thradLoacl更新到當前線程。
protected void cleanupTransactionInfo(TransactionAspectSupport.TransactionInfo txInfo) {
if (txInfo != null) {
//從當前線程的 ThreadLocal 獲取上層的事務(wù)信息,將當前事務(wù)出棧,繼續(xù)執(zhí)行上層事務(wù)
txInfo.restoreThreadLocalStatus();
}
}
private void restoreThreadLocalStatus() {
//當前事務(wù)處理完之后,恢復(fù)上層事務(wù)上下文
TransactionAspectSupport.transactionInfoHolder.set(this.oldTransactionInfo);
}
總結(jié)
如果方法正常執(zhí)行完成且沒有異常,調(diào)用commitTransactionAfterReturning()方法。如果執(zhí)行中出現(xiàn)異常,調(diào)用completeTransactionAfterThrowing()方法。
兩個方法內(nèi)部都會判斷是否存在事務(wù)以及是否滿足回滾條件來決定最終執(zhí)行提交操作還是回滾操作。
上述例子1.1中,未添加事務(wù),不受事務(wù)控制,因此修改操作生效。
上述例子1.2中,因加入事務(wù)注解,指定回滾異常類型,在completeTransactionAfterThrowing()中邏輯判斷需要回滾,調(diào)用txInfo.getTransactionManager().rollback()方法執(zhí)行回滾操作,因此修改操作不生效。
常見事務(wù)失效的原因及解決方案
場景一:同類中非事務(wù)方法調(diào)用事務(wù)方法
@Service
public class MyService {
@Transactional
public void transactionalMethod() {
// 事務(wù)邏輯
}
public void nonTransactionalCall() {
transactionalMethod(); // 非事務(wù)方法中調(diào)用事務(wù)方法
}
}
失效原因:事務(wù)是基于動態(tài)代理實現(xiàn)的,但本類中調(diào)用另一個方法默認是this調(diào)用關(guān)系,并非動態(tài)代理,故失效
解決方案:要么將操作移動到事務(wù)中,要么調(diào)用另一個Service中的事務(wù)方法
@Service
public class MyService {
@Resource
MyService2 myService2;
public void nonTransactionalCall() {
myService2.transactionalMethod(); // 非事務(wù)方法中調(diào)用事務(wù)方法
}
}
@Service
public class MyService2 {
@Transactional
public void transactionalMethod() {
// 事務(wù)邏輯
}
}
場景二:事務(wù)方法中拋出檢查型異常且未被正確處理
//失效場景
@Service
public class MyService {
@Transactional
public void transactionalMethod() {
throw new Exception(); // 拋出檢查型異常
}
}
失效原因:@Transactional 注解默認處理RuntimeException,即只有拋出運行時異常,才會觸發(fā)事務(wù)回滾
解決方案:@Transactional 設(shè)置為 @Transactional(rollbackFor =Exception.class) 或者直接拋出運行時異常
解決方案1
@Service
public class MyService {
@Transactional
public void transactionalMethod() {
throw new RuntimeException(); // 拋出非檢查型異常
}
}
解決方案2
@Service
public class MyService {
@Transactional(rollbackFor =Exception.class)
public void transactionalMethod() {
throw new Exception();
}
}
場景三:多線程問題
public class MyService {
@Resource
private ValuationMapper valuationMapper;
@Transactional
public void transactionalMethod() {
new Thread(() ->{
Valuation v = new Valuation();
v.setUserName("張三");
valuationMapper.insert(v);
}).start();
}
}
原因:Spring的事務(wù)是通過數(shù)據(jù)庫連接來實現(xiàn)不同線程使用不同的數(shù)據(jù)庫連接,且放在ThreadLocal中,基于同一個數(shù)據(jù)庫連接的事務(wù)才能同時提交或回滾,多線程場景下,拿到的數(shù)據(jù)庫連接不是同一個
解決方案:
1.采用分布式事務(wù)保證
2.自己實現(xiàn)事務(wù)回滾
場景四:錯誤使用事務(wù)傳播特性
public class MyService {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void methodA() {
// 一些事務(wù)操作
methodB();
}
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void methodB() {
// 這里不會執(zhí)行事務(wù)操作,因為 methodA 中傳播特性配置錯誤
}
public static void main(String[] args) {
MyService example = new MyService();
example.methodA();
}
}
原因:methodA 配置為 REQUIRES_NEW,而 methodB 配置為 NOT_SUPPORTED 不支持事務(wù)傳播特性,導(dǎo)致事務(wù)傳播特性失效
解決方案:修改 methodB 傳播特性
關(guān)于作者
張云剛 采貨俠JAVA開發(fā)工程師
大道至簡。再復(fù)雜的技術(shù),歸根結(jié)底也是在基礎(chǔ)技術(shù)之上實現(xiàn)的。