Hadoop 生態(tài)之 MapReduce 及 Hive 簡介
1.計算框架
Hadoop 是一個計算框架,目前大型數(shù)據(jù)計算框架常用的大致有五種:
- 僅批處理框架:Apache hadoop.
- 僅流處理框架:Apache Storm、Apache Samza.
- 混合框架:Apache Spark、Apache Flink.
這其中名氣最大、使用最廣的當(dāng)屬 Hadoop 和 Spark。
雖然兩者都被稱為大數(shù)據(jù)框架,但實際層級不同。Hadoop 是一個分布式數(shù)據(jù)基礎(chǔ)設(shè)施,包括計算框架 MapReduce、分布式文件系統(tǒng) HDFS、YARN 等。而Spark 是專門用來對分布式存儲的大數(shù)據(jù)的處理工具,并不會進行數(shù)據(jù)存儲,更像是 MapReduce 的替代。
在使用場景上,Hadoop 主要用于離線數(shù)據(jù)計算,Spark更適用于需要精準實時的場景。本文主要介紹 Hadoop,對 Spark 不做討論。
本篇文章可承接知識庫 Hadoop之HDFS (https://gitlab.aihaisi.com/docs/docs/issues/516) ,介紹下 Hadoop 另一重要組件 MapReduce,以及 Hive。
2. MapReduce
2.1 MapReduce 是什么
一個基于 Java 的并行分布式計算框架。
前文有提到 HDFS 提供了基于主從結(jié)構(gòu)的分布式文件系統(tǒng),基于此存儲服務(wù)支持,MapReduce 可以實現(xiàn)任務(wù)的分發(fā)、跟蹤、執(zhí)行等工作,并收集結(jié)果。
2.2 MapReduce 組成
MapReduce 主要思想講的通俗一點就是將一個大的計算拆分成 Map(映射)和 Reduce(化簡)。說到這里,其實 JAVA8 在引入 Lambda 后,也有 map 和 reduce 方法。下面是一段 Java 中的用法:
- List<Integer> nums = Arrays.asList(1, 2, 3);
- List<Integer> doubleNums = nums.stream().map(number -> number * 2).collect(Collectors.toList());
- 結(jié)果:[2,4,6]
- Optional<Integer> sum = nums.stream().reduce(Integer::sum);
- 結(jié)果:[6]
代碼很簡單,map 負責(zé)歸類,reduce 負責(zé)計算。而 Hadoop 中的 MapReduce 也有異曲同工之處。
下面結(jié)合官方案例 WordCount 進行分析:
- public class WordCount {
- // Mapper泛型類,4個參數(shù)分別代表輸入鍵、值,輸出鍵、值類型
- public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- // 字符解析
- StringTokenizer itr = new StringTokenizer(value.toString());
- while (itr.hasMoreTokens()) {
- // nextToken():返回從當(dāng)前位置到下一個分隔符的字符串
- word.set(itr.nextToken());
- context.write(word, one);
- }
- }
- }
- // Reducer同樣也是四個參數(shù)
- public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
- private IntWritable result = new IntWritable();
- public void reduce(Text key, Iterable<IntWritable> values,Context context) throws
- IOException,InterruptedException {
- int sum = 0;
- // 循環(huán)values,并記錄“單詞”個數(shù)
- for (IntWritable val : values) {
- sum += val.get();
- }
- result.set(sum);
- context.write(key, result);
- }
- }
在這段代碼中,不難看出程序核心是 map 函數(shù)和 reduce 函數(shù)。是否 MapReduce 就是由這兩者組成的?接著往下看。
2.3 Map 和 Reduce
2.3.1 Map
在 WordCount 案例中,明顯看到 map 函數(shù)的輸入主要是一個
Context 在這里暫時性忽略,其是 Mapper 類的內(nèi)部抽象類,一般計算中不會用到,可以先當(dāng)做“上下文”理解。
map 函數(shù)計算過程是: 將這行文本中的單詞提取出來,針對每個單詞輸出一個
2.3.2 Reduce
接著就來看看 reduce ,這里輸入?yún)?shù) Values 就是上面提到的由很多個 1 組成的集合,而 Key 就是具體“單詞” word。
它的計算過程是: 將集合里的1求和,再將單詞(word)與這個和(sum)組成一個
假設(shè)有兩個數(shù)據(jù)塊的文本數(shù)據(jù)需要進行詞頻統(tǒng)計,MapReduce 計算過程如下圖所示:

到這都很容易理解,畢竟只是個 HelloWorld 的例子~,但整個MapReduce過程中最關(guān)鍵的部分其實是在 map 到 reduce 之間。
還拿上面例子來說:統(tǒng)計相同單詞在所有輸入數(shù)據(jù)中出現(xiàn)的次數(shù),一個 Map 只能處理一部分數(shù)據(jù),而熱點單詞就很可能會出現(xiàn)在所有 Map 中了,意味著同一單詞必須要合并到一起統(tǒng)計才能得到正確結(jié)果。這種數(shù)據(jù)關(guān)聯(lián)幾乎在所有的大數(shù)據(jù)計算場景都需要處理,如果是例子這種的當(dāng)然只對 Key 合并就OK了,但類似數(shù)據(jù)庫 join 操作這種較復(fù)雜的,就需對兩種類型(或更多)的數(shù)據(jù)依據(jù) Key 關(guān)聯(lián)。
這個數(shù)據(jù)關(guān)聯(lián)操作在 MapReduce中的叫做:shuffle。
2.4 shuffle
shuffle 從字面意思來看,洗牌。下面是一個完整的MR過程,看一看如何洗牌。

先看左半邊
1. 從 HDFS 中讀取數(shù)據(jù),輸入數(shù)據(jù)塊到一個個的 map,其中 map 完成計算時,計算結(jié)果會存儲到本地文件系統(tǒng)。而當(dāng) map 快要進行完時,就會啟動 shuffle 過程。
2. 如圖,shuffle 也可分為兩種,在Map端的是 Map shuffle。大致過程為:Map 任務(wù)進程會調(diào)用一個 Partitioner 接口,對 Map 產(chǎn)生的每個
這里就實現(xiàn)了對 Map 結(jié)果的分區(qū)、排序、分割,以及將同一分區(qū)的輸出合并寫入磁盤,得到一個分區(qū)有序的文件。這樣不管 Map 在哪個服務(wù)器節(jié)點,相同的 Key 一定會被發(fā)送給相同 Reduce 進程。Reduce 進程對收到的
再看右半邊
1. Reduce shuffle,又可分為復(fù)制 Map 輸出、排序合并兩階段。
- Copy:Reduce 任務(wù)從各個 Map 任務(wù)拖取數(shù)據(jù)后,通知父 TaskTracker 狀態(tài)已更新,TaskTracker 通知 JobTracker。Reduce 會定期向JobTracker 獲取 Map 的輸出位置,一旦拿到位置,Reduce 任務(wù)會從此輸出對應(yīng)的 TaskTracker 上復(fù)制輸出到本地,不會等到所有的Map任務(wù)結(jié)束。
- Merge sort:
- Copy 的數(shù)據(jù)先放入內(nèi)存緩沖區(qū),若緩沖區(qū)放得下就把數(shù)據(jù)寫入內(nèi)存,即內(nèi)存到內(nèi)存 merge。
- Reduce 向每個 Map 去拖取數(shù)據(jù),內(nèi)存中每個 Map 對應(yīng)一塊數(shù)據(jù),當(dāng)內(nèi)存緩存區(qū)中存儲的數(shù)據(jù)達到一定程度,開啟內(nèi)存中 merge,把內(nèi)存中數(shù)據(jù)merge 輸出到磁盤文件中,即內(nèi)存到磁盤 merge。
- 當(dāng)屬于該 reduce 的 map 輸出全部拷貝完成,會在 reduce 上生成多個文件,執(zhí)行合并操作,即磁盤到磁盤 merge。此刻 Map 的輸出數(shù)據(jù)已經(jīng)是有序的,Merge 進行一次合并排序,所謂 Reduce 端的 sort 過程就是這個合并的過程。
2. 經(jīng)過上一步Reduce shuffle后,reduce進行最后的計算,將輸出寫入HDFS中。
以上便是 shuffle 大致四個步驟,關(guān)鍵是 map 輸出的 shuffle 到哪個 Reduce 進程,它由 Partitioner 來實現(xiàn),MapReduce 框架默認的 Partitioner 用 Key 哈希值對 Reduce 任務(wù)數(shù)量取模,相同 Key 會落在相同的 Reduce 任務(wù) ID 上。
- public int getPartition(K2 key, V2 value, int numReduceTasks) {
- return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
如果對 Shuffle 總結(jié)一句話: 分布式計算將不同服務(wù)器中的數(shù)據(jù)合并到一起進行后續(xù)計算的過程。
shuffle 是大數(shù)據(jù)計算過程中神奇的地方,不管是 MapReduce 還是 Spark,只要是大數(shù)據(jù)批處理計算,一定會有 shuffle 過程,只有讓數(shù)據(jù)關(guān)聯(lián)起來,它的內(nèi)在關(guān)系和價值才會呈現(xiàn)。
3. Hive
上一部分介紹了 MapReduce,接下來簡單談?wù)?Hive .
我覺得任何一項技術(shù)的出現(xiàn)都是為了解決某類問題, MapReduce 毫無疑問簡化了大數(shù)據(jù)開發(fā)的編程難度。但實際上進行數(shù)據(jù)計算更常用的手段可能是 SQL,那么有沒有辦法直接運行 SQL ?
3.1 Hive是什么
基于Hadoop的一個數(shù)據(jù)倉庫系統(tǒng),定義了一種類SQL查詢語言:Hive SQL。
這里有一個名詞 數(shù)據(jù)倉庫,數(shù)據(jù)倉庫是指:面向主題(Subject Oriented)、集成(Integrated)、相對穩(wěn)定(Non-Volatile)、反應(yīng)歷史變化(Time Variant)的數(shù)據(jù)集合,用于支持管理決策。
這么說可能有點抽象,分解一下:
- 主題:數(shù)據(jù)倉庫針對某個主題來進行組織,指使用數(shù)據(jù)倉庫決策時所關(guān)心的重點方面。比如訂閱分析就可以當(dāng)做一個主題。
- 集成:數(shù)據(jù)倉庫要將多個數(shù)據(jù)源數(shù)據(jù)存到一起,但數(shù)據(jù)以前的存儲方式不同,要經(jīng)過抽取、清洗、轉(zhuǎn)換。(也就是 ETL)
- 穩(wěn)定:保存的數(shù)據(jù)是一系列歷史快照,不允許修改,只能分析。
- 時變:會定期接收到新的數(shù)據(jù),反應(yīng)出新的數(shù)據(jù)變化。
現(xiàn)在再看下定義:數(shù)據(jù)倉庫是將多個數(shù)據(jù)源的數(shù)據(jù)按照一定的主題集成,進行抽取、清洗、轉(zhuǎn)換。且處理整合后的數(shù)據(jù)不允許隨意修改,只能分析,還需定期更新。
3.2 為什么是 Hive
了解了 Hive 的基礎(chǔ)定義,想一下:一個依賴于 HDFS 的數(shù)據(jù)倉庫在 Hadoop 環(huán)境中可以扮演什么角色?
前面說到,可不可以讓 SQL 直接運行在 Hadoop 平臺,這里的答案便是 Hive。它可以將 Hive SQL 轉(zhuǎn)換為 MapReduce 程序運行。
Hive 初期版本默認 Hive on Mapreduce
啟動 hive 前通常要先啟動 hdfs 和 yarn, 同時一般需要配置 MySQL,Hive 依賴于 HDFS 的數(shù)據(jù)存儲,但為了能操作 HDFS 上的數(shù)據(jù)集,要知道數(shù)據(jù)切分格式、存儲類型、地址等。這些信息通過一張表存儲,稱為元數(shù)據(jù),可以存儲到 MySQL 中。
- 現(xiàn)在來看下 Hive 的部分命令
- 新建數(shù)據(jù)庫:create database xxx;
- 刪除數(shù)據(jù)庫:drop database xxx;
- 建表:
- create table table_name(col_name data_type);
- Hive 的表有兩個概念:**內(nèi)部表和外部表**。默認內(nèi)部表,簡單來說,內(nèi)部表數(shù)據(jù)存儲在每個表相應(yīng)的HDFS目錄下。外部表的數(shù)據(jù)存在別處,要刪除這個外部表,該外部表所指向的數(shù)據(jù)是不會被刪除的,只會刪除外部表對應(yīng)的元數(shù)據(jù)。
- 查詢:
- select * from t_table **where** a<100 **and** b>1000;
- 連接查詢:
- select a.*,b.* from t_a a join t_b b on a.name=b.name;
看到這里,可能會覺得我在寫 SQL, 沒錯,對于熟悉 SQL 的人來說,Hive 是非常易于上手的。
3.3 HIVE SQL To MapReduce
前面說到 HQL 可以‘轉(zhuǎn)換’為 MapReduce, 下面就來看看:一個 HQL 是如何轉(zhuǎn)化為 MapReduce 的Hive的基礎(chǔ)架構(gòu):

通過 Client 向 Hive 提交 SQL 命令。如果是 DDL,Hive 就會通過執(zhí)行引擎 Driver 將數(shù)據(jù)表的信息記錄在 Metastore 元數(shù)據(jù)組件中,這個組件通常用一個關(guān)系數(shù)據(jù)庫實現(xiàn),記錄表名、字段名、字段類型、關(guān)聯(lián) HDFS 文件路徑等 Meta 信息(元信息)。
如果是DQL,Driver 就會將該語句提交給自己的編譯器 進行語法分析、解析、優(yōu)化等一系列操作,最后生成一個 MapReduce 執(zhí)行計劃。再根據(jù)執(zhí)行計劃生成一個 MapReduce 的作業(yè),提交給 Hadoop 的 MapReduce 計算框架處理。
比如輸入一條 select xxx from a ; 其執(zhí)行順序為:首先在 metastore 查詢--> sql 解析--> 查詢優(yōu)化---> 物理計劃--> 執(zhí)行 MapReduce。
小結(jié)
本文大致闡述了什么是 MapReduce 及其組成和基本原理,同時也介紹了Hive。
其實在實踐中,并不需要常編寫 MapReduce 程序,主要的數(shù)據(jù)處理還是 SQL 分析,因此 Hive 在大數(shù)據(jù)應(yīng)用中的擁有很大的作用。