自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spark性能優(yōu)化:開發(fā)調(diào)優(yōu)篇

大數(shù)據(jù) Spark
在大數(shù)據(jù)計算領(lǐng)域,Spark已經(jīng)成為了越來越流行、越來越受歡迎的計算平臺之一。本文作為Spark性能優(yōu)化指南的基礎(chǔ)篇,主要講解開發(fā)調(diào)優(yōu)以及資源調(diào)優(yōu)。

1、前言

在大數(shù)據(jù)計算領(lǐng)域,Spark已經(jīng)成為了越來越流行、越來越受歡迎的計算平臺之一。Spark的功能涵蓋了大數(shù)據(jù)領(lǐng)域的離線批處理、SQL類處理、流式/實時計算、機器學(xué)習(xí)、圖計算等各種不同類型的計算操作,應(yīng)用范圍與前景非常廣泛。在美團•大眾點評,已經(jīng)有很多同學(xué)在各種項目中嘗試使用Spark。大多數(shù)同學(xué)(包括筆者在內(nèi)),最初開始嘗試使用Spark的原因很簡單,主要就是為了讓大數(shù)據(jù)計算作業(yè)的執(zhí)行速度更快、性能更高。

[[236996]]

然而,通過Spark開發(fā)出高性能的大數(shù)據(jù)計算作業(yè),并不是那么簡單的。如果沒有對Spark作業(yè)進行合理的調(diào)優(yōu),Spark作業(yè)的執(zhí)行速度可能會很慢,這樣就完全體現(xiàn)不出Spark作為一種快速大數(shù)據(jù)計算引擎的優(yōu)勢來。因此,想要用好Spark,就必須對其進行合理的性能優(yōu)化。

Spark的性能調(diào)優(yōu)實際上是由很多部分組成的,不是調(diào)節(jié)幾個參數(shù)就可以立竿見影提升作業(yè)性能的。我們需要根據(jù)不同的業(yè)務(wù)場景以及數(shù)據(jù)情況,對Spark作業(yè)進行綜合性的分析,然后進行多個方面的調(diào)節(jié)和優(yōu)化,才能獲得***性能。

筆者根據(jù)之前的Spark作業(yè)開發(fā)經(jīng)驗以及實踐積累,總結(jié)出了一套Spark作業(yè)的性能優(yōu)化方案。整套方案主要分為開發(fā)調(diào)優(yōu)、資源調(diào)優(yōu)、數(shù)據(jù)傾斜調(diào)優(yōu)、shuffle調(diào)優(yōu)幾個部分。開發(fā)調(diào)優(yōu)和資源調(diào)優(yōu)是所有Spark作業(yè)都需要注意和遵循的一些基本原則,是高性能Spark作業(yè)的基礎(chǔ);數(shù)據(jù)傾斜調(diào)優(yōu),主要講解了一套完整的用來解決Spark作業(yè)數(shù)據(jù)傾斜的解決方案;shuffle調(diào)優(yōu),面向的是對Spark的原理有較深層次掌握和研究的同學(xué),主要講解了如何對Spark作業(yè)的shuffle運行過程以及細(xì)節(jié)進行調(diào)優(yōu)。

本文作為Spark性能優(yōu)化指南的基礎(chǔ)篇,主要講解開發(fā)調(diào)優(yōu)以及資源調(diào)優(yōu)。

2、開發(fā)調(diào)優(yōu)

Spark性能優(yōu)化的***步,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu),就是要讓大家了解以下一些Spark基本開發(fā)原則,包括:RDD lineage設(shè)計、算子的合理使用、特殊操作的優(yōu)化等。在開發(fā)過程中,時時刻刻都應(yīng)該注意以上原則,并將這些原則根據(jù)具體的業(yè)務(wù)以及實際的應(yīng)用場景,靈活地運用到自己的Spark作業(yè)中。Spark性能優(yōu)化的***步,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu),就是要讓大家了解以下一些Spark基本開發(fā)原則,包括:RDD lineage設(shè)計、算子的合理使用、特殊操作的優(yōu)化等。在開發(fā)過程中,時時刻刻都應(yīng)該注意以上原則,并將這些原則根據(jù)具體的業(yè)務(wù)以及實際的應(yīng)用場景,靈活地運用到自己的Spark作業(yè)中。

原則一:避免創(chuàng)建重復(fù)的RDD

通常來說,我們在開發(fā)一個Spark作業(yè)時,首先是基于某個數(shù)據(jù)源(比如Hive表或HDFS文件)創(chuàng)建一個初始的RDD;接著對這個RDD執(zhí)行某個算子操作,然后得到下一個RDD;以此類推,循環(huán)往復(fù),直到計算出最終我們需要的結(jié)果。在這個過程中,多個RDD會通過不同的算子操作(比如map、reduce等)串起來,這個“RDD串”,就是RDD lineage,也就是“RDD的血緣關(guān)系鏈”。

我們在開發(fā)過程中要注意:對于同一份數(shù)據(jù),只應(yīng)該創(chuàng)建一個RDD,不能創(chuàng)建多個RDD來代表同一份數(shù)據(jù)。

一些Spark初學(xué)者在剛開始開發(fā)Spark作業(yè)時,或者是有經(jīng)驗的工程師在開發(fā)RDD lineage極其冗長的Spark作業(yè)時,可能會忘了自己之前對于某一份數(shù)據(jù)已經(jīng)創(chuàng)建過一個RDD了,從而導(dǎo)致對于同一份數(shù)據(jù),創(chuàng)建了多個RDD。這就意味著,我們的Spark作業(yè)會進行多次重復(fù)計算來創(chuàng)建多個代表相同數(shù)據(jù)的RDD,進而增加了作業(yè)的性能開銷。

一個簡單的例子 

  1. //也就是說,需要對一份數(shù)據(jù)執(zhí)行兩次算子操作。 
  2. //錯誤的做法:對于同一份數(shù)據(jù)執(zhí)行多次算子操作時,創(chuàng)建多個RDD。 
  3. //這里執(zhí)行了兩次textFile方法,針對同一個HDFS文件,創(chuàng)建了兩個RDD出來, 
  4. //然后分別對每個RDD都執(zhí)行了一個算子操作。 
  5. //這種情況下,Spark需要從HDFS上兩次加載hello.txt文件的內(nèi)容,并創(chuàng)建兩個單獨的RDD; 
  6. //第二次加載HDFS文件以及創(chuàng)建RDD的性能開銷,很明顯是白白浪費掉的。 
  7. val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt"
  8. rdd1.map(...) 
  9. val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt"
  10. rdd2.reduce(...) 
  11. //正確的用法:對于一份數(shù)據(jù)執(zhí)行多次算子操作時,只使用一個RDD。 
  12. //這種寫法很明顯比上一種寫法要好多了,因為我們對于同一份數(shù)據(jù)只創(chuàng)建了一個RDD, 
  13. //然后對這一個RDD執(zhí)行了多次算子操作。 
  14. //但是要注意到這里為止優(yōu)化還沒有結(jié)束,由于rdd1被執(zhí)行了兩次算子操作,第二次執(zhí)行reduce操作的時候, 
  15. //還會再次從源頭處重新計算一次rdd1的數(shù)據(jù),因此還是會有重復(fù)計算的性能開銷。 
  16. //要徹底解決這個問題,必須結(jié)合“原則三:對多次使用的RDD進行持久化”, 
  17. //才能保證一個RDD被多次使用時只被計算一次。 
  18. val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt"
  19. rdd1.map(...) 

原則二:盡可能復(fù)用同一個RDD

除了要避免在開發(fā)過程中對一份完全相同的數(shù)據(jù)創(chuàng)建多個RDD之外,在對不同的數(shù)據(jù)執(zhí)行算子操作時還要盡可能地復(fù)用一個RDD。比如說,有一個RDD的數(shù)據(jù)格式是key-value類型的,另一個是單value類型的,這兩個RDD的value數(shù)據(jù)是完全一樣的。那么此時我們可以只使用key-value類型的那個RDD,因為其中已經(jīng)包含了另一個的數(shù)據(jù)。對于類似這種多個RDD的數(shù)據(jù)有重疊或者包含的情況,我們應(yīng)該盡量復(fù)用一個RDD,這樣可以盡可能地減少RDD的數(shù)量,從而盡可能減少算子執(zhí)行的次數(shù)。

一個簡單的例子 

  1. // 錯誤的做法。 
  2.   
  3. // 有一個<Long, String>格式的RDD,即rdd1。 
  4. // 接著由于業(yè)務(wù)需要,對rdd1執(zhí)行了一個map操作,創(chuàng)建了一個rdd2,而rdd2中的數(shù)據(jù)僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。 
  5. JavaPairRDD<Long, String> rdd1 = ... 
  6. JavaRDD<String> rdd2 = rdd1.map(...) 
  7.   
  8. // 分別對rdd1和rdd2執(zhí)行了不同的算子操作。 
  9. rdd1.reduceByKey(...) 
  10. rdd2.map(...) 
  11.   
  12. // 正確的做法。 
  13.   
  14. // 上面這個case中,其實rdd1和rdd2的區(qū)別無非就是數(shù)據(jù)格式不同而已,rdd2的數(shù)據(jù)完全就是rdd1的子集而已,卻創(chuàng)建了兩個rdd,并對兩個rdd都執(zhí)行了一次算子操作。 
  15. // 此時會因為對rdd1執(zhí)行map算子來創(chuàng)建rdd2,而多執(zhí)行一次算子操作,進而增加性能開銷。 
  16. // 其實在這種情況下完全可以復(fù)用同一個RDD。 
  17. // 我們可以使用rdd1,既做reduceByKey操作,也做map操作。 
  18. // 在進行第二個map操作時,只使用每個數(shù)據(jù)的tuple._2,也就是rdd1中的value值,即可。 
  19. JavaPairRDD<Long, String> rdd1 = ... 
  20. rdd1.reduceByKey(...) 
  21. rdd1.map(tuple._2...) 
  22.   
  23. // 第二種方式相較于***種方式而言,很明顯減少了一次rdd2的計算開銷。 
  24. // 但是到這里為止,優(yōu)化還沒有結(jié)束,對rdd1我們還是執(zhí)行了兩次算子操作,rdd1實際上還是會被計算兩次。 
  25. // 因此還需要配合“原則三:對多次使用的RDD進行持久化”進行使用,才能保證一個RDD被多次使用時只被計算一次。 

原則三:對多次使用的RDD進行持久化

當(dāng)你在Spark代碼中多次對一個RDD做了算子操作后,恭喜,你已經(jīng)實現(xiàn)Spark作業(yè)***步的優(yōu)化了,也就是盡可能復(fù)用RDD。此時就該在這個基礎(chǔ)之上,進行第二步優(yōu)化了,也就是要保證對一個RDD執(zhí)行多次算子操作時,這個RDD本身僅僅被計算一次。

Spark中對于一個RDD執(zhí)行多次算子的默認(rèn)原理是這樣的:每次你對一個RDD執(zhí)行一個算子操作時,都會重新從源頭處計算一遍,計算出那個RDD來,然后再對這個RDD執(zhí)行你的算子操作。這種方式的性能是很差的。

因此對于這種情況,我們的建議是:對多次使用的RDD進行持久化。此時Spark就會根據(jù)你的持久化策略,將RDD中的數(shù)據(jù)保存到內(nèi)存或者磁盤中。以后每次對這個RDD進行算子操作時,都會直接從內(nèi)存或磁盤中提取持久化的RDD數(shù)據(jù),然后執(zhí)行算子,而不會從源頭處重新計算一遍這個RDD,再執(zhí)行算子操作。

對多次使用的RDD進行持久化的代碼示例 

  1. // 如果要對一個RDD進行持久化,只要對這個RDD調(diào)用cache()和persist()即可。 
  2.   
  3. // 正確的做法。 
  4. // cache()方法表示:使用非序列化的方式將RDD中的數(shù)據(jù)全部嘗試持久化到內(nèi)存中。 
  5. // 此時再對rdd1執(zhí)行兩次算子操作時,只有在***次執(zhí)行map算子時,才會將這個rdd1從源頭處計算一次。 
  6. // 第二次執(zhí)行reduce算子時,就會直接從內(nèi)存中提取數(shù)據(jù)進行計算,不會重復(fù)計算一個rdd。 
  7. val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache() 
  8. rdd1.map(...) 
  9. rdd1.reduce(...) 
  10.   
  11. // persist()方法表示:手動選擇持久化級別,并使用指定的方式進行持久化。 
  12. // 比如說,StorageLevel.MEMORY_AND_DISK_SER表示,內(nèi)存充足時優(yōu)先持久化到內(nèi)存中,內(nèi)存不充足時持久化到磁盤文件中。 
  13. // 而且其中的_SER后綴表示,使用序列化的方式來保存RDD數(shù)據(jù),此時RDD中的每個partition都會序列化成一個大的字節(jié)數(shù)組,然后再持久化到內(nèi)存或磁盤中。 
  14. // 序列化的方式可以減少持久化的數(shù)據(jù)對內(nèi)存/磁盤的占用量,進而避免內(nèi)存被持久化數(shù)據(jù)占用過多,從而發(fā)生頻繁GC。 
  15. val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER) 
  16. rdd1.map(...) 
  17. rdd1.reduce(...) 

對于persist()方法而言,我們可以根據(jù)不同的業(yè)務(wù)場景選擇不同的持久化級別。

Spark的持久化級別

Spark性能優(yōu)化:開發(fā)調(diào)優(yōu)篇

如何選擇一種最合適的持久化策略

  • 默認(rèn)情況下,性能***的當(dāng)然是MEMORY_ONLY,但前提是你的內(nèi)存必須足夠足夠大,可以綽綽有余地存放下整個RDD的所有數(shù)據(jù)。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的后續(xù)算子操作,都是基于純內(nèi)存中的數(shù)據(jù)的操作,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高;而且不需要復(fù)制一份數(shù)據(jù)副本,并遠(yuǎn)程傳送到其他節(jié)點上。但是這里必須要注意的是,在實際的生產(chǎn)環(huán)境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數(shù)據(jù)比較多時(比如幾十億),直接用這種持久化級別,會導(dǎo)致JVM的OOM內(nèi)存溢出異常。
  • 如果使用MEMORY_ONLY級別時發(fā)生了內(nèi)存溢出,那么建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時每個partition僅僅是一個字節(jié)數(shù)組而已,大大減少了對象數(shù)量,并降低了內(nèi)存占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續(xù)算子可以基于純內(nèi)存進行操作,因此性能總體還是比較高的。此外,可能發(fā)生的問題同上,如果RDD中的數(shù)據(jù)量過多的話,還是可能會導(dǎo)致OOM內(nèi)存溢出的異常。
  • 如果純內(nèi)存的級別都無法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的數(shù)據(jù)量很大,內(nèi)存無法完全放下。序列化后的數(shù)據(jù)比較少,可以節(jié)省內(nèi)存和磁盤的空間開銷。同時該策略會優(yōu)先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會寫入磁盤。
  • 通常不建議使用DISK_ONLY和后綴為_2的級別:因為完全基于磁盤文件進行數(shù)據(jù)的讀寫,會導(dǎo)致性能急劇降低,有時還不如重新計算一次所有RDD。后綴為_2的級別,必須將所有數(shù)據(jù)都復(fù)制一份副本,并發(fā)送到其他節(jié)點上,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會導(dǎo)致較大的性能開銷,除非是要求作業(yè)的高可用性,否則不建議使用。

原則四:盡量避免使用shuffle類算子

如果有可能的話,要盡量避免使用shuffle類算子。因為Spark作業(yè)運行過程中,最消耗性能的地方就是shuffle過程。shuffle過程,簡單來說,就是將分布在集群中多個節(jié)點上的同一個key,拉取到同一個節(jié)點上,進行聚合或join等操作。比如reduceByKey、join等算子,都會觸發(fā)shuffle操作。

shuffle過程中,各個節(jié)點上的相同key都會先寫入本地磁盤文件中,然后其他節(jié)點需要通過網(wǎng)絡(luò)傳輸拉取各個節(jié)點上的磁盤文件中的相同key。而且相同key都拉取到同一個節(jié)點進行聚合操作時,還有可能會因為一個節(jié)點上處理的key過多,導(dǎo)致內(nèi)存不夠存放,進而溢寫到磁盤文件中。因此在shuffle過程中,可能會發(fā)生大量的磁盤文件讀寫的IO操作,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作。磁盤IO和網(wǎng)絡(luò)數(shù)據(jù)傳輸也是shuffle性能較差的主要原因。

因此在我們的開發(fā)過程中,能避免則盡可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業(yè),可以大大減少性能開銷。

Broadcast與map進行join代碼示例 

  1. // 傳統(tǒng)的join操作會導(dǎo)致shuffle操作。 
  2. // 因為兩個RDD中,相同的key都需要通過網(wǎng)絡(luò)拉取到一個節(jié)點上,由一個task進行join操作。 
  3. val rdd3 = rdd1.join(rdd2) 
  4.   
  5. // Broadcast+map的join操作,不會導(dǎo)致shuffle操作。 
  6. // 使用Broadcast將一個數(shù)據(jù)量較小的RDD作為廣播變量。 
  7. val rdd2Data = rdd2.collect() 
  8. val rdd2DataBroadcast = sc.broadcast(rdd2Data) 
  9.   
  10. // 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數(shù)據(jù)。 
  11. // 然后進行遍歷,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,那么就判定可以進行join。 
  12. // 此時就可以根據(jù)自己需要的方式,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),拼接在一起(String或Tuple)。 
  13. val rdd3 = rdd1.map(rdd2DataBroadcast...) 
  14.   
  15. // 注意,以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M,或者一兩G)的情況下使用。 
  16. // 因為每個Executor的內(nèi)存中,都會駐留一份rdd2的全量數(shù)據(jù)。 

原則五:使用map-side預(yù)聚合的shuffle操作

如果因為業(yè)務(wù)需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預(yù)聚合的算子。

所謂的map-side預(yù)聚合,說的是在每個節(jié)點本地對相同的key進行一次聚合操作,類似于MapReduce中的本地combiner。map-side預(yù)聚合之后,每個節(jié)點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節(jié)點在拉取所有節(jié)點上的相同key時,就會大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷。通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函數(shù)對每個節(jié)點本地的相同key進行預(yù)聚合。而groupByKey算子是不會進行預(yù)聚合的,全量的數(shù)據(jù)會在集群的各個節(jié)點之間分發(fā)和傳輸,性能相對來說比較差。

比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進行單詞計數(shù)。其中***張圖是groupByKey的原理圖,可以看到,沒有進行任何本地聚合時,所有數(shù)據(jù)都會在集群節(jié)點之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個節(jié)點本地的相同key數(shù)據(jù),都進行了預(yù)聚合,然后才傳輸?shù)狡渌?jié)點上進行全局聚合。

Spark性能優(yōu)化:開發(fā)調(diào)優(yōu)篇

 

 

Spark性能優(yōu)化:開發(fā)調(diào)優(yōu)篇

原則六:使用高性能的算子

除了shuffle相關(guān)的算子有優(yōu)化原則之外,其他的算子也都有著相應(yīng)的優(yōu)化原則。

使用reduceByKey/aggregateByKey替代groupByKey

詳情見“原則五:使用map-side預(yù)聚合的shuffle操作”。

使用mapPartitions替代普通map

mapPartitions類的算子,一次函數(shù)調(diào)用會處理一個partition所有的數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條,性能相對來說會高一些。但是有的時候,使用mapPartitions會出現(xiàn)OOM(內(nèi)存溢出)的問題。因為單次函數(shù)調(diào)用就要處理掉一個partition所有的數(shù)據(jù),如果內(nèi)存不夠,垃圾回收時是無法回收掉太多對象的,很可能出現(xiàn)OOM異常。所以使用這類操作時要慎重!

使用foreachPartitions替代foreach

原理類似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個partition的所有數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)。在實踐中發(fā)現(xiàn),foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫MySQL,那么如果是普通的foreach算子,就會一條數(shù)據(jù)一條數(shù)據(jù)地寫,每次函數(shù)調(diào)用可能就會創(chuàng)建一個數(shù)據(jù)庫連接,此時就勢必會頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個partition的數(shù)據(jù),那么對于每個partition,只要創(chuàng)建一個數(shù)據(jù)庫連接即可,然后執(zhí)行批量插入操作,此時性能是比較高的。實踐中發(fā)現(xiàn),對于1萬條左右的數(shù)據(jù)量寫MySQL,性能可以提升30%以上。

使用filter之后進行coalesce操作*

通常對一個RDD執(zhí)行filter算子過濾掉RDD中較多數(shù)據(jù)后(比如30%以上的數(shù)據(jù)),建議使用coalesce算子,手動減少RDD的partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition中去。因為filter之后,RDD的每個partition中都會有很多數(shù)據(jù)被過濾掉,此時如果照常進行后續(xù)的計算,其實每個task處理的partition中的數(shù)據(jù)量并不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。因此用coalesce減少partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場景下,對于性能的提升會有一定的幫助。

使用repartitionAndSortWithinPartitions替代repartition與sort類操作

repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個算子,官方建議,如果需要在repartition重分區(qū)之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。因為該算子可以一邊進行重分區(qū)的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。

原則七:廣播大變量

有時在開發(fā)過程中,會遇到需要在算子函數(shù)中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那么此時就應(yīng)該使用Spark的廣播(Broadcast)功能來提升性能。

在算子函數(shù)中使用到外部變量時,默認(rèn)情況下,Spark會將該變量復(fù)制多個副本,通過網(wǎng)絡(luò)傳輸?shù)絫ask中,此時每個task都有一個變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網(wǎng)絡(luò)中傳輸?shù)男阅荛_銷,以及在各個節(jié)點的Executor中占用過多內(nèi)存導(dǎo)致的頻繁GC,都會極大地影響性能。

因此對于上述情況,如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。廣播后的變量,會保證每個Executor的內(nèi)存中,只駐留一份變量副本,而Executor中的task執(zhí)行時共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本的數(shù)量,從而減少網(wǎng)絡(luò)傳輸?shù)男阅荛_銷,并減少對Executor內(nèi)存的占用開銷,降低GC的頻率。

廣播大變量的代碼示例 

  1. // 以下代碼在算子函數(shù)中,使用了外部的變量。 
  2. // 此時沒有做任何特殊操作,每個task都會有一份list1的副本。 
  3. val list1 = ... 
  4. rdd1.map(list1...) 
  5.   
  6. // 以下代碼將list1封裝成了Broadcast類型的廣播變量。 
  7. // 在算子函數(shù)中,使用廣播變量時,首先會判斷當(dāng)前task所在Executor內(nèi)存中,是否有變量副本。 
  8. // 如果有則直接使用;如果沒有則從Driver或者其他Executor節(jié)點上遠(yuǎn)程拉取一份放到本地Executor內(nèi)存中。 
  9. // 每個Executor內(nèi)存中,就只會駐留一份廣播變量副本。 
  10. val list1 = ... 
  11. val list1Broadcast = sc.broadcast(list1) 
  12. rdd1.map(list1Broadcast...) 

原則八:使用Kryo優(yōu)化序列化性能

在Spark中,主要有三個地方涉及到了序列化:

  • 在算子函數(shù)中使用到外部變量時,該變量會被序列化后進行網(wǎng)絡(luò)傳輸(見“原則七:廣播大變量”中的講解)。
  • 將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現(xiàn)Serializable接口。
  • 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節(jié)數(shù)組。

對于這三種出現(xiàn)序列化的地方,我們都可以通過使用Kryo序列化類庫,來優(yōu)化序列化和反序列化的性能。Spark默認(rèn)使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認(rèn)沒有使用Kryo作為序列化類庫,是因為Kryo要求***要注冊所有需要進行序列化的自定義類型,因此對于開發(fā)者來說,這種方式比較麻煩。

以下是使用Kryo的代碼示例,我們只要設(shè)置序列化類,再注冊要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型、作為RDD泛型類型的自定義類型等): 

  1. // 創(chuàng)建SparkConf對象。 
  2. val conf = new SparkConf().setMaster(...).setAppName(...) 
  3. // 設(shè)置序列化器為KryoSerializer。 
  4. conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer"
  5. // 注冊要序列化的自定義類型。 
  6. conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) 

原則九:優(yōu)化數(shù)據(jù)結(jié)構(gòu)

Java中,有三種類型比較耗費內(nèi)存:

  • 對象,每個Java對象都有對象頭、引用等額外的信息,因此比較占用內(nèi)存空間。
  • 字符串,每個字符串內(nèi)部都有一個字符數(shù)組以及長度等額外信息。
  • 集合類型,比如HashMap、LinkedList等,因為集合類型內(nèi)部通常會使用一些內(nèi)部類來封裝集合元素,比如Map.Entry。

因此Spark官方建議,在Spark編碼實現(xiàn)中,特別是對于算子函數(shù)中的代碼,盡量不要使用上述三種數(shù)據(jù)結(jié)構(gòu),盡量使用字符串替代對象,使用原始類型(比如Int、Long)替代字符串,使用數(shù)組替代集合類型,這樣盡可能地減少內(nèi)存占用,從而降低GC頻率,提升性能。

但是在筆者的編碼實踐中發(fā)現(xiàn),要做到該原則其實并不容易。因為我們同時要考慮到代碼的可維護性,如果一個代碼中,完全沒有任何對象抽象,全部是字符串拼接的方式,那么對于后續(xù)的代碼維護和修改,無疑是一場巨大的災(zāi)難。同理,如果所有操作都基于數(shù)組實現(xiàn),而不使用HashMap、LinkedList等集合類型,那么對于我們的編碼難度以及代碼可維護性,也是一個極大的挑戰(zhàn)。因此筆者建議,在可能以及合適的情況下,使用占用內(nèi)存較少的數(shù)據(jù)結(jié)構(gòu),但是前提是要保證代碼的可維護性。

責(zé)任編輯:未麗燕 來源: 阿里云棲社區(qū)
相關(guān)推薦

2021-03-04 08:39:21

SparkRDD調(diào)優(yōu)

2017-07-07 11:01:04

Spark性能調(diào)優(yōu)

2017-10-20 13:41:11

Spark集群代碼

2018-08-24 07:17:41

Spark大數(shù)據(jù)調(diào)優(yōu)

2019-08-13 09:04:22

Linux性能調(diào)優(yōu)

2021-12-26 00:03:25

Spark性能調(diào)優(yōu)

2016-03-25 09:59:38

性能調(diào)優(yōu)LinuxMySQL

2017-07-21 08:55:13

TomcatJVM容器

2012-06-20 11:05:47

性能調(diào)優(yōu)攻略

2023-04-03 10:25:00

數(shù)據(jù)庫性能調(diào)優(yōu)

2023-11-23 09:26:50

Java調(diào)優(yōu)

2011-03-10 14:40:54

LAMPMysql

2018-05-09 08:35:59

2011-03-10 14:40:50

2017-11-27 14:58:01

MySQL高并發(fā)優(yōu)化性能調(diào)優(yōu)

2011-05-20 15:02:01

Oracle性能調(diào)優(yōu)

2011-11-14 10:28:23

2020-11-30 11:40:35

NginxLinux性能調(diào)優(yōu)

2021-07-28 13:28:43

高并發(fā)RPC服務(wù)端

2022-03-30 10:51:40

JavaScript性能調(diào)優(yōu)
點贊
收藏

51CTO技術(shù)棧公眾號