自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SpringBoot分布式事務(wù)之最大努力通知

開(kāi)發(fā) 前端 分布式
最大努力通知方案的目標(biāo) :發(fā)起通知方通過(guò)一定的機(jī)制最大努力將業(yè)務(wù)處理結(jié)果通知到接收方。

[[393657]]

環(huán)境:springboot.2.4.9 + RabbitMQ3.7.4

什么是最大努力通知

這是一個(gè)充值的案例

交互流程 :

1、賬戶系統(tǒng)調(diào)用充值系統(tǒng)接口。

2、充值系統(tǒng)完成支付向賬戶系統(tǒng)發(fā)起充值結(jié)果通知 若通知失敗,則充值系統(tǒng)按策略進(jìn)行重復(fù)通知。

3、賬戶系統(tǒng)接收到充值結(jié)果通知修改充值狀態(tài)。

4、賬戶系統(tǒng)未接收到通知會(huì)主動(dòng)調(diào)用充值系統(tǒng)的接口查詢充值結(jié)果。

通過(guò)上邊的例子我們總結(jié)最大努力通知方案的目標(biāo) : 發(fā)起通知方通過(guò)一定的機(jī)制最大努力將業(yè)務(wù)處理結(jié)果通知到接收方。 具體包括 :

1、有一定的消息重復(fù)通知機(jī)制。 因?yàn)榻邮胀ㄖ娇赡軟](méi)有接收到通知,此時(shí)要有一定的機(jī)制對(duì)消息重復(fù)通知。

2、消息校對(duì)機(jī)制。 如果盡最大努力也沒(méi)有通知到接收方,或者接收方消費(fèi)消息后要再次消費(fèi),此時(shí)可由接收方主動(dòng)向通知方查詢消息信息來(lái)滿足需求。

最大努力通知與可靠消息一致性有什么不同?

1、解決方案思想不同 可靠消息一致性,發(fā)起通知方需要保證將消息發(fā)出去,并且將消息發(fā)到接收通知方,消息的可靠性關(guān)鍵由發(fā)起通知方來(lái)保證。 最大努力通知,發(fā)起通知方盡最大的努力將業(yè)務(wù)處理結(jié)果通知為接收通知方,但是可能消息接收不到,此時(shí)需要接收通知方主動(dòng)調(diào)用發(fā)起通知方的接口查詢業(yè)務(wù)處理結(jié)果,通知的可靠性關(guān)鍵在接收通知方。

2、兩者的業(yè)務(wù)應(yīng)用場(chǎng)景不同 可靠消息一致性關(guān)注的是交易過(guò)程的事務(wù)一致,以異步的方式完成交易。 最大努力通知關(guān)注的是交易后的通知事務(wù),即將交易結(jié)果可靠的通知出去。

3、技術(shù)解決方向不同 可靠消息一致性要解決消息從發(fā)出到接收的一致性,即消息發(fā)出并且被接收到。 最大努力通知無(wú)法保證消息從發(fā)出到接收的一致性,只提供消息接收的可靠性機(jī)制。可靠機(jī)制是,最大努力地將消息通知給接收方,當(dāng)消息無(wú)法被接收方接收時(shí),由接收方主動(dòng)查詢消費(fèi)。

通過(guò)RabbitMQ實(shí)現(xiàn)最大努力通知

關(guān)于RabbitMQ相關(guān)文章《SpringBoot RabbitMQ消息可靠發(fā)送與接收 》,《RabbitMQ消息確認(rèn)機(jī)制confirm 》。

 

項(xiàng)目結(jié)構(gòu)

兩個(gè)子模塊users-mananger(賬戶模塊),pay-manager(支付模塊)

依賴

  1. <dependency> 
  2.  <groupId>org.springframework.boot</groupId> 
  3.  <artifactId>spring-boot-starter-data-jpa</artifactId> 
  4. </dependency> 
  5. <dependency> 
  6.  <groupId>org.springframework.boot</groupId> 
  7.  <artifactId>spring-boot-starter-web</artifactId> 
  8. </dependency> 
  9. <dependency> 
  10.  <groupId>org.springframework.boot</groupId> 
  11.  <artifactId>spring-boot-starter-amqp</artifactId> 
  12. </dependency> 
  13. <dependency> 
  14.  <groupId>mysql</groupId> 
  15.  <artifactId>mysql-connector-java</artifactId> 
  16.  <scope>runtime</scope> 
  17. </dependency> 

子模塊pay-manager

配置文件

  1. server: 
  2.   port: 8080 
  3. --- 
  4. spring: 
  5.   rabbitmq: 
  6.     host: localhost 
  7.     port: 5672 
  8.     username: guest 
  9.     password: guest 
  10.     virtual-host: / 
  11.     publisherConfirmType: correlated 
  12.     publisherReturns: true 
  13.     listener: 
  14.       simple: 
  15.         concurrency: 5 
  16.         maxConcurrency: 10 
  17.         prefetch: 5 
  18.         acknowledgeMode: MANUAL 
  19.         retry: 
  20.           enabled: true 
  21.           initialInterval: 3000 
  22.           maxAttempts: 3 
  23.         defaultRequeueRejected: false 

實(shí)體類

記錄充值金額及賬戶信息

  1. @Entity 
  2. @Table(name = "t_pay_info"
  3. public class PayInfo implements Serializable
  4.  @Id 
  5.  private Long id; 
  6.  private BigDecimal money ; 
  7.  private Long accountId ; 
  8. }   

DAO及Service

  1. public interface PayInfoRepository extends JpaRepository<PayInfo, Long> { 
  2.  PayInfo findByOrderId(String orderId) ; 
  1. @Service 
  2. public class PayInfoService { 
  3.      
  4.     @Resource 
  5.     private PayInfoRepository payInfoRepository ; 
  6.     @Resource 
  7.     private RabbitTemplate rabbitTemplate ; 
  8.      
  9.   // 數(shù)據(jù)保存完后發(fā)送消息(這里發(fā)送消息可以應(yīng)用確認(rèn)模式或事物模式) 
  10.     @Transactional 
  11.     public PayInfo savePayInfo(PayInfo payInfo) { 
  12.         payInfo.setId(System.currentTimeMillis()) ; 
  13.         PayInfo result = payInfoRepository.save(payInfo) ; 
  14.         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-""")) ; 
  15.         try { 
  16.             rabbitTemplate.convertAndSend("pay-exchange""pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ; 
  17.         } catch (AmqpException | JsonProcessingException e) { 
  18.             e.printStackTrace(); 
  19.         } 
  20.         return result ; 
  21.     } 
  22.      
  23.     public PayInfo queryByOrderId(String orderId) { 
  24.         return payInfoRepository.findByOrderId(orderId) ; 
  25.     } 
  26.      

支付完成后發(fā)送消息。

Controller接口

  1. @RestController 
  2. @RequestMapping("/payInfos"
  3. public class PayInfoController { 
  4.  @Resource 
  5.  private PayInfoService payInfoService ; 
  6.      
  7.   // 支付接口 
  8.  @PostMapping("/pay"
  9.  public Object pay(@RequestBody PayInfo payInfo) { 
  10.   payInfoService.savePayInfo(payInfo) ; 
  11.   return "支付已提交,等待結(jié)果" ; 
  12.  } 
  13.      
  14.  @GetMapping("/queryPay"
  15.  public Object queryPay(String orderId) { 
  16.   return payInfoService.queryByOrderId(orderId) ; 
  17.  } 
  18.      

子模塊users-manager

應(yīng)用配置

  1. server: 
  2.   port: 8081 
  3. --- 
  4. spring: 
  5.   rabbitmq: 
  6.     host: localhost 
  7.     port: 5672 
  8.     username: guest 
  9.     password: guest 
  10.     virtual-host: / 
  11.     publisherConfirmType: correlated 
  12.     publisherReturns: true 
  13.     listener: 
  14.       simple: 
  15.         concurrency: 5 
  16.         maxConcurrency: 10 
  17.         prefetch: 5 
  18.         acknowledgeMode: MANUAL 
  19.         retry: 
  20.           enabled: true 
  21.           initialInterval: 3000 
  22.           maxAttempts: 3 
  23.         defaultRequeueRejected: false 

實(shí)體類

  1. @Entity 
  2. @Table(name = "t_users"
  3. public class Users { 
  4.  @Id 
  5.  private Long id; 
  6.  private String name ; 
  7.  private BigDecimal money ; 
  8. }   

賬戶信息表

  1. @Entity 
  2. @Table(name = "t_users_log"
  3. public class UsersLog { 
  4.  @Id 
  5.  private Long id; 
  6.  private String orderId ; 
  7.  // 0: 支付中,1:已支付,2:已取消 
  8.  @Column(columnDefinition = "int default 0"
  9.  private Integer status = 0 ; 
  10.  private BigDecimal money ; 
  11.  private Date createTime ; 

賬戶充值記錄表(去重)

DAO及Service

  1. public interface UsersRepository extends JpaRepository<Users, Long> { 
  2. public interface UsersLogRepository extends JpaRepository<UsersLog, Long> { 
  3.  UsersLog findByOrderId(String orderId) ; 

Service類

  1. @Service 
  2. public class UsersService {  
  3.     @Resource 
  4.     private UsersRepository usersRepository ; 
  5.     @Resource 
  6.     private UsersLogRepository usersLogRepository ; 
  7.      
  8.  @Transactional 
  9.  public boolean updateMoneyAndLogStatus(Long id, String orderId) { 
  10.   UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ; 
  11.   if (usersLog != null && 1 == usersLog.getStatus()) { 
  12.    throw new RuntimeException("已支付") ; 
  13.   } 
  14.   Users users = usersRepository.findById(id).orElse(null) ; 
  15.   if (users == null) { 
  16.    throw new RuntimeException("賬戶不存在") ; 
  17.   } 
  18.   users.setMoney(users.getMoney().add(usersLog.getMoney())) ; 
  19.   usersRepository.save(users) ; 
  20.   usersLog.setStatus(1) ; 
  21.   usersLogRepository.save(usersLog) ; 
  22.   return true ; 
  23.  } 
  24.      
  25.  @Transactional 
  26.  public boolean saveLog(UsersLog usersLog) { 
  27.   usersLog.setId(System.currentTimeMillis()) ; 
  28.   usersLogRepository.save(usersLog) ; 
  29.   return true ; 
  30.  } 

消息監(jiān)聽(tīng)

  1. @Component 
  2. public class PayMessageListener { 
  3.      
  4.  private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ; 
  5.      
  6.  @Resource 
  7.  private  UsersService usersService ; 
  8.      
  9.  @SuppressWarnings("unchecked"
  10.  @RabbitListener(queues = {"pay-queue"}) 
  11.  @RabbitHandler 
  12.  public void receive(Message message, Channel channel) { 
  13.   long deliveryTag = message.getMessageProperties().getDeliveryTag() ; 
  14.   byte[] buf =  null ; 
  15.   try { 
  16.    buf = message.getBody() ; 
  17.    logger.info("接受到消息:{}", new String(buf, "UTF-8")) ; 
  18.    Map<String, Object> result = new JsonMapper().readValue(buf, Map.class) ; 
  19.    Long id = ((Integer) result.get("accountId")) + 0L ; 
  20.    String orderId = (String) result.get("orderId") ; 
  21.    usersService.updateMoneyAndLogStatus(id, orderId) ; 
  22.    channel.basicAck(deliveryTag, true) ; 
  23.   } catch (Exception e) { 
  24.    logger.error("消息接受出現(xiàn)異常:{}, 異常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ; 
  25.    e.printStackTrace() ; 
  26.    try { 
  27.     // 應(yīng)該將這類異常的消息放入死信隊(duì)列中,以便人工排查。 
  28.     channel.basicReject(deliveryTag, false); 
  29.    } catch (IOException e1) { 
  30.     logger.error("拒絕消息重入隊(duì)列異常:{}", e1.getMessage()) ; 
  31.     e1.printStackTrace(); 
  32.    } 
  33.   } 
  34.  } 

Controller接口

  1. @RestController 
  2. @RequestMapping("/users"
  3. public class UsersController { 
  4.      
  5.     @Resource 
  6.     private RestTemplate restTemplate ; 
  7.     @Resource 
  8.     private UsersService usersService ; 
  9.      
  10.     @PostMapping("/pay"
  11.     public Object pay(Long id, BigDecimal money) throws Exception { 
  12.         HttpHeaders headers = new HttpHeaders() ; 
  13.         headers.setContentType(MediaType.APPLICATION_JSON) ; 
  14.         String orderId = UUID.randomUUID().toString().replaceAll("-""") ; 
  15.         Map<String, String> params = new HashMap<>() ; 
  16.         params.put("accountId", String.valueOf(id)) ; 
  17.         params.put("orderId", orderId) ; 
  18.         params.put("money", money.toString()) ; 
  19.          
  20.         UsersLog usersLog = new UsersLog() ; 
  21.         usersLog.setCreateTime(new Date()) ; 
  22.         usersLog.setOrderId(orderId); 
  23.         usersLog.setMoney(money) ; 
  24.         usersLog.setStatus(0) ; 
  25.         usersService.saveLog(usersLog) ; 
  26.         HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers) ; 
  27.         return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ; 
  28.     } 
  29.      

以上是兩個(gè)子模塊的所有代碼了

測(cè)試

初始數(shù)據(jù)

賬戶子模塊控制臺(tái)

支付子模塊控制臺(tái)

數(shù)據(jù)表數(shù)據(jù)

完畢?。?!

 

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2023-08-30 08:33:07

RabbitMQSpringBoot消息信息

2024-06-11 13:50:43

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2022-06-21 08:27:22

Seata分布式事務(wù)

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2009-06-19 15:28:31

JDBC分布式事務(wù)

2021-09-29 09:07:37

分布式架構(gòu)系統(tǒng)

2009-09-18 15:10:13

分布式事務(wù)LINQ TO SQL

2025-04-29 04:00:00

分布式事務(wù)事務(wù)消息

2019-06-26 09:41:44

分布式事務(wù)微服務(wù)

2022-03-24 07:51:27

seata分布式事務(wù)Java

2021-08-06 08:33:27

Springboot分布式Seata

2022-12-19 19:12:17

分布式事務(wù)

2018-10-28 17:54:00

分布式事務(wù)數(shù)據(jù)

2020-03-31 08:05:23

分布式開(kāi)發(fā)技術(shù)

2023-12-26 08:59:52

分布式場(chǎng)景事務(wù)機(jī)制

2023-09-11 15:40:43

鍵值存儲(chǔ)云服務(wù)

2021-02-01 09:35:53

關(guān)系型數(shù)據(jù)庫(kù)模型

2010-07-26 13:25:11

SQL Server分
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)