自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

使用Kafka和Druid了解Spark流

大數(shù)據(jù) Kafka Spark
在本博文中,我將分享通過將Spark Streaming,Kafka和Apache Druid結合在一起以構建實時分析儀表板,以確保精確的數(shù)據(jù)表示而獲得的知識。

[[326057]]

作為一名數(shù)據(jù)工程師,我正在研究大數(shù)據(jù)技術,例如Spark Streaming,Kafka和Apache Druid。 他們都有自己的教程和RTFM頁面。 但是,將這些技術大規(guī)模地組合在一起時,您會發(fā)現(xiàn)自己正在尋找涵蓋更復雜的生產(chǎn)用例的解決方案。 在本博文中,我將分享通過將Spark Streaming,Kafka和Apache Druid結合在一起以構建實時分析儀表板,以確保精確的數(shù)據(jù)表示而獲得的知識。

在開始之前……關于實時分析的幾句話

實時分析是大數(shù)據(jù)技術的新趨勢,通常具有顯著的業(yè)務影響。 在分析新鮮數(shù)據(jù)時,見解更加精確。 例如,為數(shù)據(jù)分析師,BI和客戶經(jīng)理團隊提供實時分析儀表板可以幫助這些團隊做出快速決策。 大規(guī)模實時分析的常用架構基于Spark Streaming和Kafka。 這兩種技術都具有很好的可擴展性。 它們在群集上運行,并在許多計算機之間分配負載。 Spark作業(yè)的輸出可以到達許多不同的目的地,這取決于特定的用例和體系結構。 我們的目標是提供顯示實時事件的可視工具。 為此,我們選擇了Apache Druid數(shù)據(jù)庫。

Apache Druid中的數(shù)據(jù)可視化

Druid是高性能的實時分析數(shù)據(jù)庫。 它的好處之一是能夠使用來自Kafka主題的實時數(shù)據(jù),并使用Pivot模塊在其上構建強大的可視化效果。 它的可視化功能可以運行各種臨時的"切片和切塊"查詢,并快速獲得可視化結果。 這對于分析各種用例非常有用,例如特定運動在某些國家的表現(xiàn)。 實時檢索數(shù)據(jù),延遲1-2分鐘。

架構

因此,我們決定基于Kafka事件和Apache Druid構建實時分析系統(tǒng)。 我們已經(jīng)在Kafka主題中進行過活動。 但是我們不能將它們直接攝取到德魯伊中。 我們需要為每個事件添加更多維度。 我們需要用更多的數(shù)據(jù)豐富每個事件,以便在德魯伊中方便地查看它。 關于規(guī)模,我們每分鐘要處理數(shù)十萬個事件,因此我們需要使用能夠支持這些數(shù)字的技術。 我們決定使用Spark Streaming作業(yè)豐富原始的Kafka事件。

 

使用Kafka和Druid了解Spark流
圖1.實時分析架構

Spark Streaming作業(yè)永遠運行? 并不是的。

Spark Streaming作業(yè)的想法是它始終在運行。 這項工作永遠都不應停止。 它不斷讀取來自Kafka主題的事件,對其進行處理,并將輸出寫入另一個Kafka主題。 但是,這是一個樂觀的看法。 在現(xiàn)實生活中,事情更加復雜。 Spark群集中存在驅動程序故障,在這種情況下,作業(yè)將重新啟動。 有時新版本的spark應用程序已部署到生產(chǎn)中。 在這種情況下會發(fā)生什么? 重新啟動的作業(yè)如何讀取Kafka主題并處理事件? 在深入研究這些細節(jié)之前,此圖顯示了重新啟動Spark Streaming作業(yè)時在Druid中看到的內容:

 

使用Kafka和Druid了解Spark流
圖2.作業(yè)重新啟動時數(shù)據(jù)丟失

絕對是數(shù)據(jù)丟失!

我們要解決什么問題?

我們正在處理Spark Streaming應用程序,該應用程序從一個Kafka主題讀取事件,并將事件寫入另一個Kafka主題。 這些事件稍后將在Druid中顯示。 我們的目標是在重新啟動Spark Streaming應用程序期間實現(xiàn)平滑的數(shù)據(jù)可視化。 換句話說,我們需要確保在Spark Streaming作業(yè)重啟期間不會丟失或重復任何事件。

都是關于補償

為了理解為什么作業(yè)重新啟動時會丟失數(shù)據(jù),我們需要熟悉Kafka體系結構中的一些術語。 您可以在這里找到Kafka的官方文檔。 簡而言之:Kafka中的事件存儲在主題中; 每個主題都分為多個分區(qū)。 分區(qū)中的每個記錄都有一個偏移量-一個連續(xù)的數(shù)字,它定義了記錄的順序。 當應用程序使用該主題時,它可以通過多種方式處理偏移量。 默認行為始終是從最新的偏移量讀取。 另一個選擇是提交偏移量,即持久保留偏移量,以便作業(yè)可以在重新啟動時讀取已提交的偏移量并從此處繼續(xù)。 讓我們看一下解決方案的步驟,并在每個步驟中加深對Kafka膠印管理的了解。

步驟#1-自動提交偏移量

默認行為始終是從最新的偏移量讀取。 這將不起作用,因為重新啟動作業(yè)時,該主題中有新事件。 如果作業(yè)從最新讀取,它將丟失重新啟動期間添加的所有消息,如圖2所示。Spark Streaming中有一個" enable.auto.commit"參數(shù)。 默認情況下,其值為false。 圖3顯示了將其值更改為true,運行Spark應用程序并重新啟動后的行為。

 

使用Kafka和Druid了解Spark流
圖3.作業(yè)重啟的數(shù)據(jù)峰值

我們可以看到,使用Kafka自動提交功能會產(chǎn)生新的效果。 沒有"數(shù)據(jù)丟失",但是現(xiàn)在我們看到重復的事件。 沒有真正的事件"爆發(fā)"。 實際發(fā)生的情況是自動提交機制"不時"提交偏移量。 輸出主題中有許多未提交的消息。 重新啟動后,作業(yè)將使用最新提交的偏移量中的消息,并再次處理其中一些事件。 這就是為什么在輸出中會出現(xiàn)大量事件的原因。

顯然,將這些重復項合并到我們的可視化中可能會誤導業(yè)務消費者此數(shù)據(jù),并影響他們的決策和對系統(tǒng)的信任。

步驟#2:手動提交Kafka偏移

因此,我們不能依靠Kafka自動提交功能。 我們需要自己進行卡夫卡補償。 為了做到這一點,讓我們看看Spark Streaming如何使用Kafka主題中的數(shù)據(jù)。 Spark Streaming使用稱為離散流或DStream的體系結構。 DStream由一系列連續(xù)的RDD(彈性分布式數(shù)據(jù)集)表示,這是Spark的主要抽象之一。 大多數(shù)Spark Streaming作業(yè)如下所示:

  1. dstream.foreachRDD { rdd => rdd.foreach { record => process(record)} } 

在我們的案例中,處理記錄意味著將記錄寫入輸出Kafka主題。 因此,為了提交Kafka偏移量,我們需要執(zhí)行以下操作:

  1. dstream.foreachRDD { rdd => val offsetRanges =  
  2. rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreach { record  
  3. => process(record)}  
  4. stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } 

這是一種簡單明了的方法,在我們深入討論之前,讓我們看一下大局。 假設我們正確處理了偏移量。 即,在每次RDD處理之后都保存偏移量。 當我們停止工作時會怎樣? 該作業(yè)在RDD的處理過程中停止。 微批處理的部分將寫入輸出Kafka主題,并且不會提交。 一旦作業(yè)再次運行,它將第二次處理某些消息,并且重復消息的峰值將(與以前一樣)出現(xiàn)在Druid中:

 

使用Kafka和Druid了解Spark流
圖4.作業(yè)重新啟動時的數(shù)據(jù)峰值

正常關機

事實證明,有一種方法可以確保在RDD處理期間不會殺死作業(yè)。這稱為"正常關機"。有幾篇博客文章描述了如何優(yōu)雅地殺死Spark應用程序,但是其中大多數(shù)與舊版本的Spark有關,并且有很多限制。我們一直在尋找一種適用于任何規(guī)模且不依賴于特定Spark版本或操作系統(tǒng)的"安全"解決方案。要啟用正常關機,應使用以下參數(shù)創(chuàng)建Spark上下文:spark.streaming.stopGracefullyOnShutdown = true。這指示Spark在JVM關閉時(而不是立即)正常關閉StreamingContext。另外,我們需要一種機制來有意地停止工作,例如在部署新版本時。我們已經(jīng)通過簡單地檢查是否存在指示作業(yè)關閉的HDFS文件來實現(xiàn)該機制的第一個版本。當文件顯示在HDFS中時,流上下文將使用以下參數(shù)停止:ssc.stop(stopSparkContext = true,stopGracefully = true)

在這種情況下,只有在完成所有接收到的數(shù)據(jù)處理之后,Spark應用程序才會正常停止。 這正是我們所需要的。

步驟#3:Kafka commitAsync

讓我們回顧一下到目前為止的情況。 我們有意在每個RDD處理中提交Kafka偏移量(使用Kafka commitAsync API),并使用Spark正常關機。 顯然,還有另一個警告。 深入研究Kafka API和Kafka commitAsync()源代碼的文檔,我了解到commitAsync()僅將offsetRanges放入隊列中,實際上僅在下一個foreachRDD循環(huán)中進行處理。 即使Spark作業(yè)正常停止并完成了所有RDD的處理,實際上也不會提交最后一個RDD的偏移量。 為解決此問題,我們實現(xiàn)了一個代碼,該代碼可同步保留Kafka偏移量,并且不依賴于Kafka commitAsync()。 然后,對于每個RDD,我們將提交的偏移量存儲在HDFS文件中。 當作業(yè)再次開始運行時,它將從HDFS加載偏移文件,并從這些偏移開始使用Kafka主題。

在這里,它有效!

僅僅是正常關機和Kafka偏移量同步存儲的組合,才為我們提供了理想的結果。 重新啟動期間沒有數(shù)據(jù)丟失,沒有數(shù)據(jù)高峰:

 

使用Kafka和Druid了解Spark流
圖5.重新啟動Spark作業(yè)期間沒有峰值數(shù)據(jù)丟失

結論

解決Spark Streaming和Kafka之間的集成問題是構建實時分析儀表板的重要里程碑。 我們找到了可以確保穩(wěn)定的數(shù)據(jù)流的解決方案,而不會在Spark Streaming作業(yè)重啟期間丟失事件或重復。 現(xiàn)在,我們獲得了在Druid中可視化的可信賴數(shù)據(jù)。 因此,我們將更多類型的事件(Kafka主題)添加到了Druid中,并建立了實時儀表板。 這些儀表板為各種團隊提供了見解,例如BI,產(chǎn)品和客戶支持。 我們的下一個目標是利用Druid的更多功能,例如新的分析功能和警報。

 

責任編輯:未麗燕 來源: 今日頭條
相關推薦

2024-08-21 08:00:00

2023-12-11 08:00:00

架構FlinkDruid

2021-12-14 09:56:51

HadoopSparkKafka

2022-08-26 17:08:51

KafkaRedi數(shù)據(jù)

2019-07-26 15:01:42

SparkShuffle內存

2019-06-06 15:22:07

SparkShuffle內存

2019-07-05 12:16:26

大數(shù)據(jù)IT互聯(lián)網(wǎng)

2020-06-28 07:39:44

Kafka分布式消息

2020-05-29 09:48:54

Python開發(fā)Kafka

2022-07-04 09:05:04

DruidSqlParserSQL

2019-12-06 09:41:40

開源技術 軟件

2018-08-19 09:15:25

MongoDBGo 微服務

2019-11-05 11:56:58

Kafka微服務開發(fā)

2023-11-02 10:39:58

2020-06-28 13:54:22

Apache Spar窗口函數(shù)數(shù)據(jù)

2018-04-25 08:45:46

大數(shù)據(jù)

2020-06-08 18:41:07

Kafka微服務Web

2022-06-24 08:00:00

編程工具數(shù)據(jù)結構開發(fā)

2022-08-22 08:07:45

DruidMySQL密碼

2021-12-02 07:50:30

字節(jié)緩沖流使用
點贊
收藏

51CTO技術棧公眾號