為啥Spark 的Broadcast要用單例模式
很多用Spark Streaming 的朋友應(yīng)該使用過broadcast,大多數(shù)情況下廣播變量都是以單例模式聲明的有沒有粉絲想過為什么?浪尖在這里幫大家分析一下,有以下幾個原因:
- 廣播變量大多數(shù)情況下是不會變更的,使用單例模式可以減少spark streaming每次job生成執(zhí)行,重復(fù)生成廣播變量帶來的開銷。
- 單例模式也要做同步。這個對于很多新手來說可以不用考慮同步問題,原因很簡單因為新手不會調(diào)整spark 程序task的調(diào)度模式,而默認(rèn)采用FIFO的調(diào)度模式,基本不會產(chǎn)生并發(fā)問題。1).假如你配置了Fair調(diào)度模式,同時修改了Spark Streaming運行的并行執(zhí)行的job數(shù),默認(rèn)為1,那么就要加上同步代碼了。2).還有一個原因,在多輸出流的情況下共享broadcast,同時配置了Fair調(diào)度模式,也會產(chǎn)生并發(fā)問題。
- 注意。有些時候比如廣播配置文件,規(guī)則等需要變更broadcast,在使用fair的時候可以在foreachrdd里面使用局部變量作為廣播,避免相互干擾。
先看例子,后面逐步揭曉內(nèi)部機制。
1.例子
下面是一個雙重檢查式的broadcast變量的聲明方式。
- object WordBlacklist {
- @volatile private var instance: Broadcast[Seq[String]] = null
- def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
- if (instance == null) {
- synchronized {
- if (instance == null) {
- val wordBlacklist = Seq("a", "b", "c")
- instance = sc.broadcast(wordBlacklist)
- }
- }
- }
- instance
- }
- }
廣播變量的使用方法如下:
- val lines = ssc.socketTextStream(ip, port)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
- wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
- // Get or register the blacklist Broadcast
- val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
- // Get or register the droppedWordsCounter Accumulator
- val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
- // Use blacklist to drop words and use droppedWordsCounter to count them
- val counts = rdd.filter { case (word, count) =>
- if (blacklist.value.contains(word)) {
- droppedWordsCounter.add(count)
- false
- } else {
- true
- }
- }.collect().mkString("[", ", ", "]")
- val output = s"Counts at time $time $counts"
- println(output)
- println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
- println(s"Appending to ${outputFile.getAbsolutePath}")
- Files.append(output + "\n", outputFile, Charset.defaultCharset())
- }
2.概念補充
首先,一個基本概念就是Spark應(yīng)用程序從開始提交到task執(zhí)行分了很多層。
- 應(yīng)用調(diào)度器。主要是資源管理器,比如standalone,yarn等負(fù)責(zé)Spark整個應(yīng)用的調(diào)度和集群資源的管理。
- job調(diào)度器。spark 的算子分為主要兩大類,transform和action,其中每一個action都會產(chǎn)生一個job。這個job需要在executor提供的資源池里調(diào)度執(zhí)行,當(dāng)然并不少直接調(diào)度執(zhí)行job。
- stage劃分及調(diào)度。job具體會劃分為若干stage,這個就有一個基本的概念就是寬依賴和窄依賴,寬依賴就會劃分stage。stage也需要調(diào)度執(zhí)行,從后往前劃分,從前往后調(diào)度執(zhí)行。
- task切割及調(diào)度。stage往下繼續(xù)細(xì)化就是會根據(jù)不太的并行度劃分出task集合,這個就是在executor上調(diào)度執(zhí)行的基本單元,目前的調(diào)度默認(rèn)是一個task一個cpu。
- Spark Streaming 的job生成是周期性的。當(dāng)前job的執(zhí)行時間超過生成周期就會產(chǎn)生job 累加。累加一定數(shù)目的job后有可能會導(dǎo)致應(yīng)用程序失敗。這個主要原因是由于FIFO的調(diào)度模式和Spark Streaming的默認(rèn)單線程的job執(zhí)行機制
3.Spark Streaming job生成
這個源碼主要入口是StreamingContext#JobScheduler#JobGenerator對象,內(nèi)部有個RecurringTimer,主要負(fù)責(zé)按照批處理時間周期產(chǎn)生GenrateJobs事件,當(dāng)然在存在windows的情況下,該周期有可能不會生成job,要取決于滑動間隔,有興趣自己去揭秘,浪尖星球里分享的視頻教程里講到了。具體代碼塊如下
- private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
我們直接看其實現(xiàn)代碼塊:
- eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
- override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
- override protected def onError(e: Throwable): Unit = {
- jobScheduler.reportError("Error in job generator", e)
- }
- }
- eventLoop.start()
event處理函數(shù)是processEvent方法
- /** Processes all events */
- private def processEvent(event: JobGeneratorEvent) {
- logDebug("Got event " + event)
- event match {
- case GenerateJobs(time) => generateJobs(time)
- case ClearMetadata(time) => clearMetadata(time)
- case DoCheckpoint(time, clearCheckpointDataLater) =>
- doCheckpoint(time, clearCheckpointDataLater)
- case ClearCheckpointData(time) => clearCheckpointData(time)
- }
- }
在接受到GenerateJob事件的時候,會執(zhí)行g(shù)enerateJobs代碼,就是在該代碼內(nèi)部產(chǎn)生和調(diào)度job的。
- /** Generate jobs and perform checkpointing for the given `time`. */
- private def generateJobs(time: Time) {
- // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
- // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
- ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
- Try {
- jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
- graph.generateJobs(time) // generate jobs using allocated block
- } match {
- case Success(jobs) =>
- val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
- jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
- case Failure(e) =>
- jobScheduler.reportError("Error generating jobs for time " + time, e)
- PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
- }
- eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
- }
可以看到代碼里首先會執(zhí)行job生成代碼
- graph.generateJobs(time)
- 具體代碼塊兒
- def generateJobs(time: Time): Seq[Job] = {
- logDebug("Generating jobs for time " + time)
- val jobs = this.synchronized {
- outputStreams.flatMap { outputStream =>
- val jobOption = outputStream.generateJob(time)
- jobOption.foreach(_.setCallSite(outputStream.creationSite))
- jobOption
- }
- }
- logDebug("Generated " + jobs.length + " jobs for time " + time)
- jobs
- }
每個輸出流都會生成一個job,輸出流就類似于foreachrdd,print這些。其實內(nèi)部都是ForEachDStream。所以生成的是一個job集合。
然后就會將job集合提交到線程池里去執(zhí)行,這些都是在driver端完成的哦。
- jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
- 具體h函數(shù)內(nèi)容
- def submitJobSet(jobSet: JobSet) {
- if (jobSet.jobs.isEmpty) {
- logInfo("No jobs added for time " + jobSet.time)
- } else {
- listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
- jobSets.put(jobSet.time, jobSet)
- jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
- logInfo("Added jobs for time " + jobSet.time)
- }
- }
其實就是遍歷生成的job集合,然后提交到線程池jobExecutor內(nèi)部執(zhí)行。這個也是在driver端的哦。
jobExecutor就是一個固定線程數(shù)的線程池,默認(rèn)是1個線程。
- private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
- private val jobExecutor =
- ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
需要的話可以配置spark.streaming.concurrentJobs來同時提交執(zhí)行多個job。
那么這種情況下,job就可以并行執(zhí)行了嗎?
顯然不是的!
還要修改一下調(diào)度模式為Fair,詳細(xì)的配置可以參考:
http://spark.apache.org/docs/2.3.3/job-scheduling.html#scheduling-within-an-application
簡單的均分的話只需要
- conf.set("spark.scheduler.mode", "FAIR")
然后,同時運行的job就會均分所有executor提供的資源。
這就是整個job生成的整個過程了哦。
因為Spark Streaming的任務(wù)存在Fair模式下并發(fā)的情況,所以需要在使用單例模式生成broadcast的時候要注意聲明同步。