自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

微服務(wù)架構(gòu):利用事件驅(qū)動實現(xiàn)最終一致性

開發(fā) 架構(gòu)
微服務(wù)中,不同模塊之間使用的數(shù)據(jù)庫是不同的,不同模塊之間部署的服務(wù)去也有可能是不用的,那么分區(qū)容錯是無法避免的,因為服務(wù)之間的調(diào)用不能保證百分百的沒問題,所以系統(tǒng)設(shè)計必須考慮這種情況。

 事務(wù)一致性

首先,我們來回顧一下ACID原則:

  • Atomicity:原子性,改變數(shù)據(jù)狀態(tài)要么是一起完成,要么一起失敗
  • Consistency:一致性,數(shù)據(jù)的狀態(tài)是完整一致的
  • Isolation:隔離線,即使有并發(fā)事務(wù),互相之間也不影響
  • Durability:持久性, 一旦事務(wù)提交,不可撤銷

在單體應(yīng)用中,我們可以利用關(guān)系型數(shù)據(jù)庫的特性去完成事務(wù)一致性,但是一旦應(yīng)用往微服務(wù)發(fā)展,根據(jù)業(yè)務(wù)拆分成不用的模塊,而且每個模塊的數(shù)據(jù)庫已經(jīng)分離開了,這時候,我們要面對的就是分布式事務(wù)了,需要自己在代碼里頭完成ACID了。比較流行的解決方案有:兩階段提交、補(bǔ)償機(jī)制、本地消息表(利用本地事務(wù)和MQ)、MQ的事務(wù)消息(RocketMQ)。

[[278930]]

CAP定理

1998年,加州大學(xué)的計算機(jī)科學(xué)家 Eric Brewer 提出,分布式系統(tǒng)有三個指標(biāo)。

  • Consistency:一致性
  • Availability:可用性
  • Partition tolerance:分區(qū)容錯

Eric Brewer 說,這三個指標(biāo)不可能同時做到。這個結(jié)論就叫做 CAP 定理。

微服務(wù)中,不同模塊之間使用的數(shù)據(jù)庫是不同的,不同模塊之間部署的服務(wù)去也有可能是不用的,那么分區(qū)容錯是無法避免的,因為服務(wù)之間的調(diào)用不能保證百分百的沒問題,所以系統(tǒng)設(shè)計必須考慮這種情況。因此,我們可以認(rèn)為CAP的P總是成立的,剩下的C和A無法同時做到。

實際上根據(jù)分布式系統(tǒng)中CAP原則,當(dāng)P(分區(qū)容忍)發(fā)生的時候,強(qiáng)行追求C(一致性),會導(dǎo)致(A)可用性、吞吐量下降,此時我們一般用最終一致性來保證我們系統(tǒng)的AP能力。當(dāng)然不是放棄C,而是放棄強(qiáng)一致性,而且在一般情況下CAP都能保證,只是在發(fā)生分區(qū)容錯的情況下,我們可以通過最終一致性來保證數(shù)據(jù)一致。

事件驅(qū)動實現(xiàn)最終一致性

事件驅(qū)動架構(gòu)在領(lǐng)域?qū)ο笾g通過異步的消息來同步狀態(tài),有些消息也可以同時發(fā)布給多個服務(wù),在消息引起了一個服務(wù)的同步后可能會引起另外消息,事件會擴(kuò)散開。嚴(yán)格意義上的事件驅(qū)動是沒有同步調(diào)用的。

例子:

在電商里面,用戶下單必須根據(jù)庫存來確定訂單是否成交。

項目架構(gòu):SpringBoot2+Mybatis+tk-Mybatis+ActiveMQ【因為小例子,不做成Spring Cloud架構(gòu)】

首先,我們來看看正常的服務(wù)之間調(diào)用:

 

微服務(wù)架構(gòu):利用事件驅(qū)動實現(xiàn)最終一致性

 

代碼:

  1. @Override 
  2. @Transactional(rollbackFor = Exception.class) 
  3. public Result placeOrder(OrderQuery query) { 
  4.  Result result = new Result(); 
  5.  // 先遠(yuǎn)程調(diào)用Stock-Service去減少庫存 
  6.  RestTemplate restTemplate = new RestTemplate(); 
  7.  //請求頭 
  8.  HttpHeaders headers = new HttpHeaders(); 
  9.  headers.setContentType(MediaType.APPLICATION_JSON); 
  10.  //封裝成一個請求對象 
  11.  HttpEntity entity = new HttpEntity(query, headers); 
  12.  // 同步調(diào)用庫存服務(wù)的接口 
  13.  Result stockResult = restTemplate.postForObject("http://127.0.0.1:8081/stock/reduceStock",entity,Result.class); 
  14.  if (stockResult.getCode() == Result.ResultConstants.SUCCESS){ 
  15.  Order order = new Order(); 
  16.  BeanUtils.copyProperties(query,order); 
  17.  order.setOrderStatus(1); 
  18.  Integer insertCount = orderMapper.insertSelective(order); 
  19.  if (insertCount == 1){ 
  20.  result.setMsg("下單成功"); 
  21.  }else { 
  22.  result.setMsg("下單失敗"); 
  23.  } 
  24.  }else { 
  25.  result.setCode(Result.ResultConstants.FAIL); 
  26.  result.setMsg("下單失敗:"+stockResult.getMsg()); 
  27.  } 
  28.  return result; 

我們可以看到,這樣的服務(wù)調(diào)用的弊端多多:

1、訂單服務(wù)需同步等待庫存服務(wù)的返回結(jié)果,接口結(jié)果返回延誤。2、訂單服務(wù)直接依賴于庫存服務(wù),只要庫存服務(wù)崩了,訂單服務(wù)不能再正常運行。3、訂單服務(wù)需考慮并發(fā)問題,庫存最后可能為負(fù)。

下面開始利用事件驅(qū)動實現(xiàn)最終一致性

1、在訂單服務(wù)新增訂單后,訂單的狀態(tài)是“已開啟”,然后發(fā)布一個Order Created事件到消息隊列上

 

微服務(wù)架構(gòu):利用事件驅(qū)動實現(xiàn)最終一致性

 

代碼:

  1. @Transactional(rollbackFor = Exception.class) 
  2. public Result placeOrderByMQ(OrderQuery query) { 
  3.  Result result = new Result(); 
  4.  // 先創(chuàng)建訂單,狀態(tài)為下單0 
  5.  Order order = new Order(); 
  6.  BeanUtils.copyProperties(query,order); 
  7.  order.setOrderStatus(0); 
  8.  Integer insertCount = orderMapper.insertSelective(order); 
  9.  if (insertCount == 1){ 
  10.  // 發(fā)送 訂單消息 
  11.  MqOrderMsg mqOrderMsg = new MqOrderMsg(); 
  12.  mqOrderMsg.setId(order.getId()); 
  13.  mqOrderMsg.setGoodCount(query.getGoodCount()); 
  14.  mqOrderMsg.setGoodName(query.getGoodName()); 
  15.  mqOrderMsg.setStockId(query.getStockId()); 
  16.  jmsProducer.sendOrderCreatedMsg(mqOrderMsg); 
  17.  // 此時的訂單只是開啟狀態(tài) 
  18.  result.setMsg("下單成功"); 
  19.  } 
  20.  return result; 

2、庫存服務(wù)在監(jiān)聽到消息隊列OrderCreated中的消息,將庫存表中商品的庫存減去下單數(shù)量,然后再發(fā)送一個Stock Locked事件給消息隊列。

 

微服務(wù)架構(gòu):利用事件驅(qū)動實現(xiàn)最終一致性

 

代碼:

  1. /** 
  2.  * 接收下單消息 
  3.  * @param message 接收到的消息 
  4.  * @param session 上下文 
  5.  */ 
  6. @JmsListener(destination = ORDER_CREATE,containerFactory = "myListenerContainerFactory"
  7. @Transactional(rollbackFor = Exception.class) 
  8. public void receiveOrderCreatedMsg(Message message, Session session){ 
  9.  try { 
  10.  if (message instanceof ActiveMQObjectMessage){ 
  11.  MqStockMsg result = new MqStockMsg(); 
  12.  ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message; 
  13.  MqOrderMsg msg = (MqOrderMsg)objectMessage.getObject(); 
  14.  Integer updateCount = stockMapper.updateNumByStockId(msg.getStockId(),msg.getGoodCount()); 
  15.  if (updateCount >= 1){ 
  16.  result.setSuccess(true); 
  17.  result.setOrderId(msg.getId()); 
  18.  }else { 
  19.  result.setSuccess(false); 
  20.  } 
  21.  // 手動ack,使消息出隊列,不然會不斷消費 
  22.  message.acknowledge(); 
  23.  // 發(fā)送庫存鎖定消息到MQ 
  24.  jmsProducer.sendStockLockedMsg(result); 
  25.  } 
  26.  } catch (JMSException e) { 
  27.  log.error("接收訂單創(chuàng)建消息報錯:"+e.getMessage()); 
  28.  } 

仔細(xì)的朋友可能會看到:message.acknowledge(),即手動確認(rèn)消息。因為在保證庫存服務(wù)的邏輯能正常執(zhí)行后再確認(rèn)消息已消費,可以保證消息的投遞可靠性,萬一在庫存服務(wù)執(zhí)行時報出異常,我們可以做到重新消費該下單消息。

3、訂單服務(wù)接收到Stock Locked事件,將訂單的狀態(tài)改為“已確認(rèn)”

 

微服務(wù)架構(gòu):利用事件驅(qū)動實現(xiàn)最終一致性

 

代碼:

  1. /** 
  2.  * 判斷是否還有庫存,有庫存更新訂單狀態(tài)為1,無庫存更新訂單狀態(tài)為2,并且通知用戶(WebSocket) 
  3.  * @param message 
  4.  */ 
  5. @JmsListener(destination = STOCK_LOCKED,containerFactory = "myListenerContainerFactory"
  6. @Transactional(rollbackFor = Exception.class) 
  7. public void receiveStockLockedMsg(Message message, Session session){ 
  8.  try { 
  9.  if (message instanceof ActiveMQObjectMessage){ 
  10.  ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message; 
  11.  MqStockMsg msg = (MqStockMsg)objectMessage.getObject(); 
  12.  if (msg.isSuccess()){ 
  13.  Order updateOrder = new Order(); 
  14.  updateOrder.setId(msg.getOrderId()); 
  15.  updateOrder.setOrderStatus(1); 
  16.  orderMapper.updateByPrimaryKeySelective(updateOrder); 
  17.  log.info("訂單【"+msg.getOrderId()+"】下單成功"); 
  18.  }else { 
  19.  Order updateOrder = new Order(); 
  20.  updateOrder.setId(msg.getOrderId()); 
  21.  updateOrder.setOrderStatus(2); 
  22.  orderMapper.updateByPrimaryKeySelective(updateOrder); 
  23.  // 通知用戶庫存不足,訂單被取消 
  24.  log.error("訂單【"+msg.getOrderId()+"】因庫存不足被取消"); 
  25.  } 
  26.  // 手動ack,使消息出隊列,不然會不斷消費 
  27.  message.acknowledge(); 
  28.  } 
  29.  } catch (JMSException e) { 
  30.  log.error("接收庫存鎖定消息報錯:"+e.getMessage()); 
  31.  } 

同樣,這里我們也是會利用手動確認(rèn)消息來保證消息的投遞可靠性。

至此,已經(jīng)全部搞定了。我們看一下和正常的服務(wù)調(diào)用對比如何:

1、訂單服務(wù)不再直接依賴于庫存服務(wù),而是將下單事件發(fā)送到MQ中,讓庫存監(jiān)聽。

2、訂單服務(wù)能真正的作為一個模塊獨立運行。

3、解決了并發(fā)問題,而且MQ的隊列處理效率非常的高。

但是也存在下面的問題:

1、用戶體驗改變了:因為使用事件機(jī)制,訂單是立即生成的,可是很有可能過一會,系統(tǒng)會提醒你沒貨了。。這就像是排隊搶購一樣,排著排著就被通知沒貨了,不用再排隊了。

2、數(shù)據(jù)庫可能會存在很對沒有完成下單的訂單。

最后,如果真的要考慮用戶體驗,并且不想數(shù)據(jù)庫存在很多不必要的數(shù)據(jù),該怎么辦?

那就把訂單服務(wù)和庫存服務(wù)聚合在一起吧。解決當(dāng)前的問題應(yīng)當(dāng)是首先要考慮的,我們設(shè)計微服務(wù)的目的是本想是解決業(yè)務(wù)并發(fā)量。而現(xiàn)在面臨的卻是用戶體驗的問題,所以架構(gòu)設(shè)計也是需要妥協(xié)的。

最主要是,我們是經(jīng)過思考和分析的,每個方案能做到哪種程度,能應(yīng)用到哪種場景。正所謂,技術(shù)要和實際場景結(jié)合,我們不能為了追求新技術(shù)而生搬硬套。

責(zé)任編輯:武曉燕 來源: 今日頭條
相關(guān)推薦

2020-02-25 23:39:11

架構(gòu)運維技術(shù)

2019-12-17 08:40:33

微服務(wù)架構(gòu)數(shù)據(jù)

2023-11-22 12:55:59

微服務(wù)架構(gòu)數(shù)據(jù)庫

2019-01-15 17:58:03

微服務(wù)架構(gòu)數(shù)據(jù)

2021-07-26 06:33:42

CRDT數(shù)據(jù)CAP

2019-09-05 08:43:34

微服務(wù)分布式一致性數(shù)據(jù)共享

2024-06-04 09:51:48

2023-12-27 14:23:10

微服務(wù)數(shù)據(jù)存儲

2022-07-21 06:54:28

微服務(wù)系統(tǒng)RocketMQ

2017-07-25 14:38:56

數(shù)據(jù)庫一致性非鎖定讀一致性鎖定讀

2020-11-24 09:03:41

一致性MySQLMVCC

2022-12-14 08:23:30

2017-07-02 16:28:06

MySQL數(shù)據(jù)庫集群

2017-05-19 15:00:05

session架構(gòu)web-server

2023-06-07 08:10:29

2021-11-01 21:15:54

微服務(wù)系統(tǒng)數(shù)據(jù)

2016-12-19 18:41:09

哈希算法Java數(shù)據(jù)

2022-11-10 07:49:09

hash算法代碼

2021-06-16 08:33:02

分布式事務(wù)ACID

2025-02-10 03:00:00

點贊
收藏

51CTO技術(shù)棧公眾號