自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spring聲明式事務(wù)源碼詳解

數(shù)據(jù)庫 其他數(shù)據(jù)庫
維護公司之前的后臺管理系統(tǒng),在開發(fā)自測時發(fā)現(xiàn)mock接口失敗,數(shù)據(jù)庫仍插入成功。經(jīng)排查發(fā)現(xiàn)代碼中沒有指定具體事務(wù),在添加注解@Transactional后事務(wù)按預(yù)期生效回滾。為此,本文通過分析源碼來介紹下事務(wù)失效的根源。

維護公司之前的后臺管理系統(tǒng),在開發(fā)自測時發(fā)現(xiàn)mock接口失敗,數(shù)據(jù)庫仍插入成功。經(jīng)排查發(fā)現(xiàn)代碼中沒有指定具體事務(wù),在添加注解@Transactional后事務(wù)按預(yù)期生效回滾。為此,本文通過分析源碼來介紹下事務(wù)失效的根源。

1.事務(wù)的幾個小小例子

1.1 未添加事務(wù),異常未回滾

代碼未執(zhí)行前數(shù)據(jù)庫信息:

圖片圖片

@Component
public class TransactionalTest {
    @Resource
    BasicPriceUploadRecordMapper basicPriceUploadRecordMapper ;
    public void onAddTransactionToException() {
        BasicPriceUploadRecord base = new BasicPriceUploadRecord();
        base.setErrorMsg("未添加注解事務(wù)拋異常");
        base.setId(1824040965380245002L);
        basicPriceUploadRecordMapper.updateByPrimaryKeySelective(base);
        throw new RuntimeException();
    }
}

代碼執(zhí)行后數(shù)據(jù)庫信息:

圖片圖片

雖然拋出異常,但數(shù)據(jù)庫仍修改成功了。

1.2 添加注解事務(wù),異?;貪L

代碼未執(zhí)行前數(shù)據(jù)庫信息:

圖片圖片

@Component
public class TransactionalTest {
    @Resource
    BasicPriceUploadRecordMapper basicPriceUploadRecordMapper ;

    /**
     * 添加聲明式注解,出現(xiàn)異常情況
     */
    @Transactional(transactionManager = "valuationTransactionManager", rollbackFor = Exception.class)
    public void addTransactionToException() {
        BasicPriceUploadRecord base = new BasicPriceUploadRecord();
        base.setErrorMsg("添加注解事務(wù)拋異常");
        base.setId(1824040965380245002L);
        basicPriceUploadRecordMapper.updateByPrimaryKeySelective(base);
        throw new RuntimeException();
    }
}

代碼執(zhí)行后數(shù)據(jù)庫信息:

圖片圖片

沒有修改成功,證明添加注解事務(wù)生效,進行了事務(wù)回滾操作。

通過上述簡單例子,我們了解下聲明式事務(wù)@Transactional的一些相關(guān)信息。

2. Transactional

2.1 注解定義

是Spring框架中用于聲明式事務(wù)管理的一個注解,主要通過 AOP(面向切面編程)來實現(xiàn)事務(wù)管理??梢员粦?yīng)用于接口定義、接口方法、類定義或類的public方法上。

2.2 常用屬性

value|transactionManager:指定事務(wù)管理器名稱
    propagation :事務(wù)的傳播行為
    isolation: 事務(wù)的隔離級別
    timeout: 事務(wù)的超時時間
    readOnly: 事務(wù)是否為只讀
    rollbackFor:一個異常類數(shù)組,遇到這些異常時事務(wù)回滾
    noRollbackFor:一個異常類數(shù)組,遇到這些異常時事務(wù)不回滾

事務(wù)的傳播行為:
    required:當前存在事務(wù),則加入該事務(wù);如果當前沒有事務(wù),則創(chuàng)建一個新的事務(wù)
    supports:當前存在事務(wù),則加入該事務(wù);如果當前沒有事務(wù),則以非事務(wù)方式執(zhí)行
    mandatory:當前存在事務(wù),則加入該事務(wù);如果當前沒有事務(wù),則拋出異常
    requires_new:創(chuàng)建一個新的事務(wù),并且掛起當前事務(wù)
    not_supported:以非事務(wù)方式執(zhí)行,并且掛起當前事務(wù)
    never:以非事務(wù)方式執(zhí)行,如果存在事務(wù),則拋出異常
    nested:當前存在事務(wù),則在嵌套事務(wù)內(nèi)執(zhí)行;如果當前沒有事務(wù),則其效果與required相同
      
事務(wù)的隔離級別:在spring中,用于控制事務(wù)之間的并發(fā)訪問,以減少或避免事務(wù)之間的數(shù)據(jù)不一致問題。開發(fā)中基本都是 default 級別
    default:使用底層數(shù)據(jù)庫默認的隔離級別
    read_uncommitted:最低級別的隔離,事務(wù)可以看到其他未提交事務(wù)的數(shù)據(jù)
    read_committed:事務(wù)只能看到其他已提交事務(wù)的數(shù)據(jù),可以避免臟讀
    repeatable_read:事務(wù)在整個過程中多次讀取同一數(shù)據(jù)時,結(jié)果是一致的,可以避免不可重復(fù)讀
    serializable:最高級別的隔離,事務(wù)之間完全隔離,可以避免幻讀

3. 源碼分析

以下源碼均基于Spring4.3.12版本。主要從 創(chuàng)建事務(wù)、開啟事務(wù)、提交事務(wù)、事務(wù)回滾 的維度來詳細分析聲明式事務(wù)。

3.1 事務(wù)簡易流程圖

圖片圖片

3.2 代理類生成

在Spring框架中,當你配置了事務(wù)管理器并聲明了@Transactional注解時,Spring會在實例化bean時生成事務(wù)增強的代理類。創(chuàng)建代理類參考源碼路徑如下:

AbstractAutowireCapableBeanFactory.createBean=>
            doCreateBean()=>
            initializeBean()=>
            applyBeanPostProcessorsAfterInitialization()=>
            postProcessAfterInitialization()(BeanPostProcessor內(nèi)接口)=> 
      AbstractAutoProxyCreator.postProcessAfterInitialization()=>
            wrapIfNecessary()=>
            createProxy() 中  proxyFactory.setProxyTargetClass(true); //是否對類進行代理的設(shè)置,true為cglib代理

3.3 代理類中方法執(zhí)行入口

從TransactionInterceptor.invoke()方法開始分析 (獲取代理類,調(diào)用父類TransactionAspectSupport.invokeWithinTransaction()方法,該方法會將代理類的方法納入事務(wù)中)。

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

    public Object invoke(final MethodInvocation invocation) throws Throwable {
        // 返回代理類的目標類
        Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
        //事務(wù)中執(zhí)行被代理的方法
        return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
        });
    }
}

3.4 主要核心邏輯

TransactionAspectSupport.invokeWithinTransaction()方法負責(zé)獲取事務(wù)屬性和事務(wù)管理器,然后針對聲明式事務(wù)和編程式事務(wù)區(qū)分處理流程(此處源碼忽略編程式事務(wù))。

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
        // 獲取事務(wù)屬性 TransactionDefinition對象(回顧規(guī)則,隔離級別,只讀等)
        final TransactionAttribute txAttr = this.getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        // 根據(jù)事務(wù)屬性和方法,獲取對應(yīng)的事務(wù)管理器,(后續(xù)用于做事務(wù)的提交,回滾等操作),數(shù)據(jù)庫的一些信息,
        final PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
        // 獲取事務(wù)方法全路徑,
        final String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
        //響應(yīng)式編程事務(wù),大多數(shù)情況下都會執(zhí)行到 else中的語句;
        // CallbackPreferringPlatformTransactionManager 可以通過回掉函數(shù)來處理事務(wù)的提交和回滾操作, 此處不考慮,此處源碼可以忽略
        if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
        // 此處省略,此處為編程式事務(wù) 處理邏輯
        } else {
            //創(chuàng)建事務(wù),事務(wù)屬性等信息會被保存進 TransactionInfo,便于后續(xù)流程中的提交和回滾操作,詳情見下文
            TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // 執(zhí)行目標的方法 (執(zhí)行具體的業(yè)務(wù)邏輯)
                retVal = invocation.proceedWithInvocation();
            } catch (Throwable var15) {
                //異常處理
                this.completeTransactionAfterThrowing(txInfo, var15);
                throw var15;
            } finally {
                //清除當前節(jié)點的事務(wù)消息,將舊事務(wù)節(jié)點消息通過ThreadLoacl更新到當前線程(事務(wù)的掛起操作就是在這執(zhí)行)
                this.cleanupTransactionInfo(txInfo);
            }
            //提交事務(wù)
            this.commitTransactionAfterReturning(txInfo);
            return retVal;
        }
    }

3.4.1 開啟事務(wù)

TransactionAspectSupport.createTransactionIfNecessary() 方法作用是檢查當前是否存在事務(wù),如果存在,則根據(jù)一定的規(guī)則創(chuàng)建一個新的事務(wù)。

protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
        //如果事務(wù)名稱不為空,則使用方法唯一標識。并使用 DelegatingTransactionAttribute 封裝 txAttr 
        if (txAttr != null && ((TransactionAttributerollbackOn)txAttr).getName() == null) {
            txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                // 獲取事務(wù)狀態(tài)。內(nèi)部判斷是否開啟事務(wù)綁定線程與數(shù)據(jù)庫連接。詳情見下文
                status = tm.getTransaction((TransactionDefinition)txAttr);
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
            }
        }
        //構(gòu)建事務(wù)消息,根據(jù)指定的屬性與狀態(tài)status 構(gòu)建一個 TransactionInfo。將已經(jīng)建立連接的事務(wù)所有信息,都記錄在ThreadLocal下的TransactionInfo 實例中,包括目標方法的所有狀態(tài)信息,如果事務(wù)執(zhí)行失敗,spring會根據(jù)TransactionInfo中的信息來進行回滾等后續(xù)操作
        return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
    }

1)AbstractPlatformTransactionManager.getTransaction() 獲取當前事務(wù)對象。通過這個方法,可以獲取到關(guān)于事務(wù)的詳細信息,如事務(wù)的狀態(tài)、相關(guān)屬性等。

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
   throws TransactionException {
            
  //definition 中存儲的事務(wù)的注解信息,超時時間和隔離級別等
  TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  // 獲取當前事務(wù)
  Object transaction = doGetTransaction();
  boolean debugEnabled = logger.isDebugEnabled();
  // 判斷當前線程是否存在事務(wù) 
  if (isExistingTransaction(transaction)) {
   // 處理已經(jīng)存在的事務(wù)
   return handleExistingTransaction(def, transaction, debugEnabled);
  }

  // 事務(wù)超時設(shè)置驗證,超時時間小于-1 拋異常
  if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
   throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
  }

  // 如果當前線程不存在事務(wù)且 事務(wù)傳播行為是 MANDATORY(用當前事務(wù),如果當前沒有事務(wù),則拋出異常) 拋異常
  if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
   throw new IllegalTransactionStateException(
     "No existing transaction found for transaction marked with propagation 'mandatory'");
  }
  //以下三種事務(wù)傳播行為 需要開啟新事務(wù)
  else if (def.getPropagationBehavior() == TransactionDefinition.propagation_required ||
    def.getPropagationBehavior() == TransactionDefinition.propagation_requires_new ||
    def.getPropagationBehavior() == TransactionDefinition.propagation_nested) {
    //掛起原事務(wù),因為這里不存在原事務(wù) 故設(shè)置為null。
    //當一個事務(wù)方法內(nèi)部調(diào)用了另一個事務(wù)方法時,如果第二個事務(wù)方法需要獨立于第一個事務(wù)方法,那么可以使用 suspend 方法來掛起當前事務(wù),然后再開始一個新的事務(wù)
    AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
            try {
                boolean newSynchronization = this.getTransactionSynchronization() != 2;
                DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //開啟事務(wù)
                this.doBegin(transaction, (TransactionDefinition)definition);
                //同步事務(wù)狀態(tài)及書屋屬性
                this.prepareSynchronization(status, (TransactionDefinition)definition);
                return status;
            } catch (RuntimeException var7) {
                this.resume((Object)null, suspendedResources);
                throw var7;
            }
  }
  else {
   boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);//0
    //創(chuàng)建默認狀態(tài) 詳情見 下文
   return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
  }
 }

1.1AbstractPlatformTransactionManager.doGetTransaction() 方法可能用于執(zhí)行獲取事務(wù)的具體操作。它可能會根據(jù)一些條件或規(guī)則,去查找和獲取當前的事務(wù)對象,并進行相應(yīng)的處理。

protected Object doGetTransaction() {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
        //是否允許在一個事務(wù)內(nèi)部開啟另一個事務(wù)。
        txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
        // this.dataSource數(shù)據(jù)源 配置
        //判斷當前線程如果已經(jīng)記錄數(shù)據(jù)庫連接則使用原連接
        ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.dataSource);
        //false 表示不是新創(chuàng)建連接
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

1.1.1this.dataSource() 是我們配置DataSourceTransactionManager時傳入的。

<bean id="valuationTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="valuationDataSource"/>
    </bean>

1.1.2TransactionSynchronizationManager.getResource() 方法的作用主要是獲取與當前事務(wù)相關(guān)聯(lián)的資源。TransactionSynchronizationManager 持有一個ThreadLocal的實例,存在一個key為dataSource ,value為ConnectionHolder 的Map信息。

//ThreadLocal 存放 ConnectionHolder 信息,ConnectionHolder 可以理解為Connection(數(shù)據(jù)庫連接)的包裝類,其中最主要屬性為  Connection
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");

 // 獲取ConnectionHolder
 public static Object getResource(Object key) {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        //獲取連接信息
        Object value = doGetResource(actualKey);
        return value;
    }
 //具體執(zhí)行獲取連接信息操作
 private static Object doGetResource(Object actualKey) {
        //從 ThreadLoacl中獲取
        Map<Object, Object> map = (Map)resources.get();
        if (map == null) {
            return null;
        } else {
            Object value = map.get(actualKey);
            if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) {
                map.remove(actualKey);
                if (map.isEmpty()) {
                    resources.remove();
                }
                value = null;
            }
            return value;
        }
    }

1.2AbstractPlatformTransactionManager.isExistingTransaction() 方法用于判斷是否存在正在進行的事務(wù)。它可以幫助我們確定當前的執(zhí)行環(huán)境是否處于事務(wù)中,以便進行相應(yīng)的處理。

protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        return txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive();
    }

1.3AbstractPlatformTransactionManager.suspend()  掛起事務(wù),對有無同步的事務(wù)采取不同方案,doSuspend()執(zhí)行掛起具體操作。

protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
        //如果有同步的事務(wù),則優(yōu)先掛起同步事務(wù)
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            List suspendedSynchronizations = this.doSuspendSynchronization();
            try {
                Object suspendedResources = null;
                if (transaction != null) {
                    //執(zhí)行掛起操作
                    suspendedResources = this.doSuspend(transaction);
                }
                //重置事務(wù)名稱
                String name = TransactionSynchronizationManager.getCurrentTransactionName();
                TransactionSynchronizationManager.setCurrentTransactionName((String)null);
                //重置只讀狀態(tài)
                boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                //重置隔離級別
                Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null);
                //重置事務(wù)激活狀態(tài)
                boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                TransactionSynchronizationManager.setActualTransactionActive(false);
                //返回掛起的事務(wù)
                return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
            } catch (RuntimeException var8) {
                this.doResumeSynchronization(suspendedSynchronizations);
                throw var8;
            } 
        } else if (transaction != null) {
            Object suspendedResources = this.doSuspend(transaction);
            return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources);
        } else {
            return null;
        }
    }

1.3.1 AbstractPlatformTransactionManager.doSuspend()執(zhí)行掛起操作只是將當前ConnectionHolder設(shè)置為null,返回原有事務(wù)消息,方便后續(xù)恢復(fù)原有事務(wù)消息,并將當前正在進行的事務(wù)信息進行重置。

protected Object doSuspend(Object transaction) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        txObject.setConnectionHolder((ConnectionHolder)null);
        //接觸綁定 
        return TransactionSynchronizationManager.unbindResource(this.dataSource);
    }

1.3.1.1TransactionSynchronizationManager.unbindResource()解除綁定操作,將現(xiàn)有的事務(wù)消息remove并返回上一級

public static Object unbindResource(Object key) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        //解綁操作,移除資源
        Object value = doUnbindResource(actualKey);
        if (value == null) {
            throw new IllegalStateException("No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        } else {
            return value;
        }
    }

1.4 AbstractPlatformTransactionManager.doBegin()數(shù)據(jù)庫連接獲取,當新事務(wù)時,則獲取新的數(shù)據(jù)庫連接,并為其設(shè)置隔離級別,是否只讀等屬性。

protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        Connection con = null;

        try {
            //新事務(wù)開啟時將 ConnectionHolder 設(shè)置為null
            if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                //獲取新的數(shù)據(jù)庫連接
                Connection newCon = this.dataSource.getConnection();
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            //設(shè)置事務(wù)隔離級別 和readOnly屬性
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                // 交給Spring控制事務(wù)提交
                con.setAutoCommit(false);
            }
            this.prepareTransactionalConnection(con, definition);
            //設(shè)置當前線程的事務(wù)激活狀態(tài)
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = this.determineTimeout(definition);
            if (timeout != -1) {
                // 設(shè)置超時時間
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder());
            }

        } catch (Throwable var7) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder((ConnectionHolder)null, false);
            }

            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
        }
    }

1.5AbstractPlatformTransactionManager.prepareTransactionStatus()創(chuàng)建默認Status,如果不需要開始事務(wù) (比如SUPPORTS),則返回一個默認的狀態(tài)。

protected final DefaultTransactionStatus prepareTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) {
        DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
        this.prepareSynchronization(status, definition);
        return status;
    }
    
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) {
        boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
        //創(chuàng)建 DefaultTransactionStatus 對象
        return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
    }

1.6AbstractPlatformTransactionManager.handleExistingTransaction()針對不同的傳播行為做不同的處理方法,比如掛起原事務(wù)開啟新事務(wù)等等。

private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
        //當傳播行為是 NEVER 時拋出異常
        if (definition.getPropagationBehavior() == 5) {
            throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;
            boolean newSynchronization;
            //當傳播方式為NOT_SUPPORTED 時掛起當前事務(wù),然后在無事務(wù)的狀態(tài)下運行
            if (definition.getPropagationBehavior() == 4) {
                //掛起事務(wù)
                suspendedResources = this.suspend(transaction);
                newSynchronization = this.getTransactionSynchronization() == 0;
                //返回默認status
                return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);
                //當傳播方式為REQUIRES_NEW時,掛起當前事務(wù),然后啟動新事務(wù)
            } else if (definition.getPropagationBehavior() == 3) {
                //掛起原事務(wù)
                suspendedResources = this.suspend(transaction);
                try {
                    newSynchronization = this.getTransactionSynchronization() != 2;
                    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    //啟動新的事務(wù)
                    this.doBegin(transaction, definition);
                    this.prepareSynchronization(status, definition);
                    return status;
                } catch (Error|RuntimeException var7) {
                    this.resumeAfterBeginException(transaction, suspendedResources, var7);
                    throw var7;
                } 
            } else {
                boolean newSynchronization;
                //當傳播方式為NESTED時,設(shè)置事務(wù)的保存點
                //存在事務(wù),將該事務(wù)標注保存點,形成嵌套事務(wù)
                //嵌套事務(wù)中的子事務(wù)出現(xiàn)異常不會影響到父事務(wù)保存點之前的操作
                if (definition.getPropagationBehavior() == 6) {
                    if (!this.isNestedTransactionAllowed()) {
                        throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
                    } else {
                        if (this.useSavepointForNestedTransaction()) {
                            DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);
                            //創(chuàng)建保存點,回滾時,只回滾到該保存點
                            status.createAndHoldSavepoint();
                            return status;
                        } else {
                            newSynchronization = this.getTransactionSynchronization() != 2;
                            DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, (Object)null);
                            //如果不支持保存點,就啟動新的事務(wù)
                            this.doBegin(transaction, definition);
                            this.prepareSynchronization(status, definition);
                            return status;
                        }
                    }
                } else {
                    newSynchronization = this.getTransactionSynchronization() != 2;
                    return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);
                }
            }
        }
    }

3.4.2 回滾事務(wù)

TransactionAspectSupport.completeTransactionAfterThrowing() 判斷事務(wù)是否存在,如不存在就不需要回滾,如果存在則在判斷是否滿足回滾條件。

protected void completeTransactionAfterThrowing(TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
        //判斷是否存在事務(wù)
        if (txInfo != null && txInfo.hasTransaction()) {
            // 判斷是否滿足回滾條件。拋出的異常類型,和定義的回滾規(guī)則進行匹配
            if (txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    // 回滾處理
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                } 
            //省略代碼
            } else {
                try {
                    //不滿足回滾條件 出現(xiàn)異常 
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                } 
            //省略代碼
            }
        }

    }

1 AbstractPlatformTransactionManager.rollback()當在事務(wù)執(zhí)行過程中出現(xiàn)異常或其他需要回滾的情況時,就會調(diào)用這個方法,將事務(wù)進行回滾操作,撤銷之前所做的數(shù)據(jù)庫操作,以保證數(shù)據(jù)的一致性。

public final void rollback(TransactionStatus status) throws TransactionException {
        //判斷事務(wù)是否已經(jīng)完成,回滾時拋出異常
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
        } else {
            DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
            // 執(zhí)行回滾操作。
            this.processRollback(defStatus);
        }
    }

1.1AbstractPlatformTransactionManager.processRollback()方法主要用于處理事務(wù)的回滾操作。通過這個方法,可以確保事務(wù)在需要回滾時能夠正確地執(zhí)行回滾操作,保持數(shù)據(jù)的完整性。

private void processRollback(DefaultTransactionStatus status) {
        try {
            try {
                //解綁線程和會話綁定關(guān)系
                this.triggerBeforeCompletion(status);
                if (status.hasSavepoint()) {
                    //如果有保存點(當前事務(wù)為單獨的線程則會退到保存點)
                    status.rollbackToHeldSavepoint();
                } else if (status.isNewTransaction()) {
                    //如果是新事務(wù)直接回滾。調(diào)用數(shù)據(jù)庫連接并調(diào)用rollback方法進行回滾。使用底層數(shù)據(jù)庫連接提供的API
                    this.doRollback(status);
                } else if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || !this.isGlobalRollbackOnParticipationFailure()) {
                        //如果當前事務(wù)不是獨立的事務(wù),則只能等待事務(wù)鏈執(zhí)行完成后再做回滾操作
                        this.doSetRollbackOnly(status);
                    } 
                } 
            } 
            //catch 等代碼
            // 關(guān)閉會話,重置屬性
            this.triggerAfterCompletion(status, 1);
        } finally {
            //清理并恢復(fù)掛起的事務(wù)
            this.cleanupAfterCompletion(status);
        }

    }

3.4.3 提交事務(wù)

TransactionAspectSupport.commitTransactionAfterReturning() 基本上和回滾一樣,都是先判斷是否有事務(wù),在操作提交。

protected void commitTransactionAfterReturning(TransactionAspectSupport.TransactionInfo txInfo) {
        if (txInfo != null && txInfo.hasTransaction()) {
            //提交事務(wù)
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

1) AbstractPlatformTransactionManager.commit() 創(chuàng)建默認Status prepareTransactionStatu,發(fā)現(xiàn)是否有回滾標記,然后進行回滾。如果判斷無需回滾就可以直接提交。

public final void commit(TransactionStatus status) throws TransactionException {
        // 事務(wù)狀態(tài)已完成則拋異常
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
        } else {
            DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
            //發(fā)現(xiàn)回滾標記
            if (defStatus.isLocalRollbackOnly()) {
                //回滾
                this.processRollback(defStatus);
            } else if (!this.shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
                //回滾
                this.processRollback(defStatus);
                if (status.isNewTransaction() || this.isFailEarlyOnGlobalRollbackOnly()) {
                    throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
                }
            } else {
                // 提交操作
                this.processCommit(defStatus);
            }
        }
    }

1.1 AbstractPlatformTransactionManager.processCommit()處理事務(wù)的提交操作

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;

            try {
                this.prepareForCommit(status);
                this.triggerBeforeCommit(status);
                this.triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || this.isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    //釋放保存點信息
                    status.releaseHeldSavepoint();
                } else if (status.isNewTransaction()) {
                    // 是一個新的事務(wù) 則提交。 獲取數(shù)據(jù)庫連接后使用數(shù)據(jù)庫API進行提交事務(wù)
                    this.doCommit(status);
                }

                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
                }
            } catch (TransactionException var20) {
                if (this.isRollbackOnCommitFailure()) {
                    //提交異常回滾
                    this.doRollbackOnCommitException(status, var20);
                } else {
                    this.triggerAfterCompletion(status, 2);
                }

                throw var20;
            } 
            //省略其它異常攔截
            try {
                this.triggerAfterCommit(status);
            } finally {
                this.triggerAfterCompletion(status, 0);
            }
        } finally {
            // 清理事務(wù)消息
            this.cleanupAfterCompletion(status);
        }

    }

3.4.4 清除事務(wù)信息

AbstractPlatformTransactionManager.cleanupAfterCompletion() 這個方法主要用于在事務(wù)完成后進行清理工作。它可能會負責(zé)釋放資源、清理臨時數(shù)據(jù)等,以確保系統(tǒng)處于良好的狀態(tài)。

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        //將當前事務(wù)設(shè)置為完成狀態(tài)
        status.setCompleted();
        if (status.isNewSynchronization()) {
            // 清空當前事務(wù)消息
            TransactionSynchronizationManager.clear();
        }

        if (status.isNewTransaction()) {
            //如果是新事務(wù) 則在事務(wù)完成之后做清理操作
            this.doCleanupAfterCompletion(status.getTransaction());
        }
        if (status.getSuspendedResources() != null) {
            // 將原事務(wù)從掛起狀態(tài)恢復(fù)
            this.resume(status.getTransaction(), (AbstractPlatformTransactionManager.SuspendedResourcesHolder)status.getSuspendedResources());
        }
    }

1AbstractPlatformTransactionManager.doCleanupAfterCompletion()在新事務(wù)完成后會調(diào)用resetConnectionAfterTransaction方法重置數(shù)據(jù)庫連接信息,并判單如果是新的數(shù)據(jù)庫連接則將其放回連接池。

protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    if (txObject.isNewConnectionHolder()) {
        // 將數(shù)據(jù)庫連接從當前線程中解除綁定
        TransactionSynchronizationManager.unbindResource(this.dataSource);
    }
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        // 恢復(fù)數(shù)據(jù)庫連接的autoCommit狀態(tài)
        if (txObject.isMustRestoreAutoCommit()) {
            con.setAutoCommit(true); 
        }
        // 負責(zé)重置數(shù)據(jù)庫連接信息,包括隔離級別、readOnly屬性等
        DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
    }
    catch (Throwable ex) {
        logger.debug("Could not reset JDBC Connection after transaction", ex);
    }
    if (txObject.isNewConnectionHolder()) {
        // 如果是新的數(shù)據(jù)庫連接則將數(shù)據(jù)庫連接放回連接池
        DataSourceUtils.releaseConnection(con, this.dataSource);
    }
    txObject.getConnectionHolder().clear();
}

2) AbstractPlatformTransactionManager.resume() 如果事務(wù)執(zhí)行前有事務(wù)掛起,那么當前事務(wù)執(zhí)行結(jié)束后需要將掛起的事務(wù)恢復(fù),掛起事務(wù)時保存了原事務(wù)信息,重置了當前事務(wù)信息,所以恢復(fù)操作就是將當前的事務(wù)信息設(shè)置為之前保存的原事務(wù)信息。

protected final void resume(Object transaction, AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder) throws TransactionException {
        if (resourcesHolder != null) {
            Object suspendedResources = resourcesHolder.suspendedResources;
            if (suspendedResources != null) {
                // 執(zhí)行 恢復(fù)掛起事務(wù) ,綁定資源bindResource
                this.doResume(transaction, suspendedResources);
            }
            List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
            if (suspendedSynchronizations != null) {
                TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
                TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
                this.doResumeSynchronization(suspendedSynchronizations);
            }
        }
    }

3) TransactionAspectSupport.cleanupTransactionInfo()清除當前節(jié)點的事務(wù)消息,將舊事務(wù)節(jié)點信息通過thradLoacl更新到當前線程。

protected void cleanupTransactionInfo(TransactionAspectSupport.TransactionInfo txInfo) {
        if (txInfo != null) {
            //從當前線程的 ThreadLocal 獲取上層的事務(wù)信息,將當前事務(wù)出棧,繼續(xù)執(zhí)行上層事務(wù)
            txInfo.restoreThreadLocalStatus();
        }
    }

 private void restoreThreadLocalStatus() {
            //當前事務(wù)處理完之后,恢復(fù)上層事務(wù)上下文 
            TransactionAspectSupport.transactionInfoHolder.set(this.oldTransactionInfo);
        }

總結(jié)

如果方法正常執(zhí)行完成且沒有異常,調(diào)用commitTransactionAfterReturning()方法。如果執(zhí)行中出現(xiàn)異常,調(diào)用completeTransactionAfterThrowing()方法。

兩個方法內(nèi)部都會判斷是否存在事務(wù)以及是否滿足回滾條件來決定最終執(zhí)行提交操作還是回滾操作。

上述例子1.1中,未添加事務(wù),不受事務(wù)控制,因此修改操作生效。

上述例子1.2中,因加入事務(wù)注解,指定回滾異常類型,在completeTransactionAfterThrowing()中邏輯判斷需要回滾,調(diào)用txInfo.getTransactionManager().rollback()方法執(zhí)行回滾操作,因此修改操作不生效。

常見事務(wù)失效的原因及解決方案

場景一:同類中非事務(wù)方法調(diào)用事務(wù)方法

@Service
public class MyService {
    @Transactional
    public void transactionalMethod() {
        // 事務(wù)邏輯
    }

    public void nonTransactionalCall() {
        transactionalMethod(); // 非事務(wù)方法中調(diào)用事務(wù)方法
    }
}

失效原因:事務(wù)是基于動態(tài)代理實現(xiàn)的,但本類中調(diào)用另一個方法默認是this調(diào)用關(guān)系,并非動態(tài)代理,故失效
解決方案:要么將操作移動到事務(wù)中,要么調(diào)用另一個Service中的事務(wù)方法
@Service
public class MyService {
    
    @Resource
    MyService2 myService2;
    
    public void nonTransactionalCall() {
        myService2.transactionalMethod(); // 非事務(wù)方法中調(diào)用事務(wù)方法
    }
}

@Service
public class MyService2 {
    @Transactional
    public void transactionalMethod() {
        // 事務(wù)邏輯
    }
}

場景二:事務(wù)方法中拋出檢查型異常且未被正確處理

//失效場景
@Service
public class MyService {
  @Transactional
  public void transactionalMethod() {
      throw new Exception(); // 拋出檢查型異常
  }
}

失效原因:@Transactional 注解默認處理RuntimeException,即只有拋出運行時異常,才會觸發(fā)事務(wù)回滾
解決方案:@Transactional 設(shè)置為 @Transactional(rollbackFor =Exception.class) 或者直接拋出運行時異常
解決方案1
@Service
public class MyService {
  @Transactional
  public void transactionalMethod() {
      throw new RuntimeException(); // 拋出非檢查型異常
  }
}
解決方案2
@Service
public class MyService {
   @Transactional(rollbackFor =Exception.class)
  public void transactionalMethod() {
      throw new Exception(); 
  }
}

場景三:多線程問題

public class MyService {
    @Resource
    private ValuationMapper valuationMapper;

    @Transactional
    public void transactionalMethod() {
        new Thread(() ->{
             Valuation v  = new Valuation();
            v.setUserName("張三");
            valuationMapper.insert(v);
        }).start();
    }
  
}
原因:Spring的事務(wù)是通過數(shù)據(jù)庫連接來實現(xiàn)不同線程使用不同的數(shù)據(jù)庫連接,且放在ThreadLocal中,基于同一個數(shù)據(jù)庫連接的事務(wù)才能同時提交或回滾,多線程場景下,拿到的數(shù)據(jù)庫連接不是同一個
解決方案:
1.采用分布式事務(wù)保證 
2.自己實現(xiàn)事務(wù)回滾

場景四:錯誤使用事務(wù)傳播特性

public class MyService {

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void methodA() {
        // 一些事務(wù)操作
        methodB(); 
    }

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void methodB() {
        // 這里不會執(zhí)行事務(wù)操作,因為 methodA 中傳播特性配置錯誤
    }

    public static void main(String[] args) {
        MyService example = new MyService();
        example.methodA();
    }
}

原因:methodA 配置為 REQUIRES_NEW,而 methodB 配置為 NOT_SUPPORTED 不支持事務(wù)傳播特性,導(dǎo)致事務(wù)傳播特性失效
解決方案:修改 methodB 傳播特性

關(guān)于作者

張云剛  采貨俠JAVA開發(fā)工程師

大道至簡。再復(fù)雜的技術(shù),歸根結(jié)底也是在基礎(chǔ)技術(shù)之上實現(xiàn)的。

責(zé)任編輯:武曉燕 來源: 轉(zhuǎn)轉(zhuǎn)技術(shù)
相關(guān)推薦

2009-06-22 09:01:57

Spring聲明式事務(wù)

2009-02-11 11:14:31

事務(wù)管理事務(wù)開始Spring

2009-02-11 13:08:29

事務(wù)提交事務(wù)管理Spring

2021-09-06 13:42:14

Spring聲明式事務(wù)

2023-04-28 08:21:36

SpringBoot聲明式事務(wù)編程式事務(wù)

2021-04-15 08:01:27

Spring聲明式事務(wù)

2009-06-22 11:01:12

2025-01-16 08:45:48

2011-11-23 16:28:07

JavaSpring框架

2022-05-20 10:20:17

Spring事務(wù)MyBatis

2023-05-26 07:19:49

Spring聲明式事務(wù)

2023-03-27 10:40:09

2009-06-08 17:56:00

SpringJDBC事務(wù)

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2022-07-10 20:24:48

Seata分布式事務(wù)

2023-11-06 13:15:32

分布式事務(wù)Seata

2025-01-15 08:34:00

分布式事務(wù)服務(wù)

2020-12-09 09:14:57

SpringCloudSeata 分布式

2020-12-17 07:59:46

聲明式代碼命令式代碼代碼

2009-06-17 14:57:11

Spring事務(wù)管理
點贊
收藏

51CTO技術(shù)棧公眾號