Apache Spark Source Code Walkthrough, Part 3: Function Calls During Task Execution
Preparation
- Spark has been installed
- Spark is running in local mode or local-cluster mode
local-cluster mode
local-cluster mode, also known as pseudo-distributed mode, can be launched with a command such as
- MASTER=local-cluster[1,2,1024] bin/spark-shell
The three numbers in [1,2,1024] are the executor count, the number of cores per executor, and the memory per executor in MB; the memory should not be set below the default of 512 MB.
Analysis of the Driver Programme Initialization Process
Main source files involved in the initialization process
- SparkContext.scala: the entry point of the whole initialization process
- SparkEnv.scala: creates BlockManager, MapOutputTrackerMaster, ConnectionManager and CacheManager
- DAGScheduler.scala: the entry point for job submission; the key place where a Job is split into stages
- TaskSchedulerImpl.scala: decides how many tasks each stage may run and on which executor each task runs
- SchedulerBackend
  - for the simplest single-machine (local) mode, see LocalBackend.scala
  - for cluster mode, see the source file SparkDeploySchedulerBackend
Detailed Steps of the Initialization Process
Step 1: Generate a SparkConf from the constructor arguments, then create a SparkEnv from that SparkConf. SparkEnv mainly contains the following key components: 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager
- private[spark] val env = SparkEnv.create(
-   conf,
-   "<driver>",
-   conf.get("spark.driver.host"),
-   conf.get("spark.driver.port").toInt,
-   isDriver = true,
-   isLocal = isLocal)
- SparkEnv.set(env)
Step 2: Create the TaskScheduler, choosing the appropriate SchedulerBackend for Spark's running mode, and start the TaskScheduler. This step is crucial.
- private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
- taskScheduler.start()
The purpose of TaskScheduler.start is to start the corresponding SchedulerBackend and, if speculation is enabled, to start a timer that periodically checks for speculatable tasks.
- override def start() {
-   backend.start()
-   if (!isLocal && conf.getBoolean("spark.speculation", false)) {
-     logInfo("Starting speculative execution thread")
-     import sc.env.actorSystem.dispatcher
-     sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
-           SPECULATION_INTERVAL milliseconds) {
-       checkSpeculatableTasks()
-     }
-   }
- }
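Speculation is controlled by the spark.speculation flag read in start() above. Below is a minimal sketch of how one might enable it when building a configuration; the master URL and application name are arbitrary examples, only the spark.speculation key comes from the code above.
- import org.apache.spark.SparkConf
- // Only spark.speculation is taken from the snippet above; master and app name are made up.
- val conf = new SparkConf()
-   .setMaster("local-cluster[1,2,1024]")
-   .setAppName("speculation-demo")
-   .set("spark.speculation", "true")  // makes start() launch the speculative-execution timer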
Step 3: Create the DAGScheduler, passing in the TaskScheduler instance created in the previous step, and start it.
- @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
- dagScheduler.start()
Step 4: Start the web UI.
- ui.start()
The RDD Transformation Process
As before, the simplest wordcount example is used to illustrate how the RDDs are transformed.
- sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
This single short line of code actually triggers a rather complex chain of RDD transformations. Each step of the chain and its result are explained below.
Step 1: val rawFile = sc.textFile("README.md")
textFile first creates a HadoopRDD and then applies a map to produce a MappedRDD. Running the statement in spark-shell confirms this analysis.
- scala> sc.textFile("README.md")
- 14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
- 14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
- 14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
- 14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took 277 ms
- 14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms
- res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:13
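One way to see the HadoopRDD-to-MappedRDD parentage directly is to inspect the dependencies of the returned RDD. A small sketch, assuming the usual sc provided by spark-shell:
- val rawFile = sc.textFile("README.md")
- println(rawFile)                        // the MappedRDD produced by textFile
- println(rawFile.dependencies.head.rdd)  // its parent, the underlying HadoopRDD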
Step 2: val splittedText = rawFile.flatMap(line => line.split(" "))
flatMap transforms the MappedRDD into a FlatMappedRDD.
- def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
- new FlatMappedRDD(this, sc.clean(f))
Step 3: val wordCount = splittedText.map(word => (word, 1))
Each word is turned into a key-value pair (word, 1); the FlatMappedRDD from the previous step becomes another MappedRDD.
Step 4: val reduceJob = wordCount.reduceByKey(_ + _). This is the most involved step.
The operations used in steps 2 and 3 are all defined in RDD.scala, but reduceByKey is nowhere to be found there; it is defined in the source file PairRDDFunctions.scala.
A careful reader will ask: reduceByKey is not a member of MappedRDD, so how can it be called on one? The answer is an implicit conversion that wraps an RDD of key-value pairs in a PairRDDFunctions object.
- implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
- new PairRDDFunctions(rdd)
Such implicit conversions are a Scala language feature; search for "scala implicit method" to find plenty of detailed articles on the topic. A minimal standalone illustration of the mechanism follows.
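All names in the sketch below are made up for illustration and have nothing to do with Spark; only the shape of the trick matches rddToPairRDDFunctions.
- import scala.language.implicitConversions
- class Box(val value: Int)
- class RichBox(b: Box) {
-   def describe: String = s"Box holding ${b.value}"
- }
- object ImplicitDemo {
-   // When the compiler sees box.describe and Box has no such method, it looks for an
-   // implicit conversion in scope and rewrites the call as new RichBox(box).describe --
-   // the same trick rddToPairRDDFunctions uses to make reduceByKey callable on an RDD[(K, V)].
-   implicit def boxToRichBox(b: Box): RichBox = new RichBox(b)
-   def main(args: Array[String]): Unit = {
-     val box = new Box(42)
-     println(box.describe)  // prints: Box holding 42
-   }
- }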
Next, look at the definition of reduceByKey.
- def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
-   reduceByKey(defaultPartitioner(self), func)
- }
- def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
-   combineByKey[V]((v: V) => v, func, func, partitioner)
- }
- def combineByKey[C](createCombiner: V => C,
-     mergeValue: (C, V) => C,
-     mergeCombiners: (C, C) => C,
-     partitioner: Partitioner,
-     mapSideCombine: Boolean = true,
-     serializerClass: String = null): RDD[(K, C)] = {
-   if (getKeyClass().isArray) {
-     if (mapSideCombine) {
-       throw new SparkException("Cannot use map-side combining with array keys.")
-     }
-     if (partitioner.isInstanceOf[HashPartitioner]) {
-       throw new SparkException("Default partitioner cannot partition array keys.")
-     }
-   }
-   val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
-   if (self.partitioner == Some(partitioner)) {
-     self.mapPartitionsWithContext((context, iter) => {
-       new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
-     }, preservesPartitioning = true)
-   } else if (mapSideCombine) {
-     val combined = self.mapPartitionsWithContext((context, iter) => {
-       aggregator.combineValuesByKey(iter, context)
-     }, preservesPartitioning = true)
-     val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
-       .setSerializer(serializerClass)
-     partitioned.mapPartitionsWithContext((context, iter) => {
-       new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
-     }, preservesPartitioning = true)
-   } else {
-     // Don't apply map-side combiner.
-     val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
-     values.mapPartitionsWithContext((context, iter) => {
-       new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
-     }, preservesPartitioning = true)
-   }
- }
reduceByKey ultimately calls combineByKey. Inside this function the pair RDD wrapped by PairRDDFunctions is turned into a ShuffledRDD, and once mapPartitionsWithContext is applied, the ShuffledRDD is further transformed into a MapPartitionsRDD.
The log output confirms this analysis:
- res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at <console>:13
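To build some intuition for what the Aggregator's combineValuesByKey does on the map side, here is a plain-Scala sketch that imitates the idea with an ordinary HashMap; it is a simplification for illustration, not Spark code.
- import scala.collection.mutable
- object CombineSketch {
-   // Folds an iterator of key-value pairs into per-key combiners, the way
-   // map-side combining conceptually works for reduceByKey(_ + _).
-   def combineValuesByKey[K, V, C](iter: Iterator[(K, V)],
-                                   createCombiner: V => C,
-                                   mergeValue: (C, V) => C): Map[K, C] = {
-     val combiners = mutable.HashMap.empty[K, C]
-     for ((k, v) <- iter) {
-       combiners(k) = combiners.get(k) match {
-         case Some(c) => mergeValue(c, v)   // key already seen: fold the new value in
-         case None    => createCombiner(v)  // first value for this key
-       }
-     }
-     combiners.toMap
-   }
-   def main(args: Array[String]): Unit = {
-     val words = Iterator(("spark", 1), ("rdd", 1), ("spark", 1))
-     // For reduceByKey(_ + _): createCombiner is the identity, mergeValue is +
-     println(combineValuesByKey[String, Int, Int](words, v => v, _ + _))
-     // Map(spark -> 2, rdd -> 1)
-   }
- }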
Summary of the RDD Transformations
To sum up, the whole transformation chain is:
HadoopRDD -> MappedRDD -> FlatMappedRDD -> MappedRDD -> PairRDDFunctions (via implicit conversion) -> ShuffledRDD -> MapPartitionsRDD
It is a long chain, and all of these transformations happen before the job is even submitted.
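Incidentally, this lineage can be inspected in spark-shell with toDebugString; the exact output format differs between Spark versions, so treat this only as a way to peek at the chain.
- val counts = sc.textFile("README.md")
-   .flatMap(line => line.split(" "))
-   .map(word => (word, 1))
-   .reduceByKey(_ + _)
- println(counts.toDebugString)  // prints the RDD chain, one parent per line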
Analysis of the Execution Process
A Classification of Dataset Operations
Before analyzing the function-call chain during task execution, it is worth pausing for a slightly theoretical question: why do the Transformations that act on RDDs look the way they do?
The answer has a mathematical flavor. Viewed abstractly, any processing task boils down to "input -> processing -> output", where input and output are datasets.
On this basis, a simple classification can be drawn (a short spark-shell sketch follows the list):
- one-one: one dataset is transformed into another dataset of the same size, e.g. map
- one-one: one dataset is transformed into another dataset whose size changes, either growing or shrinking; flatMap typically grows the size, while subtract shrinks it
- many-one: several datasets are merged into one, e.g. combine and join
- one-many: one dataset is split into several, e.g. groupBy
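The sketch below maps each category onto a concrete RDD operation; it assumes the sc and default imports of spark-shell and is only illustrative.
- val nums   = sc.parallelize(Seq(1, 2, 3))
- val same   = nums.map(_ * 2)               // one-one, size unchanged
- val bigger = nums.flatMap(n => Seq(n, n))  // one-one, size grows
- val pairsA = sc.parallelize(Seq((1, "a"), (2, "b")))
- val pairsB = sc.parallelize(Seq((1, "x")))
- val merged = pairsA.join(pairsB)           // many-one: two datasets merged into one
- val split  = nums.groupBy(_ % 2)           // one-many: one dataset split into groups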
Function Calls During Task Execution
For the task submission process, refer to the second article in this series. This section focuses on how a running task eventually calls, step by step, the operations that were applied to the RDD. The call chain is:
TaskRunner.run
Task.run
Task.runTask (Task is a base class with two subclasses, ShuffleMapTask and ResultTask)
RDD.iterator
RDD.computeOrReadCheckpoint
RDD.compute
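The two RDD methods at the bottom of this chain are short. Paraphrased from the RDD.scala of that era (consult the actual source for the exact form), they look roughly like this:
- final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
-   if (storageLevel != StorageLevel.NONE) {
-     // cached RDD: let the CacheManager return or compute-and-cache the partition
-     SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
-   } else {
-     computeOrReadCheckpoint(split, context)
-   }
- }
- private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
-   // a checkpointed RDD delegates to its checkpoint parent; otherwise compute the partition
-   if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
- }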
When looking at the definition of RDD.compute you may still feel that f is never actually invoked. Take MappedRDD's compute as an example:
- override def compute(split: Partition, context: TaskContext) =
- firstParent[T].iterator(split, context).map(f)
Note: the easiest place to get confused here is the map call. This map is not RDD.map but the map method of Scala's Iterator; see http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator. A small standalone demo of this lazy Iterator.map follows.
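The snippet below (plain Scala, unrelated to Spark) shows the laziness of Iterator.map, which is why the f in MappedRDD.compute only runs as the task consumes the iterator.
- object IteratorMapDemo {
-   def main(args: Array[String]): Unit = {
-     val source = Iterator("a", "b", "c")
-     val mapped = source.map { s =>
-       println(s"applying f to $s")  // not executed when map is called
-       s.toUpperCase
-     }
-     println("iterator built, f not invoked yet")
-     println(mapped.toList)          // f runs here, element by element
-   }
- }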
Stack trace output
- at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
- at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:154)
- at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
- at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
- at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
- at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
- at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
- at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
- at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
- at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
- at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
- at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
- at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
- at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
- at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
- at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
- at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
- at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
- at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
- at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
- at org.apache.spark.scheduler.Task.run(Task.scala:53)
- at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
ResultTask
The compute path for a ShuffleMapTask is fairly convoluted; for a ResultTask it is much more direct.
- override def runTask(context: TaskContext): U = {
-   metrics = Some(context.taskMetrics)
-   try {
-     func(context, rdd.iterator(split, context))
-   } finally {
-     context.executeOnCompleteCallbacks()
-   }
- }
Passing the Computation Results
From the analysis above we know that, once the wordcount job is submitted, the DAGScheduler splits it into two stages: the first stage runs ShuffleMapTasks and the second runs ResultTasks.
So how does a ResultTask obtain the results computed by the ShuffleMapTasks? In brief:
- ShuffleMapTask wraps the status of its computation (note: not the data itself) in a MapStatus and returns it to the DAGScheduler
- The DAGScheduler stores the MapStatus in the MapOutputTrackerMaster
- When the ResultTask's execution reaches the ShuffledRDD, it calls BlockStoreShuffleFetcher's fetch method to obtain the data
- The first thing fetch does is ask the MapOutputTrackerMaster for the locations of the data to be fetched
- Based on the returned locations, it calls BlockManager.getMultiple to retrieve the actual data
Pseudo-code of BlockStoreShuffleFetcher's fetch function:
- val blockManager = SparkEnv.get.blockManager
- val startTime = System.currentTimeMillis
- val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
- logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
-   shuffleId, reduceId, System.currentTimeMillis - startTime))
- val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
- val itr = blockFetcherItr.flatMap(unpackBlock)
Note getServerStatuses and getMultiple in the pseudo-code above: the former asks where the data is located, while the latter fetches the actual data.
For a detailed explanation of shuffle, see the article "詳細(xì)探究Spark的shuffle實(shí)現(xiàn)" (A Detailed Investigation of Spark's Shuffle Implementation): http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/
Original article (in Chinese): http://www.cnblogs.com/hseagle/p/3673132.html