SpringBoot與Eventuate Tram整合,實(shí)現(xiàn)銀行轉(zhuǎn)賬最終一致性系統(tǒng)
作者:Java知識(shí)日歷
Eventuate Tram 是一個(gè)用于構(gòu)建微服務(wù)架構(gòu)的開源框架,提供事件驅(qū)動(dòng)的消息傳遞和最終一致性保證,幫助企業(yè)高效地管理和協(xié)調(diào)分布式系統(tǒng)中的復(fù)雜業(yè)務(wù)邏輯。
Eventuate Tram 是一個(gè)用于構(gòu)建微服務(wù)架構(gòu)的開源框架,提供事件驅(qū)動(dòng)的消息傳遞和最終一致性保證,幫助企業(yè)高效地管理和協(xié)調(diào)分布式系統(tǒng)中的復(fù)雜業(yè)務(wù)邏輯。
我們?yōu)槭裁催x擇Eventuate Tram?
- 解耦和服務(wù)獨(dú)立性:銀行轉(zhuǎn)賬系統(tǒng)通常涉及多個(gè)服務(wù)(如賬戶服務(wù)、轉(zhuǎn)賬服務(wù)等)。Eventuate Tram 提供了一種事件驅(qū)動(dòng)的方式來解耦這些服務(wù),使得每個(gè)服務(wù)可以獨(dú)立開發(fā)、部署和擴(kuò)展。
- 靈活性:隨著業(yè)務(wù)的發(fā)展,新的服務(wù)可能會(huì)被引入或現(xiàn)有服務(wù)需要重構(gòu)。Eventuate Tram 的事件驅(qū)動(dòng)模型允許這種靈活的變化而不需要大規(guī)模的重構(gòu)。
- 分布式事務(wù)管理:傳統(tǒng)的兩階段提交(2PC)在高并發(fā)環(huán)境下性能較差且復(fù)雜度高。Eventuate Tram 通過事件溯源和補(bǔ)償機(jī)制實(shí)現(xiàn)了最終一致性,確保即使在分布式環(huán)境中也能保持?jǐn)?shù)據(jù)的一致性。
- 冪等性和重試機(jī)制:Eventuate Tram 支持冪等處理和自動(dòng)重試,確保消息傳遞的可靠性,防止重復(fù)處理導(dǎo)致的數(shù)據(jù)不一致問題。
- 異步通信:Eventuate Tram 使用事件總線進(jìn)行異步通信,提高了系統(tǒng)的吞吐量和響應(yīng)速度。這對于實(shí)時(shí)性強(qiáng)的應(yīng)用場景尤為重要。
- 事件存儲(chǔ):Eventuate Tram 提供了內(nèi)置的事件存儲(chǔ)機(jī)制,記錄所有發(fā)生的業(yè)務(wù)事件。這不僅有助于審計(jì)和調(diào)試,還能在系統(tǒng)故障后快速恢復(fù)狀態(tài)。
- 多種消息代理支持:Eventuate Tram 支持多種消息代理(如 RabbitMQ、Kafka 等),可以根據(jù)現(xiàn)有的基礎(chǔ)設(shè)施進(jìn)行選擇和集成。
- 代碼生成工具:Eventuate 提供了一些代碼生成工具和模板,幫助開發(fā)者快速搭建項(xiàng)目結(jié)構(gòu),減少了樣板代碼的數(shù)量。
哪些公司使用Eventuate Tram?
- Capital One 是一家美國的金融服務(wù)公司,以其創(chuàng)新的技術(shù)解決方案而聞名。Capital One 使用 Eventuate Tram 來構(gòu)建其微服務(wù)架構(gòu),特別是在需要高一致性和可擴(kuò)展性的金融應(yīng)用中。
- CERN (歐洲核子研究組織) 利用 Eventuate Tram 來處理復(fù)雜的實(shí)驗(yàn)數(shù)據(jù)流和實(shí)時(shí)分析任務(wù),確保數(shù)據(jù)的一致性和系統(tǒng)的可靠性。
- Adidas 是世界著名的運(yùn)動(dòng)用品品牌。Adidas 在數(shù)字化轉(zhuǎn)型過程中采用了 Eventuate Tram 來構(gòu)建其電子商務(wù)平臺(tái)的微服務(wù)架構(gòu),提高系統(tǒng)的靈活性和響應(yīng)速度。
- Accenture 是全球領(lǐng)先的咨詢、技術(shù)服務(wù)和外包公司。Accenture 在為多個(gè)客戶實(shí)施微服務(wù)架構(gòu)時(shí)推薦并使用了 Eventuate Tram,特別是在需要最終一致性和復(fù)雜事件處理的場景中。
- Red Hat 支持并推廣了 Eventuate Tram 作為其微服務(wù)生態(tài)系統(tǒng)的一部分,幫助開發(fā)者構(gòu)建可靠的分布式應(yīng)用。
代碼實(shí)操
創(chuàng)建account-service
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>account-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.eventuate.tram</groupId>
<artifactId>eventuate-tram-spring-jdbc</artifactId>
<version>0.26.0.RELEASE</version><!-- Eventuate Tram JDBC支持 -->
</dependency>
<dependency>
<groupId>io.eventuate.tram</groupId>
<artifactId>eventuate-tram-messaging-rabbitmq</artifactId>
<version>0.26.0.RELEASE</version><!-- Eventuate Tram RabbitMQ消息代理支持 -->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId><!-- JPA數(shù)據(jù)訪問支持 -->
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
server.port=8081 # 應(yīng)用監(jiān)聽端口
# 數(shù)據(jù)庫配置
spring.datasource.url=jdbc:mysql://localhost:3306/accounts?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Hibernate自動(dòng)建表策略
spring.jpa.hibernate.ddl-auto=update
# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Account.java
package com.example.accountservice;
import io.eventuate.tram.events.publisher.DomainEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.Collections;
@Service
publicclass AccountService {
@Autowired
private AccountRepository accountRepository; // 賬戶倉庫,用于數(shù)據(jù)庫操作
@Autowired
private DomainEventPublisher domainEventPublisher; // 域事件發(fā)布器,用于發(fā)布事件
/**
* 減少賬戶余額
* @param accountId 賬戶ID
* @param amount 需要減少的金額
*/
@Transactional
public void debit(String accountId, double amount) {
Account account = accountRepository.findById(accountId).orElseThrow(() -> new RuntimeException("Account not found")); // 根據(jù)賬戶ID查找賬戶,如果找不到則拋出異常
if (account.getBalance() < amount) { // 檢查賬戶余額是否足夠
thrownew RuntimeException("Insufficient balance"); // 如果余額不足,則拋出異常
}
account.setBalance(account.getBalance() - amount); // 減少賬戶余額
accountRepository.save(account); // 保存賬戶信息到數(shù)據(jù)庫
domainEventPublisher.publish(Account.class, account.getId(), Collections.singletonList(new AccountDebitedEvent(amount))); // 發(fā)布賬戶被借記的事件
}
/**
* 增加賬戶余額
* @param accountId 賬戶ID
* @param amount 需要增加的金額
*/
@Transactional
public void credit(String accountId, double amount) {
Account account = accountRepository.findById(accountId).orElseThrow(() -> new RuntimeException("Account not found")); // 根據(jù)賬戶ID查找賬戶,如果找不到則拋出異常
account.setBalance(account.getBalance() + amount); // 增加賬戶余額
accountRepository.save(account); // 保存賬戶信息到數(shù)據(jù)庫
domainEventPublisher.publish(Account.class, account.getId(), Collections.singletonList(new AccountCreditedEvent(amount))); // 發(fā)布賬戶被貸記的事件
}
}
AccountRepository.java
package com.example.accountservice;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* 賬戶倉庫接口,繼承自JpaRepository,用于對Account實(shí)體進(jìn)行CRUD操作
*/
public interface AccountRepository extends JpaRepository<Account, String> {}
AccountServiceApplication.java
package com.example.accountservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Spring Boot應(yīng)用啟動(dòng)類
*/
@SpringBootApplication
public class AccountServiceApplication {
public static void main(String[] args) {
SpringApplication.run(AccountServiceApplication.class, args); // 啟動(dòng)Spring Boot應(yīng)用
}
}
AccountController.java
package com.example.accountservice.controller;
import com.example.accountservice.AccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* 控制器類,處理HTTP請求
*/
@RestController
@RequestMapping("/accounts")
publicclass AccountController {
@Autowired
private AccountService accountService; // 注入AccountService
/**
* 處理借記賬戶的HTTP POST請求
* @param accountId 賬戶ID
* @param amount 借記金額
*/
@PostMapping("/{accountId}/debit/{amount}")
public void debit(@PathVariable String accountId, @PathVariable double amount) {
accountService.debit(accountId, amount); // 調(diào)用AccountService的debit方法
}
/**
* 處理貸記賬戶的HTTP POST請求
* @param accountId 賬戶ID
* @param amount 貸記金額
*/
@PostMapping("/{accountId}/credit/{amount}")
public void credit(@PathVariable String accountId, @PathVariable double amount) {
accountService.credit(accountId, amount); // 調(diào)用AccountService的credit方法
}
}
AccountDebitedEvent.java
package com.example.accountservice.event;
/**
* 賬戶借記事件類
*/
publicclass AccountDebitedEvent {
privatedouble amount; // 借記金額
public AccountDebitedEvent(double amount) {
this.amount = amount; // 構(gòu)造函數(shù)初始化借記金額
}
// 獲取借記金額的方法
public double getAmount() {
return amount;
}
// 設(shè)置借記金額的方法
public void setAmount(double amount) {
this.amount = amount;
}
}
AccountCreditedEvent.java
/**
* 賬戶貸記事件類
*/
publicclass AccountCreditedEvent {
privatedouble amount; // 貸記金額
public AccountCreditedEvent(double amount) {
this.amount = amount; // 構(gòu)造函數(shù)初始化貸記金額
}
// 獲取貸記金額的方法
public double getAmount() {
return amount;
}
// 設(shè)置貸記金額的方法
public void setAmount(double amount) {
this.amount = amount;
}
}
TransferEventHandler.java
package com.example.accountservice.handler;
import com.example.accountservice.event.TransferMadeEvent;
import com.example.accountservice.AccountService;
import io.eventuate.tram.events.subscriber.DomainEventEnvelope;
import io.eventuate.tram.events.subscriber.EventHandlerMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 事件處理器類,處理轉(zhuǎn)賬完成事件
*/
@Component
publicclass TransferEventHandler {
@Autowired
private AccountService accountService; // 注入AccountService
/**
* 處理轉(zhuǎn)賬完成事件的方法
* @param event 包含轉(zhuǎn)賬完成事件的對象
*/
@EventHandlerMethod
public void handle(DomainEventEnvelope<TransferMadeEvent> event) {
TransferMadeEvent transferMadeEvent = event.getEvent(); // 獲取轉(zhuǎn)賬完成事件對象
accountService.credit(transferMadeEvent.getCreditAccountId(), transferMadeEvent.getAmount()); // 調(diào)用AccountService的credit方法,增加目標(biāo)賬戶的余額
}
}
創(chuàng)建transfer-service
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>transfer-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.eventuate.tram</groupId>
<artifactId>eventuate-tram-spring-jdbc</artifactId>
<version>0.26.0.RELEASE</version><!-- Eventuate Tram JDBC支持 -->
</dependency>
<dependency>
<groupId>io.eventuate.tram</groupId>
<artifactId>eventuate-tram-messaging-rabbitmq</artifactId>
<version>0.26.0.RELEASE</version><!-- Eventuate Tram RabbitMQ消息代理支持 -->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
server.port=8082
# 數(shù)據(jù)庫配置
spring.datasource.url=jdbc:mysql://localhost:3306/transfers?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Hibernate自動(dòng)建表策略
spring.jpa.hibernate.ddl-auto=update
# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Transfer.java
package com.example.transferservice;
import io.eventuate.tram.events.publisher.DomainEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.Collections;
/**
* 轉(zhuǎn)賬服務(wù)類
*/
@Service
publicclass TransferService {
@Autowired
private TransferRepository transferRepository; // 轉(zhuǎn)賬倉庫,用于數(shù)據(jù)庫操作
@Autowired
private DomainEventPublisher domainEventPublisher; // 域事件發(fā)布器,用于發(fā)布事件
/**
* 執(zhí)行轉(zhuǎn)賬操作
* @param cmd 包含轉(zhuǎn)賬命令的對象
*/
@Transactional
public void makeTransfer(MakeTransferCommand cmd) {
Transfer transfer = new Transfer(cmd.getSourceAccountId(), cmd.getTargetAccountId(), cmd.getAmount()); // 創(chuàng)建轉(zhuǎn)賬記錄
transferRepository.save(transfer); // 保存轉(zhuǎn)賬記錄到數(shù)據(jù)庫
domainEventPublisher.publish(Transfer.class, transfer.getId(),
Collections.singletonList(new TransferMadeEvent(cmd.getSourceAccountId(), cmd.getTargetAccountId(), cmd.getAmount()))); // 發(fā)布轉(zhuǎn)賬完成事件
}
}
TransferRepository.java
package com.example.transferservice;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* 轉(zhuǎn)賬倉庫接口,繼承自JpaRepository,用于對Transfer實(shí)體進(jìn)行CRUD操作
*/
public interface TransferRepository extends JpaRepository<Transfer, String> {}
TransferServiceApplication.java
package com.example.transferservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Spring Boot應(yīng)用啟動(dòng)類
*/
@SpringBootApplication
public class TransferServiceApplication {
public static void main(String[] args) {
SpringApplication.run(TransferServiceApplication.class, args); // 啟動(dòng)Spring Boot應(yīng)用
}
}
TransferController.java
package com.example.transferservice.controller;
import com.example.transferservice.MakeTransferCommand;
import com.example.transferservice.TransferService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 控制器類,處理HTTP請求
*/
@RestController
@RequestMapping("/transfers")
publicclass TransferController {
@Autowired
private TransferService transferService; // 注入TransferService
/**
* 處理轉(zhuǎn)賬的HTTP POST請求
* @param cmd 包含轉(zhuǎn)賬命令的對象
*/
@PostMapping
public void makeTransfer(@RequestBody MakeTransferCommand cmd) {
transferService.makeTransfer(cmd); // 調(diào)用TransferService的makeTransfer方法執(zhí)行轉(zhuǎn)賬操作
}
}
MakeTransferCommand.java
package com.example.transferservice.command;
/**
* 轉(zhuǎn)賬命令類,包含轉(zhuǎn)賬所需的信息
*/
publicclass MakeTransferCommand {
private String sourceAccountId; // 源賬戶ID
private String targetAccountId; // 目標(biāo)賬戶ID
privatedouble amount; // 轉(zhuǎn)賬金額
public MakeTransferCommand(String sourceAccountId, String targetAccountId, double amount) {
this.sourceAccountId = sourceAccountId; // 初始化源賬戶ID
this.targetAccountId = targetAccountId; // 初始化目標(biāo)賬戶ID
this.amount = amount; // 初始化轉(zhuǎn)賬金額
}
// 獲取源賬戶ID的方法
public String getSourceAccountId() {
return sourceAccountId;
}
// 設(shè)置源賬戶ID的方法
public void setSourceAccountId(String sourceAccountId) {
this.sourceAccountId = sourceAccountId;
}
// 獲取目標(biāo)賬戶ID的方法
public String getTargetAccountId() {
return targetAccountId;
}
// 設(shè)置目標(biāo)賬戶ID的方法
public void setTargetAccountId(String targetAccountId) {
this.targetAccountId = targetAccountId;
}
// 獲取轉(zhuǎn)賬金額的方法
public double getAmount() {
return amount;
}
// 設(shè)置轉(zhuǎn)賬金額的方法
public void setAmount(double amount) {
this.amount = amount;
}
}
TransferMadeEvent.java
package com.example.transferservice.event;
/**
* 轉(zhuǎn)賬完成事件類
*/
publicclass TransferMadeEvent {
private String debitAccountId; // 借記賬戶ID
private String creditAccountId; // 貸記賬戶ID
privatedouble amount; // 轉(zhuǎn)賬金額
public TransferMadeEvent(String debitAccountId, String creditAccountId, double amount) {
this.debitAccountId = debitAccountId; // 初始化借記賬戶ID
this.creditAccountId = creditAccountId; // 初始化貸記賬戶ID
this.amount = amount; // 初始化轉(zhuǎn)賬金額
}
// 獲取借記賬戶ID的方法
public String getDebitAccountId() {
return debitAccountId;
}
// 設(shè)置借記賬戶ID的方法
public void setDebitAccountId(String debitAccountId) {
this.debitAccountId = debitAccountId;
}
// 獲取貸記賬戶ID的方法
public String getCreditAccountId() {
return creditAccountId;
}
// 設(shè)置貸記賬戶ID的方法
public void setCreditAccountId(String creditAccountId) {
this.creditAccountId = creditAccountId;
}
// 獲取轉(zhuǎn)賬金額的方法
public double getAmount() {
return amount;
}
// 設(shè)置轉(zhuǎn)賬金額的方法
public void setAmount(double amount) {
this.amount = amount;
}
}
AccountEventHandler.java
package com.example.transferservice.handler;
import com.example.transferservice.event.AccountDebitedEvent;
import com.example.transferservice.TransferService;
import io.eventuate.tram.events.subscriber.DomainEventEnvelope;
import io.eventuate.tram.events.subscriber.EventHandlerMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 事件處理器類,處理賬戶借記事件
*/
@Component
publicclass AccountEventHandler {
@Autowired
private TransferService transferService; // 注入TransferService
/**
* 處理賬戶借記事件的方法
* @param event 包含賬戶借記事件的對象
*/
@EventHandlerMethod
public void handle(DomainEventEnvelope<AccountDebitedEvent> event) {
AccountDebitedEvent accountDebitedEvent = event.getEvent(); // 獲取賬戶借記事件對象
// 這里可以添加額外的邏輯,當(dāng)賬戶被借記時(shí)執(zhí)行的操作
}
}
測試
curl -X POST http://localhost:8082/transfers -H "Content-Type: application/json" -d '{"sourceAccountId": "account1", "targetAccountId": "account2", "amount": 150}'
測試結(jié)果
無返回內(nèi)容,說明操作成功。
責(zé)任編輯:武曉燕
來源:
Java知識(shí)日歷