Apache Spark Source Code Walkthrough: How to Trace Through the Code
Overview
Today I won't talk about any of Spark's complex internals; instead, let me briefly discuss how to trace through the code. As is well known, Spark is written in Scala, and Scala's abundant syntactic sugar means that while following the code you can easily lose the thread. Moreover, Spark uses Akka for message passing, so how do you find out who the receiver of a message is?
new Throwable().printStackTrace
When tracing through code, we often lean on logs. For every log line we see, we would love to know who its caller is. But sometimes, whether because we don't know the Spark codebase well enough or because our Scala isn't up to it, the answer doesn't come quickly. Is there a simple shortcut?
My approach is to insert the following statement right where the log message is produced:
- new Throwable().printStackTrace()
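Before diving into Spark, here is a minimal, self-contained sketch of what this one statement does (the object and method names below are made up for illustration). Constructing a Throwable captures the current call stack without throwing anything, and printStackTrace() dumps it:
- object StackTraceDemo {
-   def inner(): Unit = {
-     // capture and print the current call stack; nothing is actually thrown
-     new Throwable().printStackTrace()
-   }
-   def outer(): Unit = inner()
-   def main(args: Array[String]): Unit = outer()
- }
Running it prints a java.lang.Throwable stack trace listing inner, outer, and main in order, which is exactly the caller information we are after.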
Now let's look at a real example from Spark itself. After starting spark-shell, type the very simple sc.textFile("README.md") and you will see log output like the following:
- 14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
- 14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)
- 14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took 78 ms
- 14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took 79 ms
- res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:13
Now suppose I want to know who called the tryToPut function, where the second log line is emitted. What to do?
The trick is to open MemoryStore.scala and find the following statement:
- logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
- blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
Right above this statement, add the following line:
- new Throwable().printStackTrace()
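With that in place, the patched fragment of MemoryStore.scala would look roughly like this (a sketch; the surrounding method body is omitted):
- // temporary debugging aid: print who called us each time this log line is hit
- new Throwable().printStackTrace()
- logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
-   blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))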
Then rebuild the source:
- sbt/sbt assembly
Open spark-shell again and run sc.textFile("README.md"). You will get the output below, from which the callers of tryToPut can be read off directly:
- 14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
- 14/07/05 19:53:27 WARN MemoryStore: just show the calltrace by entering some modified code
- java.lang.Throwable
- at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:182)
- at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)
- at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
- at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:699)
- at org.apache.spark.storage.BlockManager.put(BlockManager.scala:570)
- at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:821)
- at org.apache.spark.broadcast.HttpBroadcast.<init>(HttpBroadcast.scala:52)
- at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
- at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)
- at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
- at org.apache.spark.SparkContext.broadcast(SparkContext.scala:787)
- at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:556)
- at org.apache.spark.SparkContext.textFile(SparkContext.scala:468)
- at $line5.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:13)
- at $line5.$read$$iwC$$iwC$$iwC.<init>(<console>:18)
- at $line5.$read$$iwC$$iwC.<init>(<console>:20)
- at $line5.$read$$iwC.<init>(<console>:22)
- at $line5.$read.<init>(<console>:24)
- at $line5.$read$.<init>(<console>:28)
- at $line5.$read$.<clinit>(<console>)
- at $line5.$eval$.<init>(<console>:7)
- at $line5.$eval$.<clinit>(<console>)
- at $line5.$eval.$print(<console>)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:483)
- at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
- at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
- at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
- at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
- at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
- at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
- at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
- at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
- at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
- at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
- at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
- at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
- at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
- at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
- at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
- at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
- at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
- at org.apache.spark.repl.Main$.main(Main.scala:31)
- at org.apache.spark.repl.Main.main(Main.scala)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:483)
- at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
- at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
- at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
- 14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)
- 14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took 78 ms
- 14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took 79 ms
- res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:13
Syncing with git
After modifying the code like this, if you don't want to commit the changes, how do you sync the latest upstream content back into your local copy?
- git reset --hard
- git pull origin master
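Note that git reset --hard discards every local modification, which is exactly what we want here. If you would rather keep your edits around for later, git stash is an alternative:
- git stash
- git pull origin master
- git stash pop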
Tracing Akka Messages
Tracking down who receives a given message is relatively easy; good use of grep is all it takes, provided you have at least a passing familiarity with the actor model.
Again, a concrete example. We know that CoarseGrainedSchedulerBackend sends LaunchTask messages; so who is the receiver? Just run the following:
- grep LaunchTask -r core/src/main
The output below makes it clear that CoarseGrainedExecutorBackend is the receiver of LaunchTask. To see what happens after the message arrives, just read the receiver's receive function.
- core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala: case LaunchTask(data) =>
- core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala: logError("Received LaunchTask command but executor was null")
- core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala: case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
- core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala: executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
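For readers less at home with the actor model, here is a minimal, self-contained sketch of the same send/receive pairing (ExecutorLike and LaunchTaskDemo are invented names for illustration; this is not Spark's actual code):
- import akka.actor.{Actor, ActorSystem, Props}
- // stand-in for the LaunchTask message defined in CoarseGrainedClusterMessage.scala
- case class LaunchTask(data: String)
- // stand-in for the receiver, CoarseGrainedExecutorBackend
- class ExecutorLike extends Actor {
-   def receive: Receive = {
-     case LaunchTask(data) => println(s"got LaunchTask: $data")
-   }
- }
- object LaunchTaskDemo extends App {
-   val system = ActorSystem("demo")
-   val executor = system.actorOf(Props[ExecutorLike], "executor")
-   // the ! operator sends the message, just as CoarseGrainedSchedulerBackend does
-   executor ! LaunchTask("serialized task bytes")
-   Thread.sleep(500)
-   system.terminate() // Akka 2.4+; older versions used system.shutdown()
- }
The ! on the sender side and the matching case clause in the receiver's receive function are the two ends you pair up when grepping.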
Summary
Today's content is fairly light, nothing technically deep; I'm writing it down for my own reference so it doesn't slip away with time.