Kafka Exactly Once 語義實現(xiàn)原理:冪等性與事務(wù)消息
1、前言
在現(xiàn)代分布式系統(tǒng)中,確保數(shù)據(jù)處理的準確性和一致性是至關(guān)重要的。Apache Kafka,作為一個廣泛使用的流處理平臺,提供了強大的消息隊列和流處理功能。隨著業(yè)務(wù)需求的增長,Kafka 的事務(wù)消息功能應(yīng)運而生,它允許應(yīng)用程序以一種原子的方式處理消息,即要么所有消息都被正確處理,要么都不處理。本文將深入剖析 Kafka 的 Exactly-Once 語義實現(xiàn)原理,包括冪等性與事務(wù)消息的關(guān)鍵概念,以及它們是如何在 Kafka 中實現(xiàn)的。我們將探討 Kafka 事務(wù)的流程,事務(wù)提供的 ACID 保證,以及在實際應(yīng)用中可能遇到的一些限制。無論您是 Kafka 的新手還是經(jīng)驗豐富的開發(fā)者,本文都將為您提供有價值的見解和指導(dǎo)。
2、消息隊列的事務(wù)場景
Kafka 目前用于流處理的場景:相當于一個有向無環(huán)圖(DAG,Directed acyclic graph)每個節(jié)點是一個 Kafka Topic,每條邊是一個流處理操作。在這樣的場景下,有兩種操作:
? 消費上游消息并提交位點
? 處理消息并發(fā)送到下游 Topic
對于由這兩種操作構(gòu)成的一組處理流程需要具備事務(wù)語義,這樣我們就可以不重復(fù)(Exactly Once)的處理上游消息并將結(jié)果可靠地存儲在下游 Topic 中。
圖片
上圖是一個典型的 Kafka 事務(wù)的流程,我們可以看到:MySQL 的 binlog 作為上游數(shù)據(jù)源將數(shù)據(jù)寫入到 Kafka 中,Spark Streaming 從 Kafka 中讀取數(shù)據(jù)并進行處理,最后將處理結(jié)果寫入到另外兩個 Topic 中(圖中三個 Topic 位于同一集群中)。其中消費 Topic A 與寫入 Topic B 和 Topic C 的操作具備事務(wù)語義。
3、Kafka 的 Exactly Once 語義
從上述的場景中我們可以發(fā)現(xiàn),事務(wù)消息最主要的動機是在流處理中實現(xiàn) Exactly Once 的語義,這可以分為:
? 僅發(fā)送一次: 單分區(qū)僅發(fā)送一次由生產(chǎn)者冪等保證,多分區(qū)僅發(fā)送一次由事務(wù)機制保證
? 僅消費一次: Kafka 通過消費位點的提交來控制消費進度,而消費位點的提交被抽象成向系統(tǒng) topic 發(fā)送消息。這就使得發(fā)送和消費行為統(tǒng)一起來,只要解決了多分區(qū)發(fā)送消息的一致性就能實現(xiàn) Exactly Once 語義
4、生產(chǎn)者冪等性
在創(chuàng)建 Kafka 生產(chǎn)者時設(shè)置了 enable.idempotence 參數(shù),用于開啟生產(chǎn)者冪等性。
val props = new Properties()
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
val producer = new KafkaProducer(props)
Kafka 的發(fā)送冪等是通過序列號來實現(xiàn)的,每個消息都會被分配一個序列號,序列號是遞增的,這樣就可以保證消息的順序性。當生產(chǎn)者發(fā)送消息時,會將消息的序列號和消息內(nèi)容一起寫入到日志文件中,下次收到非預(yù)期序列號的消息就會返回 OutOfOrderSequenceException 異常。
設(shè)置 enable.idempotence 參數(shù)后,生產(chǎn)者會檢查以下三個參數(shù)的值是否合法(ProducerConfig#postProcessAndValidateIdempotenceConfigs)
? max.in.flight.requests.per.connection 必須小于 5
? retries 必須大于 0
? acks 必須設(shè)置為 all
Kafka 將消息的序列號信息保存在分區(qū)維度的 .snapshot 文件中,格式如下(ProducerStateManager#ProducerSnapshotEntrySchema):
圖片
我們可以發(fā)現(xiàn),該文件中保存了 ProducerId、ProducerEpoch 和 LastSequence。所以冪等的約束為:相同分區(qū)、相同 Producer(id 和 epoch) 發(fā)送的消息序列號需遞增。即 Kafka 的生產(chǎn)者冪等性只在單連接、單分區(qū)生效,Producer 重啟或消息發(fā)送到其他分區(qū)就失去了冪等性的約束。
.snapshot 文件在 log segment 滾動時更新,發(fā)生重啟后通過讀取 .snapshot 文件和最新的日志文件即可恢復(fù) Producer 的狀態(tài)。Broker 的重啟或分區(qū)遷移并不會影響冪等性。
5、事務(wù)消息流程
我們首先從 Demo 開始,來看一下如何使用 Kafka 客戶端完成一個事務(wù):
// 事務(wù)初始化
val props = new Properties()
...
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
val producer = new KafkaProducer(props)
producer.initTransactions()
producer.beginTransaction()
// 消息發(fā)送
producer.send(RecordUtils.create(topic1, partition1, "message1"))
producer.send(RecordUtils.create(topic2, partition2, "message2"))
// 事務(wù)提交或回滾
producer.commitTransaction()
5.1 事務(wù)初始化
Kafka Producer 啟動后我們使用兩個 API 來初始化事務(wù):initTransactions 和 beginTransaction。
回顧一下我們的 Demo,在發(fā)送消息時是發(fā)送到兩個不同分區(qū)中,這兩個分區(qū)可能在不同的 Broker 上,所以我們需要一個全局的協(xié)調(diào)者 TransactionCoordinator 來記錄事務(wù)的狀態(tài)。
所以,在 initTransactions 中,Producer 首先發(fā)送 ApiKeys.FIND_COORDINATOR 請求獲取 TransactionCoordinator。
之后即可向其發(fā)送 ApiKeys.INIT_PRODUCER_ID 請求獲取 ProducerId 及 ProducerEpoch(也是上文中用于冪等的字段)。此步驟生成的 id 和 epoch 會寫入內(nèi)部 Topic __transaction_state 中,并且將事務(wù)的狀態(tài)置為 Empty。
__transaction_state 是 compaction Topic,其中消息的 key 為客戶端設(shè)置的transactional.id(詳見 TransactionStateManager#appendTransactionToLog)。
區(qū)別于 ProducerId 是服務(wù)端生成的內(nèi)部屬性;TransactionId 由用戶設(shè)置,用于標識業(yè)務(wù)視角認為的“同一個應(yīng)用”,啟動具有相同 TransactionId 的新 Producer 會使得未完成的事務(wù)被回滾并且來自舊 Producer(具有較小 epoch)的請求被拒絕掉。
后續(xù) beginTransaction 用于開始一個事務(wù),該方法會創(chuàng)建一個 Producer 內(nèi)部事務(wù)狀態(tài),標識這一個事務(wù)的開始,并不會有 RPC 產(chǎn)生。
5.2 消息發(fā)送
上一節(jié)說到 beginTransaction 只是更改 Producer 內(nèi)部狀態(tài),那么在第一條消息發(fā)送時才隱式開啟了事務(wù):
首先,Producer 會發(fā)送 ApiKeys.ADD_PARTITIONS_TO_TXN 請求到 TransactionCoordinator。TransactionCoordinator 會將這個分區(qū)加入到事務(wù)中,并更改事務(wù)的狀態(tài)為 Ongoing,這些信息被持久化到 __transaction_state 中。
然后 Producer 使用 ApiKeys.PRODUCE 請求正常發(fā)送消息到對應(yīng)的分區(qū)中。這條消息的可見性控制在下文消息消費一節(jié)中會詳細討論。
5.3 事務(wù)提交與回滾
當所有消息發(fā)送完成后,Producer 可以選擇提交或回滾事務(wù),此時:
? TransactionCoordinator:具有當前事務(wù)所有相關(guān)分區(qū)的信息
? 其他 Broker:已經(jīng)將消息持久化到日志文件中
接下來 Producer 調(diào)用 commitTransaction 會發(fā)送 ApiKeys.END_TXN 請求將事務(wù)狀態(tài)更改為 PrepareCommit(回滾事務(wù)對應(yīng)狀態(tài) PrepareAbort)并持久化到 __transaction_state 中,此時從 Producer 的視角來看整個事務(wù)已經(jīng)結(jié)束了。
TransactionCoordinator 會異步向各個 Broker 發(fā)送 ApiKeys.WRITE_TXN_MARKERS 請求,當所有參加事務(wù)的 Broker 都返回成功后,TransactionCoordinator 會將事務(wù)狀態(tài)更改為 CompleteCommit(回滾事務(wù)對應(yīng)狀態(tài) CompleteAbort)并持久化到 __transaction_state 中。
5.4 消息的消費
某個分區(qū)的消息可能是事務(wù)消息與非事務(wù)消息混雜的,如下圖所示:
圖片
在 Broker 處理 ApiKeys.PRODUCE 請求時,完成消息持久化會更新 LSO 到第一條未提交的事務(wù)消息的 offset。這樣在消費者消費消息時,可以通過 LSO 來判斷消息是否可見:如果設(shè)置了 isolation.level 為 read_committed 則只會消費 LSO 之前的消息。
LSO(log stable offset): 它表示的是已經(jīng)被成功復(fù)制到所有副本(replicas)并且可以被消費者安全消費的消息的最大偏移量。
但是我們可以發(fā)現(xiàn) LSO 之前存在已回滾的消息(圖中紅色矩形)這些消息應(yīng)該被過濾掉:在 Broker 處理 ApiKeys.WRITE_TXN_MARKERS 請求時,會將已回滾的消息索引寫入到 .txnindex 文件中(LogSegmentKafka#updateTxnIndex)。
后續(xù) Consumer 消費消息時還會收到對應(yīng)區(qū)間的已取消事務(wù)消息列表,上圖區(qū)間中的該列表為:
圖片
代表 offset 在 [2,5] 之間且由 id 為 11 的 Producer 發(fā)送的消息都已回滾。
上文我們討論了 __transaction_state 的實現(xiàn)確保同一時間,同一 TransactionId 有且只有一個事務(wù)在進行中。所以可以使用 ProducerId 和 offset 區(qū)間定位回滾的消息不會發(fā)生沖突。
6、Kafka 事務(wù)提供的 ACID 保證
? 原子性(Atomicity)
Kafka 通過對 __transaction_state Topic 的寫入實現(xiàn)了事務(wù)狀態(tài)的轉(zhuǎn)移,保證了事務(wù)要么同時提交,要么同時回滾。
? 一致性(Consistency)
在事務(wù)進入 PrepareCommit 或 PrepareAbort 階段時, TransactionCoordinator 異步向所有參與事務(wù)的 Broker 提交或回滾事務(wù)。這使得 Kafka 的事務(wù)做不到強一致性,只能通過不斷重試保證最終一致性。
? 隔離性(Isolation)
Kafka 通過 LSO 機制和 .txnindex 文件來避免臟讀,實現(xiàn)讀已提交(Read Committed)的隔離級別。
? 持久性(Durability)
Kafka 通過將事務(wù)狀態(tài)寫入到 __transaction_state Topic 和消息寫入到日志文件中來保證持久性。
7、Kafka 事務(wù)的限制
從功能上看,Kafka 事務(wù)并不能支持業(yè)務(wù)方事務(wù),強限制上游的消費和下游寫入都需要是同一個 Kafka 集群,否則就不具備原子性保障。
從性能上看,Kafka 事務(wù)的性能開銷主要體現(xiàn)在生產(chǎn)側(cè):
開啟事務(wù)時需要額外的 RPC 請求定位 TransactionCoordinator 并初始化數(shù)據(jù)
消息發(fā)送需要在發(fā)送消息前向 TransactionCoordinator 同步請求添加分區(qū),并將事務(wù)狀態(tài)的變化寫入到 __transaction_state Topic
事務(wù)提交或回滾時需要向所有參與事務(wù)的 Broker 發(fā)送請求
對于涉及分區(qū)較少且消息數(shù)量較多的事務(wù),事務(wù)的開銷可以被均攤;反之,較多的同步 RPC 帶來的開銷會極大影響性能。并且每個生產(chǎn)者只能有一個事務(wù)在進行中,這就意味著事務(wù)的吞吐量會受到限制。
消費側(cè)也有一定的影響:消費者只能看到 LSO 以下的消息,并且需要額外的索引文件來過濾已回滾的消息,這無疑會增加端到端的延遲。
8、總結(jié)
通過本文的深入分析,我們了解到 Kafka 的事務(wù)消息功能是如何在流處理場景中提供 Exactly-Once 語義的。Kafka 通過其事務(wù) API 和內(nèi)部機制,實現(xiàn)了消息發(fā)送的原子性、最終一致性、隔離性和持久性,盡管在實際應(yīng)用中可能存在一些性能和功能上的限制。開發(fā)者和架構(gòu)師應(yīng)當充分理解這些概念,并在設(shè)計系統(tǒng)時考慮如何有效地利用 Kafka 的事務(wù)功能,以構(gòu)建更加健壯和可靠的數(shù)據(jù)處理流程。
AutoMQ 是構(gòu)建于對象存儲之上的云原生 Kafka fork,在解決了 Kafka 已有的成本和彈性問題基礎(chǔ)上對 Kafka 100%兼容,因此在 AutoMQ 上也可以使用 Kafka 事務(wù)消息。AutoMQ 作為國內(nèi) Kafka 生態(tài)的忠實擁護者,我們將持續(xù)為 Kafka 技術(shù)愛好者帶來優(yōu)質(zhì)的 Kafka 技術(shù)內(nèi)容分享,歡迎關(guān)注我們。