淺談 RocketMQ、Kafka、Pulsar 的事務(wù)消息
導(dǎo)語
事務(wù)是一個程序執(zhí)行單元,里面的所有操作要么全部執(zhí)行成功,要么全部執(zhí)行失敗。RocketMQ、Kafka 和 Pulsar 都是當(dāng)今業(yè)界應(yīng)用十分廣泛的開源消息隊列(MQ)組件,筆者在工作中遇到關(guān)于 MQ 選型相關(guān)的內(nèi)容,了解到關(guān)于“事務(wù)消息”這個概念在不同的 MQ 組件里有不同內(nèi)涵。故借此文,試著淺析一番這三種消息隊列(MQ)的事務(wù)消息有何異同,目的是形成關(guān)于消息隊列事務(wù)消息的全景視圖,給有類似業(yè)務(wù)需求的同學(xué)提供一些參考和借鑒。
一、消息隊列演化
消息隊列(Message Queue,簡稱 MQ),是指在消息的傳輸中保存消息的容器或服務(wù),是一種異步的服務(wù)間通信方式,適用于無服務(wù)器和微服務(wù)架構(gòu),是分布式系統(tǒng)實現(xiàn)高性能、高可用、可伸縮等高級特效的重要組件。常見的主流消息隊列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ、Pulsar 等。而在公司內(nèi)有 TubeMQ、Ckafka、TDMQ、CMQ、CDMQ、Hippo 等。
消息隊列演化
Kafka:Apache Kafka 是由 Apache 軟件基金會開發(fā)的一個開源消息系統(tǒng)項目,由 Scala 寫成。Kafka 最初是由 LinkedIn 開發(fā),并于 2011 年初開源。2012 年 10 月從 Apache Incubator 畢業(yè)。該項目的目標(biāo)是為處理實時數(shù)據(jù)提供一個統(tǒng)一、高通量、低等待的平臺。Kafka 是一個分布式的、分區(qū)的、多復(fù)本的日志提交服務(wù)。它通過一種獨一無二的設(shè)計提供了一個消息系統(tǒng)的功能,其整體架構(gòu)圖如下所示。
kafka整體架構(gòu)圖
RocketMQ:Apache RocketMQ 是一個分布式消息和流媒體平臺,具有低延遲、強一致、高性能和可靠性、萬億級容量和靈活的可擴展性。它有借鑒 Kafka 的設(shè)計思想,但不是 kafka 的拷貝,其整體架構(gòu)圖如下所示。
RocketMQ架構(gòu)圖
Pulsar:Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數(shù)式計算為一體,采用計算與存儲分離架構(gòu)設(shè)計,支持多租戶、持久化存儲、多機房跨區(qū)域數(shù)據(jù)復(fù)制,具有強一致性、高吞吐、低延時及高可擴展性等流數(shù)據(jù)存儲特性,被看作是云原生時代實時消息流傳輸、存儲和計算最佳解決方案,其整體架構(gòu)圖如下所示。
Pulsar架構(gòu)圖
二、背景知識
2.1 什么是事務(wù)?
2.1.1 事務(wù)(Trasaction)
事務(wù)是一個程序執(zhí)行單元,里面的所有操作要么全部執(zhí)行成功,要么全部執(zhí)行失敗。
一個事務(wù)有四個基本特性,也就是我們常說的(ACID)。
Atomicity(原子性) :事務(wù)是一個不可分割的整體,事務(wù)內(nèi)所有操作要么全做成功,要么全失敗。
Consistency(一致性) :事務(wù)執(zhí)行前后,數(shù)據(jù)從一個狀態(tài)到另一個狀態(tài)必須是一致的(A 向 B 轉(zhuǎn)賬,不能出現(xiàn) A 扣了錢,B 卻沒收到)。
Isolation(隔離性):多個并發(fā)事務(wù)之間相互隔離,不能互相干擾。
Durablity(持久性) :事務(wù)完成后,對數(shù)據(jù)的更改是永久保存的,不能回滾。
2.1.2 分布式事務(wù)
分布式事務(wù)是指事務(wù)的參與者、支持事務(wù)的服務(wù)器、資源服務(wù)器以及事務(wù)管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點之上。分布式事務(wù)通常用于在分布式系統(tǒng)中保證不同節(jié)點之間的數(shù)據(jù)一致性。
分布式事務(wù)的解決方案一般有以下幾種:
XA(2PC/3PC)
最具有代表性的是由 Oracle Tuxedo 系統(tǒng)提出的 XA 分布式事務(wù)協(xié)議。XA 中大致分為兩部分:事務(wù)管理器和本地資源管理器。其中本地資源管理器往往由數(shù)據(jù)庫實現(xiàn),比如 Oracle、DB2 這些商業(yè)數(shù)據(jù)庫都實現(xiàn)了 XA 接口,而事務(wù)管理器作為全局的調(diào)度者,負責(zé)各個本地資源的提交和回滾。XA 協(xié)議通常包含**兩階段提交(2PC)和三階段提交(3PC)**兩種實現(xiàn)。兩階段提交顧名思義就是要進行兩個階段的提交:第一階段,準(zhǔn)備階段(投票階段) ;第二階段,提交階段(執(zhí)行階段)。實現(xiàn)過程如下所示:
2PC
二階段提交看似能夠提供原子性的操作,但它存在著一些缺陷,三段提交(3PC)是對兩段提交(2PC)的一種升級優(yōu)化,有興趣的可以深入了解一下,這里不再贅述。
TCC
TCC(Try-Confirm-Cancel)是 Try、Commit、Cancel 三種指令的縮寫,又被稱補償事務(wù),其邏輯模式類似于 XA 兩階段提交,事務(wù)處理流程也很相似,但 2PC 是應(yīng)用于在 DB 層面,TCC 則可以理解為在應(yīng)用層面的 2PC,是需要我們編寫業(yè)務(wù)邏輯來實現(xiàn)。
TCC 它的核心思想是:"針對每個操作都要注冊一個與其對應(yīng)的確認(rèn)(Try)和補償(Cancel)"。
消息事務(wù)
所謂的消息事務(wù)就是基于消息隊列的兩階段提交,本質(zhì)上是對消息隊列的一種特殊利用,它是將本地事務(wù)和發(fā)消息放在了一個分布式事務(wù)里,保證要么本地操作成功成功并且對外發(fā)消息成功,要么兩者都失敗。
基于消息隊列的兩階段提交往往用在高并發(fā)場景下,將一個分布式事務(wù)拆成一個消息事務(wù)(A 系統(tǒng)的本地操作+發(fā)消息)+B 系統(tǒng)的本地操作,其中 B 系統(tǒng)的操作由消息驅(qū)動,只要消息事務(wù)成功,那么 A 操作一定成功,消息也一定發(fā)出來了,這時候 B 會收到消息去執(zhí)行本地操作,如果本地操作失敗,消息會重投,直到 B 操作成功,這樣就變相地實現(xiàn)了 A 與 B 的分布式事務(wù)。原理如下:
消息事務(wù)示意圖
雖然上面的方案能夠完成 A 和 B 的操作,但是 A 和 B 并不是強一致的,而是最終一致(Eventually consistent)的。而這也是滿足 BASE 理論的要求的。這里引申一下,BASE 是 Basically Available(基本可用)、Soft state(軟狀態(tài))和 Eventually consistent (最終一致性)三個短語的縮寫。BASE 理論是對 CAP 中 AP (CAP 已經(jīng)被證實一個分布式系統(tǒng)最多只能同時滿足 CAP 三項中的兩項)的一個擴展,通過犧牲強一致性來獲得可用性,當(dāng)出現(xiàn)故障允許部分不可用但要保證核心功能可用,允許數(shù)據(jù)在一段時間內(nèi)是不一致的,但最終達到一致狀態(tài)。滿足 BASE 理論的事務(wù),我們稱之為“柔性事務(wù)”。
2.2 什么是 Exactly-once (精確一次)語義?
在分布式系統(tǒng)中,任何節(jié)點都有可能出現(xiàn)異常甚至宕機。在 消息隊列中也一樣,當(dāng) Producer 在生產(chǎn)消息時,可能會發(fā)生 Broker 宕機不可用,或者網(wǎng)絡(luò)突然中斷等異常情況。根據(jù)在發(fā)生異常時 Producer 處理消息的方式,系統(tǒng)可以具備以下三種消息語義。
2.2.1 At-least-once (至少一次)語義
Producer 通過接收 Broker 的 ACK (消息確認(rèn))通知來確保消息成功寫入 Topic。然而,當(dāng) Producer 接收 ACK 通知超時,或者收到 Broker 出錯信息時,會嘗試重新發(fā)送消息。如果 Broker 正好在成功把消息寫入到 Topic,但還沒有給 Producer 發(fā)送 ACK 時宕機,Producer 重新發(fā)送的消息會被再次寫入到 Topic,最終導(dǎo)致消息被重復(fù)分發(fā)至 Consumer。即:消息不會丟失,但有可能被重復(fù)發(fā)送。
2.2.2 At-most-once (最多一次)語義
當(dāng) Producer 在接收 ACK 超時,或者收到 Broker 出錯信息時不重發(fā)消息,那就有可能導(dǎo)致這條消息丟失,沒有寫入到 Topic 中,也不會被 Consumer 消費到。在某些場景下,為了避免發(fā)生重復(fù)消費,我們可以容許消息丟失的發(fā)生。即:消息可能會丟失,但絕不會被重復(fù)發(fā)送。
2.2.3 Exactly-once (精確一次)語義
**Exactly-once 語義保證了即使 Producer 多次發(fā)送同一條消息到服務(wù)端,服務(wù)端也僅僅會記錄一次。**Exactly-once 語義是最可靠的,同時也是最難理解的。Exactly-once 語義需要消息隊列服務(wù)端,消息生產(chǎn)端和消費端應(yīng)用三者的協(xié)同才能實現(xiàn)。比如,當(dāng)消費端應(yīng)用成功消費并且 ACK 了一條消息之后,又把消費位點回滾到之前的一個消息 ID,那么從那個消息 ID 往后的所有消息都會被消費端應(yīng)用重新消費到。即:消息不會丟失,也不會被重復(fù)發(fā)送。
三、RocketMQ、Kafka、Pulsar 事務(wù)消息
3.1 RocketMQ 的事務(wù)消息
RocketMQ 在 4.3.0 版中已經(jīng)支持分布式事務(wù)消息,這里 RocketMQ 采用了 2PC 的思想來實現(xiàn)了提交事務(wù)消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,流程如下圖所示:
RocketMQ事務(wù)消息
其具體工作流程分為正常事務(wù)消息的發(fā)送及提交和不正常情況下事務(wù)消息的補償流程:
1.在消息隊列上開啟一個事務(wù)主題。2.事務(wù)中第一個執(zhí)行的服務(wù)發(fā)送一條“半消息”(半消息和普通消息的唯一區(qū)別是,在事務(wù)提交之前,對于消費者來說,這個消息是不可見的)給消息隊列。3.半消息發(fā)送成功后,發(fā)送半消息的服務(wù)就會開始執(zhí)行本地事務(wù),根據(jù)本地事務(wù)執(zhí)行結(jié)果來決定事務(wù)消息提交或者回滾。
補償流程:RocketMQ 提供事務(wù)反查來解決異常情況,如果 RocketMQ 沒有收到提交或者回滾的請求,Broker 會定時到生產(chǎn)者上去反查本地事務(wù)的狀態(tài),然后根據(jù)生產(chǎn)者本地事務(wù)的狀態(tài)來處理這個“半消息”是提交還是回滾。值得注意的是我們需要根據(jù)自己的業(yè)務(wù)邏輯來實現(xiàn)反查邏輯接口,然后根據(jù)返回值 Broker 決定是提交還是回滾。而且這個反查接口需要是無狀態(tài)的,請求到任意一個生產(chǎn)者節(jié)點都會返回正確的數(shù)據(jù)。4.本地事務(wù)成功后會讓這個“半消息”變成正常消息,供分布式事務(wù)后面的步驟執(zhí)行自己的本地事務(wù)。(這里的事務(wù)消息,Producer 不會因為 Consumer 消費失敗而做回滾,采用事務(wù)消息的應(yīng)用,其所追求的是高可用和最終一致性,消息消費失敗的話,RocketMQ 自己會負責(zé)重推消息,直到消費成功。)
其中,補償流程用于解決消息 Commit 或者 Rollback 發(fā)生超時或者失敗的情況。在 RocketMQ 事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的。那么,如何做到寫入消息但是對用戶不可見呢?RocketMQ 事務(wù)消息的做法是:如果消息是“半消息”,將備份原消息的主題與消息消費隊列,然后改變主題為 RMQ_SYS_TRANS_HALF_TOPIC。由于消費組未訂閱該主題,故消費端無法消費“半消息”的消息,然后 RocketMQ 會開啟一個定時任務(wù),從 Topic 為 RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息進行消費,根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息。
講到這里大家就明白了,這里說的就是 2.1.2 節(jié)里提到分布式事務(wù)中的消息事務(wù),目的是在分布式事務(wù)中實現(xiàn)系統(tǒng)的最終一致性。
3.2 Kafka 的事務(wù)消息
與 RocketMQ 的事務(wù)消息用途不同,Kafka 的事務(wù)基本上是配合其冪等機制來實現(xiàn) Exactly-once (見 2.2.3 節(jié))語義的。
開發(fā)此功能的原因可以總結(jié)如下。
流處理的需求隨著流處理的興起,對具有更強處理保證的流處理應(yīng)用的需求也在增長。例如,在金融行業(yè),金融機構(gòu)使用流處理引擎為用戶處理借款和信貸。這種類型的用例要求每條消息都只處理一次,無一例外。
換句話說,如果流處理應(yīng)用程序消費消息 A 并將結(jié)果作為消息 B (B = f(A)),那么恰好一次處理保證意味著當(dāng)且僅當(dāng) B 被成功生產(chǎn)后 A 才能被標(biāo)記為消費,反之亦然。
Pulsar事務(wù)
事務(wù) API 使流處理應(yīng)用程序能夠在一個原子操作中使用、處理和生成消息。這意味著,事務(wù)中的一批消息可以從許多主題分區(qū)接收、生成和確認(rèn)。一個事務(wù)涉及的所有操作都作為整體成功或失敗。
目前,Kafka 默認(rèn)提供的交付可靠性保障是 At-least-once。如果消息成功“提交”,但 Broker 的應(yīng)答沒有成功發(fā)送回 Producer 端(比如網(wǎng)絡(luò)出現(xiàn)瞬時抖動),那么 Producer 就無法確定消息是否真的提交成功了。因此,它只能選擇重試,這就是 Kafka 默認(rèn)提供 At-least-once 保障的原因,不過這會導(dǎo)致消息重復(fù)發(fā)送。大部分用戶還是希望消息只會被交付一次,這樣的話,消息既不會丟失,也不會被重復(fù)處理。或者說,即使 Producer 端重復(fù)發(fā)送了相同的消息,Broker 端也能做到自動去重。在下游 Consumer 看來,消息依然只有一條。那么問題來了,Kafka 是怎么做到精確一次的呢?簡單來說,這是通過兩種機制:冪等性(Idempotence)和事務(wù)(Transaction)。
3.2.1 冪等性 Producer
“冪等”這個詞原是數(shù)學(xué)領(lǐng)域中的概念,指的是某些操作或函數(shù)能夠被執(zhí)行多次,但每次得到的結(jié)果都是不變的。冪等性有很多好處,其最大的優(yōu)勢在于我們可以安全地重試任何冪等性操作,反正它們也不會破壞我們的系統(tǒng)狀態(tài)。如果是非冪等性操作,我們還需要擔(dān)心某些操作執(zhí)行多次對狀態(tài)的影響,但對于冪等性操作而言,我們根本無需擔(dān)心此事。
在 Kafka 中,Producer 默認(rèn)不是冪等性的,但我們可以創(chuàng)建冪等性 Producer。它其實是 0.11.0.0 版本引入的新功能。enable.idempotence 被設(shè)置成 true 后,Producer 自動升級成冪等性 Producer,其他所有的代碼邏輯都不需要改變。Kafka 自動幫你做消息的重復(fù)去重。Kafka 為了實現(xiàn)冪等性,它在底層設(shè)計架構(gòu)中引入了ProducerID和SequenceNumber。ProducerID:在每個新的 Producer 初始化時,會被分配一個唯一的 ProducerID,用來標(biāo)識本次會話。
SequenceNumber:對于每個 ProducerID,Producer 發(fā)送數(shù)據(jù)的每個 Topic 和 Partition 都對應(yīng)一個從 0 開始單調(diào)遞增的 SequenceNumber 值。Broker 在內(nèi)存維護(pid,seq)映射,收到消息后檢查 seq。Producer 在收到明確的的消息丟失 ack,或者超時后未收到 ack,要進行重試。
new_seq = old_seq+1: 正常消息;
new_seq <= old_seq : 重復(fù)消息;
new_seq > old_seq+1: 消息丟失;
另外我們需要了解冪等性 Producer 的作用范圍。首先,它只能保證單分區(qū)上的冪等性,即一個冪等性 Producer 能夠保證某個主題的一個分區(qū)上不出現(xiàn)重復(fù)消息,它無法實現(xiàn)多個分區(qū)的冪等性。其次,它只能實現(xiàn)單會話上的冪等性,不能實現(xiàn)跨會話的冪等性。這里的會話,你可以理解為 Producer 進程的一次運行。當(dāng)你重啟了 Producer 進程之后,這種冪等性保證就喪失了。如果想實現(xiàn)多分區(qū)以及多會話上的消息無重復(fù),應(yīng)該怎么做呢?答案就是事務(wù)(transaction)或者依賴事務(wù)型 Producer。這也是冪等性 Producer 和事務(wù)型 Producer 的最大區(qū)別。
3.2.2 事務(wù)型 Producer
事務(wù)型 Producer 能夠保證將消息原子性地寫入到多個分區(qū)中。這批消息要么全部寫入成功,要么全部失敗。另外,事務(wù)型 Producer 也不受進程的重啟影響。Producer 重啟后,Kafka 依然保證它們發(fā)送消息的 Exactly-once 處理。和普通 Producer 代碼相比,事務(wù)型 Producer 的顯著特點是調(diào)用了一些事務(wù) API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它們分別對應(yīng)事務(wù)的初始化、事務(wù)開始、事務(wù)提交以及事務(wù)終止。
Kafka 事務(wù)消息是由 Producer、事務(wù)協(xié)調(diào)器、Broker、組協(xié)調(diào)器、Consumer 等共同參與實現(xiàn)的。
1)Producer
為 Producer 指定固定的 TransactionalId(事務(wù) id),可以穿越 Producer 的多次會話(Producer 重啟/斷線重連)中,持續(xù)標(biāo)識 Producer 的身份。
每個生產(chǎn)者增加一個 epoch。用于標(biāo)識同一個 TransactionalId 在一次事務(wù)中的 epoch,每次初始化事務(wù)時會遞增,從而讓服務(wù)端可以知道生產(chǎn)者請求是否舊的請求。使用 epoch 標(biāo)識 Producer 的每一次"重生",可以防止同一 Producer 存在多個會話。
Producer 遵從冪等消息的行為,并在發(fā)送的 BatchRecord 中增加事務(wù) id 和 epoch。
2)事務(wù)協(xié)調(diào)器(Transaction Coordinator)
引入事務(wù)協(xié)調(diào)器,類似于消費組負載均衡的協(xié)調(diào)者,每一個實現(xiàn)事務(wù)的生產(chǎn)端都被分配到一個事務(wù)協(xié)調(diào)者。以兩階段提交的方式,實現(xiàn)消息的事務(wù)提交。
事務(wù)協(xié)調(diào)器使用一個特殊的 Topic:即事務(wù) Topic,事務(wù) Topic 本身也是持久化的,日志信息記錄事務(wù)狀態(tài)信息,由事務(wù)協(xié)調(diào)者寫入。
事務(wù)協(xié)調(diào)器通過 RPC 調(diào)用,協(xié)調(diào) Broker 和 Consumer 實現(xiàn)事務(wù)的兩階段提交。
每一個 Broker 都會啟動一個事務(wù)協(xié)調(diào)器,使用 hash(TransactionalId)確定 Producer 對應(yīng)的事務(wù)協(xié)調(diào)器,使得整個集群的負載均衡。
3)Broker
引入控制消息(Control Messages):這些消息是客戶端產(chǎn)生的并寫入到主題的特殊消息,但對于使用者來說不可見。它們是用來讓 Broker 告知消費者之前拉取的消息是否被原子性提交。
Broker 處理事務(wù)協(xié)調(diào)器的 commit/abort 控制消息,把控制消息向正常消息一樣寫入 Topic(圖中標(biāo) c 的消息,和正常消息交織在一起,用來確認(rèn)事務(wù)提交的日志偏移),并向前推進消息提交偏移 hw。
kafka事務(wù)
4)組協(xié)調(diào)器
如果在事務(wù)過程中,提交了消費偏移,組協(xié)調(diào)器在 offset log 中寫入事務(wù)消費偏移。當(dāng)事務(wù)提交時,在 offset log 中寫入事務(wù) offset 確認(rèn)消息。
5)Consumer
Consumer 過濾未提交消息和事務(wù)控制消息,使這些消息對用戶不可見。
有兩種實現(xiàn)方式,
- Consumer 緩存方式
設(shè)置 isolation.level=read_uncommitted,此時 topic 的所有消息對 Consumer 都可見。Consumer 緩存這些消息,直到收到事務(wù)控制消息。若事務(wù) commit,則對外發(fā)布這些消息;若事務(wù) abort,則丟棄這些消息。
- Broker 過濾方式
設(shè)置 isolation.level=read_committed,此時 topic 中未提交的消息對 Consumer 不可見,只有在事務(wù)結(jié)束后,消息才對 Consumer 可見。Broker 給 Consumer 的 BatchRecord 消息中,會包含以列表,指明哪些是"abort"事務(wù),Consumer 丟棄 abort 事務(wù)的消息即可。
因為事務(wù)機制會影響消費者所能看到的消息的范圍,它不只是簡單依賴高水位來判斷。它依靠一個名為 LSO(Log Stable Offset)的位移值來判斷事務(wù)型消費者的可見性。
3.3 Pulsar 的事務(wù)消息
Apache Pulsar 在 2.8.0 正式支持了事務(wù)相關(guān)的功能,Pulsar 這里提供的事務(wù)區(qū)別于 RocketMQ 中 2PC 那種事務(wù)的實現(xiàn)方式,沒有本地事務(wù)回查的機制,更類似于 Kafka 的事務(wù)實現(xiàn)機制。Apache Pulsar 中的事務(wù)主要用來保證類似 Pulsar Functions 這種流計算場景中 Exactly-once 語義的實現(xiàn),這也符合 Apache Pulsar 本身 Event Streaming 的定位,即保證端到端(End-to-End)的事務(wù)實現(xiàn)的語義。
在 Pulsar 中,對于事務(wù)語義是這樣定義的:允許事件流應(yīng)用將消費、處理、生產(chǎn)消息整個過程定義為一個原子操作,即生產(chǎn)者或消費者能夠處理跨多個主題和分區(qū)的消息,并確保這些消息作為一個單元被處理。
Pulsar 事務(wù)具有以下語義:
- 事務(wù)中的所有操作都作為一個單元提交。要么提交所有消息,要么都不提交。
- 每條消息只寫入或處理一次,不會丟失數(shù)據(jù)或重復(fù)(即使發(fā)生故障)。
- 如果事務(wù)中止,則此事務(wù)中的所有寫入和確認(rèn)都將回滾。
事務(wù)中的批量消息可以被以多分區(qū)接收、生產(chǎn)和確認(rèn)。
- 消費者只能讀取已提交(確認(rèn))的消息。換句話說,Broker 不傳遞屬于打開事務(wù)的事務(wù)消息或?qū)儆谥兄故聞?wù)的消息。
- 跨多個分區(qū)的消息寫入是原子性的。
- 跨多個訂閱的消息確認(rèn)是原子性的。訂閱下的消費者在確認(rèn)帶有事務(wù) ID 的消息時,只會成功確認(rèn)一次消息。
Pulsar 事務(wù)消息由以下幾個關(guān)鍵點構(gòu)成:
1)事務(wù) ID
事務(wù) ID(TxnID)標(biāo)識 Pulsar 中的唯一事務(wù)。事務(wù) ID 長度是 128-bit。最高 16 位保留給事務(wù)協(xié)調(diào)器的 ID,其余位用于每個事務(wù)協(xié)調(diào)器中單調(diào)遞增的數(shù)字。
2)事務(wù)協(xié)調(diào)器(Transaction Coordinator)
事務(wù)協(xié)調(diào)器(TC)是運行在 Pulsar Broker 中的一個模塊。
- 它維護事務(wù)的整個生命周期,并防止事務(wù)進入錯誤狀態(tài)。
- 它處理事務(wù)超時,并確保事務(wù)在事務(wù)超時后中止。
3)事務(wù)日志
所有事務(wù)元數(shù)據(jù)都保存在事務(wù)日志中。事務(wù)日志由 Pulsar 主題記錄。如果事務(wù)協(xié)調(diào)器崩潰,它可以從事務(wù)日志恢復(fù)事務(wù)元數(shù)據(jù)。
事務(wù)日志存儲事務(wù)狀態(tài),而不是事務(wù)中的實際消息(實際消息存儲在實際的主題分區(qū)中)。
4)事務(wù)緩存
向事務(wù)內(nèi)的主題分區(qū)生成的消息存儲在該主題分區(qū)的事務(wù)緩沖區(qū)(TB)中。在提交事務(wù)之前,事務(wù)緩沖區(qū)中的消息對消費者不可見。當(dāng)事務(wù)中止時,事務(wù)緩沖區(qū)中的消息將被丟棄。
事務(wù)緩沖區(qū)將所有正在進行和中止的事務(wù)存儲在內(nèi)存中。所有消息都發(fā)送到實際的分區(qū) Pulsar 主題。提交事務(wù)后,事務(wù)緩沖區(qū)中的消息對消費者具體化(可見)。事務(wù)中止時,事務(wù)緩沖區(qū)中的消息將被丟棄。
5)待確認(rèn)狀態(tài)
掛起確認(rèn)狀態(tài)在事務(wù)完成之前維護事務(wù)中的消息確認(rèn)。如果消息處于掛起確認(rèn)狀態(tài),則在該消息從掛起確認(rèn)狀態(tài)中移除之前,其他事務(wù)無法確認(rèn)該消息。
掛起的確認(rèn)狀態(tài)被保留到掛起的確認(rèn)日志中(cursor ledger)。新啟動的 broker 可以從掛起的確認(rèn)日志中恢復(fù)狀態(tài),以確保狀態(tài)確認(rèn)不會丟失。
處理流程一般分為以下幾個步驟:
- 開啟事務(wù)。
- 使用事務(wù)發(fā)布消息。
- 使用事務(wù)確認(rèn)消息。
- 結(jié)束事務(wù)。
Pulsar 的事務(wù)處理流程與 Kafka 的事務(wù)處理思路大致上保持一致,大家都有一個 TC 以及對應(yīng)的一個用于持久化 TC 所有操作的 Topic 來記錄所有事務(wù)狀態(tài)變更的請求。同樣的在事務(wù)開始階段也都有一個專門的 Topic 來去 查詢 TC 對應(yīng)的 Owner Broker 的位置在哪里。不同的是,第一:Kafka 中對于未確認(rèn)的消息是維護在 Broker 端的,但是 Pulsar 的是維護在 Client 端的,通過 Transaction Timeout 來決定這個事務(wù)是否執(zhí)行成功,所以有了 Transaction Timeout 的存在之后,就可以確保 Client 和 Broker 側(cè)事務(wù)處理的一致性。第二:由于 Kafka 本身沒有單條消息的 Ack,所以 Kafka 的事務(wù)處理只能是順序執(zhí)行的,當(dāng)一個事務(wù)請求被阻塞之后,會阻塞后續(xù)所有的事務(wù)請求,但是 Pulsar 是可以對消息進行單條 Ack 的,所以在這里每一個事務(wù)的 Ack 動作是獨立的,不會出現(xiàn)事務(wù)阻塞的情況。
四、結(jié)論
RocketMQ 和 Kafka/Pulsar 的事務(wù)消息實用的場景是不一樣的。
RocketMQ 中的事務(wù),它解決的問題是,確保執(zhí)行本地事務(wù)和發(fā)消息這兩個操作,要么都成功,要么都失敗。并且 RocketMQ 增加了一個事務(wù)反查的機制,來盡量提高事務(wù)執(zhí)行的成功率和數(shù)據(jù)一致性。
Kafka 中的事務(wù),它解決的問題是,確保在一個事務(wù)中發(fā)送的多條消息,要么都成功,要么都失敗。(這里面的多條消息不一定要在同一個主題和分區(qū)中,可以是發(fā)往多個主題和分區(qū)的消息)當(dāng)然也可以在 kafka 事務(wù)執(zhí)行過程中開啟本地事務(wù)來實現(xiàn)類似 RocketMQ 事務(wù)消息的效果,但是 Kafka 是沒有事務(wù)消息反查機制的,它是直接拋出異常的,用戶可以根據(jù)異常來實現(xiàn)自己的重試等方法保證事務(wù)正常運行。
它們的共同點就是:都是通過兩階段提交來實現(xiàn)事務(wù)的,事務(wù)消息都保存在單獨的主題上。不同的地方就是 RocketMQ 是通過“半消息”來實現(xiàn)的,kafka 是直接將消息發(fā)送給對應(yīng)的 topic,通過客戶端來過濾實現(xiàn)的。而且它們兩個使用的場景區(qū)別是非常之大的,RockteMQ 主要解決的是基于本地事務(wù)和消息的數(shù)據(jù)一致性,而 Kafka 的事務(wù)則是用于實現(xiàn)它的 Exactly-once 機制,應(yīng)用于實時流計算的場景中。
Pulsar 的事務(wù)消息和 Kafka 應(yīng)用場景和語義類似,只是由于底層實現(xiàn)機制有差別,在一些細節(jié)上有區(qū)別。
相信看到這里就非常清楚了,對于事務(wù)消息如何選型和應(yīng)用,首先要明白你的業(yè)務(wù)需求是什么。是要實現(xiàn)分布式事務(wù)的最終一致性,還是要實現(xiàn) Exactly-once (精確一次)語義?明白之后需求,選擇什么組件就十分明確了。
參考文章
pulsar 官方 doc
消息隊列漫談:如何使用消息隊列實現(xiàn)分布式事務(wù)?