微服務(wù)架構(gòu):利用事件驅(qū)動實現(xiàn)最終一致性
事務(wù)一致性
首先,我們來回顧一下ACID原則:
- Atomicity:原子性,改變數(shù)據(jù)狀態(tài)要么是一起完成,要么一起失敗
- Consistency:一致性,數(shù)據(jù)的狀態(tài)是完整一致的
- Isolation:隔離線,即使有并發(fā)事務(wù),互相之間也不影響
- Durability:持久性, 一旦事務(wù)提交,不可撤銷
在單體應(yīng)用中,我們可以利用關(guān)系型數(shù)據(jù)庫的特性去完成事務(wù)一致性,但是一旦應(yīng)用往微服務(wù)發(fā)展,根據(jù)業(yè)務(wù)拆分成不用的模塊,而且每個模塊的數(shù)據(jù)庫已經(jīng)分離開了,這時候,我們要面對的就是分布式事務(wù)了,需要自己在代碼里頭完成ACID了。比較流行的解決方案有:兩階段提交、補(bǔ)償機(jī)制、本地消息表(利用本地事務(wù)和MQ)、MQ的事務(wù)消息(RocketMQ)。
CAP定理
1998年,加州大學(xué)的計算機(jī)科學(xué)家 Eric Brewer 提出,分布式系統(tǒng)有三個指標(biāo)。
- Consistency:一致性
- Availability:可用性
- Partition tolerance:分區(qū)容錯
Eric Brewer 說,這三個指標(biāo)不可能同時做到。這個結(jié)論就叫做 CAP 定理。
微服務(wù)中,不同模塊之間使用的數(shù)據(jù)庫是不同的,不同模塊之間部署的服務(wù)去也有可能是不用的,那么分區(qū)容錯是無法避免的,因為服務(wù)之間的調(diào)用不能保證百分百的沒問題,所以系統(tǒng)設(shè)計必須考慮這種情況。因此,我們可以認(rèn)為CAP的P總是成立的,剩下的C和A無法同時做到。
實際上根據(jù)分布式系統(tǒng)中CAP原則,當(dāng)P(分區(qū)容忍)發(fā)生的時候,強(qiáng)行追求C(一致性),會導(dǎo)致(A)可用性、吞吐量下降,此時我們一般用最終一致性來保證我們系統(tǒng)的AP能力。當(dāng)然不是放棄C,而是放棄強(qiáng)一致性,而且在一般情況下CAP都能保證,只是在發(fā)生分區(qū)容錯的情況下,我們可以通過最終一致性來保證數(shù)據(jù)一致。
事件驅(qū)動實現(xiàn)最終一致性
事件驅(qū)動架構(gòu)在領(lǐng)域?qū)ο笾g通過異步的消息來同步狀態(tài),有些消息也可以同時發(fā)布給多個服務(wù),在消息引起了一個服務(wù)的同步后可能會引起另外消息,事件會擴(kuò)散開。嚴(yán)格意義上的事件驅(qū)動是沒有同步調(diào)用的。
例子:
在電商里面,用戶下單必須根據(jù)庫存來確定訂單是否成交。
項目架構(gòu):SpringBoot2+Mybatis+tk-Mybatis+ActiveMQ【因為小例子,不做成Spring Cloud架構(gòu)】
首先,我們來看看正常的服務(wù)之間調(diào)用:

代碼:
- @Override
- @Transactional(rollbackFor = Exception.class)
- public Result placeOrder(OrderQuery query) {
- Result result = new Result();
- // 先遠(yuǎn)程調(diào)用Stock-Service去減少庫存
- RestTemplate restTemplate = new RestTemplate();
- //請求頭
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- //封裝成一個請求對象
- HttpEntity entity = new HttpEntity(query, headers);
- // 同步調(diào)用庫存服務(wù)的接口
- Result stockResult = restTemplate.postForObject("http://127.0.0.1:8081/stock/reduceStock",entity,Result.class);
- if (stockResult.getCode() == Result.ResultConstants.SUCCESS){
- Order order = new Order();
- BeanUtils.copyProperties(query,order);
- order.setOrderStatus(1);
- Integer insertCount = orderMapper.insertSelective(order);
- if (insertCount == 1){
- result.setMsg("下單成功");
- }else {
- result.setMsg("下單失敗");
- }
- }else {
- result.setCode(Result.ResultConstants.FAIL);
- result.setMsg("下單失敗:"+stockResult.getMsg());
- }
- return result;
- }
我們可以看到,這樣的服務(wù)調(diào)用的弊端多多:
1、訂單服務(wù)需同步等待庫存服務(wù)的返回結(jié)果,接口結(jié)果返回延誤。2、訂單服務(wù)直接依賴于庫存服務(wù),只要庫存服務(wù)崩了,訂單服務(wù)不能再正常運行。3、訂單服務(wù)需考慮并發(fā)問題,庫存最后可能為負(fù)。
下面開始利用事件驅(qū)動實現(xiàn)最終一致性
1、在訂單服務(wù)新增訂單后,訂單的狀態(tài)是“已開啟”,然后發(fā)布一個Order Created事件到消息隊列上

代碼:
- @Transactional(rollbackFor = Exception.class)
- public Result placeOrderByMQ(OrderQuery query) {
- Result result = new Result();
- // 先創(chuàng)建訂單,狀態(tài)為下單0
- Order order = new Order();
- BeanUtils.copyProperties(query,order);
- order.setOrderStatus(0);
- Integer insertCount = orderMapper.insertSelective(order);
- if (insertCount == 1){
- // 發(fā)送 訂單消息
- MqOrderMsg mqOrderMsg = new MqOrderMsg();
- mqOrderMsg.setId(order.getId());
- mqOrderMsg.setGoodCount(query.getGoodCount());
- mqOrderMsg.setGoodName(query.getGoodName());
- mqOrderMsg.setStockId(query.getStockId());
- jmsProducer.sendOrderCreatedMsg(mqOrderMsg);
- // 此時的訂單只是開啟狀態(tài)
- result.setMsg("下單成功");
- }
- return result;
- }
2、庫存服務(wù)在監(jiān)聽到消息隊列OrderCreated中的消息,將庫存表中商品的庫存減去下單數(shù)量,然后再發(fā)送一個Stock Locked事件給消息隊列。

代碼:
- /**
- * 接收下單消息
- * @param message 接收到的消息
- * @param session 上下文
- */
- @JmsListener(destination = ORDER_CREATE,containerFactory = "myListenerContainerFactory")
- @Transactional(rollbackFor = Exception.class)
- public void receiveOrderCreatedMsg(Message message, Session session){
- try {
- if (message instanceof ActiveMQObjectMessage){
- MqStockMsg result = new MqStockMsg();
- ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
- MqOrderMsg msg = (MqOrderMsg)objectMessage.getObject();
- Integer updateCount = stockMapper.updateNumByStockId(msg.getStockId(),msg.getGoodCount());
- if (updateCount >= 1){
- result.setSuccess(true);
- result.setOrderId(msg.getId());
- }else {
- result.setSuccess(false);
- }
- // 手動ack,使消息出隊列,不然會不斷消費
- message.acknowledge();
- // 發(fā)送庫存鎖定消息到MQ
- jmsProducer.sendStockLockedMsg(result);
- }
- } catch (JMSException e) {
- log.error("接收訂單創(chuàng)建消息報錯:"+e.getMessage());
- }
- }
仔細(xì)的朋友可能會看到:message.acknowledge(),即手動確認(rèn)消息。因為在保證庫存服務(wù)的邏輯能正常執(zhí)行后再確認(rèn)消息已消費,可以保證消息的投遞可靠性,萬一在庫存服務(wù)執(zhí)行時報出異常,我們可以做到重新消費該下單消息。
3、訂單服務(wù)接收到Stock Locked事件,將訂單的狀態(tài)改為“已確認(rèn)”

代碼:
- /**
- * 判斷是否還有庫存,有庫存更新訂單狀態(tài)為1,無庫存更新訂單狀態(tài)為2,并且通知用戶(WebSocket)
- * @param message
- */
- @JmsListener(destination = STOCK_LOCKED,containerFactory = "myListenerContainerFactory")
- @Transactional(rollbackFor = Exception.class)
- public void receiveStockLockedMsg(Message message, Session session){
- try {
- if (message instanceof ActiveMQObjectMessage){
- ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
- MqStockMsg msg = (MqStockMsg)objectMessage.getObject();
- if (msg.isSuccess()){
- Order updateOrder = new Order();
- updateOrder.setId(msg.getOrderId());
- updateOrder.setOrderStatus(1);
- orderMapper.updateByPrimaryKeySelective(updateOrder);
- log.info("訂單【"+msg.getOrderId()+"】下單成功");
- }else {
- Order updateOrder = new Order();
- updateOrder.setId(msg.getOrderId());
- updateOrder.setOrderStatus(2);
- orderMapper.updateByPrimaryKeySelective(updateOrder);
- // 通知用戶庫存不足,訂單被取消
- log.error("訂單【"+msg.getOrderId()+"】因庫存不足被取消");
- }
- // 手動ack,使消息出隊列,不然會不斷消費
- message.acknowledge();
- }
- } catch (JMSException e) {
- log.error("接收庫存鎖定消息報錯:"+e.getMessage());
- }
- }
同樣,這里我們也是會利用手動確認(rèn)消息來保證消息的投遞可靠性。
至此,已經(jīng)全部搞定了。我們看一下和正常的服務(wù)調(diào)用對比如何:
1、訂單服務(wù)不再直接依賴于庫存服務(wù),而是將下單事件發(fā)送到MQ中,讓庫存監(jiān)聽。
2、訂單服務(wù)能真正的作為一個模塊獨立運行。
3、解決了并發(fā)問題,而且MQ的隊列處理效率非常的高。
但是也存在下面的問題:
1、用戶體驗改變了:因為使用事件機(jī)制,訂單是立即生成的,可是很有可能過一會,系統(tǒng)會提醒你沒貨了。。這就像是排隊搶購一樣,排著排著就被通知沒貨了,不用再排隊了。
2、數(shù)據(jù)庫可能會存在很對沒有完成下單的訂單。
最后,如果真的要考慮用戶體驗,并且不想數(shù)據(jù)庫存在很多不必要的數(shù)據(jù),該怎么辦?
那就把訂單服務(wù)和庫存服務(wù)聚合在一起吧。解決當(dāng)前的問題應(yīng)當(dāng)是首先要考慮的,我們設(shè)計微服務(wù)的目的是本想是解決業(yè)務(wù)并發(fā)量。而現(xiàn)在面臨的卻是用戶體驗的問題,所以架構(gòu)設(shè)計也是需要妥協(xié)的。
最主要是,我們是經(jīng)過思考和分析的,每個方案能做到哪種程度,能應(yīng)用到哪種場景。正所謂,技術(shù)要和實際場景結(jié)合,我們不能為了追求新技術(shù)而生搬硬套。