面試 | 十分鐘聊透Spark
本文轉(zhuǎn)載自微信公眾號「大數(shù)據(jù)技術(shù)與數(shù)倉」,作者西貝。轉(zhuǎn)載本文請聯(lián)系大數(shù)據(jù)技術(shù)與數(shù)倉公眾號。
Spark是一個(gè)快速的大數(shù)據(jù)處理引擎,在實(shí)際的生產(chǎn)環(huán)境中,應(yīng)用十分廣泛。目前,Spark仍然是大數(shù)據(jù)開發(fā)非常重要的一個(gè)工具,所以在面試的過程中,Spark也會是被重點(diǎn)考察的對象。對于初學(xué)者而言,面對繁多的Spark相關(guān)概念,一時(shí)會難以厘清頭緒,對于使用Spark開發(fā)的同學(xué)而言,有時(shí)候也會對這些概念感到模糊。本文主要梳理了幾個(gè)關(guān)于Spark的比較重要的幾個(gè)概念,在面試的過程中如果被問到Spark相關(guān)的問題,具體可以從以下幾個(gè)方面展開即可,希望對你有所幫助。本文主要包括以下內(nèi)容:
- 運(yùn)行架構(gòu)
- 運(yùn)行流程
- 執(zhí)行模式
- 驅(qū)動程序
- 共享變量
- 寬依賴窄依賴
- 持久化
- 分區(qū)
- 綜合實(shí)踐案例
組成
Spark棧包括SQL和DataFrames,MLlib機(jī)器學(xué)習(xí), GraphX和SparkStreaming。用戶可以在同一個(gè)應(yīng)用程序中無縫組合使用這些庫。
架構(gòu)
Spark運(yùn)行架構(gòu)包括集群資源管理器(Cluster Manager)、運(yùn)行作業(yè)任務(wù)的工作節(jié)點(diǎn)(Worker Node)、每個(gè)應(yīng)用的任務(wù)控制節(jié)點(diǎn)(Driver)和每個(gè)工作節(jié)點(diǎn)上負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor)。其中,集群資源管理器可以是Spark自帶的資源管理器,也可以是YARN或Mesos等資源管理框架。
運(yùn)行流程
- 當(dāng)一個(gè)Spark應(yīng)用被提交時(shí),首先需要為這個(gè)應(yīng)用構(gòu)建起基本的運(yùn)行環(huán)境,即由任務(wù)控制節(jié)點(diǎn)(Driver)創(chuàng)建一個(gè)SparkContext,由SparkContext負(fù)責(zé)和資源管理器(Cluster Manager)的通信以及進(jìn)行資源的申請、任務(wù)的分配和監(jiān)控等。SparkContext會向資源管理器注冊并申請運(yùn)行Executor的資源;
- 資源管理器為Executor分配資源,并啟動Executor進(jìn)程,Executor運(yùn)行情況將隨著“心跳”發(fā)送到資源管理器上;
- SparkContext根據(jù)RDD的依賴關(guān)系構(gòu)建DAG圖,DAG圖提交給DAG調(diào)度器(DAGScheduler)進(jìn)行解析,將DAG圖分解成多個(gè)“階段”(每個(gè)階段都是一個(gè)任務(wù)集),并且計(jì)算出各個(gè)階段之間的依賴關(guān)系,然后把一個(gè)個(gè)“任務(wù)集”提交給底層的任務(wù)調(diào)度器(TaskScheduler)進(jìn)行處理;Executor向SparkContext申請任務(wù),任務(wù)調(diào)度器將任務(wù)分發(fā)給Executor運(yùn)行,同時(shí),SparkContext將應(yīng)用程序代碼發(fā)放給Executor;
- 任務(wù)在Executor上運(yùn)行,把執(zhí)行結(jié)果反饋給任務(wù)調(diào)度器,然后反饋給DAG調(diào)度器,運(yùn)行完畢后寫入數(shù)據(jù)并釋放所有資源。
MapReduce VS Spark
與Spark相比,MapReduce具有以下缺點(diǎn):
- 表達(dá)能力有限
- 磁盤IO開銷大
- 延遲高
- 任務(wù)之間的銜接涉及IO開銷
- 在前一個(gè)任務(wù)執(zhí)行完成之前,其他任務(wù)就無法開始,難以勝任復(fù)雜、多階段的計(jì)算任務(wù)
與MapReduce相比,Spark具有以下優(yōu)點(diǎn):具體包括兩個(gè)方面
- 一是利用多線程來執(zhí)行具體的任務(wù)(Hadoop MapReduce采用的是進(jìn)程模型),減少任務(wù)的啟動開銷;
- 二是Executor中有一個(gè)BlockManager存儲模塊,會將內(nèi)存和磁盤共同作為存儲設(shè)備,當(dāng)需要多輪迭代計(jì)算時(shí),可以將中間結(jié)果存儲到這個(gè)存儲模塊里,下次需要時(shí),就可以直接讀該存儲模塊里的數(shù)據(jù),而不需要讀寫到HDFS等文件系統(tǒng)里,因而有效減少了IO開銷;或者在交互式查詢場景下,預(yù)先將表緩存到該存儲系統(tǒng)上,從而可以提高讀寫IO性能。
驅(qū)動程序(Driver)和Executor
運(yùn)行main函數(shù)的驅(qū)動程序進(jìn)程位于集群中的一個(gè)節(jié)點(diǎn)上,負(fù)責(zé)三件事:
- 維護(hù)有關(guān) Spark 應(yīng)用程序的信息。
- 響應(yīng)用戶的程序或輸入。
- 跨Executor分析、分配和調(diào)度作業(yè)。
驅(qū)動程序進(jìn)程是絕對必要的——它是 Spark 應(yīng)用程序的核心,并在應(yīng)用程序的生命周期內(nèi)維護(hù)所有相關(guān)信息。
Executor負(fù)責(zé)實(shí)際執(zhí)行驅(qū)動程序分配給他們的任務(wù)。這意味著每個(gè)Executor只負(fù)責(zé)兩件事:
- 執(zhí)行驅(qū)動程序分配給它的代碼。
- 向Driver節(jié)點(diǎn)匯報(bào)該Executor的計(jì)算狀態(tài)
分區(qū)
為了讓每個(gè) executor 并行執(zhí)行工作,Spark 將數(shù)據(jù)分解成稱為partitions 的塊。分區(qū)是位于集群中一臺物理機(jī)器上的行的集合。Dataframe 的分區(qū)表示數(shù)據(jù)在執(zhí)行期間如何在機(jī)器集群中物理分布。
如果你有一個(gè)分區(qū),即使你有數(shù)千個(gè)Executor,Spark 的并行度也只有一個(gè)。如果你有很多分區(qū)但只有一個(gè)執(zhí)行器,Spark 仍然只有一個(gè)并行度,因?yàn)橹挥幸粋€(gè)計(jì)算資源。
執(zhí)行模式:Client VS Cluster VS Local
執(zhí)行模式能夠在運(yùn)行應(yīng)用程序時(shí)確定Driver和Executor的物理位置。
有三種模式可供選擇:
- 集群模式(Cluster)。
- 客戶端模式(Client)。
- 本地模式(Local)。
集群模式 可能是運(yùn)行 Spark 應(yīng)用程序最常見的方式。在集群模式下,用戶將預(yù)編譯的代碼提交給集群管理器。除了啟動Executor之外,集群管理器會在集群內(nèi)的工作節(jié)點(diǎn)(work)上啟動驅(qū)動程序(Driver)進(jìn)程。這意味著集群管理器負(fù)責(zé)管理與 Spark 應(yīng)用程序相關(guān)的所有進(jìn)程。
客戶端模式 與集群模式幾乎相同,只是 Spark 驅(qū)動程序保留在提交應(yīng)用程序的客戶端節(jié)點(diǎn)上。這意味著客戶端機(jī)器負(fù)責(zé)維護(hù) Spark driver 進(jìn)程,集群管理器維護(hù) executor 進(jìn)程。通常將這個(gè)節(jié)點(diǎn)稱之為網(wǎng)關(guān)節(jié)點(diǎn)。
本地模式可以被認(rèn)為是在你的計(jì)算機(jī)上運(yùn)行一個(gè)程序,spark 會在同一個(gè) JVM 中運(yùn)行驅(qū)動程序和執(zhí)行程序。
RDD VS DataFrame VS DataSet
RDD
一個(gè)RDD是一個(gè)分布式對象集合,其本質(zhì)是一個(gè)只讀的、分區(qū)的記錄集合。每個(gè)RDD可以分成多個(gè)分區(qū),不同的分區(qū)保存在不同的集群節(jié)點(diǎn)上(具體如下圖所示)。RDD是一種高度受限的共享內(nèi)存模型,即RDD是只讀的分區(qū)記錄集合,所以也就不能對其進(jìn)行修改。只能通過兩種方式創(chuàng)建RDD,一種是基于物理存儲的數(shù)據(jù)創(chuàng)建RDD,另一種是通過在其他RDD上作用轉(zhuǎn)換操作(transformation,比如map、filter、join等)得到新的RDD。
- 基于內(nèi)存
RDD是位于內(nèi)存中的對象集合。RDD可以存儲在內(nèi)存、磁盤或者內(nèi)存加磁盤中,但是,Spark之所以速度快,是基于這樣一個(gè)事實(shí):數(shù)據(jù)存儲在內(nèi)存中,并且每個(gè)算子不會從磁盤上提取數(shù)據(jù)。
- 分區(qū)
分區(qū)是對邏輯數(shù)據(jù)集劃分成不同的獨(dú)立部分,分區(qū)是分布式系統(tǒng)性能優(yōu)化的一種技術(shù)手段,可以減少網(wǎng)絡(luò)流量傳輸,將相同的key的元素分布在相同的分區(qū)中可以減少shuffle帶來的影響。RDD被分成了多個(gè)分區(qū),這些分區(qū)分布在集群中的不同節(jié)點(diǎn)。
- 強(qiáng)類型
RDD中的數(shù)據(jù)是強(qiáng)類型的,當(dāng)創(chuàng)建RDD的時(shí)候,所有的元素都是相同的類型,該類型依賴于數(shù)據(jù)集的數(shù)據(jù)類型。
- 懶加載
Spark的轉(zhuǎn)換操作是懶加載模式,這就意味著只有在執(zhí)行了action(比如count、collect等)操作之后,才會去執(zhí)行一些列的算子操作。
- 不可修改
RDD一旦被創(chuàng)建,就不能被修改。只能從一個(gè)RDD轉(zhuǎn)換成另外一個(gè)RDD。
- 并行化
RDD是可以被并行操作的,由于RDD是分區(qū)的,每個(gè)分區(qū)分布在不同的機(jī)器上,所以每個(gè)分區(qū)可以被并行操作。
- 持久化
由于RDD是懶加載的,只有action操作才會導(dǎo)致RDD的轉(zhuǎn)換操作被執(zhí)行,進(jìn)而創(chuàng)建出相對應(yīng)的RDD。對于一些被重復(fù)使用的RDD,可以對其進(jìn)行持久化操作(比如將其保存在內(nèi)存或磁盤中,Spark支持多種持久化策略),從而提高計(jì)算效率。
DataFrame
DataFrame代表一個(gè)不可變的分布式數(shù)據(jù)集合,其核心目的是讓開發(fā)者面對數(shù)據(jù)處理時(shí),只關(guān)心要做什么,而不用關(guān)心怎么去做,將一些優(yōu)化的工作交由Spark框架本身去處理。DataFrame是具有Schema信息的,也就是說可以被看做具有字段名稱和類型的數(shù)據(jù),類似于關(guān)系型數(shù)據(jù)庫中的表,但是底層做了很多的優(yōu)化。創(chuàng)建了DataFrame之后,就可以使用SQL進(jìn)行數(shù)據(jù)處理。
用戶可以從多種數(shù)據(jù)源中構(gòu)造DataFrame,例如:結(jié)構(gòu)化數(shù)據(jù)文件,Hive中的表,外部數(shù)據(jù)庫或現(xiàn)有RDD。DataFrame API支持Scala,Java,Python和R,在Scala和Java中,row類型的DataSet代表DataFrame,即Dataset[Row]等同于DataFrame。
DataSet
DataSet是Spark 1.6中添加的新接口,是DataFrame的擴(kuò)展,它具有RDD的優(yōu)點(diǎn)(強(qiáng)類型輸入,支持強(qiáng)大的lambda函數(shù))以及Spark SQL的優(yōu)化執(zhí)行引擎的優(yōu)點(diǎn)。可以通過JVM對象構(gòu)建DataSet,然后使用函數(shù)轉(zhuǎn)換(map,flatMap,filter)。值得注意的是,Dataset API在Scala和 Java中可用,Python不支持Dataset API。
另外,DataSet API可以減少內(nèi)存的使用,由于Spark框架知道DataSet的數(shù)據(jù)結(jié)構(gòu),因此在持久化DataSet時(shí)可以節(jié)省很多的內(nèi)存空間。
共享變量
Spark提供了兩種類型的共享變量:廣播變量和累加器。廣播變量(Broadcast variables)是一個(gè)只讀的變量,并且在每個(gè)節(jié)點(diǎn)都保存一份副本,而不需要在集群中發(fā)送數(shù)據(jù)。累加器(Accumulators)可以將所有任務(wù)的數(shù)據(jù)累加到一個(gè)共享結(jié)果中。
廣播變量
廣播變量允許用戶在集群中共享一個(gè)不可變的值,該共享的、不可變的值被持計(jì)劃到集群的每臺節(jié)點(diǎn)上。通常在需要將一份小數(shù)據(jù)集(比如維表)復(fù)制到集群中的每臺節(jié)點(diǎn)時(shí)使用,比如日志分析的應(yīng)用,web日志通常只包含pageId,而每個(gè)page的標(biāo)題保存在一張表中,如果要分析日志(比如哪些page被訪問的最多),則需要將兩者join在一起,這時(shí)就可以使用廣播變量,將該表廣播到集群的每個(gè)節(jié)點(diǎn)。具體如下圖所示:
如上圖,首先Driver將序列化對象分割成小的數(shù)據(jù)庫,然后將這些數(shù)據(jù)塊存儲在Driver節(jié)點(diǎn)的BlockManager上。當(dāng)ececutor中執(zhí)行具體的task時(shí),每個(gè)executor首先嘗試從自己所在節(jié)點(diǎn)的BlockManager提取數(shù)據(jù),如果之前已經(jīng)提取的該廣播變量的值,就直接使用它。如果沒有找到,則會向遠(yuǎn)程的Driver或者其他的Executor中提取廣播變量的值,一旦獲取該值,就將其存儲在自己節(jié)點(diǎn)的BlockManager中。這種機(jī)制可以避免Driver端向多個(gè)executor發(fā)送數(shù)據(jù)而造成的性能瓶頸。
累加器
累加器(Accumulator)是Spark提供的另外一個(gè)共享變量,與廣播變量不同,累加器是可以被修改的,是可變的。每個(gè)transformation會將修改的累加器值傳輸?shù)紻river節(jié)點(diǎn),累加器可以實(shí)現(xiàn)一個(gè)累加的功能,類似于一個(gè)計(jì)數(shù)器。Spark本身支持?jǐn)?shù)字類型的累加器,用戶也可以自定義累加器的類型。
寬依賴和窄依賴
RDD中不同的操作會使得不同RDD中的分區(qū)產(chǎn)不同的依賴,主要有兩種依賴:寬依賴和窄依賴。寬依賴是指一個(gè)父RDD的一個(gè)分區(qū)對應(yīng)一個(gè)子RDD的多個(gè)分區(qū),窄依賴是指一個(gè)父RDD的分區(qū)對應(yīng)與一個(gè)子RDD的分區(qū),或者多個(gè)父RDD的分區(qū)對應(yīng)一個(gè)子RDD分區(qū)。
窄依賴會被劃分到同一個(gè)stage中,這樣可以以管道的形式迭代執(zhí)行。寬依賴所依賴的分區(qū)一般有多個(gè),所以需要跨節(jié)點(diǎn)傳輸數(shù)據(jù)。從容災(zāi)方面看,兩種依賴的計(jì)算結(jié)果恢復(fù)的方式是不同的,窄依賴只需要恢復(fù)父RDD丟失的分區(qū)即可,而寬依賴則需要考慮恢復(fù)所有父RDD丟失的分區(qū)。
DAGScheduler會將Job的RDD劃分到不同的stage中,并構(gòu)建一個(gè)stage的依賴關(guān)系,即DAG。這樣劃分的目的是既可以保障沒有依賴關(guān)系的stage可以并行執(zhí)行,又可以保證存在依賴關(guān)系的stage順序執(zhí)行。stage主要分為兩種類型,一種是ShuffleMapStage,另一種是ResultStage。其中ShuffleMapStage是屬于上游的stage,而ResulStage屬于最下游的stage,這意味著上游的stage先執(zhí)行,最后執(zhí)行ResultStage。
持久化
方式
在Spark中,RDD采用惰性求值的機(jī)制,每次遇到action操作,都會從頭開始執(zhí)行計(jì)算。每次調(diào)用action操作,都會觸發(fā)一次從頭開始的計(jì)算。對于需要被重復(fù)使用的RDD,spark支持對其進(jìn)行持久化,通過調(diào)用persist()或者cache()方法即可實(shí)現(xiàn)RDD的持計(jì)劃。通過持久化機(jī)制可以避免重復(fù)計(jì)算帶來的開銷。值得注意的是,當(dāng)調(diào)用持久化的方法時(shí),只是對該RDD標(biāo)記為了持久化,需要等到第一次執(zhí)行action操作之后,才會把計(jì)算結(jié)果進(jìn)行持久化。持久化后的RDD將會被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中被后面的行動操作重復(fù)使用。
Spark提供的兩個(gè)持久化方法的主要區(qū)別是:cache()方法默認(rèn)使用的是內(nèi)存級別,其底層調(diào)用的是persist()方法。
持久化級別的選擇
Spark提供的持久化存儲級別是在內(nèi)存使用與CPU效率之間做權(quán)衡,通常推薦下面的選擇方式:
- 如果內(nèi)存可以容納RDD,可以使用默認(rèn)的持久化級別,即MEMORY_ONLY。這是CPU最有效率的選擇,可以使作用在RDD上的算子盡可能第快速執(zhí)行。
- 如果內(nèi)存不夠用,可以嘗試使用MEMORY_ONLY_SER,使用一個(gè)快速的序列化庫可以節(jié)省很多空間,比如 Kryo 。
tips:在一些shuffle算子中,比如reduceByKey,即便沒有顯性調(diào)用persist方法,Spark也會自動將中間結(jié)果進(jìn)行持久化,這樣做的目的是避免在shuffle期間發(fā)生故障而造成重新計(jì)算整個(gè)輸入。即便如此,還是推薦對需要被重復(fù)使用的RDD進(jìn)行持久化處理。
coalesce VS repartition
repartition算法對數(shù)據(jù)進(jìn)行了shuffle操作,并創(chuàng)建了大小相等的數(shù)據(jù)分區(qū)。coalesce操作合并現(xiàn)有分區(qū)以避免shuffle,除此之外coalesce操作僅能用于減少分區(qū),不能用于增加分區(qū)。
值得注意的是:使用coalesce在減少分區(qū)時(shí),并沒有對所有數(shù)據(jù)進(jìn)行了移動,僅僅是在原來分區(qū)的基礎(chǔ)之上進(jìn)行了合并而已,所以效率較高,但是可能會引起數(shù)據(jù)傾斜。
綜合案例
一種數(shù)倉技術(shù)架構(gòu)
SparkStreaming實(shí)時(shí)同步
- 訂閱消費(fèi):
SparkStreaming消費(fèi)kafka埋點(diǎn)數(shù)據(jù)
- 數(shù)據(jù)寫入:
將解析的數(shù)據(jù)同時(shí)寫入HDFS上的某個(gè)臨時(shí)目錄下及Hive表對應(yīng)的分區(qū)目錄下
- 小文件合并:
由于是實(shí)時(shí)數(shù)據(jù)抽取,所以會生成大量的小文件,小文件的生成取決于SparkStreaming的Batch Interval,比如一分鐘一個(gè)batch,那么一分鐘就會生成一個(gè)小文件
基于SparkSQL的批處理
- ODS層到DWD層數(shù)據(jù)去重
SparkStreaming數(shù)據(jù)輸出是At Least Once,可能會存在數(shù)據(jù)重復(fù)。在ODS層到DWD層進(jìn)行明細(xì)數(shù)據(jù)處理時(shí),需要對數(shù)據(jù)使用row_number去重。
- JDBC寫入MySQL
數(shù)據(jù)量大時(shí),需要對數(shù)據(jù)進(jìn)行重分區(qū),并且為DataSet分區(qū)級別建立連接,采用批量提交的方式。
- 使用DISTRIBUTE BY子句避免生成大量小文件
spark.sql.shuffle.partitions的默認(rèn)值為200,會導(dǎo)致以下問題
- 對于較小的數(shù)據(jù),200是一個(gè)過大的選擇,由于調(diào)度開銷,通常會導(dǎo)致處理速度變慢,同時(shí)會造成小文件的產(chǎn)生。
- 對于大數(shù)據(jù)集,200很小,無法有效利用集群中的資源
使用 DISTRIBUTE BY cast( rand * N as int) 這里的N是指具體最后落地生成多少個(gè)文件數(shù)。
手動維護(hù)offset至HBase
當(dāng)作業(yè)發(fā)生故障或重啟時(shí),要保障從當(dāng)前的消費(fèi)位點(diǎn)去處理數(shù)據(jù),單純的依靠SparkStreaming本身的機(jī)制是不太理想,生產(chǎn)環(huán)境中通常借助手動管理來維護(hù)kafka的offset。
流應(yīng)用監(jiān)控告警
- 實(shí)現(xiàn)StreamingListener 接口,重寫onBatchStarted與onBatchCompleted方法
- 獲取batch執(zhí)行完成之后的時(shí)間,寫入Redis,數(shù)據(jù)類型的key為自定義的具體字符串,value是batch處理完的結(jié)束時(shí)間
- 加入流作業(yè)監(jiān)控
- 啟動定時(shí)任務(wù)監(jiān)控上述步驟寫入redis的kv數(shù)據(jù),一旦超出給定的閾值,則報(bào)錯,并發(fā)出告警通知
- 使用Azkaban定時(shí)調(diào)度該任務(wù)