自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

為啥Spark 的Broadcast要用單例模式

大數(shù)據(jù) Spark
很多用Spark Streaming 的朋友應(yīng)該使用過broadcast,大多數(shù)情況下廣播變量都是以單例模式聲明的有沒有粉絲想過為什么?

很多用Spark Streaming 的朋友應(yīng)該使用過broadcast,大多數(shù)情況下廣播變量都是以單例模式聲明的有沒有粉絲想過為什么?浪尖在這里幫大家分析一下,有以下幾個原因:

  1. 廣播變量大多數(shù)情況下是不會變更的,使用單例模式可以減少spark streaming每次job生成執(zhí)行,重復(fù)生成廣播變量帶來的開銷。
  2. 單例模式也要做同步。這個對于很多新手來說可以不用考慮同步問題,原因很簡單因為新手不會調(diào)整spark 程序task的調(diào)度模式,而默認(rèn)采用FIFO的調(diào)度模式,基本不會產(chǎn)生并發(fā)問題。1).假如你配置了Fair調(diào)度模式,同時修改了Spark Streaming運行的并行執(zhí)行的job數(shù),默認(rèn)為1,那么就要加上同步代碼了。2).還有一個原因,在多輸出流的情況下共享broadcast,同時配置了Fair調(diào)度模式,也會產(chǎn)生并發(fā)問題。
  3. 注意。有些時候比如廣播配置文件,規(guī)則等需要變更broadcast,在使用fair的時候可以在foreachrdd里面使用局部變量作為廣播,避免相互干擾。

先看例子,后面逐步揭曉內(nèi)部機制。

1.例子

下面是一個雙重檢查式的broadcast變量的聲明方式。

  1. object WordBlacklist { 
  2.  
  3.   @volatile private var instance: Broadcast[Seq[String]] = null 
  4.  
  5.   def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { 
  6.     if (instance == null) { 
  7.       synchronized { 
  8.         if (instance == null) { 
  9.           val wordBlacklist = Seq("a""b""c"
  10.           instance = sc.broadcast(wordBlacklist) 
  11.         } 
  12.       } 
  13.     } 
  14.     instance 
  15.   } 

廣播變量的使用方法如下:

  1. val lines = ssc.socketTextStream(ip, port) 
  2.     val words = lines.flatMap(_.split(" ")) 
  3.     val wordCounts = words.map((_, 1)).reduceByKey(_ + _) 
  4.     wordCounts.foreachRDD { (rdd: RDD[(String, Int)], timeTime) => 
  5.       // Get or register the blacklist Broadcast 
  6.       val blacklist = WordBlacklist.getInstance(rdd.sparkContext) 
  7.       // Get or register the droppedWordsCounter Accumulator 
  8.       val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) 
  9.       // Use blacklist to drop words and use droppedWordsCounter to count them 
  10.       val counts = rdd.filter { case (word, count) => 
  11.         if (blacklist.value.contains(word)) { 
  12.           droppedWordsCounter.add(count
  13.           false 
  14.         } else { 
  15.           true 
  16.         } 
  17.       }.collect().mkString("["", ""]"
  18.       val output = s"Counts at time $time $counts" 
  19.       println(output
  20.       println(s"Dropped ${droppedWordsCounter.value} word(s) totally"
  21.       println(s"Appending to ${outputFile.getAbsolutePath}"
  22.       Files.append(output + "\n", outputFile, Charset.defaultCharset()) 
  23.     } 

2.概念補充

為啥Spark 的Broadcast要用單例模式

首先,一個基本概念就是Spark應(yīng)用程序從開始提交到task執(zhí)行分了很多層。

  1. 應(yīng)用調(diào)度器。主要是資源管理器,比如standalone,yarn等負(fù)責(zé)Spark整個應(yīng)用的調(diào)度和集群資源的管理。
  2. job調(diào)度器。spark 的算子分為主要兩大類,transform和action,其中每一個action都會產(chǎn)生一個job。這個job需要在executor提供的資源池里調(diào)度執(zhí)行,當(dāng)然并不少直接調(diào)度執(zhí)行job。
  3. stage劃分及調(diào)度。job具體會劃分為若干stage,這個就有一個基本的概念就是寬依賴和窄依賴,寬依賴就會劃分stage。stage也需要調(diào)度執(zhí)行,從后往前劃分,從前往后調(diào)度執(zhí)行。
  4. task切割及調(diào)度。stage往下繼續(xù)細(xì)化就是會根據(jù)不太的并行度劃分出task集合,這個就是在executor上調(diào)度執(zhí)行的基本單元,目前的調(diào)度默認(rèn)是一個task一個cpu。
  5. Spark Streaming 的job生成是周期性的。當(dāng)前job的執(zhí)行時間超過生成周期就會產(chǎn)生job 累加。累加一定數(shù)目的job后有可能會導(dǎo)致應(yīng)用程序失敗。這個主要原因是由于FIFO的調(diào)度模式和Spark Streaming的默認(rèn)單線程的job執(zhí)行機制

3.Spark Streaming job生成

這個源碼主要入口是StreamingContext#JobScheduler#JobGenerator對象,內(nèi)部有個RecurringTimer,主要負(fù)責(zé)按照批處理時間周期產(chǎn)生GenrateJobs事件,當(dāng)然在存在windows的情況下,該周期有可能不會生成job,要取決于滑動間隔,有興趣自己去揭秘,浪尖星球里分享的視頻教程里講到了。具體代碼塊如下

  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 
  2.    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator"

我們直接看其實現(xiàn)代碼塊:

  1. eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 
  2.       override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 
  3.  
  4.       override protected def onError(e: Throwable): Unit = { 
  5.         jobScheduler.reportError("Error in job generator", e) 
  6.       } 
  7.     } 
  8.     eventLoop.start() 

event處理函數(shù)是processEvent方法

  1. /** Processes all events */ 
  2.   private def processEvent(event: JobGeneratorEvent) { 
  3.     logDebug("Got event " + event) 
  4.     event match { 
  5.       case GenerateJobs(time) => generateJobs(time
  6.       case ClearMetadata(time) => clearMetadata(time
  7.       case DoCheckpoint(time, clearCheckpointDataLater) => 
  8.         doCheckpoint(time, clearCheckpointDataLater) 
  9.       case ClearCheckpointData(time) => clearCheckpointData(time
  10.     } 
  11.   } 

在接受到GenerateJob事件的時候,會執(zhí)行g(shù)enerateJobs代碼,就是在該代碼內(nèi)部產(chǎn)生和調(diào)度job的。

  1. /** Generate jobs and perform checkpointing for the given `time`.  */ 
  2.   private def generateJobs(timeTime) { 
  3.     // Checkpoint all RDDs marked for checkpointing to ensure their lineages are 
  4.     // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 
  5.     ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true"
  6.     Try { 
  7.       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
  8.       graph.generateJobs(time) // generate jobs using allocated block 
  9.     } match { 
  10.       case Success(jobs) => 
  11.         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time
  12.         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  13.       case Failure(e) => 
  14.         jobScheduler.reportError("Error generating jobs for time " + time, e) 
  15.         PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) 
  16.     } 
  17.     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
  18.   } 

可以看到代碼里首先會執(zhí)行job生成代碼

  1. graph.generateJobs(time
  2.  
  3. 具體代碼塊兒 
  4.  
  5. def generateJobs(timeTime): Seq[Job] = { 
  6.     logDebug("Generating jobs for time " + time
  7.     val jobs = this.synchronized { 
  8.       outputStreams.flatMap { outputStream => 
  9.         val jobOption = outputStream.generateJob(time
  10.         jobOption.foreach(_.setCallSite(outputStream.creationSite)) 
  11.         jobOption 
  12.       } 
  13.     } 
  14.     logDebug("Generated " + jobs.length + " jobs for time " + time
  15.     jobs 
  16.   } 

每個輸出流都會生成一個job,輸出流就類似于foreachrdd,print這些。其實內(nèi)部都是ForEachDStream。所以生成的是一個job集合。

然后就會將job集合提交到線程池里去執(zhí)行,這些都是在driver端完成的哦。

  1. jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  2.  
  3. 具體h函數(shù)內(nèi)容 
  4. def submitJobSet(jobSet: JobSet) { 
  5.     if (jobSet.jobs.isEmpty) { 
  6.       logInfo("No jobs added for time " + jobSet.time
  7.     } else { 
  8.       listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) 
  9.       jobSets.put(jobSet.time, jobSet) 
  10.       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) 
  11.       logInfo("Added jobs for time " + jobSet.time
  12.     } 
  13.   } 

其實就是遍歷生成的job集合,然后提交到線程池jobExecutor內(nèi)部執(zhí)行。這個也是在driver端的哦。

jobExecutor就是一個固定線程數(shù)的線程池,默認(rèn)是1個線程。

  1. private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 
  2.   private val jobExecutor = 
  3.     ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor"

需要的話可以配置spark.streaming.concurrentJobs來同時提交執(zhí)行多個job。

那么這種情況下,job就可以并行執(zhí)行了嗎?

顯然不是的!

還要修改一下調(diào)度模式為Fair,詳細(xì)的配置可以參考:

http://spark.apache.org/docs/2.3.3/job-scheduling.html#scheduling-within-an-application

簡單的均分的話只需要

  1. conf.set("spark.scheduler.mode""FAIR"

然后,同時運行的job就會均分所有executor提供的資源。

這就是整個job生成的整個過程了哦。

因為Spark Streaming的任務(wù)存在Fair模式下并發(fā)的情況,所以需要在使用單例模式生成broadcast的時候要注意聲明同步。

責(zé)任編輯:未麗燕 來源: Spark學(xué)習(xí)技巧
相關(guān)推薦

2021-09-07 10:44:35

異步單例模式

2021-03-02 08:50:31

設(shè)計單例模式

2021-02-01 10:01:58

設(shè)計模式 Java單例模式

2022-02-06 22:30:36

前端設(shè)計模式

2022-09-29 08:39:37

架構(gòu)

2013-11-26 16:20:26

Android設(shè)計模式

2016-03-28 10:23:11

Android設(shè)計單例

2021-02-07 23:58:10

單例模式對象

2011-03-16 10:13:31

java單例模式

2022-06-07 08:55:04

Golang單例模式語言

2024-02-04 12:04:17

2024-03-06 13:19:19

工廠模式Python函數(shù)

2021-08-11 17:22:11

設(shè)計模式單例

2015-09-06 11:07:52

C++設(shè)計模式單例模式

2016-10-09 09:37:49

javascript單例模式

2023-11-21 21:39:38

單例模式音頻管理器

2011-06-28 15:18:45

Qt 單例模式

2021-05-29 10:22:49

單例模式版本

2022-08-10 11:02:56

Python單例模式

2013-03-26 10:35:47

Objective-C單例實現(xiàn)
點贊
收藏

51CTO技術(shù)棧公眾號