自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spark Streaming 數(shù)據(jù)清理機(jī)制

大數(shù)據(jù) Spark
大家剛開始用Spark Streaming時(shí),心里肯定嘀咕,對(duì)于一個(gè)7*24小時(shí)運(yùn)行的數(shù)據(jù),cache住的RDD,broadcast 系統(tǒng)會(huì)幫忙自己清理掉么?還a是說必須自己做清理?如果系統(tǒng)幫忙清理的話,機(jī)制是啥?

前言

為啥要了解機(jī)制呢?這就好比JVM的垃圾回收,雖然JVM的垃圾回收已經(jīng)巨牛了,但是依然會(huì)遇到很多和它相關(guān)的case導(dǎo)致系統(tǒng)運(yùn)行不正常。

這個(gè)內(nèi)容我記得自己剛接觸Spark Streaming的時(shí)候,老板也問過我,運(yùn)行期間會(huì)保留多少個(gè)RDD? 當(dāng)時(shí)沒回答出來。后面在群里也有人問到了,所以就整理了下。文中如有謬誤之處,還望指出。

DStream 和 RDD

我們知道Spark Streaming 計(jì)算還是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上關(guān)系。然而Spark Streaming 并沒有直接讓用戶使用RDD而是自己抽象了一套DStream的概念。 DStream 和 RDD 是包含的關(guān)系,你可以理解為Java里的裝飾模式,也就是DStream 是對(duì)RDD的增強(qiáng),但是行為表現(xiàn)和RDD是基本上差不多的。都具備幾個(gè)條件:

具有類似的tranformation動(dòng)作,比如map,reduceByKey等,也有一些自己獨(dú)有的,比如Window,mapWithStated等

都具有Action動(dòng)作,比如foreachRDD,count等

從編程模型上看是一致的。

所以很可能你寫的那堆Spark Streaming代碼看起來好像和Spark 一致的,然而并不能直接復(fù)用,因?yàn)橐粋€(gè)是DStream的變換,一個(gè)是RDD的變化。

Spark Streaming中 DStream 介紹

DStream 下面包含幾個(gè)類:

  • 數(shù)據(jù)源類,比如InputDStream,具體如DirectKafkaInputStream等
  • 轉(zhuǎn)換類,典型比如MappedDStream,ShuffledDStream
  • 輸出類,典型比如ForEachDStream

從上面來看,數(shù)據(jù)從開始(輸入)到結(jié)束(輸出)都是DStream體系來完成的,也就意味著用戶正常情況是無法直接去產(chǎn)生和操作RDD的,這也就是說,DStream有機(jī)會(huì)和義務(wù)去負(fù)責(zé)RDD的生命周期。

這就回答了前言中的問題了。Spark Streaming具備自動(dòng)清理功能。

RDD 在Spark Stream中產(chǎn)生的流程

在Spark Streaming中RDD的生命流程大體如下:

  • 在InputDStream會(huì)將接受到的數(shù)據(jù)轉(zhuǎn)化成RDD,比如DirectKafkaInputStream 產(chǎn)生的就是 KafkaRDD
  • 接著通過MappedDStream等進(jìn)行數(shù)據(jù)轉(zhuǎn)換,這個(gè)時(shí)候是直接調(diào)用RDD對(duì)應(yīng)的map方法進(jìn)行轉(zhuǎn)換的
  • 在進(jìn)行輸出類操作時(shí),才暴露出RDD,可以讓用戶執(zhí)行相應(yīng)的存儲(chǔ),其他計(jì)算等操作。

我們這里就以下面的代碼來進(jìn)行更詳細(xì)的解釋:

  1. val source  =   KafkaUtils.createDirectInputStream(....) 
  2. source.map(....).foreachRDD{rdd=> 
  3.     rdd.saveTextFile(....) 

foreachRDD 產(chǎn)生ForEachDStream,因?yàn)閒oreachRDD是個(gè)Action,所以會(huì)觸發(fā)任務(wù)的執(zhí)行,會(huì)被調(diào)用generateJob方法。

  1. override def generateJob(time: Time): Option[Job] = { 
  2.    parent.getOrCompute(time) match { 
  3.      case Some(rdd) => 
  4.        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { 
  5.          foreachFunc(rdd, time) 
  6.        } 
  7.        Some(new Job(time, jobFunc)) 
  8.      case None => None 
  9.    } 
  10.  } 

對(duì)應(yīng)的parent是MappedDStream,也就是說調(diào)用MappedDStream.getOrCompute.該方法在DStream中,首先會(huì)在MappedDStream對(duì)象中的generatedRDDs 變量中查找是否已經(jīng)有RDD,如果沒有則觸發(fā)計(jì)算,并且將產(chǎn)生的RDD放到generatedRDDs

  1. @transientprivate[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () 
  2.  
  3. private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = { 
  4.     // If RDD was already generated, then retrieve it from HashMap, 
  5.     // or else compute the RDD 
  6.     generatedRDDs.get(time).orElse { 
  7. .... 
  8. generatedRDDs.put(time, newRDD) 
  9. .... 

計(jì)算RDD是調(diào)用的compute方法,MappedDStream 的compute方法很簡(jiǎn)單,直接調(diào)用的父類也就是DirectKafkaInputStream的getOrCompute方法:

  1. override def compute(validTime: Time): Option[RDD[U]] = { 
  2.     parent.getOrCompute(validTime).map(_.map[U](mapFunc)) 
  3.   } 

在上面的例子中,MappedDStream 的parent是DirectKafkaInputStream中,這是個(gè)數(shù)據(jù)源,所以他的compute方法會(huì)直接new出一個(gè)RDD.

從上面可以得出幾個(gè)結(jié)論:

  • 數(shù)據(jù)源以及轉(zhuǎn)換類DStream都會(huì)維護(hù)一個(gè)generatedRDDs,可以按batchTime 進(jìn)行獲取
  • 內(nèi)部本質(zhì)還是進(jìn)行的RDD的轉(zhuǎn)換
  • 如果我們調(diào)用了cache會(huì)發(fā)生什么

這里又會(huì)有兩種情況,一種是調(diào)用DStream.cache,第二種是RDD.cache。事實(shí)上他們是完全一樣的。

  1. DStream的cache 動(dòng)作只是將DStream的變量storageLevel 設(shè)置為MEMORY_ONLY_SER,然后在產(chǎn)生(或者獲取)RDD的時(shí)候,調(diào)用RDD的persit方法進(jìn)行設(shè)置。所以DStream.cache 產(chǎn)生的效果等價(jià)于RDD.cache(也就是你自己調(diào)用foreachRDD 將RDD 都設(shè)置一遍)
  2. 進(jìn)入正題,我們是怎么釋放Cache住的RDD的

其實(shí)無所謂Cache不Cache住,RDD最終都是要釋放的,否則運(yùn)行久了,光RDD對(duì)象也能承包了你的內(nèi)存。我們知道,在Spark Streaming中,周期性產(chǎn)生事件驅(qū)動(dòng)Spark Streaming 的類其實(shí)是:

  1. org.apache.spark.streaming.scheduler.JobGenerator 

他內(nèi)部有個(gè)永動(dòng)機(jī)(定時(shí)器),定時(shí)發(fā)布一個(gè)產(chǎn)生任務(wù)的事件:

  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator"

然后通過processEvent進(jìn)行事件處理:

  1. /** Processes all events */ 
  2.  private def processEvent(event: JobGeneratorEvent) { 
  3.    logDebug("Got event " + event) 
  4.    event match { 
  5.      case GenerateJobs(time) => generateJobs(time) 
  6.      case ClearMetadata(time) => clearMetadata(time) 
  7.      case DoCheckpoint(time, clearCheckpointDataLater) => 
  8.        doCheckpoint(time, clearCheckpointDataLater) 
  9.      case ClearCheckpointData(time) => clearCheckpointData(time) 
  10.    } 
  11.  } 

目前我們只關(guān)注ClearMetadata 事件。對(duì)應(yīng)的方法為:

  1. private def clearMetadata(time: Time) { 
  2.     ssc.graph.clearMetadata(time) 
  3.  
  4.     // If checkpointing is enabled, then checkpoint, 
  5.     // else mark batch to be fully processed 
  6.     if (shouldCheckpoint) { 
  7.       eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true)) 
  8.     } else { 
  9.       // If checkpointing is not enabled, then delete metadata information about 
  10.       // received blocks (block data not saved in any case). Otherwise, wait for 
  11.       // checkpointing of this batch to complete. 
  12.       val maxRememberDuration = graph.getMaxInputStreamRememberDuration() 
  13.       jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration) 
  14.       jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration) 
  15.       markBatchFullyProcessed(time) 
  16.     } 
  17.   } 

首先是清理輸出DStream(比如ForeachDStream),接著是清理輸入類(基于Receiver模式)的數(shù)據(jù)。

ForeachDStream 其實(shí)調(diào)用的也是DStream的方法。該方法大體如下:

  1. private[streaming] def clearMetadata(time: Time) { 
  2.     val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist"true
  3.     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) 
  4.     logDebug("Clearing references to old RDDs: [" + 
  5.       oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]"
  6.     generatedRDDs --= oldRDDs.keys 
  7.     if (unpersistData) { 
  8.       logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", ")) 
  9.       oldRDDs.values.foreach { rdd => 
  10.         rdd.unpersist(false
  11.         // Explicitly remove blocks of BlockRDD 
  12.         rdd match { 
  13.           case b: BlockRDD[_] => 
  14.             logInfo("Removing blocks of RDD " + b + " of time " + time) 
  15.             b.removeBlocks() 
  16.           case _ => 
  17.         } 
  18.       } 
  19.     } 
  20.     logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + 
  21.       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) 
  22.     dependencies.foreach(_.clearMetadata(time)) 
  23.   } 

大體執(zhí)行動(dòng)作如下描述:

  1. 根據(jù)記憶周期得到應(yīng)該剔除的RDD
  2. 根據(jù)是否要清理cache數(shù)據(jù),進(jìn)行unpersit 操作,并且顯示的移除block
  3. 根據(jù)依賴調(diào)用其他的DStream進(jìn)行動(dòng)作清理

這里我們還可以看到,通過參數(shù)spark.streaming.unpersist 你是可以決定是否手工控制是否需要對(duì)cache住的數(shù)據(jù)進(jìn)行清理。

這里你會(huì)有兩個(gè)疑問:

  1. dependencies 是什么?
  2. rememberDuration 是怎么來的?

dependencies 你可以簡(jiǎn)單理解為父DStream,通過dependencies 我們可以獲得已完整DStream鏈。

rememberDuration 的設(shè)置略微復(fù)雜些,大體是 slideDuration,如果設(shè)置了checkpointDuration 則是2*checkpointDuration 或者通過DStreamGraph.rememberDuration(如果設(shè)置了的話,譬如通過StreamingContext.remember方法,不過通過該方法設(shè)置的值要大于計(jì)算得到的值會(huì)生效)

另外值得一提的就是后面的DStream 會(huì)調(diào)整前面的DStream的rememberDuration,譬如如果你用了window* 相關(guān)的操作,則在此之前的DStream 的rememberDuration 都需要加上windowDuration。

然后根據(jù)Spark Streaming的定時(shí)性,每個(gè)周期只要完成了,都會(huì)觸發(fā)清理動(dòng)作,這個(gè)就是清理動(dòng)作發(fā)生的時(shí)機(jī)。代碼如下:

  1. def onBatchCompletion(time: Time) {      
  2.     eventLoop.post(ClearMetadata(time)) 

總結(jié)下

Spark Streaming 會(huì)在每個(gè)Batch任務(wù)結(jié)束時(shí)進(jìn)行一次清理動(dòng)作。每個(gè)DStream 都會(huì)被掃描,不同的DStream根據(jù)情況不同,保留的RDD數(shù)量也是不一致的,但都是根據(jù)rememberDuration變量決定,而該變量會(huì)被下游的DStream所影響,所以不同的DStream的rememberDuration取值是不一樣的。

 

 

責(zé)任編輯:Ophira 來源: 簡(jiǎn)書
相關(guān)推薦

2017-08-14 10:30:13

SparkSpark Strea擴(kuò)容

2017-06-06 08:31:10

Spark Strea計(jì)算模型監(jiān)控

2016-12-19 14:35:32

Spark Strea原理剖析數(shù)據(jù)

2017-10-13 10:36:33

SparkSpark-Strea關(guān)系

2018-04-09 12:25:11

2016-01-28 10:11:30

Spark StreaSpark大數(shù)據(jù)平臺(tái)

2017-10-11 11:10:02

Spark Strea大數(shù)據(jù)流式處理

2022-05-30 08:21:17

Kafka數(shù)據(jù)傳遞

2018-10-14 15:52:46

MySQL數(shù)據(jù)清理數(shù)據(jù)庫(kù)

2019-10-17 09:25:56

Spark StreaPVUV

2023-10-24 20:32:40

大數(shù)據(jù)

2021-08-20 16:37:42

SparkSpark Strea

2019-12-13 08:25:26

FlinkSpark Strea流數(shù)據(jù)

2017-09-26 09:35:22

2021-07-09 10:27:12

SparkStreaming系統(tǒng)

2017-06-27 15:08:05

大數(shù)據(jù)Apache SparKafka Strea

2016-03-03 15:11:42

Spark Strea工作流調(diào)度器

2018-04-18 08:54:28

RDD內(nèi)存Spark

2025-04-02 08:17:42

2022-06-24 08:00:00

編程工具數(shù)據(jù)結(jié)構(gòu)開發(fā)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)