SpringBoot結(jié)合RabbitMQ實現(xiàn)分布式事務(wù)之最大努力通知
環(huán)境:springboot.2.4.12 + RabbitMQ3.7.4
什么是最大努力通知
這是一個充值的案例
圖片
交互流程 :
1、賬戶系統(tǒng)調(diào)用充值系統(tǒng)接口。
2、充值系統(tǒng)完成支付向賬戶系統(tǒng)發(fā)起充值結(jié)果通知 若通知失敗,則充值系統(tǒng)按策略進(jìn)行重復(fù)通知。
3、賬戶系統(tǒng)接收到充值結(jié)果通知修改充值狀態(tài)。
4、賬戶系統(tǒng)未接收到通知會主動調(diào)用充值系統(tǒng)的接口查詢充值結(jié)果。通過上邊的例子我們總結(jié)最大努力通知方案的目標(biāo) :目標(biāo) :發(fā)起通知方通過一定的機(jī)制最大努力將業(yè)務(wù)處理結(jié)果通知到接收方。
具體包括 :
1、有一定的消息重復(fù)通知機(jī)制。因為接收通知方可能沒有接收到通知,此時要有一定的機(jī)制對消息重復(fù)通知。
2、消息校對機(jī)制。如果盡最大努力也沒有通知到接收方,或者接收方消費(fèi)消息后要再次消費(fèi),此時可由接收方主動向通知方查詢消息信息來滿足需求。
最大努力通知與可靠消息一致性有什么不同?
1、解決方案思想不同 可靠消息一致性,發(fā)起通知方需要保證將消息發(fā)出去,并且將消息發(fā)到接收通知方,消息的可靠性關(guān)鍵由發(fā)起通知方來保證。最大努力通知,發(fā)起通知方盡最大的努力將業(yè)務(wù)處理結(jié)果通知為接收通知方,但是可能消息接收不到,此時需要接收通知方主動調(diào)用發(fā)起通知方的接口查詢業(yè)務(wù)處理結(jié)果,通知的可靠性關(guān)鍵在接收通知方。
2、兩者的業(yè)務(wù)應(yīng)用場景不同 可靠消息一致性關(guān)注的是交易過程的事務(wù)一致,以異步的方式完成交易。最大努力通知關(guān)注的是交易后的通知事務(wù),即將交易結(jié)果可靠的通知出去。
3、技術(shù)解決方向不同 可靠消息一致性要解決消息從發(fā)出到接收的一致性,即消息發(fā)出并且被接收到。最大努力通知無法保證消息從發(fā)出到接收的一致性,只提供消息接收的可靠性機(jī)制??煽繖C(jī)制是,最大努力地將消息通知給接收方,當(dāng)消息無法被接收方接收時,由接收方主動查詢消費(fèi)。
通過RabbitMQ實現(xiàn)最大努力通知
關(guān)于RabbitMQ相關(guān)文章《SpringBoot RabbitMQ消息可靠發(fā)送與接收 》,《RabbitMQ消息確認(rèn)機(jī)制confirm 》。
- 項目結(jié)構(gòu)
圖片
兩個子模塊users-mananger(賬戶模塊),pay-manager(支付模塊)
- 依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
子模塊pay-manager
- 配置文件
server:
port: 8080
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
acknowledgeMode: MANUAL
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: false
- 實體類
記錄充值金額及賬戶信息
@Entity
@Table(name = "t_pay_info")
public class PayInfo implements Serializable{
@Id
private Long id;
private BigDecimal money ;
private Long accountId ;
}
- DAO及Service
public interface PayInfoRepository extends JpaRepository<PayInfo, Long> {
PayInfo findByOrderId(String orderId) ;
}
@Service
public class PayInfoService {
@Resource
private PayInfoRepository payInfoRepository ;
@Resource
private RabbitTemplate rabbitTemplate ;
// 數(shù)據(jù)保存完后發(fā)送消息(這里發(fā)送消息可以應(yīng)用確認(rèn)模式或事物模式)
@Transactional
public PayInfo savePayInfo(PayInfo payInfo) {
payInfo.setId(System.currentTimeMillis()) ;
PayInfo result = payInfoRepository.save(payInfo) ;
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", "")) ;
try {
rabbitTemplate.convertAndSend("pay-exchange", "pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ;
} catch (AmqpException | JsonProcessingException e) {
e.printStackTrace();
}
return result ;
}
public PayInfo queryByOrderId(String orderId) {
return payInfoRepository.findByOrderId(orderId) ;
}
}
支付完成后發(fā)送消息。
- Controller接口
@RestController
@RequestMapping("/payInfos")
public class PayInfoController {
@Resource
private PayInfoService payInfoService ;
// 支付接口
@PostMapping("/pay")
public Object pay(@RequestBody PayInfo payInfo) {
payInfoService.savePayInfo(payInfo) ;
return "支付已提交,等待結(jié)果" ;
}
@GetMapping("/queryPay")
public Object queryPay(String orderId) {
return payInfoService.queryByOrderId(orderId) ;
}
}
子模塊users-manager
- 應(yīng)用配置
server:
port: 8081
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
acknowledgeMode: MANUAL
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: false
- 實體類
@Entity
@Table(name = "t_users")
public class Users {
@Id
private Long id;
private String name ;
private BigDecimal money ;
}
賬戶信息表
@Entity
@Table(name = "t_users_log")
public class UsersLog {
@Id
private Long id;
private String orderId ;
// 0:支付中,1:已支付,2:已取消
@Column(columnDefinition = "int default 0")
private Integer status = 0 ;
private BigDecimal money ;
private Date createTime ;
}
賬戶充值記錄表(去重)
- DAO及Service
public interface UsersRepository extends JpaRepository<Users, Long> {
}
public interface UsersLogRepository extends JpaRepository<UsersLog, Long> {
UsersLog findByOrderId(String orderId) ;
}
- Service類
@Service
public class UsersService {
@Resource
private UsersRepository usersRepository ;
@Resource
private UsersLogRepository usersLogRepository ;
@Transactional
public boolean updateMoneyAndLogStatus(Long id, String orderId) {
UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ;
if (usersLog != null && 1 == usersLog.getStatus()) {
throw new RuntimeException("已支付") ;
}
Users users = usersRepository.findById(id).orElse(null) ;
if (users == null) {
throw new RuntimeException("賬戶不存在") ;
}
users.setMoney(users.getMoney().add(usersLog.getMoney())) ;
usersRepository.save(users) ;
usersLog.setStatus(1) ;
usersLogRepository.save(usersLog) ;
return true ;
}
@Transactional
public boolean saveLog(UsersLog usersLog) {
usersLog.setId(System.currentTimeMillis()) ;
usersLogRepository.save(usersLog) ;
return true ;
}
}
- 消息監(jiān)聽
@Component
public class PayMessageListener {
private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ;
@Resource
private UsersService usersService ;
@SuppressWarnings("unchecked")
@RabbitListener(queues = {"pay-queue"})
@RabbitHandler
public void receive(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag() ;
byte[] buf = null ;
try {
buf = message.getBody() ;
logger.info("接受到消息:{}", new String(buf, "UTF-8")) ;
Map<String, Object> result = new JsonMapper().readValue(buf, Map.class) ;
Long id = ((Integer) result.get("accountId")) + 0L ;
String orderId = (String) result.get("orderId") ;
usersService.updateMoneyAndLogStatus(id, orderId) ;
channel.basicAck(deliveryTag, true) ;
} catch (Exception e) {
logger.error("消息接受出現(xiàn)異常:{}, 異常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ;
e.printStackTrace() ;
try {
// 應(yīng)該將這類異常的消息放入死信隊列中,以便人工排查。
channel.basicReject(deliveryTag, false);
} catch (IOException e1) {
logger.error("拒絕消息重入隊列異常:{}", e1.getMessage()) ;
e1.printStackTrace();
}
}
}
}
- Controller接口
@RestController
@RequestMapping("/users")
public class UsersController {
@Resource
private RestTemplate restTemplate ;
@Resource
private UsersService usersService ;
@PostMapping("/pay")
public Object pay(Long id, BigDecimal money) throws Exception {
HttpHeaders headers = new HttpHeaders() ;
headers.setContentType(MediaType.APPLICATION_JSON) ;
String orderId = UUID.randomUUID().toString().replaceAll("-", "") ;
Map<String, String> params = new HashMap<>() ;
params.put("accountId", String.valueOf(id)) ;
params.put("orderId", orderId) ;
params.put("money", money.toString()) ;
UsersLog usersLog = new UsersLog() ;
usersLog.setCreateTime(new Date()) ;
usersLog.setOrderId(orderId);
usersLog.setMoney(money) ;
usersLog.setStatus(0) ;
usersService.saveLog(usersLog) ;
HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers) ;
return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ;
}
}
以上是兩個子模塊的所有代碼了
測試
初始數(shù)據(jù)
圖片
圖片
賬戶子模塊控制臺
圖片
支付子模塊控制臺
圖片
數(shù)據(jù)表數(shù)據(jù)
圖片
完畢!??!