自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka 核心全面總結(jié),高可靠高性能核心原理探究

開發(fā) 架構(gòu)
為了提升系統(tǒng)的吞吐,一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。

你好,我是碼哥,可以叫我靚仔

作者:mo

引言

在探究 Kafka 核心知識之前,我們先思考一個問題:什么場景會促使我們使用 Kafka?  說到這里,我們頭腦中或多或少會蹦出異步解耦和削峰填谷等字樣,是的,這就是 Kafka 最重要的落地場景。

異步解耦:同步調(diào)用轉(zhuǎn)換成異步消息通知,實現(xiàn)生產(chǎn)者和消費者的解耦。想象一個場景,在商品交易時,在訂單創(chuàng)建完成之后,需要觸發(fā)一系列其他的操作,比如進行用戶訂單數(shù)據(jù)的統(tǒng)計、給用戶發(fā)送短信、給用戶發(fā)送郵件等等。如果所有操作都采用同步方式實現(xiàn),將嚴重影響系統(tǒng)性能。針對此場景,我們可以利用消息中間件解耦訂單創(chuàng)建操作和其他后續(xù)行為。

削峰填谷:利用 broker 緩沖上游生產(chǎn)者瞬時突發(fā)的流量,使消費者消費流量整體平滑。對于發(fā)送能力很強的上游系統(tǒng),如果沒有消息中間件的保護,下游系統(tǒng)可能會直接被壓垮導致全鏈路服務雪崩。想象秒殺業(yè)務場景,上游業(yè)務發(fā)起下單請求,下游業(yè)務執(zhí)行秒殺業(yè)務(庫存檢查,庫存凍結(jié),余額凍結(jié),生成訂單等等),下游業(yè)務處理的邏輯是相當復雜的,并發(fā)能力有限,如果上游服務不做限流策略,瞬時可能把下游服務壓垮。針對此場景,我們可以利用 MQ 來做削峰填谷,讓高峰流量填充低谷空閑資源,達到系統(tǒng)資源的合理利用。

通過上述例子可以發(fā)現(xiàn)交易、支付等場景常需要異步解耦和削峰填谷功能解決問題,而交易、支付等場景對性能、可靠性要求特別高。那么,我們本文的主角 Kafka 能否滿足相應要求呢?下面我們來探討下。

Kafka 宏觀認知

在探究 Kafka 的高性能、高可靠性之前,我們從宏觀上來看下 Kafka 的系統(tǒng)架構(gòu):

圖片

如上圖所示,Kafka 由 Producer、Broker、Consumer 以及負責集群管理的 ZooKeeper 組成,各部分功能如下:

  • Producer:生產(chǎn)者,負責消息的創(chuàng)建并通過一定的路由策略發(fā)送消息到合適的 Broker;
  • Broker:服務實例,負責消息的持久化、中轉(zhuǎn)等功能;
  • Consumer :消費者,負責從 Broker 中拉?。≒ull)訂閱的消息并進行消費,通常多個消費者構(gòu)成一個分組,消息只能被同組中的一個消費者消費;
  • ZooKeeper:負責 broker、consumer 集群元數(shù)據(jù)的管理等;(注意:Producer 端直接連接 broker,不在 zk 上存任何數(shù)據(jù),只是通過 ZK 監(jiān)聽 broker 和 topic 等信息)

上圖消息流轉(zhuǎn)過程中,還有幾個特別重要的概念—主題(Topic)、分區(qū)(Partition)、分段(segment)、位移(offset)。

  • topic:消息主題。Kafka 按 topic 對消息進行分類,我們在收發(fā)消息時只需指定 topic。
  • partition:分區(qū)。為了提升系統(tǒng)的吞吐,一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。另外,為了提升系統(tǒng)的可靠性,partition 通常會分組,且每組有一個主 partition、多個副本 partition,且分布在不同的 broker 上,從而起到容災的作用。
  • segment:分段。宏觀上看,一個 partition 對應一個日志(Log)。由于生產(chǎn)者生產(chǎn)的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數(shù)據(jù)檢索效率低下,Kafka 采取了分段和索引機制,將每個 partition 分為多個 segment,同時也便于消息的維護和清理。每個 segment 包含一個.log 日志文件、兩個索引(.index、timeindex)文件以及其他可能的文件。每個 Segment 的數(shù)據(jù)文件以該段中最小的 offset 為文件名,當查找 offset 的 Message 的時候,通過二分查找快找到 Message 所處于的 Segment 中。
  • offset:消息在日志中的位置,消息在被追加到分區(qū)日志文件的時候都會分配一個特定的偏移量。offset 是消息在分區(qū)中的唯一標識,是一個單調(diào)遞增且不變的值。Kafka 通過它來保證消息在分區(qū)內(nèi)的順序性,不過 offset 并不跨越分區(qū),也就是說,Kafka 保證的是分區(qū)有序而不是主題有序。

Kafka 高可靠性、高性能探究

在對 Kafka 的整體系統(tǒng)框架及相關概念簡單了解后,下面我們來進一步深入探討下高可靠性、高性能實現(xiàn)原理。

Kafka 高可靠性探究

Kafka 高可靠性的核心是保證消息在傳遞過程中不丟失,涉及如下核心環(huán)節(jié):

  • 消息從生產(chǎn)者可靠地發(fā)送至 Broker;-- 網(wǎng)絡、本地丟數(shù)據(jù);
  • 發(fā)送到 Broker 的消息可靠持久化;-- Pagecache 緩存落盤、單點崩潰、主從同步跨網(wǎng)絡;
  • 消費者從 Broker 消費到消息且最好只消費一次 -- 跨網(wǎng)絡消息傳輸 。
消息從生產(chǎn)者可靠地發(fā)送至 Broker

為了保障消息從生產(chǎn)者可靠地發(fā)送至 Broker,我們需要確保兩點;

  1. Producer 發(fā)送消息后,能夠收到來自 Broker 的消息保存成功 ack;
  2. Producer 發(fā)送消息后,能夠捕獲超時、失敗 ack 等異常 ack 并做處理。
ack 策略

針對問題 1,Kafka 為我們提供了三種 ack 策略,

  • Request.required.acks = 0:請求發(fā)送即認為成功,不關心有沒有寫成功,常用于日志進行分析場景;
  • Request.required.acks = 1:當 leader partition 寫入成功以后,才算寫入成功,有丟數(shù)據(jù)的可能;
  • Request.required.acks= -1:ISR 列表里面的所有副本都寫完以后,這條消息才算寫入成功,強可靠性保證;

為了實現(xiàn)強可靠的 kafka 系統(tǒng),我們需要設置 Request.required.acks= -1,同時還會設置集群中處于正常同步狀態(tài)的副本 follower 數(shù)量 min.insync.replicas>2,另外,設置 unclean.leader.election.enable=false 使得集群中 ISR 的 follower 才可變成新的 leader,避免特殊情況下消息截斷的出現(xiàn)。

消息發(fā)送策略

針對問題 2,kafka 提供兩類消息發(fā)送方式:同步(sync)發(fā)送和異步(async)發(fā)送,相關參數(shù)如下:

圖片

以 sarama 實現(xiàn)為例,在消息發(fā)送的過程中,無論是同步發(fā)送還是異步發(fā)送都會涉及到兩個協(xié)程--負責消息發(fā)送的主協(xié)程和負責消息分發(fā)的 dispatcher 協(xié)程。

異步發(fā)送

對于異步發(fā)送(ack != 0 場景,等于 0 時不關心寫 kafka 結(jié)果,后文詳細講解)而言,其流程大概如下:

  1. 在主協(xié)程中調(diào)用異步發(fā)送 kafka 消息的時候,其本質(zhì)是將消息體放進了一個 input 的 channel,只要入 channel 成功,則這個函數(shù)直接返回,不會產(chǎn)生任何阻塞。相反,如果入 channel 失敗,則會返回錯誤信息。因此調(diào)用 async 寫入的時候返回的錯誤信息是入 channel 的錯誤信息,至于具體最終消息有沒有發(fā)送到 kafka 的 broker,我們無法從返回值得知。
  2. 當消息進入 input 的 channel 后,會有另一個dispatcher 的協(xié)程負責遍歷 input,來真正發(fā)送消息到特定 Broker 上的主 Partition 上。發(fā)送結(jié)果通過一個異步協(xié)程進行監(jiān)聽,循環(huán)處理 err channel 和 success channel,出現(xiàn)了 error 就記一個日志。因此異步寫入場景時,寫 kafka 的錯誤信息,我們暫時僅能夠從這個錯誤日志來得知具體發(fā)生了什么錯,并且也不支持我們自建函數(shù)進行兜底處理,這一點在 trpc-go 的官方也得到了承認。

同步發(fā)送

同步發(fā)送(ack != 0 場景)是在異步發(fā)送的基礎上加以條件限制實現(xiàn)的。同步消息發(fā)送在 newSyncProducerFromAsyncProducer 中開啟兩個異步協(xié)程處理消息成功與失敗的“回調(diào)”,并使用 waitGroup 進行等待,從而將異步操作轉(zhuǎn)變?yōu)橥讲僮?,其流程大概如下?/p>

通過上述分析可以發(fā)現(xiàn),kafka 消息發(fā)送本質(zhì)上都是異步的,不過同步發(fā)送通過 waitGroup 將異步操作轉(zhuǎn)變?yōu)橥讲僮?。同步發(fā)送在一定程度上確保了我們在跨網(wǎng)絡向 Broker 傳輸消息時,消息一定可以可靠地傳輸?shù)?Broker。因為在同步發(fā)送場景我們可以明確感知消息是否發(fā)送至 Broker,若因網(wǎng)絡抖動、機器宕機等故障導致消息發(fā)送失敗或結(jié)果不明,可通過重試等手段確保消息至少一次(at least once) 發(fā)送到 Broker。另外,Kafka(0.11.0.0 版本后)還為 Producer 提供兩種機制來實現(xiàn)精確一次(exactly once) 消息發(fā)送:冪等性(Idempotence)和事務(Transaction)。

小結(jié)

通過 ack 策略配置、同步發(fā)送、事務消息組合能力,我們可以實現(xiàn)exactly once 語意跨網(wǎng)絡向 Broker 傳輸消息。但是,Producer 收到 Broker 的成功 ack,消息一定不會丟失嗎?為了搞清這個問題,我們首先要搞明白 Broker 在接收到消息后做了哪些處理。

發(fā)送到 Broker 的消息可靠持久化

為了確保 Producer 收到 Broker 的成功 ack 后,消息一定不在 Broker 環(huán)節(jié)丟失,我們核心要關注以下幾點:

  • Broker 返回 Producer 成功 ack 時,消息是否已經(jīng)落盤;
  • Broker 宕機是否會導致數(shù)據(jù)丟失,容災機制是什么;
  • Replica 副本機制帶來的多副本間數(shù)據(jù)同步一致性問題如何解決;

Broker 異步刷盤機制

kafka 為了獲得更高吞吐,Broker 接收到消息后只是將數(shù)據(jù)寫入 PageCache 后便認為消息已寫入成功,而 PageCache 中的數(shù)據(jù)通過 linux 的 flusher 程序進行異步刷盤(刷盤觸發(fā)條:主動調(diào)用 sync 或 fsync 函數(shù)、可用內(nèi)存低于閥值、dirty data 時間達到閥值),將數(shù)據(jù)順序?qū)懙酱疟P。消息處理示意圖如下:

由于消息是寫入到 pageCache,單機場景,如果還沒刷盤 Broker 就宕機了,那么 Producer 產(chǎn)生的這部分數(shù)據(jù)就可能丟失。為了解決單機故障可能帶來的數(shù)據(jù)丟失問題,Kafka 為分區(qū)引入了副本機制。

Replica 副本機制

Kafka 每組分區(qū)通常有多個副本,同組分區(qū)的不同副本分布在不同的 Broker 上,保存相同的消息(可能有滯后)。副本之間是“一主多從”的關系,其中 leader 副本負責處理讀寫請求,follower 副本負責從 leader 拉取消息進行同步。分區(qū)的所有副本統(tǒng)稱為 AR(Assigned Replicas),其中所有與 leader 副本保持一定同步的副本(包括 leader 副本在內(nèi))組成 ISR(In-Sync Replicas),與 leader 同步滯后過多的副本組成 OSR(Out-of-Sync Replicas),由此可見,AR=ISR+OSR。

follower 副本是否與 leader 同步的判斷標準取決于 Broker 端參數(shù) replica.lag.time.max.ms(默認為 10 秒),follower 默認每隔 500ms 向 leader fetch 一次數(shù)據(jù),只要一個 Follower 副本落后 Leader 副本的時間不連續(xù)超過 10 秒,那么 Kafka 就認為該 Follower 副本與 leader 是同步的。在正常情況下,所有的 follower 副本都應該與 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合為空。

當 leader 副本所在 Broker 宕機時,Kafka 會借助 ZK 從 follower 副本中選舉新的 leader 繼續(xù)對外提供服務,實現(xiàn)故障的自動轉(zhuǎn)移,保證服務可用。為了使選舉的新 leader 和舊 leader 數(shù)據(jù)盡可能一致,當 leader 副本發(fā)生故障時,默認情況下只有在 ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR 集合中的副本則沒有任何機會(可通過設置 unclean.leader.election.enable 改變)。

當 Kafka 通過多副本機制解決單機故障問題時,同時也帶來了多副本間數(shù)據(jù)同步一致性問題。Kafka 通過高水位更新機制、副本同步機制、 Leader Epoch 等多種措施解決了多副本間數(shù)據(jù)同步一致性問題,下面我們來依次看下這幾大措施。

HW 和 LEO

首先,我們來看下兩個和 Kafka 中日志相關的重要概念 HW 和 LEO:

  • HW: High Watermark,高水位,表示已經(jīng)提交(commit)的最大日志偏移量,Kafka 中某條日志“已提交”的意思是 ISR 中所有節(jié)點都包含了此條日志,并且消費者只能消費 HW 之前的數(shù)據(jù);
  • LEO: Log End Offset,表示當前 log 文件中下一條待寫入消息的 offset;

如上圖所示,它代表一個日志文件,這個日志文件中有 8 條消息,0 至 5 之間的消息為已提交消息,5 至 7 的消息為未提交消息。日志文件的 HW 為 6,表示消費者只能拉取到 5 之前的消息,而 offset 為 5 的消息對消費者而言是不可見的。日志文件的 LEO 為 8,下一條消息將在此處寫入。

注意:所有副本都有對應的 HW 和 LEO,只不過 Leader 副本比較特殊,Kafka 使用 Leader 副本的高水位來定義所在分區(qū)的高水位。換句話說,分區(qū)的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特點:

  • Leader HW:min(所有副本 LEO),為此 Leader 副本不僅要保存自己的 HW 和 LEO,還要保存 follower 副本的 HW 和 LEO,而 follower 副本只需保存自己的 HW 和 LEO;
  • Follower HW:min(follower 自身 LEO,leader HW)。

注意:為方便描述,下面Leader HW簡記為HWL,F(xiàn)ollower HW簡記為F,Leader LEO簡記為LEOL ,F(xiàn)ollower LEO簡記為LEOF。

下面我們演示一次完整的 HW / LEO 更新流程:

  1. 初始狀態(tài)

HWL=0,LEOL=0,HWF=0,LEOF=0。

  1. Follower 第一次 fetch
  • Leader收到Producer發(fā)來的一條消息完成存儲, 更新LEOL=1;
  • Follower從Leader fetch數(shù)據(jù),  Leader收到請求,記錄follower的LEOF =0,并且嘗試更新HWL =min(全部副本LEO)=0;
  • eade返回HWL=0和LEOL=1給Follower,F(xiàn)ollower存儲消息并更新LEOF =1, HW=min(LEOF,HWL)=0。
  1. Follower 第二次 fetch
  • Follower再次從Leader fetch數(shù)據(jù),  Leader收到請求,記錄follower的LEOF =1,并且嘗試更新HWL =min(全部副本LEO)=1;
  • leade返回HWL=1和LEOL=1給Follower,Leader收到請求,更新自己的 HW=min(LEOF,HWL)=1。

上述更新流程中 Follower 和 Leader 的 HW 更新有時間 GAP。如果 Leader 節(jié)點在此期間發(fā)生故障,則 Follower 的 HW 和 Leader 的 HW 可能會處于不一致狀態(tài),如果 Followe 被選為新的 Leader 并且以自己的 HW 為準對外提供服務,則可能帶來數(shù)據(jù)丟失或數(shù)據(jù)錯亂問題。

KIP-101 問題:數(shù)據(jù)丟失&數(shù)據(jù)錯亂 ^參 5^

數(shù)據(jù)丟失

第 1 步:

  1. 副本 B 作為 leader 收到 producer 的 m2 消息并寫入本地文件,等待副本 A 拉取。
  2. 副本 A 發(fā)起消息拉取請求,請求中攜帶自己的最新的日志 offset(LEO=1),B 收到后更新自己的 HW 為 1,并將 HW=1 的信息以及消息 m2 返回給 A。
  3. A 收到拉取結(jié)果后更新本地的 HW 為 1,并將 m2 寫入本地文件。發(fā)起新一輪拉取請求(LEO=2),B 收到 A 拉取請求后更新自己的 HW 為 2,沒有新數(shù)據(jù)只將 HW=2 的信息返回給 A,并且回復給 producer 寫入成功。此處的狀態(tài)就是圖中第一步的狀態(tài)。

第 2 步:

此時,如果沒有異常,A 會收到 B 的回復,得知目前的 HW 為 2,然后更新自身的 HW 為 2。但在此時 A 重啟了,沒有來得及收到 B 的回復,此時 B 仍然是 leader。A 重啟之后會以 HW 為標準截斷自己的日志,因為 A 作為 follower 不知道多出的日志是否是被提交過的,防止數(shù)據(jù)不一致從而截斷多余的數(shù)據(jù)并嘗試從 leader 那里重新同步。

第 3 步:

B 崩潰了,min.isr 設置的是 1,所以 zookeeper 會從 ISR 中再選擇一個作為 leader,也就是 A,但是 A 的數(shù)據(jù)不是完整的,從而出現(xiàn)了數(shù)據(jù)丟失現(xiàn)象。

問題在哪里?在于 A 重啟之后以 HW 為標準截斷了多余的日志。不截斷行不行?不行,因為這個日志可能沒被提交過(也就是沒有被 ISR 中的所有節(jié)點寫入過),如果保留會導致日志錯亂。

數(shù)據(jù)錯亂

在分析日志錯亂的問題之前,我們需要了解到 kafka 的副本可靠性保證有一個前提:在 ISR 中至少有一個節(jié)點。如果節(jié)點均宕機的情況下,是不保證可靠性的,在這種情況會出現(xiàn)數(shù)據(jù)丟失,數(shù)據(jù)丟失是可接受的。這里我們分析的問題比數(shù)據(jù)丟失更加槽糕,會引發(fā)日志錯亂甚至導致整個系統(tǒng)異常,而這是不可接受的。

第 1 步:

  1. A 和 B 均為 ISR 中的節(jié)點。副本 A 作為 leader,收到 producer 的消息 m2 的請求后寫入 PageCache 并在某個時刻刷新到本地磁盤。
  2. 副本 B 拉取到 m2 后寫入 PageCage 后(尚未刷盤)再次去 A 中拉取新消息并告知 A 自己的 LEO=2,A 收到更新自己的 HW 為 1 并回復給 producer 成功。
  3. 此時 A 和 B 同時宕機,B 的 m2 由于尚未刷盤,所以 m2 消息丟失。此時的狀態(tài)就是第 1 步的狀態(tài)。

第 2 步:

由于 A 和 B 均宕機,而 min.isr=1 并且 unclean.leader.election.enable=true(關閉 unclean 選擇策略),所以 Kafka 會等到第一個 ISR 中的節(jié)點恢復并選為 leader,這里不幸的是 B 被選為 leader,而且還接收到 producer 發(fā)來的新消息 m3。注意,這里丟失 m2 消息是可接受的,畢竟所有節(jié)點都宕機了。

第 3 步:

A 恢復重啟后發(fā)現(xiàn)自己是 follower,而且 HW 為 2,并沒有多余的數(shù)據(jù)需要截斷,所以開始和 B 進行新一輪的同步。但此時 A 和 B 均沒有意識到,offset 為 1 的消息不一致了。

問題在哪里?在于日志的寫入是異步的,上面也提到 Kafka 的副本策略的一個設計是消息的持久化是異步的,這就會導致在場景二的情況下被選出的 leader 不一定包含所有數(shù)據(jù),從而引發(fā)日志錯亂的問題。

Leader Epoch

為了解決上述缺陷,Kafka 引入了 Leader Epoch 的概念。leader epoch 和 raft 中的任期號的概念很類似,每次重新選擇 leader 的時候,用一個嚴格單調(diào)遞增的 id 來標志,可以讓所有 follower 意識到 leader 的變化。而 follower 也不再以 HW 為準,每次奔潰重啟后都需要去 leader 那邊確認下當前 leader 的日志是從哪個 offset 開始的。下面看下 Leader Epoch 是如何解決上面兩個問題的。

數(shù)據(jù)丟失解決

這里的關鍵點在于副本 A 重啟后作為 follower,不是忙著以 HW 為準截斷自己的日志,而是先發(fā)起 LeaderEpochRequest 詢問副本 B 第 0 代的最新的偏移量是多少,副本 B 會返回自己的 LEO 為 2 給副本 A,A 此時就知道消息 m2 不能被截斷,所以 m2 得到了保留。當 A 選為 leader 的時候就保留了所有已提交的日志,日志丟失的問題得到解決。

如果發(fā)起 LeaderEpochRequest 的時候就已經(jīng)掛了怎么辦?這種場景下,不會出現(xiàn)日志丟失,因為副本 A 被選為 leader 后不會截斷自己的日志,日志截斷只會發(fā)生在 follower 身上。

數(shù)據(jù)錯亂解決

這里的關鍵點還是在第 3 步,副本 A 重啟作為 follower 的第一步還是需要發(fā)起 LeaderEpochRequest 詢問 leader 當前第 0 代最新的偏移量是多少,由于副本 B 已經(jīng)經(jīng)過換代,所以會返回給 A 第 1 代的起始偏移(也就是 1),A 發(fā)現(xiàn)沖突后會截斷自己偏移量為 1 的日志,并重新開始和 leader 同步。副本 A 和副本 B 的日志達到了一致,解決了日志錯亂。

小結(jié)

Broker 接收到消息后只是將數(shù)據(jù)寫入 PageCache 后便認為消息已寫入成功,但是,通過副本機制并結(jié)合 ACK 策略可以大概率規(guī)避單機宕機帶來的數(shù)據(jù)丟失問題,并通過 HW、副本同步機制、 Leader Epoch 等多種措施解決了多副本間數(shù)據(jù)同步一致性問題,最終實現(xiàn)了 Broker 數(shù)據(jù)的可靠持久化。

消費者從 Broker 消費到消息且最好只消費一次

Consumer 在消費消息的過程中需要向 Kafka 匯報自己的位移數(shù)據(jù),只有當 Consumer 向 Kafka 匯報了消息位移,該條消息才會被 Broker 認為已經(jīng)被消費。因此,Consumer 端消息的可靠性主要和 offset 提交方式有關,Kafka 消費端提供了兩種消息提交方式:

正常情況下我們很難實現(xiàn) exactly once 語意的消息,通常是通過手動提交+冪等實現(xiàn)消息的可靠消費。

Kafka 高性能探究

Kafka 高性能的核心是保障系統(tǒng)低延遲、高吞吐地處理消息,為此,Kafaka 采用了許多精妙的設計:

  • 異步發(fā)送
  • 批量發(fā)送
  • 壓縮技術(shù)
  • Pagecache 機制&順序追加落盤
  • 零拷貝
  • 稀疏索引
  • broker & 數(shù)據(jù)分區(qū)
  • 多 reactor 多線程網(wǎng)絡模型

異步發(fā)送

如上文所述,Kafka 提供了異步和同步兩種消息發(fā)送方式。在異步發(fā)送中,整個流程都是異步的。調(diào)用異步發(fā)送方法后,消息會被寫入 channel,然后立即返回成功。Dispatcher 協(xié)程會從 channel 輪詢消息,將其發(fā)送到 Broker,同時會有另一個異步協(xié)程負責處理 Broker 返回的結(jié)果。同步發(fā)送本質(zhì)上也是異步的,但是在處理結(jié)果時,同步發(fā)送通過 waitGroup 將異步操作轉(zhuǎn)換為同步。使用異步發(fā)送可以最大化提高消息發(fā)送的吞吐能力。

批量發(fā)送

Kafka 支持批量發(fā)送消息,將多個消息打包成一個批次進行發(fā)送,從而減少網(wǎng)絡傳輸?shù)拈_銷,提高網(wǎng)絡傳輸?shù)男屎屯掏铝?。Kafka 的批量發(fā)送消息是通過以下兩個參數(shù)來控制的:

  1. batch.size:控制批量發(fā)送消息的大小,默認值為 16KB,可適當增加 batch.size 參數(shù)值提升吞吐。但是,需要注意的是,如果批量發(fā)送的大小設置得過大,可能會導致消息發(fā)送的延遲增加,因此需要根據(jù)實際情況進行調(diào)整。
  2. linger.ms:控制消息在批量發(fā)送前的等待時間,默認值為 0。當 linger.ms 大于 0 時,如果有消息發(fā)送,Kafka 會等待指定的時間,如果等待時間到達或者批量大小達到 batch.size,就會將消息打包成一個批次進行發(fā)送??蛇m當增加 linger.ms 參數(shù)值提升吞吐,比如 10 ~ 100。

在 Kafka 的生產(chǎn)者客戶端中,當發(fā)送消息時,如果啟用了批量發(fā)送,Kafka 會將消息緩存到緩沖區(qū)中。當緩沖區(qū)中的消息大小達到 batch.size 或者等待時間到達 linger.ms 時,Kafka 會將緩沖區(qū)中的消息打包成一個批次進行發(fā)送。如果在等待時間內(nèi)沒有達到 batch.size,Kafka 也會將緩沖區(qū)中的消息發(fā)送出去,從而避免消息積壓。

壓縮技術(shù)

Kafka 支持壓縮技術(shù),通過將消息進行壓縮后再進行傳輸,從而減少網(wǎng)絡傳輸?shù)拈_銷(壓縮和解壓縮的過程會消耗一定的 CPU 資源,因此需要根據(jù)實際情況進行調(diào)整。),提高網(wǎng)絡傳輸?shù)男屎屯掏铝?。Kafka 支持多種壓縮算法,在 Kafka2.1.0 版本之前,僅支持 GZIP,Snappy 和 LZ4,2.1.0 后還支持 Zstandard 算法(Facebook 開源,能夠提供超高壓縮比)。這些壓縮算法性能對比(兩指標都是越高越好)如下:

  • 吞吐量:LZ4>Snappy>zstd 和 GZIP,壓縮比:zstd>LZ4>GZIP>Snappy。

在 Kafka 中,壓縮技術(shù)是通過以下兩個參數(shù)來控制的:

  1. compression.type:控制壓縮算法的類型,默認值為 none,表示不進行壓縮。
  2. compression.level:控制壓縮的級別,取值范圍為 0-9,默認值為-1。當值為-1 時,表示使用默認的壓縮級別。

在 Kafka 的生產(chǎn)者客戶端中,當發(fā)送消息時,如果啟用了壓縮技術(shù),Kafka 會將消息進行壓縮后再進行傳輸。在消費者客戶端中,如果消息進行了壓縮,Kafka 會在消費消息時將其解壓縮。注意:Broker 如果設置了和生產(chǎn)者不通的壓縮算法,接收消息后會解壓后重新壓縮保存。Broker 如果存在消息版本兼容也會觸發(fā)解壓后再壓縮。

Pagecache 機制&順序追加落盤

kafka 為了提升系統(tǒng)吞吐、降低時延,Broker 接收到消息后只是將數(shù)據(jù)寫入PageCache后便認為消息已寫入成功,而 PageCache 中的數(shù)據(jù)通過 linux 的 flusher 程序進行異步刷盤(避免了同步刷盤的巨大系統(tǒng)開銷),將數(shù)據(jù)順序追加寫到磁盤日志文件中。由于 pagecache 是在內(nèi)存中進行緩存,因此讀寫速度非常快,可以大大提高讀寫效率。順序追加寫充分利用順序 I/O 寫操作,避免了緩慢的隨機 I/O 操作,可有效提升 Kafka 吞吐。

如上圖所示,消息被順序追加到每個分區(qū)日志文件的尾部。

零拷貝

Kafka 中存在大量的網(wǎng)絡數(shù)據(jù)持久化到磁盤(Producer 到 Broker)和磁盤文件通過網(wǎng)絡發(fā)送(Broker 到 Consumer)的過程,這一過程的性能直接影響 Kafka 的整體吞吐量。傳統(tǒng)的 IO 操作存在多次數(shù)據(jù)拷貝和上下文切換,性能比較低。Kafka 利用零拷貝技術(shù)提升上述過程性能,其中網(wǎng)絡數(shù)據(jù)持久化磁盤主要用 mmap 技術(shù),網(wǎng)絡數(shù)據(jù)傳輸環(huán)節(jié)主要使用 sendfile 技術(shù)。

索引加速之 mmap

傳統(tǒng)模式下,數(shù)據(jù)從網(wǎng)絡傳輸?shù)轿募枰?4 次數(shù)據(jù)拷貝、4 次上下文切換和兩次系統(tǒng)調(diào)用。如下圖所示:

為了減少上下文切換以及數(shù)據(jù)拷貝帶來的性能開銷,Kafka使用mmap來處理其索引文件。Kafka中的索引文件用于在提取日志文件中的消息時進行高效查找。這些索引文件被維護為內(nèi)存映射文件,這允許Kafka快速訪問和搜索內(nèi)存中的索引,從而加速在日志文件中定位消息的過程。mmap 將內(nèi)核中讀緩沖區(qū)(read buffer)的地址與用戶空間的緩沖區(qū)(user buffer)進行映射,從而實現(xiàn)內(nèi)核緩沖區(qū)與應用程序內(nèi)存的共享,省去了將數(shù)據(jù)從內(nèi)核讀緩沖區(qū)(read buffer)拷貝到用戶緩沖區(qū)(user buffer)的過程,整個拷貝過程會發(fā)生 4 次上下文切換,1 次CPU 拷貝和 2次 DMA 拷貝。

網(wǎng)絡數(shù)據(jù)傳輸之 sendfile

傳統(tǒng)方式實現(xiàn):先讀取磁盤、再用 socket 發(fā)送,實際也是進過四次 copy。如下圖所示:

為了減少上下文切換以及數(shù)據(jù)拷貝帶來的性能開銷,Kafka 在 Consumer 從 Broker 讀數(shù)據(jù)過程中使用了 sendfile 技術(shù)。具體在這里采用的方案是通過 NIO 的 transferTo/transferFrom 調(diào)用操作系統(tǒng)的 sendfile 實現(xiàn)零拷貝??偣舶l(fā)生 2 次內(nèi)核數(shù)據(jù)拷貝、2 次上下文切換和一次系統(tǒng)調(diào)用,消除了 CPU 數(shù)據(jù)拷貝,如下:

稀疏索引

為了方便對日志進行檢索和過期清理,kafka 日志文件除了有用于存儲日志的.log 文件,還有一個位移索引文件.index和一個時間戳索引文件.timeindex 文件,并且三文件的名字完全相同,如下:

Kafka 的索引文件是按照稀疏索引的思想進行設計的。稀疏索引的核心是不會為每個記錄都保存索引,而是寫入一定的記錄之后才會增加一個索引值,具體這個間隔有多大則通過 log.index.interval.bytes 參數(shù)進行控制,默認大小為 4 KB,意味著 Kafka 至少寫入 4KB 消息數(shù)據(jù)之后,才會在索引文件中增加一個索引項??梢?,單條消息大小會影響 Kakfa 索引的插入頻率,因此 log.index.interval.bytes 也是 Kafka 調(diào)優(yōu)一個重要參數(shù)值。由于索引文件也是按照消息的順序性進行增加索引項的,因此 Kafka 可以利用二分查找算法來搜索目標索引項,把時間復雜度降到了 O(lgN),大大減少了查找的時間。

位移索引文件.index

位移索引文件的索引項結(jié)構(gòu)如下:

相對位移:保存于索引文件名字上面的起始位移的差值,假設一個索引文件為:00000000000000000100.index,那么起始位移值即 100,當存儲位移為 150 的消息索引時,在索引文件中的相對位移則為 150 - 100 = 50,這么做的好處是使用 4 字節(jié)保存位移即可,可以節(jié)省非常多的磁盤空間。

文件物理位置:消息在 log 文件中保存的位置,也就是說 Kafka 可根據(jù)消息位移,通過位移索引文件快速找到消息在 log 文件中的物理位置,有了該物理位置的值,我們就可以快速地從 log 文件中找到對應的消息了。下面我用圖來表示 Kafka 是如何快速檢索消息:

假設 Kafka 需要找出位移為 3550 的消息,那么 Kafka 首先會使用二分查找算法找到小于 3550 的最大索引項:[3528, 2310272],得到索引項之后,Kafka 會根據(jù)該索引項的文件物理位置在 log 文件中從位置 2310272 開始順序查找,直至找到位移為 3550 的消息記錄為止。

時間戳索引文件.timeindex

Kafka 在 0.10.0.0 以后的版本當中,消息中增加了時間戳信息,為了滿足用戶需要根據(jù)時間戳查詢消息記錄,Kafka 增加了時間戳索引文件,時間戳索引文件的索引項結(jié)構(gòu)如下:

時間戳索引文件的檢索與位移索引文件類似,如下快速檢索消息示意圖:

broker & 數(shù)據(jù)分區(qū)

Kafka 集群包含多個 broker。一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。

多 reactor 多線程網(wǎng)絡模型

多 Reactor 多線程網(wǎng)絡模型 是一種高效的網(wǎng)絡通信模型,可以充分利用多核 CPU 的性能,提高系統(tǒng)的吞吐量和響應速度。Kafka 為了提升系統(tǒng)的吞吐,在 Broker 端處理消息時采用了該模型,示意如下:

圖片

SocketServer和KafkaRequestHandlerPool是其中最重要的兩個組件:

  • SocketServer:實現(xiàn) Reactor 模式,用于處理多個 Client(包括客戶端和其他 broker 節(jié)點)的并發(fā)請求,并將處理結(jié)果返回給 Client
  • KafkaRequestHandlerPool:Reactor 模式中的 Worker 線程池,里面定義了多個工作線程,用于處理實際的 I/O 請求邏輯。

整個服務端處理請求的流程大致分為以下幾個步驟:

  1. Acceptor 接收客戶端發(fā)來的請求
  2. 輪詢分發(fā)給 Processor 線程處理
  3. Processor 將請求封裝成 Request 對象,放到 RequestQueue 隊列
  4. KafkaRequestHandlerPool 分配工作線程,處理 RequestQueue 中的請求
  5. KafkaRequestHandler 線程處理完請求后,將響應 Response 返回給 Processor 線程
  6. Processor 線程將響應返回給客戶端

其他知識探究

負載均衡

生產(chǎn)者負載均衡

Kafka 生產(chǎn)端的負載均衡主要指如何將消息發(fā)送到合適的分區(qū)。Kafka 生產(chǎn)者生產(chǎn)消息時,根據(jù)分區(qū)器將消息投遞到指定的分區(qū)中,所以 Kafka 的負載均衡很大程度上依賴于分區(qū)器。Kafka 默認的分區(qū)器是 Kafka 提供的 DefaultPartitioner。它的分區(qū)策略是根據(jù) Key 值進行分區(qū)分配的:

  • 如果 key 不為 null:對 Key 值進行 Hash 計算,從所有分區(qū)中根據(jù) Key 的 Hash 值計算出一個分區(qū)號;擁有相同 Key 值的消息被寫入同一個分區(qū),順序消息實現(xiàn)的關鍵;
  • 如果 key 為 null:消息將以輪詢的方式,在所有可用分區(qū)中分別寫入消息。如果不想使用 Kafka 默認的分區(qū)器,用戶可以實現(xiàn) Partitioner 接口,自行實現(xiàn)分區(qū)方法。

消費者負載均衡

在 Kafka 中,每個分區(qū)(Partition)只能由一個消費者組中的一個消費者消費。當消費者組中有多個消費者時,Kafka 會自動進行負載均衡,將分區(qū)均勻地分配給每個消費者。在 Kafka 中,消費者負載均衡算法可以通過設置消費者組的 partition.assignment.strategy 參數(shù)來選擇。目前主流的分區(qū)分配策略以下幾種:

  • range: 在保證均衡的前提下,將連續(xù)的分區(qū)分配給消費者,對應的實現(xiàn)是 RangeAssignor;
  • round-robin:在保證均衡的前提下,輪詢分配,對應的實現(xiàn)是 RoundRobinAssignor;
  • 0.11.0.0 版本引入了一種新的分區(qū)分配策略 StickyAssignor,其優(yōu)勢在于能夠保證分區(qū)均衡的前提下盡量保持原有的分區(qū)分配結(jié)果,從而避免許多冗余的分區(qū)分配操作,減少分區(qū)再分配的執(zhí)行時間。

集群管理

Kafka 借助 ZooKeeper 進行集群管理。Kafka 中很多信息都在 ZK 中維護,如 broker 集群信息、consumer 集群信息、 topic 相關信息、 partition 信息等。Kafka 的很多功能也是基于 ZK 實現(xiàn)的,如 partition 選主、broker 集群管理、consumer 負載均衡等,限于篇幅本文將不展開陳述,這里先附一張網(wǎng)上截圖大家感受下:

圖片

參考文獻

  1. https://www.cnblogs.com/arvinhuang/p/16437948.html
  2. https://segmentfault.com/a/1190000039133960
  3. http://matt33.com/2018/11/04/kafka-transaction/
  4. https://blog.51cto.com/u_14020077/5836698
  5. https://t1mek1ller.github.io/2020/02/15/kafka-leader-epoch/
  6. https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
  7. https://xie.infoq.cn/article/c06fea629926e2b6a8073e2f0
  8. https://xie.infoq.cn/article/8191412c8da131e78cbfa6600
  9. https://mp.weixin.qq.com/s/iEk0loXsKsMO_OCVlUsk2Q
  10. https://cloud.tencent.com/developer/article/1657649
  11. https://www.cnblogs.com/vivotech/p/16347074.html
責任編輯:武曉燕 來源: 碼哥字節(jié)
相關推薦

2021-06-21 17:00:05

云計算Hologres云原生

2024-07-12 08:42:58

Redis高性能架構(gòu)

2009-08-12 17:48:56

存儲高性能計算曙光

2025-01-27 11:49:55

2011-07-01 09:36:30

高性能Web

2022-06-28 08:42:03

磁盤kafka高性能

2010-03-11 15:31:11

核心交換機

2021-12-26 00:03:25

Spark性能調(diào)優(yōu)

2021-09-06 08:31:11

Kafka架構(gòu)主從架構(gòu)

2020-11-02 09:35:04

ReactHook

2020-01-07 16:16:57

Kafka開源消息系統(tǒng)

2020-12-03 08:14:45

Axios核心Promise

2024-08-15 06:51:31

2019-09-03 09:41:48

運維架構(gòu)技術(shù)

2023-03-09 10:22:00

SpringBootRabbitMQ

2009-11-17 10:14:27

核心路由器

2020-06-24 08:43:29

5G核心網(wǎng)通信

2015-09-23 09:35:42

高性能高可靠塊存儲

2025-04-03 00:20:00

2019-09-12 08:50:37

Kafka分布式系統(tǒng)服務器
點贊
收藏

51CTO技術(shù)棧公眾號