數(shù)據(jù)分析工具篇——Spark計算原理
Hadoop的MR結(jié)構(gòu)和YARN結(jié)構(gòu)是大數(shù)據(jù)時代的第一代產(chǎn)品,滿足了大家在離線計算上的需求,但是針對實時運算卻存在不足,為滿足這一需求,后來的大佬研發(fā)了spark計算方法,大大的提高了運算效率。
Spark的計算原理
spark的結(jié)構(gòu)為:
節(jié)點介紹:
- Cluster Manager:在standalone模式中即為Master主節(jié)點,控制整個集群,監(jiān)控worker。在YARN模式中為資源管理器負責(zé)分配資源,有點像YARN中ResourceManager那個角色,大管家握有所有的干活的資源,屬于乙方的總包。
- WorkerNode:可以干活的節(jié)點,聽大管家ClusterManager差遣,是真正有資源干活的主。從節(jié)點,負責(zé)控制計算節(jié)點,啟動Executor或者Driver。
- Executor:在WorkerNode上起的一個進程,相當(dāng)于一個包工頭,負責(zé)準備Task環(huán)境和執(zhí)行。
- Task:負責(zé)內(nèi)存和磁盤的使用。Task是施工項目里的每一個具體的任務(wù)。
- Driver:統(tǒng)管Task的產(chǎn)生與發(fā)送給Executor的,運行Application 的main()函數(shù),是甲方的司令員。
- SparkContext:與ClusterManager打交道的,負責(zé)給錢申請資源的,是甲方的接口人。
整個互動流程是這樣的:
- 甲方來了個項目,創(chuàng)建了SparkContext,SparkContext去找ClusterManager申請資源同時給出報價,需要多少CPU和內(nèi)存等資源。ClusterManager去找WorkerNode并啟動Excutor,并介紹Excutor給Driver認識;
- Driver根據(jù)施工圖拆分一批批的Task,將Task送給Executor去執(zhí)行;
- Executor接收到Task后準備Task運行時依賴并執(zhí)行,并將執(zhí)行結(jié)果返回給Driver;
- Driver會根據(jù)返回回來的Task狀態(tài)不斷的指揮下一步工作,直到所有Task執(zhí)行結(jié)束;
運行流程及特點為:
- Sparkcontext的作用:一是分發(fā)task,申請資源等功能外,更重要的一個功能是將RDD拆分成task,即繪制DAG圖。
借用上圖我們再來了解一下spark的運算過程:
- 構(gòu)建Spark Application的運行環(huán)境,啟動SparkContext;
- SparkContext向資源管理器(可以是Standalone,Mesos,Yarn)申請運行Executor資源,并啟動StandaloneExecutorbackend;
- Executor向SparkContext申請Task;
- SparkContext將應(yīng)用程序分發(fā)給Executor;
- SparkContext構(gòu)建成DAG圖,將DAG圖分解成Stage、將Taskset發(fā)送給Task Scheduler,最后由Task Scheduler將Task發(fā)送給Executor運行;
- Task在Executor上運行,運行完釋放所有資源;
RDD計算案例
我們用一個案例來分析RDD的計算過程:
- 在客戶端通過RDD構(gòu)建一個RDD的圖形,如圖第一部分rdd1.join(rdd2).groupby(…).filter(…)。
- sparkcontext中的DAGScheduler會將上步的RDD圖形構(gòu)建成DAG圖形,如圖第二部分;
- TaskScheduler會將DAG圖形拆分成多個Task;
- Clustermanager通過Yarn調(diào)度器將Task分配到各個node的Executer中,結(jié)合相關(guān)資源進行運算。
DAGScheduler對于RDD圖形的劃分是有一定規(guī)律的:
- stage的劃分是觸發(fā)action的時候從后往前劃分的,所以本圖要從RDD_G開始劃分。
- RDD_G依賴于RDD_B和RDD_F,隨機決定先判斷哪一個依賴,但是對于結(jié)果無影響。
- RDD_B與RDD_G屬于窄依賴,所以他們屬于同一個stage,RDD_B與老爹RDD_A之間是寬依賴的關(guān)系,所以他們不能劃分在一起,所以RDD_A自己是一個stage1;
- RDD_F與RDD_G是屬于寬依賴,他們不能劃分在一起,所以最后一個stage的范圍也就限定了,RDD_B和RDD_G組成了Stage3;
- RDD_F與兩個爹RDD_D、RDD_E之間是窄依賴關(guān)系,RDD_D與爹RDD_C之間也是窄依賴關(guān)系,所以他們都屬于同一個stage2;
- 執(zhí)行過程中stage1和stage2相互之間沒有前后關(guān)系所以可以并行執(zhí)行,相應(yīng)的每個stage內(nèi)部各個partition對應(yīng)的task也并行執(zhí)行;
- stage3依賴stage1和stage2執(zhí)行結(jié)果的partition,只有等前兩個stage執(zhí)行結(jié)束后才可以啟動stage3;
- 我們前面有介紹過Spark的Task有兩種:ShuffleMapTask和ResultTask,其中后者在DAG最后一個階段推送給Executor,其余所有階段推送的都是ShuffleMapTask。在這個案例中stage1和stage2中產(chǎn)生的都是ShuffleMapTask,在stage3中產(chǎn)生的ResultTask;
- 雖然stage的劃分是從后往前計算劃分的,但是依賴邏輯判斷等結(jié)束后真正創(chuàng)建stage是從前往后的。也就是說如果從stage的ID作為標識的話,先需要執(zhí)行的stage的ID要小于后需要執(zhí)行的ID。就本案例來說,stage1和stage2的ID要小于stage3,至于stage1和stage2的ID誰大誰小是隨機的,是由前面第2步?jīng)Q定的。
Executor是最終運行task的苦力,他將Task的執(zhí)行結(jié)果反饋給Driver,會根據(jù)大小采用不同的策略:
- 如果大于MaxResultSize,默認1G,直接丟棄;
- 如果“較大”,大于配置的frameSize(默認10M),以taksId為key存入BlockManager
- else,全部吐給Driver。