Kafka 核心全面總結(jié),高可靠高性能核心原理探究
你好,我是碼哥,可以叫我靚仔
作者:mo
引言
在探究 Kafka 核心知識之前,我們先思考一個問題:什么場景會促使我們使用 Kafka? 說到這里,我們頭腦中或多或少會蹦出異步解耦和削峰填谷等字樣,是的,這就是 Kafka 最重要的落地場景。
異步解耦:同步調(diào)用轉(zhuǎn)換成異步消息通知,實現(xiàn)生產(chǎn)者和消費者的解耦。想象一個場景,在商品交易時,在訂單創(chuàng)建完成之后,需要觸發(fā)一系列其他的操作,比如進行用戶訂單數(shù)據(jù)的統(tǒng)計、給用戶發(fā)送短信、給用戶發(fā)送郵件等等。如果所有操作都采用同步方式實現(xiàn),將嚴重影響系統(tǒng)性能。針對此場景,我們可以利用消息中間件解耦訂單創(chuàng)建操作和其他后續(xù)行為。
削峰填谷:利用 broker 緩沖上游生產(chǎn)者瞬時突發(fā)的流量,使消費者消費流量整體平滑。對于發(fā)送能力很強的上游系統(tǒng),如果沒有消息中間件的保護,下游系統(tǒng)可能會直接被壓垮導致全鏈路服務雪崩。想象秒殺業(yè)務場景,上游業(yè)務發(fā)起下單請求,下游業(yè)務執(zhí)行秒殺業(yè)務(庫存檢查,庫存凍結(jié),余額凍結(jié),生成訂單等等),下游業(yè)務處理的邏輯是相當復雜的,并發(fā)能力有限,如果上游服務不做限流策略,瞬時可能把下游服務壓垮。針對此場景,我們可以利用 MQ 來做削峰填谷,讓高峰流量填充低谷空閑資源,達到系統(tǒng)資源的合理利用。
通過上述例子可以發(fā)現(xiàn)交易、支付等場景常需要異步解耦和削峰填谷功能解決問題,而交易、支付等場景對性能、可靠性要求特別高。那么,我們本文的主角 Kafka 能否滿足相應要求呢?下面我們來探討下。
Kafka 宏觀認知
在探究 Kafka 的高性能、高可靠性之前,我們從宏觀上來看下 Kafka 的系統(tǒng)架構(gòu):
如上圖所示,Kafka 由 Producer、Broker、Consumer 以及負責集群管理的 ZooKeeper 組成,各部分功能如下:
- Producer:生產(chǎn)者,負責消息的創(chuàng)建并通過一定的路由策略發(fā)送消息到合適的 Broker;
- Broker:服務實例,負責消息的持久化、中轉(zhuǎn)等功能;
- Consumer :消費者,負責從 Broker 中拉?。≒ull)訂閱的消息并進行消費,通常多個消費者構(gòu)成一個分組,消息只能被同組中的一個消費者消費;
- ZooKeeper:負責 broker、consumer 集群元數(shù)據(jù)的管理等;(注意:Producer 端直接連接 broker,不在 zk 上存任何數(shù)據(jù),只是通過 ZK 監(jiān)聽 broker 和 topic 等信息)
上圖消息流轉(zhuǎn)過程中,還有幾個特別重要的概念—主題(Topic)、分區(qū)(Partition)、分段(segment)、位移(offset)。
- topic:消息主題。Kafka 按 topic 對消息進行分類,我們在收發(fā)消息時只需指定 topic。
- partition:分區(qū)。為了提升系統(tǒng)的吞吐,一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。另外,為了提升系統(tǒng)的可靠性,partition 通常會分組,且每組有一個主 partition、多個副本 partition,且分布在不同的 broker 上,從而起到容災的作用。
- segment:分段。宏觀上看,一個 partition 對應一個日志(Log)。由于生產(chǎn)者生產(chǎn)的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數(shù)據(jù)檢索效率低下,Kafka 采取了分段和索引機制,將每個 partition 分為多個 segment,同時也便于消息的維護和清理。每個 segment 包含一個.log 日志文件、兩個索引(.index、timeindex)文件以及其他可能的文件。每個 Segment 的數(shù)據(jù)文件以該段中最小的 offset 為文件名,當查找 offset 的 Message 的時候,通過二分查找快找到 Message 所處于的 Segment 中。
- offset:消息在日志中的位置,消息在被追加到分區(qū)日志文件的時候都會分配一個特定的偏移量。offset 是消息在分區(qū)中的唯一標識,是一個單調(diào)遞增且不變的值。Kafka 通過它來保證消息在分區(qū)內(nèi)的順序性,不過 offset 并不跨越分區(qū),也就是說,Kafka 保證的是分區(qū)有序而不是主題有序。
Kafka 高可靠性、高性能探究
在對 Kafka 的整體系統(tǒng)框架及相關概念簡單了解后,下面我們來進一步深入探討下高可靠性、高性能實現(xiàn)原理。
Kafka 高可靠性探究
Kafka 高可靠性的核心是保證消息在傳遞過程中不丟失,涉及如下核心環(huán)節(jié):
- 消息從生產(chǎn)者可靠地發(fā)送至 Broker;-- 網(wǎng)絡、本地丟數(shù)據(jù);
- 發(fā)送到 Broker 的消息可靠持久化;-- Pagecache 緩存落盤、單點崩潰、主從同步跨網(wǎng)絡;
- 消費者從 Broker 消費到消息且最好只消費一次 -- 跨網(wǎng)絡消息傳輸 。
消息從生產(chǎn)者可靠地發(fā)送至 Broker
為了保障消息從生產(chǎn)者可靠地發(fā)送至 Broker,我們需要確保兩點;
- Producer 發(fā)送消息后,能夠收到來自 Broker 的消息保存成功 ack;
- Producer 發(fā)送消息后,能夠捕獲超時、失敗 ack 等異常 ack 并做處理。
ack 策略
針對問題 1,Kafka 為我們提供了三種 ack 策略,
- Request.required.acks = 0:請求發(fā)送即認為成功,不關心有沒有寫成功,常用于日志進行分析場景;
- Request.required.acks = 1:當 leader partition 寫入成功以后,才算寫入成功,有丟數(shù)據(jù)的可能;
- Request.required.acks= -1:ISR 列表里面的所有副本都寫完以后,這條消息才算寫入成功,強可靠性保證;
為了實現(xiàn)強可靠的 kafka 系統(tǒng),我們需要設置 Request.required.acks= -1,同時還會設置集群中處于正常同步狀態(tài)的副本 follower 數(shù)量 min.insync.replicas>2,另外,設置 unclean.leader.election.enable=false 使得集群中 ISR 的 follower 才可變成新的 leader,避免特殊情況下消息截斷的出現(xiàn)。
消息發(fā)送策略
針對問題 2,kafka 提供兩類消息發(fā)送方式:同步(sync)發(fā)送和異步(async)發(fā)送,相關參數(shù)如下:
以 sarama 實現(xiàn)為例,在消息發(fā)送的過程中,無論是同步發(fā)送還是異步發(fā)送都會涉及到兩個協(xié)程--負責消息發(fā)送的主協(xié)程和負責消息分發(fā)的 dispatcher 協(xié)程。
異步發(fā)送
對于異步發(fā)送(ack != 0 場景,等于 0 時不關心寫 kafka 結(jié)果,后文詳細講解)而言,其流程大概如下:
- 在主協(xié)程中調(diào)用異步發(fā)送 kafka 消息的時候,其本質(zhì)是將消息體放進了一個 input 的 channel,只要入 channel 成功,則這個函數(shù)直接返回,不會產(chǎn)生任何阻塞。相反,如果入 channel 失敗,則會返回錯誤信息。因此調(diào)用 async 寫入的時候返回的錯誤信息是入 channel 的錯誤信息,至于具體最終消息有沒有發(fā)送到 kafka 的 broker,我們無法從返回值得知。
- 當消息進入 input 的 channel 后,會有另一個dispatcher 的協(xié)程負責遍歷 input,來真正發(fā)送消息到特定 Broker 上的主 Partition 上。發(fā)送結(jié)果通過一個異步協(xié)程進行監(jiān)聽,循環(huán)處理 err channel 和 success channel,出現(xiàn)了 error 就記一個日志。因此異步寫入場景時,寫 kafka 的錯誤信息,我們暫時僅能夠從這個錯誤日志來得知具體發(fā)生了什么錯,并且也不支持我們自建函數(shù)進行兜底處理,這一點在 trpc-go 的官方也得到了承認。
同步發(fā)送
同步發(fā)送(ack != 0 場景)是在異步發(fā)送的基礎上加以條件限制實現(xiàn)的。同步消息發(fā)送在 newSyncProducerFromAsyncProducer 中開啟兩個異步協(xié)程處理消息成功與失敗的“回調(diào)”,并使用 waitGroup 進行等待,從而將異步操作轉(zhuǎn)變?yōu)橥讲僮?,其流程大概如下?/p>
通過上述分析可以發(fā)現(xiàn),kafka 消息發(fā)送本質(zhì)上都是異步的,不過同步發(fā)送通過 waitGroup 將異步操作轉(zhuǎn)變?yōu)橥讲僮?。同步發(fā)送在一定程度上確保了我們在跨網(wǎng)絡向 Broker 傳輸消息時,消息一定可以可靠地傳輸?shù)?Broker。因為在同步發(fā)送場景我們可以明確感知消息是否發(fā)送至 Broker,若因網(wǎng)絡抖動、機器宕機等故障導致消息發(fā)送失敗或結(jié)果不明,可通過重試等手段確保消息至少一次(at least once) 發(fā)送到 Broker。另外,Kafka(0.11.0.0 版本后)還為 Producer 提供兩種機制來實現(xiàn)精確一次(exactly once) 消息發(fā)送:冪等性(Idempotence)和事務(Transaction)。
小結(jié)
通過 ack 策略配置、同步發(fā)送、事務消息組合能力,我們可以實現(xiàn)exactly once 語意跨網(wǎng)絡向 Broker 傳輸消息。但是,Producer 收到 Broker 的成功 ack,消息一定不會丟失嗎?為了搞清這個問題,我們首先要搞明白 Broker 在接收到消息后做了哪些處理。
發(fā)送到 Broker 的消息可靠持久化
為了確保 Producer 收到 Broker 的成功 ack 后,消息一定不在 Broker 環(huán)節(jié)丟失,我們核心要關注以下幾點:
- Broker 返回 Producer 成功 ack 時,消息是否已經(jīng)落盤;
- Broker 宕機是否會導致數(shù)據(jù)丟失,容災機制是什么;
- Replica 副本機制帶來的多副本間數(shù)據(jù)同步一致性問題如何解決;
Broker 異步刷盤機制
kafka 為了獲得更高吞吐,Broker 接收到消息后只是將數(shù)據(jù)寫入 PageCache 后便認為消息已寫入成功,而 PageCache 中的數(shù)據(jù)通過 linux 的 flusher 程序進行異步刷盤(刷盤觸發(fā)條:主動調(diào)用 sync 或 fsync 函數(shù)、可用內(nèi)存低于閥值、dirty data 時間達到閥值),將數(shù)據(jù)順序?qū)懙酱疟P。消息處理示意圖如下:
由于消息是寫入到 pageCache,單機場景,如果還沒刷盤 Broker 就宕機了,那么 Producer 產(chǎn)生的這部分數(shù)據(jù)就可能丟失。為了解決單機故障可能帶來的數(shù)據(jù)丟失問題,Kafka 為分區(qū)引入了副本機制。
Replica 副本機制
Kafka 每組分區(qū)通常有多個副本,同組分區(qū)的不同副本分布在不同的 Broker 上,保存相同的消息(可能有滯后)。副本之間是“一主多從”的關系,其中 leader 副本負責處理讀寫請求,follower 副本負責從 leader 拉取消息進行同步。分區(qū)的所有副本統(tǒng)稱為 AR(Assigned Replicas),其中所有與 leader 副本保持一定同步的副本(包括 leader 副本在內(nèi))組成 ISR(In-Sync Replicas),與 leader 同步滯后過多的副本組成 OSR(Out-of-Sync Replicas),由此可見,AR=ISR+OSR。
follower 副本是否與 leader 同步的判斷標準取決于 Broker 端參數(shù) replica.lag.time.max.ms(默認為 10 秒),follower 默認每隔 500ms 向 leader fetch 一次數(shù)據(jù),只要一個 Follower 副本落后 Leader 副本的時間不連續(xù)超過 10 秒,那么 Kafka 就認為該 Follower 副本與 leader 是同步的。在正常情況下,所有的 follower 副本都應該與 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合為空。
當 leader 副本所在 Broker 宕機時,Kafka 會借助 ZK 從 follower 副本中選舉新的 leader 繼續(xù)對外提供服務,實現(xiàn)故障的自動轉(zhuǎn)移,保證服務可用。為了使選舉的新 leader 和舊 leader 數(shù)據(jù)盡可能一致,當 leader 副本發(fā)生故障時,默認情況下只有在 ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR 集合中的副本則沒有任何機會(可通過設置 unclean.leader.election.enable 改變)。
當 Kafka 通過多副本機制解決單機故障問題時,同時也帶來了多副本間數(shù)據(jù)同步一致性問題。Kafka 通過高水位更新機制、副本同步機制、 Leader Epoch 等多種措施解決了多副本間數(shù)據(jù)同步一致性問題,下面我們來依次看下這幾大措施。
HW 和 LEO
首先,我們來看下兩個和 Kafka 中日志相關的重要概念 HW 和 LEO:
- HW: High Watermark,高水位,表示已經(jīng)提交(commit)的最大日志偏移量,Kafka 中某條日志“已提交”的意思是 ISR 中所有節(jié)點都包含了此條日志,并且消費者只能消費 HW 之前的數(shù)據(jù);
- LEO: Log End Offset,表示當前 log 文件中下一條待寫入消息的 offset;
如上圖所示,它代表一個日志文件,這個日志文件中有 8 條消息,0 至 5 之間的消息為已提交消息,5 至 7 的消息為未提交消息。日志文件的 HW 為 6,表示消費者只能拉取到 5 之前的消息,而 offset 為 5 的消息對消費者而言是不可見的。日志文件的 LEO 為 8,下一條消息將在此處寫入。
注意:所有副本都有對應的 HW 和 LEO,只不過 Leader 副本比較特殊,Kafka 使用 Leader 副本的高水位來定義所在分區(qū)的高水位。換句話說,分區(qū)的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特點:
- Leader HW:min(所有副本 LEO),為此 Leader 副本不僅要保存自己的 HW 和 LEO,還要保存 follower 副本的 HW 和 LEO,而 follower 副本只需保存自己的 HW 和 LEO;
- Follower HW:min(follower 自身 LEO,leader HW)。
注意:為方便描述,下面Leader HW簡記為HWL,F(xiàn)ollower HW簡記為F,Leader LEO簡記為LEOL ,F(xiàn)ollower LEO簡記為LEOF。
下面我們演示一次完整的 HW / LEO 更新流程:
- 初始狀態(tài)
HWL=0,LEOL=0,HWF=0,LEOF=0。
- Follower 第一次 fetch
- Leader收到Producer發(fā)來的一條消息完成存儲, 更新LEOL=1;
- Follower從Leader fetch數(shù)據(jù), Leader收到請求,記錄follower的LEOF =0,并且嘗試更新HWL =min(全部副本LEO)=0;
- eade返回HWL=0和LEOL=1給Follower,F(xiàn)ollower存儲消息并更新LEOF =1, HW=min(LEOF,HWL)=0。
- Follower 第二次 fetch
- Follower再次從Leader fetch數(shù)據(jù), Leader收到請求,記錄follower的LEOF =1,并且嘗試更新HWL =min(全部副本LEO)=1;
- leade返回HWL=1和LEOL=1給Follower,Leader收到請求,更新自己的 HW=min(LEOF,HWL)=1。
上述更新流程中 Follower 和 Leader 的 HW 更新有時間 GAP。如果 Leader 節(jié)點在此期間發(fā)生故障,則 Follower 的 HW 和 Leader 的 HW 可能會處于不一致狀態(tài),如果 Followe 被選為新的 Leader 并且以自己的 HW 為準對外提供服務,則可能帶來數(shù)據(jù)丟失或數(shù)據(jù)錯亂問題。
KIP-101 問題:數(shù)據(jù)丟失&數(shù)據(jù)錯亂 ^參 5^
數(shù)據(jù)丟失
第 1 步:
- 副本 B 作為 leader 收到 producer 的 m2 消息并寫入本地文件,等待副本 A 拉取。
- 副本 A 發(fā)起消息拉取請求,請求中攜帶自己的最新的日志 offset(LEO=1),B 收到后更新自己的 HW 為 1,并將 HW=1 的信息以及消息 m2 返回給 A。
- A 收到拉取結(jié)果后更新本地的 HW 為 1,并將 m2 寫入本地文件。發(fā)起新一輪拉取請求(LEO=2),B 收到 A 拉取請求后更新自己的 HW 為 2,沒有新數(shù)據(jù)只將 HW=2 的信息返回給 A,并且回復給 producer 寫入成功。此處的狀態(tài)就是圖中第一步的狀態(tài)。
第 2 步:
此時,如果沒有異常,A 會收到 B 的回復,得知目前的 HW 為 2,然后更新自身的 HW 為 2。但在此時 A 重啟了,沒有來得及收到 B 的回復,此時 B 仍然是 leader。A 重啟之后會以 HW 為標準截斷自己的日志,因為 A 作為 follower 不知道多出的日志是否是被提交過的,防止數(shù)據(jù)不一致從而截斷多余的數(shù)據(jù)并嘗試從 leader 那里重新同步。
第 3 步:
B 崩潰了,min.isr 設置的是 1,所以 zookeeper 會從 ISR 中再選擇一個作為 leader,也就是 A,但是 A 的數(shù)據(jù)不是完整的,從而出現(xiàn)了數(shù)據(jù)丟失現(xiàn)象。
問題在哪里?在于 A 重啟之后以 HW 為標準截斷了多余的日志。不截斷行不行?不行,因為這個日志可能沒被提交過(也就是沒有被 ISR 中的所有節(jié)點寫入過),如果保留會導致日志錯亂。
數(shù)據(jù)錯亂
在分析日志錯亂的問題之前,我們需要了解到 kafka 的副本可靠性保證有一個前提:在 ISR 中至少有一個節(jié)點。如果節(jié)點均宕機的情況下,是不保證可靠性的,在這種情況會出現(xiàn)數(shù)據(jù)丟失,數(shù)據(jù)丟失是可接受的。這里我們分析的問題比數(shù)據(jù)丟失更加槽糕,會引發(fā)日志錯亂甚至導致整個系統(tǒng)異常,而這是不可接受的。
第 1 步:
- A 和 B 均為 ISR 中的節(jié)點。副本 A 作為 leader,收到 producer 的消息 m2 的請求后寫入 PageCache 并在某個時刻刷新到本地磁盤。
- 副本 B 拉取到 m2 后寫入 PageCage 后(尚未刷盤)再次去 A 中拉取新消息并告知 A 自己的 LEO=2,A 收到更新自己的 HW 為 1 并回復給 producer 成功。
- 此時 A 和 B 同時宕機,B 的 m2 由于尚未刷盤,所以 m2 消息丟失。此時的狀態(tài)就是第 1 步的狀態(tài)。
第 2 步:
由于 A 和 B 均宕機,而 min.isr=1 并且 unclean.leader.election.enable=true(關閉 unclean 選擇策略),所以 Kafka 會等到第一個 ISR 中的節(jié)點恢復并選為 leader,這里不幸的是 B 被選為 leader,而且還接收到 producer 發(fā)來的新消息 m3。注意,這里丟失 m2 消息是可接受的,畢竟所有節(jié)點都宕機了。
第 3 步:
A 恢復重啟后發(fā)現(xiàn)自己是 follower,而且 HW 為 2,并沒有多余的數(shù)據(jù)需要截斷,所以開始和 B 進行新一輪的同步。但此時 A 和 B 均沒有意識到,offset 為 1 的消息不一致了。
問題在哪里?在于日志的寫入是異步的,上面也提到 Kafka 的副本策略的一個設計是消息的持久化是異步的,這就會導致在場景二的情況下被選出的 leader 不一定包含所有數(shù)據(jù),從而引發(fā)日志錯亂的問題。
Leader Epoch
為了解決上述缺陷,Kafka 引入了 Leader Epoch 的概念。leader epoch 和 raft 中的任期號的概念很類似,每次重新選擇 leader 的時候,用一個嚴格單調(diào)遞增的 id 來標志,可以讓所有 follower 意識到 leader 的變化。而 follower 也不再以 HW 為準,每次奔潰重啟后都需要去 leader 那邊確認下當前 leader 的日志是從哪個 offset 開始的。下面看下 Leader Epoch 是如何解決上面兩個問題的。
數(shù)據(jù)丟失解決
這里的關鍵點在于副本 A 重啟后作為 follower,不是忙著以 HW 為準截斷自己的日志,而是先發(fā)起 LeaderEpochRequest 詢問副本 B 第 0 代的最新的偏移量是多少,副本 B 會返回自己的 LEO 為 2 給副本 A,A 此時就知道消息 m2 不能被截斷,所以 m2 得到了保留。當 A 選為 leader 的時候就保留了所有已提交的日志,日志丟失的問題得到解決。
如果發(fā)起 LeaderEpochRequest 的時候就已經(jīng)掛了怎么辦?這種場景下,不會出現(xiàn)日志丟失,因為副本 A 被選為 leader 后不會截斷自己的日志,日志截斷只會發(fā)生在 follower 身上。
數(shù)據(jù)錯亂解決
這里的關鍵點還是在第 3 步,副本 A 重啟作為 follower 的第一步還是需要發(fā)起 LeaderEpochRequest 詢問 leader 當前第 0 代最新的偏移量是多少,由于副本 B 已經(jīng)經(jīng)過換代,所以會返回給 A 第 1 代的起始偏移(也就是 1),A 發(fā)現(xiàn)沖突后會截斷自己偏移量為 1 的日志,并重新開始和 leader 同步。副本 A 和副本 B 的日志達到了一致,解決了日志錯亂。
小結(jié)
Broker 接收到消息后只是將數(shù)據(jù)寫入 PageCache 后便認為消息已寫入成功,但是,通過副本機制并結(jié)合 ACK 策略可以大概率規(guī)避單機宕機帶來的數(shù)據(jù)丟失問題,并通過 HW、副本同步機制、 Leader Epoch 等多種措施解決了多副本間數(shù)據(jù)同步一致性問題,最終實現(xiàn)了 Broker 數(shù)據(jù)的可靠持久化。
消費者從 Broker 消費到消息且最好只消費一次
Consumer 在消費消息的過程中需要向 Kafka 匯報自己的位移數(shù)據(jù),只有當 Consumer 向 Kafka 匯報了消息位移,該條消息才會被 Broker 認為已經(jīng)被消費。因此,Consumer 端消息的可靠性主要和 offset 提交方式有關,Kafka 消費端提供了兩種消息提交方式:
正常情況下我們很難實現(xiàn) exactly once 語意的消息,通常是通過手動提交+冪等實現(xiàn)消息的可靠消費。
Kafka 高性能探究
Kafka 高性能的核心是保障系統(tǒng)低延遲、高吞吐地處理消息,為此,Kafaka 采用了許多精妙的設計:
- 異步發(fā)送
- 批量發(fā)送
- 壓縮技術(shù)
- Pagecache 機制&順序追加落盤
- 零拷貝
- 稀疏索引
- broker & 數(shù)據(jù)分區(qū)
- 多 reactor 多線程網(wǎng)絡模型
異步發(fā)送
如上文所述,Kafka 提供了異步和同步兩種消息發(fā)送方式。在異步發(fā)送中,整個流程都是異步的。調(diào)用異步發(fā)送方法后,消息會被寫入 channel,然后立即返回成功。Dispatcher 協(xié)程會從 channel 輪詢消息,將其發(fā)送到 Broker,同時會有另一個異步協(xié)程負責處理 Broker 返回的結(jié)果。同步發(fā)送本質(zhì)上也是異步的,但是在處理結(jié)果時,同步發(fā)送通過 waitGroup 將異步操作轉(zhuǎn)換為同步。使用異步發(fā)送可以最大化提高消息發(fā)送的吞吐能力。
批量發(fā)送
Kafka 支持批量發(fā)送消息,將多個消息打包成一個批次進行發(fā)送,從而減少網(wǎng)絡傳輸?shù)拈_銷,提高網(wǎng)絡傳輸?shù)男屎屯掏铝?。Kafka 的批量發(fā)送消息是通過以下兩個參數(shù)來控制的:
- batch.size:控制批量發(fā)送消息的大小,默認值為 16KB,可適當增加 batch.size 參數(shù)值提升吞吐。但是,需要注意的是,如果批量發(fā)送的大小設置得過大,可能會導致消息發(fā)送的延遲增加,因此需要根據(jù)實際情況進行調(diào)整。
- linger.ms:控制消息在批量發(fā)送前的等待時間,默認值為 0。當 linger.ms 大于 0 時,如果有消息發(fā)送,Kafka 會等待指定的時間,如果等待時間到達或者批量大小達到 batch.size,就會將消息打包成一個批次進行發(fā)送??蛇m當增加 linger.ms 參數(shù)值提升吞吐,比如 10 ~ 100。
在 Kafka 的生產(chǎn)者客戶端中,當發(fā)送消息時,如果啟用了批量發(fā)送,Kafka 會將消息緩存到緩沖區(qū)中。當緩沖區(qū)中的消息大小達到 batch.size 或者等待時間到達 linger.ms 時,Kafka 會將緩沖區(qū)中的消息打包成一個批次進行發(fā)送。如果在等待時間內(nèi)沒有達到 batch.size,Kafka 也會將緩沖區(qū)中的消息發(fā)送出去,從而避免消息積壓。
壓縮技術(shù)
Kafka 支持壓縮技術(shù),通過將消息進行壓縮后再進行傳輸,從而減少網(wǎng)絡傳輸?shù)拈_銷(壓縮和解壓縮的過程會消耗一定的 CPU 資源,因此需要根據(jù)實際情況進行調(diào)整。),提高網(wǎng)絡傳輸?shù)男屎屯掏铝?。Kafka 支持多種壓縮算法,在 Kafka2.1.0 版本之前,僅支持 GZIP,Snappy 和 LZ4,2.1.0 后還支持 Zstandard 算法(Facebook 開源,能夠提供超高壓縮比)。這些壓縮算法性能對比(兩指標都是越高越好)如下:
- 吞吐量:LZ4>Snappy>zstd 和 GZIP,壓縮比:zstd>LZ4>GZIP>Snappy。
在 Kafka 中,壓縮技術(shù)是通過以下兩個參數(shù)來控制的:
- compression.type:控制壓縮算法的類型,默認值為 none,表示不進行壓縮。
- compression.level:控制壓縮的級別,取值范圍為 0-9,默認值為-1。當值為-1 時,表示使用默認的壓縮級別。
在 Kafka 的生產(chǎn)者客戶端中,當發(fā)送消息時,如果啟用了壓縮技術(shù),Kafka 會將消息進行壓縮后再進行傳輸。在消費者客戶端中,如果消息進行了壓縮,Kafka 會在消費消息時將其解壓縮。注意:Broker 如果設置了和生產(chǎn)者不通的壓縮算法,接收消息后會解壓后重新壓縮保存。Broker 如果存在消息版本兼容也會觸發(fā)解壓后再壓縮。
Pagecache 機制&順序追加落盤
kafka 為了提升系統(tǒng)吞吐、降低時延,Broker 接收到消息后只是將數(shù)據(jù)寫入PageCache后便認為消息已寫入成功,而 PageCache 中的數(shù)據(jù)通過 linux 的 flusher 程序進行異步刷盤(避免了同步刷盤的巨大系統(tǒng)開銷),將數(shù)據(jù)順序追加寫到磁盤日志文件中。由于 pagecache 是在內(nèi)存中進行緩存,因此讀寫速度非常快,可以大大提高讀寫效率。順序追加寫充分利用順序 I/O 寫操作,避免了緩慢的隨機 I/O 操作,可有效提升 Kafka 吞吐。
如上圖所示,消息被順序追加到每個分區(qū)日志文件的尾部。
零拷貝
Kafka 中存在大量的網(wǎng)絡數(shù)據(jù)持久化到磁盤(Producer 到 Broker)和磁盤文件通過網(wǎng)絡發(fā)送(Broker 到 Consumer)的過程,這一過程的性能直接影響 Kafka 的整體吞吐量。傳統(tǒng)的 IO 操作存在多次數(shù)據(jù)拷貝和上下文切換,性能比較低。Kafka 利用零拷貝技術(shù)提升上述過程性能,其中網(wǎng)絡數(shù)據(jù)持久化磁盤主要用 mmap 技術(shù),網(wǎng)絡數(shù)據(jù)傳輸環(huán)節(jié)主要使用 sendfile 技術(shù)。
索引加速之 mmap
傳統(tǒng)模式下,數(shù)據(jù)從網(wǎng)絡傳輸?shù)轿募枰?4 次數(shù)據(jù)拷貝、4 次上下文切換和兩次系統(tǒng)調(diào)用。如下圖所示:
為了減少上下文切換以及數(shù)據(jù)拷貝帶來的性能開銷,Kafka使用mmap來處理其索引文件。Kafka中的索引文件用于在提取日志文件中的消息時進行高效查找。這些索引文件被維護為內(nèi)存映射文件,這允許Kafka快速訪問和搜索內(nèi)存中的索引,從而加速在日志文件中定位消息的過程。mmap 將內(nèi)核中讀緩沖區(qū)(read buffer)的地址與用戶空間的緩沖區(qū)(user buffer)進行映射,從而實現(xiàn)內(nèi)核緩沖區(qū)與應用程序內(nèi)存的共享,省去了將數(shù)據(jù)從內(nèi)核讀緩沖區(qū)(read buffer)拷貝到用戶緩沖區(qū)(user buffer)的過程,整個拷貝過程會發(fā)生 4 次上下文切換,1 次CPU 拷貝和 2次 DMA 拷貝。
網(wǎng)絡數(shù)據(jù)傳輸之 sendfile
傳統(tǒng)方式實現(xiàn):先讀取磁盤、再用 socket 發(fā)送,實際也是進過四次 copy。如下圖所示:
為了減少上下文切換以及數(shù)據(jù)拷貝帶來的性能開銷,Kafka 在 Consumer 從 Broker 讀數(shù)據(jù)過程中使用了 sendfile 技術(shù)。具體在這里采用的方案是通過 NIO 的 transferTo/transferFrom 調(diào)用操作系統(tǒng)的 sendfile 實現(xiàn)零拷貝??偣舶l(fā)生 2 次內(nèi)核數(shù)據(jù)拷貝、2 次上下文切換和一次系統(tǒng)調(diào)用,消除了 CPU 數(shù)據(jù)拷貝,如下:
稀疏索引
為了方便對日志進行檢索和過期清理,kafka 日志文件除了有用于存儲日志的.log 文件,還有一個位移索引文件.index和一個時間戳索引文件.timeindex 文件,并且三文件的名字完全相同,如下:
Kafka 的索引文件是按照稀疏索引的思想進行設計的。稀疏索引的核心是不會為每個記錄都保存索引,而是寫入一定的記錄之后才會增加一個索引值,具體這個間隔有多大則通過 log.index.interval.bytes 參數(shù)進行控制,默認大小為 4 KB,意味著 Kafka 至少寫入 4KB 消息數(shù)據(jù)之后,才會在索引文件中增加一個索引項??梢?,單條消息大小會影響 Kakfa 索引的插入頻率,因此 log.index.interval.bytes 也是 Kafka 調(diào)優(yōu)一個重要參數(shù)值。由于索引文件也是按照消息的順序性進行增加索引項的,因此 Kafka 可以利用二分查找算法來搜索目標索引項,把時間復雜度降到了 O(lgN),大大減少了查找的時間。
位移索引文件.index
位移索引文件的索引項結(jié)構(gòu)如下:
相對位移:保存于索引文件名字上面的起始位移的差值,假設一個索引文件為:00000000000000000100.index,那么起始位移值即 100,當存儲位移為 150 的消息索引時,在索引文件中的相對位移則為 150 - 100 = 50,這么做的好處是使用 4 字節(jié)保存位移即可,可以節(jié)省非常多的磁盤空間。
文件物理位置:消息在 log 文件中保存的位置,也就是說 Kafka 可根據(jù)消息位移,通過位移索引文件快速找到消息在 log 文件中的物理位置,有了該物理位置的值,我們就可以快速地從 log 文件中找到對應的消息了。下面我用圖來表示 Kafka 是如何快速檢索消息:
假設 Kafka 需要找出位移為 3550 的消息,那么 Kafka 首先會使用二分查找算法找到小于 3550 的最大索引項:[3528, 2310272],得到索引項之后,Kafka 會根據(jù)該索引項的文件物理位置在 log 文件中從位置 2310272 開始順序查找,直至找到位移為 3550 的消息記錄為止。
時間戳索引文件.timeindex
Kafka 在 0.10.0.0 以后的版本當中,消息中增加了時間戳信息,為了滿足用戶需要根據(jù)時間戳查詢消息記錄,Kafka 增加了時間戳索引文件,時間戳索引文件的索引項結(jié)構(gòu)如下:
時間戳索引文件的檢索與位移索引文件類似,如下快速檢索消息示意圖:
broker & 數(shù)據(jù)分區(qū)
Kafka 集群包含多個 broker。一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。
多 reactor 多線程網(wǎng)絡模型
多 Reactor 多線程網(wǎng)絡模型 是一種高效的網(wǎng)絡通信模型,可以充分利用多核 CPU 的性能,提高系統(tǒng)的吞吐量和響應速度。Kafka 為了提升系統(tǒng)的吞吐,在 Broker 端處理消息時采用了該模型,示意如下:
SocketServer和KafkaRequestHandlerPool是其中最重要的兩個組件:
- SocketServer:實現(xiàn) Reactor 模式,用于處理多個 Client(包括客戶端和其他 broker 節(jié)點)的并發(fā)請求,并將處理結(jié)果返回給 Client
- KafkaRequestHandlerPool:Reactor 模式中的 Worker 線程池,里面定義了多個工作線程,用于處理實際的 I/O 請求邏輯。
整個服務端處理請求的流程大致分為以下幾個步驟:
- Acceptor 接收客戶端發(fā)來的請求
- 輪詢分發(fā)給 Processor 線程處理
- Processor 將請求封裝成 Request 對象,放到 RequestQueue 隊列
- KafkaRequestHandlerPool 分配工作線程,處理 RequestQueue 中的請求
- KafkaRequestHandler 線程處理完請求后,將響應 Response 返回給 Processor 線程
- Processor 線程將響應返回給客戶端
其他知識探究
負載均衡
生產(chǎn)者負載均衡
Kafka 生產(chǎn)端的負載均衡主要指如何將消息發(fā)送到合適的分區(qū)。Kafka 生產(chǎn)者生產(chǎn)消息時,根據(jù)分區(qū)器將消息投遞到指定的分區(qū)中,所以 Kafka 的負載均衡很大程度上依賴于分區(qū)器。Kafka 默認的分區(qū)器是 Kafka 提供的 DefaultPartitioner。它的分區(qū)策略是根據(jù) Key 值進行分區(qū)分配的:
- 如果 key 不為 null:對 Key 值進行 Hash 計算,從所有分區(qū)中根據(jù) Key 的 Hash 值計算出一個分區(qū)號;擁有相同 Key 值的消息被寫入同一個分區(qū),順序消息實現(xiàn)的關鍵;
- 如果 key 為 null:消息將以輪詢的方式,在所有可用分區(qū)中分別寫入消息。如果不想使用 Kafka 默認的分區(qū)器,用戶可以實現(xiàn) Partitioner 接口,自行實現(xiàn)分區(qū)方法。
消費者負載均衡
在 Kafka 中,每個分區(qū)(Partition)只能由一個消費者組中的一個消費者消費。當消費者組中有多個消費者時,Kafka 會自動進行負載均衡,將分區(qū)均勻地分配給每個消費者。在 Kafka 中,消費者負載均衡算法可以通過設置消費者組的 partition.assignment.strategy 參數(shù)來選擇。目前主流的分區(qū)分配策略以下幾種:
- range: 在保證均衡的前提下,將連續(xù)的分區(qū)分配給消費者,對應的實現(xiàn)是 RangeAssignor;
- round-robin:在保證均衡的前提下,輪詢分配,對應的實現(xiàn)是 RoundRobinAssignor;
- 0.11.0.0 版本引入了一種新的分區(qū)分配策略 StickyAssignor,其優(yōu)勢在于能夠保證分區(qū)均衡的前提下盡量保持原有的分區(qū)分配結(jié)果,從而避免許多冗余的分區(qū)分配操作,減少分區(qū)再分配的執(zhí)行時間。
集群管理
Kafka 借助 ZooKeeper 進行集群管理。Kafka 中很多信息都在 ZK 中維護,如 broker 集群信息、consumer 集群信息、 topic 相關信息、 partition 信息等。Kafka 的很多功能也是基于 ZK 實現(xiàn)的,如 partition 選主、broker 集群管理、consumer 負載均衡等,限于篇幅本文將不展開陳述,這里先附一張網(wǎng)上截圖大家感受下:
參考文獻
- https://www.cnblogs.com/arvinhuang/p/16437948.html
- https://segmentfault.com/a/1190000039133960
- http://matt33.com/2018/11/04/kafka-transaction/
- https://blog.51cto.com/u_14020077/5836698
- https://t1mek1ller.github.io/2020/02/15/kafka-leader-epoch/
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
- https://xie.infoq.cn/article/c06fea629926e2b6a8073e2f0
- https://xie.infoq.cn/article/8191412c8da131e78cbfa6600
- https://mp.weixin.qq.com/s/iEk0loXsKsMO_OCVlUsk2Q
- https://cloud.tencent.com/developer/article/1657649
- https://www.cnblogs.com/vivotech/p/16347074.html