玩了分布式這么久,你不會連Kafka都不清楚吧
Kafka 現(xiàn)在在企業(yè)和互聯(lián)網(wǎng)項目中的應用越來越多了,本篇文章就從 Kafka 的基礎開始帶你一展 Kafka 的宏圖。
圖片來自 Pexels
什么是 Kafka
Kafka 是一個分布式流式平臺,它有三個關鍵能力:
- 訂閱發(fā)布記錄流,它類似于企業(yè)中的消息隊列或企業(yè)消息傳遞系統(tǒng)。
- 以容錯的方式存儲記錄流。
- 實時記錄流。
Kafka 的應用:
- 作為消息系統(tǒng)。
- 作為存儲系統(tǒng)。
- 作為流處理器。
Kafka 可以建立流數(shù)據(jù)管道,可靠地在系統(tǒng)或應用之間獲取數(shù)據(jù)。建立流式應用傳輸和響應數(shù)據(jù)。
Kafka 作為消息系統(tǒng)
Kafka 作為消息系統(tǒng),它有三個基本組件:
- Producer : 發(fā)布消息的客戶端
- Broker:一個從生產(chǎn)者接受并存儲消息的客戶端
- Consumer : 消費者從 Broker 中讀取消息
在大型系統(tǒng)中,會需要和很多子系統(tǒng)做交互,也需要消息傳遞,在諸如此類系統(tǒng)中,你會找到源系統(tǒng)(消息發(fā)送方)和目的系統(tǒng)(消息接收方)。
為了在這樣的消息系統(tǒng)中傳輸數(shù)據(jù),你需要有合適的數(shù)據(jù)管道:
這種數(shù)據(jù)的交互看起來就很混亂,如果我們使用消息傳遞系統(tǒng),那么系統(tǒng)就會變得更加簡單和整潔。
Kafka 運行在一個或多個數(shù)據(jù)中心的服務器上作為集群運行:
- Kafka 集群存儲消息記錄的目錄被稱為 Topics。
- 每一條消息記錄包含三個要素:鍵(Key)、值(Value)、時間戳(Timestamp)。
核心 API
Kafka 有四個核心 API,它們分別是:
- Producer API,它允許應用程序向一個或多個 Topics 上發(fā)送消息記錄。
- Consumer API,允許應用程序訂閱一個或多個 Topics 并處理為其生成的記錄流。
- Streams API,它允許應用程序作為流處理器,從一個或多個主題中消費輸入流并為其生成輸出流,有效的將輸入流轉(zhuǎn)換為輸出流。
- Connector API,它允許構(gòu)建和運行將 Kafka 主題連接到現(xiàn)有應用程序或數(shù)據(jù)系統(tǒng)的可用生產(chǎn)者和消費者。例如,關系數(shù)據(jù)庫的連接器可能會捕獲對表的所有更改。
Kafka 基本概念
Kafka 作為一個高度可擴展可容錯的消息系統(tǒng),它有很多基本概念,下面就來認識一下這些 Kafka 專屬的概念。
Topic
Topic 被稱為主題,在 Kafka 中,使用一個類別屬性來劃分消息的所屬類,劃分消息的這個類稱為 Topic。
Topic 相當于消息的分配標簽,是一個邏輯概念。主題好比是數(shù)據(jù)庫的表,或者文件系統(tǒng)中的文件夾。
Partition
Partition 譯為分區(qū),Topic 中的消息被分割為一個或多個的 Partition,它是一個物理概念,對應到系統(tǒng)上就是一個或若干個目錄,一個分區(qū)就是一個提交日志。消息以追加的形式寫入分區(qū),先后以順序的方式讀取。
注意:由于一個主題包含無數(shù)個分區(qū),因此無法保證在整個 Topic 中有序,但是單個 Partition 分區(qū)可以保證有序。消息被迫加寫入每個分區(qū)的尾部。Kafka 通過分區(qū)來實現(xiàn)數(shù)據(jù)冗余和伸縮性。
分區(qū)可以分布在不同的服務器上,也就是說,一個主題可以跨越多個服務器,以此來提供比單個服務器更強大的性能。
Segment
Segment 被譯為段,將 Partition 進一步細分為若干個 Segment,每個 Segment 文件的大小相等。
Broker
Kafka 集群包含一個或多個服務器,每個 Kafka 中服務器被稱為 Broker。Broker 接收來自生產(chǎn)者的消息,為消息設置偏移量,并提交消息到磁盤保存。
Broker 為消費者提供服務,對讀取分區(qū)的請求作出響應,返回已經(jīng)提交到磁盤上的消息。
Broker 是集群的組成部分,每個集群中都會有一個 Broker 同時充當了集群控制器(Leader)的角色,它是由集群中的活躍成員選舉出來的。
每個集群中的成員都有可能充當 Leader,Leader 負責管理工作,包括將分區(qū)分配給 Broker 和監(jiān)控 Broker。
集群中,一個分區(qū)從屬于一個 Leader,但是一個分區(qū)可以分配給多個 Broker(非 Leader),這時候會發(fā)生分區(qū)復制。
這種復制的機制為分區(qū)提供了消息冗余,如果一個 Broker 失效,那么其他活躍用戶會重新選舉一個 Leader 接管。
Producer
生產(chǎn)者,即消息的發(fā)布者,其會將某 Topic 的消息發(fā)布到相應的 Partition 中。
生產(chǎn)者在默認情況下把消息均衡地分布到主題的所有分區(qū)上,而并不關心特定消息會被寫到哪個分區(qū)。不過,在某些情況下,生產(chǎn)者會把消息直接寫到指定的分區(qū)。
Consumer
消費者,即消息的使用者,一個消費者可以消費多個 Topic 的消息,對于某一個 Topic 的消息,其只會消費同一個 Partition 中的消息。
在了解完 Kafka 的基本概念之后,我們通過搭建 Kafka 集群來進一步深刻認識一下 Kafka。
確保安裝環(huán)境
安裝 Java 環(huán)境
在安裝 Kafka 之前,先確保 Linux 環(huán)境上是否有 Java 環(huán)境,使用 java -version 命令查看 Java 版本,推薦使用 Jdk 1.8 。
如果沒有安裝 Java 環(huán)境的話,可以按照這篇文章進行安裝:
- https://www.cnblogs.com/zs-notes/p/8535275.html
安裝 Zookeeper 環(huán)境
Kafka 的底層使用 Zookeeper 儲存元數(shù)據(jù),確保一致性,所以安裝 Kafka 前需要先安裝 Zookeeper,Kafka 的發(fā)行版自帶了 Zookeeper ,可以直接使用腳本來啟動,不過安裝一個 Zookeeper 也不費勁。
Zookeeper 單機搭建
Zookeeper 單機搭建比較簡單,直接從官網(wǎng)下載一個穩(wěn)定版本的 Zookeeper:
- https://www.apache.org/dyn/closer.cgi/zookeeper/
這里我使用的是 3.4.10,下載完成后,在 Linux 系統(tǒng)中的 /usr/local 目錄下創(chuàng)建 Zookeeper 文件夾。
然后使用 xftp 工具(xftp 和 xshell 工具都可以在官網(wǎng) https://www.netsarang.com/zh/xshell/ 申請免費的家庭版)把下載好的 Zookeeper 壓縮包放到 /usr/local/zookeeper 目錄下。
如果下載的是一個 tar.gz 包的話,直接使用 tar -zxvf zookeeper-3.4.10.tar.gz 解壓即可。
如果下載的是 zip 包的話,還要檢查一下 Linux 中是否有 unzip 工具,如果沒有的話,使用 yum install unzip 安裝 zip 解壓工具,完成后使用 unzip zookeeper-3.4.10.zip 解壓即可。
解壓完成后,cd 到 /usr/local/zookeeper/zookeeper-3.4.10 ,創(chuàng)建一個 data 文件夾,然后進入到 conf 文件夾下,使用 mv zoo_sample.cfg zoo.cfg 進行重命名操作。
然后使用 vi 打開 zoo.cfg ,更改一下dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data ,保存。
進入 bin 目錄,啟動服務輸入命令 ./zkServer.sh start 輸出下面內(nèi)容表示搭建成功:
關閉服務輸入命令,./zkServer.sh stop:
使用 ./zkServer.sh status 可以查看狀態(tài)信息。
Zookeeper 集群搭建
①準備條件
需要三個服務器,這里我使用了 CentOS7 并安裝了三個虛擬機,并為各自的虛擬機分配了 1GB 的內(nèi)存,在每個 /usr/local/ 下面新建 Zookeeper 文件夾。
把 Zookeeper 的壓縮包挪過來,解壓,完成后會有 zookeeper-3.4.10 文件夾,進入到文件夾,新建兩個文件夾,分別是 data 和 log 文件夾。
注:上一節(jié)單機搭建中已經(jīng)創(chuàng)建了一個 data 文件夾,就不需要重新創(chuàng)建了,直接新建一個 log 文件夾,對另外兩個新增的服務需要新建這兩個文件夾。
②設置集群
新建完成后,需要編輯 conf/zoo.cfg 文件,三個文件的內(nèi)容如下:
- tickTime=2000
- initLimit=10
- syncLimit=5
- dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data
- dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
- clientPort=12181
- server.1=192.168.1.7:12888:13888
- server.2=192.168.1.8:12888:13888
- server.3=192.168.1.9:12888:13888
server.1 中的這個 1 表示的是服務器的標識也可以是其他數(shù)字,表示這是第幾號服務器,這個標識要和下面我們配置的 myid 的標識一致。
192.168.1.7:12888:13888 為集群中的 ip 地址,第一個端口表示的是 master 與 slave 之間的通信接口,默認是 2888。
第二個端口是 Leader 選舉的端口,集群剛啟動的時候選舉或者 Leader 掛掉之后進行新的選舉的端口,默認是 3888。
現(xiàn)在對上面的配置文件進行解釋:
- tickTime:這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發(fā)送一個心跳。
- initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數(shù)。
當已經(jīng)超過 5 個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗??偟臅r間長度就是 5*2000=10 秒。
- syncLimit:這個配置項標識 Leader 與 Follower 之間發(fā)送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 5*2000=10 秒。
- dataDir:快照日志的存儲路徑。
- dataLogDir:事務日志的存儲路徑,如果不配置這個那么事務日志會默認存儲到 dataDir 指定的目錄,這樣會嚴重影響 ZK 的性能,當 ZK 吞吐量較大的時候,產(chǎn)生的事務日志、快照日志太多。
- clientPort:這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監(jiān)聽這個端口,接受客戶端的訪問請求。
③創(chuàng)建 myid 文件
在了解完其配置文件后,現(xiàn)在來創(chuàng)建每個集群節(jié)點的 myid ,我們上面說過,這個 myid 就是 server.1 的這個 1 ,類似的,需要為集群中的每個服務都指定標識,使用 echo 命令進行創(chuàng)建:
- # server.1
- echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
- # server.2
- echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
- # server.3
- echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
④啟動服務并測試
配置完成,為每個 ZK 服務啟動并測試,我在 Windows 電腦的測試結(jié)果如下。
啟動服務(每臺都需要執(zhí)行):
- cd /usr/local/zookeeper/zookeeper-3.4.10/bin
- ./zkServer.sh start
使用 ./zkServer.sh status 命令檢查服務狀態(tài):
192.168.1.7 --- follower:
192.168.1.8 --- leader:
192.168.1.9 --- follower:
ZK 集群一般只有一個 Leader,多個 Follower,主一般是相應客戶端的讀寫請求,而從主同步數(shù)據(jù),當主掛掉之后就會從 Follower 里投票選舉一個 Leader 出來。
Kafka 集群搭建
準備條件
準備條件如下:
- 搭建好的 Zookeeper 集群
- Kafka 壓縮包
- https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
在 /usr/local 下新建 Kafka 文件夾,然后把下載完成的 tar.gz 包移到 /usr/local/kafka 目錄下,使用 tar -zxvf 壓縮包進行解壓。
解壓完成后,進入到 kafka_2.12-2.3.0 目錄下,新建 log 文件夾,進入到 config 目錄下。
我們可以看到有很多 properties 配置文件,這里主要關注 server.properties 這個文件即可。
Kafka 啟動方式有兩種:
- 一種是使用 Kafka 自帶的 Zookeeper 配置文件來啟動(可以按照官網(wǎng)來進行啟動,并使用單個服務多個節(jié)點來模擬集群http://kafka.apache.org/quickstart#quickstart_multibroker)。
- 一種是通過使用獨立的 ZK 集群來啟動,這里推薦使用第二種方式,使用 ZK 集群來啟動。
②修改配置項
需要為每個服務都修改一下配置項,也就是 server.properties, 需要更新和添加的內(nèi)容有:
- broker.id=0 //初始是0,每個 server 的broker.id 都應該設置為不一樣的,就和 myid 一樣 我的三個服務分別設置的是 1,2,3
- log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log
- #在log.retention.hours=168 下面新增下面三項
- message.max.byte=5242880
- default.replication.factor=2
- replica.fetch.max.bytes=5242880
- #設置zookeeper的連接端口
- zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181
配置項的含義:
- broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質(zhì)一樣
- port=9092 #當前kafka對外提供服務的端口默認是9092
- host.name=192.168.1.7 #這個參數(shù)默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
- num.network.threads=3 #這個是borker進行網(wǎng)絡處理的線程數(shù)
- num.io.threads=8 #這個是borker進行I/O處理的線程數(shù)
- log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大于這個目錄的個數(shù)這個目錄,如果配置多個目錄,新創(chuàng)建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區(qū)數(shù)最少就放那一個
- socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲到緩沖區(qū)了到達一定的大小后在發(fā)送,能提高性能
- socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當數(shù)據(jù)到達一定大小后在序列化到磁盤
- socket.request.max.bytes=104857600 #這個參數(shù)是向kafka請求消息或者向kafka發(fā)送消息的請請求的最大數(shù),這個值不能超過java的堆棧大小
- num.partitions=1 #默認的分區(qū)數(shù),一個topic默認1個分區(qū)數(shù)
- log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
- message.max.byte=5242880 #消息保存的最大值5M
- default.replication.factor=2 #kafka保存消息的副本數(shù),如果一個副本失效了,另一個還可以繼續(xù)提供服務
- replica.fetch.max.bytes=5242880 #取消息的最大直接數(shù)
- log.segment.bytes=1073741824 #這個參數(shù)是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
- log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
- log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
- zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #設置zookeeper的連接端口
③啟動 Kafka 集群并測試
啟動服務,進入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目錄下:
- # 啟動后臺進程
- ./kafka-server-start.sh -daemon ../config/server.properties
檢查服務是否啟動:
- # 執(zhí)行命令 jps
- 6201 QuorumPeerMain
- 7035 Jps
- 6972 Kafka
Kafka 已經(jīng)啟動,創(chuàng)建 Topic 來驗證是否創(chuàng)建成功:
- # cd .. 往回退一層 到 /usr/local/kafka/kafka_2.12-2.3.0 目錄下
- bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
對上面的解釋:
- Replication-factor 2:復制兩份
- Partitions 1:創(chuàng)建1個分區(qū)
- Topic:創(chuàng)建主題
查看我們的主題是否創(chuàng)建成功:
- bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
啟動一個服務就能把集群啟動起來。
在一臺機器上創(chuàng)建一個發(fā)布者:
- # 創(chuàng)建一個broker,發(fā)布者
- ./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic
在一臺服務器上創(chuàng)建一個訂閱者:
- # 創(chuàng)建一個consumer, 消費者
- bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
注意:這里使用 --zookeeper 的話可能出現(xiàn) zookeeper is not a recognized option 的錯誤,這是因為 Kafka 版本太高,需要使用 --bootstrap-server 指令。
測試結(jié)果如下:
發(fā)布:
消費:
④其他命令
顯示 topic:
- bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
- # 顯示
- cxuantopic
查看 topic 狀態(tài):
- bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic
- # 下面是顯示的詳細信息
- Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
- Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
- # 分區(qū)為為1 復制因子為2 主題 cxuantopic 的分區(qū)為0
- # Replicas: 0,1 復制的為1,2
Leader 負責給定分區(qū)的所有讀取和寫入的節(jié)點,每個節(jié)點都會通過隨機選擇成為 Leader。
Replicas 是為該分區(qū)復制日志的節(jié)點列表,無論它們是 Leader 還是當前處于活動狀態(tài)。
Isr 是同步副本的集合。它是副本列表的子集,當前仍處于活動狀態(tài)并追隨Leader。至此,Kafka 集群搭建完畢。
⑤驗證多節(jié)點接收數(shù)據(jù)
剛剛我們都使用的是相同的 ip 服務,下面使用其他集群中的節(jié)點,驗證是否能夠接受到服務。
在另外兩個節(jié)點上使用:
- bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
然后再使用 Broker 進行消息發(fā)送,經(jīng)測試三個節(jié)點都可以接受到消息。
配置詳解
在搭建 Kafka 的時候我們簡單介紹了一下 server.properties 中配置的含義,現(xiàn)在我們來詳細介紹一下參數(shù)的配置和概念。
常規(guī)配置
這些參數(shù)是 Kafka 中最基本的配置:
broker.id:每個 Broker 都需要有一個標識符,使用 broker.id 來表示。它的默認值是 0,它可以被設置成其他任意整數(shù),在集群中需要保證每個節(jié)點的 broker.id 都是唯一的。
port:如果使用配置樣本來啟動 Kafka ,它會監(jiān)聽 9092 端口,修改 port 配置參數(shù)可以把它設置成其他任意可用的端口。
zookeeper.connect:用于保存 Broker 元數(shù)據(jù)的地址是通過 zookeeper.connect 來指定。
localhost:2181:表示運行在本地 2181 端口。該配置參數(shù)是用逗號分隔的一組 hostname:port/path 列表。
每一部分含義如下:
- hostname 是 Zookeeper 服務器的服務名或 IP 地址。
- port 是 Zookeeper 連接的端口。
- /path 是可選的 Zookeeper 路徑,作為 Kafka 集群的 chroot 環(huán)境。如果不指定,默認使用跟路徑。
log.dirs:Kafka 把消息都保存在磁盤上,存放這些日志片段的目錄都是通過 log.dirs 來指定的。它是一組用逗號分隔的本地文件系統(tǒng)路徑。
如果指定了多個路徑,那么 Broker 會根據(jù) "最少使用" 原則,把同一分區(qū)的日志片段保存到同一路徑下。
要注意,Broker 會向擁有最少數(shù)目分區(qū)的路徑新增分區(qū),而不是向擁有最小磁盤空間的路徑新增分區(qū)。
num.recovery.threads.per.data.dir:對于如下三種情況,Kafka 會使用可配置的線程池來處理日志片段:
- 服務器正常啟動,用于打開每個分區(qū)的日志片段。
- 服務器崩潰后啟動,用于檢查和截斷每個分區(qū)的日志片段。
- 服務器正常關閉,用于關閉日志片段。
默認情況下,每個日志目錄只使用一個線程。因為這些線程只是在服務器啟動和關閉時會用到,所以完全可以設置大量的線程來達到并行操作的目的。
特別是對于包含大量分區(qū)的服務器來說,一旦發(fā)生崩潰,在進行恢復時使用井行操作可能會省下數(shù)小時的時間。
設置此參數(shù)時需要注意,所配置的數(shù)字對應的是 log.dirs 指定的單個日志目錄。
也就是說,如果 num.recovery.threads.per.data.dir 被設為 8,并且 log.dir 指定了 3 個路徑,那么總共需要 24 個線程。
auto.create.topics.enable:默認情況下,Kafka 會在如下 3 種情況下創(chuàng)建主題:
- 當一個生產(chǎn)者開始往主題寫入消息時
- 當一個消費者開始從主題讀取消息時
- 當任意一個客戶向主題發(fā)送元數(shù)據(jù)請求時
delete.topic.enable:如果你想要刪除一個主題,你可以使用主題管理工具。
默認情況下,是不允許刪除主題的,delete.topic.enable 的默認值是 false 因此你不能隨意刪除主題。
這是對生產(chǎn)環(huán)境的合理性保護,但是在開發(fā)環(huán)境和測試環(huán)境,是可以允許你刪除主題的,所以,如果你想要刪除主題,需要把 delete.topic.enable 設為 true。
主題默認配置
Kafka 為新創(chuàng)建的主題提供了很多默認配置參數(shù),下面就來一起認識一下這些參數(shù)。
num.partitions:num.partitions 參數(shù)指定了新創(chuàng)建的主題需要包含多少個分區(qū)。
如果啟用了主題自動創(chuàng)建功能(該功能是默認啟用的),主題分區(qū)的個數(shù)就是該參數(shù)指定的值。該參數(shù)的默認值是 1。要注意,我們可以增加主題分區(qū)的個數(shù),但不能減少分區(qū)的個數(shù)。
default.replication.factor:這個參數(shù)比較簡單,它表示 Kafka 保存消息的副本數(shù)。
如果一個副本失效了,另一個還可以繼續(xù)提供服務,default.replication.factor 的默認值為 1,這個參數(shù)在你啟用了主題自動創(chuàng)建功能后有效。
log.retention.ms:Kafka 通常根據(jù)時間來決定數(shù)據(jù)可以保留多久。默認使用 log.retention.hours 參數(shù)來配置時間,默認是 168 個小時,也就是一周。
除此之外,還有兩個參數(shù) log.retention.minutes 和 log.retentiion.ms 。這三個參數(shù)作用是一樣的,都是決定消息多久以后被刪除,推薦使用 log.retention.ms。
log.retention.bytes:另一種保留消息的方式是判斷消息是否過期。它的值通過參數(shù) log.retention.bytes 來指定,作用在每一個分區(qū)上。
也就是說,如果有一個包含 8 個分區(qū)的主題,并且 log.retention.bytes 被設置為 1GB,那么這個主題最多可以保留 8GB 數(shù)據(jù)。
所以,當主題的分區(qū)個數(shù)增加時,整個主題可以保留的數(shù)據(jù)也隨之增加。
log.segment.bytes:上述的日志都是作用在日志片段上,而不是作用在單個消息上。
當消息到達 Broker 時,它們被追加到分區(qū)的當前日志片段上,當日志片段大小到達 log.segment.bytes 指定上限(默認為 1GB)時,當前日志片段就會被關閉,一個新的日志片段被打開。
如果一個日志片段被關閉,就開始等待過期。這個參數(shù)的值越小,就越會頻繁的關閉和分配新文件,從而降低磁盤寫入的整體效率。
log.segment.ms:上面提到日志片段經(jīng)關閉后需等待過期,那么 log.segment.ms 這個參數(shù)就是指定日志多長時間被關閉的參數(shù)。
log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日志片段會在大小或時間到達上限時被關閉,就看哪個條件先得到滿足。
message.max.bytes:Broker 通過設置 message.max.bytes 參數(shù)來限制單個消息的大小,默認是 1000 000, 也就是 1MB。
如果生產(chǎn)者嘗試發(fā)送的消息超過這個大小,不僅消息不會被接收,還會收到 Broker 返回的錯誤消息。
跟其他與字節(jié)相關的配置參數(shù)一樣,該參數(shù)指的是壓縮后的消息大小,也就是說,只要壓縮后的消息小于 mesage.max.bytes,那么消息的實際大小可以大于這個值。
這個值對性能有顯著的影響。值越大,那么負責處理網(wǎng)絡連接和請求的線程就需要花越多的時間來處理這些請求。它還會增加磁盤寫入塊的大小,從而影響 IO 吞吐量。
文章參考:
- Kafka【第一篇】Kafka 集群搭建
- https://juejin.im/post/5ba792f5e51d450e9e44184d
- https://blog.csdn.net/k393393/article/details/93099276
- 《Kafka 權(quán)威指南》
- https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/broker-configurations/
【編輯推薦】