哪種分布式事務(wù)處理方案效率最高?必然是...
前面幾篇文章松哥和大家介紹了 Seata 中四種分布式事務(wù)處理方案,相信經(jīng)過前面的幾篇文章的學(xué)習(xí),大家對(duì)于 Seata 中的分布式事務(wù)已經(jīng)非常了解了。還沒看過前面文章的小伙伴,可以先看一下:
- 五分鐘帶你體驗(yàn)一把分布式事務(wù)!so easy!
- 看了那么多博客,還是不懂 TCC,不妨看看這個(gè)案例!
- XA 事務(wù)水很深,小伙子我怕你把握不住!
- 你這 Saga 事務(wù)保“隔離性”嗎?
不過很多小伙伴看完后感覺 Seata 對(duì)于分布式事務(wù)的處理,代碼雖然簡(jiǎn)單,但是內(nèi)部花費(fèi)在網(wǎng)絡(luò)上的時(shí)間消耗太多了,在高并發(fā)場(chǎng)景下,這似乎并不是一種很好的解決方案。
要說哪種分布式事務(wù)處理方案效率高,必然繞不開消息中間件!基于消息中間件的兩階段提交方案,通常用在高并發(fā)場(chǎng)景下。這種方式通過犧牲數(shù)據(jù)的強(qiáng)一致性換取性能的大幅提升,不過實(shí)現(xiàn)這種方式的成本和復(fù)雜度是比較高的,使用時(shí)還要看實(shí)際業(yè)務(wù)情況。
今天松哥想通過一個(gè)簡(jiǎn)單的案例,來和大家聊一聊如何通過消息中間件來處理分布式事務(wù)。
1. 思路分析
先來說說整體思路。
有一個(gè)名詞叫做消息驅(qū)動(dòng)的微服務(wù),相信很多小伙伴都聽說過。怎么理解呢?
在微服務(wù)系統(tǒng)中,服務(wù)之間的互相調(diào)用,我們可以使用 HTTP 的方式,例如 OpenFeign,也可以使用 RPC 的方式,例如 Dubbo,除了這些方案之外,我們也可以使用消息驅(qū)動(dòng),這是一種典型的響應(yīng)式系統(tǒng)設(shè)計(jì)方案。
在消息驅(qū)動(dòng)的微服務(wù)中,服務(wù)之間不再互相直接調(diào)用,當(dāng)服務(wù)之間需要通信時(shí),就把通信內(nèi)容發(fā)送到消息中間件上,另一個(gè)服務(wù)則通過監(jiān)聽消息中間件中的消息隊(duì)列,來完成相應(yīng)的業(yè)務(wù)邏輯調(diào)用,過程就是這么個(gè)過程,并不難,具體怎么玩,我們繼續(xù)往下看。
2. 業(yè)務(wù)分析
折騰了半天,后來松哥在網(wǎng)上找到了一個(gè)別人寫好的例子,我覺得用來演示這個(gè)問題特別合適,所以我就沒有自己寫案例了,直接用別人的代碼,我們來逐個(gè)分析,跟前面講分布式事務(wù) Seata 的方式一致。
首先我們來看如下一張流程圖,這是一個(gè)用戶購(gòu)票的案例:
當(dāng)用戶想要購(gòu)買一張票時(shí):
- 向新訂單隊(duì)列中寫入一條數(shù)據(jù)。
- Order Service 負(fù)責(zé)消費(fèi)這個(gè)隊(duì)列中的消息,完成訂單的創(chuàng)建,然后再向新訂單繳費(fèi)隊(duì)列中寫入一條消息。
- User Service 負(fù)責(zé)消費(fèi)新訂單繳費(fèi)隊(duì)列中的消息,在 User Service 中完成對(duì)用戶賬戶余額的劃扣,然后向新訂單轉(zhuǎn)移票隊(duì)列中寫入一條消息。
- Ticket Service 負(fù)責(zé)消費(fèi)新訂單轉(zhuǎn)移票隊(duì)列,在 Ticket Service 中完成票的轉(zhuǎn)移,然后發(fā)送一條消息給訂單完成隊(duì)列。
- 最后 Order Service 中負(fù)責(zé)監(jiān)聽訂單完成隊(duì)列,處理完成后的訂單。
這就是一個(gè)典型的消息驅(qū)動(dòng)微服務(wù),也是一個(gè)典型的響應(yīng)式系統(tǒng)。在這個(gè)系統(tǒng)中,一共有三個(gè)服務(wù),分別是:
- Order Service
- User Service
- Ticket Service
這三個(gè)服務(wù)之間不會(huì)進(jìn)行任何形式的直接調(diào)用,大家有事都是直接發(fā)送到消息中間件,其他服務(wù)則從消息中間件中獲取自己想要的消息然后進(jìn)行處理。
具體到我們的實(shí)踐中,則多了一個(gè)檢查票是否夠用的流程,如下圖:
創(chuàng)建訂單時(shí),先由 Ticket 服務(wù)檢查票是否夠用,沒問題的話再繼續(xù)發(fā)起訂單的創(chuàng)建。其他過程我就不說了。
另外還需要注意,在售票系統(tǒng)中,由于每張票都不同,例如每張票可能有座位啥的,因此一張票在數(shù)據(jù)庫(kù)中往往是被設(shè)計(jì)成一條記錄。
3. 實(shí)踐
流程我已經(jīng)說明白了,接下來我們就來看看具體的代碼實(shí)踐。
3.1 準(zhǔn)備數(shù)據(jù)庫(kù)
首先我們準(zhǔn)備三個(gè)數(shù)據(jù)庫(kù),分別是:
- javaboy_order:訂單庫(kù),用戶創(chuàng)建訂單等操作,在這個(gè)數(shù)據(jù)庫(kù)中完成。
- javaboy_ticket:票務(wù)庫(kù),這個(gè)庫(kù)中保存著所有的票據(jù)信息,每一張票都是一條記錄,都保存在這個(gè)庫(kù)中。
- javaboy_user:用戶庫(kù),這里保存著用戶的賬戶余額以及付款記錄等信息。
每個(gè)庫(kù)中都有各自對(duì)應(yīng)的表,為了操作方便,這些表不用自己創(chuàng)建,將來等項(xiàng)目啟動(dòng)了,利用 JPA 自動(dòng)創(chuàng)建即可。
3.2 項(xiàng)目概覽
我們先來整體上看下這個(gè)項(xiàng)目,公眾號(hào)后臺(tái)回復(fù) mq_tran 可以下載完整代碼:
一共有五個(gè)服務(wù):
- eureka:注冊(cè)中心
- order:訂單服務(wù)
- service:公共模塊
- ticket:票務(wù)服務(wù)
- user:用戶服務(wù)
下面分別來說。
3.3 注冊(cè)中心
有人說,都消息驅(qū)動(dòng)了,還要注冊(cè)中心干嘛?
消息驅(qū)動(dòng)沒錯(cuò),消息驅(qū)動(dòng)微服務(wù)之后每個(gè)服務(wù)只管把消息往消息中間件上扔,每個(gè)服務(wù)又只管消費(fèi)消息中間件上的消息,這個(gè)時(shí)候?qū)τ诜?wù)注冊(cè)中心似乎不是那么強(qiáng)需要。不過在我們這個(gè)案例中,消息驅(qū)動(dòng)主要用來處理事務(wù)問題,其他常規(guī)需求我們還是用 OpenFeign 來處理,所以這里我們依然需要一個(gè)注冊(cè)中心。
這里的注冊(cè)中心我就選擇常見的 Eureka,省事一些。由于本文主要是和大家聊分布式事務(wù),所以涉及到微服務(wù)的東西我就簡(jiǎn)單介紹下,不會(huì)占用過多篇幅,如果大家還不熟悉 Spring Cloud 的用法,可以在公眾號(hào)后臺(tái)回復(fù) vhr 有一套視頻介紹。
服務(wù)注冊(cè)中心的創(chuàng)建記得加上 Spring Security,將自己的服務(wù)注冊(cè)中心保護(hù)起來。
這塊有一個(gè)小小的細(xì)節(jié)和大家多說兩句。
Eureka 用 Spring Security 保護(hù)起來之后,以后其他服務(wù)注冊(cè)都是通過 Http Basic 來認(rèn)證,所以我們要在代碼中開啟 Http Basic 認(rèn)證,如下(以前舊版本不需要下面這段代碼,但是新版本需要):
- @Configuration
- public class SecurityConfig extends WebSecurityConfigurerAdapter {
- @Override
- protected void configure(HttpSecurity http) throws Exception {
- http.authorizeRequests()
- .anyRequest().authenticated()
- .and()
- .httpBasic()
- .and().formLogin().and().csrf().disable();
- }
- }
3.4 購(gòu)票服務(wù)
接下來我們就來看看購(gòu)票服務(wù)。
購(gòu)票是從下訂單開始,所以我們就先從訂單服務(wù) order 開始整個(gè)流程的分析。
3.4.1 新訂單處理(order)
當(dāng)用戶發(fā)起一個(gè)購(gòu)票請(qǐng)求后,這個(gè)請(qǐng)求發(fā)送到 order 服務(wù)上,order 服務(wù)首先會(huì)向 order:new 隊(duì)列發(fā)送一條消息,開啟一個(gè)訂單的處理流程。代碼如下:
- @Transactional
- @PostMapping("")
- public void create(@RequestBody OrderDTO dto) {
- dto.setUuid(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend("order:new", dto);
- }
上面設(shè)置的 UUID 是整個(gè)訂單在處理過程中的一個(gè)唯一標(biāo)志符,也算是一條主線。
order:new 隊(duì)列中的消息將被 ticket 服務(wù)消費(fèi),ticket 服務(wù)消費(fèi) order:new 中的消息,并進(jìn)行鎖票操作(鎖票的目的防止有兩個(gè)消費(fèi)同時(shí)購(gòu)買同一張票),鎖票成功后,ticket 服務(wù)將向 order:locked 隊(duì)列發(fā)送一條消息,表示鎖票成功;否則向 order:fail 隊(duì)列發(fā)送一條消息表示鎖票失敗。
這里的 OrderDTO 對(duì)象將貫穿整個(gè)購(gòu)票過程。
3.4.2 鎖票(ticket)
鎖票操作是在 ticket 服務(wù)中完成的,代碼如下:
- @Transactional
- @RabbitListener(queues = "order:new")
- public void handleTicketLock(OrderDTO msg) {
- LOG.info("Get new order for ticket lock:{}", msg);
- int lockCount = ticketRepository.lockTicket(msg.getCustomerId(), msg.getTicketNum());
- if (lockCount == 0) {
- msg.setStatus("TICKET_LOCK_FAIL");
- rabbitTemplate.convertAndSend("order:fail", msg);
- } else {
- msg.setStatus("TICKET_LOCKED");
- rabbitTemplate.convertAndSend("order:locked", msg);
- }
- }
先調(diào)用 lockTicket 方法去數(shù)據(jù)庫(kù)中鎖票,所謂的鎖票就是將要購(gòu)買的票的 lock_user 字段設(shè)置為 customer_id(購(gòu)買者的 id)。
如果鎖票成功(即數(shù)據(jù)庫(kù)修改成功),設(shè)置 msg 的狀態(tài)為 TICKET_LOCKED,同時(shí)發(fā)送消息到 order:locked 隊(duì)列,表示鎖票成功。
如果鎖票失敗(即數(shù)據(jù)庫(kù)修改失敗),設(shè)置 msg 的狀態(tài)為 TICKET_LOCK_FAIL,同時(shí)發(fā)送消息到 order:fail 隊(duì)列,表示鎖票失敗。
3.4.2 鎖票成功(order)
接下來,由 order 服務(wù)消費(fèi) order:locked 隊(duì)列中的消息,也就是鎖票成功后接下來的操作。
- @Transactional
- @RabbitListener(queues = "order:locked")
- public void handle(OrderDTO msg) {
- LOG.info("Get new order to create:{}", msg);
- if (orderRepository.findOneByUuid(msg.getUuid()) != null) {
- LOG.info("Msg already processed:{}", msg);
- } else {
- Order order = newOrder(msg);
- orderRepository.save(order);
- msg.setId(order.getId());
- }
- msg.setStatus("NEW");
- rabbitTemplate.convertAndSend("order:pay", msg);
- }
鎖票成功后,先根據(jù)訂單的 UUID 去訂單數(shù)據(jù)庫(kù)查詢,是否已經(jīng)有訂單記錄了,如果有,說明這條消息已經(jīng)被處理了,可以防止訂單的重復(fù)處理(這塊主要是解決冪等性問題)。
如果訂單還沒有被處理,則創(chuàng)建一個(gè)新的訂單對(duì)象,并保存到數(shù)據(jù)庫(kù)中,創(chuàng)建新訂單對(duì)象的時(shí)候,需要設(shè)置訂單的 status 為 NEW。
最后設(shè)置 msg 的 status 為 NEW,然后向 order:pay 隊(duì)列發(fā)送一條消息開啟付款流程,付款是由 user 服務(wù)提供的。user 服務(wù)中會(huì)檢查用戶的賬戶余額是否夠用,如果不夠用,就會(huì)發(fā)送消息到 order:ticket_error 隊(duì)列,表示訂票失敗;如果余額夠用,則進(jìn)行正常的付款操作,并在付款成功后發(fā)送消息到 order:ticket_move 隊(duì)列,開啟票的轉(zhuǎn)移。
3.4.3 繳費(fèi)(user)
鎖票成功后,接下來就是付費(fèi)了,付費(fèi)服務(wù)由 user 提供。
- @Transactional
- @RabbitListener(queues = "order:pay")
- public void handle(OrderDTO msg) {
- LOG.info("Get new order to pay:{}", msg);
- // 先檢查payInfo判斷重復(fù)消息。
- PayInfo pay = payInfoRepository.findOneByOrderId(msg.getId());
- if (pay != null) {
- LOG.warn("Order already paid, duplicated message.");
- return;
- }
- Customer customer = customerRepository.getById(msg.getCustomerId());
- if (customer.getDeposit() < msg.getAmount()) {
- LOG.info("No enough deposit, need amount:{}", msg.getAmount());
- msg.setStatus("NOT_ENOUGH_DEPOSIT");
- rabbitTemplate.convertAndSend("order:ticket_error", msg);
- return;
- }
- pay = new PayInfo();
- pay.setOrderId(msg.getId());
- pay.setAmount(msg.getAmount());
- pay.setStatus("PAID");
- payInfoRepository.save(pay);
- customerRepository.charge(msg.getCustomerId(), msg.getAmount());
- msg.setStatus("PAID");
- rabbitTemplate.convertAndSend("order:ticket_move", msg);
- }
這里的執(zhí)行步驟如下:
- 首先根據(jù)訂單 id 去查找付款信息,檢查當(dāng)前訂單是否已經(jīng)完成付款,如果已經(jīng)完成服務(wù),則直接 return,這一步也是為了處理冪等性問題。
- 根據(jù)顧客的 id,查找到顧客的完整信息,包括顧客的賬戶余額。
- 檢查顧客的賬戶余額是否足夠支付票價(jià),如果不夠,則設(shè)置 msg 的 status 為 NOT_ENOUGH_DEPOSIT,同時(shí)向 order:ticket_error 隊(duì)列發(fā)送消息,表示訂票失敗。
- 如果顧客賬戶余額足夠支付票價(jià),則創(chuàng)建一個(gè) PayInfo 對(duì)象,設(shè)置相關(guān)的支付信息,并存入 pay_info 表中。
- 調(diào)用 charge 方法完成顧客賬戶余額的扣款。
- 發(fā)送消息到 order:ticket_move 隊(duì)列中,開啟交票操作。
3.4.4 交票(ticket)
- @Transactional
- @RabbitListener(queues = "order:ticket_move")
- public void handleTicketMove(OrderDTO msg) {
- LOG.info("Get new order for ticket move:{}", msg);
- int moveCount = ticketRepository.moveTicket(msg.getCustomerId(), msg.getTicketNum());
- if (moveCount == 0) {
- LOG.info("Ticket already transferred.");
- }
- msg.setStatus("TICKET_MOVED");
- rabbitTemplate.convertAndSend("order:finish", msg);
- }
調(diào)用 moveTicket 方法完成交票操作,也就是設(shè)置 ticket 表中票的 owner 為 customerId。
交票成功后,發(fā)送消息到 order:finish 隊(duì)列,表示交票完成。
3.4.5 訂單完成(order)
- @Transactional
- @RabbitListener(queues = "order:finish")
- public void handleFinish(OrderDTO msg) {
- LOG.info("Get finished order:{}", msg);
- Order order = orderRepository.getById(msg.getId());
- order.setStatus("FINISH");
- orderRepository.save(order);
- }
這里的處理就比較簡(jiǎn)單,訂單完成后,就設(shè)置訂單的狀態(tài)為 FINISH 即可。
上面介紹的是一條主線,順利的話,消息順著這條線走一遍,一個(gè)訂單就處理完成了。
不順利的話,就有各種幺蛾子,我們分別來看。
3.4.6 鎖票失敗(order)
鎖票是在 ticket 服務(wù)中完成的,如果鎖票失敗,就會(huì)直接向 order:fail 隊(duì)列發(fā)送消息,該隊(duì)列的消息由 order 服務(wù)負(fù)責(zé)消費(fèi)。
3.4.7 扣款失敗(ticket)
扣款操作是在 user 中完成的,扣款失敗就會(huì)向 order:ticket_error 隊(duì)列中發(fā)送消息,該隊(duì)列的消息由 ticket 服務(wù)負(fù)責(zé)消費(fèi)。
- @Transactional
- @RabbitListener(queues = "order:ticket_error")
- public void handleError(OrderDTO msg) {
- LOG.info("Get order error for ticket unlock:{}", msg);
- int count = ticketRepository.unMoveTicket(msg.getCustomerId(), msg.getTicketNum());
- if (count == 0) {
- LOG.info("Ticket already unlocked:", msg);
- }
- count = ticketRepository.unLockTicket(msg.getCustomerId(), msg.getTicketNum());
- if (count == 0) {
- LOG.info("Ticket already unmoved, or not moved:", msg);
- }
- rabbitTemplate.convertAndSend("order:fail", msg);
- }
當(dāng)扣款失敗的時(shí)候,做三件事:
- 撤銷票的轉(zhuǎn)移,也就是把票的 owner 字段重新置為 null。
- 撤銷鎖票,也就是把票的 lock_user 字段重新置為 null。
- 向 order:fail 隊(duì)列發(fā)送訂單失敗的消息。
3.4.8 下單失敗(order)
下單失敗的處理在 order 服務(wù)中,有三種情況會(huì)向 order:fail 隊(duì)列發(fā)送消息:
- 鎖票失敗
- 扣款失敗(客戶賬戶余額不足)
- 訂單超時(shí)
- @Transactional
- @RabbitListener(queues = "order:fail")
- public void handleFailed(OrderDTO msg) {
- LOG.info("Get failed order:{}", msg);
- Order order;
- if (msg.getId() == null) {
- order = newOrder(msg);
- order.setReason("TICKET_LOCK_FAIL");
- } else {
- order = orderRepository.getById(msg.getId());
- if (msg.getStatus().equals("NOT_ENOUGH_DEPOSIT")) {
- order.setReason("NOT_ENOUGH_DEPOSIT");
- }
- }
- order.setStatus("FAIL");
- orderRepository.save(order);
- }
該方法的具體處理邏輯如下:
- 首先查看是否有訂單 id,如果連訂單 id 都沒有,就說明是鎖票失敗,給訂單設(shè)置 reason 屬性的值為TICKET_LOCK_FAIL。
- 如果有訂單 id,則根據(jù) id 查詢訂單信息,并判斷訂單狀態(tài)是否為 NOT_ENOUGH_DEPOSIT,這個(gè)表示扣款失敗,如果訂單狀態(tài)是 NOT_ENOUGH_DEPOSIT,則設(shè)置失敗的 reason 也為此。
- 最后設(shè)置訂單狀態(tài)為 FAIL,然后更新數(shù)據(jù)庫(kù)中的訂單信息即可。
3.4.9 訂單超時(shí)(order)
order 服務(wù)中還有一個(gè)定時(shí)任務(wù),定時(shí)去數(shù)據(jù)庫(kù)中撈取那些處理失敗的訂單,如下:
- @Scheduled(fixedDelay = 10000L)
- public void checkInvalidOrder() {
- ZonedDateTime checkTime = ZonedDateTime.now().minusMinutes(1L);
- List<Order> orders = orderRepository.findAllByStatusAndCreatedDateBefore("NEW", checkTime);
- orders.stream().forEach(order -> {
- LOG.error("Order timeout:{}", order);
- OrderDTO dto = new OrderDTO();
- dto.setId(order.getId());
- dto.setTicketNum(order.getTicketNum());
- dto.setUuid(order.getUuid());
- dto.setAmount(order.getAmount());
- dto.setTitle(order.getTitle());
- dto.setCustomerId(order.getCustomerId());
- dto.setStatus("TIMEOUT");
- rabbitTemplate.convertAndSend("order:ticket_error", dto);
- });
- }
可以看到,這里是去數(shù)據(jù)庫(kù)中撈取那些狀態(tài)為 NEW 并且是 1 分鐘之前的訂單,根據(jù)前面的分析,當(dāng)鎖票成功后,就會(huì)將訂單的狀態(tài)設(shè)置為 NEW 并且存入數(shù)據(jù)庫(kù)中。換言之,當(dāng)鎖票成功一分鐘之后,這張票還沒有賣掉,就設(shè)置訂單超時(shí),同時(shí)向 order:ticket_error 隊(duì)列發(fā)送一條消息,這條消息在 ticket 服務(wù)中被消費(fèi),最終完成撤銷交票、撤銷鎖票等操作。
這就是大致的代碼處理流程。
再來回顧一下前面那張圖:
結(jié)合著代碼來看這張圖是不是就很容易懂了。
3.5 測(cè)試
接下來我們來進(jìn)行一個(gè)簡(jiǎn)單的測(cè)試。
先來一個(gè)訂票失敗的測(cè)試,如下:
由于用戶只有 1000 塊錢,這張票要 10000,所以購(gòu)票必然失敗。請(qǐng)求執(zhí)行成功后,我們查看 order 表,多了如下一條記錄:
可以看到,訂單失敗的理由就是賬戶余額不足。此時(shí)查看 ticket 和 user 表,發(fā)現(xiàn)都完好如初(如果需要,則已經(jīng)反向補(bǔ)償了)。
接下來我們手動(dòng)給 ticket 表中 lock_user 字段設(shè)置一個(gè)值,如下:
這個(gè)表示這張票已經(jīng)被人鎖定了。
然后我們發(fā)起一次購(gòu)票請(qǐng)求(這次可以把金額設(shè)置到合理范圍,其實(shí)不設(shè)置也行,反正這次失敗還沒走到付款這一步):
請(qǐng)求發(fā)送成功后,接下來我們?nèi)ゲ榭?order 表,多了如下一條記錄:
可以看到,這次下單失敗的理由是鎖票失敗。此時(shí)查看 ticket 和 user 表,發(fā)現(xiàn)都完好如初(如果需要,則已經(jīng)反向補(bǔ)償了)。
最后再來一次成功測(cè)試,先把 ticket 表中的 lock_user 字段置空,然后發(fā)送如下請(qǐng)求:
這次購(gòu)票成功,查看 ticket 表,發(fā)票已經(jīng)票有所屬:
查看訂單表:
可以多了一條成功的購(gòu)票記錄。
查看用戶表:
用戶賬戶已扣款。
查看支付記錄表:
可以看到已經(jīng)有了支付記錄。
4. 總結(jié)
整體上來說,上面這個(gè)案例,技術(shù)上并沒有什么難的,復(fù)雜之處在于設(shè)計(jì)。一開始要設(shè)計(jì)好消息的處理流程以及消息處理失敗后如何進(jìn)行補(bǔ)償,這個(gè)是比較考驗(yàn)大家技術(shù)的。
另外上面案例中,消息的發(fā)送和消費(fèi)都用到了 RabbitMQ 中的事務(wù)機(jī)制(確保消息消費(fèi)成功)以及 Spring 中的事務(wù)機(jī)制(確保消息發(fā)送和數(shù)據(jù)保存同時(shí)成功),這些我就不再贅述了。
總之,通過消息中間件處理分布式事務(wù),這種方式通過犧牲數(shù)據(jù)的強(qiáng)一致性換取性能的大幅提升,但是實(shí)現(xiàn)這種方式的成本和復(fù)雜度是比較高的,使用時(shí)還要看實(shí)際業(yè)務(wù)情況。
本文轉(zhuǎn)載自微信公眾號(hào)「江南一點(diǎn)雨」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系江南一點(diǎn)雨公眾號(hào)。