Spark:一個高效的分布式計算系統(tǒng)
概述
什么是Spark
◆ Spark是UC Berkeley AMP lab所開源的類Hadoop MapReduce的通用的并行計算框架,Spark基于map reduce算法實現(xiàn)的分布式計算,擁有Hadoop MapReduce所具有的優(yōu)點;但不同于MapReduce的是Job中間輸出和結(jié)果可以保存在內(nèi)存中,從而不再需要讀寫HDFS,因此Spark能更好地適用于數(shù)據(jù)挖掘與機器學(xué)習(xí)等需要迭代的map reduce的算法。其架構(gòu)如下圖所示:
Spark與Hadoop的對比
◆ Spark的中間數(shù)據(jù)放到內(nèi)存中,對于迭代運算效率更高。
- Spark更適合于迭代運算比較多的ML和DM運算。因為在Spark里面,有RDD的抽象概念。
◆ Spark比Hadoop更通用。
- Spark提供的數(shù)據(jù)集操作類型有很多種,不像Hadoop只提供了Map和Reduce兩種操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多種操作類型,Spark把這些操作稱為Transformations。同時還提供Count, collect, reduce, lookup, save等多種actions操作。
- 這些多種多樣的數(shù)據(jù)集操作類型,給給開發(fā)上層應(yīng)用的用戶提供了方便。各個處理節(jié)點之間的通信模型不再像Hadoop那樣就是唯一的Data Shuffle一種模式。用戶可以命名,物化,控制中間結(jié)果的存儲、分區(qū)等??梢哉f編程模型比Hadoop更靈活。
- 不過由于RDD的特性,Spark不適用那種異步細(xì)粒度更新狀態(tài)的應(yīng)用,例如web服務(wù)的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應(yīng)用模型不適合。
◆ 容錯性。
- 在分布式數(shù)據(jù)集計算時通過checkpoint來實現(xiàn)容錯,而checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現(xiàn)容錯。
◆ 可用性。
- Spark通過提供豐富的Scala, Java,Python API及交互式Shell來提高可用性。
Spark與Hadoop的結(jié)合
◆ Spark可以直接對HDFS進行數(shù)據(jù)的讀寫,同樣支持Spark on YARN。Spark可以與MapReduce運行于同集群中,共享存儲資源與計算,數(shù)據(jù)倉庫Shark實現(xiàn)上借用Hive,幾乎與Hive完全兼容。
Spark的適用場景
◆ Spark是基于內(nèi)存的迭代計算框架,適用于需要多次操作特定數(shù)據(jù)集的應(yīng)用場合。需要反復(fù)操作的次數(shù)越多,所需讀取的數(shù)據(jù)量越大,受益越大,數(shù)據(jù)量小但是計算密集度較大的場合,受益就相對較小
◆ 由于RDD的特性,Spark不適用那種異步細(xì)粒度更新狀態(tài)的應(yīng)用,例如web服務(wù)的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應(yīng)用模型不適合。
◆ 總的來說Spark的適用面比較廣泛且比較通用。
運行模式
◆ 本地模式
◆ Standalone模式
◆ Mesoes模式
◆ yarn模式
Spark生態(tài)系統(tǒng)
◆ Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎(chǔ)上提供和Hive一樣的H iveQL命令接口,為了最大程度的保持和Hive的兼容性,Shark使用了Hive的API來實現(xiàn)query Parsing和 Logic Plan generation,最后的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。通過配置Shark參數(shù),Shark可以自動在內(nèi)存中緩存特定的RDD,實現(xiàn)數(shù)據(jù)重用,進而加快特定數(shù)據(jù)集的檢索。同時,Shark通過UDF用戶自定義函數(shù)實現(xiàn)特定的數(shù)據(jù)分析學(xué)習(xí)算法,使得SQL數(shù)據(jù)查詢和運算分析能結(jié)合在一起,最大化RDD的重復(fù)使用。
◆ Spark streaming: 構(gòu)建在Spark上處理Stream數(shù)據(jù)的框架,基本的原理是將Stream數(shù)據(jù)分成小的時間片斷(幾秒),以類似batch批量處理的方式來處理這小部分?jǐn)?shù)據(jù)。Spark Streaming構(gòu)建在Spark上,一方面是因為Spark的低延遲執(zhí)行引擎(100ms+)可以用于實時計算,另一方面相比基于Record的其它處理框架(如Storm),RDD數(shù)據(jù)集更容易做高效的容錯處理。此外小批量處理的方式使得它可以同時兼容批量和實時數(shù)據(jù)處理的邏輯和算法。方便了一些需要歷史數(shù)據(jù)和實時數(shù)據(jù)聯(lián)合分析的特定應(yīng)用場合。
◆ Bagel: Pregel on Spark,可以用Spark進行圖計算,這是個非常有用的小項目。Bagel自帶了一個例子,實現(xiàn)了Google的PageRank算法。
#p#
在業(yè)界的使用
◆ Spark項目在2009年啟動,2010年開源, 現(xiàn)在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Spark的python克隆版Dpark。
Spark核心概念
Resilient Distributed Dataset (RDD)彈性分布數(shù)據(jù)集
◆ RDD是Spark的最基本抽象,是對分布式內(nèi)存的抽象使用,實現(xiàn)了以操作本地集合的方式來操作分布式數(shù)據(jù)集的抽象實現(xiàn)。RDD是Spark最核心的東西,它表示已被分區(qū),不可變的并能夠被并行操作的數(shù)據(jù)集合,不同的數(shù)據(jù)集格式對應(yīng)不同的RDD實現(xiàn)。RDD必須是可序列化的。RDD可以cache到內(nèi)存中,每次對RDD數(shù)據(jù)集的操作之后的結(jié)果,都可以存放到內(nèi)存中,下一個操作可以直接從內(nèi)存中輸入,省去了MapReduce大量的磁盤IO操作。這對于迭代運算比較常見的機器學(xué)習(xí)算法, 交互式數(shù)據(jù)挖掘來說,效率提升比較大。
◆ RDD的特點:
- 它是在集群節(jié)點上的不可變的、已分區(qū)的集合對象。
- 通過并行轉(zhuǎn)換的方式來創(chuàng)建如(map, filter, join, etc)。
- 失敗自動重建。
- 可以控制存儲級別(內(nèi)存、磁盤等)來進行重用。
- 必須是可序列化的。
- 是靜態(tài)類型的。
◆ RDD的好處
- RDD只能從持久存儲或通過Transformations操作產(chǎn)生,相比于分布式共享內(nèi)存(DSM)可以更高效實現(xiàn)容錯,對于丟失部分?jǐn)?shù)據(jù)分區(qū)只需根據(jù)它的lineage就可重新計算出來,而不需要做特定的Checkpoint。
- RDD的不變性,可以實現(xiàn)類Hadoop MapReduce的推測式執(zhí)行。
- RDD的數(shù)據(jù)分區(qū)特性,可以通過數(shù)據(jù)的本地性來提高性能,這與Hadoop MapReduce是一樣的。
- RDD都是可序列化的,在內(nèi)存不足時可自動降級為磁盤存儲,把RDD存儲于磁盤上,這時性能會有大的下降但不會差于現(xiàn)在的MapReduce。
◆ RDD的存儲與分區(qū)
- 用戶可以選擇不同的存儲級別存儲RDD以便重用。
- 當(dāng)前RDD默認(rèn)是存儲于內(nèi)存,但當(dāng)內(nèi)存不足時,RDD會spill到disk。
- RDD在需要進行分區(qū)把數(shù)據(jù)分布于集群中時會根據(jù)每條記錄Key進行分區(qū)(如Hash 分區(qū)),以此保證兩個數(shù)據(jù)集在Join時能高效。
◆ RDD的內(nèi)部表示
在RDD的內(nèi)部實現(xiàn)中每個RDD都可以使用5個方面的特性來表示:
- 分區(qū)列表(數(shù)據(jù)塊列表)
- 計算每個分片的函數(shù)(根據(jù)父RDD計算出此RDD)
- 對父RDD的依賴列表
- 對key-value RDD的Partitioner【可選】
- 每個數(shù)據(jù)分片的預(yù)定義地址列表(如HDFS上的數(shù)據(jù)塊的地址)【可選】
◆ RDD的存儲級別
RDD根據(jù)useDisk、useMemory、deserialized、replication四個參數(shù)的組合提供了11種存儲級別:
- val NONE = new StorageLevel(false, false, false)
- val DISK_ONLY = new StorageLevel(true, false, false)
- val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
- val MEMORY_ONLY = new StorageLevel(false, true, true)
- val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
- val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
- val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
- val MEMORY_AND_DISK = new StorageLevel(true, true, true)
- val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
- val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
- val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
◆ RDD定義了各種操作,不同類型的數(shù)據(jù)由不同的RDD類抽象表示,不同的操作也由RDD進行抽實現(xiàn)。
RDD的生成
◆ RDD有兩種創(chuàng)建方式:
1、從Hadoop文件系統(tǒng)(或與Hadoop兼容的其它存儲系統(tǒng))輸入(例如HDFS)創(chuàng)建。
2、從父RDD轉(zhuǎn)換得到新RDD。
◆ 下面來看一從Hadoop文件系統(tǒng)生成RDD的方式,如:val file = spark.textFile("hdfs://...")
,file變量就是RDD(實際是HadoopRDD實例),生成的它的核心代碼如下:
- // SparkContext根據(jù)文件/目錄及可選的分片數(shù)創(chuàng)建RDD, 這里我們可以看到Spark與Hadoop MapReduce很像
- // 需要InputFormat, Key、Value的類型,其實Spark使用的Hadoop的InputFormat, Writable類型。
- def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
- hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
- classOf[Text], minSplits) .map(pair => pair._2.toString) }
- // 根據(jù)Hadoop配置,及InputFormat等創(chuàng)建HadoopRDD
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
◆ 對RDD進行計算時,RDD從HDFS讀取數(shù)據(jù)時與Hadoop MapReduce幾乎一樣的:
RDD的轉(zhuǎn)換與操作
◆ 對于RDD可以有兩種計算方式:轉(zhuǎn)換(返回值還是一個RDD)與操作(返回值不是一個RDD)。
◆ 轉(zhuǎn)換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉(zhuǎn)換生成另一個RDD的操作不是馬上執(zhí)行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,并不會去執(zhí)行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
◆ 操作(Actions) (如:count, collect, save等),Actions操作會返回結(jié)果或把RDD數(shù)據(jù)寫到存儲系統(tǒng)中。Actions是觸發(fā)Spark啟動計算的動因。
◆ 下面使用一個例子來示例說明Transformations與Actions在Spark的使用。
- val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"),
- Seq(System.getenv("SPARK_TEST_JAR")))
- val rdd_A = sc.textFile(hdfs://.....)
- val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1))
- val rdd_C = sc.textFile(hdfs://.....)
- val rdd_D = rdd_C.map(line => (line.substring(10), 1))
- val rdd_E = rdd_D.reduceByKey((a, b) => a + b)
- val rdd_F = rdd_B.jion(rdd_E)
- rdd_F.saveAsSequenceFile(hdfs://....)
#p#
Lineage(血統(tǒng))
◆ 利用內(nèi)存加快數(shù)據(jù)加載,在眾多的其它的In-Memory類數(shù)據(jù)庫或Cache類系統(tǒng)中也有實現(xiàn),Spark的主要區(qū)別在于它處理分布式運算環(huán)境下的數(shù)據(jù)容錯性(節(jié)點實效/數(shù)據(jù)丟失)問題時采用的方案。為了保證RDD中數(shù)據(jù)的魯棒性,RDD數(shù)據(jù)集通過所謂的血統(tǒng)關(guān)系(Lineage)記住了它是如何從其它RDD中演變過來的。相比其它系統(tǒng)的細(xì)顆粒度的內(nèi)存數(shù)據(jù)更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數(shù)據(jù)轉(zhuǎn)換(Transformation)操作(filter, map, join etc.)行為。當(dāng)這個RDD的部分分區(qū)數(shù)據(jù)丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。這種粗顆粒的數(shù)據(jù)模型,限制了Spark的運用場合,但同時相比細(xì)顆粒度的數(shù)據(jù)模型,也帶來了性能的提升。
◆ RDD在Lineage依賴方面分為兩種Narrow Dependencies與Wide Dependencies用來解決數(shù)據(jù)容錯的高效性。Narrow Dependencies是指父RDD的每一個分區(qū)最多被一個子RDD的分區(qū)所用,表現(xiàn)為一個父RDD的分區(qū)對應(yīng)于一個子RDD的分區(qū)或多個父RDD的分區(qū)對應(yīng)于一個子RDD的分區(qū),也就是說一個父RDD的一個分區(qū)不可能對應(yīng)一個子RDD的多個分區(qū)。Wide Dependencies是指子RDD的分區(qū)依賴于父RDD的多個分區(qū)或所有分區(qū),也就是說存在一個父RDD的一個分區(qū)對應(yīng)一個子RDD的多個分區(qū)。對與Wide Dependencies,這種計算的輸入和輸出在不同的節(jié)點上,lineage方法對與輸入節(jié)點完好,而輸出節(jié)點宕機時,通過重新計算,這種情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統(tǒng)的意思),Narrow Dependencies對于數(shù)據(jù)的重算開銷要遠(yuǎn)小于Wide Dependencies的數(shù)據(jù)重算開銷。
容錯
◆ 在RDD計算,通過checkpint進行容錯,做checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現(xiàn)容錯,默認(rèn)是logging the updates方式,通過記錄跟蹤所有生成RDD的轉(zhuǎn)換(transformations)也就是記錄每個RDD的lineage(血統(tǒng))來重新計算生成丟失的分區(qū)數(shù)據(jù)。
資源管理與作業(yè)調(diào)度
◆ Spark對于資源管理與作業(yè)調(diào)度可以使用Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現(xiàn)。 Spark on Yarn在Spark0.6時引用,但真正可用是在現(xiàn)在的branch-0.8版本。Spark on Yarn遵循YARN的官方規(guī)范實現(xiàn),得益于Spark天生支持多種Scheduler和Executor的良好設(shè)計,對YARN的支持也就非常容易,Spark on Yarn的大致框架圖。
◆ 讓Spark運行于YARN上與Hadoop共用集群資源可以提高資源利用率。
編程接口
◆ Spark通過與編程語言集成的方式暴露RDD的操作,類似于DryadLINQ和FlumeJava,每個數(shù)據(jù)集都表示為RDD對象,對數(shù)據(jù)集的操作就表示成對RDD對象的操作。Spark主要的編程語言是Scala,選擇Scala是因為它的簡潔性(Scala可以很方便在交互式下使用)和性能(JVM上的靜態(tài)強類型語言)。
◆ Spark和Hadoop MapReduce類似,由Master(類似于MapReduce的Jobtracker)和Workers(Spark的Slave工作節(jié)點)組成。用戶編寫的Spark程序被稱為Driver程序,Dirver程序會連接master并定義了對各RDD的轉(zhuǎn)換與操作,而對RDD的轉(zhuǎn)換與操作通過Scala閉包(字面量函數(shù))來表示,Scala使用Java對象來表示閉包且都是可序列化的,以此把對RDD的閉包操作發(fā)送到各Workers節(jié)點。 Workers存儲著數(shù)據(jù)分塊和享有集群內(nèi)存,是運行在工作節(jié)點上的守護進程,當(dāng)它收到對RDD的操作時,根據(jù)數(shù)據(jù)分片信息進行本地化數(shù)據(jù)操作,生成新的數(shù)據(jù)分片、返回結(jié)果或把RDD寫入存儲系統(tǒng)。
Scala
◆
Spark使用Scala開發(fā),默認(rèn)使用Scala作為編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,可以在Spark-Shell測試程序。寫SparK程序的一般步驟就是創(chuàng)建或使用(SparkContext)實例,使用SparkContext創(chuàng)建RDD,然后就是對RDD進行操作。如:
- val sc = new SparkContext(master, appName, [sparkHome], [jars])
- val textFile = sc.textFile("hdfs://.....")
- textFile.map(....).filter(.....).....
Java
◆ Spark支持Java編程,但對于使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是一樣的,因為都是JVM上的語言,Scala與Java可以互操作,Java編程接口其實就是對Scala的封裝。如:
- JavaSparkContext sc = new JavaSparkContext(...);
- JavaRDD lines = ctx.textFile("hdfs://...");
- JavaRDD words = lines.flatMap(
- new FlatMapFunction<String, String>() {
- public Iterable call(String s) {
- return Arrays.asList(s.split(" "));
- }
- }
- );
Python
◆ 現(xiàn)在Spark也提供了Python編程接口,Spark使用py4j來實現(xiàn)python與java的互操作,從而實現(xiàn)使用python編寫Spark程序。Spark也同樣提供了pyspark,一個Spark的python shell,可以以交互式的方式使用Python編寫Spark程序。 如:
- from pyspark import SparkContext
- sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
- words = sc.textFile("/usr/share/dict/words")
- words.filter(lambda w: w.startswith("spar")).take(5)
#p#
使用示例
Standalone模式
◆ 為方便Spark的推廣使用,Spark提供了Standalone模式,Spark一開始就設(shè)計運行于Apache Mesos資源管理框架上,這是非常好的設(shè)計,但是卻帶了部署測試的復(fù)雜性。為了讓Spark能更方便的部署和嘗試,Spark因此提供了Standalone運行模式,它由一個Spark Master和多個Spark worker組成,與Hadoop MapReduce1很相似,就連集群啟動方式都幾乎是一樣。
◆ 以Standalone模式運行Spark集群
- 下載Scala2.9.3,并配置SCALA_HOME
- 下載Spark代碼(可以使用源碼編譯也可以下載編譯好的版本)這里下載 編譯好的版本(
http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
) - 解壓spark-0.7.3-prebuilt-cdh4.tgz安裝包
- 修改配置(conf/*) slaves: 配置工作節(jié)點的主機名 spark-env.sh:配置環(huán)境變量。
- SCALA_HOME=/home/spark/scala-2.9.3
- JAVA_HOME=/home/spark/jdk1.6.0_45
- SPARK_MASTER_IP=spark1
- SPARK_MASTER_PORT=30111
- SPARK_MASTER_WEBUI_PORT=30118
- SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g
- SPARK_WORKER_PORT=30333
- SPARK_WORKER_WEBUI_PORT=30119
- SPARK_WORKER_INSTANCES=1
◆ 把Hadoop配置copy到conf目錄下
◆ 在master主機上對其它機器做ssh無密碼登錄
◆ 把配置好的Spark程序使用scp copy到其它機器
◆ 在master啟動集群
- $SPARK_HOME/start-all.sh
yarn模式
◆ Spark-shell現(xiàn)在還不支持Yarn模式,使用Yarn模式運行,需要把Spark程序全部打包成一個jar包提交到Y(jié)arn上運行。目錄只有branch-0.8版本才真正支持Yarn。
◆ 以Yarn模式運行Spark
下載Spark代碼.
- git clone git://github.com/mesos/spark
◆ 切換到branch-0.8
- cd spark
- git checkout -b yarn --track origin/yarn
◆ 使用sbt編譯Spark并
- $SPARK_HOME/sbt/sbt
- > package
- > assembly
◆ 把Hadoop yarn配置copy到conf目錄下
◆ 運行測試
- SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \
- ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \
- --class spark.examples.SparkPi --args yarn-standalone
使用Spark-shell
◆ Spark-shell使用很簡單,當(dāng)Spark以Standalon模式運行后,使用$SPARK_HOME/spark-shell
進入shell即可,在Spark-shell中SparkContext已經(jīng)創(chuàng)建好了,實例名為sc可以直接使用,還有一個需要注意的是,在Standalone模式下,Spark默認(rèn)使用的調(diào)度器的FIFO調(diào)度器而不是公平調(diào)度,而Spark-shell作為一個Spark程序一直運行在Spark上,其它的Spark程序就只能排隊等待,也就是說同一時間只能有一個Spark-shell在運行。
◆ 在Spark-shell上寫程序非常簡單,就像在Scala Shell上寫程序一樣。
- scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data")
- textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
- scala> textFile.count() // Number of items in this RDD
- res0: Long = 21374
- scala> textFile.first() // First item in this RDD
- res1: String = # Spark
編寫Driver程序
◆ 在Spark中Spark程序稱為Driver程序,編寫Driver程序很簡單幾乎與在Spark-shell上寫程序是一樣的,不同的地方就是SparkContext需要自己創(chuàng)建。如WorkCount程序如下:
- import spark.SparkContext
- import SparkContext._
- object WordCount {
- def main(args: Array[String]) {
- if (args.length ==0 ){
- println("usage is org.test.WordCount ")
- }
- println("the args: ")
- args.foreach(println)
- val hdfsPath = "hdfs://hadoop1:8020"
- // create the SparkContext, args(0)由yarn傳入appMaster地址
- val sc = new SparkContext(args(0), "WrodCount",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
- val textFile = sc.textFile(hdfsPath + args(1))
- val result = textFile.flatMap(line => line.split("\\s+"))
- .map(word => (word, 1)).reduceByKey(_ + _)
- result.saveAsTextFile(hdfsPath + args(2))
- }
- }