「Kafka技術(shù)」Apache Kafka中的事務(wù)
在之前的一篇博客文章中,我們介紹了Apache Kafka?的一次語義。這篇文章介紹了各種消息傳遞語義,介紹了冪等生成器、事務(wù)和Kafka流的一次處理語義?,F(xiàn)在,我們將繼續(xù)上一節(jié)的內(nèi)容,深入探討Apache Kafka中的事務(wù)。該文檔的目標(biāo)是讓讀者熟悉有效使用Apache Kafka中的事務(wù)API所需的主要概念。
我們將討論設(shè)計(jì)事務(wù)API的主要用例、Kafka的事務(wù)語義、用于Java客戶端的事務(wù)API的細(xì)節(jié)、實(shí)現(xiàn)的有趣方面,以及在使用API時(shí)的重要注意事項(xiàng)。
這篇博客文章并不是關(guān)于使用事務(wù)細(xì)節(jié)的教程,我們也不會(huì)深入討論設(shè)計(jì)細(xì)節(jié)。相反,我們將在適當(dāng)?shù)牡胤芥溄拥絁avaDocs或設(shè)計(jì)文檔,以供希望深入研究的讀者使用。
我們希望讀者熟悉基本的Kafka概念,比如主題、分區(qū)、日志偏移量,以及代理和客戶在基于Kafka的應(yīng)用程序中的角色。熟悉Java的Kafka客戶機(jī)也會(huì)有所幫助。
為什么交易?
我們?cè)贙afka中設(shè)計(jì)的事務(wù)主要用于那些顯示“讀-進(jìn)程-寫”模式的應(yīng)用程序,其中的讀和寫來自于異步數(shù)據(jù)流,比如Kafka主題。這種應(yīng)用程序通常稱為流處理應(yīng)用程序。
第一代流處理應(yīng)用程序可以容忍不準(zhǔn)確的處理。例如,使用web頁面印象流并生成每個(gè)web頁面的視圖聚合計(jì)數(shù)的應(yīng)用程序可以容忍計(jì)數(shù)中的一些錯(cuò)誤。
然而,隨著這些應(yīng)用程序的流行,對(duì)具有更強(qiáng)語義的流處理應(yīng)用程序的需求也在增長。例如,一些金融機(jī)構(gòu)使用流處理應(yīng)用程序來處理用戶帳戶上的借方和貸方。在這些情況下,不能容忍處理過程中的錯(cuò)誤:我們需要準(zhǔn)確地一次處理所有消息,沒有例外。
更正式地說,如果流處理應(yīng)用程序使用消息a并生成消息B,使得B = F(a),那么僅一次處理就意味著如果且僅當(dāng)成功生成B時(shí)才使用a,反之亦然。
使用配置為至少一次傳遞語義的普通Kafka生產(chǎn)者和消費(fèi)者,流處理應(yīng)用程序可能會(huì)在以下方面失去一次處理語義:
- 由于內(nèi)部重試,生產(chǎn)者.send()可能導(dǎo)致消息B的重復(fù)寫入。這是由冪等生產(chǎn)者解決的,并不是本文其余部分的重點(diǎn)。
- 我們可能會(huì)重新處理輸入消息A,導(dǎo)致將重復(fù)的B消息寫入輸出,這違反了一次處理語義。如果流處理應(yīng)用程序在寫入B之后但在將A標(biāo)記為已使用之前崩潰,則可能發(fā)生重新處理。因此,當(dāng)它恢復(fù)時(shí),它將再次消耗A并再次寫入B,從而導(dǎo)致重復(fù)。
- 最后,在分布式環(huán)境中,應(yīng)用程序會(huì)崩潰,甚至更糟!-暫時(shí)失去與系統(tǒng)其余部分的連接。通常,會(huì)自動(dòng)啟動(dòng)新實(shí)例來替換那些被認(rèn)為丟失的實(shí)例。通過這個(gè)過程,我們可能會(huì)有多個(gè)實(shí)例處理相同的輸入主題,并寫入相同的輸出主題,從而導(dǎo)致輸出重復(fù),并違反一次處理語義。我們稱之為“僵尸實(shí)例”問題。
我們?cè)贙afka中設(shè)計(jì)了事務(wù)api來解決第二個(gè)和第三個(gè)問題。事務(wù)通過使這些周期成為原子性的,并通過促進(jìn)僵死的隔離,從而在讀寫周期中實(shí)現(xiàn)精確的一次處理。
事務(wù)性語義
原子多分區(qū)寫道
事務(wù)允許對(duì)多個(gè)Kafka主題和分區(qū)進(jìn)行原子寫入。事務(wù)中包含的所有消息都將被成功寫入,或者一個(gè)也不寫入。例如,處理過程中的錯(cuò)誤可能導(dǎo)致事務(wù)中止,在這種情況下,來自事務(wù)的任何消息都不會(huì)被使用者讀取?,F(xiàn)在我們來看看它是如何實(shí)現(xiàn)原子讀寫周期的。
首先,讓我們考慮原子讀寫周期的含義。簡而言之,這意味著如果一個(gè)應(yīng)用程序使用一個(gè)消息的抵消X topic-partition tp0,和寫消息B topic-partition tp1在消息上做一些處理,B = F (a),然后read-process-write周期是a和B原子只有在消息被認(rèn)為成功地消耗和發(fā)表在一起,要么一無所有。
現(xiàn)在,只有當(dāng)消息A的偏移量X標(biāo)記為已使用時(shí),才會(huì)認(rèn)為它是從主題分區(qū)tp0使用的。將偏移量標(biāo)記為已使用的偏移量稱為提交偏移量。在Kafka中,我們通過寫入內(nèi)部Kafka主題offsets主題來記錄偏移量提交。僅當(dāng)消息的偏移量提交到偏移量主題時(shí),才認(rèn)為該消息已被消耗。
因此從一個(gè)偏移量提交只是另一個(gè)寫一個(gè)卡夫卡的話題,因?yàn)橄⒈徽J(rèn)為是只有當(dāng)其抵消消費(fèi)承諾,原子還寫跨多個(gè)主題和分區(qū)使原子read-process-write周期:提交的抵消X的補(bǔ)償主題寫的消息B tp1將單個(gè)事務(wù)的一部分,因此原子。
僵尸(Zombie fencing)
我們通過要求為每個(gè)事務(wù)生產(chǎn)者分配一個(gè)稱為transaction .id的惟一標(biāo)識(shí)符來解決zombie實(shí)例的問題。這用于跨流程重新啟動(dòng)標(biāo)識(shí)相同的生產(chǎn)者實(shí)例。
API要求事務(wù)生產(chǎn)者的第一個(gè)操作應(yīng)該是顯式注冊(cè)其事務(wù)。使用Kafka集群的id。當(dāng)它這樣做時(shí),Kafka代理使用給定的事務(wù)檢查打開的事務(wù)。id并完成它們。它還增加與transaction .id關(guān)聯(lián)的epoch。epoch是存儲(chǔ)在每個(gè)transaction .id中的內(nèi)部元數(shù)據(jù)。
一旦epoch被碰撞,任何具有相同事務(wù)的生產(chǎn)者。身份證和舊時(shí)代被認(rèn)為是僵尸,被隔離。來自這些生產(chǎn)者的未來事務(wù)寫將被拒絕。
讀事務(wù)消息
現(xiàn)在,讓我們將注意力轉(zhuǎn)向在讀取作為事務(wù)的一部分寫入的消息時(shí)提供的保證。
Kafka使用者只會(huì)在事務(wù)被提交時(shí)才會(huì)向應(yīng)用程序提交事務(wù)消息。換句話說,使用者不會(huì)交付作為開放事務(wù)一部分的事務(wù)性消息,也不會(huì)交付作為中止事務(wù)一部分的消息。
值得注意的是,上面的保證沒有達(dá)到原子讀取。特別是,當(dāng)使用Kafka使用者來消費(fèi)來自主題的消息時(shí),應(yīng)用程序?qū)⒉恢肋@些消息是否作為事務(wù)的一部分寫入,因此它們不知道事務(wù)何時(shí)開始或結(jié)束。進(jìn)一步說,一個(gè)給定的消費(fèi)者不保證訂閱所有分區(qū)事務(wù)的一部分,它沒有發(fā)現(xiàn)這個(gè)方法,這就很難保證所有的信息是一個(gè)事務(wù)的一部分最終會(huì)被一個(gè)消費(fèi)者。
簡而言之:Kafka保證使用者最終只交付非事務(wù)性消息或提交的事務(wù)性消息。它將從打開的事務(wù)中保留消息,并從中止的事務(wù)中過濾出消息。
Java中的事務(wù)API
事務(wù)特性主要是一個(gè)服務(wù)器端和協(xié)議級(jí)特性,任何支持它的客戶端庫都可以使用它。用Java編寫的“讀-處理-寫”應(yīng)用程序,使用Kafka的事務(wù)API,看起來應(yīng)該是這樣的:
第1-5行通過指定事務(wù)設(shè)置生產(chǎn)者。配置id并將其注冊(cè)到initTransactions API。inittransactions()返回后,由具有相同事務(wù)的生產(chǎn)者的另一個(gè)實(shí)例啟動(dòng)的任何事務(wù)。id會(huì)被關(guān)閉和隔離。
第7-10行指定KafkaConsumer應(yīng)該只讀取非事務(wù)性消息,或者從它的輸入主題中提交事務(wù)性消息。流處理應(yīng)用程序通常在多個(gè)讀寫階段處理其數(shù)據(jù),每個(gè)階段使用前一階段的輸出作為其輸入。通過指定read_committed模式,我們可以在所有階段只執(zhí)行一次處理。
第14-21行演示了讀寫循環(huán)的核心:我們使用一些記錄,啟動(dòng)一個(gè)事務(wù),處理使用的記錄,將處理過的記錄寫入輸出主題,將使用的偏移量發(fā)送到偏移量主題,最后提交事務(wù)。根據(jù)上面提到的保證,我們知道偏移量和輸出記錄將作為一個(gè)原子單元提交。
事務(wù)是如何工作的
在本節(jié)中,我們將簡要概述上述事務(wù)api引入的新組件和新數(shù)據(jù)流。為了更詳盡地討論這個(gè)主題,您可以閱讀原始設(shè)計(jì)文檔,或者觀看介紹事務(wù)的Kafka峰會(huì)演講。
下面內(nèi)容的目標(biāo)是在調(diào)試使用事務(wù)的應(yīng)用程序時(shí),或者在嘗試調(diào)優(yōu)事務(wù)以獲得更好的性能時(shí),提供一個(gè)心智模型。
事務(wù)協(xié)調(diào)器和事務(wù)日志
Kafka 0.11.0中的transactions API引入的組件是事務(wù)協(xié)調(diào)器和上圖右側(cè)的事務(wù)日志。
事務(wù)協(xié)調(diào)器是在每個(gè)Kafka代理中運(yùn)行的模塊。事務(wù)日志是一個(gè)內(nèi)部kafka主題。每個(gè)協(xié)調(diào)器在事務(wù)日志中擁有一些分區(qū)子集。其代理為其領(lǐng)導(dǎo)的分區(qū)。
每一個(gè)事務(wù)。id通過一個(gè)簡單的哈希函數(shù)映射到事務(wù)日志的特定分區(qū)。這意味著只有一個(gè)協(xié)調(diào)器擁有給定的transaction .id。
通過這種方式,我們利用Kafka的rock solid復(fù)制協(xié)議和leader選擇過程來確保事務(wù)協(xié)調(diào)器總是可用的,并且所有事務(wù)狀態(tài)都被持久地存儲(chǔ)。
值得注意的是,事務(wù)日志只存儲(chǔ)事務(wù)的最新狀態(tài),而不是事務(wù)中的實(shí)際消息。消息僅存儲(chǔ)在實(shí)際的主題分區(qū)中。事務(wù)可以處于“進(jìn)行中”、“準(zhǔn)備提交”和“完成”等不同狀態(tài)。存儲(chǔ)在事務(wù)日志中的就是這種狀態(tài)和相關(guān)的元數(shù)據(jù)。
數(shù)據(jù)流
在較高的層次上,數(shù)據(jù)流可以分為四種不同的類型。
A:生產(chǎn)者和事務(wù)協(xié)調(diào)者的交互
執(zhí)行事務(wù)時(shí),生產(chǎn)者向事務(wù)協(xié)調(diào)器發(fā)出以下請(qǐng)求:
initTransactions API注冊(cè)一個(gè)事務(wù)。id與協(xié)調(diào)器。此時(shí),協(xié)調(diào)器將使用該事務(wù)關(guān)閉任何掛起的事務(wù)。id和碰撞的時(shí)代,以柵欄出僵尸。每個(gè)生產(chǎn)者會(huì)話只發(fā)生一次。
當(dāng)生產(chǎn)者在事務(wù)中第一次將數(shù)據(jù)發(fā)送到一個(gè)分區(qū)時(shí),該分區(qū)首先向協(xié)調(diào)器注冊(cè)。
當(dāng)應(yīng)用程序調(diào)用commitTransaction或abortTransaction時(shí),將向協(xié)調(diào)器發(fā)送一個(gè)請(qǐng)求,以開始兩階段提交協(xié)議。
B:協(xié)調(diào)器和事務(wù)日志的交互
隨著事務(wù)的進(jìn)展,生產(chǎn)者發(fā)送上述請(qǐng)求來更新協(xié)調(diào)器上事務(wù)的狀態(tài)。事務(wù)協(xié)調(diào)器將其擁有的每個(gè)事務(wù)的狀態(tài)保存在內(nèi)存中,并將該狀態(tài)寫入事務(wù)日志(以三種方式復(fù)制,因此是持久的)。
事務(wù)協(xié)調(diào)器是從事務(wù)日志中讀寫的惟一組件。如果給定的代理失敗,則將選出一個(gè)新的協(xié)調(diào)器作為死代理擁有的事務(wù)日志分區(qū)的leader,它將從傳入分區(qū)讀取消息,以便為這些分區(qū)中的事務(wù)重建其內(nèi)存狀態(tài)。
C:生產(chǎn)者寫數(shù)據(jù)到目標(biāo)主題分區(qū)
在向協(xié)調(diào)器注冊(cè)了事務(wù)中的新分區(qū)之后,生產(chǎn)者將數(shù)據(jù)正常地發(fā)送到實(shí)際的分區(qū)。這是同一個(gè)生產(chǎn)者。發(fā)送流,但是要進(jìn)行一些額外的驗(yàn)證以確保生產(chǎn)者不受保護(hù)。
D:主題分區(qū)交互的協(xié)調(diào)器
在生產(chǎn)者發(fā)起提交(或中止)之后,協(xié)調(diào)器開始兩階段提交協(xié)議。
在第一階段,協(xié)調(diào)器將其內(nèi)部狀態(tài)更新為“prepare_commit”,并在事務(wù)日志中更新此狀態(tài)。一旦完成了這一步,就可以保證在任何情況下提交事務(wù)。
然后協(xié)調(diào)器開始第2階段,將事務(wù)提交標(biāo)記寫入作為事務(wù)一部分的主題分區(qū)。
這些事務(wù)標(biāo)記不公開給應(yīng)用程序,而是由處于read_committed模式的使用者使用,以過濾掉中止的事務(wù)中的消息,并且不返回作為打開事務(wù)一部分的消息(即,在日志中但沒有與之關(guān)聯(lián)的事務(wù)標(biāo)記的。
一旦寫入了標(biāo)記,事務(wù)協(xié)調(diào)器將事務(wù)標(biāo)記為“完成”,并且生產(chǎn)者可以啟動(dòng)下一個(gè)事務(wù)。
實(shí)踐中處理交易
既然我們已經(jīng)理解了事務(wù)的語義以及它們是如何工作的,那么我們就將注意力轉(zhuǎn)向編寫利用事務(wù)的應(yīng)用程序的實(shí)踐方面。
如何選擇一個(gè)transaction .id
事務(wù)。id在保護(hù)僵尸方面起著重要作用。但是保持一個(gè)標(biāo)識(shí)符在不同的生產(chǎn)者會(huì)話之間是一致的,并且適當(dāng)?shù)馗綦x僵尸是有點(diǎn)棘手的。
正確隔離“僵尸”的關(guān)鍵是確保對(duì)于給定的transaction .id,讀寫周期中的輸入主題和分區(qū)總是相同的。如果這不是真的,那么一些消息可能會(huì)通過事務(wù)提供的圍欄泄漏。
例如,在一個(gè)分布式流處理應(yīng)用程序中,假設(shè)主題分區(qū)tp0最初是由transactional處理的。T0 id。如果在以后的某個(gè)時(shí)候,它可以映射到另一個(gè)具有transactional的生產(chǎn)者。id T1,在T0和T1之間沒有柵欄。因此,可以對(duì)來自tp0的消息進(jìn)行重新處理,這違反了一次處理的保證。
實(shí)際上,必須存儲(chǔ)輸入分區(qū)和事務(wù)之間的映射。外部存儲(chǔ)中的id,或者對(duì)其進(jìn)行一些靜態(tài)編碼。Kafka Streams選擇后一種方法來解決這個(gè)問題。
事務(wù)如何執(zhí)行,以及如何調(diào)優(yōu)它們
事務(wù)生產(chǎn)者的性能
讓我們將注意力轉(zhuǎn)向事務(wù)如何執(zhí)行。
首先,事務(wù)只導(dǎo)致適度的寫放大。增加的寫是由于:
- 對(duì)于每個(gè)事務(wù),我們都有額外的rpc向協(xié)調(diào)器注冊(cè)分區(qū)。這些是成批的,因此我們的rpc比事務(wù)中的分區(qū)要少。
- 在完成事務(wù)時(shí),必須將一個(gè)事務(wù)標(biāo)記寫入?yún)⑴c事務(wù)的每個(gè)分區(qū)。同樣,事務(wù)協(xié)調(diào)器在單個(gè)RPC中批量處理為同一代理綁定的所有標(biāo)記,因此我們?cè)谀抢锉4鍾PC開銷。但是我們不能避免對(duì)事務(wù)中的每個(gè)分區(qū)進(jìn)行一次額外的寫操作。
- 最后,我們將狀態(tài)更改寫入事務(wù)日志。這包括對(duì)添加到事務(wù)中的每批分區(qū)的寫操作、“prepare_commit”狀態(tài)和“complete_commit”狀態(tài)。
我們可以看到,開銷與作為事務(wù)一部分寫入的消息的數(shù)量無關(guān)。因此,提高吞吐量的關(guān)鍵是在每個(gè)事務(wù)中包含更多的消息。
實(shí)際上,對(duì)于在最大吞吐量下生成1KB記錄的生產(chǎn)者,每100ms提交一條消息只會(huì)導(dǎo)致吞吐量降低3%。較小的消息或較短的事務(wù)提交間隔將導(dǎo)致更嚴(yán)重的降級(jí)。
增加事務(wù)持續(xù)時(shí)間的主要代價(jià)是增加了端到端延遲。請(qǐng)記住,讀取事務(wù)性消息的使用者不會(huì)交付作為開放事務(wù)一部分的消息。因此,提交間隔的時(shí)間越長,應(yīng)用程序的等待時(shí)間就越長,從而增加了端到端延遲。
事務(wù)消費(fèi)者的性能
事務(wù)性消費(fèi)者比生產(chǎn)者簡單得多,因?yàn)樗枰龅木褪?
- 篩選屬于中止的事務(wù)的消息。
- 不返回作為開放事務(wù)一部分的事務(wù)消息。
因此,當(dāng)以read_committed模式讀取事務(wù)消息時(shí),事務(wù)使用者的吞吐量沒有下降。這樣做的主要原因是,我們?cè)谧x取事務(wù)性消息時(shí)保持零副本讀取。
而且,使用者不需要任何緩沖來等待事務(wù)完成。相反,代理不允許它提前進(jìn)行補(bǔ)償,其中包括打開的事務(wù)。
因此,消費(fèi)者是極其輕量級(jí)和高效的。有興趣的讀者可以在本文檔中了解消費(fèi)者設(shè)計(jì)的細(xì)節(jié)。
進(jìn)一步的閱讀
我們剛剛觸及了Apache Kafka中事務(wù)的皮毛。幸運(yùn)的是,幾乎所有的設(shè)計(jì)細(xì)節(jié)都記錄在網(wǎng)上。有關(guān)文件如下:
最初的Kafka KIP:它提供了關(guān)于數(shù)據(jù)流的詳細(xì)信息和公共接口的概述,特別是隨事務(wù)而來的配置選項(xiàng)。
原始設(shè)計(jì)文檔:不適合膽小的人,這是權(quán)威的地方——源代碼外!-了解如何處理每個(gè)事務(wù)RPC,如何維護(hù)事務(wù)日志,如何清除事務(wù)數(shù)據(jù),等等。
KafkaProducer javadocs:這是一個(gè)學(xué)習(xí)如何使用新api的好地方。頁面開頭的示例以及send方法的文檔都是很好的起點(diǎn)。
結(jié)論
在這篇文章中,我們了解了Apache Kafka中事務(wù)API的關(guān)鍵設(shè)計(jì)目標(biāo),理解了事務(wù)API的語義,并對(duì)API的實(shí)際工作方式有了更深入的了解。
如果我們考慮一個(gè)讀-進(jìn)程-寫循環(huán),這篇文章主要討論了讀和寫路徑,處理本身就是一個(gè)黑盒。事實(shí)上,在處理階段可以做很多事情,這使得僅使用事務(wù)api無法保證一次處理。例如,如果處理對(duì)其他存儲(chǔ)系統(tǒng)有副作用,這里介紹的api不足以保證只進(jìn)行一次處理。
Kafka Streams框架使用這里描述的事務(wù)api向上移動(dòng)價(jià)值鏈,并為各種流處理應(yīng)用程序提供一次處理,甚至包括那些在處理期間更新某些額外狀態(tài)存儲(chǔ)的應(yīng)用程序。
將來的一篇博客文章將討論Kafka流如何提供一次處理語義,以及如何編寫利用它的應(yīng)用程序。
最后,對(duì)于那些渴望了解上述api實(shí)現(xiàn)細(xì)節(jié)的人,我們將在另一篇后續(xù)博客文章中介紹一些更有趣的解決方案。