Kafka的架構(gòu)原理,你真的理解嗎?
Apache Kafka 最早是由 LinkedIn 開源出來(lái)的分布式消息系統(tǒng),現(xiàn)在是 Apache 旗下的一個(gè)子項(xiàng)目,并且已經(jīng)成為開源領(lǐng)域應(yīng)用最廣泛的消息系統(tǒng)之一。
Kafka 社區(qū)非?;钴S,從 0.9 版本開始,Kafka 的標(biāo)語(yǔ)已經(jīng)從“一個(gè)高吞吐量,分布式的消息系統(tǒng)”改為"一個(gè)分布式流平臺(tái)"。
Kafka 和傳統(tǒng)的消息系統(tǒng)不同在于:
- Kafka是一個(gè)分布式系統(tǒng),易于向外擴(kuò)展。
- 它同時(shí)為發(fā)布和訂閱提供高吞吐量。
- 它支持多訂閱者,當(dāng)失敗時(shí)能自動(dòng)平衡消費(fèi)者。
- 消息的持久化。
Kafka 和其他消息隊(duì)列的對(duì)比:
入門實(shí)例
生產(chǎn)者
代碼如下:
- import java.util.Properties;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- public class UserKafkaProducer extends Thread
- {
- private final KafkaProducer<Integer, String> producer;
- private final String topic;
- private final Properties props = new Properties();
- public UserKafkaProducer(String topic)
- {
- props.put("metadata.broker.list", "localhost:9092");
- props.put("bootstrap.servers", "master2:6667");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- producer = new KafkaProducer<Integer, String>(props);
- this.topic = topic;
- }
- @Override
- public void run() {
- int messageNo = 1;
- while (true)
- {
- String messageStr = new String("Message_" + messageNo);
- System.out.println("Send:" + messageStr);
- //返回的是Future<RecordMetadata>,異步發(fā)送
- producer.send(new ProducerRecord<Integer, String>(topic, messageStr));
- messageNo++;
- try {
- sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
消費(fèi)者
代碼如下:
- Properties props = new Properties();
- /* 定義kakfa 服務(wù)的地址,不需要將所有broker指定上 */
- props.put("bootstrap.servers", "localhost:9092");
- /* 制定consumer group */
- props.put("group.id", "test");
- /* 是否自動(dòng)確認(rèn)offset */
- props.put("enable.auto.commit", "true");
- /* 自動(dòng)確認(rèn)offset的時(shí)間間隔 */
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- /* key的序列化類 */
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- /* value的序列化類 */
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- /* 定義consumer */
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- /* 消費(fèi)者訂閱的topic, 可同時(shí)訂閱多個(gè) */
- consumer.subscribe(Arrays.asList("foo", "bar"));
- /* 讀取數(shù)據(jù),讀取超時(shí)時(shí)間為100ms */
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records)
- System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
- }
Kafka 架構(gòu)原理
對(duì)于 Kafka 的架構(gòu)原理,我們先提出如下幾個(gè)問(wèn)題:
- Kafka 的 topic 和分區(qū)內(nèi)部是如何存儲(chǔ)的,有什么特點(diǎn)?
- 與傳統(tǒng)的消息系統(tǒng)相比,Kafka 的消費(fèi)模型有什么優(yōu)點(diǎn)?
- Kafka 如何實(shí)現(xiàn)分布式的數(shù)據(jù)存儲(chǔ)與數(shù)據(jù)讀取?
Kafka 架構(gòu)圖
Kafka 名詞解釋
在一套 Kafka 架構(gòu)中有多個(gè) Producer,多個(gè) Broker,多個(gè) Consumer,每個(gè) Producer 可以對(duì)應(yīng)多個(gè) Topic,每個(gè) Consumer 只能對(duì)應(yīng)一個(gè) Consumer Group。
整個(gè) Kafka 架構(gòu)對(duì)應(yīng)一個(gè) ZK 集群,通過(guò) ZK 管理集群配置,選舉 Leader,以及在 Consumer Group 發(fā)生變化時(shí)進(jìn)行 Rebalance。
Topic 和 Partition
在 Kafka 中的每一條消息都有一個(gè) Topic。一般來(lái)說(shuō)在我們應(yīng)用中產(chǎn)生不同類型的數(shù)據(jù),都可以設(shè)置不同的主題。
一個(gè)主題一般會(huì)有多個(gè)消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個(gè)主題時(shí),訂閱了這個(gè)主題的消費(fèi)者都可以接收到生產(chǎn)者寫入的新消息。
Kafka 為每個(gè)主題維護(hù)了分布式的分區(qū)(Partition)日志文件,每個(gè) Partition 在 Kafka 存儲(chǔ)層面是 Append Log。
任何發(fā)布到此 Partition 的消息都會(huì)被追加到 Log 文件的尾部,在分區(qū)中的每條消息都會(huì)按照時(shí)間順序分配到一個(gè)單調(diào)遞增的順序編號(hào),也就是我們的 Offset。Offset 是一個(gè) Long 型的數(shù)字。
我們通過(guò)這個(gè) Offset 可以確定一條在該 Partition 下的唯一消息。在 Partition 下面是保證了有序性,但是在 Topic 下面沒(méi)有保證有序性。
在上圖中我們的生產(chǎn)者會(huì)決定發(fā)送到哪個(gè) Partition:
如果沒(méi)有 Key 值則進(jìn)行輪詢發(fā)送。
如果有 Key 值,對(duì) Key 值進(jìn)行 Hash,然后對(duì)分區(qū)數(shù)量取余,保證了同一個(gè) Key 值的會(huì)被路由到同一個(gè)分區(qū);如果想隊(duì)列的強(qiáng)順序一致性,可以讓所有的消息都設(shè)置為同一個(gè) Key。
消費(fèi)模型
消息由生產(chǎn)者發(fā)送到 Kafka 集群后,會(huì)被消費(fèi)者消費(fèi)。一般來(lái)說(shuō)我們的消費(fèi)模型有兩種:
- 推送模型(Push)
- 拉取模型(Pull)
基于推送模型的消息系統(tǒng),由消息代理記錄消費(fèi)狀態(tài)。消息代理將消息推送到消費(fèi)者后,標(biāo)記這條消息為已經(jīng)被消費(fèi),但是這種方式無(wú)法很好地保證消費(fèi)的處理語(yǔ)義。
比如當(dāng)我們已經(jīng)把消息發(fā)送給消費(fèi)者之后,由于消費(fèi)進(jìn)程掛掉或者由于網(wǎng)絡(luò)原因沒(méi)有收到這條消息,如果我們?cè)谙M(fèi)代理將其標(biāo)記為已消費(fèi),這個(gè)消息就***丟失了。
如果我們利用生產(chǎn)者收到消息后回復(fù)這種方法,消息代理需要記錄消費(fèi)狀態(tài),這種不可取。
如果采用 Push,消息消費(fèi)的速率就完全由消費(fèi)代理控制,一旦消費(fèi)者發(fā)生阻塞,就會(huì)出現(xiàn)問(wèn)題。
Kafka 采取拉取模型(Poll),由自己控制消費(fèi)速度,以及消費(fèi)的進(jìn)度,消費(fèi)者可以按照任意的偏移量進(jìn)行消費(fèi)。
比如消費(fèi)者可以消費(fèi)已經(jīng)消費(fèi)過(guò)的消息進(jìn)行重新處理,或者消費(fèi)最近的消息等等。
網(wǎng)絡(luò)模型
Kafka Client:?jiǎn)尉€程 Selector
單線程模式適用于并發(fā)鏈接數(shù)小,邏輯簡(jiǎn)單,數(shù)據(jù)量小的情況。在 Kafka 中,Consumer 和 Producer 都是使用的上面的單線程模式。
這種模式不適合 Kafka 的服務(wù)端,在服務(wù)端中請(qǐng)求處理過(guò)程比較復(fù)雜,會(huì)造成線程阻塞,一旦出現(xiàn)后續(xù)請(qǐng)求就會(huì)無(wú)法處理,會(huì)造成大量請(qǐng)求超時(shí),引起雪崩。而在服務(wù)器中應(yīng)該充分利用多線程來(lái)處理執(zhí)行邏輯。
Kafka Server:多線程 Selector
在 Kafka 服務(wù)端采用的是多線程的 Selector 模型,Acceptor 運(yùn)行在一個(gè)單獨(dú)的線程中,對(duì)于讀取操作的線程池中的線程都會(huì)在 Selector 注冊(cè) Read 事件,負(fù)責(zé)服務(wù)端讀取請(qǐng)求的邏輯。
成功讀取后,將請(qǐng)求放入 Message Queue共享隊(duì)列中。然后在寫線程池中,取出這個(gè)請(qǐng)求,對(duì)其進(jìn)行邏輯處理。
這樣,即使某個(gè)請(qǐng)求線程阻塞了,還有后續(xù)的線程從消息隊(duì)列中獲取請(qǐng)求并進(jìn)行處理,在寫線程中處理完邏輯處理,由于注冊(cè)了 OP_WIRTE 事件,所以還需要對(duì)其發(fā)送響應(yīng)。
高可靠分布式存儲(chǔ)模型
在 Kafka 中保證高可靠模型依靠的是副本機(jī)制,有了副本機(jī)制之后,就算機(jī)器宕機(jī)也不會(huì)發(fā)生數(shù)據(jù)丟失。
高性能的日志存儲(chǔ)
Kafka 一個(gè) Topic 下面的所有消息都是以 Partition 的方式分布式的存儲(chǔ)在多個(gè)節(jié)點(diǎn)上。
同時(shí)在 Kafka 的機(jī)器上,每個(gè) Partition 其實(shí)都會(huì)對(duì)應(yīng)一個(gè)日志目錄,在目錄下面會(huì)對(duì)應(yīng)多個(gè)日志分段(LogSegment)。
LogSegment 文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示為 Segment 索引文件和數(shù)據(jù)文件。
這兩個(gè)文件的命令規(guī)則為:Partition 全局的***個(gè) Segment 從 0 開始,后續(xù)每個(gè) Segment 文件名為上一個(gè) Segment 文件***一條消息的 Offset 值,數(shù)值大小為 64 位,20 位數(shù)字字符長(zhǎng)度,沒(méi)有數(shù)字用 0 填充。
如下,假設(shè)有 1000 條消息,每個(gè) LogSegment 大小為 100,下面展現(xiàn)了 900-1000 的索引和 Log:
由于 Kafka 消息數(shù)據(jù)太大,如果全部建立索引,既占了空間又增加了耗時(shí),所以 Kafka 選擇了稀疏索引的方式,這樣索引可以直接進(jìn)入內(nèi)存,加快偏查詢速度。
簡(jiǎn)單介紹一下如何讀取數(shù)據(jù),如果我們要讀取第 911 條數(shù)據(jù)首先***步,找到它是屬于哪一段的。
根據(jù)二分法查找到它屬于的文件,找到 0000900.index 和 00000900.log 之后,然后去 index 中去查找 (911-900) = 11 這個(gè)索引或者小于 11 最近的索引。
在這里通過(guò)二分法我們找到了索引是 [10,1367],然后我們通過(guò)這條索引的物理位置 1367,開始往后找,直到找到 911 條數(shù)據(jù)。
上面講的是如果要找某個(gè) Offset 的流程,但是我們大多數(shù)時(shí)候并不需要查找某個(gè) Offset,只需要按照順序讀即可。
而在順序讀中,操作系統(tǒng)會(huì)在內(nèi)存和磁盤之間添加 Page Cache,也就是我們平常見到的預(yù)讀操作,所以我們的順序讀操作時(shí)速度很快。
但是 Kafka 有個(gè)問(wèn)題,如果分區(qū)過(guò)多,那么日志分段也會(huì)很多,寫的時(shí)候由于是批量寫,其實(shí)就會(huì)變成隨機(jī)寫了,隨機(jī) I/O 這個(gè)時(shí)候?qū)π阅苡绊懞艽蟆K砸话銇?lái)說(shuō) Kafka 不能有太多的 Partition。
針對(duì)這一點(diǎn),RocketMQ 把所有的日志都寫在一個(gè)文件里面,就能變成順序?qū)?,通過(guò)一定優(yōu)化,讀也能接近于順序讀。
大家可以思考一下:
- 為什么需要分區(qū),也就是說(shuō)主題只有一個(gè)分區(qū),難道不行嗎?
- 日志為什么需要分段?
副本機(jī)制
Kafka 的副本機(jī)制是多個(gè)服務(wù)端節(jié)點(diǎn)對(duì)其他節(jié)點(diǎn)的主題分區(qū)的日志進(jìn)行復(fù)制。
當(dāng)集群中的某個(gè)節(jié)點(diǎn)出現(xiàn)故障,訪問(wèn)故障節(jié)點(diǎn)的請(qǐng)求會(huì)被轉(zhuǎn)移到其他正常節(jié)點(diǎn)(這一過(guò)程通常叫 Reblance)。
Kafka 每個(gè)主題的每個(gè)分區(qū)都有一個(gè)主副本以及 0 個(gè)或者多個(gè)副本,副本保持和主副本的數(shù)據(jù)同步,當(dāng)主副本出故障時(shí)就會(huì)被替代。
在 Kafka 中并不是所有的副本都能被拿來(lái)替代主副本,所以在 Kafka 的 Leader 節(jié)點(diǎn)中維護(hù)著一個(gè) ISR(In Sync Replicas)集合。
翻譯過(guò)來(lái)也叫正在同步中集合,在這個(gè)集合中的需要滿足兩個(gè)條件:
- 節(jié)點(diǎn)必須和 ZK 保持連接。
- 在同步的過(guò)程中這個(gè)副本不能落后主副本太多。
另外還有個(gè) AR(Assigned Replicas)用來(lái)標(biāo)識(shí)副本的全集,OSR 用來(lái)表示由于落后被剔除的副本集合。
所以公式如下:ISR = Leader + 沒(méi)有落后太多的副本;AR = OSR+ ISR。
這里先要說(shuō)下兩個(gè)名詞:HW(高水位)是 Consumer 能夠看到的此 Partition 的位置,LEO 是每個(gè) Partition 的 Log ***一條 Message 的位置。
HW 能保證 Leader 所在的 Broker 失效,該消息仍然可以從新選舉的 Leader 中獲取,不會(huì)造成消息丟失。
當(dāng) Producer 向 Leader 發(fā)送數(shù)據(jù)時(shí),可以通過(guò) request.required.acks 參數(shù)來(lái)設(shè)置數(shù)據(jù)可靠性的級(jí)別:
- 1(默認(rèn)):這意味著 Producer 在 ISR 中的 Leader 已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條 Message。如果 Leader 宕機(jī)了,則會(huì)丟失數(shù)據(jù)。
- 0:這意味著 Producer 無(wú)需等待來(lái)自 Broker 的確認(rèn)而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率***,但是數(shù)據(jù)可靠性卻是***的。
- -1:Producer 需要等待 ISR 中的所有 Follower 都確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成,可靠性***。
但是這樣也不能保證數(shù)據(jù)不丟失,比如當(dāng) ISR 中只有 Leader 時(shí)(其他節(jié)點(diǎn)都和 ZK 斷開連接,或者都沒(méi)追上),這樣就變成了 acks = 1 的情況。
高可用模型及冪等
在分布式系統(tǒng)中一般有三種處理語(yǔ)義:
at-least-once
至少一次,有可能會(huì)有多次。如果 Producer 收到來(lái)自 Ack 的確認(rèn),則表示該消息已經(jīng)寫入到 Kafka 了,此時(shí)剛好是一次,也就是我們后面的 Exactly-once。
但是如果 Producer 超時(shí)或收到錯(cuò)誤,并且 request.required.acks 配置的不是 -1,則會(huì)重試發(fā)送消息,客戶端會(huì)認(rèn)為該消息未寫入 Kafka。
如果 Broker 在發(fā)送 Ack 之前失敗,但在消息成功寫入 Kafka 之后,這一次重試將會(huì)導(dǎo)致我們的消息會(huì)被寫入兩次。
所以消息就不止一次地傳遞給最終 Consumer,如果 Consumer 處理邏輯沒(méi)有保證冪等的話就會(huì)得到不正確的結(jié)果。
在這種語(yǔ)義中會(huì)出現(xiàn)亂序,也就是當(dāng)***次 Ack 失敗準(zhǔn)備重試的時(shí)候,但是第二消息已經(jīng)發(fā)送過(guò)去了,這個(gè)時(shí)候會(huì)出現(xiàn)單分區(qū)中亂序的現(xiàn)象。
我們需要設(shè)置 Prouducer 的參數(shù) max.in.flight.requests.per.connection,flight.requests 是 Producer 端用來(lái)保存發(fā)送請(qǐng)求且沒(méi)有響應(yīng)的隊(duì)列,保證 Produce r端未響應(yīng)的請(qǐng)求個(gè)數(shù)為 1。
at-most-once
如果在 Ack 超時(shí)或返回錯(cuò)誤時(shí) Producer 不重試,也就是我們講 request.required.acks = -1,則該消息可能最終沒(méi)有寫入 Kafka,所以 Consumer 不會(huì)接收消息。
exactly-once
剛好一次,即使 Producer 重試發(fā)送消息,消息也會(huì)保證最多一次地傳遞給 Consumer。該語(yǔ)義是最理想的,也是最難實(shí)現(xiàn)的。
在 0.10 之前并不能保證 exactly-once,需要使用 Consumer 自帶的冪等性保證。0.11.0 使用事務(wù)保證了。
如何實(shí)現(xiàn) exactly-once
要實(shí)現(xiàn) exactly-once 在 Kafka 0.11.0 中有兩個(gè)官方策略:
單 Producer 單 Topic
每個(gè) Producer 在初始化的時(shí)候都會(huì)被分配一個(gè)唯一的 PID,對(duì)于每個(gè)唯一的 PID,Producer 向指定的 Topic 中某個(gè)特定的 Partition 發(fā)送的消息都會(huì)攜帶一個(gè)從 0 單調(diào)遞增的 Sequence Number。
在我們的 Broker 端也會(huì)維護(hù)一個(gè)維度為,每次提交一次消息的時(shí)候都會(huì)對(duì)齊進(jìn)行校驗(yàn):
- 如果消息序號(hào)比 Broker 維護(hù)的序號(hào)大一以上,說(shuō)明中間有數(shù)據(jù)尚未寫入,也即亂序,此時(shí) Broker 拒絕該消息,Producer 拋出 InvalidSequenceNumber。
- 如果消息序號(hào)小于等于 Broker 維護(hù)的序號(hào),說(shuō)明該消息已被保存,即為重復(fù)消息,Broker 直接丟棄該消息,Producer 拋出 DuplicateSequenceNumber。
- 如果消息序號(hào)剛好大一,就證明是合法的。
上面所說(shuō)的解決了兩個(gè)問(wèn)題:
- 當(dāng) Prouducer 發(fā)送了一條消息之后失敗,Broker 并沒(méi)有保存,但是第二條消息卻發(fā)送成功,造成了數(shù)據(jù)的亂序。
- 當(dāng) Producer 發(fā)送了一條消息之后,Broker 保存成功,Ack 回傳失敗,Producer 再次投遞重復(fù)的消息。
上面所說(shuō)的都是在同一個(gè) PID 下面,意味著必須保證在單個(gè) Producer 中的同一個(gè) Seesion 內(nèi),如果 Producer 掛了,被分配了新的 PID,這樣就無(wú)法保證了,所以 Kafka 中又有事務(wù)機(jī)制去保證。
事務(wù)
在 Kafka 中事務(wù)的作用是:
- 實(shí)現(xiàn) exactly-once 語(yǔ)義。
- 保證操作的原子性,要么全部成功,要么全部失敗。
- 有狀態(tài)的操作的恢復(fù)。
事務(wù)可以保證就算跨多個(gè),在本次事務(wù)中的對(duì)消費(fèi)隊(duì)列的操作都當(dāng)成原子性,要么全部成功,要么全部失敗。
并且,有狀態(tài)的應(yīng)用也可以保證重啟后從斷點(diǎn)處繼續(xù)處理,也即事務(wù)恢復(fù)。
在 Kafka 的事務(wù)中,應(yīng)用程序必須提供一個(gè)唯一的事務(wù) ID,即 Transaction ID,并且宕機(jī)重啟之后,也不會(huì)發(fā)生改變。
Transactin ID 與 PID 可能一一對(duì)應(yīng),區(qū)別在于 Transaction ID 由用戶提供,而 PID 是內(nèi)部的實(shí)現(xiàn)對(duì)用戶透明。
為了 Producer 重啟之后,舊的 Producer 具有相同的 Transaction ID 失效,每次 Producer 通過(guò) Transaction ID 拿到 PID 的同時(shí),還會(huì)獲取一個(gè)單調(diào)遞增的 Epoch。
由于舊的 Producer 的 Epoch 比新 Producer 的 Epoch 小,Kafka 可以很容易識(shí)別出該 Producer 是老的,Producer 并拒絕其請(qǐng)求。
為了實(shí)現(xiàn)這一點(diǎn),Kafka 0.11.0.0 引入了一個(gè)服務(wù)器端的模塊,名為 Transaction Coordinator,用于管理 Producer 發(fā)送的消息的事務(wù)性。
該 Transaction Coordinator 維護(hù) Transaction Log,該 Log 存于一個(gè)內(nèi)部的 Topic 內(nèi)。
由于 Topic 數(shù)據(jù)具有持久性,因此事務(wù)的狀態(tài)也具有持久性。Producer 并不直接讀寫 Transaction Log,它與 Transaction Coordinator 通信,然后由 Transaction Coordinator 將該事務(wù)的狀態(tài)插入相應(yīng)的 Transaction Log。
Transaction Log 的設(shè)計(jì)與 Offset Log 用于保存 Consumer 的 Offset 類似。
***
關(guān)于消息隊(duì)列或者 Kafka 的一些常見的面試題,通過(guò)上面的文章可以提煉出以下幾個(gè)比較經(jīng)典的問(wèn)題,大部分問(wèn)題都可以從上面總結(jié)后找到答案:
- 為什么使用消息隊(duì)列?消息隊(duì)列的作用是什么?
- Kafka 的 Topic 和分區(qū)內(nèi)部是如何存儲(chǔ)的,有什么特點(diǎn)?
- 與傳統(tǒng)的消息系統(tǒng)相比,Kafka 的消費(fèi)模型有什么優(yōu)點(diǎn)?
- Kafka 如何實(shí)現(xiàn)分布式的數(shù)據(jù)存儲(chǔ)與數(shù)據(jù)讀取?
- Kafka 為什么比 RocketMQ 支持的單機(jī) Partition 要少?
- 為什么需要分區(qū),也就是說(shuō)主題只有一個(gè)分區(qū),難道不行嗎?
- 日志為什么需要分段?
- Kafka 是依靠什么機(jī)制保持高可靠,高可用?
- 消息隊(duì)列如何保證消息冪等?
- 讓你自己設(shè)計(jì)個(gè)消息隊(duì)列,你會(huì)怎么設(shè)計(jì),會(huì)考慮哪些方面?
作者:李釗
簡(jiǎn)介:目前就職于美團(tuán)點(diǎn)評(píng)餐飲生態(tài)技術(shù)部,喜歡鉆研閱讀開源源碼。招3年以上Java開發(fā),請(qǐng)發(fā)送簡(jiǎn)歷到郵箱:lizhao07@meituan.com。