4個(gè)簡(jiǎn)單技巧,可提高您的Apache Spark工作性能
使您的Apache Spark應(yīng)用程序運(yùn)行速度更快,而對(duì)代碼的更改最少!
介紹
在開(kāi)發(fā)Spark應(yīng)用程序時(shí),最耗時(shí)的部分之一是優(yōu)化。 在此博客文章中,我將提供一些性能提示,以及(至少對(duì)我而言)啟動(dòng)時(shí)可能會(huì)使用的未知配置參數(shù)。
因此,我將介紹以下主題:
- 多個(gè)小文件作為源
- 隨機(jī)分區(qū)參數(shù)
- 強(qiáng)制廣播Join
- 分區(qū)vs合并vs隨機(jī)分區(qū)參數(shù)設(shè)置
我們可以改善什么?
1. 使用多個(gè)小文件?
OpenCostInBytes(來(lái)自文檔)—可以同時(shí)掃描打開(kāi)文件的估計(jì)成本(以字節(jié)數(shù)衡量)。 將多個(gè)文件放入分區(qū)時(shí)使用。 最好高估一下,然后,具有較小文件的分區(qū)將比具有較大文件的分區(qū)(首先安排)更快。 默認(rèn)值為4MB。
spark.conf.set("spark.files.openCostInBytes", SOME_COST_IN_BYTES)
我對(duì)包含12,000個(gè)文件的1GB文件夾,包含800個(gè)文件的7.8GB文件夾和包含1.6k個(gè)文件的18GB文件夾進(jìn)行了測(cè)試。 我的目的是弄清楚輸入文件是否較小,最好使用低于默認(rèn)值的文件。
因此,當(dāng)測(cè)試1GB和7.8GB文件夾時(shí)-肯定是較低的值,但是測(cè)試大約11MB的文件時(shí),較大的參數(shù)值會(huì)更好。
使用接近您的小文件大小的openCostInBytes大小。 這樣會(huì)更有效率!
2. 隨機(jī)分區(qū)
開(kāi)始使用Spark時(shí),我莫名其妙地想到了在創(chuàng)建Spark會(huì)話(huà)時(shí)設(shè)置的配置是不可變的。 天哪,我怎么錯(cuò)。
因此,通常,在進(jìn)行聚集或聯(lián)接時(shí),spark分區(qū)在spark中是一個(gè)靜態(tài)數(shù)字(默認(rèn)為200)。 根據(jù)您的數(shù)據(jù)大小,這會(huì)導(dǎo)致兩個(gè)問(wèn)題:
- 數(shù)據(jù)集很小-200太多,數(shù)據(jù)分散且效率不高
- 數(shù)據(jù)集巨大-200太少了。 數(shù)據(jù)被浪費(fèi)了,我們沒(méi)有充分利用我們想要的所有資源。
因此,在遇到此類(lèi)問(wèn)題時(shí)遇到了一些麻煩,我在Google上花費(fèi)了很多時(shí)間,發(fā)現(xiàn)了這個(gè)美麗的東西
- spark.conf.set("spark.sql.shuffle.partitions", X)
可以在運(yùn)行時(shí)中途隨時(shí)隨地更改此整潔的配置,它會(huì)影響設(shè)置后觸發(fā)的步驟。 您也可以在創(chuàng)建Spark會(huì)話(huà)時(shí)使用這個(gè)壞男孩。 在對(duì)聯(lián)接或聚合進(jìn)行數(shù)據(jù)混排時(shí),將使用此分區(qū)數(shù)量。 還獲得數(shù)據(jù)幀分區(qū)計(jì)數(shù):
- df.rdd.getNumPartitions()
您可以估計(jì)最合適的混搭分區(qū)數(shù),以進(jìn)行進(jìn)一步的聯(lián)接和聚合。
也就是說(shuō),您有一個(gè)巨大的數(shù)據(jù)框,并且想要保留一些信息。 這樣就得到了大數(shù)據(jù)幀的分區(qū)數(shù)。 將shuffle分區(qū)參數(shù)設(shè)置為此值。 這樣一來(lái),加入后就不會(huì)成為默認(rèn)值200! 更多并行性-我們來(lái)了!
3. 廣播Join
非常簡(jiǎn)單的情況:我們有一個(gè)龐大的表,其中包含所有用戶(hù),而我們的表中包含內(nèi)部用戶(hù),質(zhì)量檢查人員和其他不應(yīng)包含在內(nèi)的用戶(hù)。 目標(biāo)只是離開(kāi)非內(nèi)部人員。
- 讀兩個(gè)表
- Huge_table 左防聯(lián)接小表
它看起來(lái)像是一個(gè)簡(jiǎn)單且性能明智的好解決方案。 如果您的小型表小于10MB,則您的小型數(shù)據(jù)集將在沒(méi)有任何提示的情況下進(jìn)行廣播。 如果在代碼中添加提示,則可能會(huì)使它在更大的數(shù)據(jù)集上運(yùn)行,但這取決于優(yōu)化程序的行為。
但是,假設(shè)它是100-200MB,并且提示您不要強(qiáng)制廣播它。 因此,如果您確信它不會(huì)影響代碼的性能(或引發(fā)一些OOM錯(cuò)誤),則可以使用它并覆蓋默認(rèn)值:
- spark.conf.set("spark.sql.autoBroadcastJoinThreshold", SIZE_OF_SMALLER_DATASET)
在這種情況下,它將廣播給所有執(zhí)行者,并且加入應(yīng)該工作得更快。
當(dāng)心OOM錯(cuò)誤!
4. 分區(qū)vs合并vs隨機(jī)分區(qū)配置設(shè)置
如果您使用的是Spark,則可能知道重新分區(qū)方法。 對(duì)我來(lái)說(shuō),來(lái)自SQL后臺(tái)方法合并的方式有不同的含義! 顯然,在分區(qū)上進(jìn)行火花合并時(shí),其行為方式有所不同-它移動(dòng)并將多個(gè)分區(qū)組合在一起。 基本上,我們將數(shù)據(jù)改組和移動(dòng)減到最少。
如果我們只需要減少分區(qū)數(shù),則應(yīng)該使用合并而不是重新分區(qū),因?yàn)檫@樣可以最大程度地減少數(shù)據(jù)移動(dòng)并且不會(huì)觸發(fā)交換。 如果我們想更均勻地在分區(qū)之間劃分?jǐn)?shù)據(jù),請(qǐng)重新分區(qū)。
但是,假設(shè)我們有一個(gè)重復(fù)出現(xiàn)的模式,我們執(zhí)行聯(lián)接/轉(zhuǎn)換并得到200個(gè)分區(qū),但是我們不需要200個(gè)分區(qū),即100個(gè)甚至1個(gè)。
讓我們嘗試進(jìn)行比較。 我們將讀取11MB的文件夾,并像以前一樣進(jìn)行匯總。
通過(guò)將數(shù)據(jù)幀持久存儲(chǔ)在僅存儲(chǔ)選件磁盤(pán)上,我們可以估計(jì)數(shù)據(jù)幀大小。 所以small_df只有10 MB,但是分區(qū)數(shù)是200。等等? 平均每個(gè)分區(qū)可提供50KB的數(shù)據(jù),這效率不高。 因此,我們將讀取大數(shù)據(jù)幀,并將聚合后的分區(qū)計(jì)數(shù)設(shè)置為1,并強(qiáng)制Spark執(zhí)行,最后我們將其算作一項(xiàng)操作。
這是我們?nèi)N情況的執(zhí)行計(jì)劃:
> Setting shuffle partition parameter
> Coalesce action
> Repartitioning
因此,在所有可見(jiàn)的設(shè)置中,我們不會(huì)調(diào)用Coalesce / Exchange的其他步驟(重新分區(qū)操作)。 因此,我們可以通過(guò)跳過(guò)它來(lái)節(jié)省一些執(zhí)行時(shí)間。 如果我們看一下執(zhí)行時(shí)間:Shuffle Partition設(shè)置在7.1分鐘,Coalesce 8.1,Repartition 8.3中完成。
這只是一個(gè)簡(jiǎn)單的示例,它仍然顯示了通過(guò)設(shè)置一個(gè)配置參數(shù)可以節(jié)省多少時(shí)間!
摘要
關(guān)于如何使您的Apache Spark應(yīng)用程序更快,更高效地運(yùn)行,有許多小而簡(jiǎn)單的技巧和竅門(mén)。 不幸的是,使用Spark時(shí),大多數(shù)情況下解決方案都是單獨(dú)的。 為了使其正常工作,大多數(shù)時(shí)候您必須了解Spark內(nèi)部組件的內(nèi)幕,并從頭到尾閱讀文檔多次。
在本文中,我提到了如何更快地讀取多個(gè)小文件,如何強(qiáng)制建議廣播連接,選擇何時(shí)使用shuffle分區(qū)參數(shù),合并和重新分區(qū)。
我希望它很有用,并會(huì)在您開(kāi)發(fā)Apache Spark應(yīng)用程序的過(guò)程中為您提供幫助!