事務提交之後異步執行工具類封裝
一、背景
許多時候,我們期望在事務提交之後異步執行某些邏輯,例如調用外部系統、發送MQ、推送ES等等;當事務回滾時,這些異步操作不應執行,它們需要等待事務成功提交後才執行。比如出入庫的事務執行完畢後,異步發送MQ給報表系統、ES等等。
二、猜想
我們在項目中大多都是使用聲明式事務(@Transactional註解),spring會基於動態代理機制對我們的業務方法進行增強,控制connection,從而達到事務的目的。那麼我們能否在此找尋一些蛛絲馬跡?我們來看下spring事務的相關核心類(裝配流程不詳細敘述)。
TransactionInterceptor:
/**
 * Excerpt of Spring's method interceptor for declarative transactions.
 * Every intercepted call is handed to {@code invokeWithinTransaction}, which is
 * inherited from {@code TransactionAspectSupport}.
 */
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
/**
 * Resolves the target class of the invocation ({@code null} when {@code getThis()} is null)
 * and runs the call inside {@code invokeWithinTransaction}, using
 * {@code invocation::proceed} as the callback that performs the real method call.
 *
 * @param invocation the intercepted method invocation
 * @return whatever {@code invokeWithinTransaction} returns
 * @throws Throwable anything propagated by {@code invokeWithinTransaction}
 */
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
TransactionAspectSupport(重點關注事務提交之後做了哪些事情,有哪些擴展點)。
/**
 * Excerpt of Spring's base class for transactional aspects; only
 * {@code invokeWithinTransaction} is shown here.
 */
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
/**
 * Runs {@code invocation} under the transaction settings resolved for {@code method}.
 * <p>Three paths are visible below:
 * <ul>
 * <li>a {@code ReactiveTransactionManager}: delegated to a cached {@code ReactiveTransactionSupport};</li>
 * <li>a regular {@code PlatformTransactionManager}: create the transaction, proceed, call
 * {@code completeTransactionAfterThrowing} and rethrow on failure, otherwise call
 * {@code commitTransactionAfterReturning};</li>
 * <li>a {@code CallbackPreferringPlatformTransactionManager}: the invocation is wrapped in a
 * callback passed to {@code execute}.</li>
 * </ul>
 *
 * @param method the method being invoked
 * @param targetClass the target class, possibly {@code null}
 * @param invocation callback that proceeds with the actual method call
 * @return the value returned by the invocation
 * @throws Throwable whatever the invocation or the transaction handling propagates
 */
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Create the transaction if necessary; the connection is also created at this point
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// Invoke the target method
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// Handle an exception thrown by the target method
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// Reset the TransactionInfo ThreadLocal
cleanupTransactionInfo(txInfo);
}
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// The business method returned normally: commit the transaction (the key spot for this article);
// this eventually calls AbstractPlatformTransactionManager#commit
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
}
AbstractPlatformTransactionManager:
/**
 * Commits the given transaction, or rolls it back when it has been marked rollback-only.
 * <p>A locally rollback-only status is rolled back via {@code processRollback(defStatus, false)};
 * a globally rollback-only status (unless {@code shouldCommitOnGlobalRollbackOnly()} is true)
 * via {@code processRollback(defStatus, true)}. Otherwise {@code processCommit} is called.
 *
 * @param status the transaction status; cast to {@code DefaultTransactionStatus} below
 * @throws IllegalTransactionStateException if the transaction is already completed
 * @throws TransactionException as declared; raised by the delegated processing
 */
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
// Perform the actual commit processing
processCommit(defStatus);
}
/**
 * Performs the commit and fires the synchronization callbacks around it.
 * <p>Order visible below: {@code prepareForCommit}, {@code triggerBeforeCommit},
 * {@code triggerBeforeCompletion}, then either savepoint release or {@code doCommit}.
 * On success {@code triggerAfterCommit} runs, followed (in a {@code finally}) by
 * {@code triggerAfterCompletion} with {@code STATUS_COMMITTED}. On failure
 * {@code triggerAfterCompletion} is called with {@code STATUS_ROLLED_BACK} or
 * {@code STATUS_UNKNOWN}, or {@code doRollbackOnCommitException} is invoked.
 * {@code cleanupAfterCompletion} always runs last.
 *
 * @param status the status object of the transaction being committed
 * @throws TransactionException on commit failure or unexpected rollback
 */
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// Fired once the transaction has been committed (this is the extension point the article is after)
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
TransactionSynchronizationUtils:
/**
 * Excerpt of Spring's helper that fires {@code afterCommit} on transaction synchronizations.
 */
public abstract class TransactionSynchronizationUtils {
/**
 * Invokes {@code afterCommit} on the synchronizations returned by
 * {@code TransactionSynchronizationManager.getSynchronizations()}.
 */
public static void triggerAfterCommit() {
// TransactionSynchronizationManager: manages the transaction synchronizations
invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
/**
 * Calls {@code afterCommit()} on each element of the list, in iteration order.
 *
 * @param synchronizations the synchronizations to notify; nothing happens when {@code null}
 */
public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
// Calls TransactionSynchronization#afterCommit; its default implementation is empty and left for implementors to override,
// so to run something after the transaction commits it is enough to implement that method
synchronization.afterCommit();
}
}
}
}
TransactionSynchronization:
/**
 * Excerpt of Spring's transaction synchronization callback interface; only
 * {@code afterCommit} is shown.
 */
public interface TransactionSynchronization extends Flushable {
/** Callback with an empty default body; implementors override it to react to a commit. */
default void afterCommit() {}
}
過程中我們發現TransactionSynchronizationManager、TransactionSynchronization、TransactionSynchronizationAdapter 等相關類涉及aop的整個流程,篇幅有限,在此不詳細展開,當然我們的一些擴展也是離不開這些基礎類的。
三、實現
事務提交之後異步執行,我們需自定義synchronization.afterCommit,結合線程池一起使用,定義線程池TaskExecutor。
/**
 * Thread pool that runs the tasks dispatched after a transaction commits.
 * <p>The {@code ******} arguments are placeholders from the article and must be
 * replaced with real sizing values before this compiles.
 *
 * @return the initialized executor
 */
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(******);
    taskExecutor.setMaxPoolSize(******);
    taskExecutor.setKeepAliveSeconds(******);
    taskExecutor.setQueueCapacity(******);
    taskExecutor.setThreadNamePrefix(******);
    // When pool and queue are saturated, run the task on the submitting thread.
    // DiscardOldestPolicy would silently drop the oldest queued after-commit task
    // (e.g. an MQ or ES notification), and nothing would ever replay it.
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    taskExecutor.initialize();
    return taskExecutor;
}
定義AfterCommitExecutor接口。
/**
 * Executor type for tasks that should run after the current transaction commits.
 * Declares no methods beyond those inherited from {@code Executor}
 * (presumably {@code java.util.concurrent.Executor} - the import is not shown here).
 */
public interface AfterCommitExecutor extends Executor { }
定義AfterCommitExecutorImpl實現類,注意需繼承TransactionSynchronizationAdapter類。
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
 * {@link AfterCommitExecutor} that defers tasks until the transaction active on the
 * calling thread has committed, then hands them to the configured {@link TaskExecutor}.
 * <p>Tasks are collected per thread. The first task submitted in a transaction registers
 * this bean as a synchronization of that transaction. Tasks are dispatched in
 * {@link #afterCommit()} and dropped on rollback, because {@link #afterCompletion(int)}
 * clears the list without dispatching it.
 * <p>NOTE(review): {@code TransactionSynchronizationAdapter} is, to my knowledge, deprecated
 * in newer Spring versions in favour of implementing {@code TransactionSynchronization}
 * directly - confirm against the Spring version in use before migrating.
 */
@Component
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);

    /** Tasks queued for the transaction currently active on this thread; unset when none are queued. */
    private static final ThreadLocal<List<Runnable>> RUNNABLE_THREAD_LOCAL =
            new NamedThreadLocal<>("AfterCommitRunnable");

    /**
     * Task lists of transactions suspended on this thread, innermost on top.
     * Keeps an outer transaction's tasks apart from those of a nested, independent transaction.
     */
    private static final ThreadLocal<Deque<List<Runnable>>> SUSPENDED_THREAD_LOCAL =
            new NamedThreadLocal<>("AfterCommitSuspendedRunnable");

    /** Pool that runs the deferred tasks. */
    @Autowired
    private TaskExecutor taskExecutor;

    /**
     * Queues {@code runnable} to run on the task executor after the current transaction commits.
     * <p>When no transaction synchronization is active on the calling thread, the task is run
     * immediately and synchronously on the calling thread; it is not handed to the pool.
     *
     * @param runnable the task to run
     * @throws NullPointerException if {@code runnable} is {@code null}
     */
    @Override
    public void execute(Runnable runnable) {
        if (runnable == null) {
            throw new NullPointerException("runnable must not be null");
        }
        LOGGER.info("Submitting new runnable {} to run after commit", runnable);
        // No synchronization to hook into: run the task inline right away.
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
            runnable.run();
            return;
        }
        // Tasks of the same transaction are collected together; the first one
        // creates the list and registers this bean with the transaction.
        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();
        if (null == threadRunnableList) {
            threadRunnableList = new ArrayList<>();
            RUNNABLE_THREAD_LOCAL.set(threadRunnableList);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnableList.add(runnable);
    }

    /**
     * Parks the current transaction's task list while that transaction is suspended,
     * so tasks submitted by a nested independent transaction are tracked separately.
     */
    @Override
    public void suspend() {
        List<Runnable> pending = RUNNABLE_THREAD_LOCAL.get();
        RUNNABLE_THREAD_LOCAL.remove();
        Deque<List<Runnable>> parked = SUSPENDED_THREAD_LOCAL.get();
        if (parked == null) {
            parked = new ArrayDeque<>();
            SUSPENDED_THREAD_LOCAL.set(parked);
        }
        // ArrayDeque rejects null, so park an empty list when nothing was queued.
        parked.push(pending != null ? pending : new ArrayList<>());
    }

    /**
     * Restores the task list parked by the matching {@link #suspend()} call.
     */
    @Override
    public void resume() {
        Deque<List<Runnable>> parked = SUSPENDED_THREAD_LOCAL.get();
        if (parked == null || parked.isEmpty()) {
            return;
        }
        RUNNABLE_THREAD_LOCAL.set(parked.pop());
        if (parked.isEmpty()) {
            // Do not leave an empty deque attached to a pooled thread.
            SUSPENDED_THREAD_LOCAL.remove();
        }
    }

    /**
     * Hands every task queued for the committed transaction to the task executor.
     * A task the executor refuses with a {@link RuntimeException} is logged and skipped,
     * so the remaining tasks are still dispatched.
     */
    @Override
    public void afterCommit() {
        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();
        if (threadRunnableList == null) {
            // Nothing was queued on this thread for the committed transaction.
            return;
        }
        LOGGER.info("Transaction successfully committed, executing {} threadRunnable", threadRunnableList.size());
        for (Runnable runnable : threadRunnableList) {
            try {
                taskExecutor.execute(runnable);
            } catch (RuntimeException e) {
                LOGGER.error("Failed to execute runnable {}", runnable, e);
            }
        }
    }

    /**
     * Clears the per-thread task list once the transaction has completed, whatever the outcome.
     *
     * @param status completion status (STATUS_COMMITTED-0, STATUS_ROLLED_BACK-1, STATUS_UNKNOWN-2)
     */
    @Override
    public void afterCompletion(int status) {
        LOGGER.info("Transaction completed with status {}", describeStatus(status));
        RUNNABLE_THREAD_LOCAL.remove();
    }

    /** Maps a completion status code to its log label; anything unrecognised is reported as UNKNOWN. */
    private static String describeStatus(int status) {
        if (status == STATUS_COMMITTED) {
            return "COMMITTED";
        }
        if (status == STATUS_ROLLED_BACK) {
            return "ROLLED_BACK";
        }
        return "UNKNOWN";
    }
}
使用。
工具類封裝好了,使用上那就很簡便了:注入AfterCommitExecutor,調用AfterCommitExecutor.execute(runnable)方法即可
四、總結
spring如此龐大,找準切入點,許多問題都是可以找到解決思路、或者方案;
你對spring了解多少......