在家辦公這些天整理的Kafka知識(shí)點(diǎn)大全
Kakfa 廣泛應(yīng)用于國(guó)內(nèi)外大廠,例如 BAT、字節(jié)跳動(dòng)、美團(tuán)、Netflix、Airbnb、Twitter 等等。今天我們通過這篇文章深入了解 Kafka 的工作原理。
圖片來自 Pexels
Kafka 概述
Kakfa 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue),主要應(yīng)用于大數(shù)據(jù)的實(shí)時(shí)處理領(lǐng)域。
消息隊(duì)列
傳統(tǒng)消息隊(duì)列與新式消息隊(duì)列模式如下圖:
上面是傳統(tǒng)的消息隊(duì)列,比如一個(gè)用戶要注冊(cè)信息,當(dāng)用戶信息寫入數(shù)據(jù)庫(kù)后,后面還有一些其他流程,比如發(fā)送短信,則需要等這些流程處理完成后,再返回給用戶。
而新式隊(duì)列,比如一個(gè)用戶注冊(cè)信息,數(shù)據(jù)直接丟進(jìn)數(shù)據(jù)庫(kù),就直接返回給用戶成功。
使用消息隊(duì)列的好處如下:
- 解耦
- 可恢復(fù)性
- 緩沖
- 靈活性與峰值處理能力
- 異步通信
消息隊(duì)列的模式如下:
①點(diǎn)對(duì)點(diǎn)模式:消息生產(chǎn)者發(fā)送消息到消息隊(duì)列中,然后消息消費(fèi)者從隊(duì)列中取出并且消費(fèi)消息,消息被消費(fèi)后,隊(duì)列中不在存儲(chǔ)。
所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息;隊(duì)列支持存在多個(gè)消費(fèi)者,但是對(duì)于一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi);如果想發(fā)給多個(gè)消費(fèi)者,則需要多次發(fā)送該條消息。
②發(fā)布/訂閱模式(一對(duì)多,消費(fèi)者消費(fèi)數(shù)據(jù)之后不會(huì)清除消息):消息生產(chǎn)者將消息發(fā)布到 Topic 中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。
和點(diǎn)對(duì)點(diǎn)的方式不同,發(fā)布到 Topic 的消息會(huì)被所有的訂閱者消費(fèi);但是數(shù)據(jù)保留是有期限的,默認(rèn)是 7 天,因?yàn)樗皇谴鎯?chǔ)系統(tǒng)。
Kafka 就是這種模式的。有兩種方式,一種是消費(fèi)者去主動(dòng)去消費(fèi)(拉取)消息,而不是生產(chǎn)者推送消息給消費(fèi)者;另外一種就是生產(chǎn)者主動(dòng)推送消息給消費(fèi)者,類似公眾號(hào)。
Kafka 基礎(chǔ)架構(gòu)
Kafka 的架構(gòu)如下圖:
Kafka 的基礎(chǔ)架構(gòu)主要有 Broker、生產(chǎn)者、消費(fèi)者組構(gòu)成,當(dāng)前還包括 ZooKeeper。
生產(chǎn)者負(fù)責(zé)發(fā)送消息,Broker 負(fù)責(zé)緩沖消息,Broker 中可以創(chuàng)建 Topic,每個(gè) Topic 又有 Partition 和 Replication 的概念。
消費(fèi)者組負(fù)責(zé)處理消息,同一個(gè)消費(fèi)者組的消費(fèi)者不能消費(fèi)同一個(gè) Partition 中的數(shù)據(jù)。
消費(fèi)者組主要是提高消費(fèi)能力,比如之前是一個(gè)消費(fèi)者消費(fèi) 100 條數(shù)據(jù),現(xiàn)在是 2 個(gè)消費(fèi)者消費(fèi) 100 條數(shù)據(jù),可以提高消費(fèi)能力。
所以消費(fèi)者組的消費(fèi)者的個(gè)數(shù)要小于 Partition 的個(gè)數(shù),不然就會(huì)有消費(fèi)者沒有 Partition 可以消費(fèi),造成資源的浪費(fèi)。
注意:不同消費(fèi)者組的消費(fèi)者是可以消費(fèi)相同的 Partition 數(shù)據(jù)。
Kakfa 如果要組件集群,則只需要注冊(cè)到一個(gè) ZooKeeper 中就可以了,ZooKeeper 中還保留消息消費(fèi)的進(jìn)度或者說偏移量或者消費(fèi)位置:
- 0.9 之前的版本偏移量存儲(chǔ)在 ZooKeeper。
- 0.9 之后的版本偏移量存儲(chǔ)在 Kafka中。Kafka 定義了一個(gè)系統(tǒng) Topic,專用用來存儲(chǔ)偏移量的數(shù)據(jù)。
為什么要改?主要是考慮到頻繁更改偏移量,對(duì) ZooKeeper 的壓力較大,而且 Kafka 本身自己的處理也較復(fù)雜。
安裝 Kafka
①Kafka 的安裝只需要解壓安裝包就可以完成安裝。
tar -zxvf kafka_2.11-2.1.1.tgz -C /usr/local/
②查看配置文件:
- [root@es1 config]# pwd
- /usr/local/kafka/config
- [root@es1 config]# ll
- total 84
- -rw-r--r--. 1 root root 906 Feb 8 2019 connect-console-sink.properties
- -rw-r--r--. 1 root root 909 Feb 8 2019 connect-console-source.properties
- -rw-r--r--. 1 root root 5321 Feb 8 2019 connect-distributed.properties
- -rw-r--r--. 1 root root 883 Feb 8 2019 connect-file-sink.properties
- -rw-r--r--. 1 root root 881 Feb 8 2019 connect-file-source.properties
- -rw-r--r--. 1 root root 1111 Feb 8 2019 connect-log4j.properties
- -rw-r--r--. 1 root root 2262 Feb 8 2019 connect-standalone.properties
- -rw-r--r--. 1 root root 1221 Feb 8 2019 consumer.properties
- -rw-r--r--. 1 root root 4727 Feb 8 2019 log4j.properties
- -rw-r--r--. 1 root root 1925 Feb 8 2019 producer.properties
- -rw-r--r--. 1 root root 6865 Jan 16 22:00 server-1.properties
- -rw-r--r--. 1 root root 6865 Jan 16 22:00 server-2.properties
- -rw-r--r--. 1 root root 6873 Jan 16 03:57 server.properties
- -rw-r--r--. 1 root root 1032 Feb 8 2019 tools-log4j.properties
- -rw-r--r--. 1 root root 1169 Feb 8 2019 trogdor.conf
- -rw-r--r--. 1 root root 1023 Feb 8 2019 zookeeper.properties
③修改配置文件 server.properties。
設(shè)置 broker.id 這個(gè)是 Kafka 集群區(qū)分每個(gè)節(jié)點(diǎn)的唯一標(biāo)志符。
④設(shè)置 Kafka 的數(shù)據(jù)存儲(chǔ)路徑:
注意:這個(gè)目錄下不能有其他非 Kafka 目錄,不然會(huì)導(dǎo)致 Kafka 集群無法啟動(dòng)。
⑤設(shè)置是否可以刪除 Topic,默認(rèn) Kafka 的 Topic 是不允許刪除的。
⑥Kafka 的數(shù)據(jù)保留的時(shí)間,默認(rèn)是 7 天。
⑦Log 文件最大的大小,如果 Log 文件超過 1 G 會(huì)創(chuàng)建一個(gè)新的文件。
⑧Kafka 連接的 ZooKeeper 的地址和連接 Kafka 的超時(shí)時(shí)間。
⑨默認(rèn)的 Partition 的個(gè)數(shù)。
啟動(dòng) Kafka
①啟動(dòng)方式一,Kafka 只能單節(jié)點(diǎn)啟動(dòng),所以每個(gè) Kakfa 節(jié)點(diǎn)都需要手動(dòng)啟動(dòng),下面的方式是以阻塞的方式啟動(dòng)。
②啟動(dòng)方式二,守護(hù)的方式啟動(dòng),推薦使用。
Kafka 操作
①查看當(dāng)前 Kafka 集群已有的 Topic。
注意:這里連接的 ZooKeeper,而不是連接的 Kafka。
②創(chuàng)建 Topic,指定分片和副本個(gè)數(shù)。
說明:replication-factor 副本數(shù),replication-factor 分區(qū)數(shù),topic 主題名。
如果當(dāng)前 Kafka 集群只有 3 個(gè) Broker 節(jié)點(diǎn),則 replication-factor 最大就是 3 了,下面的例子創(chuàng)建副本為 4,則會(huì)報(bào)錯(cuò)。
③刪除 Topic。
④查看 Topic 信息。
啟動(dòng)生產(chǎn)者生產(chǎn)消息
Kafka 自帶一個(gè)生產(chǎn)者和消費(fèi)者的客戶端。
①啟動(dòng)一個(gè)生產(chǎn)者,注意此時(shí)連的 9092 端口,連接的 Kafka 集群。
②啟動(dòng)一個(gè)消費(fèi)者,注意此時(shí)連接的還是 9092 端口,在 0.9 版本之前連接的還是 2181 端口。
這里我們啟動(dòng) 2 個(gè)消費(fèi)者來測(cè)試一下。
說明:如果不指定消費(fèi)者組的配置文件的話,默認(rèn)每個(gè)消費(fèi)者都屬于不同的消費(fèi)者組。
③發(fā)送消息,可以看到每個(gè)消費(fèi)者都能收到消息。
④Kakfa 中的實(shí)際數(shù)據(jù)。
Kafka 架構(gòu)深入
Kafka 不能保證消息的全局有序,只能保證消息在 Partition 內(nèi)有序,因?yàn)橄M(fèi)者消費(fèi)消息是在不同的 Partition 中隨機(jī)的。
Kafka 的工作流程
Kafka 中的消息是以 Topic 進(jìn)行分類的,生產(chǎn)者生成消息、消費(fèi)者消費(fèi)消息都面向 Topic。
Topic 是一個(gè)邏輯上的概念,而 Partition 是物理上的概念。每個(gè) Partition 又有副本的概念。
每個(gè) Partition 對(duì)應(yīng)于一個(gè) Log 文件,該 Log 文件中存儲(chǔ)的就是生產(chǎn)者生成的數(shù)據(jù),生產(chǎn)者生成的數(shù)據(jù)會(huì)不斷的追加到該 Log 的文件末端。
且每條數(shù)據(jù)都有自己的 Offset,消費(fèi)者都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了那個(gè) Offset,以便出錯(cuò)的時(shí)候從上次的位置繼續(xù)消費(fèi),這個(gè) Offset 就保存在 Index 文件中。
Kafka 的 Offset 是分區(qū)內(nèi)有序的,但是在不同分區(qū)中是無順序的,Kafka 不保證數(shù)據(jù)的全局有序。
Kafka 原理
由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到 Log 文件的末尾,為防止 Log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka 采用分片和索引的機(jī)制,將每個(gè) Partition 分為多個(gè) Segment,每個(gè) Segment 對(duì)應(yīng) 2 個(gè)文件 Index 文件和 Log 文件。
兩個(gè)文件位于一個(gè)相同的文件夾下,文件夾的命名規(guī)則為:Topic 名稱+分區(qū)序號(hào)。
Index 和 Log 的文件的文件名是當(dāng)前這個(gè)索引是最小的數(shù)據(jù)的 Offset。Kafka 如何快速的消費(fèi)數(shù)據(jù)呢?
Index 文件中存儲(chǔ)的數(shù)據(jù)的索引信息,第一列是 Offset,第二列這個(gè)數(shù)據(jù)所對(duì)應(yīng)的 Log 文件中的偏移量,就像我們?nèi)プx文件,使用 seek() 設(shè)置當(dāng)前鼠標(biāo)的位置一樣,可以更快的找到數(shù)據(jù)。
如果要去消費(fèi) Offset 為 3 的數(shù)據(jù),首先通過二分法找到數(shù)據(jù)在哪個(gè) Index 文件中,然后在通過 Index 中 Offset 找到數(shù)據(jù)在 Log 文件中的 Offset;這樣就可以快速的定位到數(shù)據(jù),并消費(fèi)。
所以,Kakfa 雖然把數(shù)據(jù)存儲(chǔ)在磁盤中,但是他的讀取速度還是非??斓?。
Kafka 生產(chǎn)者和消費(fèi)者
Kafka 生產(chǎn)者
Kafka 的 Partition 分區(qū)的作用:Kafka 分區(qū)的原因主要就是提供并發(fā)提高性能,因?yàn)樽x寫是 Partition 為單位讀寫的。
那生產(chǎn)者發(fā)送消息是發(fā)送到哪個(gè) Partition 中呢?
在客戶端中指定 Partition。
輪詢(推薦)消息 1 去 p1,消息 2 去 p2,消息 3 去 p3,消息 4 去 p1,消息 5 去 p2,消息 6 去 p3……
Kafka 如何保證數(shù)據(jù)可靠性
Kafka 如何保證數(shù)據(jù)可靠性呢?通過 Ack 來保證!
為保證生產(chǎn)者發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的 Topic,Topic 的每個(gè) Partition 收到生產(chǎn)者發(fā)送的數(shù)據(jù)后,都需要向生產(chǎn)者發(fā)送 Ack(確認(rèn)收到),如果生產(chǎn)者收到 Ack,就會(huì)進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。
那么 Kafka 什么時(shí)候向生產(chǎn)者發(fā)送 Ack?確保 Follower 和 Leader 同步完成,Leader 在發(fā)送 Ack 給生產(chǎn)者,這樣才能確保 Leader 掛掉之后,能在 Follower 中選舉出新的 Leader 后,數(shù)據(jù)不會(huì)丟失。
那多少個(gè) Follower 同步完成后發(fā)送 Ack?
- 方案 1:半數(shù)已經(jīng)完成同步,就發(fā)送 Ack。
- 方案 2:全部完成同步,才發(fā)送 Ack(Kafka 采用這種方式)。
采用第二種方案后,設(shè)想以下場(chǎng)景:Leader 收到數(shù)據(jù),所有的 Follower 都開始同步數(shù)據(jù),但是有一個(gè) Follower 因?yàn)槟撤N故障,一直無法完成同步,那 Leader 就要一直等下,直到他同步完成,才能發(fā)送 Ack。
這樣就非常影響效率,這個(gè)問題怎么解決?
Leader 維護(hù)了一個(gè)動(dòng)態(tài)的 ISR 列表(同步副本的作用),只需要這個(gè)列表中的 Follower 和 Leader 同步。
當(dāng) ISR 中的 Follower 完成數(shù)據(jù)的同步之后,Leader 就會(huì)給生產(chǎn)者發(fā)送 Ack,如果 Follower 長(zhǎng)時(shí)間未向 Leader 同步數(shù)據(jù),則該 Follower 將被剔除 ISR,這個(gè)時(shí)間閾值也是自定義的。
同樣 Leader 故障后,就會(huì)從 ISR 中選舉新的 Leader。
怎么選擇 ISR 的節(jié)點(diǎn)呢?首先通信的時(shí)間要快,要和 Leader 可以很快的完成通信,這個(gè)時(shí)間默認(rèn)是 10s。
然后就看 Leader 數(shù)據(jù)差距,消息條數(shù)默認(rèn)是 10000 條(后面版本被移除)。
為什么移除?因?yàn)?Kafka 發(fā)送消息是批量發(fā)送的,所以會(huì)一瞬間 Leader 接受完成,但是 Follower 還沒有拉取,所以會(huì)頻繁踢出和加入 ISR,這個(gè)數(shù)據(jù)會(huì)保存到 ZooKeeper 和內(nèi)存中,所以會(huì)頻繁更新 ZooKeeper 和內(nèi)存。
但是對(duì)于某些不太重要的數(shù)據(jù),對(duì)數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等 ISR 中的 Follower 全部接受成功。
所以 Kafka 為用戶提供了三種可靠性級(jí)別,用戶可以根據(jù)可靠性和延遲進(jìn)行權(quán)衡,這個(gè)設(shè)置在 kafka 的生成中設(shè)置:Ack 參數(shù)設(shè)置。
①Acks 為 0:生產(chǎn)者不等 Ack,只管往 Topic 丟數(shù)據(jù)就可以了,這個(gè)丟數(shù)據(jù)的概率非常高。
②Ack 為 1:leader 落盤后就會(huì)返回 Ack,會(huì)有數(shù)據(jù)丟失的現(xiàn)象,如果 leader 在同步完成后出現(xiàn)故障,則會(huì)出現(xiàn)數(shù)據(jù)丟失。
③Ack 為 -1(all):Leader 和 Follower(ISR)落盤才會(huì)返回 Ack,會(huì)有數(shù)據(jù)重復(fù)現(xiàn)象,如果在 Leader 已經(jīng)寫完成,且 Follower 同步完成,但是在返回 Ack 時(shí)出現(xiàn)故障,則會(huì)出現(xiàn)數(shù)據(jù)重復(fù)現(xiàn)象。
極限情況下,這個(gè)也會(huì)有數(shù)據(jù)丟失的情況,比如 Follower 和 Leader 通信都很慢,所以 ISR 中只有一個(gè) Leader 節(jié)點(diǎn)。
這個(gè)時(shí)候,Leader 完成落盤,就會(huì)返回 Ack,如果此時(shí) Leader 故障后,就會(huì)導(dǎo)致丟失數(shù)據(jù)。
Kafka 如何保證消費(fèi)數(shù)據(jù)一致性
Kafka 如何保證消費(fèi)數(shù)據(jù)的一致性?通過 HW 來保證:
- LEO:指每個(gè) Follower 的最大的 Offset。
- HW(高水位):指消費(fèi)者能見到的最大的 Offset,LSR 隊(duì)列中最小的 LEO,也就是說消費(fèi)者只能看到 1~6 的數(shù)據(jù),后面的數(shù)據(jù)看不到,也消費(fèi)不了。
避免 Leader 掛掉后,比如當(dāng)前消費(fèi)者消費(fèi) 8 這條數(shù)據(jù)后,Leader 掛了,此時(shí)比如 f2 成為 Leader,f2 根本就沒有 9 這條數(shù)據(jù),那么消費(fèi)者就會(huì)報(bào)錯(cuò),所以設(shè)計(jì)了 HW 這個(gè)參數(shù),只暴露最少的數(shù)據(jù)給消費(fèi)者,避免上面的問題。
HW 保證數(shù)據(jù)存儲(chǔ)的一致性:
①Follower 故障:Follower 發(fā)生故障后會(huì)被臨時(shí)踢出 LSR,待該 Follower 恢復(fù)后,F(xiàn)ollower 會(huì)讀取本地的磁盤記錄的上次的 HW,并將該 Log 文件高于 HW 的部分截取掉,從 HW 開始向 Leader 進(jìn)行同步,等該 Follower 的 LEO 大于等于該 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 LSR。
②Leader 故障:Leader 發(fā)生故障后,會(huì)從 ISR 中選出一個(gè)新的 Leader,之后,為了保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的 Follower 會(huì)先將各自的 Log 文件高于 HW 的部分截掉(新 Leader 自己不會(huì)截掉),然后從新的 Leader 同步數(shù)據(jù)。
注意:這個(gè)是為了保證多個(gè)副本間的數(shù)據(jù)存儲(chǔ)的一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
精準(zhǔn)一次(冪等性),保證數(shù)據(jù)不重復(fù):
- Ack 設(shè)置為 -1,則可以保證數(shù)據(jù)不丟失,但是會(huì)出現(xiàn)數(shù)據(jù)重復(fù)(at least once)。
- Ack 設(shè)置為 0,則可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失(at most once)。
但是如果魚和熊掌兼得,該怎么辦?這個(gè)時(shí)候就就引入了 Exact Once(精準(zhǔn)一次)。
在 0.11 版本后,引入冪等性解決 Kakfa 集群內(nèi)部的數(shù)據(jù)重復(fù),在 0.11 版本之前,在消費(fèi)者處自己做處理。
如果啟用了冪等性,則 Ack 默認(rèn)就是 -1,Kafka 就會(huì)為每個(gè)生產(chǎn)者分配一個(gè) Pid,并未每條消息分配 Seqnumber。
如果 Pid、Partition、Seqnumber 三者一樣,則 Kafka 認(rèn)為是重復(fù)數(shù)據(jù),就不會(huì)落盤保存。
但是如果生產(chǎn)者掛掉后,也會(huì)出現(xiàn)有數(shù)據(jù)重復(fù)的現(xiàn)象;所以冪等性解決在單次會(huì)話的單個(gè)分區(qū)的數(shù)據(jù)重復(fù),但是在分區(qū)間或者跨會(huì)話的是數(shù)據(jù)重復(fù)的是無法解決的。
Kafka 消費(fèi)者
①消費(fèi)方式
消息隊(duì)列有兩種消費(fèi)消息的方式,Push(微信公眾號(hào))Pull(kafka)。
Push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄M(fèi)發(fā)送速率是由 Broker 決定的,他的目標(biāo)是盡可能以最快的的速度傳遞消息。
但是這樣很容易造成消費(fèi)者來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 Pull 的方式可以消費(fèi)者的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
Pull 模式的不足之處是如果 Kafka 沒有數(shù)據(jù),消費(fèi)者可能會(huì)陷入死循環(huán),一直返回空數(shù)據(jù),針對(duì)這一點(diǎn),Kafka 消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)候回傳遞一個(gè) Timeout 參數(shù),如果當(dāng)時(shí)沒有數(shù)據(jù)可供消費(fèi),消費(fèi)者會(huì)等待一段時(shí)間在返回。
②分區(qū)分配策略
一個(gè)消費(fèi)者組有多個(gè)消費(fèi)者,一個(gè) Topic 有多個(gè) Partition。所以必然會(huì)涉及到 Partition 的分配問題,即確定哪個(gè) Partition 由哪個(gè)消費(fèi)者來消費(fèi)。
Kafka 提供兩種方式,一種是輪詢(RountRobin)對(duì)于 Topic 組生效,一種是(Range)對(duì)于單個(gè) Topic 生效。
輪詢:前置條件是需要一個(gè)消費(fèi)者里的消費(fèi)者訂閱的是相同的 Topic。不然就會(huì)出現(xiàn)問題;非默認(rèn)的的方式。
同一個(gè)消費(fèi)者組里的消費(fèi)者不能同時(shí)消費(fèi)同一個(gè)分區(qū),比如三個(gè)消費(fèi)者消費(fèi)一個(gè) Topic 的 9 個(gè)分區(qū)。
如果一個(gè)消費(fèi)者組里有 2 個(gè)消費(fèi)者,這個(gè)消費(fèi)者組里同時(shí)消費(fèi) 2 個(gè) Topic,每個(gè) Topic 又有三個(gè) Partition。
首先會(huì)把 2 個(gè) Topic 當(dāng)做一個(gè)主題,然后根據(jù) Topic 和 Partition 做 Hash,然后在按照 Hash 排序。然后輪詢分配給一個(gè)消費(fèi)者組中的 2 個(gè)消費(fèi)者。
如果是下面這樣的方式訂閱的呢?比如有 3 個(gè) Topic,每個(gè) Topic 有 3 個(gè) Partition,一個(gè)消費(fèi)者組中有 2 個(gè)消費(fèi)者。
消費(fèi)者 1 訂閱 Topic1 和 Topic2,消費(fèi)者 2 訂閱 Topic2 和 Topic3。那么這樣的場(chǎng)景,使用輪詢的方式訂閱 Topic 就會(huì)有問題。
如果是下面這種方式訂閱呢?比如有 2 個(gè) Topic,每個(gè) Topic 有 3 個(gè) Partition,一個(gè)消費(fèi)者組有 2 個(gè)消費(fèi)者,消費(fèi)者 1 訂閱 Topic1,消費(fèi)者 2 訂閱 Topic2,這樣使用輪詢的方式訂閱 Topic 也會(huì)有問題。
所以我們一直強(qiáng)調(diào),使用輪詢的方式訂閱 Topic 的前提是一個(gè)消費(fèi)者組中的所有消費(fèi)者訂閱的主題是一樣的;所以輪詢的方式不是 Kafka 默認(rèn)的方式;Range 是按照單個(gè) Topic 來劃分的,默認(rèn)的分配方式。
Range 的問題會(huì)出現(xiàn)消費(fèi)者數(shù)據(jù)不均衡的問題。比如下面的例子,一個(gè)消費(fèi)者組訂閱了 2 個(gè) Topic,就會(huì)出現(xiàn)消費(fèi)者 1 消費(fèi) 4 個(gè) Partition,而另外一個(gè)消費(fèi)者只消費(fèi) 2 個(gè) Partition。
分區(qū)策略什么時(shí)候會(huì)觸發(fā)呢?當(dāng)消費(fèi)者組里的消費(fèi)者個(gè)數(shù)變化的時(shí)候,會(huì)觸發(fā)分區(qū)策略調(diào)整,比如消費(fèi)者里增加消費(fèi)者,或者減少消費(fèi)者。
③維護(hù) Offset
由于消費(fèi)者在消費(fèi)過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,消費(fèi)者恢復(fù)后,需要從故障前的位置繼續(xù)消費(fèi),所以消費(fèi)者需要實(shí)施記錄自己消費(fèi)哪個(gè) Offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
Offset 保存的位置有 2 個(gè),一個(gè) ZooKeeper,一個(gè)是 Kafka。首先看下 Offset 保存到 ZooKeeper,由消費(fèi)者組、Topic、Partition 三個(gè)元素確定唯一的 Offset。
所以消費(fèi)者組中的某個(gè)消費(fèi)者掛掉之后,或者消費(fèi)者還是可以拿到這個(gè) Offset。
Controller 這個(gè)節(jié)點(diǎn)和 ZooKeeper 通信,同步數(shù)據(jù),這個(gè)節(jié)點(diǎn)就是誰先起來,誰就先注冊(cè) Controller,誰就是 Controller。其他節(jié)點(diǎn)和 Controller 信息保持同步。
④消費(fèi)者組的案例
修改消費(fèi)者組 id:
啟動(dòng)一個(gè)消費(fèi)者發(fā)送 3 條數(shù)據(jù):
指定消費(fèi)者組啟動(dòng)消費(fèi)者,啟動(dòng)三個(gè)消費(fèi)者,可以看到每個(gè)消費(fèi)者消費(fèi)了一條數(shù)據(jù)。
在演示下不同組可以消費(fèi)同一個(gè) Topic 的,我們看到 2 個(gè)消費(fèi)者的消費(fèi)者都消費(fèi)到同一條數(shù)據(jù)。再次啟動(dòng)一個(gè)消費(fèi)者,這個(gè)消費(fèi)者屬于另外一個(gè)消費(fèi)者組。
Kafka 的高效讀寫機(jī)制
分布式部署
多節(jié)點(diǎn)并行操作。
順序?qū)懘疟P
Kafka 的 producer 生產(chǎn)數(shù)據(jù),要寫入到 log 文件中,寫的過程中一直追加到文件末尾,為順序?qū)懀倬W(wǎng)有數(shù)據(jù)表明。
同樣的磁盤,順序?qū)懩艿?600M/S,而隨機(jī)寫只有 100K/S。這與磁盤的機(jī)械結(jié)構(gòu)有關(guān),順序?qū)懼钥?,是因?yàn)槠涫∪チ舜罅看蓬^尋址的時(shí)間。
零復(fù)制技術(shù)
正常情況下,先把數(shù)據(jù)讀到內(nèi)核空間,在從內(nèi)核空間把數(shù)據(jù)讀到用戶空間,然后在調(diào)操作系統(tǒng)的 IO 接口寫到內(nèi)核空間,最終在寫到硬盤中。
Kafka 是這樣做的,直接在內(nèi)核空間流轉(zhuǎn) IO 流,所以 Kafka 的性能非常高。
ZooKeeper 在 Kafka 中的作用
Kafka 集群中有一個(gè) Broker 會(huì)被選舉為 Controller,負(fù)責(zé)管理集群 Broker 的上下線,所有的 Topic 的分區(qū)副本分配和 Leader 選舉等工作。