Kafka Streams 在監(jiān)控場景的應(yīng)用與實踐
一、背景
在當(dāng)今大數(shù)據(jù)時代,實時數(shù)據(jù)處理變得越來越重要,而監(jiān)控數(shù)據(jù)的實時性和可靠性是監(jiān)控能力建設(shè)最重要的一環(huán)。隨著監(jiān)控業(yè)務(wù)需求的變化和技術(shù)的發(fā)展,需要能夠?qū)崟r處理和分析龐大的數(shù)據(jù)流。作為一種流式處理平臺,Kafka Streams 為處理實時數(shù)據(jù)提供了強大的支持。本文將重點介紹如何利用 Kafka Streams 進(jìn)行實時數(shù)據(jù)處理,包括其基本原理、功能和實際應(yīng)用。通過本文的學(xué)習(xí),讀者將能夠深入了解 Kafka Streams 的優(yōu)勢、在監(jiān)控場景的應(yīng)用及實踐。
二、Kafka Streams 的基本概念
Kafka Streams 是一個開源的流式處理框架,基于 Kafka 消息隊列構(gòu)建,能夠處理無限量的數(shù)據(jù)流。與傳統(tǒng)的批處理不同,Kafka Streams 允許用戶以流式處理的方式實時處理數(shù)據(jù),而且處理延遲僅為毫秒級。
通過 Kafka Streams ,用戶可以進(jìn)行數(shù)據(jù)的實時轉(zhuǎn)換、聚合、過濾等操作,同時能夠與 Kafka Connect 和 Kafka Producer/Consumer 無縫集成。Kafka Streams 也是一個客戶端程序庫,用于處理和分析存儲在 Kafka 中的數(shù)據(jù),并將得到的數(shù)據(jù)寫回 Kafka 或發(fā)送到外部系統(tǒng)。
Kafka、Storm、Flink 和 Spark 是大數(shù)據(jù)領(lǐng)域常用的工具和框架。
1、區(qū)別
- Kafka 是一個分布式消息系統(tǒng),主要用于構(gòu)建實時數(shù)據(jù)管道和事件驅(qū)動的應(yīng)用程序。它提供了高吞吐量、持久性、可伸縮性和容錯性,主要用于數(shù)據(jù)的發(fā)布和訂閱。
- Storm 是一個分布式實時計算系統(tǒng),用于處理實時數(shù)據(jù)流。它提供了低延遲、高吞吐量的實時計算能力,適用于實時數(shù)據(jù)處理和流式計算。
- Flink 是一個流處理引擎,提供了精確一次的狀態(tài)處理和事件時間處理等特性。它支持流處理和批處理,并提供了統(tǒng)一的 API 和運行時環(huán)境。
- Spark 是一個通用的大數(shù)據(jù)處理框架,提供了批處理和流處理的功能。Spark 提供了豐富的數(shù)據(jù)處理和計算功能,包括 SQL 查詢、機器學(xué)習(xí)、圖處理等。
2、Kafka 的優(yōu)勢
- 持久性和可靠性:Kafka 提供了數(shù)據(jù)持久化的功能,能夠確保數(shù)據(jù)不丟失,并且支持?jǐn)?shù)據(jù)的持久存儲和重放。
- 可伸縮性:Kafka 集群可以很容易地進(jìn)行水平擴展,支持大規(guī)模數(shù)據(jù)處理和高并發(fā)訪問。
- 靈活性:Kafka 可以與各種不同的數(shù)據(jù)處理框架集成,作為數(shù)據(jù)源或數(shù)據(jù)目的地,使其在實時數(shù)據(jù)處理的場景中具有廣泛的適用性。
總的來說,Kafka 的優(yōu)勢在于其高吞吐量、持久性和可靠性,以及靈活的集成能力,使其成為構(gòu)建實時數(shù)據(jù)管道和事件驅(qū)動應(yīng)用程序的理想選擇。
2.1 Stream 處理拓?fù)?/span>
2.1.1 流
流是 Kafka Streams 提出的最重要的抽象概念:它表示一個無限的,不斷更新的數(shù)據(jù)集。流是一個有序的,可重放(反復(fù)的使用),不可變的容錯序列,數(shù)據(jù)記錄的格式是鍵值對(key-value)。這里的 key 主要記錄的是 value 的索引,決定了 Kafka 和 Kafka Streams 中數(shù)據(jù)的分區(qū),即數(shù)據(jù)如何路由到 Topic 的特定分區(qū)。value 是主要后續(xù)處理器要處理的數(shù)據(jù)。
2.1.2 處理器拓?fù)?/strong>
處理器拓?fù)涫且粋€由流(邊緣)連接的流處理(節(jié)點)的圖。通過 Kafka Streams ,我們可以編寫一個或多個的計算邏輯的處理器拓?fù)洌糜趯?shù)據(jù)進(jìn)行多步驟的處理。
2.1.3 流處理器
流處理器是處理器拓?fù)渲械囊粋€節(jié)點;它表示一個處理的步驟,用來轉(zhuǎn)換流中的數(shù)據(jù)(從拓?fù)渲械纳嫌翁幚砥饕淮谓邮芤粋€輸入消息,并且隨后產(chǎn)生一個或多個輸出消息到其下游處理器中)。
在拓?fù)渲杏袃蓚€特別的處理器:
- 源處理器(Source Processor):源處理器是一個沒有任何上游處理器的特殊類型的流處理器。它從一個或多個 Kafka 主題生成輸入流。通過消費這些主題的消息并將它們轉(zhuǎn)發(fā)到下游處理器。
- sink 處理器(Sink Processor):sink 處理器是一個沒有下游流處理器的特殊類型的流處理器。它接收上游流處理器的消息發(fā)送到一個指定的 Kafka 主題。
(圖片來源: Kafka 官網(wǎng))
Kafka Streams 提供2種方式來定義流處理器拓?fù)洌篕afka Streams DSL 提供了更常用的數(shù)據(jù)轉(zhuǎn)換操作,如 map 和 filter;低級別 Processor API 允許開發(fā)者定義和連接自定義的處理器,以及和狀態(tài)倉庫交互。處理器拓?fù)鋬H僅是流處理代碼的邏輯抽象。
2.2 時間
在流處理方面有一些重要的時間概念,它們是建模和集成一些操作的重要元素,例如定義窗口的時間界限。
時間在流中的常見概念如下:
- 事件時間 - 當(dāng)一個事件或數(shù)據(jù)記錄發(fā)生的時間點,就是最初創(chuàng)建的“源頭”。
- 處理時間 - 事件或數(shù)據(jù)消息發(fā)生在流處理應(yīng)用程序處理的時間點。即,記錄已被消費。處理時間可能是毫秒,小時,或天等。比原始事件時間要晚。
- 攝取時間 - 事件或數(shù)據(jù)記錄是 Kafka broker 存儲在 topic 分區(qū)的時間點。與事件時間的差異是,當(dāng)記錄由 Kafka broker 追加到目標(biāo) topic 時,生成的攝取時間戳,而不是消息創(chuàng)建時間(“源頭”)。與處理時間的差異是處理時間是流處理應(yīng)用處理記錄時的時間。比如,如果一個記錄從未被處理,那么就沒有處理時間,但仍然有攝取時間。
Kafka Streams 通過 TimestampExtractor 接口為每個數(shù)據(jù)記錄分配一個時間戳。該接口的具體實現(xiàn)了基于數(shù)據(jù)記錄的實際內(nèi)容檢索或計算獲得時間戳,例如嵌入時間戳字段提供的事件時間語義,或使用其他的方法,比如在處理時返回當(dāng)前的 wall-clock(墻鐘)時間,從而產(chǎn)生了流應(yīng)用程序的處理時間語義。因此開發(fā)者可以根據(jù)自己的業(yè)務(wù)需要選擇執(zhí)行不同的時間。例如,每條記錄時間戳描述了流的時間增長(盡管記錄在 stream 中是無序的)并利用時間依賴性來操作,如 join。
最后,當(dāng)一個 Kafka Streams 應(yīng)用程序?qū)懭胗涗浀?Kafka 時,它將分配時間戳到新的消息。時間戳分配的方式取決于上下文:
- 當(dāng)通過處理一些輸入記錄(例如,在 process()函數(shù)調(diào)用中觸發(fā)的 context.forward())生成新的輸出記錄時,輸出記錄時間戳直接從輸入記錄時間戳繼承。
- 當(dāng)通過周期性函數(shù)(如 punctuate())生成新的輸出記錄時。輸出記錄時間戳被定義為流任務(wù)的當(dāng)前內(nèi)部時間(通過 context.timestamp() 獲?。?。
- 對于聚合,生成的聚合更新的記錄時間戳將被最新到達(dá)的輸入記錄觸發(fā)更新。
本部分簡要介紹了 Kafka Streams 的基本概念,下一部分將介紹 Kafka Streams 的在監(jiān)控場景的應(yīng)用實踐。
三、Kafka Streams 在監(jiān)控場景的應(yīng)用
3.1 鏈路分布示意圖
3.2 示例:使用 Kafka Streams 來處理實時數(shù)據(jù)
流式處理引擎(如 Kafka Streams)與監(jiān)控數(shù)據(jù) ETL 可以為業(yè)務(wù)運維帶來諸多好處,例如實時數(shù)據(jù)分析、實時監(jiān)控、事件驅(qū)動的架構(gòu)等。在本部分,我們將重點介紹 Kafka Streams 與監(jiān)控數(shù)據(jù) ETL 的集成,以及如何在監(jiān)控數(shù)據(jù) ETL 中利用 Kafka Streams 進(jìn)行實時數(shù)據(jù)處理。
在監(jiān)控數(shù)據(jù)ETL架構(gòu)中,Kafka Streams 扮演著舉足輕重的角色。它可以作為一個獨立的數(shù)據(jù)處理服務(wù)來處理實時的數(shù)據(jù)流,并將處理結(jié)果輸出到其他存儲組件(例如,ES、VM等)中。同時,它也可以作為多個數(shù)據(jù)源之間的數(shù)據(jù)交換和通信的橋梁,扮演著數(shù)據(jù)總線的角色。Kafka Streams 的高可用性、高吞吐量和流式處理能力使得它成為監(jiān)控數(shù)據(jù)ETL架構(gòu)中的重要組件之一。
下面給出一個示例,演示了如何將 Kafka Streams 作為監(jiān)控數(shù)據(jù) ETL 來處理實時的數(shù)據(jù)。假設(shè)我們有一個監(jiān)控數(shù)據(jù)流 TopicA,我們希望對這些數(shù)據(jù)進(jìn)行實時的分析,并將分析結(jié)果輸出到另一個 TopicB。我們可以創(chuàng)建一個 Kafka Streams 來處理這個需求:
//創(chuàng)建配置類
Properties props = new Properties();
//設(shè)置訂閱者
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-service");
//設(shè)置servers地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
StreamsBuilder builder = new StreamsBuilder();
//構(gòu)建流
KStream<String, String> userActions = builder.stream("TopicA");
//對流進(jìn)行處理
KTable<String, Long> userClickCounts = userActions
.filter((key, value) -> value.contains("click"))
.groupBy((key, value) -> value.split(":")[0])
.count();
//流寫回Kafka
userClickCounts.toStream().to("TopicB", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
在這個示例中,我們創(chuàng)建了一個 Kafka Streams 監(jiān)控數(shù)據(jù) ETL,用于處理實時的監(jiān)控數(shù)據(jù)流。它對數(shù)據(jù)進(jìn)行了過濾、分組和統(tǒng)計分析,并將結(jié)果輸出到 TopicB。通過這個 ETL,我們可以很容易地實現(xiàn)實時的數(shù)據(jù)處理功能,并且能夠與其他數(shù)據(jù)源和數(shù)據(jù)存儲組件進(jìn)行無縫的集成。
3.3 監(jiān)控 ETL 的流處理示意圖
本部分介紹了 Kafka Streams 的在監(jiān)控場景的應(yīng)用實踐,下一部分將深入探討 Kafka Streams 的運作原理及實時數(shù)據(jù)處理的常見操作,并闡述 Kafka Streams 如何實現(xiàn)這些操作。
四、監(jiān)控數(shù)據(jù) ETL 中 Kafka Streams 的運作原理
4.1 架構(gòu)
Kafka Streams 通過生產(chǎn)者和消費者,并利用 Kafka 自有的能力來提供數(shù)據(jù)平行性,分布式協(xié)調(diào)性,故障容錯和操作簡單性,從而簡化了應(yīng)用程序的開發(fā),在本節(jié)中,我們將描述 Kafka Streams 是如何工作的。
下圖展示了 Kafka Streams 應(yīng)用程序的解剖圖,讓我們來看一下。
圖片來源: Kafka 官網(wǎng)
Kafka 消費者通過消費1個或多個 Topic 拿到數(shù)據(jù),形成輸入 Kafka 流,經(jīng)過處理器拓?fù)鋵?shù)據(jù)進(jìn)行統(tǒng)一處理形成輸出 Kafka 流,將數(shù)據(jù)寫入1個或多個出流 Topic,這是 kafka 流整體的運行流程。
4.1.1 Stream 分區(qū)和任務(wù)
Kafka 分區(qū)數(shù)據(jù)的消息層用于存儲和傳輸,Kafka Streams 分區(qū)數(shù)據(jù)用于處理, 在這兩種情況下,這種分區(qū)規(guī)劃和設(shè)計使數(shù)據(jù)具有彈性,可擴展,高性能和高容錯的能力。Kafka Streams 使用了分區(qū)和任務(wù)的概念,基于 Kafka 主題分區(qū)的并行性模型。在并發(fā)環(huán)境里,Kafka Streams 和 Kafka 之間有著緊密的聯(lián)系:
- 每個流分區(qū)是完全有序的數(shù)據(jù)記錄隊列,并映射到 Kafka 主題的分區(qū)。
- 流的數(shù)據(jù)消息與主題的消息映射。
- 數(shù)據(jù)記錄中的 keys 決定了 Kafka 和 Kafka Streams 中數(shù)據(jù)的分區(qū),即,如何將數(shù)據(jù)路由到指定的分區(qū)。
應(yīng)用程序的處理器拓?fù)渫ㄟ^將其分成多個任務(wù)來進(jìn)行擴展,更具體點說,Kafka Streams 根據(jù)輸入流分區(qū)創(chuàng)建固定數(shù)量的任務(wù),其中每個任務(wù)分配一個輸入流的分區(qū)列表(即,Kafka 主題)。分區(qū)對任務(wù)的分配不會改變,因此每個任務(wù)是應(yīng)用程序并行性的固定單位。然后,任務(wù)可以基于分配的分區(qū)實現(xiàn)自己的處理器拓?fù)洌凰麄冞€可以為每個分配的分區(qū)維護(hù)一個緩沖,并從這些記錄緩沖一次一個地處理消息。作為結(jié)果,流任務(wù)可以獨立和并行的處理而無需手動干預(yù)。
重要的是要理解 Kafka Streams 不是資源管理器,而是可在任何地方都能“運行”的流處理應(yīng)用程序庫。多個實例的應(yīng)用程序在同一臺機器上執(zhí)行,或分布多個機器上,并且任務(wù)可以通過該庫自動的分發(fā)到這些運行的實例上。分區(qū)對任務(wù)的分配永遠(yuǎn)不會改變;如果一個應(yīng)用程式實例失敗,則這些被分配的任務(wù)將自動地在其他的實例重新創(chuàng)建,并從相同的流分區(qū)繼續(xù)消費。
下面展示了2個分區(qū),每個任務(wù)分配了輸出流的1個分區(qū)。
(圖片來源: Kafka 官網(wǎng))
4.1.2 線程模型
Kafka Streams 允許用戶配置線程數(shù),可用于平衡處理應(yīng)用程序的實例。每個線程的處理器拓?fù)洫毩⒌膱?zhí)行一個或多個任務(wù)。例如,下面展示了一個流線程運行2個流任務(wù)。
(圖片來源: Kafka 官網(wǎng))
啟動更多的流線程或更多應(yīng)用程序?qū)嵗?,只需?fù)制拓?fù)溥壿嫞磸?fù)制代碼到不同的機器上運行),達(dá)到并行處理處理不同的 Kafka 分區(qū)子集的目的。要注意的是,這些線程之間不共享狀態(tài)。因此無需協(xié)調(diào)內(nèi)部的線程。這使它非常簡單在應(yīng)用實例和線程之間并行拓?fù)?。Kafka 主題分區(qū)的分配是通過 Kafka Streams 利用 Kafka 的協(xié)調(diào)功能在多個流線程之間透明處理。
如上所述,Kafka Streams 擴展流處理應(yīng)用程序是很容易的:你只需要運行你的應(yīng)用程序?qū)嵗琄afka Streams 負(fù)責(zé)在實例中運行的任務(wù)之間分配分區(qū)。你可以啟動多個應(yīng)用程序線程處理多個輸入的 Kafka 主題分區(qū)。這樣,所有運行中的應(yīng)用實例,每個線程(即運行的任務(wù))至少有一個輸入分區(qū)可以處理。
4.1.3 故障容錯
Kafka Streams 基于 Kafka 分區(qū)的高可用和副本故障容錯能力。因此,當(dāng)流數(shù)據(jù)持久到 Kafka,即使應(yīng)用程序故障,如果需要重新處理它,它也是可用的。Kafka Streams 中的任務(wù)利用 Kafka 消費者客戶端提供的故障容錯的能力來處理故障。如果任務(wù)故障,Kafka Streams 將自動的在剩余運行中的應(yīng)用實例重新啟動該任務(wù)。
此外,Kafka Streams 還確保了本地狀態(tài)倉庫對故障的穩(wěn)定性。對于每個狀態(tài)倉庫都維持一個追蹤所有的狀態(tài)更新的變更日志主題。這些變更日志主題也分區(qū),因此,每個本地狀態(tài)存儲實例,在任務(wù)訪問倉里,都有自己的專用的變更日志分區(qū)。變更主題日志也啟用了日志壓縮,以便可以安全的清除舊數(shù)據(jù),以防止主題無限制的增長。如果任務(wù)失敗并在其他的機器上重新運行,則 Kafka Streams 在恢復(fù)新啟動的任務(wù)進(jìn)行處理之前,重放相應(yīng)的變更日志主題,保障在故障之前將其關(guān)聯(lián)的狀態(tài)存儲恢復(fù)。故障處理對于終端用戶是完全透明的。
請注意,任務(wù)(重新)初始化的成本通常主要取決于通過重放狀態(tài)倉庫變更日志主題來恢復(fù)狀態(tài)的時間。為了減少恢復(fù)時間,用戶可以配置他們的應(yīng)用程序增加本地狀態(tài)的備用副本(即完全的復(fù)制狀態(tài))。當(dāng)一個任務(wù)遷移發(fā)生時,Kafka Streams 嘗試去分配任務(wù)給應(yīng)用實例,提前配置了備用副本的應(yīng)用實例就可以減少任務(wù)(重新)初始化的成本。
4.2 創(chuàng)建流
記錄流(KStreams)或變更日志流(KTable或GlobalkTable)可以從一個或多個 Kafka 主題創(chuàng)建源流,(而 KTable 和 GlobalKTable,只能從單個主題創(chuàng)建源流)。
KStreamBuilder builder = new KStreamBuilder();
KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
GlobalKTable<String, GenericRecord> source2 = builder.globalTable("topic4", "globalStoreName");
左右滑動查看完整代碼
4.3 流回寫 Kafka
在處理結(jié)束后,開發(fā)者可以通過 KStream.to 和 KTable.to 將最終的結(jié)果流(連續(xù)不斷的)寫回 Kafka 主題。
joined.to("topic4");
如果已經(jīng)通過上面的to方法寫入到一個主題中,但是如果你還需要繼續(xù)讀取和處理這些消息,可以從輸出主題構(gòu)建一個新流,Kafka Streams 提供了便利的方法,through:
// equivalent to
//
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream materialized = joined.through("topic4");
左右滑動查看完整代碼
4.4 流程序的配置與啟執(zhí)行
除了定義的 topology,開發(fā)者還需要在運行它之前在 StreamsConfig 配置他們的應(yīng)用程序,Kafka Streams 配置的完整列表可以在這里找到。
Kafka Streams 中指定配置和生產(chǎn)者、消費者客戶端類似,通常,你創(chuàng)建一個 java.util.Properties,設(shè)置必要的參數(shù),并通過 Properties 實例構(gòu)建一個 StreamsConfig 實例。
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
// Any further settings
settings.put(... , ...);
// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);
除了 Kafka Streams 自己配置參數(shù),你也可以為 Kafka 內(nèi)部的消費者和生產(chǎn)者指定參數(shù)。根據(jù)你應(yīng)用的需要。類似于 Streams 設(shè)置,你可以通過 StreamsConfig 設(shè)置任何消費者和/或生產(chǎn)者配置。請注意,一些消費者和生產(chǎn)者配置參數(shù)使用相同的參數(shù)名。例如,用于配置 TCP 緩沖的 send.buffer.bytes 或 receive.buffer.bytes。用于控制客戶端請求重試的 request.timeout.ms 和 retry.backoff.ms。如果需要為消費者和生產(chǎn)者設(shè)置不同的值,可以使用 consumer. 或 producer. 作為參數(shù)名稱的前綴。
Properties settings = new Properties();
// Example of a "normal" setting for Kafka Streams
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
// Customize a common client setting for both consumer and producer
settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
// Customize different values for consumer and producer
settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
// Alternatively, you can use
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
settings.put(StremasConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);
你可以在應(yīng)用程序代碼中的任何地方使用 Kafka Streams ,常見的是在應(yīng)用程序的 main() 方法中使用。
首先,先創(chuàng)建一個 KafkaStreams 實例,其中構(gòu)造函數(shù)的第一個參數(shù)用于定義一個
topology builder(Streams DSL的KStreamBuilder,或 Processor API 的 TopologyBuilder)。
第二個參數(shù)是上面提到的 StreamsConfig 的實例。
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
KStreamBuilder builder = ...; // when using the Kafka Streams DSL
//
// OR
//
TopologyBuilder builder = ...; // when using the Processor API
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
在這點上,內(nèi)部結(jié)果已經(jīng)初始化,但是處理還沒有開始。你必須通過調(diào)用 start() 方法啟動 Kafka Streams 線程:
// Start the Kafka Streams instance
streams.start();
捕獲任何意外的異常,設(shè)置 java.lang.Thread.UncaughtExceptionHandler。
每當(dāng)流線程由于意外終止時,將調(diào)用此處理程序。
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public uncaughtException(Thread t, throwable e) {
// here you should examine the exception and perform an appropriate action!
}
);
close() 方法結(jié)束程序。
// Stop the Kafka Streams instance
streams.close();
現(xiàn)在,運行你的應(yīng)用程序,像其他的 Java 應(yīng)用程序一樣(Kafka Sterams 沒有任何特殊的要求)。同樣,你也可以打包成 jar,通過以下方式運行:
# Start the application in class com.example.MyStreamsApp
# from the fat jar named path-to-app-fatjar.jar.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
當(dāng)應(yīng)用程序?qū)嵗_始運行時,定義的處理器拓?fù)鋵⒈怀跏蓟?個或多個流任務(wù),可以由實例內(nèi)的流線程并行的執(zhí)行。如果處理器拓?fù)涠x了狀態(tài)倉庫,則這些狀態(tài)倉庫在初始化流任務(wù)期間(重新)構(gòu)建。這一點要理解,當(dāng)如上所訴的啟動你的應(yīng)用程序時,實際上 Kafka Streams 認(rèn)為你發(fā)布了一個實例?,F(xiàn)實場景中,更常見的是你的應(yīng)用程序有多個實例并行運行(如,其他的 JVM 中或別的機器上)。在這種情況下,Kafka Streams 會將任務(wù)從現(xiàn)有的實例中分配給剛剛啟動的新實例。
五、監(jiān)控數(shù)據(jù) ETL 中 Kafka Streams 參數(shù)及其調(diào)優(yōu)
5.1 必配參數(shù)
- bootstrap.servers:這是 Kafka 集群的地址列表,Kafka Streams 使用它來初始化與 Kafka 的連接。
- key.deserializer 和 value.deserializer:這些配置定義了流中鍵和值的序列化和反序列化器。
- auto.offset.reset:當(dāng)沒有初始偏移量或偏移量無效時,這個配置定義了 Kafka Streams 如何處理。
- group.id:這對于使用 Kafka Streams 的消費者組來說很重要,它定義了消費者組的ID。
5.2 基礎(chǔ)參數(shù)
- num.stream.threads:定義 Kafka Streams 應(yīng)用程序中的線程數(shù),默認(rèn)與處理器的邏輯核心數(shù)相等。
- state.dir:定義 Kafka Streams 存儲狀態(tài)的本地目錄。
- threading.max.instances:定義每個主題分區(qū)的最大線程實例數(shù),默認(rèn)與分區(qū)數(shù)相等。
- threading.instances:定義每個主題分區(qū)的線程實例數(shù),默認(rèn)與分區(qū)數(shù)相等。
5.3 消費者參數(shù)
- enable.auto.commit:自動提交偏移量,默認(rèn)值為"true",建議設(shè)置為"false",以便更好地控制偏移量的提交。
- commit.interval.ms:提交偏移量的頻率,默認(rèn)值為5000ms,可以根據(jù)需要進(jìn)行調(diào)整。
- max.poll.records:一次拉取的消息數(shù)量,默認(rèn)值為1000,可以根據(jù)網(wǎng)絡(luò)帶寬和處理能力進(jìn)行調(diào)整。
5.4 生產(chǎn)者參數(shù)
- batch.size:批量發(fā)送消息的大小,默認(rèn)值通常是16384(字節(jié)),可以根據(jù)網(wǎng)絡(luò)帶寬和 Kafka 集群的性能進(jìn)行調(diào)整。
- linger.ms:消息在生產(chǎn)者緩沖區(qū)中的最小停留時間,默認(rèn)值為100ms,可以根據(jù)需要進(jìn)行調(diào)整。
- compression.type:壓縮類型,可以提高網(wǎng)絡(luò)帶寬利用率,但會增加 CPU 開銷。默認(rèn)值為"none",可以根據(jù)需要設(shè)置為"gzip"、“snappy"或"lz4”。
對于 Kafka 的調(diào)優(yōu)參數(shù),可以根據(jù)實際的應(yīng)用場景和性能需求進(jìn)行調(diào)整,以達(dá)到最佳的性能和穩(wěn)定性。
六、監(jiān)控數(shù)據(jù) ETL 中 Kafka Streams 的分區(qū)傾斜問題原因和解決方式
6.1 原因
分區(qū)傾斜是監(jiān)控數(shù)據(jù) ETL 的 Kafka Streams 在處理大規(guī)模數(shù)據(jù)流時遇到的常見問題。分區(qū)傾斜指的是在一個流處理應(yīng)用程序中,某個分區(qū)的消息消費速度遠(yuǎn)遠(yuǎn)慢于其他分區(qū),或某個分區(qū)的延遲積壓數(shù)據(jù)遠(yuǎn)大于其他分區(qū),導(dǎo)致 Kafka Streams 的實時性受到限制。
產(chǎn)生分區(qū)傾斜的原因可能包括:
- 數(shù)據(jù)分布不均勻:原始數(shù)據(jù)在 Kafka 主題的分區(qū)中分布不均勻,導(dǎo)致某些分區(qū)的消息量遠(yuǎn)大于其他分區(qū)。
- 消費者實例數(shù)量不足:在 Kafka Streams 應(yīng)用程序中,消費者的實例數(shù)量不足,無法充分處理所有分區(qū)的消息。
- 消費者負(fù)載不均衡:消費者的負(fù)載不均衡(包括但不限于某些消費者實例處理的分區(qū)數(shù)大于其他實例),導(dǎo)致某些消費者實例處理的消息量遠(yuǎn)大于其他實例。
- 消費者實例負(fù)載不均衡:消費者實例性能不一致或性能被擠占,導(dǎo)致消費能力不均衡,消費速率異常小于平均消費速率
6.2 解決方案
- 數(shù)據(jù)均衡策略:在設(shè)計 Kafka 主題分區(qū)分配策略時,可以采用如輪詢(Round-robin)或范圍(Range)等均衡策略,使得數(shù)據(jù)在各個分區(qū)之間均勻分布。
- 增加消費者實例:根據(jù)應(yīng)用程序的實際情況,適當(dāng)增加消費者的實例數(shù)量,以提高整個系統(tǒng)的處理能力,例如擴容。
- 負(fù)載均衡策略:在消費者組內(nèi)部實現(xiàn)負(fù)載均衡,如使用均勻分配消費者(Uniform Distribution Consumer)等策略,確保消費者實例之間的負(fù)載均衡,例如重啟或剔除傾斜分區(qū)實例使 Kafka Streams 的分區(qū)進(jìn)行重新分配。
- 優(yōu)化消費者處理邏輯:分析消費者處理消息的速度慢的原因,優(yōu)化處理邏輯,提高消費者的處理能力。
- 調(diào)整批次大小和窗口函數(shù):通過調(diào)整 Kafka Streams 的批次大小和窗口函數(shù)等參數(shù),降低消費者的處理壓力。
- 使用側(cè)輸出:對于一些處理速度較慢的分區(qū),可以考慮使用側(cè)輸出將部分消息引流至其他系統(tǒng)處理,減輕消費者負(fù)載。
七、總結(jié)
本文介紹了 Kafka Streams 在監(jiān)控場景中的應(yīng)用,闡述了 Kafka Streams 的基本概念,包括流、處理器拓?fù)?、流處理器、時間概念等,舉例說明了 Kafka Streams 在監(jiān)控實時數(shù)據(jù)ETL中的具體應(yīng)用,并詳細(xì)解釋了 Kafka Streams 的運作原理,包括其架構(gòu)、創(chuàng)建流、流回寫 Kafka、流程序配置與啟執(zhí)行等內(nèi)容。文章還介紹了 Kafka Streams 的參數(shù)及其調(diào)優(yōu)方法,以及可能出現(xiàn)的分區(qū)傾斜問題及其解決方法。
本文意在讓讀者對于 Kafka 流在監(jiān)控業(yè)務(wù)的實際應(yīng)用有所認(rèn)識,并且了解 Kafka 流的基本概念和原理,閱讀本文后對構(gòu)建自己 Kafka 流應(yīng)用程序有所幫助,能夠理解在監(jiān)控數(shù)據(jù) ETL 常見分區(qū)傾斜的原理和解決方式。
八、引用
Kafka 官網(wǎng):https://kafka.apache.org/