Spark程序運(yùn)行常見(jiàn)錯(cuò)誤解決方法以及優(yōu)化
一.org.apache.spark.shuffle.FetchFailedException
1.問(wèn)題描述
這種問(wèn)題一般發(fā)生在有大量shuffle操作的時(shí)候,task不斷的failed,然后又重執(zhí)行,一直循環(huán)下去,非常的耗時(shí)。
2.報(bào)錯(cuò)提示
(1) missing output location
- org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
(2) shuffle fetch faild
- org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/192.168.47.215:50268
當(dāng)前的配置為每個(gè)executor使用1cpu,5GRAM,啟動(dòng)了20個(gè)executor
3.解決方案
一般遇到這種問(wèn)題提高executor內(nèi)存即可,同時(shí)增加每個(gè)executor的cpu,這樣不會(huì)減少task并行度。
- spark.executor.memory 15G
- spark.executor.cores 3
- spark.cores.max 21
啟動(dòng)的execuote數(shù)量為:7個(gè)
- execuoteNum = spark.cores.max/spark.executor.cores
每個(gè)executor的配置:
- 3core,15G RAM
消耗的內(nèi)存資源為:105G RAM
- 15G*7=105G
可以發(fā)現(xiàn)使用的資源并沒(méi)有提升,但是同樣的任務(wù)原來(lái)的配置跑幾個(gè)小時(shí)還在卡著,改了配置后幾分鐘就結(jié)束了。
二.Executor&Task Lost
1.問(wèn)題描述
因?yàn)榫W(wǎng)絡(luò)或者gc的原因,worker或executor沒(méi)有接收到executor或task的心跳反饋
2.報(bào)錯(cuò)提示
(1) executor lost
- WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost)
(2) task lost
- WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed
(3) 各種timeout
- java.util.concurrent.TimeoutException: Futures timed out after [120 second
- ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong
3.解決方案
提高 spark.network.timeout 的值,根據(jù)情況改成300(5min)或更高。
默認(rèn)為 120(120s),配置所有網(wǎng)絡(luò)傳輸?shù)难訒r(shí),如果沒(méi)有主動(dòng)設(shè)置以下參數(shù),默認(rèn)覆蓋其屬性
- spark.core.connection.ack.wait.timeout
- spark.akka.timeout
- spark.storage.blockManagerSlaveTimeoutMs
- spark.shuffle.io.connectionTimeout
- spark.rpc.askTimeout or spark.rpc.lookupTimeout
三.傾斜
1.問(wèn)題描述
大多數(shù)任務(wù)都完成了,還有那么一兩個(gè)任務(wù)怎么都跑不完或者跑的很慢。
分為數(shù)據(jù)傾斜和task傾斜兩種。
2.錯(cuò)誤提示
(1) 數(shù)據(jù)傾斜
(2) 任務(wù)傾斜
差距不大的幾個(gè)task,有的運(yùn)行速度特別慢。
3.解決方案
(1) 數(shù)據(jù)傾斜
數(shù)據(jù)傾斜大多數(shù)情況是由于大量null值或者""引起,在計(jì)算前過(guò)濾掉這些數(shù)據(jù)既可。
例如:
- sqlContext.sql("...where col is not null and col != ''")
(2) 任務(wù)傾斜
task傾斜原因比較多,網(wǎng)絡(luò)io,cpu,mem都有可能造成這個(gè)節(jié)點(diǎn)上的任務(wù)執(zhí)行緩慢,可以去看該節(jié)點(diǎn)的性能監(jiān)控來(lái)分析原因。以前遇到過(guò)同事在spark的一臺(tái)worker上跑R的任務(wù)導(dǎo)致該節(jié)點(diǎn)spark task運(yùn)行緩慢。
或者可以開(kāi)啟spark的推測(cè)機(jī)制,開(kāi)啟推測(cè)機(jī)制后如果某一臺(tái)機(jī)器的幾個(gè)task特別慢,推測(cè)機(jī)制會(huì)將任務(wù)分配到其他機(jī)器執(zhí)行,***Spark會(huì)選取最快的作為最終結(jié)果。
spark.speculation true
spark.speculation.interval 100 - 檢測(cè)周期,單位毫秒;
spark.speculation.quantile 0.75 - 完成task的百分比時(shí)啟動(dòng)推測(cè)
spark.speculation.multiplier 1.5 - 比其他的慢多少倍時(shí)啟動(dòng)推測(cè)。
四.OOM(內(nèi)存溢出)
1.問(wèn)題描述
內(nèi)存不夠,數(shù)據(jù)太多就會(huì)拋出OOM的Exeception
因?yàn)閳?bào)錯(cuò)提示很明顯,這里就不給報(bào)錯(cuò)提示了。。。
2.解決方案
主要有driver OOM和executor OOM兩種
(1) driver OOM
一般是使用了collect操作將所有executor的數(shù)據(jù)聚合到driver導(dǎo)致。盡量不要使用collect操作即可。
(2) executor OOM
1.可以按下面的內(nèi)存優(yōu)化的方法增加code使用內(nèi)存空間
2.增加executor內(nèi)存總量,也就是說(shuō)增加spark.executor.memory的值
3.增加任務(wù)并行度(大任務(wù)就被分成小任務(wù)了),參考下面優(yōu)化并行度的方法
優(yōu)化
1.內(nèi)存
當(dāng)然如果你的任務(wù)shuffle量特別大,同時(shí)rdd緩存比較少可以更改下面的參數(shù)進(jìn)一步提高任務(wù)運(yùn)行速度。
spark.storage.memoryFraction - 分配給rdd緩存的比例,默認(rèn)為0.6(60%),如果緩存的數(shù)據(jù)較少可以降低該值。
spark.shuffle.memoryFraction - 分配給shuffle數(shù)據(jù)的內(nèi)存比例,默認(rèn)為0.2(20%)
剩下的20%內(nèi)存空間則是分配給代碼生成對(duì)象等。
如果任務(wù)運(yùn)行緩慢,jvm進(jìn)行頻繁gc或者內(nèi)存空間不足,或者可以降低上述的兩個(gè)值。
"spark.rdd.compress","true" - 默認(rèn)為false,壓縮序列化的RDD分區(qū),消耗一些cpu減少空間的使用
如果數(shù)據(jù)只使用一次,不要采用cache操作,因?yàn)椴⒉粫?huì)提高運(yùn)行速度,還會(huì)造成內(nèi)存浪費(fèi)。
2.并行度
- spark.default.parallelism
發(fā)生shuffle時(shí)的并行度,在standalone模式下的數(shù)量默認(rèn)為core的個(gè)數(shù),也可手動(dòng)調(diào)整,數(shù)量設(shè)置太大會(huì)造成很多小任務(wù),增加啟動(dòng)任務(wù)的開(kāi)銷(xiāo),太小,運(yùn)行大數(shù)據(jù)量的任務(wù)時(shí)速度緩慢。
- spark.sql.shuffle.partitions
sql聚合操作(發(fā)生shuffle)時(shí)的并行度,默認(rèn)為200,如果任務(wù)運(yùn)行緩慢增加這個(gè)值。
相同的兩個(gè)任務(wù):
- spark.sql.shuffle.partitions=300:
- spark.sql.shuffle.partitions=500:
速度變快主要是大量的減少了gc的時(shí)間。
修改map階段并行度主要是在代碼中使用rdd.repartition(partitionNum)來(lái)操作。