自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spark Streaming場(chǎng)景應(yīng)用- Spark Streaming計(jì)算模型及監(jiān)控

大數(shù)據(jù) Spark
本篇文章主要介紹了Spark Streaming在實(shí)際應(yīng)用場(chǎng)景中的兩種計(jì)算模型,包括無(wú)狀態(tài)模型以及狀態(tài)模型;并且重點(diǎn)關(guān)注了下Spark Streaming在監(jiān)控方面所作的努力。

摘要

Spark Streaming是一套優(yōu)秀的實(shí)時(shí)計(jì)算框架。其良好的可擴(kuò)展性、高吞吐量以及容錯(cuò)機(jī)制能夠滿(mǎn)足我們很多的場(chǎng)景應(yīng)用。本篇結(jié)合我們的應(yīng)用場(chǎng)景,介結(jié)我們?cè)谑褂肧park Streaming方面的技術(shù)架構(gòu),并著重講解Spark Streaming兩種計(jì)算模型,無(wú)狀態(tài)和狀態(tài)計(jì)算模型以及該兩種模型的注意事項(xiàng);接著介紹了Spark Streaming在監(jiān)控方面所做的一些事情,***總結(jié)了Spark Streaming的優(yōu)缺點(diǎn)。

[[193096]]

一、概述

數(shù)據(jù)是非常寶貴的資源,對(duì)各級(jí)企事業(yè)單均有非常高的價(jià)值。但是數(shù)據(jù)的爆炸,導(dǎo)致原先單機(jī)的數(shù)據(jù)處理已經(jīng)無(wú)法滿(mǎn)足業(yè)務(wù)的場(chǎng)景需求。因此在此基礎(chǔ)上出現(xiàn)了一些優(yōu)秀的分布式計(jì)算框架,諸如Hadoop、Spark等。離線分布式處理框架雖然能夠處理非常大量的數(shù)據(jù),但是其遲滯性很難滿(mǎn)足一些特定的需求場(chǎng)景,比如push反饋、實(shí)時(shí)推薦、實(shí)時(shí)用戶(hù)行為等。為了滿(mǎn)足這些場(chǎng)景,使數(shù)據(jù)處理能夠達(dá)到實(shí)時(shí)的響應(yīng)和反饋,又隨之出現(xiàn)了實(shí)時(shí)計(jì)算框架。目前的實(shí)時(shí)處理框架有Apache Storm、Apache Flink以及Spark Streaming等。其中Spark Streaming由于其本身的擴(kuò)展性、高吞吐量以及容錯(cuò)能力等特性,并且能夠和離線各種框架有效結(jié)合起來(lái),因而是當(dāng)下是比較受歡迎的一種流式處理框架。

根據(jù)其官方文檔介紹,Spark Streaming 有高擴(kuò)展性、高吞吐量和容錯(cuò)能力強(qiáng)的特點(diǎn)。Spark Streaming 支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡(jiǎn)單的 TCP 套接字等等。數(shù)據(jù)輸入后可以用 Spark 的高度抽象原語(yǔ)如:map、reduce、join、window 等進(jìn)行運(yùn)算。而結(jié)果也能保存在很多地方,如 HDFS,數(shù)據(jù)庫(kù)等。另外 Spark Streaming 也能和 MLlib(機(jī)器學(xué)習(xí))以及 Graphx ***融合。其架構(gòu)見(jiàn)下圖:

Spark Streaming 其優(yōu)秀的特點(diǎn)給我們帶來(lái)很多的應(yīng)用場(chǎng)景,如網(wǎng)站監(jiān)控和網(wǎng)絡(luò)監(jiān)控、異常監(jiān)測(cè)、網(wǎng)頁(yè)點(diǎn)擊、用戶(hù)行為、用戶(hù)遷移等。本文中,將為大家詳細(xì)介紹,我們的應(yīng)用場(chǎng)景中,Spark Streaming的技術(shù)架構(gòu)、兩種狀態(tài)模型以及Spark Streaming監(jiān)控等。

二、應(yīng)用場(chǎng)景

在 Spark Streaming 中,處理數(shù)據(jù)的單位是一批而不是單條,而數(shù)據(jù)采集卻是逐條進(jìn)行的,因此 Spark Streaming 系統(tǒng)需要設(shè)置間隔使得數(shù)據(jù)匯總到一定的量后再一并操作,這個(gè)間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關(guān)鍵參數(shù),它決定了 Spark Streaming 提交作業(yè)的頻率和數(shù)據(jù)處理的延遲,同時(shí)也影響著數(shù)據(jù)處理的吞吐量和性能。

2.1 框架

目前我們Spark Streaming的業(yè)務(wù)應(yīng)用場(chǎng)景包括異常監(jiān)測(cè)、網(wǎng)頁(yè)點(diǎn)擊、用戶(hù)行為以及用戶(hù)地圖遷徙等場(chǎng)景。按計(jì)算模型來(lái)看大體可分為無(wú)狀態(tài)的計(jì)算模型以及狀態(tài)計(jì)算模型兩種。在實(shí)際的應(yīng)用場(chǎng)景中,我們采用Kafka作為實(shí)時(shí)輸入源,Spark Streaming作為計(jì)算引擎處理完數(shù)據(jù)之后,再持久化到存儲(chǔ)中,包括MySQL、HDFS、ElasticSearch以及MongoDB等;同時(shí)Spark Streaming 數(shù)據(jù)清洗后也會(huì)寫(xiě)入Kafka,然后經(jīng)由Flume持久化到HDFS;接著基于持久化的內(nèi)容做一些UI的展現(xiàn)。架構(gòu)見(jiàn)下圖:

2.2 無(wú)狀態(tài)模型

無(wú)狀態(tài)模型只關(guān)注當(dāng)前新生成的DStream數(shù)據(jù),所以的計(jì)算邏輯均基于該批次的數(shù)據(jù)進(jìn)行處理。無(wú)狀態(tài)模型能夠很好地適應(yīng)一些應(yīng)用場(chǎng)景,比如網(wǎng)站點(diǎn)擊實(shí)時(shí)排行榜、指定batch時(shí)間段的用戶(hù)訪問(wèn)以及點(diǎn)擊情況等。該模型由于沒(méi)有狀態(tài),并不需要考慮有狀態(tài)的情況,只需要根據(jù)業(yè)務(wù)場(chǎng)景保證數(shù)據(jù)不丟就行。此種情況一般采用Direct方式讀取Kafka數(shù)據(jù),并采用監(jiān)聽(tīng)器方式持久化Offsets即可。具體流程如下:

其上模型框架包含以下幾個(gè)處理步驟:

  • 讀取Kafka實(shí)時(shí)數(shù)據(jù);
  • Spark Streaming Transformations以及actions操作;
  • 將數(shù)據(jù)結(jié)果持久化到存儲(chǔ)中,跳轉(zhuǎn)到步驟一。

受網(wǎng)絡(luò)、集群等一些因素的影響,實(shí)時(shí)程序出現(xiàn)長(zhǎng)時(shí)失敗,導(dǎo)致數(shù)據(jù)出現(xiàn)堆積。此種情況下是丟掉堆積的數(shù)據(jù)從Kafka largest處消費(fèi)還是從之前的Kafka offsets處消費(fèi),這個(gè)取決具體的業(yè)務(wù)場(chǎng)景。

2.3 狀態(tài)模型

有狀態(tài)模型是指DStreams在指定的時(shí)間范圍內(nèi)有依賴(lài)關(guān)系,具體的時(shí)間范圍由業(yè)務(wù)場(chǎng)景來(lái)指定,可以是2個(gè)及以上的多個(gè)batch time RDD組成。Spark Streaming提供了updateStateByKey方法來(lái)滿(mǎn)足此類(lèi)的業(yè)務(wù)場(chǎng)景。因涉及狀態(tài)的問(wèn)題,所以在實(shí)際的計(jì)算過(guò)程中需要保存計(jì)算的狀態(tài),Spark Streaming中通過(guò)checkpoint來(lái)保存計(jì)算的元數(shù)據(jù)以及計(jì)算的進(jìn)度。該狀態(tài)模型的應(yīng)用場(chǎng)景有網(wǎng)站具體模塊的累計(jì)訪問(wèn)統(tǒng)計(jì)、最近N batch time 的網(wǎng)站訪問(wèn)情況以及app新增累計(jì)統(tǒng)計(jì)等等。具體流程如下:

上述流程中,每batch time計(jì)算時(shí),需要依賴(lài)最近2個(gè)batch time內(nèi)的數(shù)據(jù),經(jīng)過(guò)轉(zhuǎn)換及相關(guān)統(tǒng)計(jì),最終持久化到MySQL中去。不過(guò)為了確保每個(gè)計(jì)算僅計(jì)算2個(gè)batch time內(nèi)的數(shù)據(jù),需要維護(hù)數(shù)據(jù)的狀態(tài),清除過(guò)期的數(shù)據(jù)。我們先來(lái)看下updateStateByKey的實(shí)現(xiàn),其代碼如下:

  • 暴露了全局狀態(tài)數(shù)據(jù)中的key類(lèi)型的方法。
  1. def updateStateByKey[S: ClassTag]( 
  2.       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], 
  3.       partitioner: Partitioner, 
  4.       rememberPartitioner: Boolean 
  5.     ): DStream[(K, S)] = ssc.withScope { 
  6.      new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) 

隱藏了全局狀態(tài)數(shù)據(jù)中的key類(lèi)型,僅對(duì)Value提供自定義的方法。

  1. def updateStateByKey[S: ClassTag]( 
  2.       updateFunc: (Seq[V], Option[S]) => Option[S], 
  3.       partitioner: Partitioner, 
  4.       initialRDD: RDD[(K, S)] 
  5.     ): DStream[(K, S)] = ssc.withScope { 
  6.     val cleanedUpdateF = sparkContext.clean(updateFunc) 
  7.     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { 
  8.       iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s))) 
  9.     } 
  10.     updateStateByKey(newUpdateFunc, partitioner, true, initialRDD) 

以上兩種方法分別給我們提供清理過(guò)期數(shù)據(jù)的思路:

  • 泛型K進(jìn)行過(guò)濾。K表示全局狀態(tài)數(shù)據(jù)中對(duì)應(yīng)的key,如若K不滿(mǎn)足指定條件則反回false;
  • 返回值過(guò)濾。第二個(gè)方法中自定義函數(shù)指定了Option[S]返回值,若過(guò)期數(shù)據(jù)返回None,那么該數(shù)據(jù)將從全局狀態(tài)中清除。

三、Spark Streaming監(jiān)控

同Spark一樣,Spark Streaming也提供了Jobs、Stages、Storage、Enviorment、Executors以及Streaming的監(jiān)控,其中Streaming監(jiān)控頁(yè)的內(nèi)容如下圖:

上圖是Spark UI中提供一些數(shù)據(jù)監(jiān)控,包括實(shí)時(shí)輸入數(shù)據(jù)、Scheduling Delay、處理時(shí)間以及總延遲的相關(guān)監(jiān)控?cái)?shù)據(jù)的趨勢(shì)展現(xiàn)。另外除了提供上述數(shù)據(jù)監(jiān)控外,Spark UI還提供了Active Batches以及Completed Batches相關(guān)信息。Active Batches包含當(dāng)前正在處理的batch信息以及堆積的batch相關(guān)信息,而Completed Batches剛提供每個(gè)batch處理的明細(xì)數(shù)據(jù),具體包括batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見(jiàn)下圖:

Spark Streaming能夠提供如此優(yōu)雅的數(shù)據(jù)監(jiān)控,是因在對(duì)監(jiān)聽(tīng)器設(shè)計(jì)模式的使用。如若Spark UI無(wú)法滿(mǎn)足你所需的監(jiān)控需要,用戶(hù)可以定制個(gè)性化監(jiān)控信息。Spark Streaming提供了StreamingListener特質(zhì),通過(guò)繼承此方法,就可以定制所需的監(jiān)控,其代碼如下:

  1. @DeveloperApi 
  2.     trait StreamingListener { 
  3.  
  4.       /** Called when a receiver has been started */ 
  5.       def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } 
  6.  
  7.       /** Called when a receiver has reported an error */ 
  8.       def onReceiverError(receiverError: StreamingListenerReceiverError) { } 
  9.  
  10.       /** Called when a receiver has been stopped */ 
  11.       def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { } 
  12.  
  13.       /** Called when a batch of jobs has been submitted for processing. */ 
  14.       def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } 
  15.  
  16.       /** Called when processing of a batch of jobs has started.  */ 
  17.       def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } 
  18.  
  19.       /** Called when processing of a batch of jobs has completed. */ 
  20.       def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } 
  21.  
  22.       /** Called when processing of a job of a batch has started. */ 
  23.       def onOutputOperationStarted( 
  24.           outputOperationStarted: StreamingListenerOutputOperationStarted) { } 
  25.  
  26.       /** Called when processing of a job of a batch has completed. */ 
  27.       def onOutputOperationCompleted( 
  28.           outputOperationCompleted: StreamingListenerOutputOperationCompleted) { } 
  29.     } 

目前,我們保存Offsets時(shí),采用繼承StreamingListener方式,此是一種應(yīng)用場(chǎng)景。當(dāng)然也可以監(jiān)控實(shí)時(shí)計(jì)算程序的堆積情況,并在達(dá)到一閾值后發(fā)送報(bào)警郵件。具體監(jiān)聽(tīng)器的定制還得依據(jù)應(yīng)用場(chǎng)景而定。

四、Spark Streaming優(yōu)缺點(diǎn)

Spark Streaming并非是Storm那樣,其并非是真正的流式處理框架,而是一次處理一批次數(shù)據(jù)。也正是這種方式,能夠較好地集成Spark 其他計(jì)算模塊,包括MLlib(機(jī)器學(xué)習(xí))、Graphx以及Spark SQL。這給實(shí)時(shí)計(jì)算帶來(lái)很大的便利,與此帶來(lái)便利的同時(shí),也犧牲作為流式的實(shí)時(shí)性等性能。

4.1 優(yōu)點(diǎn)

  • Spark Streaming基于Spark Core API,因此其能夠與Spark中的其他模塊保持良好的兼容性,為編程提供了良好的可擴(kuò)展性;
  • Spark Streaming 是粗粒度的準(zhǔn)實(shí)時(shí)處理框架,一次讀取完或異步讀完之后處理數(shù)據(jù),且其計(jì)算可基于大內(nèi)存進(jìn)行,因而具有較高的吞吐量;
  • Spark Streaming采用統(tǒng)一的DAG調(diào)度以及RDD,因此能夠利用其lineage機(jī)制,對(duì)實(shí)時(shí)計(jì)算有很好的容錯(cuò)支持;
  • Spark Streaming的DStream是基于RDD的在流式數(shù)據(jù)處理方面的抽象,其transformations 以及actions有較大的相似性,這在一定程度上降低了用戶(hù)的使用門(mén)檻,在熟悉Spark之后,能夠快速上手Spark Streaming。

4.2 缺點(diǎn)

  • Spark Streaming是準(zhǔn)實(shí)時(shí)的數(shù)據(jù)處理框架,采用粗粒度的處理方式,當(dāng)batch time到時(shí)才會(huì)觸發(fā)計(jì)算,這并非像Storm那樣是純流式的數(shù)據(jù)處理方式。此種方式不可避免會(huì)出現(xiàn)相應(yīng)的計(jì)算延遲 。
  • 目前來(lái)看,Spark Streaming穩(wěn)定性方面還是會(huì)存在一些問(wèn)題。有時(shí)會(huì)因一些莫名的異常導(dǎo)致退出,這種情況下得需要自己來(lái)保證數(shù)據(jù)一致性以及失敗重啟功能等。

四、總結(jié)

本篇文章主要介紹了Spark Streaming在實(shí)際應(yīng)用場(chǎng)景中的兩種計(jì)算模型,包括無(wú)狀態(tài)模型以及狀態(tài)模型;并且重點(diǎn)關(guān)注了下Spark Streaming在監(jiān)控方面所作的努力。首先本文介紹了Spark Streaming應(yīng)用場(chǎng)景以及在我們的實(shí)際應(yīng)用中所采取的技術(shù)架構(gòu)。在此基礎(chǔ)上,引入無(wú)狀態(tài)計(jì)算模型以及有狀態(tài)模型兩種計(jì)算模型;接著通過(guò)監(jiān)聽(tīng)器模式介紹Spark UI相關(guān)監(jiān)控信息等;***對(duì)Spark Streaming的優(yōu)缺點(diǎn)進(jìn)行概括。希望本篇文章能夠給各位帶來(lái)幫助,后續(xù)我們會(huì)介紹Spark Streaming在場(chǎng)景應(yīng)用中我們所做的優(yōu)化方面的努力,敬請(qǐng)期待!

關(guān)于作者

徐勝?lài)?guó),大連理工大學(xué)碩士畢業(yè),360大數(shù)據(jù)中心數(shù)據(jù)研發(fā)工程師,主要負(fù)責(zé)基于Spark Streaming的項(xiàng)目架構(gòu)及研發(fā)工作。郵箱 : xshguo_better@yeah.net。如有問(wèn)題,可郵件聯(lián)系,歡迎交流。

責(zé)任編輯:武曉燕 來(lái)源: oschina博客
相關(guān)推薦

2018-04-09 12:25:11

2017-08-14 10:30:13

SparkSpark Strea擴(kuò)容

2016-12-19 14:35:32

Spark Strea原理剖析數(shù)據(jù)

2017-10-13 10:36:33

SparkSpark-Strea關(guān)系

2016-01-28 10:11:30

Spark StreaSpark大數(shù)據(jù)平臺(tái)

2016-05-11 10:29:54

Spark Strea數(shù)據(jù)清理Spark

2017-09-26 09:35:22

2019-10-17 09:25:56

Spark StreaPVUV

2021-08-20 16:37:42

SparkSpark Strea

2019-12-13 08:25:26

FlinkSpark Strea流數(shù)據(jù)

2023-10-24 20:32:40

大數(shù)據(jù)

2017-06-27 15:08:05

大數(shù)據(jù)Apache SparKafka Strea

2017-10-11 11:10:02

Spark Strea大數(shù)據(jù)流式處理

2021-07-09 10:27:12

SparkStreaming系統(tǒng)

2016-03-03 15:11:42

Spark Strea工作流調(diào)度器

2018-04-18 08:54:28

RDD內(nèi)存Spark

2011-08-24 14:07:13

PostgreSQLStreaming R

2018-10-24 09:00:26

KafkaSpark數(shù)據(jù)

2022-06-24 08:00:00

編程工具數(shù)據(jù)結(jié)構(gòu)開(kāi)發(fā)

2010-02-23 10:57:34

WCF Streami
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)