自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

從源碼看Spark讀取Hive表數(shù)據(jù)小文件和分塊的問題

大數(shù)據(jù) Spark
本文涉及源碼基于Spark2.0.0和Hadoop2.6.0,不同版本代碼可能不一致,需自己對(duì)應(yīng)。此外針對(duì)TextInputFormat格式的Hive表,其他格式的比如Parquet有Spark自己的高效實(shí)現(xiàn),不在討論范圍之內(nèi)

前言

有同事問到,Spark讀取一張Hive表的數(shù)據(jù)Task有一萬多個(gè),看了Hive表分區(qū)下都是3MB~4MB的小文件,每個(gè)Task只處理這么小的文件,實(shí)在浪費(fèi)資源浪費(fèi)時(shí)間。而我們都知道Spark的Task數(shù)由partitions決定,所以他想通過repartition(num)的方式來改變分區(qū)數(shù),結(jié)果發(fā)現(xiàn)讀取文件的時(shí)候Task數(shù)并沒有改變。遂問我有什么參數(shù)可以設(shè)置,從而改變讀取Hive表時(shí)的Task數(shù),將小文件合并大文件讀上來

本文涉及源碼基于Spark2.0.0和Hadoop2.6.0,不同版本代碼可能不一致,需自己對(duì)應(yīng)。此外針對(duì)TextInputFormat格式的Hive表,其他格式的比如Parquet有Spark自己的高效實(shí)現(xiàn),不在討論范圍之內(nèi)

分析

Spark讀取Hive表是通過HadoopRDD掃描上來的,具體可見 org.apache.spark.sql.hive.TableReader類,構(gòu)建HadoopRDD的代碼如下 

  1. val rdd = new HadoopRDD( 
  2. sparkSession.sparkContext, 
  3. _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]], 
  4. Some(initializeJobConfFunc), 
  5. inputFormatClass, 
  6. classOf[Writable], 
  7. classOf[Writable], 
  8. _minSplitsPerRDD) 

這里inputFormatClass是Hive創(chuàng)建時(shí)指定的,默認(rèn)不指定為 org.apache.hadoop.mapred.TextInputFormat,由它就涉及到了HDFS文件的FileSplit數(shù),從而決定了上層Spark的partition數(shù)。在進(jìn)入HadoopRDD類查看之前,還有一個(gè)參數(shù)需要我們注意,就是 _minSplitsPerRDD,它在后面SplitSize的計(jì)算中是起了作用的。

我們看一下它的定義 

  1. private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) { 
  2. 0 // will splitted based on block by default
  3. else { 
  4. math.max(hadoopConf.getInt("mapred.map.tasks", 1), 
  5. sparkSession.sparkContext.defaultMinPartitions) 

在我們指定以--master local模式跑的時(shí)候,它為0,而在其他模式下,則是求的一個(gè)***值。這里重點(diǎn)看 defaultMinPartitions,如下 

  1. def defaultMinPartitions: Int = math.min(defaultParallelism, 2) 
  2.  
  3. // defaultParallelism 在yarn和standalone模式下的計(jì)算 
  4. def defaultParallelism(): Int = { 
  5. conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) 

從這里可以看到,defaultMinPartitions的值一般為2,而 mapred.map.tasks 或者 mapreduce.job.maps( 新版參數(shù))是Hadoop的內(nèi)建參數(shù),其默認(rèn)值也為2,一般很少去改變它。所以這里_minSplitsPerRDD的值基本就是2了。

下面我們跟到HadoopRDD類里,去看看它的partitions是如何來的 

  1. def getPartitions: Array[Partition] = { 
  2. val jobConf = getJobConf() 
  3. // add the credentials here as this can be called before SparkContext initialized 
  4. SparkHadoopUtil.get.addCredentials(jobConf) 
  5. // inputFormat就是上面參數(shù)inputFormatClass所配置的類的實(shí)例 
  6. val inputFormat = getInputFormat(jobConf) 
  7. // 此處獲取FileSplit數(shù),minPartitions就是上面的_minSplitsPerRDD 
  8. val inputSplits = inputFormat.getSplits(jobConf, minPartitions) 
  9. val array = new Array[Partition](inputSplits.size
  10. // 從這里可以看出FileSplit數(shù)決定了Spark掃描Hive表的partition數(shù) 
  11. for (i <- 0 until inputSplits.size) { 
  12. array(i) = new HadoopPartition(id, i, inputSplits(i)) 
  13. array 

在 getPartitions 方法里我們可以看到 FileSplit數(shù)***決定了Spark讀取Hive表的Task數(shù),下面我們?cè)賮砜纯? mapred.TextInputFormat 類里 getSplits 的實(shí)現(xiàn)

分兩步來看,首先是掃描文件,計(jì)算文件大小的部分 

  1. FileStatus[] files = listStatus(job); 
  2.  
  3. ..... 
  4.  
  5. long totalSize = 0; // compute total size 
  6. for (FileStatus file: files) { // check we have valid files 
  7. if (file.isDirectory()) { 
  8. throw new IOException("Not a file: "+ file.getPath()); 
  9. totalSize += file.getLen(); 
  10.  
  11. // numSplits就是上面?zhèn)魅氲膍inPartitions,為2 
  12. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); 
  13. long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1), minSplitSize); 
  14.  
  15. // minSplitSize 默認(rèn)為1,唯一可通過 setMinSplitSize 方法設(shè)置 
  16. private long minSplitSize = 1; 

針對(duì)Hive表的分區(qū),Spark對(duì)每個(gè)分區(qū)都構(gòu)建了一個(gè)HadoopRDD,每個(gè)分區(qū)目錄下就是實(shí)際的數(shù)據(jù)文件,例如我們集群的某一張表按天分區(qū),每天下面有200個(gè)數(shù)據(jù)文件,每個(gè)文件大概3MB~4MB之間,這些實(shí)際上是reduce設(shè)置不合理導(dǎo)致的小文件產(chǎn)生,如下圖 

HiveFile

此處 listStatus 方法就是掃描的分區(qū)目錄,它返回的就是圖中顯示的具體 part-*****文件的FileStatus對(duì)象,一共200個(gè)。從 totalSize 的計(jì)算可以看出,它是這200個(gè)文件的總大小,為838MB,因此 goalSize 就為419MB。

參數(shù) mapreduce.input.fileinputformat.split.minsize 在Spark程序沒有配的情況下,獲取的值為0,而 minSplitSize在Spark獲取FileSplits的時(shí)候并沒有被設(shè)置,所以為默認(rèn)值1,那么 minSize 就為1

其次,我們?cè)賮砜磸奈募澐諷plit,部分代碼如下(部分解釋見注釋) 

  1. ArrayList splits = new ArrayList(numSplits); 
  2. NetworkTopology clusterMap = new NetworkTopology(); 
  3.  
  4. // files是上面掃描的分區(qū)目錄下的part-*****文件 
  5. for (FileStatus file: files) { 
  6. Path path = file.getPath(); 
  7. long length = file.getLen(); 
  8. if (length != 0) { 
  9. FileSystem fs = path.getFileSystem(job); 
  10. BlockLocation[] blkLocations; 
  11. if (file instanceof LocatedFileStatus) { 
  12. blkLocations = ((LocatedFileStatus) file).getBlockLocations(); 
  13. else { 
  14. blkLocations = fs.getFileBlockLocations(file, 0, length); 
  15. // 判斷文件是否可切割 
  16. if (isSplitable(fs, path)) { 
  17. // 這里獲取的不是文件本身的大小,它的大小從上面的length就可以知道,這里獲取的是HDFS文件塊(跟文件本身沒有關(guān)系)的大小 
  18. // HDFS文件塊的大小由兩個(gè)參數(shù)決定,分別是 dfs.block.size 和 fs.local.block.size 
  19. // 在HDFS集群模式下,由 dfs.block.size 決定,對(duì)于Hadoop2.0來說,默認(rèn)值是128MB 
  20. // 在HDFS的local模式下,由 fs.local.block.size 決定,默認(rèn)值是32MB 
  21. long blockSize = file.getBlockSize(); // 128MB 
  22.  
  23. // 這里計(jì)算splitSize,根據(jù)前面計(jì)算的goalSize=419MB,minSize為1 
  24. long splitSize = computeSplitSize(goalSize, minSize, blockSize); 
  25.  
  26. long bytesRemaining = length; 
  27. // 如果文件大小大于splitSize,就按照splitSize對(duì)它進(jìn)行分塊 
  28. // 由此可以看出,這里是為了并行化更好,所以按照splitSize會(huì)對(duì)文件分的更細(xì),因而split會(huì)更多 
  29. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 
  30. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 
  31. length-bytesRemaining, splitSize, clusterMap); 
  32. splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
  33. splitHosts[0], splitHosts[1])); 
  34. bytesRemaining -= splitSize; 
  35.  
  36. if (bytesRemaining != 0) { 
  37. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length 
  38. - bytesRemaining, bytesRemaining, clusterMap); 
  39. splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, 
  40. splitHosts[0], splitHosts[1])); 
  41. else { 
  42. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); 
  43. splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); 
  44. else { 
  45. //Create empty hosts array for zero length files 
  46. splits.add(makeSplit(path, 0, length, new String[0])); 

從上面可以看到,splitSize是從 computeSplitSize(goalSize, minSize, blockSize) 計(jì)算來的,這三個(gè)參數(shù)我們都知道大小,那么計(jì)算規(guī)則是怎么樣的呢

規(guī)則:Math.max(minSize, Math.min(goalSize, blockSize)),從而我們可以知道 splitSize = 128MB,對(duì)于3MB~4MB的小文件來說,就 決定了一個(gè)小文件就是一個(gè)split了,從而對(duì)應(yīng)了一個(gè)Spark的partition,所以我們一個(gè)分區(qū)下就有200個(gè)partition,當(dāng)取兩個(gè)月的數(shù)據(jù)時(shí),就是 200 * 30 * 2 = 12000,從而是12000個(gè)Task,跟同事所說的吻合!

> 而從TextInputFormat里分Split的邏輯來看,它只會(huì)把一個(gè)文件分得越來越小,而不會(huì)對(duì)小文件采取合并,所以無論調(diào)整哪個(gè)參數(shù),都沒法改變這種情況!而通過repartition強(qiáng)行分區(qū),也是在拿到HDFS文件之后對(duì)這12000個(gè)partition進(jìn)行重分區(qū),改變不了小文件的問題,也無法改變讀取Hive表Task數(shù)多的情況

總結(jié)

1、Block是物理概念,而Split是邏輯概念,***數(shù)據(jù)的分片是根據(jù)Split來的。一個(gè)文件可能大于BlockSize也可能小于BlockSize,大于它就會(huì)被分成多個(gè)Block存儲(chǔ)到不同的機(jī)器上,SplitSize可能大于BlockSize也可能小于BlockSize,SplitSize如果大于BlockSize,那么一個(gè)Split就可能要跨多個(gè)Block。對(duì)于數(shù)據(jù)分隔符而言,不用擔(dān)心一個(gè)完整的句子分在兩個(gè)Block里,因?yàn)樵赟plit構(gòu)建RecordReader時(shí),它會(huì)被補(bǔ)充完整

2、對(duì)于采用 org.apache.hadoop.mapred.TextInputFormat 作為InputFormat的Hive表,如果存在小文件,Spark在讀取的時(shí)候單憑調(diào)參數(shù)和repartition是改變不了分區(qū)數(shù)的!對(duì)于小文件的合并,目前除了Hadoop提供的Archive方式之外,也只能通過寫MR來手動(dòng)合了,***的方式還是寫數(shù)據(jù)的時(shí)候自己控制reduce的個(gè)數(shù),把握文件數(shù)

3、對(duì)于Spark直接通過SparkContext的 textFile(inputPath, numPartitions) 方法讀取HDFS文件的,它底層也是通過HadoopRDD構(gòu)建的,它的參數(shù)numPartitions就是上面計(jì)算goalSize的numSplits參數(shù),這篇 文章 對(duì)原理描述的非常詳細(xì),非常值得一讀

4、對(duì)于小文件合并的InputFormat有 org.apache.hadoop.mapred.lib.CombineFileInputFormat,跟它相關(guān)的參數(shù)是 mapreduce.input.fileinputformat.split.maxsize,它用于設(shè)置一個(gè)Split的***值

5、跟mapred.TextInputFormat 里的Split劃分相關(guān)的參數(shù)

  • mapreduce.input.fileinputformat.split.minsize : 決定了計(jì)算Split劃分時(shí)的minSize
  • mapreduce.job.maps 或 mapred.map.tasks : 決定了getSplits(JobConf job, int numSplits)方法里的numSplits,從而可以影響goalSize的大小
  • dfs.block.size 或 fs.local.block.size : 決定了HDFS的BlockSize

6、MapReduce新版API里的 org.apache.hadoop.mapreduce.lib.input.TextInputFormat,它的SplitSize與上面說到的計(jì)算方式不一樣,getSplits方法的簽名為 getSplits(JobContext job),不再有numSplilts這個(gè)參數(shù),splitSize的計(jì)算規(guī)則改為 Math.max(minSize, Math.min(maxSize, blockSize)),minSize和blockSize跟之前一樣,新的maxSize為conf.getLong("mapreduce.input.fileinputformat.split.maxsize", Long.MAX_VALUE)

7、在Spark2.0.0里,設(shè)置Hadoop相關(guān)的參數(shù)(比如mapreduce開頭的)要通過 spark.sparkContext.hadoopConfiguration 來設(shè)置

責(zé)任編輯:未麗燕 來源: 網(wǎng)絡(luò)大數(shù)據(jù)
相關(guān)推薦

2023-05-11 00:17:44

分區(qū)HiveReduce

2017-09-25 16:21:30

Spark on yacluster模式

2020-08-13 14:58:06

Spark小文件存儲(chǔ)

2021-07-14 09:48:15

Linux源碼Epoll

2021-07-15 14:27:47

LinuxSocketClose

2019-04-17 14:44:42

Spark內(nèi)存源碼

2019-10-10 16:20:23

spark內(nèi)存管理

2021-03-10 08:20:54

設(shè)計(jì)模式OkHttp

2017-06-26 15:00:17

2017-04-05 20:00:32

ChromeObjectJS代碼

2021-06-10 09:52:33

LinuxTCPAccept

2020-10-10 07:00:16

LinuxSocketTCP

2023-01-31 10:22:00

HiveMapReduce文件合并

2018-02-02 15:48:47

ChromeDNS解析

2023-03-30 09:06:20

HiveSpark大數(shù)據(jù)

2010-12-10 08:51:13

Web 2.0Cache集群

2021-04-12 06:08:16

HiveSpark大數(shù)據(jù)

2020-09-23 12:32:18

網(wǎng)絡(luò)IOMySQL

2017-02-09 15:15:54

Chrome瀏覽器

2021-06-08 09:18:54

SQLPandas數(shù)據(jù)透視表
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)