Spark Streaming場(chǎng)景應(yīng)用- Spark Streaming計(jì)算模型及監(jiān)控
摘要
Spark Streaming是一套優(yōu)秀的實(shí)時(shí)計(jì)算框架。其良好的可擴(kuò)展性、高吞吐量以及容錯(cuò)機(jī)制能夠滿(mǎn)足我們很多的場(chǎng)景應(yīng)用。本篇結(jié)合我們的應(yīng)用場(chǎng)景,介結(jié)我們?cè)谑褂肧park Streaming方面的技術(shù)架構(gòu),并著重講解Spark Streaming兩種計(jì)算模型,無(wú)狀態(tài)和狀態(tài)計(jì)算模型以及該兩種模型的注意事項(xiàng);接著介紹了Spark Streaming在監(jiān)控方面所做的一些事情,***總結(jié)了Spark Streaming的優(yōu)缺點(diǎn)。
一、概述
數(shù)據(jù)是非常寶貴的資源,對(duì)各級(jí)企事業(yè)單均有非常高的價(jià)值。但是數(shù)據(jù)的爆炸,導(dǎo)致原先單機(jī)的數(shù)據(jù)處理已經(jīng)無(wú)法滿(mǎn)足業(yè)務(wù)的場(chǎng)景需求。因此在此基礎(chǔ)上出現(xiàn)了一些優(yōu)秀的分布式計(jì)算框架,諸如Hadoop、Spark等。離線分布式處理框架雖然能夠處理非常大量的數(shù)據(jù),但是其遲滯性很難滿(mǎn)足一些特定的需求場(chǎng)景,比如push反饋、實(shí)時(shí)推薦、實(shí)時(shí)用戶(hù)行為等。為了滿(mǎn)足這些場(chǎng)景,使數(shù)據(jù)處理能夠達(dá)到實(shí)時(shí)的響應(yīng)和反饋,又隨之出現(xiàn)了實(shí)時(shí)計(jì)算框架。目前的實(shí)時(shí)處理框架有Apache Storm、Apache Flink以及Spark Streaming等。其中Spark Streaming由于其本身的擴(kuò)展性、高吞吐量以及容錯(cuò)能力等特性,并且能夠和離線各種框架有效結(jié)合起來(lái),因而是當(dāng)下是比較受歡迎的一種流式處理框架。
根據(jù)其官方文檔介紹,Spark Streaming 有高擴(kuò)展性、高吞吐量和容錯(cuò)能力強(qiáng)的特點(diǎn)。Spark Streaming 支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡(jiǎn)單的 TCP 套接字等等。數(shù)據(jù)輸入后可以用 Spark 的高度抽象原語(yǔ)如:map、reduce、join、window 等進(jìn)行運(yùn)算。而結(jié)果也能保存在很多地方,如 HDFS,數(shù)據(jù)庫(kù)等。另外 Spark Streaming 也能和 MLlib(機(jī)器學(xué)習(xí))以及 Graphx ***融合。其架構(gòu)見(jiàn)下圖:
Spark Streaming 其優(yōu)秀的特點(diǎn)給我們帶來(lái)很多的應(yīng)用場(chǎng)景,如網(wǎng)站監(jiān)控和網(wǎng)絡(luò)監(jiān)控、異常監(jiān)測(cè)、網(wǎng)頁(yè)點(diǎn)擊、用戶(hù)行為、用戶(hù)遷移等。本文中,將為大家詳細(xì)介紹,我們的應(yīng)用場(chǎng)景中,Spark Streaming的技術(shù)架構(gòu)、兩種狀態(tài)模型以及Spark Streaming監(jiān)控等。
二、應(yīng)用場(chǎng)景
在 Spark Streaming 中,處理數(shù)據(jù)的單位是一批而不是單條,而數(shù)據(jù)采集卻是逐條進(jìn)行的,因此 Spark Streaming 系統(tǒng)需要設(shè)置間隔使得數(shù)據(jù)匯總到一定的量后再一并操作,這個(gè)間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關(guān)鍵參數(shù),它決定了 Spark Streaming 提交作業(yè)的頻率和數(shù)據(jù)處理的延遲,同時(shí)也影響著數(shù)據(jù)處理的吞吐量和性能。
2.1 框架
目前我們Spark Streaming的業(yè)務(wù)應(yīng)用場(chǎng)景包括異常監(jiān)測(cè)、網(wǎng)頁(yè)點(diǎn)擊、用戶(hù)行為以及用戶(hù)地圖遷徙等場(chǎng)景。按計(jì)算模型來(lái)看大體可分為無(wú)狀態(tài)的計(jì)算模型以及狀態(tài)計(jì)算模型兩種。在實(shí)際的應(yīng)用場(chǎng)景中,我們采用Kafka作為實(shí)時(shí)輸入源,Spark Streaming作為計(jì)算引擎處理完數(shù)據(jù)之后,再持久化到存儲(chǔ)中,包括MySQL、HDFS、ElasticSearch以及MongoDB等;同時(shí)Spark Streaming 數(shù)據(jù)清洗后也會(huì)寫(xiě)入Kafka,然后經(jīng)由Flume持久化到HDFS;接著基于持久化的內(nèi)容做一些UI的展現(xiàn)。架構(gòu)見(jiàn)下圖:
2.2 無(wú)狀態(tài)模型
無(wú)狀態(tài)模型只關(guān)注當(dāng)前新生成的DStream數(shù)據(jù),所以的計(jì)算邏輯均基于該批次的數(shù)據(jù)進(jìn)行處理。無(wú)狀態(tài)模型能夠很好地適應(yīng)一些應(yīng)用場(chǎng)景,比如網(wǎng)站點(diǎn)擊實(shí)時(shí)排行榜、指定batch時(shí)間段的用戶(hù)訪問(wèn)以及點(diǎn)擊情況等。該模型由于沒(méi)有狀態(tài),并不需要考慮有狀態(tài)的情況,只需要根據(jù)業(yè)務(wù)場(chǎng)景保證數(shù)據(jù)不丟就行。此種情況一般采用Direct方式讀取Kafka數(shù)據(jù),并采用監(jiān)聽(tīng)器方式持久化Offsets即可。具體流程如下:
其上模型框架包含以下幾個(gè)處理步驟:
- 讀取Kafka實(shí)時(shí)數(shù)據(jù);
- Spark Streaming Transformations以及actions操作;
- 將數(shù)據(jù)結(jié)果持久化到存儲(chǔ)中,跳轉(zhuǎn)到步驟一。
受網(wǎng)絡(luò)、集群等一些因素的影響,實(shí)時(shí)程序出現(xiàn)長(zhǎng)時(shí)失敗,導(dǎo)致數(shù)據(jù)出現(xiàn)堆積。此種情況下是丟掉堆積的數(shù)據(jù)從Kafka largest處消費(fèi)還是從之前的Kafka offsets處消費(fèi),這個(gè)取決具體的業(yè)務(wù)場(chǎng)景。
2.3 狀態(tài)模型
有狀態(tài)模型是指DStreams在指定的時(shí)間范圍內(nèi)有依賴(lài)關(guān)系,具體的時(shí)間范圍由業(yè)務(wù)場(chǎng)景來(lái)指定,可以是2個(gè)及以上的多個(gè)batch time RDD組成。Spark Streaming提供了updateStateByKey方法來(lái)滿(mǎn)足此類(lèi)的業(yè)務(wù)場(chǎng)景。因涉及狀態(tài)的問(wèn)題,所以在實(shí)際的計(jì)算過(guò)程中需要保存計(jì)算的狀態(tài),Spark Streaming中通過(guò)checkpoint來(lái)保存計(jì)算的元數(shù)據(jù)以及計(jì)算的進(jìn)度。該狀態(tài)模型的應(yīng)用場(chǎng)景有網(wǎng)站具體模塊的累計(jì)訪問(wèn)統(tǒng)計(jì)、最近N batch time 的網(wǎng)站訪問(wèn)情況以及app新增累計(jì)統(tǒng)計(jì)等等。具體流程如下:
上述流程中,每batch time計(jì)算時(shí),需要依賴(lài)最近2個(gè)batch time內(nèi)的數(shù)據(jù),經(jīng)過(guò)轉(zhuǎn)換及相關(guān)統(tǒng)計(jì),最終持久化到MySQL中去。不過(guò)為了確保每個(gè)計(jì)算僅計(jì)算2個(gè)batch time內(nèi)的數(shù)據(jù),需要維護(hù)數(shù)據(jù)的狀態(tài),清除過(guò)期的數(shù)據(jù)。我們先來(lái)看下updateStateByKey的實(shí)現(xiàn),其代碼如下:
- 暴露了全局狀態(tài)數(shù)據(jù)中的key類(lèi)型的方法。
- def updateStateByKey[S: ClassTag](
- updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
- partitioner: Partitioner,
- rememberPartitioner: Boolean
- ): DStream[(K, S)] = ssc.withScope {
- new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
- }
隱藏了全局狀態(tài)數(shù)據(jù)中的key類(lèi)型,僅對(duì)Value提供自定義的方法。
- def updateStateByKey[S: ClassTag](
- updateFunc: (Seq[V], Option[S]) => Option[S],
- partitioner: Partitioner,
- initialRDD: RDD[(K, S)]
- ): DStream[(K, S)] = ssc.withScope {
- val cleanedUpdateF = sparkContext.clean(updateFunc)
- val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
- iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
- }
- updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
- }
以上兩種方法分別給我們提供清理過(guò)期數(shù)據(jù)的思路:
- 泛型K進(jìn)行過(guò)濾。K表示全局狀態(tài)數(shù)據(jù)中對(duì)應(yīng)的key,如若K不滿(mǎn)足指定條件則反回false;
- 返回值過(guò)濾。第二個(gè)方法中自定義函數(shù)指定了Option[S]返回值,若過(guò)期數(shù)據(jù)返回None,那么該數(shù)據(jù)將從全局狀態(tài)中清除。
三、Spark Streaming監(jiān)控
同Spark一樣,Spark Streaming也提供了Jobs、Stages、Storage、Enviorment、Executors以及Streaming的監(jiān)控,其中Streaming監(jiān)控頁(yè)的內(nèi)容如下圖:
上圖是Spark UI中提供一些數(shù)據(jù)監(jiān)控,包括實(shí)時(shí)輸入數(shù)據(jù)、Scheduling Delay、處理時(shí)間以及總延遲的相關(guān)監(jiān)控?cái)?shù)據(jù)的趨勢(shì)展現(xiàn)。另外除了提供上述數(shù)據(jù)監(jiān)控外,Spark UI還提供了Active Batches以及Completed Batches相關(guān)信息。Active Batches包含當(dāng)前正在處理的batch信息以及堆積的batch相關(guān)信息,而Completed Batches剛提供每個(gè)batch處理的明細(xì)數(shù)據(jù),具體包括batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見(jiàn)下圖:
Spark Streaming能夠提供如此優(yōu)雅的數(shù)據(jù)監(jiān)控,是因在對(duì)監(jiān)聽(tīng)器設(shè)計(jì)模式的使用。如若Spark UI無(wú)法滿(mǎn)足你所需的監(jiān)控需要,用戶(hù)可以定制個(gè)性化監(jiān)控信息。Spark Streaming提供了StreamingListener特質(zhì),通過(guò)繼承此方法,就可以定制所需的監(jiān)控,其代碼如下:
- @DeveloperApi
- trait StreamingListener {
- /** Called when a receiver has been started */
- def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
- /** Called when a receiver has reported an error */
- def onReceiverError(receiverError: StreamingListenerReceiverError) { }
- /** Called when a receiver has been stopped */
- def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
- /** Called when a batch of jobs has been submitted for processing. */
- def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
- /** Called when processing of a batch of jobs has started. */
- def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
- /** Called when processing of a batch of jobs has completed. */
- def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
- /** Called when processing of a job of a batch has started. */
- def onOutputOperationStarted(
- outputOperationStarted: StreamingListenerOutputOperationStarted) { }
- /** Called when processing of a job of a batch has completed. */
- def onOutputOperationCompleted(
- outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
- }
目前,我們保存Offsets時(shí),采用繼承StreamingListener方式,此是一種應(yīng)用場(chǎng)景。當(dāng)然也可以監(jiān)控實(shí)時(shí)計(jì)算程序的堆積情況,并在達(dá)到一閾值后發(fā)送報(bào)警郵件。具體監(jiān)聽(tīng)器的定制還得依據(jù)應(yīng)用場(chǎng)景而定。
四、Spark Streaming優(yōu)缺點(diǎn)
Spark Streaming并非是Storm那樣,其并非是真正的流式處理框架,而是一次處理一批次數(shù)據(jù)。也正是這種方式,能夠較好地集成Spark 其他計(jì)算模塊,包括MLlib(機(jī)器學(xué)習(xí))、Graphx以及Spark SQL。這給實(shí)時(shí)計(jì)算帶來(lái)很大的便利,與此帶來(lái)便利的同時(shí),也犧牲作為流式的實(shí)時(shí)性等性能。
4.1 優(yōu)點(diǎn)
- Spark Streaming基于Spark Core API,因此其能夠與Spark中的其他模塊保持良好的兼容性,為編程提供了良好的可擴(kuò)展性;
- Spark Streaming 是粗粒度的準(zhǔn)實(shí)時(shí)處理框架,一次讀取完或異步讀完之后處理數(shù)據(jù),且其計(jì)算可基于大內(nèi)存進(jìn)行,因而具有較高的吞吐量;
- Spark Streaming采用統(tǒng)一的DAG調(diào)度以及RDD,因此能夠利用其lineage機(jī)制,對(duì)實(shí)時(shí)計(jì)算有很好的容錯(cuò)支持;
- Spark Streaming的DStream是基于RDD的在流式數(shù)據(jù)處理方面的抽象,其transformations 以及actions有較大的相似性,這在一定程度上降低了用戶(hù)的使用門(mén)檻,在熟悉Spark之后,能夠快速上手Spark Streaming。
4.2 缺點(diǎn)
- Spark Streaming是準(zhǔn)實(shí)時(shí)的數(shù)據(jù)處理框架,采用粗粒度的處理方式,當(dāng)batch time到時(shí)才會(huì)觸發(fā)計(jì)算,這并非像Storm那樣是純流式的數(shù)據(jù)處理方式。此種方式不可避免會(huì)出現(xiàn)相應(yīng)的計(jì)算延遲 。
- 目前來(lái)看,Spark Streaming穩(wěn)定性方面還是會(huì)存在一些問(wèn)題。有時(shí)會(huì)因一些莫名的異常導(dǎo)致退出,這種情況下得需要自己來(lái)保證數(shù)據(jù)一致性以及失敗重啟功能等。
四、總結(jié)
本篇文章主要介紹了Spark Streaming在實(shí)際應(yīng)用場(chǎng)景中的兩種計(jì)算模型,包括無(wú)狀態(tài)模型以及狀態(tài)模型;并且重點(diǎn)關(guān)注了下Spark Streaming在監(jiān)控方面所作的努力。首先本文介紹了Spark Streaming應(yīng)用場(chǎng)景以及在我們的實(shí)際應(yīng)用中所采取的技術(shù)架構(gòu)。在此基礎(chǔ)上,引入無(wú)狀態(tài)計(jì)算模型以及有狀態(tài)模型兩種計(jì)算模型;接著通過(guò)監(jiān)聽(tīng)器模式介紹Spark UI相關(guān)監(jiān)控信息等;***對(duì)Spark Streaming的優(yōu)缺點(diǎn)進(jìn)行概括。希望本篇文章能夠給各位帶來(lái)幫助,后續(xù)我們會(huì)介紹Spark Streaming在場(chǎng)景應(yīng)用中我們所做的優(yōu)化方面的努力,敬請(qǐng)期待!
關(guān)于作者
徐勝?lài)?guó),大連理工大學(xué)碩士畢業(yè),360大數(shù)據(jù)中心數(shù)據(jù)研發(fā)工程師,主要負(fù)責(zé)基于Spark Streaming的項(xiàng)目架構(gòu)及研發(fā)工作。郵箱 : xshguo_better@yeah.net。如有問(wèn)題,可郵件聯(lián)系,歡迎交流。