解析Spark在騰訊、雅虎、優(yōu)酷的成功應(yīng)用
為了滿足挖掘分析與交互式實(shí)時(shí)查詢的計(jì)算需求,騰訊大數(shù)據(jù)使用了Spark平臺(tái)來(lái)支持挖掘分析類計(jì)算、交互式實(shí)時(shí)查詢計(jì)算以及允許誤差范圍的快速查詢計(jì)算,目前騰訊大數(shù)據(jù)擁有超過(guò)200臺(tái)的Spark集群,并獨(dú)立維護(hù)Spark和Shark分支。Spark集群已穩(wěn)定運(yùn)行2年,我們積累了大量的案例和運(yùn)營(yíng)經(jīng)驗(yàn)?zāi)芰?,另外多個(gè)業(yè)務(wù)的大數(shù)據(jù)查詢與分析應(yīng)用,已在陸續(xù)上線并穩(wěn)定運(yùn)行。在SQL查詢性能方面普遍比MapReduce高出2倍以上,利用內(nèi)存計(jì)算和內(nèi)存表的特性,性能至少在10倍以上。在迭代計(jì)算與挖掘分析方面,精準(zhǔn)推薦將小時(shí)和天級(jí)別的模型訓(xùn)練轉(zhuǎn)變?yōu)镾park的分鐘級(jí)別的訓(xùn)練,同時(shí)簡(jiǎn)潔的編程接口使得算法實(shí)現(xiàn)比MR在時(shí)間成本和代碼量上高出許多。
Spark VS MapReduce
盡管MapReduce適用大多數(shù)批處理工作,并且在大數(shù)據(jù)時(shí)代成為企業(yè)大數(shù)據(jù)處理的***技術(shù),但由于以下幾個(gè)限制,它對(duì)一些場(chǎng)景并不是***選擇:
缺少對(duì)迭代計(jì)算以及DAG運(yùn)算的支持
Shuffle過(guò)程多次排序和落地,MR之間的數(shù)據(jù)需要落Hdfs文件系統(tǒng)
Spark在很多方面都彌補(bǔ)了MapReduce的不足,比MapReduce的通用性更好,迭代運(yùn)算效率更高,作業(yè)延遲更低,它的主要優(yōu)勢(shì)包括:
提供了一套支持DAG圖的分布式并行計(jì)算的編程框架,減少多次計(jì)算之間中間結(jié)果寫(xiě)到Hdfs的開(kāi)銷
提供Cache機(jī)制來(lái)支持需要反復(fù)迭代計(jì)算或者多次數(shù)據(jù)共享,減少數(shù)據(jù)讀取的IO開(kāi)銷
使用多線程池模型來(lái)減少task啟動(dòng)開(kāi)稍,shuffle過(guò)程中避免不必要的sort操作以及減少磁盤(pán)IO操作
廣泛的數(shù)據(jù)集操作類型
MapReduce由于其設(shè)計(jì)上的約束只適合處理離線計(jì)算,在實(shí)時(shí)查詢和迭代計(jì)算上仍有較大的不足,而隨著業(yè)務(wù)的發(fā)展,業(yè)界對(duì)實(shí)時(shí)查詢和迭代分析有更多的需求,單純依靠MapReduce框架已經(jīng)不能滿足業(yè)務(wù)的需求了。Spark由于其可伸縮、基于內(nèi)存計(jì)算等特點(diǎn),且可以直接讀寫(xiě)Hadoop上任何格式的數(shù)據(jù),成為滿足業(yè)務(wù)需求的***候選者。
應(yīng)用Spark的成功案例
目前大數(shù)據(jù)在互聯(lián)網(wǎng)公司主要應(yīng)用在廣告、報(bào)表、推薦系統(tǒng)等業(yè)務(wù)上。在廣告業(yè)務(wù)方面需要大數(shù)據(jù)做應(yīng)用分析、效果分析、定向優(yōu)化等,在推薦系統(tǒng)方面則需要大數(shù)據(jù)優(yōu)化相關(guān)排名、個(gè)性化推薦以及熱點(diǎn)點(diǎn)擊分析等。
這些應(yīng)用場(chǎng)景的普遍特點(diǎn)是計(jì)算量大、效率要求高。Spark恰恰滿足了這些要求,該項(xiàng)目一經(jīng)推出便受到開(kāi)源社區(qū)的廣泛關(guān)注和好評(píng)。并在近兩年內(nèi)發(fā)展成為大數(shù)據(jù)處理領(lǐng)域最炙手可熱的開(kāi)源項(xiàng)目。
本章將列舉國(guó)內(nèi)外應(yīng)用Spark的成功案例。
1. 騰訊
廣點(diǎn)通是最早使用Spark的應(yīng)用之一。騰訊大數(shù)據(jù)精準(zhǔn)推薦借助Spark快速迭代的優(yōu)勢(shì),圍繞“數(shù)據(jù)+算法+系統(tǒng)”這套技術(shù)方案,實(shí)現(xiàn)了在“數(shù)據(jù)實(shí)時(shí)采集、算法實(shí)時(shí)訓(xùn)練、系統(tǒng)實(shí)時(shí)預(yù)測(cè)”的全流程實(shí)時(shí)并行高維算法,最終成功應(yīng)用于廣點(diǎn)通pCTR投放系統(tǒng)上,支持每天上百億的請(qǐng)求量。
基于日志數(shù)據(jù)的快速查詢系統(tǒng)業(yè)務(wù)構(gòu)建于Spark之上的Shark,利用其快速查詢以及內(nèi)存表等優(yōu)勢(shì),承擔(dān)了日志數(shù)據(jù)的即席查詢工作。在性能方面,普遍比Hive高2-10倍,如果使用內(nèi)存表的功能,性能將會(huì)比Hive快百倍。
2. Yahoo
Yahoo將Spark用在Audience Expansion中的應(yīng)用。Audience Expansion是廣告中尋找目標(biāo)用戶的一種方法:首先廣告者提供一些觀看了廣告并且購(gòu)買(mǎi)產(chǎn)品的樣本客戶,據(jù)此進(jìn)行學(xué)習(xí),尋找更多可能轉(zhuǎn)化的用戶,對(duì)他們定向廣告。Yahoo采用的算法是logistic regression。同時(shí)由于有些SQL負(fù)載需要更高的服務(wù)質(zhì)量,又加入了專門(mén)跑Shark的大內(nèi)存集群,用于取代商業(yè)BI/OLAP工具,承擔(dān)報(bào)表/儀表盤(pán)和交互式/即席查詢,同時(shí)與桌面BI工具對(duì)接。目前在Yahoo部署的Spark集群有112臺(tái)節(jié)點(diǎn),9.2TB內(nèi)存。
3. 淘寶
阿里搜索和廣告業(yè)務(wù),最初使用Mahout或者自己寫(xiě)的MR來(lái)解決復(fù)雜的機(jī)器學(xué)習(xí),導(dǎo)致效率低而且代碼不易維護(hù)。淘寶技術(shù)團(tuán)隊(duì)使用了Spark來(lái)解決多次迭代的機(jī)器學(xué)習(xí)算法、高計(jì)算復(fù)雜度的算法等。將Spark運(yùn)用于淘寶的推薦相關(guān)算法上,同時(shí)還利用Graphx解決了許多生產(chǎn)問(wèn)題,包括以下計(jì)算場(chǎng)景:基于度分布的中樞節(jié)點(diǎn)發(fā)現(xiàn)、基于***連通圖的社區(qū)發(fā)現(xiàn)、基于三角形計(jì)數(shù)的關(guān)系衡量、基于隨機(jī)游走的用戶屬性傳播等。
4. 優(yōu)酷土豆
優(yōu)酷土豆在使用Hadoop集群的突出問(wèn)題主要包括:***是商業(yè)智能BI方面,分析師提交任務(wù)之后需要等待很久才得到結(jié)果;第二就是大數(shù)據(jù)量計(jì)算,比如進(jìn)行一些模擬廣告投放之時(shí),計(jì)算量非常大的同時(shí)對(duì)效率要求也比較高,***就是機(jī)器學(xué)習(xí)和圖計(jì)算的迭代運(yùn)算也是需要耗費(fèi)大量資源且速度很慢。
最終發(fā)現(xiàn)這些應(yīng)用場(chǎng)景并不適合在MapReduce里面去處理。通過(guò)對(duì)比,發(fā)現(xiàn)Spark性能比MapReduce提升很多。首先,交互查詢響應(yīng)快,性能比Hadoop提高若干倍;模擬廣告投放計(jì)算效率高、延遲小(同hadoop比延遲至少降低一個(gè)數(shù)量級(jí));機(jī)器學(xué)習(xí)、圖計(jì)算等迭代計(jì)算,大大減少了網(wǎng)絡(luò)傳輸、數(shù)據(jù)落地等,極大的提高的計(jì)算性能。目前Spark已經(jīng)廣泛使用在優(yōu)酷土豆的視頻推薦(圖計(jì)算)、廣告業(yè)務(wù)等。
Spark與Shark的原理
1.Spark生態(tài)圈
如下圖所示為Spark的整個(gè)生態(tài)圈,***層為資源管理器,采用Mesos、Yarn等資源管理集群或者Spark自帶的Standalone模式,底層存儲(chǔ)為文件系統(tǒng)或者其他格式的存儲(chǔ)系統(tǒng)如HBase。Spark作為計(jì)算框架,為上層多種應(yīng)用提供服務(wù)。Graphx和MLBase提供數(shù)據(jù)挖掘服務(wù),如圖計(jì)算和挖掘迭代計(jì)算等。Shark提供SQL查詢服務(wù),兼容Hive語(yǔ)法,性能比Hive快3-50倍,BlinkDB是一個(gè)通過(guò)權(quán)衡數(shù)據(jù)精確度來(lái)提升查詢晌應(yīng)時(shí)間的交互SQL查詢引擎,二者都可作為交互式查詢使用。Spark Streaming將流式計(jì)算分解成一系列短小的批處理計(jì)算,并且提供高可靠和吞吐量服務(wù)。
2.Spark基本原理
Spark運(yùn)行框架如下圖所示,首先有集群資源管理服務(wù)(Cluster Manager)和運(yùn)行作業(yè)任務(wù)的結(jié)點(diǎn)(Worker Node),然后就是每個(gè)應(yīng)用的任務(wù)控制結(jié)點(diǎn)Driver和每個(gè)機(jī)器節(jié)點(diǎn)上有具體任務(wù)的執(zhí)行進(jìn)程(Executor)。
與MR計(jì)算框架相比,Executor有二個(gè)優(yōu)點(diǎn):一個(gè)是多線程來(lái)執(zhí)行具體的任務(wù),而不是像MR那樣采用進(jìn)程模型,減少了任務(wù)的啟動(dòng)開(kāi)稍。二個(gè)是Executor上會(huì)有一個(gè)BlockManager存儲(chǔ)模塊,類似于KV系統(tǒng)(內(nèi)存和磁盤(pán)共同作為存儲(chǔ)設(shè)備),當(dāng)需要迭代多輪時(shí),可以將中間過(guò)程的數(shù)據(jù)先放到這個(gè)存儲(chǔ)系統(tǒng)上,下次需要時(shí)直接讀該存儲(chǔ)上數(shù)據(jù),而不需要讀寫(xiě)到hdfs等相關(guān)的文件系統(tǒng)里,或者在交互式查詢場(chǎng)景下,事先將表Cache到該存儲(chǔ)系統(tǒng)上,提高讀寫(xiě)IO性能。另外Spark在做Shuffle時(shí),在Groupby,Join等場(chǎng)景下去掉了不必要的Sort操作,相比于MapReduce只有Map和Reduce二種模式,Spark還提供了更加豐富全面的運(yùn)算操作如filter,groupby,join等。
Spark采用了Scala來(lái)編寫(xiě),在函數(shù)表達(dá)上Scala有天然的優(yōu)勢(shì),因此在表達(dá)復(fù)雜的機(jī)器學(xué)習(xí)算法能力比其他語(yǔ)言更強(qiáng)且簡(jiǎn)單易懂。提供各種操作函數(shù)來(lái)建立起RDD的DAG計(jì)算模型。把每一個(gè)操作都看成構(gòu)建一個(gè)RDD來(lái)對(duì)待,而RDD則表示的是分布在多臺(tái)機(jī)器上的數(shù)據(jù)集合,并且可以帶上各種操作函數(shù)。如下圖所示:
首先從hdfs文件里讀取文本內(nèi)容構(gòu)建成一個(gè)RDD,然后使用filter()操作來(lái)對(duì)上次的RDD進(jìn)行過(guò)濾,再使用map()操作取得記錄的***個(gè)字段,***將其cache在內(nèi)存上,后面就可以對(duì)之前cache過(guò)的數(shù)據(jù)做其他的操作。整個(gè)過(guò)程都將形成一個(gè)DAG計(jì)算圖,每個(gè)操作步驟都有容錯(cuò)機(jī)制,同時(shí)還可以將需要多次使用的數(shù)據(jù)cache起來(lái),供后續(xù)迭代使用。
3.Shark的工作原理
Shark是基于Spark計(jì)算框架之上且兼容Hive語(yǔ)法的SQL執(zhí)行引擎,由于底層的計(jì)算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,如果是純內(nèi)存計(jì)算的SQL,要快5倍以上,當(dāng)數(shù)據(jù)全部load在內(nèi)存的話,將快10倍以上,因此Shark可以作為交互式查詢應(yīng)用服務(wù)來(lái)使用。
上圖就是整個(gè)Shark的框架圖,與其他的SQL引擎相比,除了基于Spark的特性外,Shark是完全兼容Hive的語(yǔ)法,表結(jié)構(gòu)以及UDF函數(shù)等,已有的HiveSql可以直接進(jìn)行遷移至Shark上。
與Hive相比,Shark的特性如下:
1.以在線服務(wù)的方式執(zhí)行任務(wù),避免任務(wù)進(jìn)程的啟動(dòng)和銷毀開(kāi)稍,通常MapReduce里的每個(gè)任務(wù)都是啟動(dòng)和關(guān)閉進(jìn)程的方式來(lái)運(yùn)行的,而在Shark中,Server運(yùn)行后,所有的工作節(jié)點(diǎn)也隨之啟動(dòng),隨后以常駐服務(wù)的形式不斷的接受Server發(fā)來(lái)的任務(wù)。
2.Groupby和Join操作不需要Sort工作,當(dāng)數(shù)據(jù)量?jī)?nèi)存能裝下時(shí),一邊接收數(shù)據(jù)一邊執(zhí)行計(jì)算操作。在Hive中,不管任何操作在Map到Reduce的過(guò)程都需要對(duì)Key進(jìn)行Sort操作。
3.對(duì)于性能要求更高的表,提供分布式Cache系統(tǒng)將表數(shù)據(jù)事先Cache至內(nèi)存中,后續(xù)的查詢將直接訪問(wèn)內(nèi)存數(shù)據(jù),不再需要磁盤(pán)開(kāi)稍。
4.還有很多Spark的特性,如可以采用Torrent來(lái)廣播變量和小數(shù)據(jù),將執(zhí)行計(jì)劃直接傳送給Task,DAG過(guò)程中的中間數(shù)據(jù)不需要落地到Hdfs文件系統(tǒng)。
騰訊大數(shù)據(jù)Spark的概況
騰訊大數(shù)據(jù)綜合了多個(gè)業(yè)務(wù)線的各種需求和特性,目前正在進(jìn)行以下工作:
1.經(jīng)過(guò)改造和優(yōu)化的Shark和Spark吸收了TDW平臺(tái)的功能,如Hive的特有功能:元數(shù)據(jù)重構(gòu),分區(qū)優(yōu)化等,同時(shí)可以通過(guò)IDE或者洛子調(diào)度來(lái)直接執(zhí)行HiveSql查詢和定時(shí)調(diào)度Spark的任務(wù);
2.與Gaia和TDW的底層存儲(chǔ)直接兼容,可以直接安全且高效地使用TDW集群上的數(shù)據(jù);
3.對(duì)Spark底層的使用門(mén)檻,資源管理與調(diào)度,任務(wù)監(jiān)控以及容災(zāi)等多個(gè)功能進(jìn)行完善,并支持快速的遷移和擴(kuò)容。