聊聊如何優(yōu)雅地Spring事務編程
在開發(fā)中,有時候我們需要對 Spring 事務的生命周期進行監(jiān)控,比如在事務提交、回滾或掛起時觸發(fā)特定的邏輯處理。那么如何實現(xiàn)這種定制化操作呢?
Spring 作為一個高度靈活和可擴展的框架,早就提供了一個強大的擴展點,即事務同步器 TransactionSynchronization 。通過 TransactionSynchronization ,我們可以輕松地控制事務生命周期中的關鍵階段,實現(xiàn)自定義的業(yè)務邏輯與事務管理的結合。
package org.springframework.transaction.support;
import java.io.Flushable;
public interface TransactionSynchronization extends Flushable {
/** 事務提交狀態(tài) */
int STATUS_COMMITTED = 0;
/** 事務回滾狀態(tài) */
int STATUS_ROLLED_BACK = 1;
/**系統(tǒng)異常狀態(tài) */
int STATUS_UNKNOWN = 2;
//掛起該事務同步器
default void suspend() {
}
//恢復事務同步器
default void resume() {
}
//flush底層的session到數(shù)據(jù)庫
default void flush() {
}
// 事務提交之前
default void beforeCommit(boolean readOnly) {
}
// 操作完成之前(包含commit/rollback)
default void beforeCompletion() {
}
// 事務提交之后
default void afterCommit() {
}
// 操作完成之后(包含commit/rollback)
default void afterCompletion(int status) {
}
}
TransactionSynchronization 是一個接口,它里面定義了一系列與事務各生命周期階段相關的方法。比如,我們可以這樣使用:
public class UserService {
@Transactional(rollbackFor = Exception.class)
public void saveUser(User user) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
System.out.println("saveUser事務已提交...");
}
});
userDao.saveUser(user);
}
}
在 Spring 事務剛開始的時候,我們向 TransactionSynchronizationManager 事務同步管理器注冊了一個事務同步器,事務提交前/后,會遍歷執(zhí)行事務同步器中對應的事務同步方法(一個 Spring 事務可以注冊多個事務同步器)。
需要注意的是注冊事務同步器必須得在一個 Spring 事務中才能注冊,否則會拋出 Transaction synchronization is not active 這個錯誤。
圖片
isSynchronizationActive() 方法用來判斷當前是否存在事務(判斷線程共享變量,是否存在 TransactionSynchronization)
圖片
Spring 在創(chuàng)建事務的時候,會初始化一個空集合放到 synchronizations 屬性中,所以只要當前存在事務,isSynchronizationActive() 就為 true。
TransactionSynchronizationManager 解析
Spring 對于事務的管理都是基于 TransactionSynchronizationManager 這個類,先看下 TransactionSynchronizationManager 的一些屬性:
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal("Actual transaction active");
- resources:保存連接資源,因為一個方法里面可能包含多個事務,所以就用 Map 來保存資源, key為 DataSource,value 為connectionHolder。線程可以通過該屬性獲取到同一個 Connection 對象。
- synchronizations:事務同步器,是 Spring 交由程序員進行擴展的代碼,每個線程可以注冊N個事務同步器。
- currentTransactionName:事務的名稱。
- currentTransactionReadOnly:事務是否是只讀。
- currentTransactionIsolationLevel:事務的隔離級別。
- actualTransactionActive:用于保存當前事務是否還是 Active 狀態(tài)(事務是否開啟)。
Spring 創(chuàng)建事務時,DataSourceTransactionManager.doBegin 方法中,將新創(chuàng)建的 connection 包裝成 connectionHolder ,通過 TransactionSynchronizationManager#bindResource 方法存入 resources 中。
然后標注到一個事務當中的其它數(shù)據(jù)庫操作就可以通過 TransactionSynchronizationManager#getResource 方法獲取到這個連接。
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = (Map)resources.get();
if (map == null) {
return null;
} else {
Object value = map.get(actualKey);
if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) {
map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
}
從上面我們也能看到,Spring 對于多個數(shù)據(jù)庫操作的事務實現(xiàn)是基于 ThreadLocal 的,所以 Spring 事務操作是無法使用多線程的。
應用場景
TransactionSynchronization 可以用于一些需要在事務結束后執(zhí)行清理操作或其他相關任務的場景。
應用場景舉例:
- 資源釋放:在事務提交或回滾后釋放資源,如關閉數(shù)據(jù)庫連接、釋放文件資源等。
- 日志記錄:在事務結束后記錄相關日志信息,例如記錄事務的執(zhí)行結果或異常情況。
- 緩存更新:在事務完成后更新緩存數(shù)據(jù),保持緩存和數(shù)據(jù)庫數(shù)據(jù)的一致性。
- 消息通知:在事務結束后發(fā)送消息通知相關系統(tǒng)或用戶,如發(fā)送郵件或短信通知。
舉例:假設一個電商系統(tǒng)中存在訂單支付的業(yè)務場景,當用戶支付訂單時,需要在事務提交后發(fā)送訂單支付成功的消息通知給用戶。
由于事務是和數(shù)據(jù)庫連接相綁定的,如果把發(fā)送消息和數(shù)據(jù)庫操作放在一個事務里面。當發(fā)送消息時間過長時會占用數(shù)據(jù)庫連接,所以就要把數(shù)據(jù)庫操作與發(fā)送消息到 MQ 解耦開來。
這時就可以通過 TransactionSynchronization 來實現(xiàn)在事務提交后發(fā)送消息通知的功能。具體示例代碼如下:
@Component
public class OrderPaymentNotification implements TransactionSynchronization {
private String orderNo;
public OrderPaymentNotification(String orderNo) {
this.orderNo = orderNo;
}
@Override
public void beforeCommit(boolean readOnly) {
// 在事務提交前不執(zhí)行任何操作
}
@Override
public void beforeCompletion() {
// 在事務即將完成時不執(zhí)行任何操作
}
@Override
public void afterCommit() {
// 在事務提交后發(fā)送訂單支付成功的消息通知
sendMessage("訂單支付成功", orderNo);
}
@Override
public void afterCompletion(int status) {
// 在事務完成后不執(zhí)行任何操作
}
private void sendMessage(String message, String orderNo) {
// 發(fā)送消息通知的具體實現(xiàn)邏輯
System.out.println(message + ": " + orderNo);
}
}
@Transactional
public void finishOrder(String orderNo) {
// 修改訂單成功
updateOrderSuccess(orderNo);
// 發(fā)送消息到 MQ
TransactionSynchronizationManager.registerSynchronization(new OrderPaymentNotification(orderNo));
}
這樣當事務成功提交之后,就會把消息發(fā)送給 MQ,并且不會占用數(shù)據(jù)庫連接資源。
@TransactionalEventListener
在 Spring Framework 4.2版本后還可以使用 @TransactionalEventListener 注解處理數(shù)據(jù)庫事務提交成功后的執(zhí)行操作。
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
// 表明若沒有事務的時候,對應的event是否需要執(zhí)行,默認值為false表示,沒事務就不執(zhí)行了。
boolean fallbackExecution() default false;
@AliasFor(
annotation = EventListener.class,
attribute = "classes"
)
Class<?>[] value() default {};
@AliasFor(
annotation = EventListener.class,
attribute = "classes"
)
Class<?>[] classes() default {};
String condition() default "";
}
public enum TransactionPhase {
// 在事務commit之前執(zhí)行
BEFORE_COMMIT,
// 在事務commit之后執(zhí)行
AFTER_COMMIT,
// 在事務rollback之后執(zhí)行
AFTER_ROLLBACK,
// 在事務完成后執(zhí)行(包括commit/rollback)
AFTER_COMPLETION;
private TransactionPhase() {
}
}
從命名上可以直接看出,它就是個 EventListener,效果跟 TransactionSynchronization 一樣,但比 TransactionSynchronization 更加優(yōu)雅。它的使用方式如下:
@Data
public class Order {
private Long orderId;
private String orderNumber;
private BigDecimal totalAmount;
}
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private ApplicationEventPublisher eventPublisher;
@Transactional
public void createOrder(Order order) {
// 保存訂單邏輯
System.out.println("Creating order: " + order.getOrderNumber());
orderRepository.save(order);
// 發(fā)布訂單創(chuàng)建事件
OrderCreatedEvent orderCreatedEvent = new OrderCreatedEvent(order);
eventPublisher.publishEvent(orderCreatedEvent);
}
}
@Getter
@Setter
public class OrderCreatedEvent {
private Order order;
public OrderCreatedEvent(Order order) {
this.order = order;
}
}
@Component
@Slf4j
public class OrderEventListener {
@Autowired
private EmailService emailService;
/*
* @Async加了就是異步監(jiān)聽,沒加就是同步(啟動類要開啟@EnableAsync注解)
* 可以使用@Order定義監(jiān)聽者順序,默認是按代碼書寫順序
* 可以使用SpEL表達式來設置監(jiān)聽器生效的條件
* 監(jiān)聽器可以看做普通方法,如果監(jiān)聽器拋出異常,在publishEvent里處理即可
*/
@Async
@Order(1)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = OrderCreatedEvent.class)
public void onOrderCreatedEvent(OrderCreatedEvent event) {
// 處理訂單創(chuàng)建事件,例如發(fā)送郵件通知
log.info("Received OrderCreatedEvent for order: " + event.getOrder().getOrderNumber());
emailService.sendOrderConfirmationEmail(event.getOrder());
}
}