圖解 Kafka 架構(gòu)與工作原理
一、認識kafka
面試官提問:什么是 Kafka ?用來干嘛的?
官方定義如下:
Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
翻譯過來,大致的意思就是,這是一個實時數(shù)據(jù)處理系統(tǒng),可以橫向擴展,并高可靠!
實時數(shù)據(jù)處理,從名字上看,很好理解,就是將數(shù)據(jù)進行實時處理,在現(xiàn)在流行的微服務開發(fā)中,最常用實時數(shù)據(jù)處理平臺有 RabbitMQ、RocketMQ 等消息中間件。
這些中間件,最大的特點主要有兩個:
- 服務解耦
- 流量削峰
在早期的 web 應用程序開發(fā)中,當請求量突然上來了時候,我們會將要處理的數(shù)據(jù)推送到一個隊列通道中,然后另起一個線程來不斷輪訓拉取隊列中的數(shù)據(jù),從而加快程序的運行效率。
但是隨著請求量不斷的增大,并且隊列通道的數(shù)據(jù)一致處于高負載,在這種情況下,應用程序的內(nèi)存占用率會非常高,稍有不慎,會出現(xiàn)內(nèi)存不足,造成程序內(nèi)存溢出,從而導致服務不可用。
隨著業(yè)務量的不斷擴張,在一個應用程序內(nèi),使用這種模式已然無法滿足需求,因此之后,就誕生了各種消息中間件,例如 ActiveMQ、RabbitMQ、RocketMQ 等中間件。
采用這種模型,本質(zhì)就是將要推送的數(shù)據(jù),不在存放在當前應用程序的內(nèi)存中,而是將數(shù)據(jù)存放到另一個專門負責數(shù)據(jù)處理的應用程序中,從而實現(xiàn)服務解耦。
消息中間件:主要的職責就是保證能接受到消息,并將消息存儲到磁盤,即使其他服務都掛了,數(shù)據(jù)也不會丟失,同時還可以對數(shù)據(jù)消費情況做好監(jiān)控工作。
應用程序:只需要將消息推送到消息中間件,然后啟用一個線程來不斷從消息中間件中拉取數(shù)據(jù),進行消費確認即可!
引入消息中間件之后,整個服務開發(fā)會變得更加簡單,各負其責。
Kafka 本質(zhì)其實也是消息中間件的一種,Kafka 出自于 LinkedIn 公司,與 2010 年開源到 github。
LinkedIn 的開發(fā)團隊,為了解決數(shù)據(jù)管道問題,起初采用了 ActiveMQ 來進行數(shù)據(jù)交換,大約是在 2010 年前后,那時的 ActiveMQ 還遠遠無法滿足 LinkedIn 對數(shù)據(jù)傳遞系統(tǒng)的要求,經(jīng)常由于各種缺陷而導致消息阻塞或者服務無法正常訪問,為了能夠解決這個問題,LinkedIn 決定研發(fā)自己的消息傳遞系統(tǒng),Kafka 由此誕生。
在 LinkedIn 公司,Kafka 可以有效地處理每天數(shù)十億條消息的指標和用戶活動跟蹤,其強大的處理能力,已經(jīng)被業(yè)界所認可,并成為大數(shù)據(jù)流水線的首選技術(shù)。
二、架構(gòu)介紹
先來看一張圖,下面這張圖就是 kafka 生產(chǎn)與消費的核心架構(gòu)模型!
如果你看不懂這些概念沒關(guān)系,我會帶著大家一起梳理一遍!
- Producer:Producer 即生產(chǎn)者,消息的產(chǎn)生者,是消息的入口
- Broker:Broker 是 kafka 一個實例,每個服務器上有一個或多個 kafka 的實例,簡單的理解就是一臺 kafka 服務器,kafka cluster表示集群的意思
- Topic:消息的主題,可以理解為消息隊列,kafka的數(shù)據(jù)就保存在topic。在每個 broker 上都可以創(chuàng)建多個 topic 。
- Partition:Topic的分區(qū),每個 topic 可以有多個分區(qū),分區(qū)的作用是做負載,提高 kafka 的吞吐量。同一個 topic 在不同的分區(qū)的數(shù)據(jù)是不重復的,partition 的表現(xiàn)形式就是一個一個的文件夾!
- Replication:每一個分區(qū)都有多個副本,副本的作用是做備胎,主分區(qū)(Leader)會將數(shù)據(jù)同步到從分區(qū)(Follower)。當主分區(qū)(Leader)故障的時候會選擇一個備胎(Follower)上位,成為 Leader。在kafka中默認副本的最大數(shù)量是10個,且副本的數(shù)量不能大于Broker的數(shù)量,follower和leader絕對是在不同的機器,同一機器對同一個分區(qū)也只可能存放一個副本
- Message:每一條發(fā)送的消息主體。
- Consumer:消費者,即消息的消費方,是消息的出口。
- Consumer Group:我們可以將多個消費組組成一個消費者組,在 kafka 的設(shè)計中同一個分區(qū)的數(shù)據(jù)只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區(qū)的數(shù)據(jù),這也是為了提高kafka的吞吐量!
- Zookeeper:kafka 集群依賴 zookeeper 來保存集群的的元信息,來保證系統(tǒng)的可用性。
簡而言之,kafka 本質(zhì)就是一個消息系統(tǒng),與大多數(shù)的消息系統(tǒng)一樣,主要的特點如下:
- 使用推拉模型將生產(chǎn)者和消費者分離
- 為消息傳遞系統(tǒng)中的消息數(shù)據(jù)提供持久性,以允許多個消費者
- 提供高可用集群服務,主從模式,同時支持橫向水平擴展
與 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一個分區(qū)Partition的概念。
這個分區(qū)的意思就是說,如果你創(chuàng)建的topic有5個分區(qū),當你一次性向 kafka 中推 1000 條數(shù)據(jù)時,這 1000 條數(shù)據(jù)默認會分配到 5 個分區(qū)中,其中每個分區(qū)存儲 200 條數(shù)據(jù)。
這樣做的目的,就是方便消費者從不同的分區(qū)拉取數(shù)據(jù),假如你啟動 5 個線程同時拉取數(shù)據(jù),每個線程拉取一個分區(qū),消費速度會非常非???
這是 kafka 與其他的消息系統(tǒng)最大的不同!
2.1、發(fā)送數(shù)據(jù)
和其他的中間件一樣,kafka 每次發(fā)送數(shù)據(jù)都是向Leader分區(qū)發(fā)送數(shù)據(jù),并順序?qū)懭氲酱疟P,然后Leader分區(qū)會將數(shù)據(jù)同步到各個從分區(qū)Follower,即使主分區(qū)掛了,也不會影響服務的正常運行。
那 kafka 是如何將數(shù)據(jù)寫入到對應的分區(qū)呢?kafka中有以下幾個原則:
- 1、數(shù)據(jù)在寫入的時候可以指定需要寫入的分區(qū),如果有指定,則寫入對應的分區(qū)
- 2、如果沒有指定分區(qū),但是設(shè)置了數(shù)據(jù)的key,則會根據(jù)key的值hash出一個分區(qū)
- 3、如果既沒指定分區(qū),又沒有設(shè)置key,則會輪詢選出一個分區(qū)
2.2、消費數(shù)據(jù)
與生產(chǎn)者一樣,消費者主動的去kafka集群拉取消息時,也是從Leader分區(qū)去拉取數(shù)據(jù)。
這里我們需要重點了解一個名詞:消費組!
考慮到多個消費者的場景,kafka 在設(shè)計的時候,可以由多個消費者組成一個消費組,同一個消費組者的消費者可以消費同一個 topic 下不同分區(qū)的數(shù)據(jù),同一個分區(qū)只會被一個消費組內(nèi)的某個消費者所消費,防止出現(xiàn)重復消費的問題!
但是不同的組,可以消費同一個分區(qū)的數(shù)據(jù)!
你可以這樣理解,一個消費組就是一個客戶端,一個客戶端可以由很多個消費者組成,以便加快消息的消費能力。
但是,如果一個組下的消費者數(shù)量大于分區(qū)數(shù)量,就會出現(xiàn)很多的消費者閑置。
如果分區(qū)數(shù)量大于一個組下的消費者數(shù)量,會出現(xiàn)一個消費者負責多個分區(qū)的消費,會出現(xiàn)消費性能不均衡的情況。
因此,在實際的應用中,建議消費者組的consumer的數(shù)量與partition的數(shù)量保持一致!
三、kafka 安裝
光說理論可沒用,下面我們就以 centos7 為例,介紹一下 kafka 的安裝和使用。
kafka 需要 zookeeper 來保存服務實例的元信息,因此在安裝 kafka 之前,我們需要先安裝 zookeeper。
3.1、安裝zookeeper
zookeeper 安裝環(huán)境依賴于 jdk,因此我們需要事先安裝 jdk
- # 安裝jdk1.8
- yum -y install java-1.8.0-openjdk
下載zookeeper,并解壓文件包
- #在線下載zookeeper
- wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
- #解壓
- tar -zxvf zookeeper-3.4.12.tar.gz
創(chuàng)建數(shù)據(jù)、日志目錄
- #創(chuàng)建數(shù)據(jù)和日志存放目錄
- cd /usr/zookeeper/
- mkdir data
- mkdir log
- #把conf下的zoo_sample.cfg備份一份,然后重命名為zoo.cfg
- cd conf/
- cp zoo_sample.cfg zoo.cfg
配置zookeeper
- #編輯zoo.cfg文件
- vim zoo.cfg
重新配置dataDir和dataLogDir的存儲路徑
最后,啟動 Zookeeper 服務
- #進入Zookeeper的bin目錄
- cd zookeeper/zookeeper-3.4.12/bin
- #啟動Zookeeper
- ./zkServer.sh start
- #查詢Zookeeper狀態(tài)
- ./zkServer.sh status
- #關(guān)閉Zookeeper狀態(tài)
- ./zkServer.sh stop
3.2、安裝kafka
到官網(wǎng)http://kafka.apache.org/downloads.html下載想要的版本,我這里下載是最新穩(wěn)定版2.8.0。
- #下載kafka 安裝包
- wget https://apache.osuosl.org/kafka/2.8.0/kafka-2.8.0-src.tgz
- #解壓文件包
- tar -xvf kafka-2.8.0-src.tgz
按需修改配置文件server.properties(可選)
- #進入配置文件夾
- cd kafka-2.8.0-src/config
- #編輯server.properties
- vim server.properties
server.properties文件內(nèi)容如下:
- broker.id=0
- listeners=PLAINTEXT://localhost:9092
- num.network.threads=3
- num.io.threads=8
- socket.send.buffer.bytes=102400
- socket.receive.buffer.bytes=102400
- socket.request.max.bytes=104857600
- log.dirs=/tmp/kafka-logs
- num.partitions=1
- num.recovery.threads.per.data.dir=1
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- log.retention.hours=168
- log.segment.bytes=1073741824
- log.retention.check.interval.ms=300000
- zookeeper.connect=localhost:2181
- zookeeper.connection.timeout.ms=6000
- group.initial.rebalance.delay.ms=0
其中有四個重要的參數(shù):
- broker.id:唯一標識ID
- listeners=PLAINTEXT://localhost:9092:kafka服務監(jiān)聽地址和端口
- log.dirs:日志存儲目錄
- zookeeper.connect:指定zookeeper服務地址
可根據(jù)自己需求修改對應的配置!
3.3、啟動 kafka 服務
- # 進入bin腳本目錄
- cd kafka-2.8.0-src/bin
啟動 kafka 服務
- nohup kafka-server-start.sh ../config/server.properties server.log 2> server.err &
3.4、創(chuàng)建主題topics
創(chuàng)建一個名為testTopic的主題,它只包含一個分區(qū),只有一個副本:
- # 進入bin腳本目錄
- cd kafka-2.8.0-src/bin
- #創(chuàng)建topics
- kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
運行l(wèi)ist topic命令,可以看到該主題。
- # 進入bin腳本目錄
- cd kafka-2.8.0-src/bin
- #查詢當前kafka上所有的主題
- kafka-topics.sh --list --zookeeper localhost:2181
輸出內(nèi)容:
- testTopic
3.5、發(fā)送消息
Kafka 附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,并將其作為消息發(fā)送到 Kafka 集群。默認情況下,每行將作為單獨的消息發(fā)送。
運行生產(chǎn)者,然后在控制臺中鍵入一些消息以發(fā)送到服務器。
- # 進入bin腳本目錄
- cd kafka-2.8.0-src/bin
- #運行一個生產(chǎn)者,向testTopic主題中發(fā)消息
- kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
輸入兩條內(nèi)容并回車:
- Hello kafka!
- This is a message
3.5、接受消息
Kafka 還有一個命令行使用者,它會將消息轉(zhuǎn)儲到標準輸出。
- # 進入bin腳本目錄
- cd kafka-2.8.0-src/bin
- #運行一個消費者,從testTopic主題中拉取消息
- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
輸出結(jié)果如下:
- Hello kafka!
- This is a message
四、小結(jié)
本文主要圍繞 kafka 的架構(gòu)模型和安裝環(huán)境做了一些初步的介紹,難免會有理解不對的地方,歡迎網(wǎng)友批評、吐槽。
由于篇幅原因,會在下期文章中詳細介紹 java 環(huán)境下 kafka 應用場景!
五、參考
1、知乎 - Java團長 - 再過半小時,你就能明白kafka的工作原理了