自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

圖解 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)演進(jìn)過程

開發(fā) 架構(gòu)
今天我們來深度剖析下「Kafka Broker 端網(wǎng)絡(luò)架構(gòu)和請求處理流程」是如何設(shè)計的? 相信使用過 Kafka 的朋友都知道其吞吐量可以高達(dá)百萬,但很少人理解其中的設(shè)計原理。


大家好,我是 華仔, 又跟大家見面了。

上一篇作為專題系列的第一篇,我們深度剖析了關(guān)于 Kafka 存儲架構(gòu)設(shè)計的實(shí)現(xiàn)細(xì)節(jié),今天開啟第二篇,我們來深度剖析下「Kafka Broker 端網(wǎng)絡(luò)架構(gòu)和請求處理流程」是如何設(shè)計的?  相信使用過 Kafka 的朋友都知道其吞吐量可以高達(dá)百萬,但很少人理解其中的設(shè)計原理。

那么 Kafka Broker 端網(wǎng)絡(luò)架構(gòu)和請求處理到底是使用了哪些高大上的技術(shù)?它到底解決了什么問題?究竟是怎么解決的?

只有了解了這些, 我們才能深刻掌握 Kafka 服務(wù)端設(shè)計精髓所在,更加深刻理解一個高并發(fā)、高性能服務(wù)端架構(gòu)該如何設(shè)計。

認(rèn)真讀完這篇文章,我相信你會對Kafka Broker請求處理流程和網(wǎng)絡(luò)架構(gòu)設(shè)計實(shí)現(xiàn)細(xì)節(jié),有更加深刻的理解。

這篇文章干貨很多,希望你可以耐心讀完。

圖片

一、總體概述

要想理解 Kafka Broker 請求處理架構(gòu)設(shè)計,我們需要從簡單請求處理模型來說起。

對于日常系統(tǒng)開發(fā),我們都知道是基于 Request/Response 的模式來實(shí)現(xiàn)的, 對于 Kafka 來說, 無論是 Producer 端、Consumer 端 還是 Broker 端,他們之間的請求交互也都是基于「Request/Response」模式來完成的。比如,客戶端會通過網(wǎng)絡(luò)發(fā)送消息生產(chǎn)請求給 Broker,而 Broker 處理完成后,會發(fā)送對應(yīng)的響應(yīng)給到客戶端。

下面,我會從自我設(shè)計角度出發(fā),如果是我們會如何設(shè)計,帶你一步步演化出來「kafka Broker 的網(wǎng)絡(luò)請求處理」架構(gòu)。

在這個過程中,你會看到 Kafka 在處理請求的過程中會遇到哪些高性能和高并發(fā)問題,以及架構(gòu)為什么要這樣演進(jìn),從而理解 Kafka 這么設(shè)計的意義和精妙之處。

二、順序處理模式

我們從最簡單的網(wǎng)絡(luò)編程思路處理方式講起。

因?yàn)閷τ?Kafka Broker 來說就是用來接收生產(chǎn)者發(fā)送過來的請求,那這個時候最簡單的實(shí)現(xiàn)大概是這樣的:

圖片

如上述代碼所示:我們可以理解 Kafka 每個服務(wù)器啟動起來后就是一個 while 循環(huán), 不斷的 accept 生產(chǎn)者提交上來的請求, 然后進(jìn)行處理并存儲到磁盤上,這種方式實(shí)現(xiàn)最簡單,也非常好理解,但是這種方式存在2個致命的缺陷?

  1. 請求阻塞:只能順序處理每個請求,即每個請求都必須等待前一個請求處理完畢才能得到處理。
  2. 吞吐量非常差:由于只能順序處理,無法并發(fā),效率太低,所以吞吐量非常差,只適合請求發(fā)送非常不頻繁的系統(tǒng)。

從上面來看很明顯,如果你的 Kafka 系統(tǒng)請求并發(fā)量很大,意味著要處理的時間就會越久。那按照前面我們提到的 Kafka「吞吐量」的標(biāo)準(zhǔn),這個方案遠(yuǎn)遠(yuǎn)無法滿足我們對高性能、高并發(fā)的要求。

那有什么更好的方案可以快速處理請求嗎?

接下來我們可以試著采取這個方案:獨(dú)立線程異步處理模式。

三、多線程異步處理模式

既然同步方式會阻塞請求,吞吐量差, 我們可以嘗試著使用獨(dú)立線程異步方式進(jìn)行處理, 即經(jīng)典的 connection per thread 模型, 那這個時候的實(shí)現(xiàn)大概是這樣的:

圖片

如上述代碼所示:同上還是一個 while 循環(huán)不斷的 accept 生產(chǎn)者提交上來的請求,但是這時候 Kafka 系統(tǒng)會為每個請求都創(chuàng)建一個「單獨(dú)的線程」來處理。

這個實(shí)現(xiàn)方案的好處就是:

  1. 吞吐量稍強(qiáng):相對上面同步方式的方案,一定程度上極大地提高了服務(wù)器的吞吐量。
  2. 非阻塞:它是完全異步的,每個請求的處理都不會阻塞下一個請求。

但同樣缺陷也同樣很明顯:即為每個請求都創(chuàng)建線程的做法開銷很大,在某些高并發(fā)場景下會壓垮整個服務(wù)??梢?,這個方案也只適用于請求發(fā)送頻率很低的業(yè)務(wù)場景。還是無法滿足我們對高性能、高并發(fā)的要求。

既然這種方案還是不能滿足, 那么我們究竟該使用什么方案來支撐高并發(fā)呢?

這個時候我們可以想想我們?nèi)粘i_發(fā)用到的7層負(fù)載Nginx或者Redis在處理高并發(fā)請求的時候是使用什么方案呢?

從上面啟發(fā)你可以看出,提升系統(tǒng) I/O 并發(fā)性能的關(guān)鍵思路就是:「事件驅(qū)動」。

想必大家已經(jīng)猜到了,沒錯,就是「多路復(fù)用」。那么Kafka 是不是也是采用這種方案來實(shí)現(xiàn)呢?

這里我們先考慮采用基于「事件驅(qū)動」的設(shè)計方案,當(dāng)有事件觸發(fā)時,才會調(diào)用處理器進(jìn)行數(shù)據(jù)處理。

四、Reactor 模式

在高性能網(wǎng)絡(luò)編程領(lǐng)域,有一個非常著名的模式——Reactor模式。那么何為「Reactor模式」,首先它是基于事件驅(qū)動的,有一個或多個并發(fā)輸入源,有一個Service Handler,有多個Request Handler;這個Service Handler會同步的將輸入的請求輪詢地分發(fā)給相應(yīng)的Request Handler進(jìn)行處理。

借助于 Doug Lea(就是那位讓人無限景仰的大爺)的 "Scalable IO in Java" 中講述的Reactor模式。

"Scalable IO in Java" 的地址是:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf。

簡單來說,Reactor 模式特別適合應(yīng)用于處理多個客戶端并發(fā)向服務(wù)器端發(fā)送請求的場景。這里借用大神 PDF 中的一幅圖來說明 Reactor 架構(gòu):

圖片

從上面這張圖中,我們可以看出多個客戶端會發(fā)送請求給到 Reactor。Reactor 有個請求分發(fā)線程 Dispatcher,也就是圖中的綠色的 Acceptor,它會將不同的請求下分發(fā)到多個工作線程中處理。

在這個架構(gòu)中,Acceptor 線程只是用來進(jìn)行請求分發(fā),所以非常輕量級,因此會有很高的吞吐量。而這些工作線程可以根據(jù)實(shí)際系統(tǒng)負(fù)載情況動態(tài)調(diào)節(jié)系統(tǒng)負(fù)載能力,從而達(dá)到請求處理的平衡性。

基于上面的 Reactor 架構(gòu), 我們來看看如果是我們該如何設(shè)計 Kafka 服務(wù)端的架構(gòu)?

圖片

  1. 這里我們采用多路復(fù)用方案,Reactor 設(shè)計模式,并引用Java NIO的方式可以更好的解決上面并發(fā)請求問題。
  2. 當(dāng) Client 端將請求發(fā)送到 Server 端的時候, 首先在 Server 端有個多路復(fù)用器(Selector),然后會啟動一個 Accepter 線程將OP_CONNECT事件注冊到多路復(fù)用器上, 主要用來監(jiān)聽連接事件到來。
  3. 當(dāng)監(jiān)聽到連接事件后,就會在多路復(fù)用器上注冊 OP_READ 事件, 這樣 Cient 端發(fā)送過來的請求, 都會被接收到。如果請求特別多的話, 我們這里進(jìn)行優(yōu)化, 創(chuàng)建一個Read HandlePool 線程池。
  4. 當(dāng) Read HandlePool 線程池接收到請求數(shù)據(jù)后,最終會交給 Handler ThreadPool 線程池進(jìn)行后續(xù)處理。比如如果是生產(chǎn)者發(fā)送過來的請求,肯定會解析請求體,處理并最終存儲到磁盤中,待處理完后要返回處理結(jié)果狀態(tài), 這時候就由它在多路復(fù)用器上注冊O(shè)P_WRITE事件來完成。這樣多路復(fù)用器遍歷到 OP_WRITE 事件后就會將請求返回到 Client 端。
  5. 在上圖中我們看到在整個流程中還有一個 MessageQueue 的隊(duì)列組件存在, 為什么要加這個組件呢? 我們可以想象一下, 如果請求量非常大,直接交給 Handler ThreadPool 線程池進(jìn)行處理, 可能會出現(xiàn)該線程池處理不過來的情況發(fā)生,如果處理不過來,也會出現(xiàn)阻塞瓶頸。所以這里我們在 Server 端內(nèi)部也設(shè)計一個消息隊(duì)列, 起到一個緩沖的作用,Handler ThreadPool 線程池會根據(jù)自己的負(fù)載能力進(jìn)行處理。

以上就是我們引用了「多路復(fù)用」的設(shè)計方案,但是 Kafka Broker 端就是這樣的架構(gòu)設(shè)計方案嗎?如果我們是 Kafka 系統(tǒng)架構(gòu)的設(shè)計者,采用這樣的架構(gòu)設(shè)計方案會不會還是有什么問題,有沒有哪個環(huán)節(jié)會出現(xiàn)系統(tǒng)性能瓶頸呢?

這是個值得思考的問題, 很考驗(yàn)?zāi)愕募軜?gòu)設(shè)計能力。

細(xì)心的讀者可能會發(fā)現(xiàn):對于 Kafka 這種超高并發(fā)系統(tǒng)來說,一個 Selector 多路復(fù)用器是 Hold 不住的,從上圖可以得出,我們監(jiān)聽這些連接、接收請求、處理響應(yīng)結(jié)果都是同一個 Selector 在進(jìn)行處理,很容易成為系統(tǒng)性能瓶頸。

接下來,我們將進(jìn)一步進(jìn)行優(yōu)化,為了減輕當(dāng)前 Selector 的處理負(fù)擔(dān),引入另外一個Selector 處理隊(duì)列,如下圖所示:

圖片

  1. 首先上圖是目前我認(rèn)為最接近 Kafka Broker 真實(shí)架構(gòu)設(shè)計方案的。
  2. 整體架構(gòu)跟上一版的類似,只不過這里多引入了一個多 Selector 處理隊(duì)列,原來的 Selector 只負(fù)責(zé)監(jiān)聽連接, 這時候有讀者就會有疑問,請求量超級大的時候,一個 Selector 會不會成為瓶頸呢? 這里可以大可放心, 這時候它的工作非常單一,是完全能 hold 住的。
  3. 那么對于我們接收請求、處理請求、返回狀態(tài)操作都會交由多 Selector 處理隊(duì)列,至于這里到底需要多少個 Selector,又會跟什么參數(shù)和配置有關(guān)系,我們后續(xù)再進(jìn)行分析,總之這里記住有多個 Selector 就行了,這樣系統(tǒng)壓力就會被分散處理。
  4. 另外我們要搞清楚的一點(diǎn)就是對于 Kafka 服務(wù)端指的是每個 Broker 節(jié)點(diǎn),如果我們的服務(wù)集群總共有10個節(jié)點(diǎn), 每個節(jié)點(diǎn)內(nèi)部都是上面的這樣的架構(gòu),這樣我們就有理由相信如果采用這樣的架構(gòu)設(shè)計方案,是可以支持高并發(fā)和高性能的。

架構(gòu)設(shè)計方案演進(jìn)到這里,基本上已經(jīng)差不多了,接下來我們看看 Kafka 真實(shí)超高并發(fā)的網(wǎng)絡(luò)架構(gòu)是如何設(shè)計的。

五、Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)

在上面 Kafka 高性能、高吞吐量架構(gòu)演進(jìn)的時候,我們提到了 Java NIO 以及 Reactor 設(shè)計模式。實(shí)際上,搞透了「Kafka 究竟是怎么使用 NIO 來實(shí)現(xiàn)網(wǎng)絡(luò)通信的」,不僅能讓我們掌握 Kafka 請求處理全流程處理,也能讓我們對 Reactor 設(shè)計模式有更深的理解,還能幫助我們解決很多實(shí)際問題。

那么接下來我們就來深入剖析下 Kafka 的 NIO 通訊機(jī)制吧。

我們先從整體上看一下完整的網(wǎng)絡(luò)通信層架構(gòu),如下圖所示:

圖片

  1. 從上圖中我們可以看出,Kafka 網(wǎng)絡(luò)通信架構(gòu)中用到的組件主要由兩大部分構(gòu)成:SocketServer和RequestHandlerPool。
  2. SocketServer 組件是 Kafka 超高并發(fā)網(wǎng)絡(luò)通信層中最重要的子模塊。它包含Acceptor 線程、Processor 線程和 RequestChannel等對象,都是網(wǎng)絡(luò)通信的重要組成部分。它主要實(shí)現(xiàn)了 Reactor 設(shè)計模式,主要用來處理外部多個 Clients(這里的 Clients 可能包含 Producer、Consumer 或其他 Broker)的并發(fā)請求,并負(fù)責(zé)將處理結(jié)果封裝進(jìn) Response 中,返還給 Clients。
  3. RequestHandlerPool 組件就是我們常說的 I/O 工作線程池,里面定義了若干個 I/O 線程,主要用來執(zhí)行真實(shí)的請求處理邏輯。
  4. 這里注意的是:跟 RequestHandler 相比, 上面所說的Acceptor、Processor 線程 還有 RequestChannel 等都不做請求處理,它們只是請求和響應(yīng)的「搬運(yùn)工」。

接下來我們來具體聊聊SocketServer中的實(shí)現(xiàn)原理,這里先來講講:

  1. Acceptor 線程
  2. Processor 線程

以Kafka 2.5版本,源碼位置:

https://github.com/apache/kafka/blob/2.5.0-rc3/core/src/main/scala/kafka/network/SocketServer.scala。

圖片

1、聊聊 Acceptor 線程

在經(jīng)典的 Reactor 設(shè)計模式有個 「Dispatcher」 的角色,主要用來接收外部請求并分發(fā)給下面的實(shí)際處理線程。通過上面分析我們知道在 Kafka 網(wǎng)絡(luò)架構(gòu)設(shè)計中,這個 Dispatcher 就是「Acceptor 線程」。

Acceptor 線程是用來接收和創(chuàng)建外部 TCP 連接的線程。在Broker 端每個 SocketServer 實(shí)例只會創(chuàng)建一個 Acceptor 線程。它的主要功能就是創(chuàng)建連接,并將接收到的 Request 請求傳遞給下游的 Processor 線程處理。

/**
* Thread that accepts and configures new connections. There is one of these per endpoint.
*/
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
// 1. 創(chuàng)建底層的NIO Selector對象,用來監(jiān)聽連接創(chuàng)建請求、讀寫請求等
private val nioSelector = NSelector.open()
// 2. Broker端創(chuàng)建對應(yīng)的ServerSocketChannel實(shí)例,然后將Channel注冊到Selector對象上
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// 3. 創(chuàng)建Processor線程池
private val processors = new ArrayBuffer[Processor]()
......
/**
* Accept loop that checks for new connection attempts
*/
def run(): Unit = {
//注冊O(shè)P_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
// 等待Acceptor線程啟動完成
startupComplete()
try {
// 當(dāng)前使用的Processor序號,從0開始
var currentProcessorIndex = 0
while (isRunning) {
try {
// 每500毫秒獲取一次就緒I/O事件
val ready = nioSelector.select(500)
// 如果有I/O事件準(zhǔn)備就緒
if (ready > 0) {
........
// 調(diào)用accept方法創(chuàng)建Socket連接
accept(key).foreach { socketChannel =>
........
// 指定由哪個Processor線程進(jìn)行處理
processor = synchronized {
.........
processors(currentProcessorIndex)
}
// 更新Processor線程序號
currentProcessorIndex += 1
}
.........
}
}

這里重點(diǎn)看下 Acceptor 線程中三個非常關(guān)鍵且重要的屬性和方法:

  1. nioSelector:它就是我們所熟悉的 Java NIO 庫中的 Selector 對象實(shí)例,所有網(wǎng)絡(luò)通信組件實(shí)現(xiàn) Java NIO 機(jī)制的基礎(chǔ)。
  2. processors:通過源碼我們可以知道在Acceptor 線程在初始化時,需要創(chuàng)建對應(yīng)的 Processor 線程池。由此可以得出,Processor 線程是在 Acceptor 線程中管理和維護(hù)的。
  3. run方法:它是處理 Reactor 模式中分發(fā)邏輯的主要實(shí)現(xiàn)方法。

圖片

  1. 從上述源碼中,我們可以看出 Acceptor 線程主要使用了 Java NIO 的 Selector 以及 SocketChannel 的方式循環(huán)的輪詢準(zhǔn)備就緒的 I/O 事件。
  2. 這里的 I/O 事件主要是指網(wǎng)絡(luò)連接創(chuàng)建事件即:SelectionKey.OP_ACCEPT。
  3. 這樣注冊好事件后,一旦后續(xù)接收到連接請求后,Acceptor 線程就會指定一個 Processor 線程,并將該請求交給它并創(chuàng)建網(wǎng)絡(luò)連接用于后續(xù)處理。

2、聊聊 Processor 線程

從上面分析我們知道 Acceptor 只是做了請求入口連接處理的,那么,真正創(chuàng)建網(wǎng)絡(luò)連接以及分發(fā)網(wǎng)絡(luò)請求是由 Processor 線程來完成的。

override def run(): Unit = {
// 等待Processor線程啟動完成
startupComplete()
try {
while (isRunning) {
try {
// 創(chuàng)建新連接
configureNewConnections()
// 發(fā)送Response
processNewResponses()
// 執(zhí)行NIO poll,獲取對應(yīng)SocketChannel上準(zhǔn)備就緒的I/O操作
poll()
// 將接收到的Request放入Request隊(duì)列
processCompletedReceives()
.......
} catch {
.........
}
}
} finally {
........
}
}
........
// 默認(rèn)連接對接大小
val ConnectionQueueSize = 20
// 保存要創(chuàng)建的新連接信息
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
// 一個臨時 Response 隊(duì)列
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
// Response 隊(duì)列
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

從上面 Processor 線程源碼,可以看出 Kafka 的代碼邏輯實(shí)現(xiàn)的非常好,各個子方法的邊界非常清楚。

這里我們就不展開源碼分析了, 更深入詳細(xì)的等到源碼分析專題再進(jìn)行。我們簡單的看下 Processor 線程初始化時要做的事情。

看上面代碼最后部分,我們知道每個 Processor 線程在創(chuàng)建時都會創(chuàng)建 3 個隊(duì)列。

  1. newConnections 隊(duì)列:它主要是用來保存要創(chuàng)建的新連接信息,也就是SocketChannel 對象,目前是硬編碼隊(duì)列長度大小為20。每當(dāng) Processor 線程接收到新的連接請求時,都會將對應(yīng)的 SocketChannel 對象放入隊(duì)列,等到后面創(chuàng)建連接時,從該隊(duì)列中獲取 SocketChannel,然后注冊新的連接。
  2. inflightResponse 隊(duì)列:它是一個臨時的 Response 隊(duì)列, 當(dāng) Processor 線程將 Repsonse 返回給 Client 之后,要將 Response 放入該隊(duì)列。它存在的意義:由于有些 Response 回調(diào)邏輯要在 Response 被發(fā)送回 Request 發(fā)送方后,才能執(zhí)行,因此需要暫存到臨時隊(duì)列。
  3. ResponseQueue 隊(duì)列:它主要是存放需要返回給Request 發(fā)送方的所有 Response 對象。通過源碼得知:每個 Processor 線程都會維護(hù)自己的 Response 隊(duì)列。

圖片圖片

六、請求處理核心流程剖析

上面深入的剖析了 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu) 以及 SocketServer 中的 Acceptor 線程跟 Processor 線程的實(shí)現(xiàn)原理, 接下來我們來將請求處理核心流程給串起來。

只有搞透這部分的實(shí)現(xiàn)原理,才能幫助我們有針對性的進(jìn)行 Broker端請求處理的性能調(diào)優(yōu)。

比如:在上面網(wǎng)絡(luò)架構(gòu)圖,有兩個參數(shù)跟整個流程有關(guān)系,分別是num.network.threads、num.io.threads。如果我們不掌握請求處理的整個流程,就不能更好的對此進(jìn)行調(diào)整,來達(dá)到更高的性能要求。

其中 num.io.threads 就是 I/O 工作線程池的大小配置,即 KafkaRequestHandlerPool 線程池,它才是「真正處理 Kafka 請求」的地方。

以Kafka 2.5版本,源碼位置:

https://github.com/apache/kafka/blob/2.5.0-rc3/core/src/main/scala/kafka/server/KafkaRequestHandler.scala。

圖片

/**
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(id: Int, //I/O線程序號
brokerId: Int, //所在Broker序號,即broker.id值
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger, //I/O線程池大小
val requestChannel: RequestChannel, //請求處理通道
apis: KafkaApis, //KafkaApis類,用于真正實(shí)現(xiàn)請求處理邏輯的類
time: Time) extends Runnable with Logging {
......
def run(): Unit = {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 從請求隊(duì)列中獲取下一個待處理的請求
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
// 統(tǒng)計線程空閑時間
val idleTime = endTime - startSelectTime
// 更新線程空閑百分比指標(biāo)
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

req match {
// 當(dāng)關(guān)閉線程請求處理
case RequestChannel.ShutdownRequest =>
......
// 當(dāng)普通請求到來時
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
// 由KafkaApis.handle方法執(zhí)行相應(yīng)處理邏輯
apis.handle(request)
} catch {
....
} finally {
// 釋放請求對象資源
request.releaseBuffer()
}
case null => // continue
}
}
shutdownComplete.countDown()
}
}

圖片

下面我們結(jié)合 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)圖來講解下一個完整請求處理核心流程:

  1. Clients 發(fā)送請求給 Acceptor 線程。
  2. Acceptor 線程會創(chuàng)建 NIO Selector 對象,并創(chuàng)建 ServerSocketChannel 實(shí)例,然后將ChannelOP_ACCEPT事件綁定到 Selector 多路復(fù)用器上。
  3. Acceptor 線程還會默認(rèn)創(chuàng)建3個大小的 Processor 線程池,參數(shù):num.network.threads, 并輪詢的將請求對象 SocketChannel 放入到連接隊(duì)列中(newConnections)。
  4. 這時候連接隊(duì)列就源源不斷有請求數(shù)據(jù)了,然后不停地執(zhí)行NIO Poll, 獲取對應(yīng) SocketChannel 上已經(jīng)準(zhǔn)備就緒的 I/O 事件。
  5. Processor 線程向 SocketChannel 注冊了OP_READ/OP_WRITE事件,這樣 客戶端發(fā)過來的請求就會被該 SocketChannel 對象獲取到,具體就是CompleteReceives。
  6. 這個時候客戶端就可以源源不斷進(jìn)行請求發(fā)送了,服務(wù)端通過Selector NIO Poll不停的獲取準(zhǔn)備就緒的 I/O 事件。
  7. 然后根據(jù)Channel中獲取已經(jīng)完成的 Receive 對象,構(gòu)建 Request 對象,并將其存入到RequestchannelRequestQueue請求隊(duì)列中 。
  8. 這個時候就該 I/O 線程池上場了,KafkaRequestHandler 線程循環(huán)地從請求隊(duì)列中獲取 Request 實(shí)例,然后交由KafkaApishandle方法,執(zhí)行真正的請求處理邏輯,并最終將數(shù)據(jù)存儲到磁盤中。
  9. 待處理完請求后,KafkaRequestHandler線程會將 Response 對象放入Processor線程的Response隊(duì)列。
  10. 然后 Processor 線程通過 Request 中的ProcessorID不停地從 Response 隊(duì)列中來定位并取出 Response 對象,返還給 Request 發(fā)送方。

至此,我們深入剖析完畢 Kafka 網(wǎng)絡(luò)架構(gòu)請求「核心流程」。

七、系統(tǒng)調(diào)優(yōu)

搞透了 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)設(shè)計和請求處理核心流程后,我們來聊聊 Broker 端參數(shù)調(diào)優(yōu)。

對 Kafka 而言,性能一般是指吞吐量和延時。所以高吞吐量、低延時是我們調(diào)優(yōu) Kafka 集群的主要目標(biāo)。

Broker 端調(diào)優(yōu)主要就是合理地設(shè)置 Broker 端參數(shù)值,以匹配你的生產(chǎn)環(huán)境。另外還有一點(diǎn)要說明的就是「保證服務(wù)器端和客戶端版本的一致」,做到這一點(diǎn),就能獲得很多性能收益了。

num.network.threads

創(chuàng)建 Processor 處理網(wǎng)絡(luò)請求線程個數(shù),建議設(shè)置為 Broker 當(dāng)前CPU核心數(shù)*2,這個值太低經(jīng)常出現(xiàn)網(wǎng)絡(luò)空閑太低而缺失副本。

num.io.threads

創(chuàng)建 KafkaRequestHandler 處理具體請求線程個數(shù),建議設(shè)置為Broker磁盤個數(shù)*2。

num.replica.fetchers

建議設(shè)置為CPU核心數(shù)/4,適當(dāng)提高可以提升CPU利用率及 Follower同步 Leader 數(shù)據(jù)當(dāng)并行度。

compression.type

建議采用lz4壓縮類型,壓縮可以提升CPU利用率同時可以減少網(wǎng)絡(luò)傳輸數(shù)據(jù)量。

queued.max.requests

在網(wǎng)絡(luò)線程停止讀取新請求之前,可以排隊(duì)等待I/O線程處理的最大請求個數(shù),生產(chǎn)環(huán)境建議配置最少500以上,默認(rèn)500。

log.flush.xxx

  • ?log.flush.scheduler.interval.ms
  • log.flush.interval.ms
  • log.flush.interval.messages?

這幾個參數(shù)表示日志數(shù)據(jù)刷新到磁盤的策略,應(yīng)該保持默認(rèn)配置,刷盤策略讓操作系統(tǒng)去完成,由操作系統(tǒng)來決定什么時候把數(shù)據(jù)刷盤;如果設(shè)置來這個參數(shù),可能對吞吐量影響非常大。

auto.leader.rebalance.enable

表示是否開啟leader自動負(fù)載均衡,默認(rèn)true;我們應(yīng)該把這個參數(shù)設(shè)置為false,因?yàn)樽詣迂?fù)載均衡不可控,可能影響集群性能和穩(wěn)定。

八、總結(jié)

這里,我們一起來總結(jié)一下這篇文章的重點(diǎn)。

1、對于 Kafka 這樣一個優(yōu)秀的服務(wù)端系統(tǒng)架構(gòu)來說,應(yīng)該遵循高可用、高性能、高并發(fā) 3 大原則。

2、本文從最簡單的網(wǎng)絡(luò)編程思路出發(fā)一步一步演進(jìn)到 Reactor 設(shè)計模式,假設(shè)我們就是 Kafka 架構(gòu)的設(shè)計者,我們該如何設(shè)計其服務(wù)端網(wǎng)絡(luò)架構(gòu)。

3、通過本文的深度剖析,提升系統(tǒng)I/O性能的核心是基于「事件驅(qū)動」模型實(shí)現(xiàn)。

4、在剖析完服務(wù)端網(wǎng)絡(luò)架構(gòu)后,我們也深度剖析了 SocketServer中兩個最重要的線程:Acceptor 線程和 Processor 線程。

5、接著我們結(jié)合 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)圖又梳理了 Kafka 請求處理核心流程。

6、最后給大家分析并做了 Broker 端系統(tǒng)調(diào)優(yōu)的方案。

責(zé)任編輯:姜華 來源: 華仔聊技術(shù)
相關(guān)推薦

2017-11-30 11:29:15

數(shù)據(jù)中心網(wǎng)絡(luò)虛擬化

2014-06-16 14:35:31

OpenFlow

2018-09-19 13:42:28

Kubernetes架構(gòu)負(fù)載均衡

2010-02-22 13:16:25

軟交換技術(shù)

2009-11-03 10:21:46

ADSL接入網(wǎng)技術(shù)

2020-03-09 08:00:03

架構(gòu)緩存集群

2024-08-23 16:04:45

2019-03-06 09:36:12

Kafka緩存磁盤

2018-01-09 22:18:18

架構(gòu)阿里巴巴服務(wù)器

2018-05-29 09:21:13

K8S系統(tǒng)架構(gòu)

2018-05-31 21:07:14

工業(yè)4.0工業(yè)物聯(lián)網(wǎng)IIoT

2009-12-25 15:28:51

光纖接入技術(shù)

2012-05-21 09:57:13

IPv6

2024-10-30 10:06:51

2021-12-07 07:32:09

kafka架構(gòu)原理

2018-04-03 09:27:42

分布式架構(gòu)系統(tǒng)

2016-12-12 10:33:08

網(wǎng)易云

2016-12-13 14:16:28

直播

2020-09-08 06:30:59

微服務(wù)代碼模塊

2019-07-18 11:55:53

架構(gòu)運(yùn)維技術(shù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號