終于有人將Spark的技術(shù)框架講明白了
Spark是加州大學(xué)伯克利分校的AMP實(shí)驗(yàn)室開源的類似MapReduce的通用并行計(jì)算框架,擁有MapReduce所具備的分布式計(jì)算的優(yōu)點(diǎn)。但不同于MapReduce的是,Spark更多地采用內(nèi)存計(jì)算,減少了磁盤讀寫,比MapReduce性能更高。同時(shí),它提供了更加豐富的函數(shù)庫,能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等分析算法。
Spark在Hadoop生態(tài)圈中主要是替代MapReduce進(jìn)行分布式計(jì)算,如下圖所示。同時(shí),組件SparkSQL可以替換Hive對(duì)數(shù)據(jù)倉庫的處理,組件Spark Streaming可以替換Storm對(duì)流式計(jì)算的處理,組件Spark ML可以替換Mahout數(shù)據(jù)挖掘算法庫。
Spark在Hadoop生態(tài)圈中的位置
01Spark的運(yùn)行原理
如今,我們已經(jīng)不再需要去學(xué)習(xí)煩瑣的MapReduce設(shè)計(jì)開發(fā)了,而是直接上手學(xué)習(xí)Spark的開發(fā)。這一方面是因?yàn)镾park的運(yùn)行效率比MapReduce高,另一方面是因?yàn)镾park有豐富的函數(shù)庫,開發(fā)效率也比MapReduce高。
首先,從運(yùn)行效率來看,Spark的運(yùn)行速度是Hadoop的數(shù)百倍。為什么會(huì)有如此大的差異呢?關(guān)鍵在于它們的運(yùn)行原理,Hadoop總要讀取磁盤,而Spark更多地是在進(jìn)行內(nèi)存計(jì)算,如下圖所示。
Hadoop的運(yùn)行總是在讀寫磁盤
前面談到,MapReduce的主要運(yùn)算過程,實(shí)際上就是循環(huán)往復(fù)地執(zhí)行Map與Reduce的過程。但是,在執(zhí)行每一個(gè)Map或Reduce過程時(shí),都要先讀取磁盤中的數(shù)據(jù),然后執(zhí)行運(yùn)算,最后將執(zhí)行的結(jié)果數(shù)據(jù)寫入磁盤。因此,MapReduce的執(zhí)行過程,實(shí)際上就是讀數(shù)據(jù)、執(zhí)行Map、寫數(shù)據(jù)、再讀數(shù)據(jù)、執(zhí)行Reduce、再寫數(shù)據(jù)的往復(fù)過程。這樣的設(shè)計(jì)雖然可以在海量數(shù)據(jù)中減少對(duì)內(nèi)存的占用,但頻繁地讀寫磁盤將耗費(fèi)大量時(shí)間,影響運(yùn)行效率。
相反,Spark的執(zhí)行過程只有第一次需要從磁盤中讀數(shù)據(jù),然后就可以執(zhí)行一系列操作。這一系列操作也是類似Map或Reduce的操作,然而在每次執(zhí)行前都是從內(nèi)存中讀取數(shù)據(jù)、執(zhí)行運(yùn)算、將執(zhí)行的結(jié)果數(shù)據(jù)寫入內(nèi)存的往復(fù)過程,直到最后一個(gè)操作執(zhí)行完才寫入磁盤。這樣整個(gè)執(zhí)行的過程中都是對(duì)內(nèi)存的讀寫,雖然會(huì)大量占用內(nèi)存資源,然而運(yùn)行效率將大大提升。
Spark框架的運(yùn)行原理如下圖所示,Spark在集群部署時(shí),在NameNode節(jié)點(diǎn)上部署了一個(gè)Spark Driver,然后在每個(gè)DataNode節(jié)點(diǎn)上部署一個(gè)Executor。Spark Driver是接收并調(diào)度任務(wù)的組件,而Executor則是分布式執(zhí)行數(shù)據(jù)處理的組件。同時(shí),在每一次執(zhí)行數(shù)據(jù)處理任務(wù)之前,數(shù)據(jù)文件已經(jīng)通過HDFS分布式存儲(chǔ)在各個(gè)DataNode節(jié)點(diǎn)上了。因此,在每個(gè)節(jié)點(diǎn)上的Executor會(huì)首先通過Reader讀取本地磁盤的數(shù)據(jù),然后執(zhí)行一系列的Transformation操作。每個(gè)Transformation操作的輸入是數(shù)據(jù)集,在Spark中將其組織成彈性分布式數(shù)據(jù)集(RDD),從內(nèi)存中讀取,最后的輸出也是RDD,并將其寫入內(nèi)存中。這樣,整個(gè)一系列的Transformation操作都是在內(nèi)存中讀寫,直到最后一個(gè)操作Action,然后通過Writer將其寫入磁盤。這就是Spark的運(yùn)行原理。
Spark框架的運(yùn)行原理圖
同時(shí),Spark擁有一個(gè)非常豐富的函數(shù)庫,許多常用的操作都不需要開發(fā)人員自己編寫,直接調(diào)用函數(shù)庫就可以了。這樣大大提高了軟件開發(fā)的效率,只用寫更少的代碼就能執(zhí)行更加復(fù)雜的處理過程。在這些豐富的函數(shù)庫中,Spark將其分為兩種類型:轉(zhuǎn)換(Transfer)與動(dòng)作(Action)。
Transfer的輸入是RDD,輸出也是RDD,因此它實(shí)際上是對(duì)數(shù)據(jù)進(jìn)行的各種Trans-formation操作,是Spark要編寫的主要程序。同時(shí),RDD也分為兩種類型:普通RDD與名-值對(duì)RDD。
普通RDD,就是由一條一條的記錄組成的數(shù)據(jù)集,從原始文件中讀取出來的數(shù)據(jù)通常都是這種形式,操作普通RDD最主要的函數(shù)包括map、flatMap、filter、distinct、union、intersection、subtract、cartesian等。
名-值對(duì)RDD,就是k-v存儲(chǔ)的數(shù)據(jù)集,map操作就是將普通RDD的數(shù)據(jù)轉(zhuǎn)換為名-值對(duì)RDD。有了名-值對(duì)RDD,才能對(duì)其進(jìn)行各種reduceByKey、joinByKey等復(fù)雜的操作。操作名-值對(duì)RDD最主要的函數(shù)包括reduceByKey、groupByKey、combineByKey、mapValues、flatMapValues、keys、values、sortByKey、subtractByKey、join、leftOuterJoin、rightOuterJoin、cogroup等。
所有Transfer函數(shù)的另外一個(gè)重要特征就是,它們?cè)谔幚鞷DD數(shù)據(jù)時(shí)都不會(huì)立即執(zhí)行,而是延遲到下一個(gè)Action再執(zhí)行。這樣的執(zhí)行效果就是,當(dāng)所有一系列操作都定義好以后,一次性執(zhí)行完成,然后立即寫磁盤。這樣在執(zhí)行過程中就減少了等待時(shí)間,進(jìn)而減少了對(duì)內(nèi)存的占用時(shí)間。
Spark的另外一種類型的函數(shù)就是Action,它們輸入的是RDD,輸出的是一個(gè)數(shù)據(jù)結(jié)果,通常拿到這個(gè)數(shù)據(jù)結(jié)果就要寫磁盤了。根據(jù)RDD的不同,Action也分為兩種:針對(duì)普通RDD的操作,包括collect、count、countByValue、take、top、reduce、fold、aggregate、foreach等;針對(duì)名-值對(duì)RDD的操作,包括countByKey、collectAsMap、lookup等。
02Spark的設(shè)計(jì)開發(fā)
Spark的設(shè)計(jì)開發(fā)支持3種語言,Scala、Python與Java,其中Scala是它的原生語言。Spark是在Scala語言中實(shí)現(xiàn)的,它將Scala作為其應(yīng)用程序框架,能夠與Scala緊密集成。Scala語言是一種類似Java的函數(shù)式編程語言,它在運(yùn)行時(shí)也使用Java虛擬機(jī),可以與Java語言無縫結(jié)合、相互調(diào)用。同時(shí),由于Scala語言采用了當(dāng)前比較流行的函數(shù)式編程風(fēng)格,所以代碼更加精簡(jiǎn),編程效率更高。
前面講解的那段計(jì)算詞頻的代碼如下:
- 1val textFile = sc.textFile("hdfs://...")
- 2val counts = textFile.flatMap(line => line.split(""))
- 3 .map(word => (word, 1))
- 4 .reduceByKey(_ + _)
- 5counts.saveAsTextFile("hdfs://...")
為了實(shí)現(xiàn)這個(gè)功能,前面講解的MapReduce框架需要編寫一個(gè)Mapper類和一個(gè)Reducer類,還要通過一個(gè)驅(qū)動(dòng)程序把它們串聯(lián)起來才能夠執(zhí)行。然而,在Spark程序中通過Scala語言編寫,只需要這么5行代碼就可以實(shí)現(xiàn),編程效率大大提升。這段代碼如果使用Java語言編寫,那么需要編寫成這樣:
- 1JavaRDD<String> textFile = sc.textFile("hdfs://...");
- 2JavaRDD<String> words = textFile.flatMap(
- 3 new FlatMapFunction<String, String>() {
- 4 public Iterable<String> call(String s) {
- 5 return Arrays.asList(s.split(" ")); }
- 6});
- 7JavaPairRDD<String, Integer> pairs = words.mapToPair(
- 8 new PairFunction<String, String, Integer>() {
- 9 public Tuple2<String, Integer> call(String s) {
- 10 return new Tuple2<String, Integer>(s, 1); }
- 11});
- 12JavaPairRDD<String, Integer> counts= pairs.reduceByKey(
- 13 new Function2<Integer, Integer, Integer>() {
- 14 public Integer call(Integer a, Integer b) { return a + b; }
- 15});
- 16counts.saveAsTextFile("hdfs://...");
很顯然,采用Scala語言編寫的Spark程序比Java語言的更精簡(jiǎn),因而更易于維護(hù)與變更。所以,Scala語言將會(huì)成為更多大數(shù)據(jù)開發(fā)團(tuán)隊(duì)的選擇。
下圖是一段完整的Spark程序,它包括初始化操作,如SparkContext的初始化、對(duì)命令參數(shù)args的讀取等。接著,從磁盤載入數(shù)據(jù),通過Spark函數(shù)處理數(shù)據(jù),最后將結(jié)果數(shù)據(jù)存入磁盤。
完整的Spark程序
03Spark SQL設(shè)計(jì)開發(fā)
在未來的三五年時(shí)間里,整個(gè)IT產(chǎn)業(yè)的技術(shù)架構(gòu)將會(huì)發(fā)生翻天覆地的變化。數(shù)據(jù)量瘋漲,原有的數(shù)據(jù)庫架構(gòu)下的存儲(chǔ)成本將越來越高,查詢速度越來越慢,數(shù)據(jù)擴(kuò)展越來越困難,因此需要向著大數(shù)據(jù)技術(shù)轉(zhuǎn)型。
大數(shù)據(jù)轉(zhuǎn)型要求開發(fā)人員熟悉Spark/Scala的編程模式、分布式計(jì)算的設(shè)計(jì)原理、大量業(yè)務(wù)數(shù)據(jù)的分析與處理,還要求開發(fā)人員熟悉SQL語句。
因此,迫切需要一個(gè)技術(shù)框架,能夠支持開發(fā)人員用SQL語句進(jìn)行編程,然后將SQL語言轉(zhuǎn)化為Spark程序進(jìn)行運(yùn)算。這樣的話,大數(shù)據(jù)開發(fā)的技術(shù)門檻會(huì)大大降低,更多普通的Java開發(fā)人員也能夠參與大數(shù)據(jù)開發(fā)。這樣的框架就是Spark SQL+Hive。
Spark SQL+Hive的設(shè)計(jì)思路就是,將通過各種渠道采集的數(shù)據(jù)存儲(chǔ)于Hadoop大數(shù)據(jù)平臺(tái)的Hive數(shù)據(jù)庫中。Hive數(shù)據(jù)庫中的數(shù)據(jù)實(shí)際上存儲(chǔ)在分布式文件系統(tǒng)HDFS中,并將這些數(shù)據(jù)文件映射成一個(gè)個(gè)的表,通過SQL語句對(duì)數(shù)據(jù)進(jìn)行操作。在對(duì)Hive數(shù)據(jù)庫的數(shù)據(jù)進(jìn)行操作時(shí),通過Spark SQL將數(shù)據(jù)讀取出來,然后通過SQL語句進(jìn)行處理,最后將結(jié)果數(shù)據(jù)又存儲(chǔ)到Hive數(shù)據(jù)庫中。
- 1CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
- 2 [(col_name data_type [COMMENT col_comment], ...)]
- 3 [COMMENT table_comment]
- 4 [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
- 5 [CLUSTERED BY (col_name, col_name, ...)
- 6 [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
- 7 [ROW FORMAT row_format]
- 8 [STORED AS file_format]
- 9 [LOCATION hdfs_path]
首先,通過以上語句在Hive數(shù)據(jù)庫中建表,每個(gè)表都會(huì)在HDFS上映射成一個(gè)數(shù)據(jù)庫文件,并通過HDFS進(jìn)行分布式存儲(chǔ)。完成建表以后,Hive數(shù)據(jù)庫的表不支持一條一條數(shù)據(jù)的插入,也不支持對(duì)數(shù)據(jù)的更新與刪除操作。數(shù)據(jù)是通過一個(gè)數(shù)據(jù)文件一次性載入的,或者通過類似insert into T1 select * from T2的語句將查詢結(jié)果載入表中。
- 1# 從NameNode節(jié)點(diǎn)中加載數(shù)據(jù)文件
- 2LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
- 3# 從NameNode節(jié)點(diǎn)中加載數(shù)據(jù)文件到分區(qū)表
- 4LOAD DATA LOCAL INPATH './examples/files/kv2.txt'
- 5OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
- 6# 從HDFS中加載數(shù)據(jù)文件到分區(qū)表
- 7LOAD DATA INPATH '/user/myname/kv2.txt' OVERWRITE
- 8INTO TABLE invites PARTITION (ds='2008-08-15');
加載數(shù)據(jù)以后,就可以通過SQL語句查詢和分析數(shù)據(jù)了:
- 1SELECT a1, a2, a3 FROM a_table
- 2LEFT JOIN | RIGHT JOIN | INNER JOIN | SEMI JOIN b_table
- 3ON a_table.b = b_table.b
- 4WHERE a_table.a4 = "xxx"
注意,這里的join操作除了有左連接、右連接、內(nèi)連接以外,還有半連接(SEMI JOIN),它的執(zhí)行效果類似于in語句或exists語句。
有了Hive數(shù)據(jù)庫,就可以通過Spark SQL去讀取數(shù)據(jù),然后用SQL語句對(duì)數(shù)據(jù)進(jìn)行分析了:
- 1import org.apache.spark.sql.{SparkSession, SaveMode}
- 2import java.text.SimpleDateFormat
- 3object UDFDemo {
- 4 def main(args: Array[String]): Unit = {
- 5 val spark = SparkSession
- 6 .builder()
- 7 .config("spark.sql.warehouse.dir","")
- 8 .enableHiveSupport()
- 9 .appName("UDF Demo")
- 10 .master("local")
- 11 .getOrCreate()
- 12
- 13 val dateFormat = new SimpleDateFormat("yyyy")
- 14 spark.udf.register("getYear", (date:Long) => dateFormat.format(date).toInt)
- 15 val df = spark.sql("select getYear(date_key) year, * from etl_fxdj")
- 16 df.write.mode(SaveMode.Overwrite).saveAsTable("dw_dm_fx_fxdj")
- 17 }
- 18}
在這段代碼中,首先進(jìn)行了Spark的初始化,然后定義了一個(gè)名為getYear的函數(shù),接著通過spark.sql()對(duì)Hive表中的數(shù)據(jù)進(jìn)行查詢與處理。最后,通過df.write.mode().saveAsTable()將結(jié)果數(shù)據(jù)寫入另一張Hive表中。其中,在執(zhí)行SQL語句時(shí),可以將getYear()作為函數(shù)在SQL語句中調(diào)用。
有了Spark SQL+Hive的方案,在大數(shù)據(jù)轉(zhuǎn)型的時(shí)候,實(shí)際上就是將過去存儲(chǔ)在數(shù)據(jù)庫中的表變?yōu)镠ive數(shù)據(jù)庫的表,將過去的存儲(chǔ)過程變?yōu)镾park SQL程序,將過去存儲(chǔ)過程中的函數(shù)變?yōu)镾park自定義函數(shù)。這樣就可以幫助企業(yè)更加輕松地由傳統(tǒng)數(shù)據(jù)庫架構(gòu)轉(zhuǎn)型為大數(shù)據(jù)架構(gòu)。
本書摘編自《架構(gòu)真意:企業(yè)級(jí)應(yīng)用架構(gòu)設(shè)計(jì)方法論與實(shí)踐》,經(jīng)出版方授權(quán)發(fā)布。