自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

終于有人將Spark的技術(shù)框架講明白了

開發(fā) 架構(gòu) Spark
Spark是加州大學(xué)伯克利分校的AMP實(shí)驗(yàn)室開源的類似MapReduce的通用并行計(jì)算框架,擁有MapReduce所具備的分布式計(jì)算的優(yōu)點(diǎn)。但不同于MapReduce的是,Spark更多地采用內(nèi)存計(jì)算,減少了磁盤讀寫,比MapReduce性能更高。

 [[423922]]

Spark是加州大學(xué)伯克利分校的AMP實(shí)驗(yàn)室開源的類似MapReduce的通用并行計(jì)算框架,擁有MapReduce所具備的分布式計(jì)算的優(yōu)點(diǎn)。但不同于MapReduce的是,Spark更多地采用內(nèi)存計(jì)算,減少了磁盤讀寫,比MapReduce性能更高。同時(shí),它提供了更加豐富的函數(shù)庫,能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等分析算法。

Spark在Hadoop生態(tài)圈中主要是替代MapReduce進(jìn)行分布式計(jì)算,如下圖所示。同時(shí),組件SparkSQL可以替換Hive對(duì)數(shù)據(jù)倉庫的處理,組件Spark Streaming可以替換Storm對(duì)流式計(jì)算的處理,組件Spark ML可以替換Mahout數(shù)據(jù)挖掘算法庫。

Spark在Hadoop生態(tài)圈中的位置

01Spark的運(yùn)行原理

如今,我們已經(jīng)不再需要去學(xué)習(xí)煩瑣的MapReduce設(shè)計(jì)開發(fā)了,而是直接上手學(xué)習(xí)Spark的開發(fā)。這一方面是因?yàn)镾park的運(yùn)行效率比MapReduce高,另一方面是因?yàn)镾park有豐富的函數(shù)庫,開發(fā)效率也比MapReduce高。

首先,從運(yùn)行效率來看,Spark的運(yùn)行速度是Hadoop的數(shù)百倍。為什么會(huì)有如此大的差異呢?關(guān)鍵在于它們的運(yùn)行原理,Hadoop總要讀取磁盤,而Spark更多地是在進(jìn)行內(nèi)存計(jì)算,如下圖所示。

Hadoop的運(yùn)行總是在讀寫磁盤

前面談到,MapReduce的主要運(yùn)算過程,實(shí)際上就是循環(huán)往復(fù)地執(zhí)行Map與Reduce的過程。但是,在執(zhí)行每一個(gè)Map或Reduce過程時(shí),都要先讀取磁盤中的數(shù)據(jù),然后執(zhí)行運(yùn)算,最后將執(zhí)行的結(jié)果數(shù)據(jù)寫入磁盤。因此,MapReduce的執(zhí)行過程,實(shí)際上就是讀數(shù)據(jù)、執(zhí)行Map、寫數(shù)據(jù)、再讀數(shù)據(jù)、執(zhí)行Reduce、再寫數(shù)據(jù)的往復(fù)過程。這樣的設(shè)計(jì)雖然可以在海量數(shù)據(jù)中減少對(duì)內(nèi)存的占用,但頻繁地讀寫磁盤將耗費(fèi)大量時(shí)間,影響運(yùn)行效率。

相反,Spark的執(zhí)行過程只有第一次需要從磁盤中讀數(shù)據(jù),然后就可以執(zhí)行一系列操作。這一系列操作也是類似Map或Reduce的操作,然而在每次執(zhí)行前都是從內(nèi)存中讀取數(shù)據(jù)、執(zhí)行運(yùn)算、將執(zhí)行的結(jié)果數(shù)據(jù)寫入內(nèi)存的往復(fù)過程,直到最后一個(gè)操作執(zhí)行完才寫入磁盤。這樣整個(gè)執(zhí)行的過程中都是對(duì)內(nèi)存的讀寫,雖然會(huì)大量占用內(nèi)存資源,然而運(yùn)行效率將大大提升。

Spark框架的運(yùn)行原理如下圖所示,Spark在集群部署時(shí),在NameNode節(jié)點(diǎn)上部署了一個(gè)Spark Driver,然后在每個(gè)DataNode節(jié)點(diǎn)上部署一個(gè)Executor。Spark Driver是接收并調(diào)度任務(wù)的組件,而Executor則是分布式執(zhí)行數(shù)據(jù)處理的組件。同時(shí),在每一次執(zhí)行數(shù)據(jù)處理任務(wù)之前,數(shù)據(jù)文件已經(jīng)通過HDFS分布式存儲(chǔ)在各個(gè)DataNode節(jié)點(diǎn)上了。因此,在每個(gè)節(jié)點(diǎn)上的Executor會(huì)首先通過Reader讀取本地磁盤的數(shù)據(jù),然后執(zhí)行一系列的Transformation操作。每個(gè)Transformation操作的輸入是數(shù)據(jù)集,在Spark中將其組織成彈性分布式數(shù)據(jù)集(RDD),從內(nèi)存中讀取,最后的輸出也是RDD,并將其寫入內(nèi)存中。這樣,整個(gè)一系列的Transformation操作都是在內(nèi)存中讀寫,直到最后一個(gè)操作Action,然后通過Writer將其寫入磁盤。這就是Spark的運(yùn)行原理。

Spark框架的運(yùn)行原理圖

同時(shí),Spark擁有一個(gè)非常豐富的函數(shù)庫,許多常用的操作都不需要開發(fā)人員自己編寫,直接調(diào)用函數(shù)庫就可以了。這樣大大提高了軟件開發(fā)的效率,只用寫更少的代碼就能執(zhí)行更加復(fù)雜的處理過程。在這些豐富的函數(shù)庫中,Spark將其分為兩種類型:轉(zhuǎn)換(Transfer)與動(dòng)作(Action)。

Transfer的輸入是RDD,輸出也是RDD,因此它實(shí)際上是對(duì)數(shù)據(jù)進(jìn)行的各種Trans-formation操作,是Spark要編寫的主要程序。同時(shí),RDD也分為兩種類型:普通RDD與名-值對(duì)RDD。

普通RDD,就是由一條一條的記錄組成的數(shù)據(jù)集,從原始文件中讀取出來的數(shù)據(jù)通常都是這種形式,操作普通RDD最主要的函數(shù)包括map、flatMap、filter、distinct、union、intersection、subtract、cartesian等。

名-值對(duì)RDD,就是k-v存儲(chǔ)的數(shù)據(jù)集,map操作就是將普通RDD的數(shù)據(jù)轉(zhuǎn)換為名-值對(duì)RDD。有了名-值對(duì)RDD,才能對(duì)其進(jìn)行各種reduceByKey、joinByKey等復(fù)雜的操作。操作名-值對(duì)RDD最主要的函數(shù)包括reduceByKey、groupByKey、combineByKey、mapValues、flatMapValues、keys、values、sortByKey、subtractByKey、join、leftOuterJoin、rightOuterJoin、cogroup等。

所有Transfer函數(shù)的另外一個(gè)重要特征就是,它們?cè)谔幚鞷DD數(shù)據(jù)時(shí)都不會(huì)立即執(zhí)行,而是延遲到下一個(gè)Action再執(zhí)行。這樣的執(zhí)行效果就是,當(dāng)所有一系列操作都定義好以后,一次性執(zhí)行完成,然后立即寫磁盤。這樣在執(zhí)行過程中就減少了等待時(shí)間,進(jìn)而減少了對(duì)內(nèi)存的占用時(shí)間。

Spark的另外一種類型的函數(shù)就是Action,它們輸入的是RDD,輸出的是一個(gè)數(shù)據(jù)結(jié)果,通常拿到這個(gè)數(shù)據(jù)結(jié)果就要寫磁盤了。根據(jù)RDD的不同,Action也分為兩種:針對(duì)普通RDD的操作,包括collect、count、countByValue、take、top、reduce、fold、aggregate、foreach等;針對(duì)名-值對(duì)RDD的操作,包括countByKey、collectAsMap、lookup等。

02Spark的設(shè)計(jì)開發(fā)

Spark的設(shè)計(jì)開發(fā)支持3種語言,Scala、Python與Java,其中Scala是它的原生語言。Spark是在Scala語言中實(shí)現(xiàn)的,它將Scala作為其應(yīng)用程序框架,能夠與Scala緊密集成。Scala語言是一種類似Java的函數(shù)式編程語言,它在運(yùn)行時(shí)也使用Java虛擬機(jī),可以與Java語言無縫結(jié)合、相互調(diào)用。同時(shí),由于Scala語言采用了當(dāng)前比較流行的函數(shù)式編程風(fēng)格,所以代碼更加精簡(jiǎn),編程效率更高。

前面講解的那段計(jì)算詞頻的代碼如下:

  1. 1val textFile = sc.textFile("hdfs://..."
  2. 2val counts = textFile.flatMap(line => line.split("")) 
  3. 3    .map(word => (word, 1)) 
  4. 4    .reduceByKey(_ + _) 
  5. 5counts.saveAsTextFile("hdfs://..."

為了實(shí)現(xiàn)這個(gè)功能,前面講解的MapReduce框架需要編寫一個(gè)Mapper類和一個(gè)Reducer類,還要通過一個(gè)驅(qū)動(dòng)程序把它們串聯(lián)起來才能夠執(zhí)行。然而,在Spark程序中通過Scala語言編寫,只需要這么5行代碼就可以實(shí)現(xiàn),編程效率大大提升。這段代碼如果使用Java語言編寫,那么需要編寫成這樣:

  1.  1JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  2JavaRDD<String> words = textFile.flatMap( 
  3.  3  new FlatMapFunction<String, String>() { 
  4.  4  public Iterable<String> call(String s) {  
  5.  5    return Arrays.asList(s.split(" ")); } 
  6.  6}); 
  7.  7JavaPairRDD<String, Integer> pairs = words.mapToPair( 
  8.  8  new PairFunction<String, String, Integer>() { 
  9.  9  public Tuple2<String, Integer> call(String s) {  
  10. 10    return new Tuple2<String, Integer>(s, 1); } 
  11. 11}); 
  12. 12JavaPairRDD<String, Integer> counts= pairs.reduceByKey( 
  13. 13  new Function2<IntegerIntegerInteger>() { 
  14. 14  public Integer call(Integer a, Integer b) { return a + b; } 
  15. 15}); 
  16. 16counts.saveAsTextFile("hdfs://..."); 

很顯然,采用Scala語言編寫的Spark程序比Java語言的更精簡(jiǎn),因而更易于維護(hù)與變更。所以,Scala語言將會(huì)成為更多大數(shù)據(jù)開發(fā)團(tuán)隊(duì)的選擇。

下圖是一段完整的Spark程序,它包括初始化操作,如SparkContext的初始化、對(duì)命令參數(shù)args的讀取等。接著,從磁盤載入數(shù)據(jù),通過Spark函數(shù)處理數(shù)據(jù),最后將結(jié)果數(shù)據(jù)存入磁盤。

完整的Spark程序

03Spark SQL設(shè)計(jì)開發(fā)

在未來的三五年時(shí)間里,整個(gè)IT產(chǎn)業(yè)的技術(shù)架構(gòu)將會(huì)發(fā)生翻天覆地的變化。數(shù)據(jù)量瘋漲,原有的數(shù)據(jù)庫架構(gòu)下的存儲(chǔ)成本將越來越高,查詢速度越來越慢,數(shù)據(jù)擴(kuò)展越來越困難,因此需要向著大數(shù)據(jù)技術(shù)轉(zhuǎn)型。

大數(shù)據(jù)轉(zhuǎn)型要求開發(fā)人員熟悉Spark/Scala的編程模式、分布式計(jì)算的設(shè)計(jì)原理、大量業(yè)務(wù)數(shù)據(jù)的分析與處理,還要求開發(fā)人員熟悉SQL語句。

因此,迫切需要一個(gè)技術(shù)框架,能夠支持開發(fā)人員用SQL語句進(jìn)行編程,然后將SQL語言轉(zhuǎn)化為Spark程序進(jìn)行運(yùn)算。這樣的話,大數(shù)據(jù)開發(fā)的技術(shù)門檻會(huì)大大降低,更多普通的Java開發(fā)人員也能夠參與大數(shù)據(jù)開發(fā)。這樣的框架就是Spark SQL+Hive。

Spark SQL+Hive的設(shè)計(jì)思路就是,將通過各種渠道采集的數(shù)據(jù)存儲(chǔ)于Hadoop大數(shù)據(jù)平臺(tái)的Hive數(shù)據(jù)庫中。Hive數(shù)據(jù)庫中的數(shù)據(jù)實(shí)際上存儲(chǔ)在分布式文件系統(tǒng)HDFS中,并將這些數(shù)據(jù)文件映射成一個(gè)個(gè)的表,通過SQL語句對(duì)數(shù)據(jù)進(jìn)行操作。在對(duì)Hive數(shù)據(jù)庫的數(shù)據(jù)進(jìn)行操作時(shí),通過Spark SQL將數(shù)據(jù)讀取出來,然后通過SQL語句進(jìn)行處理,最后將結(jié)果數(shù)據(jù)又存儲(chǔ)到Hive數(shù)據(jù)庫中。

  1. 1CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
  2. 2  [(col_name data_type [COMMENT col_comment], ...)] 
  3. 3  [COMMENT table_comment] 
  4. 4  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
  5. 5  [CLUSTERED BY (col_name, col_name, ...) 
  6. 6  [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 
  7. 7  [ROW FORMAT row_format] 
  8. 8  [STORED AS file_format] 
  9. 9  [LOCATION hdfs_path] 

首先,通過以上語句在Hive數(shù)據(jù)庫中建表,每個(gè)表都會(huì)在HDFS上映射成一個(gè)數(shù)據(jù)庫文件,并通過HDFS進(jìn)行分布式存儲(chǔ)。完成建表以后,Hive數(shù)據(jù)庫的表不支持一條一條數(shù)據(jù)的插入,也不支持對(duì)數(shù)據(jù)的更新與刪除操作。數(shù)據(jù)是通過一個(gè)數(shù)據(jù)文件一次性載入的,或者通過類似insert into T1 select * from T2的語句將查詢結(jié)果載入表中。

  1. 1# 從NameNode節(jié)點(diǎn)中加載數(shù)據(jù)文件 
  2. 2LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes; 
  3. 3# 從NameNode節(jié)點(diǎn)中加載數(shù)據(jù)文件到分區(qū)表 
  4. 4LOAD DATA LOCAL INPATH './examples/files/kv2.txt'  
  5. 5OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15'); 
  6. 6# 從HDFS中加載數(shù)據(jù)文件到分區(qū)表 
  7. 7LOAD DATA INPATH '/user/myname/kv2.txt' OVERWRITE  
  8. 8INTO TABLE invites PARTITION (ds='2008-08-15'); 

加載數(shù)據(jù)以后,就可以通過SQL語句查詢和分析數(shù)據(jù)了:

  1. 1SELECT a1, a2, a3 FROM a_table 
  2. 2LEFT JOIN | RIGHT JOIN | INNER JOIN | SEMI JOIN b_table 
  3. 3ON a_table.b = b_table.b 
  4. 4WHERE a_table.a4 = "xxx" 

注意,這里的join操作除了有左連接、右連接、內(nèi)連接以外,還有半連接(SEMI JOIN),它的執(zhí)行效果類似于in語句或exists語句。

有了Hive數(shù)據(jù)庫,就可以通過Spark SQL去讀取數(shù)據(jù),然后用SQL語句對(duì)數(shù)據(jù)進(jìn)行分析了:

  1.  1import org.apache.spark.sql.{SparkSession, SaveMode} 
  2.  2import java.text.SimpleDateFormat 
  3.  3object UDFDemo { 
  4.  4  def main(args: Array[String]): Unit = { 
  5.  5    val spark = SparkSession 
  6.  6    .builder() 
  7.  7    .config("spark.sql.warehouse.dir",""
  8.  8    .enableHiveSupport() 
  9.  9    .appName("UDF Demo"
  10. 10    .master("local"
  11. 11    .getOrCreate() 
  12. 12 
  13. 13    val dateFormat =  new SimpleDateFormat("yyyy"
  14. 14    spark.udf.register("getYear", (date:Long) => dateFormat.format(date).toInt) 
  15. 15    val df = spark.sql("select getYear(date_key) year, * from etl_fxdj"
  16. 16    df.write.mode(SaveMode.Overwrite).saveAsTable("dw_dm_fx_fxdj"
  17. 17  } 
  18. 18} 

在這段代碼中,首先進(jìn)行了Spark的初始化,然后定義了一個(gè)名為getYear的函數(shù),接著通過spark.sql()對(duì)Hive表中的數(shù)據(jù)進(jìn)行查詢與處理。最后,通過df.write.mode().saveAsTable()將結(jié)果數(shù)據(jù)寫入另一張Hive表中。其中,在執(zhí)行SQL語句時(shí),可以將getYear()作為函數(shù)在SQL語句中調(diào)用。

有了Spark SQL+Hive的方案,在大數(shù)據(jù)轉(zhuǎn)型的時(shí)候,實(shí)際上就是將過去存儲(chǔ)在數(shù)據(jù)庫中的表變?yōu)镠ive數(shù)據(jù)庫的表,將過去的存儲(chǔ)過程變?yōu)镾park SQL程序,將過去存儲(chǔ)過程中的函數(shù)變?yōu)镾park自定義函數(shù)。這樣就可以幫助企業(yè)更加輕松地由傳統(tǒng)數(shù)據(jù)庫架構(gòu)轉(zhuǎn)型為大數(shù)據(jù)架構(gòu)。

本書摘編自《架構(gòu)真意:企業(yè)級(jí)應(yīng)用架構(gòu)設(shè)計(jì)方法論與實(shí)踐》,經(jīng)出版方授權(quán)發(fā)布。

 

責(zé)任編輯:武曉燕 來源: 數(shù)倉寶貝庫
相關(guān)推薦

2021-08-31 19:14:38

技術(shù)埋點(diǎn)運(yùn)營

2021-09-03 18:38:13

數(shù)據(jù)湖數(shù)據(jù)倉庫

2021-09-26 15:58:05

MySQL SQL 語句數(shù)據(jù)庫

2021-04-12 07:36:15

Scrapy爬蟲框架

2022-11-01 18:21:14

數(shù)據(jù)埋點(diǎn)SDK

2021-10-09 00:02:04

DevOps敏捷開發(fā)

2021-06-13 12:03:46

SaaS軟件即服務(wù)

2022-03-27 20:32:28

Knative容器事件模型

2021-08-04 20:35:03

可視化SeabornMatplotlib

2021-03-25 11:24:25

爬蟲技術(shù)開發(fā)

2022-04-27 18:25:02

數(shù)據(jù)采集維度

2021-10-17 20:38:30

微服務(wù)內(nèi)存組件

2020-11-03 07:04:39

云計(jì)算公有云私有云

2021-10-12 18:31:40

流量運(yùn)營前端

2021-12-03 18:25:56

數(shù)據(jù)指標(biāo)本質(zhì)

2020-11-30 08:34:44

大數(shù)據(jù)數(shù)據(jù)分析技術(shù)

2021-02-14 00:21:37

區(qū)塊鏈數(shù)字貨幣金融

2021-03-03 21:31:24

量化投資利潤

2021-06-29 11:21:41

數(shù)據(jù)安全網(wǎng)絡(luò)安全黑客

2022-01-05 18:27:44

數(shù)據(jù)挖掘工具
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)