如何提高Spark姿勢水平
本文的依據(jù)是我學(xué)習(xí)整個Spark的學(xué)習(xí)歷程。在這里,我會從幾個方面來跟大家一起討論。Spark 是什么?Spark 跟 Hadoop 有什么淵源?Spark 有哪些方便的組件?什么場景下用 Spark ,如何使用?以及用什么樣的姿勢來學(xué)習(xí) Spark 會比較好?
Apache Spark? is a fast and general engine for large-scale data processing.
Spark就是一個能夠快速以及通用的處理大規(guī)模數(shù)據(jù)的引擎。怎么理解這句話呢?
Spark 就是一個處理引擎,它提供了類似 map , reduce , groupBy,persist 這些操作,來方便地對數(shù)據(jù)進行各種各樣的并行處理。
它以一個有向無環(huán)圖來定義一個應(yīng)用,方便對任務(wù)的容錯和重試處理。
它定義了一個叫 RDD 的彈性數(shù)據(jù)結(jié)構(gòu),將所有的數(shù)據(jù)和中間結(jié)果都盡可能緩存在內(nèi)存中,形成一個分布式內(nèi)存數(shù)據(jù)集。
然后為什么說它只是一個處理引擎呢?從數(shù)據(jù)源角度看, Spark 可以從 HBase、ElasticSeach、Hive 等渠道獲取。從運行資源角度看, Spark 可以跑在 Spark集群,Hadoop 集群 ,Mesos 集群上,所以它只是一個處理引擎。至此它擁有了快速的,通用的屬性,也就成為一個通用的大數(shù)據(jù)處理引擎。
Spark 和 Hadoop 有什么淵源?
容我細(xì)細(xì)道來。說起 Spark ,我們不可不提到它的老前輩 MapReduce 。
MapReduce 是一個編程模型 ,可以實現(xiàn)運行在規(guī)??梢造`活調(diào)整的由普通機器組成的集群上,一個典型的 MapReduce計算往往由幾千臺機器組成、處理以 TB 計算的數(shù)據(jù)。 這在 Google 發(fā)出《MapReduce: Simplified Data Processing on Large Clusters 》這篇論文之前,幾乎是不可想象的。并行計算,容錯機制是那么的高效和可靠。開源的 Hadoop 就實現(xiàn)了 MapReduce ,以及它的基石分布式文件系統(tǒng) HDFS (Hadoop Distribute File System),也即是 《Google File System》 的開源版實現(xiàn)。
既然這么高效那為什么還會出現(xiàn) Spark 呢?一個巨大的原因是,Hadoop 把數(shù)據(jù)的中間結(jié)果放到了HDFS 上了,這就導(dǎo)致處理的過程雖然非常可靠,但是耗時也非常非常長。當(dāng)初寫 Spark 是因為需要進行進行大規(guī)模數(shù)據(jù)的機器學(xué)習(xí),總所周知機器學(xué)習(xí)需要不斷訪問數(shù)據(jù)不斷訪問數(shù)據(jù)不斷迭代,這對于 MapReduce 來說是致命的,效率很慢,所以實現(xiàn)了Spark。
那么Spark發(fā)展至今,有哪些方便的組件呢?如下圖。
- val datas:DataFrame = hc.sql("SELECT SEX,TALL FROM PERSONS");
- val model = LogisticRegressionWithSGD.train(datas, 50)
- KafkaSteaming.createStream(x => model.predict(x.SEX));
短短幾行代碼可能就涵蓋了,Spark SQL,MLLib,SparkSteaming 。這幾個組件分別是干啥的呢?Spark Core Engine 提供了最基礎(chǔ)的操作,如 map , reduce , reduceBy , groupBy 等基礎(chǔ)操作,提供了 RDD 和有向無環(huán)圖的管理結(jié)構(gòu),提供了容錯機制。
SparkSQL 提供了對于 Hive,HBase等數(shù)據(jù)庫的操作,以 SQL 作為統(tǒng)一的查詢規(guī)范進行數(shù)據(jù)訪問。不僅如此 Spark 還提供了 DataFrame 的操作方式將數(shù)據(jù)的操作抽象化為對對象的操作。
MLLib 提供了機器學(xué)習(xí)相關(guān)的流水線處理 Pipline ,以及實現(xiàn)了絕大部分機器學(xué)習(xí)的組件,如 LinearRegression 、GBDT、LogisticRegression、SVM等,可以非常方便地用于大規(guī)模數(shù)據(jù)的機器學(xué)習(xí)。
GraphX 提供了大規(guī)模的圖處理及圖計算算法,其中有傳統(tǒng)的 stronglyConnectedComponent 強直通性算法,也有實現(xiàn)了 PageRank 的新型的 Pregel 分布式圖計算框架,以及實現(xiàn)了 Label Propagation Algorithm 的機器學(xué)習(xí)標(biāo)簽傳播算法。
而 SparkSteaming 則提供了批量流計算,用于接收來自 Kafka 或者 Twritter 消息中間件的數(shù)據(jù),進行實時處理。
那么我們應(yīng)該在什么場景下使用 Spark ,以及如何使用呢?
1、有錢的時候
Spark 需要非常多非常多的內(nèi)存,比 MapReduce 多多了,MapReduce 只是需要少量的內(nèi)存和大量硬盤,所以跑 Spark 來說會比較貴。
2、迫切需要快速處理大數(shù)據(jù)量的時候。
如果不是很迫切,那么使用 Hadoop 和 Hive 可能更加合適,因為它們也可以完成絕大部分的數(shù)據(jù)處理,并不是一定要用 Spark。
3、需要處理大規(guī)模圖的時候
當(dāng)前做巨大圖計算的引擎來說,Spark 可以說是最合適的。
4、其他的計算框架需要 Spark 作為計算引擎的時候。
比如Hive on Spark,比如 Impala 。Spark 可以作為一個分布式計算引擎嵌入到其他計算系統(tǒng)中。
Spark 運行架構(gòu)是怎樣的?
Spark 任務(wù)由 Driver 提交 Application 給 Master ,然后由 Master 將 Application 切分成多個 Job ,然后調(diào)度 DAG Scheduler 將 Task 切分成多個 stage ,分配給多個 Worker,每個Worker 接收到 TaskSet 任務(wù)集后,將調(diào)度 Executor 們進行任務(wù)分配,每個 Executor 都有自己的 DataSet 用于計算。通訊是使用akka。
Driver會記錄所有stage的信息。要是stage切分過多,那占用Driver的內(nèi)存會非常多。若task運行的stage失敗,默認(rèn)會進行4次重試,若4次重試全部失敗,SparkContext會停止所有工作。
Driver也會記錄stage的運行時間,如果task運行的stage時間太久,Driver可能會認(rèn)為這個job可能失敗了,會重新分配一個task給另外一個Executor,兩個task都會同時跑,誰先跑完誰交差,另外一個只有被干掉的份。
從運行模式來看,Spark有這么幾種方式可以運行。
- local
- mesos
- standalone
- yarn-client
- yarn-cluster
下面一個一個來解剖它們。
local,顧名思義,是跑在本地的,指將Driver和Executor都運行在提交任務(wù)的機器上。 local[2] 代表啟動兩個線程來跑任務(wù), local[*]代表啟動任意數(shù)量需要的線程來跑Spark任務(wù)。
Mesos是Apache下的開源分布式資源管理框架,它被稱為是分布式系統(tǒng)的內(nèi)核。Mesos最初是由加州大學(xué)伯克利分校的AMPLab開發(fā)的,后在Twitter得到廣泛使用。
Spark on mesos,是指跑在mesos平臺上。目前有兩個模式可以選擇,粗粒度模式(CoarseMesosSchedulerBackend)和細(xì)粒度模式(MesosSchedulerBackend)。粗粒度模式下,Spark任務(wù)在指定資源的時候,所分配的資源將會被鎖定,其他應(yīng)用無法share。在細(xì)粒度模式下,Spark啟動時Secheduler只會分配給當(dāng)前需要的資源,類似云的想法,不會對資源進行鎖定。
Spark on standalone,是指跑在 Spark 集群上。Spark集群可以自成一個平臺,資源由Spark來管理,不借助任何外部資源,若在測試階段可以考慮使用這種模式,比較高效,但是在生產(chǎn)環(huán)境若有多個任務(wù),不太建議使用這種方式。
Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協(xié)調(diào)者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統(tǒng),可為上層應(yīng)用提供統(tǒng)一的資源管理和調(diào)度,它的引入為集群在利用率、資源統(tǒng)一管理和數(shù)據(jù)共享等方面帶來了巨大好處。
Spark on yarn,是指跑在Hadoop集群上。Hadoop提供的yarn是一個比較好的資源管理平臺,若項目中已經(jīng)有使用Hadoop相關(guān)的組件,建議優(yōu)先使用yarn來進行資源管理。
將Spark任務(wù)提交到y(tǒng)arn上同樣有兩個模式,一種是yarn-client,一種是yarn-cluster。
yarn-client將SparkContext運行在本地,Driver也運行在本地,這種模式一般不推薦,因為在分配Driver資源的時候,提交的機器往往并不能滿足。
yarn-cluster,將任務(wù)提交到Hadoop集群上,由yarn來決定Driver應(yīng)該跑在哪個機器,SparkContext也會運行在被分配的機器上,建議使用這種模式。
無論是yarn-client還是yarn-cluster,都是在yarn平臺的管理下完成,而Spark on yarn目前只支持粗粒度方式(Hadoop2.6.0),所以在任務(wù)多,資源需求大的情況下,可能需要擴大Hadoop集群避免資源搶占。
Spark 使用的時候有哪些坑呢,如何使用呢?
00000:Spark on yarn 啟動的時候一直在 waiting。
第一種可能,隊列資源不足,所有的資源都在被其他同學(xué)占用ing。
解決方案:把那個同學(xué)打暈,然后kill application。
第二種可能,設(shè)置的 Driver 或者 executor 的 cpu 或者 memory 資源太多。
解決方案:看看隊列資源有多少,拿小本本計算一下究竟能申請多少,然后給自己一巴掌。如果集群資源太爛,單臺機器只有16G,那你就別動不動就申請一個 driver 或者 executor 一下就來個32G了。
第三種可能,程序報錯了,一直在重試。
解決方案:滾回去debug去。
特別提醒:Spark 默認(rèn)是有10%的內(nèi)存的 overhead 的,所以會比你申請的多10%。
00001:Driver 拋 OutOfMemory Exception
很明顯嘛,就是driver的內(nèi)存不足了,嘗試看一下哪個地方占用內(nèi)存太多了,特別提醒一下,stage的切分,task的分配都是在Driver 分配的,數(shù)量太多的話會爆炸。以及collect(),count()等這些操作都是需要把所有信息搜集到driver端的。
解決方案:打自己一巴掌,然后看dump日志或者看看自己的代碼,是不是哪里搞錯了。如果一切都很合理,那就提高一下內(nèi)存吧。
00010:executor 拋 OutOfMemory Exception
內(nèi)存不足。哇,那這個可能性就多了。
是不是數(shù)據(jù)量太大 partition 數(shù)太少?太少了就多加點 partition 。
是不是產(chǎn)生數(shù)據(jù)傾斜了?解決它。
是不是某個操作,比如flatmap,導(dǎo)致單個executor產(chǎn)生大量數(shù)據(jù)了?
是不是請求的內(nèi)存實在太少了?
00011:executor 拋 is running beyond physical memory limit
哇,你的集群資源超分配了,物理資源被其他團隊用了,GG思密達,快拿起40米長木棍。把那個人抓出來。
00100:driver 或者 executor 拋 OutOfMemoryError: GC overhead limit exceeded
出現(xiàn)內(nèi)存泄漏或者真的集群資源不夠,一直在full GC超過次數(shù)限制了,仔細(xì)檢查一下哪些東西占用內(nèi)存太多,是不是RDD持久化占用太多資源了,還是數(shù)據(jù)有傾斜,還是真的partition太少導(dǎo)致每個partition數(shù)據(jù)太多。
00101:運行 GraphX 的時候 driver 拋 OutOfMemory Exception
運行 GraphX 的時候因為會迭代計算,所以會產(chǎn)生非常非常多 stage,這時候 driver 可能沒有足夠多的內(nèi)存可以放下這些 stage 和 task 的狀態(tài),很容易就出現(xiàn) OOM。這時候能做的事情就四個,第一增加 driver 內(nèi)存,第二降低 partition 的數(shù)量,第三減少 Pregel 的迭代次數(shù)減少stage的數(shù)量,第四優(yōu)化圖的切分策略。
00110:大對象網(wǎng)絡(luò)傳輸慢。
放棄默認(rèn)的 Java Serialization,改用 Kryo Serialization。
小對象用廣播的模式,避免全局 join。
GraphX 來說改善圖切分策略,減少網(wǎng)絡(luò)交互。
GraphX 盡量單臺機器配置高點,可以盡量讓更多的 partition 在同一臺機器。
00111:SparkStreaming 消息堆積。
調(diào)整窗口時間,著重分析消息消費過程的瓶頸并調(diào)整相應(yīng)的資源,盡量降低單筆計算時間。然后根據(jù)收集的信息再根據(jù)吞吐量來決定窗口時間。
01000:進行 Shuffle 的時候報 Spark Shuffle FetchFailedException。
數(shù)據(jù)在 Shuffle 的時候中間數(shù)據(jù)量過大或者數(shù)據(jù)產(chǎn)生了傾斜,導(dǎo)致部分目標(biāo)機器崩潰。通過分析崩潰的時候的任務(wù),改善數(shù)據(jù) Shuffle 時的數(shù)據(jù)分布情況。
那應(yīng)該以怎樣的姿勢來學(xué)習(xí) Spark 呢?
Step1:環(huán)境搭建
自己開虛擬機或者云主機搭好Hadoop,Spark,Hive,sqoop,原生的那種,可以直接實現(xiàn)為偽分布式??梢栽囋囄蚁旅嫱扑]的這種版本搭配,這是CDH5.8.x的個組件版本組合。
Apache Zookeeper 3.4.5 + Apache Hadoop 2.6.0 + Apache HBase 1.2.0 + Apache Hive 1.1.0 + Apache Spark 1.6 + Apache Pig 0.12.0 + Apache Flume 1.6.0 + Apache Kafka 0.9.0 + Apache Sqoop 1.4.6/1.99.5
注意事項:版本搭配要合理,不然會有很多坑。
Step2:數(shù)據(jù)準(zhǔn)備
使用Spark生成500萬數(shù)據(jù),包含[身份證,手機號,日期,性別,身高]五個字段。其中身份證格式為6位,手機號為6位,日期為yyyy-mm-dd格式,性別為F、M,身高為160-190隨機數(shù)。手機號其中有100萬必須為10086,都必須為合理的隨機數(shù)據(jù),不能是序列,結(jié)果保存到Hive表中。
Step3:MapReduce初探
使用 Step2 產(chǎn)生的數(shù)據(jù)進行關(guān)系生成,相同手機號的人認(rèn)為有關(guān)系,可以使用RDF 的組織方式進行保存。直接過濾空數(shù)據(jù)以及6位號碼相同的,若發(fā)現(xiàn)同一號碼導(dǎo)致的關(guān)系數(shù)超過3000,剔除,結(jié)果先保存到Hive中。
Step4:內(nèi)存調(diào)優(yōu)及算法實現(xiàn)
利用 Step3 生成的關(guān)系,利用GraphX和SLPA進行社區(qū)劃分,可以借鑒 Spark 的 Pregel 框架,閱讀 LPA 實現(xiàn)的源碼。當(dāng)然希望你能改造為SLPA,SLPA需要自己實現(xiàn),要注意思考GraphX的局限性。
Step5:去做更多的事,實現(xiàn)更多的功能。
投入到更多的數(shù)據(jù)處理工作中,繼續(xù)一些億級別的調(diào)優(yōu)以及機器學(xué)習(xí)的學(xué)習(xí)中,不斷學(xué)習(xí)不斷提高自己的水平。scala 是 Spark 的原生語言,但是現(xiàn)在也有很多的數(shù)據(jù)分析師在使用 R 在 Spark 上進行數(shù)據(jù)分析,也有數(shù)據(jù)開發(fā)工程師使用 Python 在 Spark 進行機器學(xué)習(xí),甚至還實現(xiàn)一些深度學(xué)習(xí)的算法,打通了Tensorflow,這些在未來都可能成為主流。
最后總結(jié)一下 Spark:
1、Spark 跟 MapReduce 如出一轍。
2、Spark 很快,是一個計算引擎,其他組件都是可拔插的,但需要耗費很多內(nèi)存很多錢。
3、不是非得用Spark,還有很多其他的解決方案。
4、Spark 需要循序漸進學(xué)習(xí),不斷實踐,純理論沒什么用。
完。你不點個贊么?
【本文為51CTO專欄作者“大蕉”的原創(chuàng)稿件,轉(zhuǎn)載請通過作者微信公眾號“一名叫大蕉的程序員”獲取授權(quán)】