走近Kafka:大數(shù)據(jù)領(lǐng)域的不敗王者
一、引言
1.背景
和 RabbitMQ 類似,Kafka(全稱 Apache Kafka)是一個分布式發(fā)布-訂閱消息系統(tǒng)。
自 Apache 2010 年開源這個頂級實用項目以來,至今已有十?dāng)?shù)年,Kafka 仍然是非常熱門的一個消息中間件,在互聯(lián)網(wǎng)應(yīng)用里占據(jù)著舉足輕重的地位。
甚至,技術(shù)圈一度將 Kafka 評為消息隊列大數(shù)據(jù)領(lǐng)域中的最強王者!
Kafka 以其速度快(ms 級的順序?qū)懭牒土憧截悾⑿阅芨撸═B級的高吞吐量)、高可靠(有熱擴展,副本容錯機制能力)和高可用(依賴Zookeeper作分布式協(xié)調(diào))等特點聞名于世,它非常適合消息、日志和大數(shù)據(jù)業(yè)務(wù)的存儲和通信。
本文接下來將會從下載安裝,配置修改,收發(fā)消息等理論和實踐入手,帶大家一起探索 kafka 的核心組件,以及業(yè)務(wù)中常見的數(shù)據(jù)消費問題。
二、kafka下載與安裝
1.前提條件
由于 kafka 需要 JDK 環(huán)境來收發(fā)消息,并通過 ZooKeeper 協(xié)調(diào)服務(wù),將 Producer,Consumer,Broker 等結(jié)合在一起,建立起生產(chǎn)者和消費者的訂閱關(guān)系,實現(xiàn)負載均衡。
所以安裝 kafka 之前,我們需要先:
- 安裝 JDK
- 安裝 Zookeeper
網(wǎng)上安裝教程很多,而本文主要探討 kafka,所以就不再這里給出 JDK 和 zk 的詳細安裝步驟了。
2.下載安裝
安裝 Kafka 時,主要有以下兩種方式(更推薦使用 docker 安裝):
- 虛機安裝官網(wǎng)下載 kafka 壓縮包 [https://kafka.apache.org/downloads],或者使用 docker 下載解壓縮至如下路徑 /opt/usr/kafka 目錄下。
- docker安裝(需先在虛機上安裝 docker):
# 拉取鏡像,默認最新版本
docker pull bitnami/kafka
# 創(chuàng)建網(wǎng)絡(luò)環(huán)境,保證zk和kafka在同一個網(wǎng)絡(luò)中
docker network create kafka-network
# 運行zookeper
docker run -d --name zookeeper --network kafka-network bitnami/zookeeper:latest
#運行kafka,其中:環(huán)境變量KAFKA_CFG_ZOOKEEPER_CONNECT指定ZooKeeper的連接信息,KAFKA_CFG_ADVERTISED_LISTENERS是Kafka對外的訪問地址
docker run -d --name kafka --network kafka-network \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-p 9092:9092 \
bitnami/kafka:latest
3.修改配置文件
進入目錄 /opt/usr/kafka/config,如果是 docker 安裝方式,需先用命令 docker exec -it containerID bash 進入容器,修改 server.properties 文件:
#broker.id屬性在kafka集群中必須要是唯?
broker.id=0
#kafka部署的機器ip和提供服務(wù)的端?號,根據(jù)自己服務(wù)器的網(wǎng)段修改IP
listeners=PLAINTEXT://192.168.65.60:9092
#kafka的消息存儲?件
log.dir=/opt/usr/data
#kafka連接zookeeper的地址,根據(jù)自己服務(wù)器的網(wǎng)段修改IP
zookeeper.connect=192.168.65.60:2181
三、啟動Kafka
1.啟動 kafka 服務(wù)器
進入 /opt/kafka/bin 目錄下,使用命令啟動:
./kafka-server-start.sh -daemon ../config/server.properties
使用 ps -ef |grep server.properties 命令查看是否啟動成功
2.啟動 Zookeeper
查看 zookeeper 是否正常添加好節(jié)點,首先,進入 zookeeper 的某一個容器內(nèi)【這里進的是 zookeeper:zoo1 節(jié)點】
進入 bin 目錄下,使用 zkCli.sh 命令,啟動客戶端
3.判斷是否正常啟動
使用 ls /brokers/ids 命令查詢對應(yīng)的 kafka broker:
如果看到有對應(yīng)的 broker.id,如上圖的 1,2,3,就說明已經(jīng)啟動成功了!
如果有啟動報錯,一般是 server.properties 配置文件有誤:比如,broker Id 不唯一,IP 端口不正確導(dǎo)致。
四、Kafka常見概念與核心組件
以下是 Kafka 中的一些核心組件:
名稱 | 解釋 |
Broker | Kafka 集群中的消息處理節(jié)點,?個 Kafka 節(jié)點就是?個 broker,broker.id 不能重復(fù) |
Producer | 消息生產(chǎn)者,向 broker 發(fā)送消息的客戶端 |
Consumer | 消費者,從 broker 讀取消息的客戶端 |
Topic | 主題,Kafka 根據(jù) topic 對消息進?歸類 |
Partition | 分區(qū),將一個 topic 的消息存放到不同分區(qū) |
Replication | 副本,分區(qū)的多個備份,備份分別存放在集群不同的 broker 中 |
1.主題Topic
(1) 什么是Topic
Topic 在 kafka 中是一個邏輯概念,kafka 通過 topic 將消息進行分類,消費者需通過 topic 來進行消費消息。
注意:發(fā)送到 Kafka 集群的每條消息都需要指定?個 topic,否則無法進行消費。
(2) 如何創(chuàng)建Topic
我們可以通過以下命令創(chuàng)建一個名為 hello-world 的 topic,在創(chuàng)建 topic 時可以指定分區(qū)數(shù)量和副本數(shù)量。
# 創(chuàng)建 topic
./kafka-topics.sh --create --zookeeper 172.16.30.34:2181 --replication-factor 1 --partitions 1 --topic hello-world
# 通過命令查看 zk 節(jié)點下所有的主題
./kafka-topics.sh --list --zookeeper 172.16.30.34:2181
以下是在 docker 容器里創(chuàng)建 topic 的例子:
(3) 查看 topic 的具體信息
我們可以通過以下命令來查看名為 my-replicated-topic 這個主題的詳細信息:
./kafka-topics.sh --describe --zookeeper 172.16.30.34:2181 --topic my-replicated-topic
可以看出該 topic 的名稱,分區(qū)數(shù)量,副本數(shù)量,以及配置信息等:
并且,我們也可以直接在 zookeeper 客戶端查看已創(chuàng)建的主題,通過以下命令查看:
# 進入客戶端
./bin/zkCli.sh
# 查看主題
ls /brokers/topics
get /brokers/topics/hello-world
可以看到,hello-world 主題已經(jīng)被創(chuàng)建成功了:
2.Partition 分區(qū)
由于單機的 CPU、內(nèi)存和磁盤等瓶頸,因此引入分區(qū)概念,類似于分布式系統(tǒng)的橫向擴展。
通過分區(qū),一個 topic 的消息可以放在不同的分區(qū)上,好處是:
- 分離存儲:解決一個分區(qū)上日志存儲文件過大的問題;
- 提高性能:讀和寫可以同時在多個分區(qū)上進行,方便擴展和提升并發(fā)。
創(chuàng)建多分區(qū)的主題
以下命令創(chuàng)建一個名稱為 hello-world 的 topic,指定 zookeeper 內(nèi)網(wǎng)節(jié)點地址為:172.16.30.34:2181(注意:如果在自己的內(nèi)網(wǎng)機器上部署,這個地址需要改成自己的服務(wù)器 IP)。
--partitions 3:指定分區(qū)數(shù)量為 3
# 創(chuàng)建topic,replication-factor副本數(shù)為3,partitions分區(qū)數(shù)為1
./kafka-topics.sh --create --zookeeper 172.16.30.34:2181 --replication-factor 1 --partitions 3 --topic hello-world
3.Replication 副本
副本,就是主題中分區(qū)創(chuàng)建的多個備份,多個備份在 kafka 集群的多個 broker 中,會有一個 leader,多個 follower。
副本類似于冗余的意思,是保障系統(tǒng)高可用的有效應(yīng)對方案。
指定副本數(shù)量
當(dāng)新建主題時,除了可指定分區(qū)數(shù),還可以指定副本數(shù)。
--replication-factor 3:指定副本數(shù)量為 3
# 創(chuàng)建topic,replication-factor副本數(shù)為3,partitions分區(qū)數(shù)為1
./kafka-topics.sh --create --zookeeper 172.16.30.34:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
五、在Kafka中收發(fā)消息
1.發(fā)送消息
當(dāng)創(chuàng)建完 topic 之后,我們可以通過 kafka 安裝后自帶的客戶端工具 kafka-console-producer.sh,向已創(chuàng)建的主題中發(fā)消息:
# 打開hello-world主題的消息發(fā)送窗口
./kafka-console-producer.sh --bootstrap-server 172.16.30.34:49092 --topic hello-world
消息發(fā)送窗口打開后,向 hello-world 主題中發(fā)送消息:
2.消費消息
當(dāng)消息發(fā)送成功后,我們新開一個窗口,通過 kafka 安裝后自帶的客戶端工具 kafka-console-consumer.sh 創(chuàng)建一個消費者,并監(jiān)聽 hello-world 這個 topic,以消費消息:
# 打開hello-world主題的消息消費窗口
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --topic hello-world
在 kafka 中,消費者默認從當(dāng)前主題的最后一條消息的 offset(偏移量位置)+1 位置開始監(jiān)聽,所以當(dāng)消費者開始監(jiān)聽時,只能收到 topic 之后發(fā)送的消息:
從頭開始消費
這時,如果 topic 消息已經(jīng)發(fā)送有一會了,但我們想要從頭開始消費該怎么辦呢?
只需要在開啟消費者監(jiān)聽時,加一個 --from-beginning 命令即可:
# 從當(dāng)前主題的第一條消息開始消費
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --from-beginning --topic hello-world
從第一條消息開始消費:
六、消息收發(fā)相關(guān)
1.消息的存儲和順序性
生產(chǎn)者將消息發(fā)給 broker,broker 會將消息保存在本地的日志文件中。
在 config 文件中,日志目錄為 /opt/usr/data,文件名為 主題-分區(qū)/00000000.log。
在存儲和消費消息時,kafka 會用 offset 來記錄當(dāng)前消息的順序:
- 消息存儲有序:通過 offset 偏移量來描述消息的有序性;
- 消費有序:消費者消費消息時也是通過 offset 來描述當(dāng)前要消費的消息位置。
2. 消費組
(1) 創(chuàng)建消費組
當(dāng)創(chuàng)建消費者時,我們可以為消費者指定一個組別(group)。
--consuemr-property group.id=testGroup:指定 group 名稱為 testGroup
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --consuemr-property group.id=testGroup --topic hello-world
指定組別后,在消費消息時,同一個消費組 group 只有一個消費者可以收到訂閱的 topic 消息。
(2) 查看消費組信息
我們可以通過 describe 命令查看消費組信息,命令如下:
# 消費組testGroup的詳細信息
./kafka-consumer-groups.sh --bootstrap-server 172.16.30.34:49094 --describe --group testGroup
消費者信息如下:
我們需要關(guān)注的重點字段如下:
- CURRENT-OFFSET:最后被消費的消息偏移量(offset);
- LOG-END-OFFSET:消息總量(最后一條消息的偏移量);
- LAG:積壓了多少條消息。
在同一個消費組里面,任何一個消費者拿到了消息,都會改變上述的字段值。
3.單播/多播消息
當(dāng)創(chuàng)建消費組后,我們根據(jù)消費組的個數(shù)來判斷消息是單播還是多播。這倆名詞源于網(wǎng)絡(luò)中的請求轉(zhuǎn)發(fā),單播就是一對一發(fā)送消息,多播就是多個消費組同時消費消息。
# 注意,當(dāng)兩個消費者都不指定消費組時,可以同時消費
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --topic hello-world
每次創(chuàng)建消費者時,如果沒有指定消費組,則相當(dāng)于創(chuàng)建了一個默認消費組,kafka 會為這些默認消費組生成一個隨機的 group id。
所以多次創(chuàng)建默認消費組時,就是多播。
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --consuemr-property group.id=testGroup --topic hello-world
而單播消費時,只有一個消費組,所以 group_id 相同。
多播消費時,分別指定不同的消費組名稱或者不指定消費組名稱即可。
4.kafka消息日志文件
在 kafka 中,為了持久化數(shù)據(jù),服務(wù)器創(chuàng)建了多個主題分區(qū)文件來保存消息,其中:
(1) 主題-分區(qū)/00000000.log 日志文件里保存了某個主題下的消息;
(2) Kafka 內(nèi)部創(chuàng)建了 50 個分區(qū) consumer-offsets-0 ~ 49,用來存放消費者消費某個 topic 的偏移量,這些偏移量由消費者消費 topic 的時候主動上報給 kafka。
- 提交到哪個分區(qū)由 hash 后取模得出:hash(consumerGroupId)% 50;
- 提交的內(nèi)容為:key = consumerGroupId + topic + 分區(qū)號,value 為當(dāng)前 offset 的值,為正整數(shù)。
在 Kafka 中,消費者的偏移量(consumer offset)是指消費者在分區(qū)中已經(jīng)讀取到的位置。消費者偏移量是由 Kafka 自動管理的,以確保消費者可以在故障恢復(fù)后繼續(xù)從上次中斷的位置開始消費。
如果大家在日常業(yè)務(wù)時想要跳過某些不消費的消息,或者重復(fù)消費,可以使用 Kafka 提供的 kafka-consumer-groups.sh 腳本,來查看和修改消費者組的偏移量。
七、尾聲
1.小結(jié)
本文介紹了 Kafka 以其高速、高性能、高可靠性和高可用性在大數(shù)據(jù)領(lǐng)域中占據(jù)重要地位。
并且從下載安裝 Kafka 開始,到修改配置、服務(wù)啟動,通過命令行驗證其是否啟動成功。
接著,我們詳細介紹了 Kafka 的核心組件,包括 Broker、Producer、Consumer、Topic、Partition 和Replication。
然后特別強調(diào)了 Topic 的創(chuàng)建和管理,展示了如何創(chuàng)建 Topic、指定分區(qū)和副本數(shù)量,以及如何查看 Topic 的詳細信息。我們還講述了 Partition 分區(qū)的優(yōu)勢,如分離存儲和提高性能,并解釋了 Replication 副本的概念和重要性。
接著,我們展示了在 Kafka 中發(fā)送和消費消息的過程,然后討論了消息存儲、順序性、消費組的創(chuàng)建和查看消費組信息,以及單播和多播消息的概念。
最后,文章提到了 Kafka 中消息日志文件保存的內(nèi)容,包括消息本身和消息偏移量,以及如何修改消息偏移量的位置。
相信看了這部分內(nèi)容,大家已經(jīng)學(xué)會如何搭建自己的 kafka 消息隊列了~
2.后續(xù)
Kafka 系列文章分為上下篇,上篇主要是核心組件的介紹和實踐上手等內(nèi)容,包含對 Kafka 做了一個全面介紹,包括安裝、配置、核心組件和消息收發(fā)機制,本文是上篇內(nèi)容。
下篇內(nèi)容主要討論集群高可用、消息重復(fù)消費、延時隊列等常見的高級用法,敬請期待。