Broker的實現(xiàn)邏輯-kafka知識體系(三)
上篇文章分享了kafka 生產(chǎn)端的邏輯,以及消息發(fā)送到緩存后由sender線程發(fā)送到Broker,那么Broker 是怎么進行數(shù)據(jù)接收和持久化的呢?下面我們從Broker 的網(wǎng)絡(luò)設(shè)計聊起。
Broker 網(wǎng)絡(luò)設(shè)計
kafka的網(wǎng)絡(luò)設(shè)計和Kafka的調(diào)優(yōu)有關(guān),這也是為什么它能支持高并發(fā)的原因。
Kafka的網(wǎng)絡(luò)三層架構(gòu)
首先客戶端發(fā)送請求全部會先發(fā)送給一個Acceptor,broker里面會存在3個線程(默認(rèn)是3個),這3個線程都是叫做processor,
Acceptor不會對客戶端的請求做任何的處理,直接封裝成一個個socketChannel發(fā)送給這些processor形成一個隊列,發(fā)送的方式是輪詢,就是先給第一個processor發(fā)送,然后再給第二個,第三個,然后又回到第一個。
消費者線程去消費這些socketChannel時,會獲取一個個request請求,這些request請求中就會伴隨著數(shù)據(jù)。
線程池里面默認(rèn)有8個線程,這些線程是用來處理request的,解析請求,如果request是寫請求,就寫到磁盤里。讀的話返回結(jié)果。processor會從response中讀取響應(yīng)數(shù)據(jù),然后再返回給客戶端。這就是Kafka的網(wǎng)絡(luò)三層架構(gòu)。

調(diào)優(yōu)點1
所以如果我們需要對kafka進行增強調(diào)優(yōu),增加processor并增加線程池里面的處理線程,就可以達到效果。request和response那一塊部分其實就是起到了一個緩存的效果,是考慮到processor們生成請求太快,線程數(shù)不夠不能及時處理的問題。所以這就是一個加強版的reactor網(wǎng)絡(luò)線程模型。
Broker數(shù)據(jù)存儲設(shè)計
【partition 的數(shù)據(jù)文件】
我們知道topic 是邏輯上的概念,partition是topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。
例如創(chuàng)建2個topic名稱分別為report_push、launch_info, partitions數(shù)量都為partitions=4 存儲路徑和目錄規(guī)則為:xxx/message-folder
- |--report_push-0
- |--report_push-1
- |--report_push-2
- |--report_push-3
- |--launch_info-0
- |--launch_info-1
- |--launch_info-2
- |--launch_info-3
而partition物理上由多個segment組成。
【segment】log
每個segment 大小相等,順序讀寫.
每個segment數(shù)據(jù)文件以該段中最小的offset 命名,文件擴展名為.log
日志回滾受log.segment.bytes控制,默認(rèn)1G;
這樣在查找指定offset 的Message 的時候,用二分查找(跳表)就可以定位到該Message 在哪個segment 數(shù)據(jù)文件中.
在磁盤上,一個partition就是一個目錄,然后每個segment由一個index文件和一個log文件組成。如下:
- $ tree kafka | head -n 6
- kafka
- ├── events-1
- │ ├── 00000000003064504069.index
- │ ├── 00000000003064504069.log
- │ ├── 00000000003065011416.index
- │ ├── 00000000003065011416.log
Segment下的log文件就是存儲消息的地方
每個消息都會包含消息體、offset、timestamp、key、size、壓縮編碼器、校驗和、消息版本號等。
在磁盤上的數(shù)據(jù)格式和producer發(fā)送到broker的數(shù)據(jù)格式一模一樣,也和consumer收到的數(shù)據(jù)格式一模一樣。由于磁盤格式與consumer以及producer的數(shù)據(jù)格式一模一樣,這樣就使得Kafka可以通過零拷貝(zero-copy)技術(shù)來提高傳輸效率。
【segment】index
索引文件是內(nèi)存映射(memory mapped)的。
索引文件,一個稀疏格式的索引,受參數(shù)log.index.interval.bytes控制,默認(rèn)4KB。即不是每條數(shù)據(jù)都會寫索引,默認(rèn)每寫4KB數(shù)據(jù)才會寫一條索引。
Kafka 為每個分段后的數(shù)據(jù)文件建立了索引文件,文件名與數(shù)據(jù)文件的名字是一樣的,只是文件擴展名為.index.
index 文件中并沒有為數(shù)據(jù)文件中的每條 Message 建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引.
這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內(nèi)存中。
有關(guān)內(nèi)存映射:
- 即便是順序?qū)懭胗脖P,硬盤的訪問速度還是不可能追上內(nèi)存。所以Kafka的數(shù)據(jù)并不是實時的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲來利用內(nèi)存提高I/O效率。Memory Mapped Files(后面簡稱mmap)也被翻譯成內(nèi)存映射文件,它的工作原理是直接利用操作系統(tǒng)的Page來實現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r候)。通過mmap,進程像讀寫硬盤一樣讀寫內(nèi)存,也不必關(guān)心內(nèi)存的大小有虛擬內(nèi)存為我們兜底。mmap其實是Linux中的一個用來實現(xiàn)內(nèi)存映射的函數(shù),在Java NIO中可用MappedByteBuffer來實現(xiàn)內(nèi)存映射。
【Kafka中通過offset查詢消息內(nèi)容的整個流程】
Kafka 中存在一個 ConcurrentSkipListMap 來保存在每個日志分段。
offset-->concurrentSkipListMap-->找到baseOffset對應(yīng)的日志分段-->讀取索引文件.index-->找打不大于offset-baseoffset的最大索引項-->讀取分段文件(.log)-->從日志分段文件(.log)中順序查找
當(dāng)前索引文件的文件名即為 baseOffset 的值。
【日志留存策略】
Kafka 會定期檢查是否要刪除舊消息,見參數(shù)
log.retention.check.interval.ms,默認(rèn)5分鐘。當(dāng)前有三種日志留存策略:
基于空間:log.retention.bytes,默認(rèn)未開啟;
基于時間:log.retention.hours(mintues/ms),默認(rèn)7天;
基于起始位移:Kafka 0.11.0.0版本引入,解決流處理場景中已處理的中間消息刪除問題。
目前基于時間的日志留存策略最常使用。
調(diào)優(yōu)點2
即盡力保持客戶端版本和 Broker 端版本一致
即盡力保持客戶端版本和 Broker 端版本一致。不要小看版本間的不一致問題,它會令 Kafka 喪失很多性能收益,比如 Zero Copy。

圖中藍(lán)色的 Producer、Consumer 和 Broker 的版本是相同的,它們之間的通信可以享受 Zero Copy 的快速通道;相反,一個低版本的 Consumer 程序想要與 Producer、Broker 交互的話,就只能依靠 JVM 堆中轉(zhuǎn)一下,丟掉了快捷通道,就只能走慢速通道了。因此,在優(yōu)化 Broker 這一層時,你只要保持服務(wù)器端和客戶端版本的一致,就能獲得很多性能收益了。
Broker 副本機制
分區(qū)副本默認(rèn)1,見參數(shù)
default.replication.factor。
【副本作用(并不提供讀寫分離)】
1、實現(xiàn)冗余,提高消息可靠性
2、實現(xiàn)高可用,參與leader選舉,在leader不可用時提高可用性。
3、leader處理partition的所有讀寫請求;follower會被動定期地去復(fù)制leader上的數(shù)據(jù)
【leader副本選舉】
1、由控制器負(fù)責(zé)
2、選舉機制或策略
所有的副本(replicas)統(tǒng)稱為Assigned Replicas,即AR
副本同步隊列(ISR)
SR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數(shù)據(jù)有一些延遲。任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR
基本策略是從AR中找第一個存活的副本,且該副本在ISR中。
3、leader來維護:leader有單獨的線程定期檢測ISR中follower是否脫離ISR, 如果發(fā)現(xiàn)ISR變化,則會將新的ISR的信息返回到Zookeeper的相關(guān)節(jié)點中。
【副本機制的好處】
通常來講副本機制的好處:
1、提供數(shù)據(jù)冗余。即使系統(tǒng)部分組件失效,系統(tǒng)依然能夠繼續(xù)運轉(zhuǎn),因而增加了整體可用性以及數(shù)據(jù)持久性。
2、提供高伸縮性。支持橫向擴展,能夠通過增加機器的方式來提升讀性能,進而提高讀操作吞吐量。
3、改善數(shù)據(jù)局部性。允許將數(shù)據(jù)放入與用戶地理位置相近的地方,從而降低系統(tǒng)延時。
對于 Apache Kafka 而言,目前只能享受到副本機制帶來的第 1 個好處,也就是提供數(shù)據(jù)冗余實現(xiàn)高可用性和高持久性。
對于客戶端用戶而言,Kafka 的追隨者副本沒有任何作用,它既不能像 MySQL 那樣幫助領(lǐng)導(dǎo)者副本“抗讀”,也不能實現(xiàn)將某些副本放到離客戶端近的地方來改善數(shù)據(jù)局部性。
Broker 高水位機制
【概念】
HW即高水位,是Kafka副本對象的重要屬性,分區(qū)的高水位由leader副本的高水位表示,含義是被follower副本同步之后的位置。
對于leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步后更新HW,此時消息才能被consumer消費
【作用】
定義消息可見性,只有分區(qū)高水位以下的消息才能被消費;
幫助kafka完成副本同步,kafka是基于高水位實現(xiàn)的異步的副本同步機制。
【LEO的概念】
含義是日志末端位移(Log End Offset),下一條消息寫入的位移。
總結(jié)為什么MySQL的索引不采用kafka的索引機制?
既然kafka那么優(yōu)秀那么快,為什么MySQL的索引不采用kafka的索引機制?
我們還要考慮一個問題:InnoDB中維護索引的代價比Kafka中的要高。Kafka中當(dāng)有新的索引文件建立的時候ConcurrentSkipListMap才會更新,而不是每次有數(shù)據(jù)寫入時就會更新,這塊的維護量基本可以忽略,B+樹中數(shù)據(jù)有插入、更新、刪除的時候都需要更新索引,還會引來“頁分裂”等相對耗時的操作。Kafka中的索引文件也是順序追加文件的操作,和B+樹比起來工作量要小很多。
其實說到底還是應(yīng)用場景不同所決定的。MySQL中需要頻繁地執(zhí)行CRUD的操作,CRUD是MySQL的主要工作內(nèi)容,而為了支撐這個操作需要使用維護量大很多的B+樹去支撐。Kafka中的消息一般都是順序?qū)懭氪疟P,再到從磁盤順序讀出(不深入探討page cache等),他的主要工作內(nèi)容就是:寫入+讀取,很少有檢索查詢的操作,換句話說,檢索查詢只是Kafka的一個輔助功能,不需要為了這個功能而去花費特別太的代價去維護一個高level的索引。前面也說過,Kafka中的這種方式是在磁盤空間、內(nèi)存空間、查找時間等多方面之間的一個折中。