A Utility Class for Running Tasks Asynchronously After Transaction Commit

1. Background

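We often want some logic to run asynchronously once a transaction has committed: calling an external system, sending an MQ message, pushing data to ES, and so on. These operations must wait until the transaction completes, and they must not run at all if the transaction rolls back. For example, after the stock inbound/outbound transaction commits, we asynchronously send an MQ message to the reporting system, to ES, and so on.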

2. Hypothesis

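Most of our projects use declarative transactions (the @Transactional annotation). Spring enhances the business method through its dynamic-proxy mechanism and controls the Connection around it, which is how the transaction is enforced. Can we find some clues there? Let's look at the core classes of Spring's transaction support (the wiring process is not covered in detail).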

TransactionInterceptor:

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
  @Override
  @Nullable
  public Object invoke(MethodInvocation invocation) throws Throwable {
     // Work out the target class: may be {@code null}.
     // The TransactionAttributeSource should be passed the target class
     // as well as the method, which may be from an interface.
     Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
     // Adapt to TransactionAspectSupport's invokeWithinTransaction...
     return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
  }
}
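
To make the proxy idea concrete, here is a minimal plain-Java sketch of a "begin, invoke, commit or roll back" wrapper built on a JDK dynamic proxy. It is only a model: StockService, TxProxyDemo and the EVENTS list are hypothetical names, no real transaction or Connection is involved, and Spring's actual interceptor does considerably more.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical business interface, for illustration only.
interface StockService {
    void outbound(int quantity);
}

// Simplified stand-in for a transaction interceptor; it only records events.
class TxProxyDemo {
    static final List<String> EVENTS = new ArrayList<>();

    // Wraps the target so that every call is surrounded by begin/commit/rollback events.
    @SuppressWarnings("unchecked")
    static <T> T transactional(T target, Class<T> type) {
        InvocationHandler handler = (proxy, method, args) -> {
            EVENTS.add("begin");
            try {
                Object result = method.invoke(target, args);
                EVENTS.add("commit");
                return result;
            } catch (InvocationTargetException ex) {
                // The business method threw: roll back and rethrow the original exception.
                EVENTS.add("rollback");
                throw ex.getCause();
            }
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        EVENTS.clear();
        StockService target = quantity -> {
            if (quantity <= 0) {
                throw new IllegalArgumentException("quantity must be positive");
            }
            EVENTS.add("outbound " + quantity);
        };
        StockService service = transactional(target, StockService.class);

        service.outbound(5);
        try {
            service.outbound(0);
        } catch (IllegalArgumentException expected) {
            // The proxy recorded a rollback and rethrew the business exception.
        }
        System.out.println(EVENTS);
    }
}
```

The point of the sketch is that commit happens in the wrapper, after the business method returns, so any "after commit" hook has to be offered by the wrapper rather than by the business code.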

TransactionAspectSupport (focus on what happens after the transaction commits and which extension points are available):

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
 protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
   // If the transaction attribute is null, the method is non-transactional.
   TransactionAttributeSource tas = getTransactionAttributeSource();
   final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
   final TransactionManager tm = determineTransactionManager(txAttr);

   if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
         if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                  "Unsupported annotated transaction on suspending function detected: " + method +
                  ". Use TransactionalOperator.transactional extensions instead.");
         }
         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
         if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                  method.getReturnType());
         }
         return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
   }

   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
   final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

   if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Create the transaction if necessary; the connection is also obtained here
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

      Object retVal;
      try {
         // Invoke the target method
         retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
         // Handle an exception thrown by the target method
         completeTransactionAfterThrowing(txInfo, ex);
         throw ex;
      }
      finally {
         // Reset the TransactionInfo ThreadLocal
         cleanupTransactionInfo(txInfo);
      }

      if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
         // Set rollback-only in case of Vavr failure matching our rollback rules...
         TransactionStatus status = txInfo.getTransactionStatus();
         if (status != null && txAttr != null) {
            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
         }
      }
      // The business method succeeded: commit the transaction (the key step); this ends up in AbstractPlatformTransactionManager#commit
      commitTransactionAfterReturning(txInfo);
      return retVal;
   }

   else {
      final ThrowableHolder throwableHolder = new ThrowableHolder();

      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
         Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
            try {
               Object retVal = invocation.proceedWithInvocation();
               if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                  // Set rollback-only in case of Vavr failure matching our rollback rules...
                  retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
               }
               return retVal;
            }
            catch (Throwable ex) {
               if (txAttr.rollbackOn(ex)) {
                  // A RuntimeException: will lead to a rollback.
                  if (ex instanceof RuntimeException) {
                     throw (RuntimeException) ex;
                  }
                  else {
                     throw new ThrowableHolderException(ex);
                  }
               }
               else {
                  // A normal return value: will lead to a commit.
                  throwableHolder.throwable = ex;
                  return null;
               }
            }
            finally {
               cleanupTransactionInfo(txInfo);
            }
         });

         // Check result state: It might indicate a Throwable to rethrow.
         if (throwableHolder.throwable != null) {
            throw throwableHolder.throwable;
         }
         return result;
      }
      catch (ThrowableHolderException ex) {
         throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
         if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            ex2.initApplicationException(throwableHolder.throwable);
         }
         throw ex2;
      }
      catch (Throwable ex2) {
         if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
         }
         throw ex2;
      }
   }
}
}

AbstractPlatformTransactionManager:

public final void commit(TransactionStatus status) throws TransactionException {
   if (status.isCompleted()) {
      throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
   }

   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
   if (defStatus.isLocalRollbackOnly()) {
      if (defStatus.isDebug()) {
         logger.debug("Transactional code has requested rollback");
      }
      processRollback(defStatus, false);
      return;
   }

   if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
      if (defStatus.isDebug()) {
         logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
      }
      processRollback(defStatus, true);
      return;
   }
   // Process the commit
   processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
   try {
      boolean beforeCompletionInvoked = false;

      try {
         boolean unexpectedRollback = false;
         prepareForCommit(status);
         triggerBeforeCommit(status);
         triggerBeforeCompletion(status);
         beforeCompletionInvoked = true;

         if (status.hasSavepoint()) {
            if (status.isDebug()) {
               logger.debug("Releasing transaction savepoint");
            }
            unexpectedRollback = status.isGlobalRollbackOnly();
            status.releaseHeldSavepoint();
         }
         else if (status.isNewTransaction()) {
            if (status.isDebug()) {
               logger.debug("Initiating transaction commit");
            }
            unexpectedRollback = status.isGlobalRollbackOnly();
            doCommit(status);
         }
         else if (isFailEarlyOnGlobalRollbackOnly()) {
            unexpectedRollback = status.isGlobalRollbackOnly();
         }

         // Throw UnexpectedRollbackException if we have a global rollback-only
         // marker but still didn't get a corresponding exception from commit.
         if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                  "Transaction silently rolled back because it has been marked as rollback-only");
         }
      }
      catch (UnexpectedRollbackException ex) {
         // can only be caused by doCommit
         triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
         throw ex;
      }
      catch (TransactionException ex) {
         // can only be caused by doCommit
         if (isRollbackOnCommitFailure()) {
            doRollbackOnCommitException(status, ex);
         }
         else {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
         }
         throw ex;
      }
      catch (RuntimeException | Error ex) {
         if (!beforeCompletionInvoked) {
            triggerBeforeCompletion(status);
         }
         doRollbackOnCommitException(status, ex);
         throw ex;
      }

      // Trigger afterCommit callbacks, with an exception thrown there
      // propagated to callers but the transaction still considered as committed.
      try {
         // Triggered after the transaction has committed (this is very close to what we are looking for)
         triggerAfterCommit(status);
      }
      finally {
         triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
      }

   }
   finally {
      cleanupAfterCompletion(status);
   }
}
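
The ordering of the callbacks in processCommit can be summarised with a small plain-Java model. MiniTransaction and Sync below are hypothetical stand-ins, not Spring classes, and the model ignores savepoints, nested transactions and commit failures. It only shows which callbacks fire on commit and which on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the callback ordering; these are NOT Spring's classes.
class MiniTransaction {
    interface Sync {
        default void beforeCommit() {}
        default void beforeCompletion() {}
        default void afterCommit() {}
        default void afterCompletion(boolean committed) {}
    }

    private final List<Sync> syncs = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    void register(Sync sync) {
        syncs.add(sync);
    }

    void commit() {
        syncs.forEach(Sync::beforeCommit);
        syncs.forEach(Sync::beforeCompletion);
        log.add("doCommit");
        try {
            // Only reached when the commit itself succeeded.
            syncs.forEach(Sync::afterCommit);
        } finally {
            syncs.forEach(s -> s.afterCompletion(true));
        }
    }

    void rollback() {
        syncs.forEach(Sync::beforeCompletion);
        log.add("doRollback");
        // afterCommit is never called on this path.
        syncs.forEach(s -> s.afterCompletion(false));
    }

    // Creates a transaction with one synchronization that records every callback.
    static MiniTransaction traced() {
        MiniTransaction tx = new MiniTransaction();
        tx.register(new Sync() {
            @Override public void beforeCommit() { tx.log.add("beforeCommit"); }
            @Override public void beforeCompletion() { tx.log.add("beforeCompletion"); }
            @Override public void afterCommit() { tx.log.add("afterCommit"); }
            @Override public void afterCompletion(boolean committed) {
                tx.log.add("afterCompletion(committed=" + committed + ")");
            }
        });
        return tx;
    }

    public static void main(String[] args) {
        MiniTransaction committed = traced();
        committed.commit();
        System.out.println(committed.log);

        MiniTransaction rolledBack = traced();
        rolledBack.rollback();
        System.out.println(rolledBack.log);
    }
}
```

In this model, afterCommit is the only callback that runs exclusively on a successful commit, while afterCompletion runs on both paths, which makes it the natural place for cleanup.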

TransactionSynchronizationUtils:

public abstract class TransactionSynchronizationUtils {

  public static void triggerAfterCommit() {
     // TransactionSynchronizationManager manages the synchronizations registered for the current transaction
     invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
  }
  public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
     if (synchronizations != null) {
        for (TransactionSynchronization synchronization : synchronizations) {
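           // Call TransactionSynchronization#afterCommit; the default implementation is empty and is left for implementations to override.
           // To run something after the transaction commits, implementing this method is enough.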
           synchronization.afterCommit();
        }
     }
  }
}

TransactionSynchronization:

public interface TransactionSynchronization extends Flushable {
   default void afterCommit() {}
}

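Along the way we find that TransactionSynchronizationManager, TransactionSynchronization, TransactionSynchronizationAdapter and related classes are involved throughout the AOP flow. Space is limited, so they are not covered in detail here, but our extension relies on these base classes.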

3. Implementation

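To run tasks asynchronously after the transaction commits, we provide our own synchronization.afterCommit implementation and combine it with a thread pool. First, define the TaskExecutor thread pool.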

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(******);
    taskExecutor.setMaxPoolSize(******);
    taskExecutor.setKeepAliveSeconds(******);
    taskExecutor.setQueueCapacity(******);
    taskExecutor.setThreadNamePrefix(******);
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    taskExecutor.initialize();
    return taskExecutor;
}
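
One thing worth checking in the pool configuration above is the rejection policy. With DiscardOldestPolicy, once the queue is full the oldest queued task is dropped without any error, which for after-commit work such as MQ messages means that task never runs. The sketch below demonstrates this with a plain java.util.concurrent.ThreadPoolExecutor rather than Spring's ThreadPoolTaskExecutor; the pool and queue sizes of 1 and the task names are made up so that the effect is easy to see.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DiscardOldestDemo {

    // Returns the names of the tasks that actually ran.
    static List<String> run() {
        List<String> executed = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        // One worker thread and a queue of capacity one (illustrative values only).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        try {
            // task-1 occupies the only worker until it is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                executed.add("task-1");
            });
            pool.execute(() -> executed.add("task-2")); // waits in the queue
            pool.execute(() -> executed.add("task-3")); // queue is full: task-2 is discarded
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // task-2 is missing from the output
    }
}
```

If silently losing tasks is not acceptable, ThreadPoolExecutor.CallerRunsPolicy or a custom RejectedExecutionHandler that logs or persists the rejected task may be a better fit; which one is appropriate depends on the workload.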

Define the AfterCommitExecutor interface.

public interface AfterCommitExecutor extends Executor { }

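Define the AfterCommitExecutorImpl implementation class. Note that it must extend TransactionSynchronizationAdapter (deprecated since Spring Framework 5.3, where the TransactionSynchronization interface itself provides default methods).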

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.task.TaskExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.List;
import java.util.ArrayList;

@Component
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);
    // Tasks queued on the current thread, to be run after the transaction commits
    private static final ThreadLocal<List<Runnable>> RUNNABLE_THREAD_LOCAL = new NamedThreadLocal<>("AfterCommitRunnable");
    // Thread pool that runs the queued tasks
    @Autowired
    private TaskExecutor taskExecutor;

    /**
     * Queues a task to run asynchronously after the current transaction commits.
     *
     * @param runnable the task to run
     */
    @Override
    public void execute(Runnable runnable) {
        LOGGER.info("Submitting new runnable {} to run after commit", runnable);
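        // If no transaction synchronization is active (no transaction in progress), run the task immediately on the calling thread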
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
            runnable.run();
            return;
        }
        // Tasks submitted in the same transaction are batched together (on first use, create the list and register this synchronization)
        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();
        if (null == threadRunnableList) {
            threadRunnableList = new ArrayList<>();
            RUNNABLE_THREAD_LOCAL.set(threadRunnableList);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnableList.add(runnable);
    }

    /**
     * Called after the transaction has committed; hands the queued tasks to the thread pool.
     */
    @Override
    public void afterCommit() {
        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();
        LOGGER.info("Transaction successfully committed, executing {} threadRunnable", threadRunnableList.size());
        for (Runnable runnable : threadRunnableList) {
            try {
                taskExecutor.execute(runnable);
            } catch (RuntimeException e) {
                LOGGER.error("Failed to execute runnable " + runnable, e);
            }
        }
    }

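    /**
     * Called after the transaction completes, whether it committed or rolled back.
     *
     * @param status completion status (STATUS_COMMITTED = 0, STATUS_ROLLED_BACK = 1, STATUS_UNKNOWN = 2)
     */
    @Override
    public void afterCompletion(int status) {
        String outcome = status == STATUS_COMMITTED ? "COMMITTED"
                : status == STATUS_ROLLED_BACK ? "ROLLED_BACK" : "UNKNOWN";
        LOGGER.info("Transaction completed with status {}", outcome);
        // Always clear the ThreadLocal so queued tasks do not leak into the next transaction on this thread
        RUNNABLE_THREAD_LOCAL.remove();
    }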
}

Usage.

With the utility class in place, usage is simple: inject AfterCommitExecutor and call AfterCommitExecutor.execute(runnable).
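
A caller might look roughly like the sketch below. It is written in plain Java so that it can be run on its own: OutboundService, the order number and the message format are hypothetical, the interface is redeclared only to keep the example self-contained, and in a real Spring application the service would be a bean whose outbound method carries @Transactional, with AfterCommitExecutor injected.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Same shape as the AfterCommitExecutor interface defined earlier.
interface AfterCommitExecutor extends Executor { }

// Hypothetical service. In a Spring application this would be a @Service bean
// and outbound() would be annotated with @Transactional.
class OutboundService {
    private final AfterCommitExecutor afterCommitExecutor;
    // Stand-in for an MQ producer: records the messages that were "sent".
    final List<String> sentMessages = new CopyOnWriteArrayList<>();

    OutboundService(AfterCommitExecutor afterCommitExecutor) {
        this.afterCommitExecutor = afterCommitExecutor;
    }

    void outbound(String orderNo) {
        // 1. The database writes for the outbound operation would go here.
        // 2. Hand the side effect to the executor instead of doing it inline,
        //    so that it only happens once the surrounding transaction commits.
        afterCommitExecutor.execute(() -> sentMessages.add("outbound:" + orderNo));
    }

    public static void main(String[] args) {
        // A direct executor stands in for AfterCommitExecutorImpl in this demo.
        OutboundService service = new OutboundService(Runnable::run);
        service.outbound("A1");
        System.out.println(service.sentMessages);
    }
}
```

Because the business code depends only on the Executor-style interface, it does not need to know whether a transaction is active; that decision stays inside the executor implementation.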

4. Summary

Spring is vast, but once you find the right entry point, many problems turn out to have a workable approach or solution.

How well do you know Spring?

Editor: 姜華 Source: 今日頭條