Kafka都沒整明白,還敢去面試?
原創(chuàng)【51CTO.com原創(chuàng)稿件】Apache Kafka 被譽為時下熱門的企業(yè)級消息傳遞系統(tǒng),其初衷是一個分布式流系統(tǒng),用于發(fā)布和訂閱記錄流,以其快速,高可擴展性以及較完美的容錯效果備受業(yè)內(nèi)人士青睞。
圖片來自 Pexels
放眼當下數(shù)據(jù)為王的時代,深入了解 Apache Kafka 及其常見的部署應用,快速實現(xiàn)數(shù)據(jù)架構(Kafka Fast Data Architecture)已是大勢所趨,刻不容緩。
以下分別 Kafka 架構,四大核心 API,典型應用場景,Kafka 代理與消息主題,集群的創(chuàng)建,流 APIs(Stream APIs)及其處理模式等不同方面展開詳細介紹。
Kafka:分布式流平臺
Kafka 是一個分布式流平臺,用于發(fā)布和訂閱消息流(也稱記錄流或數(shù)據(jù)流),快速有效地利用 I/O 進行數(shù)據(jù)流的批處理,壓縮及解耦,并將數(shù)據(jù)流傳輸?shù)綌?shù)據(jù)池,應用程序和實時流分析系統(tǒng)中。
Kafka 將主題消息分區(qū)復制到多個服務器中,允許用戶通過自己的應用程序來處理這些記錄。
Kafka 四大核心 APIs
Kafka 由記錄(records),主題(topics),使用者(consumers),生產(chǎn)者(producers),代理服務(brokers),日志(logs),分區(qū)(partitions)和集群(clusters)組成。
Kafka 主題是一個記錄流,每個主題都有對應的日志,該日志是該主題在磁盤上的存儲,每個主題日志又分為多個分區(qū)和片段。
Kafka Producer API 用于生成數(shù)據(jù)記錄流。Kafka Consumer API 用于消費來自 Kafka 的記錄流。
Broker 是在 Kafka 集群中運行的 Kafka 服務器,Kafka 集群由多個代理服務器組成。
①生產(chǎn)者 API(Producer API):消息的生產(chǎn)者,向 Kafka broker 發(fā)消息的客戶端。
允許客戶端與集群中運行著的 Kafka 服務器相連接,并將記錄流發(fā)布到一個或多個 Kafka topics(消息主題)中。
一臺 Kafka 服務器就是一個 broker,一個集群由多個 broker 組成,一個 broker 可以容納多個 topic。
②消費者 API(Consumer API):消息消費者,向 Kafka broker 獲取消息的客戶端。
允許客戶端連接集群中運行著的 Kafka 服務器,并消費其中一個或多個 Kafka topics(消息主題)的記錄流。
③流 API(Stream API):充當流處理器,用于輸入輸出流的轉換。
允許客戶端充當流處理器,從一個或多個 topics(消息主題)消費輸入流,并生產(chǎn)輸出流,輸出到一個或多個其他 topics(消息主題)中,從而有效地將輸入流轉換至輸出流。
④連接器 API(Connector API):允許編寫可重用的生產(chǎn)者和消費者代碼。
我們可以從任何關系型數(shù)據(jù)庫中讀取數(shù)據(jù),并將其發(fā)布到主題中,同時也可以“消費”這個主題中的數(shù)據(jù),并將其寫入關系型數(shù)據(jù)庫。
由此可見,Connector API 支持構建和運行可重復使用的生產(chǎn)者或消費者,并將 topic 連接到現(xiàn)有的應用程序或數(shù)據(jù)系統(tǒng)。(例如,就關系型數(shù)據(jù)庫而言,其連接器可以捕獲到各個表中的每個變化。)
Kafka應用場景
消息系統(tǒng)
Kafka 作為企業(yè)消息傳遞系統(tǒng),通過源系統(tǒng)及目標系統(tǒng)間的分離來實現(xiàn)數(shù)據(jù)交換。與 JMS 相比,Kafka 兼具高吞吐量分區(qū)及高可靠容錯力的復制功能。
Web 站點活動跟蹤
跟蹤記錄用戶在網(wǎng)站上的所有事件信息,從而進行數(shù)據(jù)的分析及脫機處理。
日志匯總
用于處理來自不同系統(tǒng)的日志,尤其是那些處于微服務架構分布式環(huán)境中的系統(tǒng),這類系統(tǒng)通常部署在不同的主機上,因此 Kafka 需要匯總來自不同系統(tǒng)的各類日志,進而對這些日志集中進行分析處理。
指標收集
Kafka 可用于收集來自各類系統(tǒng)/網(wǎng)絡的指標,并進行監(jiān)控,Kafka 配有專門的指標報告生成工具,如 Ganglia,Graphite 等。
Kafka Brokers & Kafka Topics
Kafka Broker(代理服務器)
Kafka 集群中的一個實例稱之為代理(服務器),在 Kafka 集群中,只要連接其中任意一個代理(服務器)就能訪問到整個集群,每個代理在集群中通過 ID 進行標識。
Kafka Topics(消息主題)
一個消息主題(Topic)是一個消息記錄發(fā)布后的邏輯名稱,在 Kafka 中,Topic 被分為若干個分區(qū)(Partitions),用于消息的發(fā)布。
這些分區(qū)分布在集群的各個代理服務器(Brokers)中,為了實現(xiàn)可擴展性,通常將一個非常大的 Topic 分布在多個代理服務器(Broker)上。
由于一個 Topic 可以分為多個分區(qū)(Partition),每個分區(qū)(Partition)都是一個有序的隊列。
分區(qū)(Partition)中的每條消息都會被分配一個有序的 ID(即偏移量,Offset)。
如下圖所示,假設當前有一個主題(Topic),該主題(Topic)有三個分區(qū),集群中有三個代理(Broker),則每個代理都有一個分區(qū)。要發(fā)布到分區(qū)的數(shù)據(jù)以偏移量(Offset)增量的方式追加。
其中“Offset”即偏移量,Kafka 的存儲文件都是按照“offset.kafka”來命名,用 Offset 方式命名是為了便于查找,如果想找位于 2046 的位置,只需找到 2045.kafka 的文件即可。
以下是分區(qū)(Partitions)使用時值得注意的要點:
- 每個消息主題(Topic)按名稱標識,集群中允許有多個已命名的消息主題。
- 每個消息前后順序的有效性僅限于當前分區(qū)級別(maintained at the partition level),而非跨主題。
- 數(shù)據(jù)一旦寫入分區(qū),則不會被覆蓋,這就是Kafka中強調(diào)的數(shù)據(jù)不變性(immutability)
- 分區(qū)中的消息通過鍵(key),值(values),時間戳(timestamps)的形式一起存儲,Kafka 確保每一個給定密鑰的消息都會發(fā)布到同一個分區(qū)中。
- 在 Kafka 集群中,每一個分區(qū)都有一個引導程序(leader),該引導程序負責對該分區(qū)執(zhí)行讀/寫操作。
上圖是一個例子,當前集群中僅一個消息主題(Topic),該主題包含三個分區(qū)(partition0,partition1,partition2),集群中有三個代理服務器(broker1,broker2,broker3)。
當前每個分區(qū)的副本都復制到另外兩個代理服務器(Broker)中,即每個代理服務器(Broker)上包含了三個分區(qū)。
因此即便其中某兩個代理服務器(Broker)發(fā)生故障,也不用擔心數(shù)據(jù)會丟失。
如上,當我們在 Kafka 中創(chuàng)建主題時,始終建議確保主題(Topic)的復制因子大于 1,并且小于/等于集群中的代理服務器(Broker)數(shù)量,這是非常推薦的做法。
上圖示例中,當前主題的復制因子為 3(即,一份原始數(shù)據(jù),兩份副本數(shù)據(jù)), 不難推算出每個分區(qū)的引導程序加上其副本數(shù)量總共為“3”。
該示例中,每個分區(qū)都有一個引導程序(稱之為“leader”),以及其他兩個同步副本(稱之為“follower”)。
對于分區(qū) partition 0 來說,broker1 是“leader”, broker2 和 broker3 都是“follower”,從而分區(qū) partition 0 的所有讀寫操作都將在 broker1 中進行。
同時,之后更新的內(nèi)容也會被同步復制到 broker2 和 broker3 對應的分區(qū)(partition)中。
創(chuàng)建 Kafka 集群——Demo
我們還是以上圖中三個 Broker 組成的 Kafka 集群為例,拆解 Kafka 集群創(chuàng)建的步驟。
①Kafka 集群環(huán)境準備
首先需要準備好一臺安裝有 Zookeeper 的機器,沒有 Zookeeper,Kafka 集群將無法工作。
同時建議直接從官網(wǎng)下載最新版本的 Apache Kafka,目前版本更新至2.11,直接解壓后將其放置到 bin 目錄下:
- https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
然后啟動 ZooKeeper,為什么需要 Zookeeper?它在這里主要負責協(xié)調(diào)服務,管理代理服務 Broker,確定每個分區(qū)中的引導程序,以及在 Kafka 消息主題或代理服務發(fā)生變更時及時發(fā)出警告。
通過以下命令可以啟動一個Zookeeper實例:
②啟動 Kafka Brokers
成功安裝 Kafka 并啟動 ZooKeeper 實例后,接下來就可以開啟 Kafka Broker 了,這里共啟動了三個 Kafka Broker。
具體啟動方式:先定位到 Kafka 根目錄下的“config”文件夾下,找到“server.properties”文件,將其復制三次。
然后分別命名為server_1.properties,server_2.properties 以及 server_3.properties,并針對三個文件內(nèi)容做如下編輯,直接保存即可:
保存后通過命令開啟這三個代理服務:
③創(chuàng)建主題
通過如下命令創(chuàng)建消息主題:
④生成引導服務
通過 Kafka 控制臺生成器(Kafka console)指定任意一個代理服務地址,并基于之前創(chuàng)建的主題發(fā)布一些消息。
這個指定的代理服務就被視作為引導服務程序,用于訪問整個集群。
⑤“消費”消息
通過 Kafka 控制臺來使用消息,用戶(即:消息消費者)需要指定任意一個代理服務(Broker)地址作為引導服務器。
在閱讀消息時,用戶(即:消息消費者)是看不到消息順序的,上文中也提到過消息的先后順序僅在分區(qū)級別(partition level)進行維護,而非主題級別(topic level)。
通過以下命令可以描述主題并查看各分區(qū)的分布情況,以及每個分區(qū)的引導服務器:
從上面的執(zhí)行結果可以看出:
- broker-1 是分區(qū) 0 的引導服務器。
- broker-2 是分區(qū) 1 的引導服務器。
- broker-3 是分區(qū) 2 的引導服務器。
- broker-1,broker-2,broker-3 分別具有每個分區(qū)的副本(同步且相互備份)。
Kafka Streams API
Kafka 常被用作將流數(shù)據(jù)實時傳輸?shù)狡渌到y(tǒng)中,此時 Kafka 作為中間層,主要用來解耦分離實時數(shù)據(jù)管道。
Kafka 流是 Kafka 生態(tài)系統(tǒng)的一部分,它提供了實時分析的功能,支持將流數(shù)據(jù)傳輸?shù)酱髷?shù)據(jù)平臺或 RDBMS,Cassandra,Spark 中,以進行將來的數(shù)據(jù)分析。
Kafka Stream API 簡單易用,通過其強大的技術能力可處理所有存儲于其中的數(shù)據(jù),同時該 API 也為我們提供了一套 Kafka 標準類的實現(xiàn)規(guī)則。
在實際工作中為了能夠創(chuàng)建支持核心業(yè)務的實時應用程序,我們需要 Kafka Stream API 的大力協(xié)助。
Kafka Stream API 獨特之處在于,通過其構建的應用程序都是普通應用程序。
所以這些應用程序可以像其他任何應用程序一樣,進行打包,部署和監(jiān)控,而無需單獨安裝專門的處理集群或類似基礎架構,這些額外部署的基礎架構往往比較耗錢。
流(Stream)是 Kafka Streams 提供的最重要的抽象對象,代表了無限且持續(xù)更新的數(shù)據(jù)集。
流是一系列不可變數(shù)據(jù)記錄的序列,具備有序,可重復,容錯等特性,我們可以簡單將其視為記錄流(定義為:KStream)或變更日志流(定位為:KTable 或 GlobelKTable)。
流處理器(Stream Processor)是處理器拓撲結構中的一個節(jié)點,包含應用于流數(shù)據(jù)的處理邏輯,一系列節(jié)點組成了拓撲結構中的處理步驟(用于轉換數(shù)據(jù))。
Kafka Streams API 處理數(shù)據(jù)——Demo
Kafka Stream API 為實現(xiàn)流數(shù)據(jù)處理,即消息在 Kafka 中的消費及回寫,提供了兩種選項:
- 高級 Kafka Streams DSL(high-level DSL)。
- 低級處理器 API:用于數(shù)據(jù)基本處理,組合處理,本地狀態(tài)存儲。
①高級 DSL(high-level DSL)
高級 DSL 由記錄流(KStream) 和日志流(KTable/GlobalKTable)兩大主要抽象類別組成,包含一系列已實現(xiàn)的方法可供調(diào)用。
KStream 是記錄流的抽象,其中每個數(shù)據(jù)都是無限數(shù)據(jù)集中的簡單鍵值,KStream 提供了多種處理數(shù)據(jù)流的功能。
例如:map,mapValue,flatMap,flatMapValues,filter;同時還支持多個流連接,流數(shù)據(jù)的聚合。
KTable 是變更日志流的抽象,在變更日志中,對具有相同鍵的行(row)進行覆蓋,因而每條數(shù)據(jù)記錄都被視作為插入或更新。
②處理器 API(lower-level processor )
低級處理器 API 通過擴展抽象類(AbstractProcessor),覆蓋含有業(yè)務邏輯的處理方法,從而實現(xiàn)客戶端流數(shù)據(jù)的訪問,允許基于輸入數(shù)據(jù)流執(zhí)行相應的業(yè)務邏輯,同時將其結果作為下游數(shù)據(jù)轉發(fā)至客戶端。
相較于高級 DSL 提供具有功能樣式的即用型方法,低級處理器API則按需提供處理邏輯。
③Kafka Stream API 應用——高級 DSL Demo
前提:必須在當前環(huán)境中有以下依賴,版本視當前情況而定。
導入以下包:
Kafka 配置屬性:
實例化 KStreamBuilder,創(chuàng)建一個 KStream 對象:
KStreamBuilder 有個 Stream 方法,該方法以主題名稱(topic name)作為參數(shù),返回一個 KStream 對象,即,訂閱了指定主題的實例化對象。
基于 KStream 對象,這時我們就可以使用 Kafka Streams 高級 DSL 提供的眾多方法(例如:map,process,transform,join 等),然后將處理后的數(shù)據(jù)發(fā)送到另一個主題。
最后,通過構建器(builder)和流配置進行流式傳輸:

通過 Kafka Streams API,我們無需單獨部署集群即可在 Kafka 中進行數(shù)據(jù)流處理。
Kafka Streams API 給我們帶來的便捷主要包含以下幾個方面:
- 高可擴展性,靈活性,分布式和容錯性。
- 支持有狀態(tài)和無狀態(tài)處理。
- 具有窗口,聯(lián)接和聚合的事件時間處理。
- 通過 Kafka Streams DSL 或較低級別的處理器 API 使用已經(jīng)定義的常見轉換操作。
- 對處理沒有單獨的群集要求(與 Kafka 集成)。
- 采用一次一個記錄的處理以實現(xiàn)毫秒級的處理延遲。
- 支持 Kafka Connect 連接到不同的應用程序和數(shù)據(jù)庫。
總結
Kafka 的便捷操作是其備受業(yè)內(nèi)人士廣泛關注的原因之一,然而更重要的是其出色的穩(wěn)定性,可靠性及耐用性,且具有靈活的發(fā)布/隊列,可以很好地適應 N 個消費者組,具有強大的可復制性,可以為生產(chǎn)者提供一致性保證。
本次分享基于 Kafka 核心要素及其常見部署做了詳情解析,希望給圈內(nèi)感興趣的人士提供技術普及,交流互補。
作者:羅小羅
簡介:英國 TOP10 計算機專業(yè),計算機科學與技術碩士,先后就職于匯豐,JPMorgan,HP,交行,阿里等國內(nèi)外知名企業(yè)。涉及項目領域主要有:互聯(lián)網(wǎng)金融,電商,教育,醫(yī)療等?,F(xiàn)任就職于某世界 500 強公司,擔任測試開發(fā)團隊負責人,帶領團隊構建并持續(xù)優(yōu)化自動化測試框架,研發(fā)自動化測試輔助類工具;擅長領域:單元/接口/性能/安全/自動化測試/CD/CI/DevOps;個人持續(xù)研究領域:自動化測試模型/數(shù)據(jù)分析/算法/機器學習等。
編輯:陶家龍
征稿:有投稿、尋求報道意向技術人請聯(lián)絡 editor@51cto.com
【51CTO原創(chuàng)稿件,合作站點轉載請注明原文作者和出處為51CTO.com】