從HDFS和MapReduce兩方面了解Hadoop
簡介
Hadoop 是一個能夠?qū)Υ罅繑?shù)據(jù)進行分布式處理的軟件框架,框架最核心的設(shè)計就是:HDFS 和 MapReduce。HDFS 為海量的數(shù)據(jù)提供了存儲,而 MapReduce 則為海量的數(shù)據(jù)提供了計算。這篇文章就主要從 HDFS 和 MapReuce 兩個大的方面展開對 Hadoop 講解,當然為了直觀的測試 HDFS 提供的豐富的 API 以及我們編寫的 MapReduce 程序,在閱讀下面的內(nèi)容之前,你需要準備一臺安裝了 Hadoop 的機器(也可以是虛擬機),如果你還沒有安裝的話,可以參考《在 Ubuntu 上安裝 Hadoop》。
HDFS
HDFS 概念
在說 HDFS 之前我們先來解釋一下什么是 DFS,DFS 的全稱是 Distributed File System,翻譯過來就是分布式文件系統(tǒng),而 HDFS 就是 Hadoop 自帶的分布式文件系統(tǒng)。
相關(guān)名詞
為了后面大家更容易理解文章,這里使用一定的篇幅來簡單的介紹一下與 HDFS 相關(guān)的一些組件或者名詞的概念。
- NameNode,管理節(jié)點,管理系統(tǒng)的命名空間,維護著整個文件系統(tǒng)的結(jié)構(gòu)和目錄信息,通常情況下一個 Hadoop 集群只會有一個工作的 NameNode。
- DataNode,工作節(jié)點,文件系統(tǒng)的工作節(jié)點,主要是根據(jù)需要進行存儲或者檢索數(shù)據(jù)塊,并且定期向 NameNode 報告它們所存儲的數(shù)據(jù)塊列表。
- 數(shù)據(jù)塊,同我們常使用的磁盤上的文件系統(tǒng)一樣,HDFS 也有數(shù)據(jù)塊的概念,默認的大小為 128M。
- 塊緩存,一般情況下,我們通過 HDFS 從 DataNode 中檢索數(shù)據(jù)時,DataNode 都是從磁盤中讀取,但是對于訪問很頻繁的文件,它所對于的數(shù)據(jù)塊可能會被緩存到 DataNode 的內(nèi)存中,以加快讀取速度,這就是所謂的塊緩存。
- 聯(lián)邦 HDFS,其實這個就是為了解決 Hadoop 不適合存儲數(shù)量龐大的文件的問題,同時由多個 NameNode 來維護整個文件系統(tǒng)的系統(tǒng)樹以及文件和目錄,每個 NameNode 負責管理文件系統(tǒng)命名空間中的一部分。
特性
下面我們就一起來看下 HDFS 有哪些特性:
- 存儲超大文件,由于 HDFS 是分布式的文件系統(tǒng),所以不受單臺機器的存儲大小的限制,可以存儲超大文件,目前已經(jīng)達到了 PB 級了。
- 流式訪問數(shù)據(jù)。
- HDFS 對硬件的要求并不是很高,可以運行在廉價的商用硬件上。
- 不適合低延遲的數(shù)據(jù)訪問,由于 Hadoop 的流式數(shù)據(jù)訪問,訪問數(shù)據(jù)會有寫延遲,所以不太適合低時間延遲的數(shù)據(jù)訪問,一般情況下這種需求我們會使用關(guān)系型數(shù)據(jù)庫來實現(xiàn)。
- 不適合大量的小文件存儲,原因是 NameNode 將文件系統(tǒng)的元數(shù)據(jù)存儲在內(nèi)存中,每存儲一個文件都需要在 NameNode 中存儲該文件的目錄、存儲在哪個 DataNode 中等等的數(shù)據(jù)。所以如果文件的數(shù)量達到數(shù)十億的話 NameNode 的內(nèi)存很可能不夠用了。
- 不支持多用戶寫入,以及任意的修改文件,只可以在文件末尾添加內(nèi)容。
HDFS 的命令行操作
命令行接口是 HDFS 所有類型的接口中最簡單的,也是每個開發(fā)者都必須要掌握的。下面我們就列舉幾個簡單的命令行操作,但是在操作前你必須按照***章的內(nèi)容安裝好了 Hadoop,并且啟動了 HDFS。
創(chuàng)建目錄。
- 清單 1. 創(chuàng)建目錄命令
- hadoop fs -mkdir /test
查看目錄。
- 清單 2. 創(chuàng)建目錄命令
- hadoop fs -ls /
上傳文件,緊跟-put 后面的 test.txt 是要推送到 HDFS 中的文件,/test 是指定要推送到 HDFS 上哪個目錄下面。
- 清單 3. 上傳文件
- hadoop fs -put test.txt /test
刪除文件。
- 清單 4. 上傳文件
- hadoop fs -rm /test/test.txt
其實通過上面例舉的幾個命令我們可以看出 HDFS 的文件操作命令幾乎和 Linux 上的命令一致,這樣我們使用起來會很容易上手。
HDFS 的 JavaAPI
在 Java 項目中使用 HDFS 提供的 API 我們需要依賴 hadoop-common 和 hadoop-hdfs 兩個包,為了方便測試,我們這里還引入了 junit,篇幅原因這里就不對項目本身做太多的講解,這里附上項目源碼地址供大家參考。
讀取 HDFS 中文件的內(nèi)容。
- 清單 5. JavaApi 讀取文件內(nèi)容
- @Test
- public void read() throws IOException {
- // 文件地址。
- URI uri = URI.create("/test/test.txt");
- // 用于接收讀取的數(shù)據(jù)流。
- FSDataInputStream in = null;
- try {
- in = fs.open(new Path(uri));
- // ***的一個 boolean 類型的參數(shù)是指是否在調(diào)用結(jié)束后關(guān)閉流,我們這里選擇在 finally 里面手動關(guān)閉。
- IOUtils.copyBytes(in, System.out, 4096, false);
- } finally {
- IOUtils.closeStream(in);
- }
- }
- }
不出意外的話,你可以在控制臺看到你指定文件的內(nèi)容。在這一步我遇到一個問題,就是無法直接在 windows 下操作 HDFS,具體的解決方法可以參照這篇文章。FSDataInputStream.seek()方法還可以實現(xiàn)從文件輸入流的任意一個絕對位置讀取文件內(nèi)容,比如我們可以在上面代碼中添加如下的內(nèi)容來實現(xiàn)在控制臺重復打印文件內(nèi)容。
- 清單 6. JavaApi 任意位置讀取文件內(nèi)容
- in.seek(0);
- tils.copyBytes(in, System.out, 4096, false);
創(chuàng)建目錄。
- 清單 7. JavaApi 創(chuàng)建目錄
- @Test
- public void mkdir() throws IOException {
- fs.mkdirs(new Path("/test/api"));
- }
查詢文件目錄。
- 清單 8. JavaApi 查詢文件目錄
- @Test
- public void ls() throws IOException {
- FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
- if (null == fileStatuses || fileStatuses.length == 0) {
- return;
- }
- for (FileStatus fileStatus : fileStatuses) {
- System.out.println(fileStatus.getPath() + " " + fileStatus.getPermission());
- }
- }
這里引入一個類 FileStatus,這個類封裝了 HDFS 中文件和目錄的元數(shù)據(jù),包括文件長度、塊大小、復本、修改時間、所有者以及權(quán)限信息。FileSystem 里面提供的 listStatus 方法可以獲取一個目錄下的所有目錄或者文件的 FileStatus,但是它不會遞歸獲取下級目錄的內(nèi)容,這里可以開發(fā)你的想象自己實現(xiàn)一下(Tips:fileStatus.isDirectory()可以判斷這個 fileStatus 是否是一個文件夾)。
刪除文件或目錄。
- 清單 9. JavaApi 刪除文件或目錄
- @Test
- public void delete() throws IOException {
- fs.delete(new Path("/test/api"), false);
- }
- @Test
- public void deleteNonEmptyDir() throws IOException {
- fs.delete(new Path("/test"), true);
- }
我們可以看到 fs.delete()這個方法有兩個參數(shù),***個參數(shù)很好理解,就是我們要刪除的目錄或者文件的地址。那么第二個 Boolean 類型的參數(shù)呢,如果刪除的是文件或者空目錄這個參數(shù)實際上是會被忽略的,如果刪除的是非空目錄,只有在這個參數(shù)值為 true 的時候才會成功刪除。
創(chuàng)建文件和文件寫入。
我們通過 FileSystem.create()方法來創(chuàng)建一個文件,這個方法會順帶著創(chuàng)建不存在的父級目錄,如果不需要這個的話,***是在創(chuàng)建之前調(diào)用 exists()方法來判斷一下,如果父級目錄不存在直接報錯即可。
- 清單 10. JavaApi 創(chuàng)建文件和文件寫入
- @Test
- public void create() throws IOException {
- FSDataOutputStream out = null;
- try {
- out = fs.create(new Path("/test/api/test.txt"));
- out.writeChars("hello hdfs.");
- } finally {
- IOUtils.closeStream(out);
- }
- }
文件創(chuàng)建好后,可以通過 append()方法在文件末尾添加內(nèi)容。
- 清單 11. JavaApi 追加文件內(nèi)容
- @Test
- public void append() throws IOException {
- FSDataOutputStream out = null;
- try {
- out = fs.append(new Path("/test/api/test.txt"));
- out.writeChars("hello hdfs.");
- } finally {
- out.close();
- }
- }
從本地上傳文件到 HDFS。
- 清單 12. JavaApi 上傳文件至 HDFS
- @Test
- public void copyFromLocal() throws IOException {
- fs.copyFromLocalFile(new Path("d:/local.txt"), new Path("/test/api"));
- }
從 HDFS 上下載文件。
- 清單 13. JavaApi 從 HDFS 下載文件
- @Test
- public void copyToLocal() throws IOException {
- fs.copyToLocalFile(new Path("/test/api/local.txt"), new Path("E:/"));
- }
MapReduce 實戰(zhàn)
什么是 MapReduce
MapReduce 是一種編程模型,"Map(映射)"和"Reduce(歸約)",是它們的主要思想,我們通過 Map 函數(shù)來分布式處理輸入數(shù)據(jù),然后通過 Reduce 匯總結(jié)果并輸出。我們編寫一個 MapReduce 程序的一般步驟是:
- 編寫 map 程序。
- 編寫 reduce 程序。
- 編寫程序驅(qū)動。
本章節(jié)的目標
本節(jié)中我們將使用 MapReduce 框架來編寫一個簡單的例子,這個例子是用來統(tǒng)計 HDFS 指定目錄下的文件中每個字符出現(xiàn)的次數(shù)并將統(tǒng)計結(jié)果輸出到 HDFS 的指定目錄中。點擊此處獲取本章節(jié)源代碼。
Map 程序
我們繼承 Mapper 類并重寫了其 map 方法。Map 階段輸入的數(shù)據(jù)是從 hdfs 中拿到的原數(shù)據(jù),輸入的 key 為某一行起始位置相對于文件起始位置的偏移量,value 為該行的文本。輸出的內(nèi)容同樣也為鍵-值對,這個時候輸出數(shù)據(jù)的鍵值對的類型可以自己指定,在本例中 key 是 Text 類型的,value 是 LongWritable 類型的。輸出的結(jié)果將會被發(fā)送到 reduce 函數(shù)進一步處理。
- 清單 14. Map 程序
- public class CharCountMapper extends Mapper< LongWritable, Text, Text, LongWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- // 將這一行文本轉(zhuǎn)為字符數(shù)組
- char[] chars = value.toString().toCharArray();
- for (char c : chars) {
- // 某個字符出現(xiàn)一次,便輸出其出現(xiàn) 1 次。
- context.write(new Text(c + ""), new LongWritable(1));
- }
- }
- }
Reduce 程序
我們繼承 Reducer 類并重寫了其 reduce 方法。在本例中 Reduce 階段的輸入是 Map 階段的輸出,輸出的結(jié)果可以作為最終的輸出結(jié)果。相信你也注意到了,reduce 方法的第二個參數(shù)是一個 Iterable,MapReduce 會將 map 階段中相同字符的輸出匯總到一起作為 reduce 的輸入。
- 清單 15. Reduce 程序
- public class CharCountReducer extends Reducer< Text, LongWritable, Text, LongWritable> {
- @Override
- protected void reduce(Text key, Iterable< LongWritable> values, Context context)
- throws IOException, InterruptedException {
- long count = 0;
- for (LongWritable value : values) {
- count += value.get();
- }
- context.write(key, new LongWritable(count));
- }
- }
驅(qū)動程序
到目前為止,我們已經(jīng)有了 map 程序和 reduce 程序,我們還需要一個驅(qū)動程序來運行整個作業(yè)。可以看到我們在這里初始化了一個 Job 對象。Job 對象指定整個 MapReduce 作業(yè)的執(zhí)行規(guī)范。我們用它來控制整個作業(yè)的運作,在這里我們指定了 jar 包位置還有我們的 Map 程序、Reduce 程序、Map 程序的輸出類型、整個作業(yè)的輸出類型還有輸入輸出文件的地址。
- 清單 16. 驅(qū)動程序
- public class CharCountDriver {
- public static void main(String[] args) throws Exception {
- Configuration configuration = new Configuration();
- Job job = Job.getInstance(configuration);
- // Hadoop 會自動根據(jù)驅(qū)動程序的類路徑來掃描該作業(yè)的 Jar 包。
- job.setJarByClass(cn.itweknow.mr.CharCountDriver.class);
- // 指定 mapper
- job.setMapperClass(CharCountMapper.class);
- // 指定 reducer
- job.setReducerClass(CharCountReducer.class);
- // map 程序的輸出鍵-值對類型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
- // 輸出鍵-值對類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- // 輸入文件的路徑
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- // 輸入文件路徑
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- boolean res = job.waitForCompletion(true);
- System.exit(res?0:1);
- }
- }
執(zhí)行 MapReduce 作業(yè)
打包作業(yè),我們需要將我們的 MapReduce 程序打成 jar 包。
- 清單 17. 打包作業(yè)
- mvn package -Dmaven.test.skip=true
將 jar 包復制到 hadoop 機器上。
在 HDFS 上準備好要統(tǒng)計的文件,我準備的文件在 HDFS 上的/mr/input/目錄下,內(nèi)容為"hello hadoop hdfs.I am coming."。
執(zhí)行 jar。
- 清單 18. 執(zhí)行作業(yè)
- hadoop jar mr-test-1.0-SNAPSHOT.jar cn.itweknow.mr.CharCountDriver /mr/input/ /mr/output/out.txt
查看結(jié)果。
我們先看看輸出目錄,結(jié)果如下,最終輸出的結(jié)果就存放在/mr/output/part-r-00000 文件中。
圖 1. MapReduce 作業(yè)輸出目錄
然后我們再看看輸出文件中的具體內(nèi)容,如下所示:
圖 2. MapReduce 作業(yè)輸出結(jié)果
MapReduce 運行原理
我們可以將一個 MapReduce 作業(yè)的運行過程簡單的拆分成 6 個過程,分別是作業(yè)的提交、作業(yè)初始化、任務(wù)分配、任務(wù)執(zhí)行、進度和狀態(tài)的更新、作業(yè)完成。下面我就一起來具體了解下這么幾個步驟。
作業(yè)的提交
當我們調(diào)用 job.submit()或者 job.waitForCompletion()方法(其內(nèi)部也會調(diào)用 submit()方法)的時候,會創(chuàng)建一個 JobSubmitter 對象,在 JobSubmitter 內(nèi)部所實現(xiàn)的作業(yè)提交過程如下:
- 向資源管理器請求新的應(yīng)用 ID 作為 MapReduce 作業(yè)的作業(yè) ID。
- 檢查作業(yè)的輸出目錄,如果沒有指定輸出目錄或者輸出目錄已經(jīng)存在就會拋出錯誤,這也就是為啥我們在執(zhí)行 MapReduce 作業(yè)時為啥需要保證指定的輸出目錄不存在。
- 將作業(yè)運行所需要的資源文件(作業(yè) JAR 包,配置文件,輸入分片)復制到一起(一個以作業(yè) ID 命名的目錄下)。
- 調(diào)用 submitApplication()方法提交作業(yè)。
作業(yè)的初始化
- 首先資源管理器會將作業(yè)請求傳遞給 YARN 調(diào)度器。
- 調(diào)度器會為作業(yè)分配一個容器。
- 資源管理器在節(jié)點管理器的管理下在容器中啟動 application master。
- application master 的主類 MRAppMaster 會創(chuàng)建多個簿記對象來跟蹤作業(yè)的進度。
- 接收輸入分片。
- application master 為每個分片創(chuàng)建 map 任務(wù)以及確定 reduce 任務(wù),并且分配任務(wù) ID。
任務(wù)的分配
application master 會為創(chuàng)建的任務(wù)向資源管理器請求容器,先是為 map 任務(wù)請求資源,后為 reduce 任務(wù)請求資源。為 map 任務(wù)分配資源的時候需要考慮到數(shù)據(jù)本地化的局限,會盡量保證運行的 map 任務(wù)所需要的數(shù)據(jù)塊存儲在當前機器或者當前機架中,這樣會極大的節(jié)省帶寬資源。而 reduce 任務(wù)則不存在這個限制。
任務(wù)的執(zhí)行
- 資源管理器為任務(wù)分配好容器后,application master 就通過與節(jié)點管理器通信啟動容器。
- 在運行任務(wù)之前,會將任務(wù)所需要的資源本地化。
- 運行任務(wù)。
進度和狀態(tài)的更新
任務(wù)在運行的過程中,會對其精度保持追蹤,對與 map 任務(wù),其任務(wù)進度就是已經(jīng)處理的輸入所占總輸入的比例。對與 reduce 任務(wù)來講就比較復雜了,因為這個部分包含資源復制階段、排序階段和 reduce 階段三個階段,每個階段都占整個完成比例的 1/3,也就是說當我們完成 reduce 的一半的時候進度應(yīng)該為 5/6。對與狀態(tài)的更新,客戶端會每秒輪詢一次 application master 以接收***的任務(wù)狀態(tài)。
作業(yè)的完成
當 application master 收到作業(yè)***一個任務(wù)已經(jīng)完成的通知后,便把作業(yè)的狀態(tài)設(shè)置為"成功"。
為了方便大家理解,我這里將整個過程總結(jié)為一張圖,貼在這里僅供大家參考。
圖 3. MapReduce 程序運行圖解
Shuffle
簡介,什么是 Shuffle
MapReduce 程序會確保每個 reduce 函數(shù)的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序以及將 map 函數(shù)的輸出傳給 reduce 函數(shù)的過程稱之為 shuffle。整個 Shuffle 分為 Map 端和 Reduce 端,下圖是 MapReduce 的 Shuffle 的一個整體概覽圖,大家先看一下整個圖,我們后面再做進一步的解釋說明。
圖 4. Shuffle 概覽圖
Map 端 Shuffle
其實 Map 函數(shù)產(chǎn)生的輸出會寫到磁盤上而不是 HDFS。但是它也不是簡簡單單的直接寫到磁盤,這中間有一個復雜的過程,下面我們就來拆解一下。
從上面的圖可以看到每個 Map 任務(wù)都會有一個緩沖區(qū),這個緩沖區(qū)會臨時存儲 map 函數(shù)輸出的內(nèi)容,緩沖區(qū)的個大小默認是 100M,我們可以通過 mapreduce.task.io.sort.mb 這個配置項配置,當緩沖區(qū)中的內(nèi)容達到其設(shè)定的閾值(閾值的設(shè)置值是占整個緩沖區(qū)的大小,默認為 0.8,我們可以通過 mapreduce.map.sort.spill.percent 來配置)時就會產(chǎn)生溢出,這個時候會有一個后臺線程將緩沖區(qū)中的內(nèi)容分區(qū)(根據(jù)最終要傳給的 Reduce 任務(wù)分成不同的區(qū),分區(qū)的目的是將輸出劃分到不同的 Reducer 上去,后面的 Reducer 就會根據(jù)分區(qū)來讀取自己對應(yīng)的數(shù)據(jù))
然后區(qū)內(nèi)按照 key 排序,如果我們設(shè)置了 Combiner(Combiner 的本質(zhì)也是一個 Reducer,其目的是對將要寫入到磁盤上的文件先進行一次處理,這樣,寫入到磁盤的數(shù)據(jù)量就會減少。)
的話,這個時候會運行 Combiner 函數(shù),***再寫入磁盤。而在這個過程中 Map 任務(wù)還會繼續(xù)往緩沖區(qū)中輸出內(nèi)容,如果出現(xiàn)緩沖區(qū)空間被占滿的情況,Map 任務(wù)就會阻塞直到緩沖區(qū)中的內(nèi)容被全部寫到磁盤中為止。
每次緩沖區(qū)溢出時都會新建一個新的溢出文件,這樣***其實是會出現(xiàn)多個溢出文件的,在 Map 任務(wù)結(jié)束前這些溢出文件會被合并到一個整的輸出文件。
Reduce 端 Shuffle
Reduce 端的 Shuffle 分為三個階段,復制階段、合并階段和 Reduce。
首先是復制階段,Reduce 任務(wù)需要集群上若干個 map 輸出作為其輸入內(nèi)容,在每個 Map 任務(wù)完成的時候 Reduce 任務(wù)就開復制其輸出,上面也提到過 Map 任務(wù)在寫入磁盤前會將輸出進行根據(jù) Reduce 任務(wù)進行分區(qū),所以這里 Reduce 任務(wù)在復制的時候只會復制自己的那個分區(qū)里的內(nèi)容。如果 Map 的輸出非常小,那么 Reduce 會直接將其復制到內(nèi)存中,否則會被復制到磁盤。
合并階段,因為有很多的 Map 任務(wù),所以 Reduce 復制過來的 map 輸出會有很多個,在這個階段主要就是將這些 Map 輸出合并成為一個文件。
Reduce 階段,這個階段主要就是執(zhí)行我們的 Reduce 函數(shù)的代碼了,并產(chǎn)生最終的結(jié)果,然后寫入到 HDFS 中。
在文章的***,提供我在撰寫本文的過程中所編寫的一些源代碼,供大家參考。也希望大家能夠從本文中收獲一些幫助。
結(jié)束語
本文主要從 HDFS 和 MapReduce 兩個大的方面講解了 Hadoop 的相關(guān)知識,并且編寫了一個簡單的 MapReduce 程序,***還深入了解了一下 MapReduce 程序的運行原理以及 Shuffle 相關(guān)的內(nèi)容。