自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一個SparkSQL作業(yè)的一生可能只是一集瑯琊榜

大數(shù)據(jù) Spark
小紅是數(shù)據(jù)分析,她某天寫了個SQL來統(tǒng)計一個分院系的加權(quán)均值分數(shù)匯總。她提交了這個查詢到某個SQL On Hadoop平臺執(zhí)行,然后她放下工作,切到視頻網(wǎng)頁看一會《瑯琊榜》……

Spark是時下很火的計算框架,由UC Berkeley AMP Lab研發(fā),并由原班人馬創(chuàng)建的Databricks負責(zé)商業(yè)化相關(guān)事務(wù)。而SparkSQL則是Spark之上搭建的SQL解決方案,主打交互查詢場景。

人人都說Spark/SparkSQL快,各種Benchmark滿天飛,但是到底Spark/SparkSQL快么,或者快在哪里,似乎很少有人說得清。因為Spark是基于內(nèi)存的計算框架?因為SparkSQL有強大的優(yōu)化器?本文將帶你看一看一個SparkSQL作業(yè)到底是如何執(zhí)行的,順便探討一下SparkSQL和Hive On MapReduce比起來到底有何區(qū)別。

SQL On Hadoop的解決方案已經(jīng)玲瑯滿目了,不管是元祖級的Hive,Cloudera的Impala,MapR的 Drill,Presto,SparkSQL甚至Apache Tajo,IBM BigSQL等等,各家公司都試圖解決SQL交互場景的性能問題,因為原本的Hive On MapReduce實在太慢了。

那么Hive On MapReduce和SparkSQL或者其他交互引擎相比,慢在何處呢?讓我們先看看一個SQL On Hadoop引擎到底如何工作的。

現(xiàn)在的SQL On Hadoop作業(yè),前半段的工作原理都差不多,類似一個Compiler,分來分去都是這基層。

小紅是數(shù)據(jù)分析,她某天寫了個SQL來統(tǒng)計一個分院系的加權(quán)均值分數(shù)匯總。

  1. SELECT dept, avg(math_score * 1.2) + avg(eng_score * 0.8) FROM students 
  2. GROUP BY dept; 

其中STUDENTS表是學(xué)生分數(shù)表(請不要在意這個表似乎不符合范式,很多Hadoop上的數(shù)據(jù)都不符合范式,因為Join成本高,而且我寫表介紹也會很麻煩)。

她提交了這個查詢到某個SQL On Hadoop平臺執(zhí)行,然后她放下工作,切到視頻網(wǎng)頁看一會《瑯琊榜》。

在她看視頻的時候,我們的SQL平臺可是有很努力的工作滴。

首先是查詢解析。

這里和很多Compiler類似,你需要一個Parser(就是著名的程序員約架專用項目),Parser(確切說是Lexer加Parser)的作用是把一個字符串流變成一個一個Token,再根據(jù)語法定義生成一棵抽象語法樹AST。這里不詳細展開,童鞋們可以參考編譯原理。比較多的項目會選 ANTLR(Hive啦,Presto啦等等),你可以用類似BNF的范式來寫Parser規(guī)則,當然也有手寫的比如SparkSQL。AST會進一步包裝成一個簡單的基本查詢信息對象,這個對象包含了一個查詢基本的信息,比如基本語句的類型是SELECT還是INSERT,WHERE是什么,GROUP BY是什么,如果有子查詢,還需要遞歸進去,這個東西大致來說就是所謂的邏輯計劃。

  1. TableScan(students) 
  2. -> Project(dept, avg(math_score * 1.2) + avg(eng_score * 0.8)) 
  3. ->TableSink 

上面是無責(zé)任示意,具體到某個SQL引擎會略有不同,但是基本上都會這么干。如果你想找一個代碼干凈易懂的SQL引擎,可以參考Presto(可以算我讀過的開源代碼寫的最漂亮的了)。

到上面為止,你已經(jīng)把字符串轉(zhuǎn)換成一個所謂的LogicalPlan,這個Plan距離可以求值來說還比較殘疾。最基本來說,我還不知道dept是個啥吧,math_score是神馬類型,AVG是個什么函數(shù),這些都不明了。這樣的LogicalPlan可以稱為Unresolved(殘疾的)Logical Plan。

缺少的是所謂的元數(shù)據(jù)信息,這里主要包含兩部分:表的Schema和函數(shù)信息。表的Schema信息主要包含表的列定義(名字,類型),表的物理位置,格式,如何讀取;函數(shù)信息是函數(shù)簽名,類的位置等。

有了這些,SQL引擎需要再一次遍歷剛才的殘廢計劃,進行一次深入的解析。最重要的處理是列引用綁定和函數(shù)綁定。列引用綁定決定了一個表達式的類型。而有了類型你可以做函數(shù)綁定。函數(shù)綁定幾乎是這里最關(guān)鍵的步驟,因為普通函數(shù)比如CAST,和聚合函數(shù)比如這里的AVG,分析函數(shù)比如Rank以及 Table Function比如explode都會用完全不同的方式求值,他們會被改寫成獨立的計劃節(jié)點,而不再是普通的Expression節(jié)點。除此之外,還需要進行深入的語義檢測。比如GROUP BY是否囊括了所有的非聚合列,聚合函數(shù)是否內(nèi)嵌了聚合函數(shù),以及最基本的類型兼容檢查,對于強類型的系統(tǒng),類型不一致比如date = ‘2015-01-01’需要報錯,對于弱類型的系統(tǒng),你可以添加CAST來做Type(類型) Coerce(茍合)。

然后我們得到了一個尚未優(yōu)化的邏輯計劃:

  1. TableScan(students=>dept:String, eng_score:double, math_score:double
  2. ->Project(dept, math_score * 1.2:expr1, eng_score * 0.8:expr2) 
  3. ->Aggregate(avg(expr1):expr3, avg(expr2):expr4, GROUP:dept) 
  4. ->Project(dept, expr3+expr4:avg_result) 
  5. ->TableSink(dept, avg_result->Client) 

所以我們可以開始上肉戲了?還早呢。

剛才的計劃,還差得很遠,作為一個SQL引擎,沒有優(yōu)化怎么好見人?不管是SparkSQL還是Hive,都有一套優(yōu)化器。大多數(shù)SQL on Hadoop引擎都有基于規(guī)則的優(yōu)化,少數(shù)復(fù)雜的引擎比如Hive,擁有基于代價的優(yōu)化。規(guī)則優(yōu)化很容易實現(xiàn),比如經(jīng)典的謂詞下推,可以把Join查詢的過濾條件推送到子查詢預(yù)先計算,這樣JOIN時需要計算的數(shù)據(jù)就會減少(JOIN是最重的幾個操作之一,能用越少的數(shù)據(jù)做JOIN就會越快),又比如一些求值優(yōu)化,像去掉求值結(jié)果為常量的表達式等等?;诖鷥r的優(yōu)化就復(fù)雜多了,比如根據(jù)JOIN代價來調(diào)整JOIN順序(最經(jīng)典的場景),對SparkSQL 來說,代價優(yōu)化是最簡單的根據(jù)表大小來選擇JOIN策略(小表可以用廣播分發(fā)),而沒有JOIN順序交換這些,而JOIN策略選擇則是在隨后要解釋的物理執(zhí)行計劃生成階段。

到這里,如果還沒報錯,那你就幸運滴得到了一個Resolved(不殘廢的)Logical Plan了。這個Plan,再配上表達式求值器,你也可以折騰折騰在單機對表查詢求值了。但是,我們不是做分布式系統(tǒng)的么?數(shù)據(jù)分析妹子已經(jīng)看完《瑯琊榜》的片頭了,你還在悠閑什么呢?

為了讓妹子在看完電視劇之前算完幾百G的數(shù)據(jù),我們必須借助分布式的威力,畢竟單節(jié)點算的話夠妹子看完整個瑯琊榜劇集了。剛才生成的邏輯計劃,之所以稱為邏輯計劃,是因為它只是邏輯上看起來似乎能執(zhí)行了(誤),實際上我們并不知道具體這個東西怎么對應(yīng)Spark或者MapReduce任務(wù)。

邏輯執(zhí)行計劃接下來需要轉(zhuǎn)換成具體可以在分布式情況下執(zhí)行的物理計劃,你還缺少:怎么和引擎對接,怎么做表達式求值兩個部分。

表達式求值有兩種基本策略,一個是解釋執(zhí)行,直接把之前帶來的表達式進行解釋執(zhí)行,這個是Hive現(xiàn)在的模式;另一個是代碼生成,包括 SparkSQL,Impala,Drill等等號稱新一代的引擎都是代碼生成模式的(并且配合高速編譯器)。不管是什么模式,你最終把表達式求值部分封裝成了類。代碼可能長得類似如下:

  1. // math_score * 1.2 
  2. val leftOp = row.get(1/* math_score column index */); 
  3. val result = if (leftOp == nullthen null else leftOp * 1.2; 

每個獨立的SELECT項目都會生成這樣一段表達式求值代碼或者封裝過的求值器。但是AVG怎么辦?當初寫wordcount的時候,我記得聚合計算需要分派在Map和Reduce兩個階段呀?這里就涉及到物理執(zhí)行轉(zhuǎn)換,涉及到分布式引擎的對接。

AVG這樣的聚合計算,加上GROUP BY的指示,告訴了底層的分布式引擎你需要怎么做聚合。本質(zhì)上來說AVG聚合需要拆分成Map階段來計算累加,還有條目個數(shù),以及Reduce階段二次累加最后每個組做除法。

因此我們要算的AVG其實會進一步拆分成兩個計劃節(jié)點:Aggregates(Partial)和Aggregates(Final)。 Partial部分是我們計算局部累加的部分,每個Mapper節(jié)點都將執(zhí)行,然后底層引擎會做一個Shuffle,將相同Key(在這里是Dept)的行分發(fā)到相同的Reduce節(jié)點。這樣經(jīng)過最終聚合你才能拿到最后結(jié)果。

拆完聚合函數(shù),如果只是上面案例給的一步SQL,那事情比較簡單,如果還有多個子查詢,那么你可能面臨多次Shuffle,對于MapReduce 來說,每次Shuffle你需要一個MapReduce Job來支撐,因為MapReduce模型中,只有通過Reduce階段才能做Shuffle操作,而對于Spark來說,Shuffle可以隨意擺放,不過你要根據(jù)Shuffle來拆分Stage。這樣拆過之后,你得到一個多個MR Job串起來的DAG或者一個Spark多個Stage的DAG(有向無環(huán)圖)。

還記得剛才的執(zhí)行計劃么?它最后變成了這樣的物理執(zhí)行計劃:

  1. TableScan->Project(dept, math_score * 1.2: expr1, eng_score * 0.8: expr2) 
  2. -> AggretatePartial(avg(expr1):avg1, avg(expr2):avg2, GROUP: dept) 
  3. -> ShuffleExchange(Row, KEY:dept) 
  4. -> AggregateFinal(avg1, avg2, GROUP:dept) 
  5. -> Project(dept, avg1 + avg2) 
  6. -> TableSink 

這東西到底怎么在MR或者Spark中執(zhí)行啊?對應(yīng)Shuffle之前和之后,物理上它們將在不同批次的計算節(jié)點上執(zhí)行。不管對應(yīng) MapReduce引擎還是Spark,它們分別是Mapper和Reducer,中間隔了Shuffle。上面的計劃,會由 ShuffleExchange中間斷開,分別發(fā)送到Mapper和Reducer中執(zhí)行,當然除了上面的部分還有之前提到的求值類,也都會一起序列化發(fā)送。

實際在MapReduce模型中,你最終執(zhí)行的是一個特殊的Mapper和特殊的Reducer,它們分別在初始化階段載入被序列化的Plan和求值器信息,然后在map和reduce函數(shù)中依次對每個輸入求值;而在Spark中,你生成的是一個一個RDD變換操作。

比如一個Project操作,對于MapReduce來說,偽代碼大概是這樣的:

  1. void configuration() { 
  2. context = loadContext() 
  3. void map(inputRow) { 
  4. outputRow = context.projectEvaluator (inputRow); 
  5. write(outputRow); 

對于Spark,大概就是這樣:

  1. currentPlan.mapPartitions { iter => 
  2. projection = loadContext() 
  3. iter.map { row => projection(row) } } 

至此為止,引擎幫你愉快滴提交了Job,你的集群開始不緊不慢地計算了。

到這里為止,似乎看起來SparkSQL和Hive On MapReduce沒有什么區(qū)別?其實SparkSQL快,并不快在引擎。

SparkSQL的引擎優(yōu)化,并沒有Hive復(fù)雜,畢竟人Hive多年積累,十多年下來也不是吃素的。但是Spark本身快呀。

Spark標榜自己比MapReduce快幾倍幾十倍,很多人以為這是因為Spark是“基于內(nèi)存的計算引擎”,其實這不是真的。Spark還是要落磁盤的,Shuffle的過程需要也會將中間數(shù)據(jù)吐到本地磁盤上。所以說Spark是基于內(nèi)存計算的說法,不考慮手動Cache的情景,是不正確的。

SparkSQL的快,根本不是剛才說的那一坨東西哪兒比Hive On MR快了,而是Spark引擎本身快了。

事實上,不管是SparkSQL,Impala還是Presto等等,這些標榜第二代的SQL On Hadoop引擎,都至少做了三個改進,消除了冗余的HDFS讀寫,冗余的MapReduce階段,節(jié)省了JVM啟動時間。

在MapReduce模型下,需要Shuffle的操作,就必須接入一個完整的MapReduce操作,而接入一個MR操作,就必須將前階段的MR結(jié)果寫入HDFS,并且在Map階段重新讀出來,這才是萬惡之源。

事實上,如果只是上面的SQL查詢,不管用MapReduce還是Spark,都不一定會有顯著的差異,因為它只經(jīng)過了一個shuffle階段。

真正體現(xiàn)差異的,是這樣的查詢:

  1. SELECT g1.name, g1.avg, g2.cnt 
  2. FROM (SELECT nameavg(id) AS avg FROM students GROUP BY name) g1 
  3. JOIN (SELECT namecount(id) AS cnt FROM students GROUP BY name) g2 
  4. ON (g1.name = g2.name
  5. ORDER BY avg

而他們所對應(yīng)的MR任務(wù)和Spark任務(wù)分別是這樣的:

 

一次HDFS中間數(shù)據(jù)寫入,其實會因為Replication的常數(shù)擴張為三倍寫入,而磁盤讀寫是非常耗時的。這才是Spark速度的主要來源。

另一個加速,來自于JVM重用??紤]一個上萬Task的Hive任務(wù),如果用MapReduce執(zhí)行,每個Task都會啟動一次JVM,而每次JVM啟動時間可能就是幾秒到十幾秒,而一個短Task的計算本身可能也就是幾秒到十幾秒,當MR的Hive任務(wù)啟動完成,Spark的任務(wù)已經(jīng)計算結(jié)束了。對于短 Task多的情形下,這是很大的節(jié)省。

說到這里,小紅已經(jīng)看完《瑯琊榜》回來了,接下去我們討論一下劇情吧。。。

責(zé)任編輯:Ophira 來源: 比特科技
相關(guān)推薦

2015-04-23 08:51:53

2011-11-30 12:32:38

企業(yè)防毒防毒方案拯救三

2016-08-24 11:13:30

2015-08-03 09:33:21

PH程序員一生

2023-01-10 08:20:55

RocketMQ消息源碼

2023-07-11 13:34:19

Rust開發(fā)軟件

2020-05-03 14:00:29

程序員技能開發(fā)者

2023-05-09 22:57:26

AI網(wǎng)絡(luò)

2017-06-26 10:05:49

開發(fā)編程程序員

2021-08-06 22:43:54

中斷架構(gòu)傳遞

2023-12-26 18:47:32

2020-07-09 17:37:47

Linux網(wǎng)絡(luò)包中斷

2018-01-18 09:05:05

存儲數(shù)據(jù)包分層

2024-06-04 00:01:00

2012-12-04 10:08:16

2021-06-08 07:48:26

iOS 15 Linux 操作系統(tǒng)

2018-01-05 12:42:01

Lisa電腦蘋果Mac

2020-11-29 17:08:50

程序員IT

2010-03-15 13:36:11

2015-12-11 17:00:08

數(shù)據(jù)中心瑯琊榜
點贊
收藏

51CTO技術(shù)棧公眾號