聊聊 Kafka 那點破事!
本文轉(zhuǎn)載自微信公眾號「微觀技術(shù)」,作者微觀技術(shù)。轉(zhuǎn)載本文請聯(lián)系微觀技術(shù)公眾號。
大家好,我是Tom哥~
Kafka作為一款開源的消息引擎,很多人并不陌生,但深入其源碼的同學(xué)估計不多,除非你是中間件團隊消息系統(tǒng)維護者。但術(shù)業(yè)有專攻,市面上那么多開源框架且每個框架又經(jīng)常迭代升級,花精力深入了解每一個框架源碼不太現(xiàn)實,本文會以業(yè)務(wù)視角羅列工作中大家需要熟知的一些知識
本篇文章的目錄:
首先,為什么使用kafka?
- 削峰填谷。緩沖上下游瞬時突發(fā)流量,保護“脆弱”的下游系統(tǒng)不被壓垮,避免引發(fā)全鏈路服務(wù)“雪崩”。
- 系統(tǒng)解耦。發(fā)送方和接收方的松耦合,一定程度簡化了開發(fā)成本,減少了系統(tǒng)間不必要的直接依賴。
Kafka 名詞術(shù)語,一網(wǎng)打盡
- Broker:接收客戶端發(fā)送過來的消息,對消息進(jìn)行持久化
- 主題:Topic。主題是承載消息的邏輯容器,在實際使用中多用來區(qū)分具體的業(yè)務(wù)。
- 分區(qū):Partition。一個有序不變的消息序列。每個主題下可以有多個分區(qū)。
- 消息:這里的消息就是指 Kafka 處理的主要對象。
- 消息位移:Offset。表示分區(qū)中每條消息的位置信息,是一個單調(diào)遞增且不變的值。
- 副本:Replica。Kafka 中同一條消息能夠被拷貝到多個地方以提供數(shù)據(jù)冗余,這些地方就是所謂的副本。副本還分為領(lǐng)導(dǎo)者副本和追隨者副本,各自有不同的角色劃分。每個分區(qū)可配置多個副本實現(xiàn)高可用。一個分區(qū)的N個副本一定在N個不同的Broker上。
- 生產(chǎn)者:Producer。向主題發(fā)布新消息的應(yīng)用程序。
- 消費者:Consumer。從主題訂閱新消息的應(yīng)用程序。
- 消費者位移:Consumer Offset。表示消費者消費進(jìn)度,每個消費者都有自己的消費者位移。offset保存在broker端的內(nèi)部topic中,不是在clients中保存
- 消費者組:Consumer Group。多個消費者實例共同組成的一個組,同時消費多個分區(qū)以實現(xiàn)高吞吐。
- 重平衡:Rebalance。消費者組內(nèi)某個消費者實例掛掉后,其他消費者實例自動重新分配訂閱主題分區(qū)
ZooKeeper 在里面的職責(zé)是什么?
它是一個分布式協(xié)調(diào)框架,負(fù)責(zé)協(xié)調(diào)管理并保存 Kafka 集群的所有元數(shù)據(jù)信息,比如集群都有哪些 Broker 在運行、創(chuàng)建了哪些 Topic,每個 Topic 都有多少分區(qū)以及這些分區(qū)的 Leader 副本都在哪些機器上等信息。
消息傳輸?shù)母袷?/h3>
純二進(jìn)制的字節(jié)序列。當(dāng)然消息還是結(jié)構(gòu)化的,只是在使用之前都要將其轉(zhuǎn)換成二進(jìn)制的字節(jié)序列。
消息傳輸協(xié)議
- 點對點模型。系統(tǒng) A 發(fā)送的消息只能被系統(tǒng) B 接收,其他任何系統(tǒng)都不能讀取 A 發(fā)送的消息
- 發(fā)布/訂閱模型。該模型也有發(fā)送方和接收方,只不過提法不同。發(fā)送方也稱為發(fā)布者(Publisher),接收方稱為訂閱者(Subscriber)。和點對點模型不同的是,這個模型可能存在多個發(fā)布者向相同的主題發(fā)送消息,而訂閱者也可能存在多個,它們都能接收到相同主題的消息。
消息壓縮
生產(chǎn)者程序中配置compression.type 參數(shù)即表示啟用指定類型的壓縮算法。
props.put(“compression.type”, “gzip”),它表明該 Producer 的壓縮算法使用的是GZIP。這樣 Producer 啟動后生產(chǎn)的每個消息集合都是經(jīng) GZIP 壓縮過的,故而能很好地節(jié)省網(wǎng)絡(luò)傳輸帶寬以及 Kafka Broker 端的磁盤占用。
但如果Broker又指定了不同的壓縮算法,如:Snappy,會將生產(chǎn)端的消息解壓然后按自己的算法重新壓縮。
各壓縮算法比較:吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
kafka默認(rèn)不指定壓縮算法。
消息解壓縮
當(dāng) Consumer pull消息時,Broker 會原樣發(fā)送出去,當(dāng)消息到達(dá) Consumer 端后,由 Consumer 自行解壓縮還原成之前的消息。
分區(qū)策略
編寫一個類實現(xiàn)org.apache.kafka.clients.Partitioner接口。實現(xiàn)內(nèi)部兩個方法:partition()和close()。然后顯式地配置生產(chǎn)者端的參數(shù)partitioner.class
常見的策略:
- 輪詢策略(默認(rèn))。保證消息最大限度地被平均分配到所有分區(qū)上。
- 隨機策略。隨機策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了。
- 按key分區(qū)策略。key可能是uid或者訂單id,將同一標(biāo)志位的所有消息都發(fā)送到同一分區(qū),這樣可以保證一個分區(qū)內(nèi)的消息有序
- 其他分區(qū)策略。如:基于地理位置的分區(qū)策略
生產(chǎn)者管理TCP連接
在new KafkaProducer 實例時,生產(chǎn)者應(yīng)用會在后臺創(chuàng)建并啟動一個名為 Sender 的線程,該 Sender 線程開始運行時首先會創(chuàng)建與 Broker 的連接。此時還不知道給哪個topic發(fā)消息,所以Producer 啟動時會發(fā)起與所有的 Broker 的連接。
Producer 通過metadata.max.age.ms 參數(shù)定期地去更新元數(shù)據(jù)信息,默認(rèn)值是 300000,即 5 分鐘,不管集群那邊是否有變化,Producer 每 5 分鐘都會強制刷新一次元數(shù)據(jù)以保證它是最新的數(shù)據(jù)。
Producer 發(fā)送消息:
Producer 使用帶回調(diào)通知的發(fā)送 API, producer.send(msg, callback)。
設(shè)置 acks = all。Producer 的一個參數(shù),表示所有副本都成功接收到消息,該消息才算是“已提交”,最高等級,acks的其它值說明。min.insync.replicas > 1,表示消息至少要被寫入到多少個副本才算是“已提交”
retries 是 Producer 的參數(shù)。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時抖動時,消息發(fā)送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試消息發(fā)送,避免消息丟失。
冪等性 Producer
設(shè)置參數(shù)props.put(“enable.idempotence”, ture),Producer 自動升級成冪等性 Producer,其他所有的代碼邏輯都不需要改變。Kafka 自動幫你做消息的重復(fù)去重。
原理很簡單,就是經(jīng)典的空間換時間,即在 Broker 端多保存一些字段。當(dāng) Producer 發(fā)送了具有相同字段值的消息后,Broker 能夠自動知曉這些消息已經(jīng)重復(fù)了,可以在后臺默默地把它們“丟棄”掉。
只能保證單分區(qū)、單會話上的消息冪等性。一個冪等性 Producer 能夠保證某個topic的一個分區(qū)上不出現(xiàn)重復(fù)消息,但無法實現(xiàn)多個分區(qū)的冪等性。比如采用輪詢,下一次提交換了一個分區(qū)就無法解決
事務(wù)型 Producer
能夠保證將消息原子性地寫入到多個分區(qū)中。這批消息要么全部寫入成功,要么全部失敗。能夠保證跨分區(qū)、跨會話間的冪等性。
- producer.initTransactions();
- try {
- producer.beginTransaction();
- producer.send(record1);
- producer.send(record2);
- //提交事務(wù)
- producer.commitTransaction();
- } catch (KafkaException e) {
- //事務(wù)終止
- producer.abortTransaction();
- }
實際上即使寫入失敗,Kafka 也會把它們寫入到底層的日志中,也就是說 Consumer 還是會看到這些消息。要不要處理在 Consumer 端設(shè)置 isolation.level ,這個參數(shù)有兩個值:
- read_uncommitted:這是默認(rèn)值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息
- read_committed:表明 Consumer 只會讀取事務(wù)型 Producer 成功提交事務(wù)寫入的消息
Kafka Broker 是如何存儲數(shù)據(jù)?
Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個日志就是磁盤上一個只能追加寫(Append-only)消息的物理文件。因為只能追加寫入,故避免了緩慢的隨機 I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實現(xiàn) Kafka 高吞吐量特性的一個重要手段。
不過如果你不停地向一個日志寫入消息,最終也會耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?
簡單來說就是通過日志段(Log Segment)機制。在 Kafka 底層,一個日志又近一步細(xì)分成多個日志段,消息被追加寫到當(dāng)前最新的日志段中,當(dāng)寫滿了一個日志段后,Kafka 會自動切分出一個新的日志段,并將老的日志段封存起來。Kafka 在后臺還有定時任務(wù)會定期地檢查老的日志段是否能夠被刪除,從而實現(xiàn)回收磁盤空間的目的。
Kafka 的備份機制
相同的數(shù)據(jù)拷貝到多臺機器上。副本的數(shù)量是可以配置的。Kafka 中follow副本不會對外提供服務(wù)。
副本的工作機制也很簡單:生產(chǎn)者總是向leader副本寫消息;而消費者總是從leader副本讀消息。至于follow副本,它只做一件事:向leader副本以異步方式發(fā)送pull請求,請求leader把最新的消息同步給它,必然有一個時間窗口導(dǎo)致它和leader中的數(shù)據(jù)是不一致的,或者說它是落后于leader。
為什么要引入消費者組?
主要是為了提升消費者端的吞吐量。多個消費者實例同時消費,加速整個消費端的吞吐量(TPS)。
在一個消費者組下,一個分區(qū)只能被一個消費者消費,但一個消費者可能被分配多個分區(qū),因而在提交位移時也就能提交多個分區(qū)的位移。如果1個topic有2個分區(qū),消費者組有3個消費者,有一個消費者將無法分配到任何分區(qū),處于idle狀態(tài)。
理想情況下,Consumer 實例的數(shù)量應(yīng)該等于該 Group 訂閱topic(可能多個)的分區(qū)總數(shù)。
消費端拉取(批量)、ACK
消費端先拉取并消費消息,然后再ack更新offset。
1)消費者程序啟動多個線程,每個線程維護專屬的 KafkaConsumer 實例,負(fù)責(zé)完整的消息拉取、消息處理流程。一個KafkaConsumer負(fù)責(zé)一個分區(qū),能保證分區(qū)內(nèi)的消息消費順序。
缺點:線程數(shù)受限于 Consumer 訂閱topic的總分區(qū)數(shù)。
2)任務(wù)切分成了消息獲取和消息處理兩個部分。消費者程序使用單或多線程拉取消息,同時創(chuàng)建專門線程池執(zhí)行業(yè)務(wù)邏輯。優(yōu)點:可以靈活調(diào)節(jié)消息獲取的線程數(shù),以及消息處理的線程數(shù)。
缺點:無法保證分區(qū)內(nèi)的消息消費順序。另外引入了多組線程,使得整個消息消費鏈路被拉長,最終導(dǎo)致正確位移提交會變得異常困難,可能會出現(xiàn)消息的重復(fù)消費或丟失。
消費端offset管理
1)老版本的 Consumer組把位移保存在 ZooKeeper 中,但很快發(fā)現(xiàn)zk并不適合頻繁的寫更新。
2)在新版本的 Consumer Group 中,Kafka 社區(qū)重新設(shè)計了 Consumer組的位移管理方式,采用了將位移保存在 Broker端的內(nèi)部topic中,也稱為“位移主題”,由kafka自己來管理。原理很簡單, Consumer的位移數(shù)據(jù)作為一條條普通的 Kafka 消息,提交到__consumer_offsets 中。它的消息格式由 Kafka 自己定義,用戶不能修改。位移主題的 Key 主要包括 3 部分內(nèi)容:
Kafka Consumer 提交位移的方式有兩種:自動提交位移和手動提交位移。
Kafka 使用Compact策略來刪除位移主題中的過期消息,避免該topic無限期膨脹。提供了專門的后臺線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數(shù)據(jù)。
Rebalance 觸發(fā)條件
1)組成員數(shù)發(fā)生變更。比如有新的 Consumer 實例加入組或者離開組,又或是有 Consumer 實例崩潰被“踢出”組。(99%原因是由它導(dǎo)致)
2) 訂閱topic數(shù)發(fā)生變更。Consumer Group 可以使用正則表達(dá)式的方式訂閱topic,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結(jié)尾的topic。在 Consumer Group 的運行過程中,你新創(chuàng)建了一個滿足這樣條件的topic,那么該 Group 就會發(fā)生 Rebalance。
3) 訂閱topic的分區(qū)數(shù)發(fā)生變化。Kafka 目前只允許增加topic的分區(qū)數(shù)。當(dāng)分區(qū)數(shù)增加時,也會觸發(fā)訂閱該topic的所有 Group 開啟 Rebalance。
消息的順序性
Kafka的設(shè)計中多個分區(qū)的話無法保證全局的消息順序。如果一定要實現(xiàn)全局的消息順序,只能單分區(qū)。
方法二:通過有key分組,同一個key的消息放入同一個分區(qū),保證局部有序
歷史數(shù)據(jù)清理策略
基于保存時間,log.retention.hours
基于日志大小的清理策略。通過log.retention.bytes控制
組合方式