Spark中的核心概念RDD,我們了解多少?
RDD全稱叫做彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets),它是一種分布式的內(nèi)存抽象,表示一個只讀的記錄分區(qū)的集合,它只能通過其他RDD轉(zhuǎn)換而創(chuàng)建,為此,RDD支持豐富的轉(zhuǎn)換操作 ( 如: map, join, filter, groupBy 等),通過這種轉(zhuǎn)換操作,新的RDD則包含了如何從其他RDDs衍生所必需的信息,所以說RDDs之間是有依賴關(guān)系的。
基于RDDs之間的依賴,RDDs會形成一個有向無環(huán)圖DAG,該DAG描述了整個流式計算的流程,實際執(zhí)行的時候,RDD是通過血緣關(guān)系(Lineage)一氣呵成的,即使出現(xiàn)數(shù)據(jù)分區(qū)丟失,也可以通過血緣關(guān)系重建分區(qū)。
總結(jié)起來,基于RDD的流式計算任務可描述為:從穩(wěn)定的物理存儲(如分布式文件系統(tǒng))中加載記錄,記錄被傳入由一組確定性操作構(gòu)成的DAG,然后寫回穩(wěn)定存儲。另外RDD還可以將數(shù)據(jù)集緩存到內(nèi)存中,使得在多個操作之間可以重用數(shù)據(jù)集,基于這個特點可以很方便地構(gòu)建迭代型應用(圖計算、機器學習等)或者交互式數(shù)據(jù)分析應用。
可以說Spark最初也就是實現(xiàn)RDD的一個分布式系統(tǒng),后面通過不斷發(fā)展壯大成為現(xiàn)在較為完善的大數(shù)據(jù)生態(tài)系統(tǒng),簡單來講,Spark-RDD的關(guān)系類似于Hadoop-MapReduce關(guān)系。
RDD特點
RDD表示只讀的分區(qū)的數(shù)據(jù)集,對RDD進行改動,只能通過RDD的轉(zhuǎn)換操作,由一個RDD得到一個新的RDD,新的RDD包含了從其他RDD衍生所必需的信息。
RDDs之間存在依賴,RDD的執(zhí)行是按照血緣關(guān)系延時計算的。如果血緣關(guān)系較長,可以通過持久化RDD來切斷血緣關(guān)系。
分區(qū)
如下圖所示,RDD邏輯上是分區(qū)的,每個分區(qū)的數(shù)據(jù)是抽象存在的,計算的時候會通過一個compute函數(shù)得到每個分區(qū)的數(shù)據(jù)。
如果RDD是通過已有的文件系統(tǒng)構(gòu)建,則compute函數(shù)是讀取指定文件系統(tǒng)中的數(shù)據(jù),如果RDD是通過其他RDD轉(zhuǎn)換而來,則compute函數(shù)是執(zhí)行轉(zhuǎn)換邏輯將其他RDD的數(shù)據(jù)進行轉(zhuǎn)換。
只讀
如下圖所示,RDD是只讀的,要想改變RDD中的數(shù)據(jù),只能在現(xiàn)有的RDD基礎上創(chuàng)建新的RDD。
由一個RDD轉(zhuǎn)換到另一個RDD,可以通過豐富的操作算子實現(xiàn),不再像MapReduce那樣只能寫map和reduce了,如下圖所示。
RDD的操作算子包括兩類,一類叫做transformations,它是用來將RDD進行轉(zhuǎn)化,構(gòu)建RDD的血緣關(guān)系;另一類叫做actions,它是用來觸發(fā)RDD的計算,得到RDD的相關(guān)計算結(jié)果或者將RDD保存的文件系統(tǒng)中。下圖是RDD所支持的操作算子列表。
依賴
RDDs通過操作算子進行轉(zhuǎn)換,轉(zhuǎn)換得到的新RDD包含了從其他RDDs衍生所必需的信息,RDDs之間維護著這種血緣關(guān)系,也稱之為依賴。如下圖所示,依賴包括兩種,一種是窄依賴,RDDs之間分區(qū)是一一對應的,另一種是寬依賴,下游RDD的每個分區(qū)與上游RDD(也稱之為父RDD)的每個分區(qū)都有關(guān),是多對多的關(guān)系。
通過RDDs之間的這種依賴關(guān)系,一個任務流可以描述為DAG(有向無環(huán)圖),如下圖所示,在實際執(zhí)行過程中寬依賴對應于Shuffle(圖中的reduceByKey和join),窄依賴中的所有轉(zhuǎn)換操作可以通過類似于管道的方式一氣呵成執(zhí)行(圖中map和union可以一起執(zhí)行)。
緩存
如果在應用程序中多次使用同一個RDD,可以將該RDD緩存起來,該RDD只有在***次計算的時候會根據(jù)血緣關(guān)系得到分區(qū)的數(shù)據(jù),在后續(xù)其他地方用到該RDD的時候,會直接從緩存處取而不用再根據(jù)血緣關(guān)系計算,這樣就加速后期的重用。
如下圖所示,RDD-1經(jīng)過一系列的轉(zhuǎn)換后得到RDD-n并保存到hdfs,RDD-1在這一過程中會有個中間結(jié)果,如果將其緩存到內(nèi)存,那么在隨后的RDD-1轉(zhuǎn)換到RDD-m這一過程中,就不會計算其之前的RDD-0了。
Checkpoint
雖然RDD的血緣關(guān)系天然地可以實現(xiàn)容錯,當RDD的某個分區(qū)數(shù)據(jù)失敗或丟失,可以通過血緣關(guān)系重建。但是對于長時間迭代型應用來說,隨著迭代的進行,RDDs之間的血緣關(guān)系會越來越長,一旦在后續(xù)迭代過程中出錯,則需要通過非常長的血緣關(guān)系去重建,勢必影響性能。
為此,RDD支持checkpoint將數(shù)據(jù)保存到持久化的存儲中,這樣就可以切斷之前的血緣關(guān)系,因為checkpoint后的RDD不需要知道它的父RDDs了,它可以從checkpoint處拿到數(shù)據(jù)。
小結(jié)
總結(jié)起來,給定一個RDD我們至少可以知道如下幾點信息:1、分區(qū)數(shù)以及分區(qū)方式;2、由父RDDs衍生而來的相關(guān)依賴信息;3、計算每個分區(qū)的數(shù)據(jù),計算步驟為:1)如果被緩存,則從緩存中取的分區(qū)的數(shù)據(jù);2)如果被Checkpoint,則從Checkpoint處恢復數(shù)據(jù);3)根據(jù)血緣關(guān)系計算分區(qū)的數(shù)據(jù)。
編程模型
在Spark中,RDD被表示為對象,通過對象上的方法調(diào)用來對RDD進行轉(zhuǎn)換。經(jīng)過一系列的Transformations后,就可以調(diào)用Actions觸發(fā)RDD的計算,Action可以是向應用程序返回結(jié)果( count, collect 等),或者是向存儲系統(tǒng)保存數(shù)據(jù)( saveAsTextFile 等)。在Spark中,只有遇到Action,才會執(zhí)行RDD的計算(即懶執(zhí)行),這樣在運行時可以通過管道的方式傳輸多個轉(zhuǎn)換。
要使用Spark,開發(fā)者需要編寫一個Driver程序,它被提交到集群以調(diào)度運行Worker,如下圖所示。Driver中定義了一個或多個RDD,并調(diào)用RDD上的action,Worker則執(zhí)行RDD分區(qū)計算任務。
應用舉例
下面介紹一個簡單的Spark應用程序?qū)嵗齏ordCount,統(tǒng)計一個數(shù)據(jù)集中每個單詞出現(xiàn)的次數(shù),首先將從HDFS中加載數(shù)據(jù)得到原始RDD-0,其中每條記錄為數(shù)據(jù)中的一行句子,經(jīng)過一個flatMap操作,將一行句子切分為多個獨立的詞,得到RDD-1,再通過map操作將每個詞映射為key-value形式,其中key為詞本身,value為初始計數(shù)值1,得到RDD-2,將RDD-2中的所有記錄歸并,統(tǒng)計每個詞的計數(shù),得到RDD-3,***將其保存到HDFS。
- object WordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: WordCount <inputfile> <outputfile>");
- System.exit(1);
- }
- val conf = new SparkConf().setAppName("WordCount")
- val sc = new SparkContext(conf)
- val result = sc.textFile(args(0))
- .flatMap(line => line.split(" "))
- .map(word => (word, 1))
- .reduceByKey(_ + _)
- result.saveAsTextFile(args(1))
- }
- }
結(jié)語
基于RDD實現(xiàn)的Spark相比于傳統(tǒng)的Hadoop MapReduce有什么優(yōu)勢呢?總結(jié)起來應該至少有三點:
1.RDD提供了豐富的操作算子,不再是只有map和reduce兩個操作了,對于描述應用程序來說更加方便;
2.通過RDDs之間的轉(zhuǎn)換構(gòu)建DAG,中間結(jié)果不用落地;
3.RDD支持緩存,可以在內(nèi)存中快速完成計算。