Apache Spark源碼走讀之2:Job的提交與運行
實驗環(huán)境搭建
在進行后續(xù)操作前,確保下列條件已滿足。
-
下載spark binary 0.9.1
-
安裝scala
-
安裝sbt
-
安裝java
啟動spark-shell
單機模式運行,即local模式
local模式運行非常簡單,只要運行以下命令即可,假設當前目錄是$SPARK_HOME
- MASTER=local bin/spark-shell
"MASTER=local"就是表明當前運行在單機模式
local cluster方式運行
local cluster模式是一種偽cluster模式,在單機環(huán)境下模擬standalone的集群,啟動順序分別如下
-
啟動master
-
啟動worker
-
啟動spark-shell
master
- $SPARK_HOME/sbin/start-master.sh
注意運行時的輸出,日志默認保存在$SPARK_HOME/logs目錄。
master主要是運行類 org.apache.spark.deploy.master.Master,在8080端口啟動監(jiān)聽,日志如下圖所示
修改配置
-
進入$SPARK_HOME/conf目錄
-
將spark-env.sh.template重命名為spark-env.sh
-
修改spark-env.sh,添加如下內(nèi)容
- export SPARK_MASTER_IP=localhost
- export SPARK_LOCAL_IP=localhost
運行worker
- bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1 -c 1 -m 512M
worker啟動完成,連接到master。打開maser的web ui可以看到連接上來的worker. Master WEb UI的監(jiān)聽地址是http://localhost:8080
啟動spark-shell
MASTER=spark://localhost:7077 bin/spark-shell
如果一切順利,將看到下面的提示信息。
Created spark context..
Spark context available as sc.
可以用瀏覽器打開localhost:4040來查看如下內(nèi)容
-
stages
-
storage
-
environment
-
executors
wordcount
上述環(huán)境準備妥當之后,我們在sparkshell中運行一下最簡單的例子,在spark-shell中輸入如下代碼
scala>sc.textFile("README.md").filter(_.contains("Spark")).count
上述代碼統(tǒng)計在README.md中含有Spark的行數(shù)有多少
部署過程詳解
Spark布置環(huán)境中組件構成如下圖所示。
-
Driver Program 簡要來說在spark-shell中輸入的wordcount語句對應于上圖的Driver Program.
-
Cluster Manager 就是對應于上面提到的master,主要起到deploy management的作用
-
Worker Node 與Master相比,這是slave node。上面運行各個executor,executor可以對應于線程。executor處理兩種基本的業(yè)務邏輯,一種就是driver programme,另一種就是job在提交之后拆分成各個stage,每個stage可以運行一到多個task
Notes: 在集群(cluster)方式下, Cluster Manager運行在一個jvm進程之中,而worker運行在另一個jvm進程中。在local cluster中,這些jvm進程都在同一臺機器中,如果是真正的standalone或Mesos及Yarn集群,worker與master或分布于不同的主機之上。
JOB的生成和運行
job生成的簡單流程如下
-
首先應用程序創(chuàng)建SparkContext的實例,如實例為sc
-
利用SparkContext的實例來創(chuàng)建生成RDD
-
經(jīng)過一連串的transformation操作,原始的RDD轉(zhuǎn)換成為其它類型的RDD
-
當action作用于轉(zhuǎn)換之后RDD時,會調(diào)用SparkContext的runJob方法
-
sc.runJob的調(diào)用是后面一連串反應的起點,關鍵性的躍變就發(fā)生在此處
調(diào)用路徑大致如下
-
sc.runJob->dagScheduler.runJob->submitJob
-
DAGScheduler::submitJob會創(chuàng)建JobSummitted的event發(fā)送給內(nèi)嵌類eventProcessActor
-
eventProcessActor在接收到JobSubmmitted之后調(diào)用processEvent處理函數(shù)
-
job到stage的轉(zhuǎn)換,生成finalStage并提交運行,關鍵是調(diào)用submitStage
-
在submitStage中會計算stage之間的依賴關系,依賴關系分為寬依賴和窄依賴兩種
-
如果計算中發(fā)現(xiàn)當前的stage沒有任何依賴或者所有的依賴都已經(jīng)準備完畢,則提交task
-
提交task是調(diào)用函數(shù)submitMissingTasks來完成
-
task真正運行在哪個worker上面是由TaskScheduler來管理,也就是上面的submitMissingTasks會調(diào)用TaskScheduler::submitTasks
-
TaskSchedulerImpl中會根據(jù)Spark的當前運行模式來創(chuàng)建相應的backend,如果是在單機運行則創(chuàng)建LocalBackend
-
LocalBackend收到TaskSchedulerImpl傳遞進來的ReceiveOffers事件
-
receiveOffers->executor.launchTask->TaskRunner.run
代碼片段executor.lauchTask
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
說了這么一大通,也就是講最終的邏輯處理切切實實是發(fā)生在TaskRunner這么一個executor之內(nèi)。
運算結(jié)果是包裝成為MapStatus然后通過一系列的內(nèi)部消息傳遞,反饋到DAGScheduler,這一個消息傳遞路徑不是過于復雜,有興趣可以自行勾勒。