一個SparkSQL作業(yè)的一生可能只是一集瑯琊榜
Spark是時下很火的計算框架,由UC Berkeley AMP Lab研發(fā),并由原班人馬創(chuàng)建的Databricks負責(zé)商業(yè)化相關(guān)事務(wù)。而SparkSQL則是Spark之上搭建的SQL解決方案,主打交互查詢場景。
人人都說Spark/SparkSQL快,各種Benchmark滿天飛,但是到底Spark/SparkSQL快么,或者快在哪里,似乎很少有人說得清。因為Spark是基于內(nèi)存的計算框架?因為SparkSQL有強大的優(yōu)化器?本文將帶你看一看一個SparkSQL作業(yè)到底是如何執(zhí)行的,順便探討一下SparkSQL和Hive On MapReduce比起來到底有何區(qū)別。
SQL On Hadoop的解決方案已經(jīng)玲瑯滿目了,不管是元祖級的Hive,Cloudera的Impala,MapR的 Drill,Presto,SparkSQL甚至Apache Tajo,IBM BigSQL等等,各家公司都試圖解決SQL交互場景的性能問題,因為原本的Hive On MapReduce實在太慢了。
那么Hive On MapReduce和SparkSQL或者其他交互引擎相比,慢在何處呢?讓我們先看看一個SQL On Hadoop引擎到底如何工作的。
現(xiàn)在的SQL On Hadoop作業(yè),前半段的工作原理都差不多,類似一個Compiler,分來分去都是這基層。
小紅是數(shù)據(jù)分析,她某天寫了個SQL來統(tǒng)計一個分院系的加權(quán)均值分數(shù)匯總。
- SELECT dept, avg(math_score * 1.2) + avg(eng_score * 0.8) FROM students
- GROUP BY dept;
其中STUDENTS表是學(xué)生分數(shù)表(請不要在意這個表似乎不符合范式,很多Hadoop上的數(shù)據(jù)都不符合范式,因為Join成本高,而且我寫表介紹也會很麻煩)。
她提交了這個查詢到某個SQL On Hadoop平臺執(zhí)行,然后她放下工作,切到視頻網(wǎng)頁看一會《瑯琊榜》。
在她看視頻的時候,我們的SQL平臺可是有很努力的工作滴。
首先是查詢解析。
這里和很多Compiler類似,你需要一個Parser(就是著名的程序員約架專用項目),Parser(確切說是Lexer加Parser)的作用是把一個字符串流變成一個一個Token,再根據(jù)語法定義生成一棵抽象語法樹AST。這里不詳細展開,童鞋們可以參考編譯原理。比較多的項目會選 ANTLR(Hive啦,Presto啦等等),你可以用類似BNF的范式來寫Parser規(guī)則,當然也有手寫的比如SparkSQL。AST會進一步包裝成一個簡單的基本查詢信息對象,這個對象包含了一個查詢基本的信息,比如基本語句的類型是SELECT還是INSERT,WHERE是什么,GROUP BY是什么,如果有子查詢,還需要遞歸進去,這個東西大致來說就是所謂的邏輯計劃。
- TableScan(students)
- -> Project(dept, avg(math_score * 1.2) + avg(eng_score * 0.8))
- ->TableSink
上面是無責(zé)任示意,具體到某個SQL引擎會略有不同,但是基本上都會這么干。如果你想找一個代碼干凈易懂的SQL引擎,可以參考Presto(可以算我讀過的開源代碼寫的最漂亮的了)。
到上面為止,你已經(jīng)把字符串轉(zhuǎn)換成一個所謂的LogicalPlan,這個Plan距離可以求值來說還比較殘疾。最基本來說,我還不知道dept是個啥吧,math_score是神馬類型,AVG是個什么函數(shù),這些都不明了。這樣的LogicalPlan可以稱為Unresolved(殘疾的)Logical Plan。
缺少的是所謂的元數(shù)據(jù)信息,這里主要包含兩部分:表的Schema和函數(shù)信息。表的Schema信息主要包含表的列定義(名字,類型),表的物理位置,格式,如何讀取;函數(shù)信息是函數(shù)簽名,類的位置等。
有了這些,SQL引擎需要再一次遍歷剛才的殘廢計劃,進行一次深入的解析。最重要的處理是列引用綁定和函數(shù)綁定。列引用綁定決定了一個表達式的類型。而有了類型你可以做函數(shù)綁定。函數(shù)綁定幾乎是這里最關(guān)鍵的步驟,因為普通函數(shù)比如CAST,和聚合函數(shù)比如這里的AVG,分析函數(shù)比如Rank以及 Table Function比如explode都會用完全不同的方式求值,他們會被改寫成獨立的計劃節(jié)點,而不再是普通的Expression節(jié)點。除此之外,還需要進行深入的語義檢測。比如GROUP BY是否囊括了所有的非聚合列,聚合函數(shù)是否內(nèi)嵌了聚合函數(shù),以及最基本的類型兼容檢查,對于強類型的系統(tǒng),類型不一致比如date = ‘2015-01-01’需要報錯,對于弱類型的系統(tǒng),你可以添加CAST來做Type(類型) Coerce(茍合)。
然后我們得到了一個尚未優(yōu)化的邏輯計劃:
- TableScan(students=>dept:String, eng_score:double, math_score:double)
- ->Project(dept, math_score * 1.2:expr1, eng_score * 0.8:expr2)
- ->Aggregate(avg(expr1):expr3, avg(expr2):expr4, GROUP:dept)
- ->Project(dept, expr3+expr4:avg_result)
- ->TableSink(dept, avg_result->Client)
所以我們可以開始上肉戲了?還早呢。
剛才的計劃,還差得很遠,作為一個SQL引擎,沒有優(yōu)化怎么好見人?不管是SparkSQL還是Hive,都有一套優(yōu)化器。大多數(shù)SQL on Hadoop引擎都有基于規(guī)則的優(yōu)化,少數(shù)復(fù)雜的引擎比如Hive,擁有基于代價的優(yōu)化。規(guī)則優(yōu)化很容易實現(xiàn),比如經(jīng)典的謂詞下推,可以把Join查詢的過濾條件推送到子查詢預(yù)先計算,這樣JOIN時需要計算的數(shù)據(jù)就會減少(JOIN是最重的幾個操作之一,能用越少的數(shù)據(jù)做JOIN就會越快),又比如一些求值優(yōu)化,像去掉求值結(jié)果為常量的表達式等等?;诖鷥r的優(yōu)化就復(fù)雜多了,比如根據(jù)JOIN代價來調(diào)整JOIN順序(最經(jīng)典的場景),對SparkSQL 來說,代價優(yōu)化是最簡單的根據(jù)表大小來選擇JOIN策略(小表可以用廣播分發(fā)),而沒有JOIN順序交換這些,而JOIN策略選擇則是在隨后要解釋的物理執(zhí)行計劃生成階段。
到這里,如果還沒報錯,那你就幸運滴得到了一個Resolved(不殘廢的)Logical Plan了。這個Plan,再配上表達式求值器,你也可以折騰折騰在單機對表查詢求值了。但是,我們不是做分布式系統(tǒng)的么?數(shù)據(jù)分析妹子已經(jīng)看完《瑯琊榜》的片頭了,你還在悠閑什么呢?
為了讓妹子在看完電視劇之前算完幾百G的數(shù)據(jù),我們必須借助分布式的威力,畢竟單節(jié)點算的話夠妹子看完整個瑯琊榜劇集了。剛才生成的邏輯計劃,之所以稱為邏輯計劃,是因為它只是邏輯上看起來似乎能執(zhí)行了(誤),實際上我們并不知道具體這個東西怎么對應(yīng)Spark或者MapReduce任務(wù)。
邏輯執(zhí)行計劃接下來需要轉(zhuǎn)換成具體可以在分布式情況下執(zhí)行的物理計劃,你還缺少:怎么和引擎對接,怎么做表達式求值兩個部分。
表達式求值有兩種基本策略,一個是解釋執(zhí)行,直接把之前帶來的表達式進行解釋執(zhí)行,這個是Hive現(xiàn)在的模式;另一個是代碼生成,包括 SparkSQL,Impala,Drill等等號稱新一代的引擎都是代碼生成模式的(并且配合高速編譯器)。不管是什么模式,你最終把表達式求值部分封裝成了類。代碼可能長得類似如下:
- // math_score * 1.2
- val leftOp = row.get(1/* math_score column index */);
- val result = if (leftOp == null) then null else leftOp * 1.2;
每個獨立的SELECT項目都會生成這樣一段表達式求值代碼或者封裝過的求值器。但是AVG怎么辦?當初寫wordcount的時候,我記得聚合計算需要分派在Map和Reduce兩個階段呀?這里就涉及到物理執(zhí)行轉(zhuǎn)換,涉及到分布式引擎的對接。
AVG這樣的聚合計算,加上GROUP BY的指示,告訴了底層的分布式引擎你需要怎么做聚合。本質(zhì)上來說AVG聚合需要拆分成Map階段來計算累加,還有條目個數(shù),以及Reduce階段二次累加最后每個組做除法。
因此我們要算的AVG其實會進一步拆分成兩個計劃節(jié)點:Aggregates(Partial)和Aggregates(Final)。 Partial部分是我們計算局部累加的部分,每個Mapper節(jié)點都將執(zhí)行,然后底層引擎會做一個Shuffle,將相同Key(在這里是Dept)的行分發(fā)到相同的Reduce節(jié)點。這樣經(jīng)過最終聚合你才能拿到最后結(jié)果。
拆完聚合函數(shù),如果只是上面案例給的一步SQL,那事情比較簡單,如果還有多個子查詢,那么你可能面臨多次Shuffle,對于MapReduce 來說,每次Shuffle你需要一個MapReduce Job來支撐,因為MapReduce模型中,只有通過Reduce階段才能做Shuffle操作,而對于Spark來說,Shuffle可以隨意擺放,不過你要根據(jù)Shuffle來拆分Stage。這樣拆過之后,你得到一個多個MR Job串起來的DAG或者一個Spark多個Stage的DAG(有向無環(huán)圖)。
還記得剛才的執(zhí)行計劃么?它最后變成了這樣的物理執(zhí)行計劃:
- TableScan->Project(dept, math_score * 1.2: expr1, eng_score * 0.8: expr2)
- -> AggretatePartial(avg(expr1):avg1, avg(expr2):avg2, GROUP: dept)
- -> ShuffleExchange(Row, KEY:dept)
- -> AggregateFinal(avg1, avg2, GROUP:dept)
- -> Project(dept, avg1 + avg2)
- -> TableSink
這東西到底怎么在MR或者Spark中執(zhí)行啊?對應(yīng)Shuffle之前和之后,物理上它們將在不同批次的計算節(jié)點上執(zhí)行。不管對應(yīng) MapReduce引擎還是Spark,它們分別是Mapper和Reducer,中間隔了Shuffle。上面的計劃,會由 ShuffleExchange中間斷開,分別發(fā)送到Mapper和Reducer中執(zhí)行,當然除了上面的部分還有之前提到的求值類,也都會一起序列化發(fā)送。
實際在MapReduce模型中,你最終執(zhí)行的是一個特殊的Mapper和特殊的Reducer,它們分別在初始化階段載入被序列化的Plan和求值器信息,然后在map和reduce函數(shù)中依次對每個輸入求值;而在Spark中,你生成的是一個一個RDD變換操作。
比如一個Project操作,對于MapReduce來說,偽代碼大概是這樣的:
- void configuration() {
- context = loadContext()
- }
- void map(inputRow) {
- outputRow = context.projectEvaluator (inputRow);
- write(outputRow);
- }
對于Spark,大概就是這樣:
- currentPlan.mapPartitions { iter =>
- projection = loadContext()
- iter.map { row => projection(row) } }
至此為止,引擎幫你愉快滴提交了Job,你的集群開始不緊不慢地計算了。
到這里為止,似乎看起來SparkSQL和Hive On MapReduce沒有什么區(qū)別?其實SparkSQL快,并不快在引擎。
SparkSQL的引擎優(yōu)化,并沒有Hive復(fù)雜,畢竟人Hive多年積累,十多年下來也不是吃素的。但是Spark本身快呀。
Spark標榜自己比MapReduce快幾倍幾十倍,很多人以為這是因為Spark是“基于內(nèi)存的計算引擎”,其實這不是真的。Spark還是要落磁盤的,Shuffle的過程需要也會將中間數(shù)據(jù)吐到本地磁盤上。所以說Spark是基于內(nèi)存計算的說法,不考慮手動Cache的情景,是不正確的。
SparkSQL的快,根本不是剛才說的那一坨東西哪兒比Hive On MR快了,而是Spark引擎本身快了。
事實上,不管是SparkSQL,Impala還是Presto等等,這些標榜第二代的SQL On Hadoop引擎,都至少做了三個改進,消除了冗余的HDFS讀寫,冗余的MapReduce階段,節(jié)省了JVM啟動時間。
在MapReduce模型下,需要Shuffle的操作,就必須接入一個完整的MapReduce操作,而接入一個MR操作,就必須將前階段的MR結(jié)果寫入HDFS,并且在Map階段重新讀出來,這才是萬惡之源。
事實上,如果只是上面的SQL查詢,不管用MapReduce還是Spark,都不一定會有顯著的差異,因為它只經(jīng)過了一個shuffle階段。
真正體現(xiàn)差異的,是這樣的查詢:
- SELECT g1.name, g1.avg, g2.cnt
- FROM (SELECT name, avg(id) AS avg FROM students GROUP BY name) g1
- JOIN (SELECT name, count(id) AS cnt FROM students GROUP BY name) g2
- ON (g1.name = g2.name)
- ORDER BY avg;
而他們所對應(yīng)的MR任務(wù)和Spark任務(wù)分別是這樣的:
一次HDFS中間數(shù)據(jù)寫入,其實會因為Replication的常數(shù)擴張為三倍寫入,而磁盤讀寫是非常耗時的。這才是Spark速度的主要來源。
另一個加速,來自于JVM重用??紤]一個上萬Task的Hive任務(wù),如果用MapReduce執(zhí)行,每個Task都會啟動一次JVM,而每次JVM啟動時間可能就是幾秒到十幾秒,而一個短Task的計算本身可能也就是幾秒到十幾秒,當MR的Hive任務(wù)啟動完成,Spark的任務(wù)已經(jīng)計算結(jié)束了。對于短 Task多的情形下,這是很大的節(jié)省。
說到這里,小紅已經(jīng)看完《瑯琊榜》回來了,接下去我們討論一下劇情吧。。。