自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

十張圖,五個(gè)問題帶你徹底理解 Kafka 架構(gòu)調(diào)優(yōu)

開發(fā) 架構(gòu) 網(wǎng)絡(luò)
今天給大家分享 Kafka 架構(gòu)調(diào)優(yōu)的技術(shù)。這篇文章干貨很多,希望你可以耐心讀完。

1 了解Kafka超高并發(fā)網(wǎng)絡(luò)架構(gòu)是如何設(shè)計(jì)嗎?

我們知道 Kafka 網(wǎng)絡(luò)通信架構(gòu)使用到了 Java NIO 以及 Reactor 設(shè)計(jì)模式。我們先從整體上看一下完整的網(wǎng)絡(luò)通信層架構(gòu),如下圖所示:

1)從上圖中我們可以看出,Kafka 網(wǎng)絡(luò)通信架構(gòu)中用到的組件主要由兩大部分構(gòu)成:SocketServer 和 RequestHandlerPool。

2)SocketServer 組件是 Kafka 超高并發(fā)網(wǎng)絡(luò)通信層中最重要的子模塊。它包含 Acceptor 線程、Processor 線程和 RequestChannel 等對(duì)象,都是網(wǎng)絡(luò)通信的重要組成部分。

3)RequestHandlerPool 組件就是我們常說的 I/O 工作線程池,里面定義了若干個(gè) I/O 線程,主要用來執(zhí)行真實(shí)的請求處理邏輯。

01 Accept 線程

在經(jīng)典的 Reactor 設(shè)計(jì)模式有個(gè)「Dispatcher」的角色,主要用來接收外部請求并分發(fā)給下面的實(shí)際處理線程。在 Kafka 網(wǎng)絡(luò)架構(gòu)設(shè)計(jì)中,這個(gè) Dispatcher 就是Acceptor 線程」, 用來接收和創(chuàng)建外部 TCP 連接的線程。在 Broker 端每個(gè) SocketServer 實(shí)例只會(huì)創(chuàng)建一個(gè) Acceptor 線程。它的主要功能就是創(chuàng)建連接,并將接收到的 Request 請求傳遞給下游的 Processor 線程處理。

1)我們可以看出 Acceptor 線程主要使用了 Java NIO 的 Selector 以及 SocketChannel 的方式循環(huán)的輪詢準(zhǔn)備就緒的 I/O 事件。

2)將 ServerSocketChannel 通道注冊到nioSelector 上,并關(guān)注網(wǎng)絡(luò)連接創(chuàng)事件:SelectionKey.OP_ACCEPT。

3)事件注冊好后,一旦后續(xù)接收到連接請求后,Acceptor 線程就會(huì)指定一個(gè) Processor 線程,并將該請求交給它并創(chuàng)建網(wǎng)絡(luò)連接用于后續(xù)處理。

02 Processor 線程

Acceptor 只是做了請求入口連接處理的,那么,真正創(chuàng)建網(wǎng)絡(luò)連接以及分發(fā)網(wǎng)絡(luò)請求是由 Processor 線程來完成的。而每個(gè) Processor 線程在創(chuàng)建時(shí)都會(huì)創(chuàng)建 3 個(gè)隊(duì)列。

1)newConnections 隊(duì)列: 它主要是用來保存要?jiǎng)?chuàng)建的新連接信息,也就是SocketChannel 對(duì)象,目前是硬編碼隊(duì)列長度大小為20。每當(dāng) Processor 線程接收到新的連接請求時(shí),都會(huì)將對(duì)應(yīng)的 SocketChannel 對(duì)象放入隊(duì)列,等到后面創(chuàng)建連接時(shí),從該隊(duì)列中獲取 SocketChannel,然后注冊新的連接。

2)inflightResponse 隊(duì)列:它是一個(gè)臨時(shí)的 Response 隊(duì)列,當(dāng) Processor 線程將 Repsonse 返回給 Client 之后,要將 Response 放入該隊(duì)列。它存在的意義:由于有些 Response 回調(diào)邏輯要在 Response 被發(fā)送回 Request 發(fā)送方后,才能執(zhí)行,因此需要暫存到臨時(shí)隊(duì)列。

3)ResponseQueue 隊(duì)列:它主要是存放需要返回給Request 發(fā)送方的所有 Response 對(duì)象。每個(gè) Processor 線程都會(huì)維護(hù)自己的 Response 隊(duì)列。

03 RequestHandlerPool 線程池

Acceptor 線程和 Processor 線程只是請求和響應(yīng)的「搬運(yùn)工」,而真正處理 Kafka 請求」是 KafkaRequestHandlerPool 線程池,在上面網(wǎng)絡(luò)超高并發(fā)通信架構(gòu)圖,有兩個(gè)參數(shù)跟整個(gè)流程有關(guān)系,分別是「num.network.threads」、「num.io.threads」。其中 num.io.threads 就是 I/O 工作線程池的大小配置。

               

下面我們結(jié)合 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)圖來講解下一個(gè)完整請求處理核心流程:

1)Clients 發(fā)送請求給 Acceptor 線程。

2)Acceptor 線程會(huì)創(chuàng)建 NIO Selector 對(duì)象,并創(chuàng)建 ServerSocketChannel 通道實(shí)例,然后將 Channel 和 OP_ACCEPT 事件綁定到 Selector 多路復(fù)用器上。

3)Acceptor 線程默認(rèn)創(chuàng)建3個(gè)Processor 線程參數(shù):num.network.threads, 并輪詢的將請求對(duì)象 SocketChannel 放入到連接隊(duì)列中。

4)這時(shí)候連接隊(duì)列就源源不斷有請求數(shù)據(jù)了,然后不停地執(zhí)行 NIO Poll, 獲取對(duì)應(yīng) SocketChannel 上已經(jīng)準(zhǔn)備就緒的 I/O 事件。

5)Processor 線程向 SocketChannel 注冊了 OP_READ/OP_WRITE 事件,這樣 客戶端發(fā)過來的請求就會(huì)被該 SocketChannel 對(duì)象獲取到,具體就是 processCompleteReceives 方法。

6)這個(gè)時(shí)候客戶端就可以源源不斷進(jìn)行請求發(fā)送了,服務(wù)端通過 Selector NIO Poll 不停的獲取準(zhǔn)備就緒的 I/O 事件。

7)然后根據(jù)Channel中獲取已經(jīng)完成的 Receive 對(duì)象,構(gòu)建 Request 對(duì)象,并將其存入到 Requestchannel 的 RequestQueue 請求隊(duì)列中 。

8)這個(gè)時(shí)候就該 I/O 線程池上場了,KafkaRequestHandler 線程循環(huán)地從請求隊(duì)列RequestQueue 中獲取 Request 實(shí)例,然后交由KafkaApis 的 handle 方法,執(zhí)行真正的請求處理邏輯,并最終將數(shù)據(jù)存儲(chǔ)到磁盤中。

9)待處理完請求后,KafkaRequestHandler 線程會(huì)將 Response 對(duì)象放入 Processor 線程的 Response 隊(duì)列。

10)然后 Processor 線程通過 Request 中的 ProcessorID 不停地從 Response 隊(duì)列中來定位并取出 Response 對(duì)象,返還給 Request 發(fā)送方。

2 了解Kafka高吞吐日志存儲(chǔ)架構(gòu)是如何設(shè)計(jì)嗎?

對(duì)于 Kafka 來說, 它主要用來處理海量數(shù)據(jù)流,這個(gè)場景的特點(diǎn)主要包括:

1)  寫操作:寫并發(fā)要求非常高,基本得達(dá)到百萬級(jí) TPS,順序追加寫日志即可,無需考慮更新操作。


2)讀操作:相對(duì)寫操作來說,比較簡單,只要能按照一定規(guī)則高效查詢即可,支持(offset或者時(shí)間戳)讀取。


根據(jù)上面兩點(diǎn)分析,對(duì)于寫操作來說,直接采用「順序追加寫日志」的方式就可以滿足 Kafka 對(duì)于百萬TPS寫入效率要求。

如何解決高效查詢這些日志呢?我們可以設(shè)想把消息的 Offset 設(shè)計(jì)成一個(gè)有序的字段,這樣消息在日志文件中也就有序存放了,也不需要額外引入哈希表結(jié)構(gòu),可以直接將消息劃分成若干個(gè)塊,對(duì)于每個(gè)塊我們只需要索引當(dāng)前塊的第一條消息的 Offset ,這個(gè)是不是有點(diǎn)二分查找算法的意思。即先根據(jù) Offset 大小找到對(duì)應(yīng)的塊, 然后再從塊中順序查找。如下圖所示:

這樣就可以快速定位到要查找的消息的位置了,在 Kafka 中,我們將這種索引結(jié)構(gòu)叫做稀疏哈希索引。

上面得出了 Kafka 最終的存儲(chǔ)實(shí)現(xiàn)方案, 即基于順序追加寫日志 + 稀疏哈希索引。

接下來我們來看看 Kafka 日志存儲(chǔ)結(jié)構(gòu):

從上圖看出來,Kafka 是基于主題 + 分區(qū) + 副本 + 分段 + 索引」的結(jié)構(gòu)進(jìn)行日志存儲(chǔ)的。

了解了整體的日志存儲(chǔ)架構(gòu),我們來看下 Kafka 日志格式,Kafka 日志格式也經(jīng)歷了多個(gè)版本迭代,這里我們主要看下V2版本的日志格式:

通過上圖可以得出:V2 版本日志格式主要是通過可變長度提高了消息格式的空間使用率,并將某些字段抽取到消息批次(RecordBatch)中,同時(shí)消息批次可以存放多條消息,從而在批量發(fā)送消息時(shí),可以大幅度地節(jié)省了磁盤空間。

接下來我們來看看日志消息寫入磁盤的整體過程如下圖所示:

3 針對(duì) Kafka 線上集群部署方案, 你是怎么做的?

這里我們從架構(gòu)師必備能力出發(fā), 以電商平臺(tái)為例講述了 Kafka 生產(chǎn)級(jí)容量評(píng)估方案該如何做?如何讓公司領(lǐng)導(dǎo)以及運(yùn)維部門得到認(rèn)可, 獲準(zhǔn)你的方案。

詳細(xì)可以深讀:八大步驟帶你深度剖析Kafka生產(chǎn)級(jí)容量評(píng)估方案

4 針對(duì) Kafka 線上系統(tǒng), 你是如何進(jìn)行監(jiān)控的?

Kafka 作為大型系統(tǒng)架構(gòu)中重要的一環(huán),有著舉足輕重的作用,因此 Kafka 集群的穩(wěn)定性尤為重要,我們要對(duì)生產(chǎn)的 Kafka 集群進(jìn)行全方位的監(jiān)控, 一般線上系統(tǒng)可以從以下五個(gè)維度進(jìn)行監(jiān)控:

01 主機(jī)節(jié)點(diǎn)監(jiān)控

所謂主機(jī)節(jié)點(diǎn)監(jiān)控就是監(jiān)控 Kafka 集群 Broker 所在節(jié)點(diǎn)機(jī)器的性能。主機(jī)節(jié)點(diǎn)監(jiān)控對(duì)于 Kafka 來說是最重要的,因?yàn)楹芏嗑€上環(huán)境問題首先就是由于主機(jī)的某些性能出現(xiàn)了問題。

因此對(duì)于 Kafka 來說,主機(jī)監(jiān)控通常是發(fā)現(xiàn)問題的第一步,主要性能指標(biāo)如下:

機(jī)器負(fù)載(Load)」、CPU 使用率」、內(nèi)存使用率」、磁盤 I/O 使用率」、「網(wǎng)絡(luò) I/O 使用率」、TCP 連接數(shù)」、打開文件數(shù)」、inode 使用情況」。

如果想要更好的監(jiān)控主機(jī)性能的話,有以下兩個(gè)教程可以學(xué)習(xí)和參考:

02 JVM 監(jiān)控

另一個(gè)重要的監(jiān)控維度就是 JVM 監(jiān)控。監(jiān)控 JVM 進(jìn)程主要是為了讓你全面地了解Kafka Broker 進(jìn)程。

要監(jiān)控 JVM 進(jìn)程,需要關(guān)注 3 個(gè)指標(biāo):

「監(jiān)控Full GC 發(fā)生頻率和時(shí)長」、監(jiān)控堆上活躍對(duì)象大小」、監(jiān)控應(yīng)用線程總數(shù)

03 Kafka 集群監(jiān)控

另外一個(gè)重要監(jiān)控維度就是 Kafka Broker 集群和各類客戶端的監(jiān)控,主要有3個(gè)方法:

1)查看 Broker 端重要日志:主要包括 Broker 端服務(wù)器日志 server.log,控制器日志 controller.log 以及主題分區(qū)狀態(tài)變更日志 state-change.log。其中,server.log 是最重要的,如果你的 Kafka 集群出現(xiàn)了故障,你要第一時(shí)間查看 server.log,定位故障原因。


2)查看 Broker 端關(guān)鍵線程運(yùn)行狀態(tài),例如: 

Log Compaction 線程:日志壓縮清理。一旦它掛掉了,所有 Compaction 操作都會(huì)中斷,但用戶對(duì)此通常是無感知的。


副本拉取消息的線程:主要執(zhí)行 Follower 副本向 Leader 副本拉取消息的邏輯。如果它們掛掉了,系統(tǒng)會(huì)表現(xiàn)為 Follower 副本延遲 Leader 副本越來越大 。


3)查看 Broker 端關(guān)鍵的 JMX 性能指標(biāo): 主要有BytesIn/BytesOut、NetworkProcessorAvgIdlePercent、RequestHandlerAvgIdlePercent、UnderReplicatedPartitions、ISRShrink/ISRExpand、ActiveControllerCount 這幾個(gè)指標(biāo) 。


04 Kafka 客戶端監(jiān)控

客戶端監(jiān)控主要是生產(chǎn)者和消費(fèi)者的監(jiān)控,生產(chǎn)者往 Kafka 發(fā)送消息,此時(shí)我們要了解客戶端機(jī)器與 Broker 機(jī)器之間的往返時(shí)延 RTT 是多少,對(duì)于跨數(shù)據(jù)中心或者異地集群來說,RTT 會(huì)更大,很難支撐很大的 TPS。

Producer角度: request-latency 是需要重點(diǎn)關(guān)注的JMX指標(biāo),即消息生產(chǎn)請求的延時(shí);另外 Sender 線程的運(yùn)行狀態(tài)也是非常重要的, 如果 Sender 線程掛了,對(duì)于用戶是無感知的,表象只是 Producer 端消息發(fā)送失敗。

Consumer角度: 對(duì)于 Consumer Group,需要重點(diǎn)關(guān)注 join rate 和 sync rate 指標(biāo),它表示 Rebalance 的頻繁程度。另外還包括消息消費(fèi)偏移量、消息堆積數(shù)量等。

05 Broker 之間的監(jiān)控

最后一個(gè)監(jiān)控維度就是 Broker 之間的監(jiān)控,主要指副本拉取的性能。Follower 副本實(shí)時(shí)拉取 Leader 副本的數(shù)據(jù),此時(shí)我們希望拉取過程越快越好。Kafka 提供了一個(gè)特別重要的 JMX 指標(biāo),叫做「under replicated partitions,意思就是比如我們規(guī)定這條消息,應(yīng)該在兩個(gè) Broker 上面保存,假設(shè)只有一個(gè) Broker 上保存該消息,那么這條消息所在的分區(qū)就叫 under replicated partitions,這種情況是特別關(guān)注的,因?yàn)橛锌赡茉斐蓴?shù)據(jù)的丟失。

另外還有一個(gè)比較重要的指標(biāo)是「active controllor count」。在整個(gè) Kafka 集群中應(yīng)該確保只能有一臺(tái)機(jī)器的指標(biāo)是1,其他全應(yīng)該是0,如果發(fā)現(xiàn)有一臺(tái)機(jī)器大于1,一定是出現(xiàn)腦裂了,此時(shí)應(yīng)該去檢查下是否出現(xiàn)了網(wǎng)絡(luò)分區(qū)。Kafka本身是不能對(duì)抗腦裂的,完全依靠 Zookeeper 來做,但是如果真正出現(xiàn)網(wǎng)絡(luò)分區(qū)的話,也是沒有辦法處理的,應(yīng)該讓其快速失敗重啟。

5 針對(duì) Kafka 線上系統(tǒng), 你是如何進(jìn)行調(diào)優(yōu)的?

對(duì) Kafka 來說,吞吐量延時(shí)是非常重要的優(yōu)化指標(biāo)。

吞吐量 TPS:是指 Broker 端或 Client 端每秒能處理的消息數(shù),越大越好。

延時(shí):表示從 Producer 端發(fā)送消息到 Broker 端持久化完成到 Consumer 端成功消費(fèi)之間的時(shí)間間隔。與吞吐量 TPS 相反,延時(shí)越短越好。

總之,高吞吐量、低延時(shí)是我們調(diào)優(yōu) Kafka 集群的主要目標(biāo)。

01 提升吞吐量

首先是提升吞吐量參數(shù)和措施:









Broker

num.replica.fetchers:表示 Follower 副本用多少個(gè)線程來拉取消息,默認(rèn)1個(gè)線程。如果 Broker 端 CPU 資源很充足,適當(dāng)增加該值但不要超過 CPU 核數(shù)」,以加快 Follower 副本的同步速度。這是因?yàn)樵谏a(chǎn)環(huán)境中,配置了 acks=all 的 Producer 端影響吞吐量的首要因素就是副本同步性能。適當(dāng)增加該值后通??梢钥吹?Producer 端吞吐量增加

replica.lag.time.max.ms:在 ISR 中,如果 Follower 長時(shí)間未向 Leader 發(fā)送通信請求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR,該值默認(rèn)為 30s。

num.network.threads:單個(gè)Acceptor創(chuàng)建Processor處理器的線程個(gè)數(shù),默認(rèn)值為3, 可以適當(dāng)提高該值為9。

num.io.threads:服務(wù)器用于處理請求的線程數(shù),可能包括磁盤 I/O,默認(rèn)值是 8,可以適當(dāng)提高該值為32。

調(diào)優(yōu)參數(shù)以避免頻繁性的 Full GC









Producer

batch.size:表示消息批次大小,默認(rèn)是 16kb。

如果 batch 設(shè)置太小,會(huì)導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;

如果 batch 設(shè)置太大,會(huì)導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時(shí)。

所以適當(dāng)增加會(huì)提高吞吐量,建議從默認(rèn)的16kb增加到512kb或者1M。

buffer.memory:RecordAccumulator 發(fā)送消息的緩沖區(qū)總大小,默認(rèn)值是 32M,可以增加到 64M。

linger.ms:表示批次緩存時(shí)間,如果數(shù)據(jù)遲遲未達(dá)到 batch.size,sender 等待 linger.ms 之后就會(huì)發(fā)送數(shù)據(jù)。單位 ms,默認(rèn)值是 0,意思就是消息必須立即被發(fā)送。

如果設(shè)置的太短,會(huì)導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;

如果設(shè)置的太長,會(huì)導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時(shí)。

所以適當(dāng)增加會(huì)提高吞吐量,建議10~100毫秒。

compression.type:默認(rèn)是 none,不壓縮,但是也可以使用 lz4 壓縮,效率還是不錯(cuò)的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會(huì)加大 producer 端的 CPU 開銷。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。

設(shè)置acks=0/1,retries=0,優(yōu)化目標(biāo)是吞吐量,不要設(shè)置 acks=all副本同步時(shí)間拉長 及開啟重試 執(zhí)行時(shí)間拉長




Consumer

利用多線程增加整體吞吐量

fetch.min.bytes:表示只要 Broker 端積攢了多少數(shù)據(jù),就可以返回給 Consumer 端。默認(rèn)值1字節(jié),適當(dāng)增大該值為1kb或者更大。

fetch.max.bytes:消費(fèi)者獲取服務(wù)器端一批消息最大的字節(jié)數(shù)。一批次的大小受 message.max.bytes 【broker 配置】或 max.message.bytes 【topic config】影響,默認(rèn)是50M。

max.poll.records:表示一次 poll 拉取數(shù)據(jù)返回消息的最大條數(shù)大小,默認(rèn)是 500 條。

分 區(qū)

增加分區(qū)來提高吞吐量

02 降低延時(shí)

降低延時(shí)的目的就是盡量減少端到端的延時(shí)。

對(duì)比上面提升吞吐量的參數(shù),我們只能調(diào)整 Producer 端和 Consumer 端的參數(shù)配置。

對(duì)于 Producer 端,此時(shí)我們希望可以快速的將消息發(fā)送出去,必須設(shè)置 linger.ms=0,同時(shí)關(guān)閉壓縮,另外設(shè)置 acks = 1,減少副本同步時(shí)間。

而對(duì)于 Consumer 端我們只保持 fetch.min.bytes=1 ,即 Broker 端只要有能返回的數(shù)據(jù),就立即返回給 Consumer,減少延時(shí)。

03 合理設(shè)置分區(qū)數(shù)

分區(qū)數(shù)不是越多越好,也不是越少越好,需要搭建完集群,進(jìn)行壓測,再靈活調(diào)整分區(qū)個(gè)數(shù)。 

這里可以用 Kafka 官方自帶的腳本,對(duì) Kafka 進(jìn)行壓測。

1)生產(chǎn)者壓測:kafka-producer-perf-test.sh

2)消費(fèi)者壓測:kafka-consumer-perf-test.sh 

責(zé)任編輯:張燕妮 來源: 華仔聊技術(shù)
相關(guān)推薦

2021-10-22 09:28:15

開發(fā)技能代碼

2022-07-11 11:06:11

RocketMQ函數(shù).消費(fèi)端

2021-05-07 17:11:19

負(fù)載均衡運(yùn)維服務(wù)

2022-02-28 11:10:42

ZGCG1收集器

2024-07-03 08:28:44

HWKafkaLEO

2022-07-04 11:06:02

RocketMQ事務(wù)消息實(shí)現(xiàn)

2021-05-18 06:55:07

Java AQS源碼

2020-11-03 10:32:48

回調(diào)函數(shù)模塊

2022-06-13 11:05:35

RocketMQ消費(fèi)者線程

2022-03-07 17:43:30

注冊微服務(wù)架構(gòu)

2022-06-27 11:04:24

RocketMQ順序消息

2021-04-25 10:45:59

Docker架構(gòu)Job

2020-06-28 07:39:44

Kafka分布式消息

2021-12-06 07:15:47

Pulsar地域復(fù)制

2022-09-26 11:32:14

用戶分層服務(wù)業(yè)務(wù)

2021-03-18 12:16:44

用戶分層業(yè)務(wù)

2020-11-27 06:28:55

Spring循環(huán)依賴

2021-08-15 18:59:13

垃圾收集器JDK

2021-01-28 11:39:01

數(shù)據(jù)分析銷售

2022-07-05 11:18:50

數(shù)據(jù)分析銷售業(yè)績
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)