十張圖,五個(gè)問題帶你徹底理解 Kafka 架構(gòu)調(diào)優(yōu)
1 了解Kafka超高并發(fā)網(wǎng)絡(luò)架構(gòu)是如何設(shè)計(jì)嗎?
我們知道 Kafka 網(wǎng)絡(luò)通信架構(gòu)使用到了 Java NIO 以及 Reactor 設(shè)計(jì)模式。我們先從整體上看一下完整的網(wǎng)絡(luò)通信層架構(gòu),如下圖所示:
1)從上圖中我們可以看出,Kafka 網(wǎng)絡(luò)通信架構(gòu)中用到的組件主要由兩大部分構(gòu)成:SocketServer 和 RequestHandlerPool。
2)SocketServer 組件是 Kafka 超高并發(fā)網(wǎng)絡(luò)通信層中最重要的子模塊。它包含 Acceptor 線程、Processor 線程和 RequestChannel 等對(duì)象,都是網(wǎng)絡(luò)通信的重要組成部分。
3)RequestHandlerPool 組件就是我們常說的 I/O 工作線程池,里面定義了若干個(gè) I/O 線程,主要用來執(zhí)行真實(shí)的請求處理邏輯。
01 Accept 線程
在經(jīng)典的 Reactor 設(shè)計(jì)模式有個(gè)「Dispatcher」的角色,主要用來接收外部請求并分發(fā)給下面的實(shí)際處理線程。在 Kafka 網(wǎng)絡(luò)架構(gòu)設(shè)計(jì)中,這個(gè) Dispatcher 就是「Acceptor 線程」, 用來接收和創(chuàng)建外部 TCP 連接的線程。在 Broker 端每個(gè) SocketServer 實(shí)例只會(huì)創(chuàng)建一個(gè) Acceptor 線程。它的主要功能就是創(chuàng)建連接,并將接收到的 Request 請求傳遞給下游的 Processor 線程處理。
1)我們可以看出 Acceptor 線程主要使用了 Java NIO 的 Selector 以及 SocketChannel 的方式循環(huán)的輪詢準(zhǔn)備就緒的 I/O 事件。
2)將 ServerSocketChannel 通道注冊到nioSelector 上,并關(guān)注網(wǎng)絡(luò)連接創(chuàng)事件:SelectionKey.OP_ACCEPT。
3)事件注冊好后,一旦后續(xù)接收到連接請求后,Acceptor 線程就會(huì)指定一個(gè) Processor 線程,并將該請求交給它并創(chuàng)建網(wǎng)絡(luò)連接用于后續(xù)處理。
02 Processor 線程
Acceptor 只是做了請求入口連接處理的,那么,真正創(chuàng)建網(wǎng)絡(luò)連接以及分發(fā)網(wǎng)絡(luò)請求是由 Processor 線程來完成的。而每個(gè) Processor 線程在創(chuàng)建時(shí)都會(huì)創(chuàng)建 3 個(gè)隊(duì)列。
1)newConnections 隊(duì)列: 它主要是用來保存要?jiǎng)?chuàng)建的新連接信息,也就是SocketChannel 對(duì)象,目前是硬編碼隊(duì)列長度大小為20。每當(dāng) Processor 線程接收到新的連接請求時(shí),都會(huì)將對(duì)應(yīng)的 SocketChannel 對(duì)象放入隊(duì)列,等到后面創(chuàng)建連接時(shí),從該隊(duì)列中獲取 SocketChannel,然后注冊新的連接。
2)inflightResponse 隊(duì)列:它是一個(gè)臨時(shí)的 Response 隊(duì)列,當(dāng) Processor 線程將 Repsonse 返回給 Client 之后,要將 Response 放入該隊(duì)列。它存在的意義:由于有些 Response 回調(diào)邏輯要在 Response 被發(fā)送回 Request 發(fā)送方后,才能執(zhí)行,因此需要暫存到臨時(shí)隊(duì)列。
3)ResponseQueue 隊(duì)列:它主要是存放需要返回給Request 發(fā)送方的所有 Response 對(duì)象。每個(gè) Processor 線程都會(huì)維護(hù)自己的 Response 隊(duì)列。
03 RequestHandlerPool 線程池
Acceptor 線程和 Processor 線程只是請求和響應(yīng)的「搬運(yùn)工」,而「真正處理 Kafka 請求」是 KafkaRequestHandlerPool 線程池,在上面網(wǎng)絡(luò)超高并發(fā)通信架構(gòu)圖,有兩個(gè)參數(shù)跟整個(gè)流程有關(guān)系,分別是「num.network.threads」、「num.io.threads」。其中 num.io.threads 就是 I/O 工作線程池的大小配置。
下面我們結(jié)合 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)圖來講解下一個(gè)完整請求處理核心流程:
1)Clients 發(fā)送請求給 Acceptor 線程。
2)Acceptor 線程會(huì)創(chuàng)建 NIO Selector 對(duì)象,并創(chuàng)建 ServerSocketChannel 通道實(shí)例,然后將 Channel 和 OP_ACCEPT 事件綁定到 Selector 多路復(fù)用器上。
3)Acceptor 線程默認(rèn)創(chuàng)建3個(gè)Processor 線程參數(shù):num.network.threads, 并輪詢的將請求對(duì)象 SocketChannel 放入到連接隊(duì)列中。
4)這時(shí)候連接隊(duì)列就源源不斷有請求數(shù)據(jù)了,然后不停地執(zhí)行 NIO Poll, 獲取對(duì)應(yīng) SocketChannel 上已經(jīng)準(zhǔn)備就緒的 I/O 事件。
5)Processor 線程向 SocketChannel 注冊了 OP_READ/OP_WRITE 事件,這樣 客戶端發(fā)過來的請求就會(huì)被該 SocketChannel 對(duì)象獲取到,具體就是 processCompleteReceives 方法。
6)這個(gè)時(shí)候客戶端就可以源源不斷進(jìn)行請求發(fā)送了,服務(wù)端通過 Selector NIO Poll 不停的獲取準(zhǔn)備就緒的 I/O 事件。
7)然后根據(jù)Channel中獲取已經(jīng)完成的 Receive 對(duì)象,構(gòu)建 Request 對(duì)象,并將其存入到 Requestchannel 的 RequestQueue 請求隊(duì)列中 。
8)這個(gè)時(shí)候就該 I/O 線程池上場了,KafkaRequestHandler 線程循環(huán)地從請求隊(duì)列RequestQueue 中獲取 Request 實(shí)例,然后交由KafkaApis 的 handle 方法,執(zhí)行真正的請求處理邏輯,并最終將數(shù)據(jù)存儲(chǔ)到磁盤中。
9)待處理完請求后,KafkaRequestHandler 線程會(huì)將 Response 對(duì)象放入 Processor 線程的 Response 隊(duì)列。
10)然后 Processor 線程通過 Request 中的 ProcessorID 不停地從 Response 隊(duì)列中來定位并取出 Response 對(duì)象,返還給 Request 發(fā)送方。
2 了解Kafka高吞吐日志存儲(chǔ)架構(gòu)是如何設(shè)計(jì)嗎?
對(duì)于 Kafka 來說, 它主要用來處理海量數(shù)據(jù)流,這個(gè)場景的特點(diǎn)主要包括:
1) 寫操作:寫并發(fā)要求非常高,基本得達(dá)到百萬級(jí) TPS,順序追加寫日志即可,無需考慮更新操作。
2)讀操作:相對(duì)寫操作來說,比較簡單,只要能按照一定規(guī)則高效查詢即可,支持(offset或者時(shí)間戳)讀取。
根據(jù)上面兩點(diǎn)分析,對(duì)于寫操作來說,直接采用「順序追加寫日志」的方式就可以滿足 Kafka 對(duì)于百萬TPS寫入效率要求。
如何解決高效查詢這些日志呢?我們可以設(shè)想把消息的 Offset 設(shè)計(jì)成一個(gè)有序的字段,這樣消息在日志文件中也就有序存放了,也不需要額外引入哈希表結(jié)構(gòu),可以直接將消息劃分成若干個(gè)塊,對(duì)于每個(gè)塊我們只需要索引當(dāng)前塊的第一條消息的 Offset ,這個(gè)是不是有點(diǎn)二分查找算法的意思。即先根據(jù) Offset 大小找到對(duì)應(yīng)的塊, 然后再從塊中順序查找。如下圖所示:
這樣就可以快速定位到要查找的消息的位置了,在 Kafka 中,我們將這種索引結(jié)構(gòu)叫做「稀疏哈希索引」。
上面得出了 Kafka 最終的存儲(chǔ)實(shí)現(xiàn)方案, 即基于順序追加寫日志 + 稀疏哈希索引。
接下來我們來看看 Kafka 日志存儲(chǔ)結(jié)構(gòu):
從上圖看出來,Kafka 是基于「主題 + 分區(qū) + 副本 + 分段 + 索引」的結(jié)構(gòu)進(jìn)行日志存儲(chǔ)的。
了解了整體的日志存儲(chǔ)架構(gòu),我們來看下 Kafka 日志格式,Kafka 日志格式也經(jīng)歷了多個(gè)版本迭代,這里我們主要看下V2版本的日志格式:
通過上圖可以得出:V2 版本日志格式主要是通過可變長度提高了消息格式的空間使用率,并將某些字段抽取到消息批次(RecordBatch)中,同時(shí)消息批次可以存放多條消息,從而在批量發(fā)送消息時(shí),可以大幅度地節(jié)省了磁盤空間。
接下來我們來看看日志消息寫入磁盤的整體過程如下圖所示:
3 針對(duì) Kafka 線上集群部署方案, 你是怎么做的?
這里我們從架構(gòu)師必備能力出發(fā), 以電商平臺(tái)為例講述了 Kafka 生產(chǎn)級(jí)容量評(píng)估方案該如何做?如何讓公司領(lǐng)導(dǎo)以及運(yùn)維部門得到認(rèn)可, 獲準(zhǔn)你的方案。
詳細(xì)可以深讀:八大步驟帶你深度剖析Kafka生產(chǎn)級(jí)容量評(píng)估方案
4 針對(duì) Kafka 線上系統(tǒng), 你是如何進(jìn)行監(jiān)控的?
Kafka 作為大型系統(tǒng)架構(gòu)中重要的一環(huán),有著舉足輕重的作用,因此 Kafka 集群的穩(wěn)定性尤為重要,我們要對(duì)生產(chǎn)的 Kafka 集群進(jìn)行全方位的監(jiān)控, 一般線上系統(tǒng)可以從以下五個(gè)維度進(jìn)行監(jiān)控:
01 主機(jī)節(jié)點(diǎn)監(jiān)控
所謂主機(jī)節(jié)點(diǎn)監(jiān)控就是監(jiān)控 Kafka 集群 Broker 所在節(jié)點(diǎn)機(jī)器的性能。主機(jī)節(jié)點(diǎn)監(jiān)控對(duì)于 Kafka 來說是最重要的,因?yàn)楹芏嗑€上環(huán)境問題首先就是由于主機(jī)的某些性能出現(xiàn)了問題。
因此對(duì)于 Kafka 來說,主機(jī)監(jiān)控通常是發(fā)現(xiàn)問題的第一步,主要性能指標(biāo)如下:
「機(jī)器負(fù)載(Load)」、「CPU 使用率」、「內(nèi)存使用率」、「磁盤 I/O 使用率」、「網(wǎng)絡(luò) I/O 使用率」、「TCP 連接數(shù)」、「打開文件數(shù)」、「inode 使用情況」。
如果想要更好的監(jiān)控主機(jī)性能的話,有以下兩個(gè)教程可以學(xué)習(xí)和參考:
02 JVM 監(jiān)控
另一個(gè)重要的監(jiān)控維度就是 JVM 監(jiān)控。監(jiān)控 JVM 進(jìn)程主要是為了讓你全面地了解Kafka Broker 進(jìn)程。
要監(jiān)控 JVM 進(jìn)程,需要關(guān)注 3 個(gè)指標(biāo):
「監(jiān)控Full GC 發(fā)生頻率和時(shí)長」、「監(jiān)控堆上活躍對(duì)象大小」、「監(jiān)控應(yīng)用線程總數(shù)」
03 Kafka 集群監(jiān)控
另外一個(gè)重要監(jiān)控維度就是 Kafka Broker 集群和各類客戶端的監(jiān)控,主要有3個(gè)方法:
1)查看 Broker 端重要日志:主要包括 Broker 端服務(wù)器日志 server.log,控制器日志 controller.log 以及主題分區(qū)狀態(tài)變更日志 state-change.log。其中,server.log 是最重要的,如果你的 Kafka 集群出現(xiàn)了故障,你要第一時(shí)間查看 server.log,定位故障原因。
2)查看 Broker 端關(guān)鍵線程運(yùn)行狀態(tài),例如:
Log Compaction 線程:日志壓縮清理。一旦它掛掉了,所有 Compaction 操作都會(huì)中斷,但用戶對(duì)此通常是無感知的。
副本拉取消息的線程:主要執(zhí)行 Follower 副本向 Leader 副本拉取消息的邏輯。如果它們掛掉了,系統(tǒng)會(huì)表現(xiàn)為 Follower 副本延遲 Leader 副本越來越大 。
3)查看 Broker 端關(guān)鍵的 JMX 性能指標(biāo): 主要有BytesIn/BytesOut、NetworkProcessorAvgIdlePercent、RequestHandlerAvgIdlePercent、UnderReplicatedPartitions、ISRShrink/ISRExpand、ActiveControllerCount 這幾個(gè)指標(biāo) 。
04 Kafka 客戶端監(jiān)控
客戶端監(jiān)控主要是生產(chǎn)者和消費(fèi)者的監(jiān)控,生產(chǎn)者往 Kafka 發(fā)送消息,此時(shí)我們要了解客戶端機(jī)器與 Broker 機(jī)器之間的往返時(shí)延 RTT 是多少,對(duì)于跨數(shù)據(jù)中心或者異地集群來說,RTT 會(huì)更大,很難支撐很大的 TPS。
Producer角度: request-latency 是需要重點(diǎn)關(guān)注的JMX指標(biāo),即消息生產(chǎn)請求的延時(shí);另外 Sender 線程的運(yùn)行狀態(tài)也是非常重要的, 如果 Sender 線程掛了,對(duì)于用戶是無感知的,表象只是 Producer 端消息發(fā)送失敗。
Consumer角度: 對(duì)于 Consumer Group,需要重點(diǎn)關(guān)注 join rate 和 sync rate 指標(biāo),它表示 Rebalance 的頻繁程度。另外還包括消息消費(fèi)偏移量、消息堆積數(shù)量等。
05 Broker 之間的監(jiān)控
最后一個(gè)監(jiān)控維度就是 Broker 之間的監(jiān)控,主要指副本拉取的性能。Follower 副本實(shí)時(shí)拉取 Leader 副本的數(shù)據(jù),此時(shí)我們希望拉取過程越快越好。Kafka 提供了一個(gè)特別重要的 JMX 指標(biāo),叫做「under replicated partitions」,意思就是比如我們規(guī)定這條消息,應(yīng)該在兩個(gè) Broker 上面保存,假設(shè)只有一個(gè) Broker 上保存該消息,那么這條消息所在的分區(qū)就叫 under replicated partitions,這種情況是特別關(guān)注的,因?yàn)橛锌赡茉斐蓴?shù)據(jù)的丟失。
另外還有一個(gè)比較重要的指標(biāo)是「active controllor count」。在整個(gè) Kafka 集群中應(yīng)該確保只能有一臺(tái)機(jī)器的指標(biāo)是1,其他全應(yīng)該是0,如果發(fā)現(xiàn)有一臺(tái)機(jī)器大于1,一定是出現(xiàn)腦裂了,此時(shí)應(yīng)該去檢查下是否出現(xiàn)了網(wǎng)絡(luò)分區(qū)。Kafka本身是不能對(duì)抗腦裂的,完全依靠 Zookeeper 來做,但是如果真正出現(xiàn)網(wǎng)絡(luò)分區(qū)的話,也是沒有辦法處理的,應(yīng)該讓其快速失敗重啟。
5 針對(duì) Kafka 線上系統(tǒng), 你是如何進(jìn)行調(diào)優(yōu)的?
對(duì) Kafka 來說,「吞吐量」和「延時(shí)」是非常重要的優(yōu)化指標(biāo)。
吞吐量 TPS:是指 Broker 端或 Client 端每秒能處理的消息數(shù),越大越好。
延時(shí):表示從 Producer 端發(fā)送消息到 Broker 端持久化完成到 Consumer 端成功消費(fèi)之間的時(shí)間間隔。與吞吐量 TPS 相反,延時(shí)越短越好。
總之,高吞吐量、低延時(shí)是我們調(diào)優(yōu) Kafka 集群的主要目標(biāo)。
01 提升吞吐量
首先是提升吞吐量參數(shù)和措施:
Broker | num.replica.fetchers:表示 Follower 副本用多少個(gè)線程來拉取消息,默認(rèn)1個(gè)線程。如果 Broker 端 CPU 資源很充足,適當(dāng)增加該值「但不要超過 CPU 核數(shù)」,以加快 Follower 副本的同步速度。這是因?yàn)樵谏a(chǎn)環(huán)境中,配置了 acks=all 的 Producer 端影響吞吐量的首要因素就是副本同步性能。適當(dāng)增加該值后通??梢钥吹?Producer 端吞吐量增加 |
replica.lag.time.max.ms:在 ISR 中,如果 Follower 長時(shí)間未向 Leader 發(fā)送通信請求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR,該值默認(rèn)為 30s。 | |
num.network.threads:單個(gè)Acceptor創(chuàng)建Processor處理器的線程個(gè)數(shù),默認(rèn)值為3, 可以適當(dāng)提高該值為9。 | |
num.io.threads:服務(wù)器用于處理請求的線程數(shù),可能包括磁盤 I/O,默認(rèn)值是 8,可以適當(dāng)提高該值為32。 | |
調(diào)優(yōu)參數(shù)以避免頻繁性的 Full GC | |
Producer | batch.size:表示消息批次大小,默認(rèn)是 16kb。 如果 batch 設(shè)置太小,會(huì)導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降; 如果 batch 設(shè)置太大,會(huì)導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時(shí)。 所以適當(dāng)增加會(huì)提高吞吐量,建議從默認(rèn)的16kb增加到512kb或者1M。 |
buffer.memory:RecordAccumulator 發(fā)送消息的緩沖區(qū)總大小,默認(rèn)值是 32M,可以增加到 64M。 | |
linger.ms:表示批次緩存時(shí)間,如果數(shù)據(jù)遲遲未達(dá)到 batch.size,sender 等待 linger.ms 之后就會(huì)發(fā)送數(shù)據(jù)。單位 ms,默認(rèn)值是 0,意思就是消息必須立即被發(fā)送。 如果設(shè)置的太短,會(huì)導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降; 如果設(shè)置的太長,會(huì)導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時(shí)。 所以適當(dāng)增加會(huì)提高吞吐量,建議10~100毫秒。 | |
compression.type:默認(rèn)是 none,不壓縮,但是也可以使用 lz4 壓縮,效率還是不錯(cuò)的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會(huì)加大 producer 端的 CPU 開銷。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。 | |
設(shè)置acks=0/1,retries=0,優(yōu)化目標(biāo)是吞吐量,不要設(shè)置 acks=all「副本同步時(shí)間拉長」 及開啟重試 「執(zhí)行時(shí)間拉長」。 | |
Consumer | 利用多線程增加整體吞吐量 |
fetch.min.bytes:表示只要 Broker 端積攢了多少數(shù)據(jù),就可以返回給 Consumer 端。默認(rèn)值1字節(jié),適當(dāng)增大該值為1kb或者更大。 | |
fetch.max.bytes:消費(fèi)者獲取服務(wù)器端一批消息最大的字節(jié)數(shù)。一批次的大小受 message.max.bytes 【broker 配置】或 max.message.bytes 【topic config】影響,默認(rèn)是50M。 | |
max.poll.records:表示一次 poll 拉取數(shù)據(jù)返回消息的最大條數(shù)大小,默認(rèn)是 500 條。 | |
分 區(qū) | 增加分區(qū)來提高吞吐量 |
02 降低延時(shí)
降低延時(shí)的目的就是盡量減少端到端的延時(shí)。
對(duì)比上面提升吞吐量的參數(shù),我們只能調(diào)整 Producer 端和 Consumer 端的參數(shù)配置。
對(duì)于 Producer 端,此時(shí)我們希望可以快速的將消息發(fā)送出去,必須設(shè)置 linger.ms=0,同時(shí)關(guān)閉壓縮,另外設(shè)置 acks = 1,減少副本同步時(shí)間。
而對(duì)于 Consumer 端我們只保持 fetch.min.bytes=1 ,即 Broker 端只要有能返回的數(shù)據(jù),就立即返回給 Consumer,減少延時(shí)。
03 合理設(shè)置分區(qū)數(shù)
分區(qū)數(shù)不是越多越好,也不是越少越好,需要搭建完集群,進(jìn)行壓測,再靈活調(diào)整分區(qū)個(gè)數(shù)。
這里可以用 Kafka 官方自帶的腳本,對(duì) Kafka 進(jìn)行壓測。
1)生產(chǎn)者壓測:kafka-producer-perf-test.sh
2)消費(fèi)者壓測:kafka-consumer-perf-test.sh