Hadoop 是什么?它是如何工作的?
Hadoop是什么?它是如何工作的?為什么 Hadoop可以成為全球最流行的大數(shù)據(jù)處理框架之一?如何基于 Hadoop搭建一套簡(jiǎn)單的分布式文件系統(tǒng)?這篇我們一起來(lái)來(lái)深入討論。
一、Hadoop是什么?
Hadoop是一個(gè)開(kāi)源的分布式計(jì)算框架,用于處理和存儲(chǔ)大規(guī)模數(shù)據(jù)集,它是由 Apache Software Foundation維護(hù),能夠幫助用戶(hù)在商用硬件集群上以可靠、高效、容錯(cuò)的方式處理和分析海量數(shù)據(jù)。為了更好地理解 Hadoop是什么,我們列舉了Hadoop一些里程碑:
- 2002年: Nutch項(xiàng)目啟動(dòng),目標(biāo)是實(shí)現(xiàn)全面的網(wǎng)頁(yè)抓取、索引和查詢(xún)功能。
- 2003年: Google發(fā)布了三篇具有影響力的論文(Google File System(GFS)、MapReduce和Bigtable),為 Hadoop的文件存儲(chǔ)架構(gòu)奠定了基礎(chǔ)。
- 2004年: Cutting在 Nutch中實(shí)現(xiàn)了類(lèi)似 GFS的功能,形成了后來(lái)的 Hadoop分布式文件系統(tǒng)(HDFS)。
- 2005年: Nutch項(xiàng)目中實(shí)現(xiàn)了 MapReduce的初步版本,隨后 Hadoop從 Nutch中分離出來(lái),成為一個(gè)獨(dú)立的開(kāi)源項(xiàng)目。
- 2006年: Yahoo!雇傭 Doug Cutting,并為Hadoop的發(fā)展提供支持,同年,Apache Hadoop項(xiàng)目正式啟動(dòng)。
- 2008年: Hadoop成為 Apache頂級(jí)項(xiàng)目,并迎來(lái)了快速發(fā)展。同年,Cloudera公司成立,推動(dòng) Hadoop商業(yè)化進(jìn)程。
- 從此,Hadoop迅猛發(fā)展,成為全球最流行的大數(shù)據(jù)處理框架之一。
二、Hadoop的核心組件
Hadoop的核心組件包括以下 4個(gè):
- HDFS:HDFS是 Hadoop的數(shù)據(jù)存儲(chǔ)層,它負(fù)責(zé)將大量數(shù)據(jù)分塊存儲(chǔ)到集群中的不同節(jié)點(diǎn)上,從而實(shí)現(xiàn)分布式保存和冗余備份。數(shù)據(jù)被切分成小塊,并復(fù)制到多個(gè)節(jié)點(diǎn),以防硬件故障。
- MapReduce:MapReduce是 Hadoop的分布式計(jì)算模型,它將處理大規(guī)模數(shù)據(jù)集的任務(wù)分發(fā)到多個(gè)節(jié)點(diǎn),允許并行處理。MapReduce由兩個(gè)階段組成:Map階段負(fù)責(zé)將任務(wù)分解為多個(gè)小任務(wù);Reduce階段負(fù)責(zé)對(duì)小任務(wù)的結(jié)果進(jìn)行匯總。
- YARN:YARN 是 Hadoop的資源管理層,它負(fù)責(zé)管理和調(diào)度集群中的計(jì)算資源。YARN允許多個(gè)作業(yè)在同一 Hadoop集群上并行執(zhí)行,這大大提高了 Hadoop集群的利用率和擴(kuò)展能力。
- Hadoop Common:Hadoop Common是 Hadoop的核心庫(kù),提供必要的工具和實(shí)用程序,用于支持其他 Hadoop模塊。
它們的關(guān)系如下:
接下來(lái)我們將對(duì)各個(gè)組件進(jìn)行詳細(xì)的分析。
1. HDFS
HDFS,全稱(chēng) Hadoop Distributed File System(分布式文件系統(tǒng)),它是 Hadoop的核心組件之一,旨在解決海量數(shù)據(jù)的存儲(chǔ)問(wèn)題。
(1) HDFS 架構(gòu)概述
HDFS是主從結(jié)構(gòu)的分布式文件系統(tǒng),由兩類(lèi)節(jié)點(diǎn)組成:
- NameNode :NameNode(主節(jié)點(diǎn)是 HDFS 的中心控制節(jié)點(diǎn),負(fù)責(zé)管理文件系統(tǒng)的元數(shù)據(jù)(比如文件和目錄的樹(shù)狀結(jié)構(gòu),文件塊的位置、用戶(hù)權(quán)限等)。它不直接存儲(chǔ)數(shù)據(jù),而是記錄數(shù)據(jù)存儲(chǔ)在哪些 DataNode 上。
- DataNode :DataNode(數(shù)據(jù)節(jié)點(diǎn))是實(shí)際存儲(chǔ)數(shù)據(jù)的節(jié)點(diǎn),它們接收數(shù)據(jù)塊,并定期向 NameNode匯報(bào)自己存儲(chǔ)的塊信息和健康狀態(tài)。
此外,還有一個(gè)可選的組件Secondary NameNode,它用于輔助 NameNode 的元數(shù)據(jù)備份和日志合并,幫助維持文件系統(tǒng)的高可用性。
HDFS的架構(gòu)如下圖:
(2) 數(shù)據(jù)存儲(chǔ)機(jī)制
HDFS的文件是分塊存儲(chǔ)的,大文件被按照固定大小(默認(rèn)是 128MB,早期版本是 64MB)劃分為多個(gè)數(shù)據(jù)塊(Block),每個(gè)文件塊被存儲(chǔ)在集群中的不同 DataNode 上。
為了防止數(shù)據(jù)因節(jié)點(diǎn)故障而丟失,HDFS做了數(shù)據(jù)冗余與容錯(cuò)機(jī)制,它會(huì)對(duì)每個(gè)數(shù)據(jù)塊進(jìn)行復(fù)制,默認(rèn)情況下每個(gè)數(shù)據(jù)塊有 3 副本:
- 一個(gè)副本存儲(chǔ)在與客戶(hù)端最近的節(jié)點(diǎn)上。
- 第二個(gè)副本存儲(chǔ)在不同機(jī)架的節(jié)點(diǎn)上(防止機(jī)架故障)。
- 第三個(gè)副本存儲(chǔ)在第二個(gè)副本所在機(jī)架的其他節(jié)點(diǎn)上。
基于上述的副本機(jī)制,HDFS可以確保即使部分 DataNode 失效,數(shù)據(jù)依然可以通過(guò)其他存有副本的節(jié)點(diǎn)恢復(fù)。
(3) 讀寫(xiě)操作流程
HDFS 文件寫(xiě)入過(guò)程:
- 客戶(hù)端與 NameNode 交互:客戶(hù)端首先將寫(xiě)請(qǐng)求發(fā)給 NameNode,然后 NameNode 返回存儲(chǔ)該文件每個(gè)塊的若干 DataNode 節(jié)點(diǎn)位置。
- 數(shù)據(jù)塊寫(xiě)入 DataNode:客戶(hù)端將數(shù)據(jù)塊發(fā)送至其中一個(gè) DataNode,這個(gè) DataNode 會(huì)將數(shù)據(jù)塊傳遞給下一個(gè) DataNode,依次類(lèi)推,直到所有節(jié)點(diǎn)都保存該數(shù)據(jù)塊的副本。
- 狀態(tài)更新:上傳完成后,所有涉及的 DataNode 會(huì)將其存儲(chǔ)狀態(tài)通知 NameNode,并提交流程結(jié)束。
HDFS 文件讀取過(guò)程:
- 獲取元數(shù)據(jù):客戶(hù)端向 NameNode 請(qǐng)求文件位置信息,NameNode 返回相關(guān)文件塊及其所在 DataNode 的位置。
- 從 DataNode 讀取數(shù)據(jù)塊:客戶(hù)端根據(jù) NameNode 提供的位置從相關(guān)的 DataNode 直接讀取文件的不同數(shù)據(jù)塊并組裝回文件。
- 容錯(cuò)處理:如果某個(gè) DataNode 失效,客戶(hù)端無(wú)法從該NameNode獲取塊信息,它會(huì)嘗試從存儲(chǔ)副本的其他 DataNode 讀取。
2. MapReduce
MapReduce是 Hadoop的分布式計(jì)算框架,通過(guò)將復(fù)雜的任務(wù)分解成多個(gè)獨(dú)立的簡(jiǎn)單任務(wù)來(lái)實(shí)現(xiàn)并行計(jì)算,它的核心思想是“Map”和“Reduce”兩個(gè)階段:
- Map階段:將原始數(shù)據(jù)映射(map)為鍵值對(duì)(key-value pairs)。
- Reduce階段:將具有相同鍵的數(shù)值進(jìn)行聚合(reduce)。
(1) MapReduce 執(zhí)行流程
MapReduce 執(zhí)行流程包含以下5個(gè)步驟:
① Job劃分
一個(gè)完整的 MapReduce任務(wù)稱(chēng)為一個(gè)Job,Job是由多個(gè)Task構(gòu)成的,分為Map Task和Reduce Task。
② Input Splitting(輸入分片)
MapReduce處理輸入數(shù)據(jù)時(shí),首先將大文件切分成較小的Splits,Map Task的數(shù)量通常與輸入分片數(shù)量一致,每一個(gè)Map Task處理一個(gè)分片的數(shù)據(jù)。
③ Map階段
- 每個(gè)Map Task拿到一份Input Split的數(shù)據(jù),通過(guò)RecordReader將數(shù)據(jù)轉(zhuǎn)化為一對(duì)對(duì)的<key, value>形式,這里的鍵值對(duì)((K1, V1))根據(jù)業(yè)務(wù)的需求構(gòu)造。
- Map函數(shù)逐條處理這些鍵值對(duì),輸出<K2, V2>形式的新的鍵值對(duì)。
- 這些中間鍵值對(duì)<K2, V2>在寫(xiě)入本地磁盤(pán)之前會(huì)進(jìn)行Sort(排序) 和 Partition(分區(qū)) 操作,Partition的作用是將具有相同鍵(K2)的鍵值對(duì)分發(fā)到相同的 Reducer中執(zhí)行。
④ Shuffle and Sort(分發(fā)與排序)
Shuffle發(fā)生在 Map階段結(jié)束和 Reduce階段之間,具體過(guò)程如下:
- 排序:每個(gè)Map Task輸出的<K2, V2>對(duì)會(huì)按鍵(K2)進(jìn)行排序,確保同一鍵的所有值(V2)聚集在一起。
- 分區(qū):Map Task的輸出會(huì)根據(jù) Partition函數(shù)的哈希值發(fā)送到不同的Reduce任務(wù)中。
- 拉取數(shù)據(jù):Reducer從每個(gè) Map輸出中拉取需要的分區(qū)文件,經(jīng)過(guò)網(wǎng)絡(luò)傳輸將其聚合。
⑤ Reduce階段
- 每個(gè)Reduce Task接收到的內(nèi)容是經(jīng)過(guò) Shuffle過(guò)程后所有鍵值對(duì)(<K2, List<V2>>)的集合。
- Reduce函數(shù)(用戶(hù)自定義)會(huì)對(duì)每個(gè)K2執(zhí)行聚合計(jì)算,輸出為新的鍵值對(duì)<K3, V3>。
- 最終輸出結(jié)果會(huì)通過(guò) RecordWriter寫(xiě)入 HDFS或者其他存儲(chǔ)系統(tǒng)。
整個(gè)流程可以用下圖解釋?zhuān)?/p>
2. YARN
YARN(Yet Another Resource Negotiator,另一種資源調(diào)度器)是Hadoop 2.x版本中引入的一個(gè)集群資源管理框架,它的設(shè)計(jì)初衷是解決 Hadoop 1.x中 MapReduce計(jì)算框架的資源調(diào)度和管理局限性,可以支持各種應(yīng)用程序調(diào)度的需求。
(1) 核心組件
YARN包含以下 5個(gè)核心組件:
① ResourceManager
ResourceManager(RM,資源管理器)負(fù)責(zé)全局集群資源的管理和調(diào)度,它是YARN的中央控制器,協(xié)調(diào)集群中的所有應(yīng)用和計(jì)算資源。ResourceManager有兩個(gè)重要的子組件:
- Scheduler(調(diào)度器):負(fù)責(zé)為應(yīng)用程序按需分配資源,但不負(fù)責(zé)任務(wù)的執(zhí)行和重新啟動(dòng)。調(diào)度器的排期是決定如何把資源多租戶(hù)化、多應(yīng)用程序化的關(guān)鍵,它可以實(shí)現(xiàn)不同的調(diào)度策略(如公平調(diào)度器,容量調(diào)度器等)。
- Applications Manager(應(yīng)用程序管理器):負(fù)責(zé)各種應(yīng)用程序的生命周期管理,包括應(yīng)用程序的啟動(dòng)、檢查、資源監(jiān)控和故障恢復(fù)。
② NodeManager
NodeManager(NM,節(jié)點(diǎn)管理器)是YARN架構(gòu)中的分布式代理,負(fù)責(zé)管理每個(gè)計(jì)算節(jié)點(diǎn)上的資源,具體負(fù)責(zé):
- 資源報(bào)告:將本節(jié)點(diǎn)的CPU、內(nèi)存等資源使用情況匯報(bào)給ResourceManager。
- 容器管理:協(xié)調(diào)和管理每一個(gè)容器(Container)的生命周期,包括啟動(dòng)、監(jiān)控和停止容器。
- 任務(wù)監(jiān)控和報(bào)告:監(jiān)控執(zhí)行的任務(wù),并向ResourceManager報(bào)告其狀態(tài)和進(jìn)度。
③ ApplicationMaster
ApplicationMaster(AM,應(yīng)用程序管理器)是為每個(gè)具體應(yīng)用程序(如MapReduce Job、Spark Job)啟動(dòng)的專(zhuān)用進(jìn)程,它負(fù)責(zé)協(xié)調(diào)整個(gè)應(yīng)用程序生命周期的調(diào)度和執(zhí)行,協(xié)調(diào) ResourceManager與 NodeManager,動(dòng)態(tài)申請(qǐng)和釋放資源。每一個(gè)應(yīng)用程序在提交時(shí)都會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的ApplicationMaster實(shí)例。 AM的職責(zé)包括:
- 申請(qǐng)資源:向ResourceManager請(qǐng)求所需的資源,定義CPU和內(nèi)存需求。
- 任意調(diào)度:根據(jù)資源信息及負(fù)載情況,決定將任務(wù)分配到哪個(gè)節(jié)點(diǎn)/容器執(zhí)行。
- 容器監(jiān)控:監(jiān)控啟動(dòng)的任務(wù)并處理故障。
④ Container
Container(容器)是YARN中的資源分配單位,它將邏輯運(yùn)行環(huán)境(如CPU、內(nèi)存等涉及硬件維度的資源)與應(yīng)用程序任務(wù)綁定在一起。ApplicationMaster可以向ResourceManager申請(qǐng)多個(gè)容器,并在這些容器中分配任務(wù)進(jìn)行具體的計(jì)算。
⑤ Client
客戶(hù)端負(fù)責(zé)與YARN進(jìn)行交互,提交應(yīng)用程序請(qǐng)求,并向YARN查詢(xún)?nèi)蝿?wù)的執(zhí)行進(jìn)度和結(jié)果??蛻?hù)端將資源需求信息傳遞給ResourceManager,RM會(huì)為該任務(wù)分配資源,然后將其控制權(quán)交給對(duì)應(yīng)的ApplicationMaster。
核心組件模型如下圖:
(2) 工作流程
YARN工作流程包含以下4個(gè)步驟:
① 應(yīng)用程序啟動(dòng)流程
- 啟動(dòng)應(yīng)用程序:客戶(hù)端通過(guò)API或命令行向YARN集群提交應(yīng)用程序。此時(shí),客戶(hù)端給RM發(fā)送請(qǐng)求,描述任務(wù)的資源需求及執(zhí)行規(guī)范。
- 生成ApplicationMaster:ResourceManager根據(jù)集群的整體資源利用情況,為應(yīng)用程序分配第一個(gè)容器(Container),并啟動(dòng)相應(yīng)的ApplicationMaster。
- ApplicationMaster初始化:ApplicationMaster會(huì)在啟動(dòng)后向ResourceManager注冊(cè)自己,并根據(jù)初始任務(wù)和資源需求向RM申請(qǐng)更多的資源。
- 分配資源:ResourceManager根據(jù)集群的實(shí)時(shí)負(fù)載情況和調(diào)度策略,將余下的容器分配給ApplicationMaster,AC根據(jù)任務(wù)需求啟動(dòng)容器,并將計(jì)算任務(wù)分配給這些容器去執(zhí)行。
② 資源調(diào)度流程
- 請(qǐng)求資源:ApplicationMaster向ResourceManager提交資源申請(qǐng)。請(qǐng)求中指定了計(jì)算任務(wù)所需的資源(如CPU、內(nèi)存)以及在何處優(yōu)先執(zhí)行(一定節(jié)點(diǎn)上或任意節(jié)點(diǎn))。
- 資源心跳與分配:NodeManager通過(guò)定期心跳將節(jié)點(diǎn)的可用資源(包括剩余內(nèi)存、CPU等情況)匯報(bào)給ResourceManager。ResourceManager根據(jù)集群整體資源情況,通過(guò)調(diào)度器(Scheduler)為機(jī)器或容器分配任務(wù)。
- 任務(wù)分配與啟動(dòng):ApplicationMaster得到資源分配信息后,再與NodeManager通信,為任務(wù)啟動(dòng)容器并分配計(jì)算任務(wù)。
③ 任務(wù)運(yùn)行與監(jiān)控
一旦任務(wù)開(kāi)始執(zhí)行,NodeManager會(huì)為Container提供隔離的運(yùn)行環(huán)境(如JVM),ApplicationMaster監(jiān)視任務(wù)的運(yùn)行狀態(tài),并通過(guò)心跳與NodeManager通信,確保任務(wù)成功完成或在出現(xiàn)故障時(shí)重新調(diào)度任務(wù)。
④ 應(yīng)用完成與資源回收
當(dāng)ApplicationMaster檢測(cè)到所有任務(wù)均已成功完成,它會(huì)向ResourceManager發(fā)送一個(gè)"完成"信號(hào),表示應(yīng)用程序已經(jīng)完成。隨后,ResourceManager會(huì)通知NodeManager釋放任務(wù)所占用的資源容器,集群整體資源狀態(tài)更新。
三、代碼實(shí)戰(zhàn)
在代碼實(shí)戰(zhàn)環(huán)節(jié),我們將通過(guò)一個(gè)完整的示例來(lái)展示如何在 Java中實(shí)現(xiàn)一個(gè) MapReduce任務(wù),并將處理結(jié)果存儲(chǔ)回 HDFS。
任務(wù)描述:計(jì)算給定文件中每個(gè)單詞出現(xiàn)的次數(shù)文件格式:CVS或者JSON項(xiàng)目結(jié)構(gòu):項(xiàng)目結(jié)構(gòu)如下:
wordcount/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ ├── WordsCounterMapper.java
│ │ │ ├── WordsCounterReducer.java
│ │ │ └── WordsCounterDriver.java
│ └── resources/
├── input/
│ └── input.cvs
│ input.cvs
└── output/
1.安裝Hadoop
我自己Mac電腦安裝的是 Hadoop-3.4.1,查看版本指令:hadoop version,關(guān)于安裝 Hadoop,可以參考這篇文章
2.處理文件
(1) 處理 CSV文件
假設(shè)我們有一個(gè)超大的 CSV文件:input.csv,如下內(nèi)容只是展示前幾行數(shù)據(jù):
id,name,address
1,yuanjava,hangzhou
2,juejin,beijin
3,didi,beijing
...
我們可以使用開(kāi)源的 Apache Commons CSV工具類(lèi)來(lái)處理該文件,對(duì)應(yīng)的依賴(lài)如下:
// maven依賴(lài)
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.12.0</version>
</dependency>
// gradle 依賴(lài)
implementation 'org.apache.commons:commons-csv:1.12.0'
(2) 處理 Json文件
假設(shè)我們有一個(gè)超大的 Json文件:input.json,如下內(nèi)容只是展示前幾行數(shù)據(jù):
[
{"id": 1, "name": "yuanjava", "address": "hangzhou"},
{"id": 2, "name": "juejin", "address": "beijing"},
{"id": 3, "name": "didi", "address": "beijing"},
...
]
我們可以使用開(kāi)源的 Jackson庫(kù)來(lái)處理文件,對(duì)應(yīng)的依賴(lài)如下:
// maven依賴(lài)
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.18.1</version>
</dependency>
// gradle 依賴(lài)
implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.1'
3.增加 Hadoop依賴(lài)
// maven依賴(lài)
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.4.1</version>
</dependency>
// gradle依賴(lài)
implementation 'org.apache.hadoop:hadoop-common:3.4.1'
implementation 'org.apache.hadoop:hadoop-mapreduce-client-core:3.4.1'
4.編寫(xiě) Mapper類(lèi)
Mapper類(lèi)的作用是處理輸入數(shù)據(jù),并為每個(gè)輸入記錄生成鍵值對(duì),在詞頻統(tǒng)計(jì)任務(wù)中,Mapper 的任務(wù)是將每個(gè)單詞映射為一個(gè)中間鍵值對(duì) (word, 1)。
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.io.StringReader;
public class WordsCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString().trim();
if (line.isEmpty()) {
return; // Skip empty lines
}
// json以'{'開(kāi)頭, CVS以'['開(kāi)頭
if (line.startsWith("{") || line.startsWith("[")) {
processJson(line, context);
} else {
processCsv(line, context);
}
}
private void processJson(String line, Context context) throws IOException, InterruptedException{
JsonNode rootNode = objectMapper.readTree(line);
if (rootNode.isArray()) {
for (JsonNode node : rootNode) {
String name = node.get("name").asText();
word.set(name);
context.write(word, one);
}
} else if (rootNode.isObject()) {
String name = rootNode.get("name").asText();
word.set(name);
context.write(word, one);
}
}
private void processCsv(String line, Context context) throws IOException, InterruptedException{
StringReader reader = new StringReader(line);
Iterable<CSVRecord> records = CSVFormat.DEFAULT.parse(reader);
for (CSVRecord record : records) {
// Assuming the CSV has a header row and "name" is one of the columns
String name = record.get("name");
word.set(name);
context.write(word, one);
}
}
}
5.編寫(xiě) Reducer類(lèi)
Reducer類(lèi)的作用是對(duì)來(lái)自 Mapper的中間鍵值對(duì)進(jìn)行匯總,在詞頻統(tǒng)計(jì)任務(wù)中,Reduce 的任務(wù)是對(duì)相同單詞的計(jì)數(shù)進(jìn)行累加。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordsCounterDriver {
public static void main(String[] args){
if (args.length != 2) {
System.err.println("Please enter input path and output path.");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordCounter");
job.setJarByClass(WordsCounterDriver.class);
job.setMapperClass(WordsCounterMapper.class);
job.setReducerClass(WordsCounterReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
6.編寫(xiě) Driver 類(lèi)
Driver 類(lèi)用于配置 MapReduce 作業(yè)并啟動(dòng)作業(yè)。它指定了 Mapper 和 Reducer 的實(shí)現(xiàn)類(lèi),以及輸入和輸出路徑等。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordsCounterDriver {
public static void main(String[] args){
if (args.length != 2) {
System.err.println("Please enter input path and output path.");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordCounter");
job.setJarByClass(WordsCounterDriver.class);
job.setMapperClass(WordsCounterMapper.class);
job.setReducerClass(WordsCounterReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
7.運(yùn)行和查看結(jié)果
(1) 運(yùn)行步驟:
- 編譯和打包:將上述代碼編譯并打包成一個(gè) JAR 文件。
- 上傳數(shù)據(jù)到 HDFS:將待處理的 CSV 和 JSON 數(shù)據(jù)上傳到 HDFS 的一個(gè)目錄中。
- 執(zhí)行 MapReduce作業(yè):在 Hadoop集群上運(yùn)行該 JAR文件,并指定輸入和輸出路徑,指令如下:
hadoop jar wordcounter.jar WordCounterDriver input/input.cvs output/
hadoop jar wordcounter.jar WordCounterDriver input/input.json output/
- /input/input.cvs(json) 是 HDFS上包含 CSV和 JSON文件的目錄。
- /output 是用于存儲(chǔ)結(jié)果的 HDFS目錄。注意:輸出目錄不能預(yù)先存在,否則作業(yè)將失敗。
(2) 查看輸出結(jié)果
任務(wù)完成后,輸出結(jié)果將會(huì)保存在指定的輸出目錄中,我們可以使用以下命令查看結(jié)果:
hadoop fs -cat output/part-r-00000
輸出結(jié)果可能如下:
yuanjava 100
juejin 3000
didi 100
...
8.代碼解釋與優(yōu)化
(1) Mapper詳解
繼承與泛型:Mapper<LongWritable, Text, Text, IntWritable> 表示輸入鍵值對(duì)的類(lèi)型和輸出鍵值對(duì)的類(lèi)型。輸入鍵是行偏移量,值是行文本,輸出鍵是單詞,值是整數(shù) 1。
map 方法:對(duì)每一行文本進(jìn)行分割,然后對(duì)每個(gè)單詞輸出一個(gè)鍵值對(duì)。
(2) Reducer詳解
繼承與泛型:Reducer<Text, IntWritable, Text, IntWritable> 表示輸入和輸出鍵值對(duì)類(lèi)型。輸入鍵是單詞,值是整數(shù)列表,輸出鍵是單詞,值是單詞的累加計(jì)數(shù)。
reduce 方法:對(duì)每個(gè)單詞的所有計(jì)數(shù)進(jìn)行累加輸出。
(3) Driver詳解
Job 配置:設(shè)置 Mapper和 Reducer類(lèi),指定輸入輸出格式。
路徑設(shè)置:通過(guò)命令行參數(shù)指定輸入輸出路徑。
(4) 優(yōu)化建議
- Combiner使用:在 Map端進(jìn)行部分匯總,減少傳輸?shù)?Reduce端的數(shù)據(jù)量。
- 數(shù)據(jù)壓縮:?jiǎn)⒂弥虚g數(shù)據(jù)壓縮減少網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo)。
- 分區(qū)與排序:根據(jù)數(shù)據(jù)特性自定義分區(qū)器和排序規(guī)則。
通過(guò)這個(gè)簡(jiǎn)單的例子,我們展示了如何在 Java中實(shí)現(xiàn)一個(gè)基本的 MapReduce程序,通過(guò)定義 Mapper和 Reducer 再結(jié)合 Driver,能夠?qū)崿F(xiàn)對(duì)大規(guī)模數(shù)據(jù)集的分布式處理。如果要處理更復(fù)雜的任務(wù),可以通過(guò)自定義分區(qū)器、排序規(guī)則、Combiner 等方式進(jìn)行優(yōu)化。
通過(guò)此示例,我們可以更好地理解 Hadoop MapReduce 的工作原理和編程模型以及它對(duì)于大數(shù)據(jù)處理的重要性。
四、總結(jié)
本文,我們分析了 Hadoop的核心組件及其工作原理,讓我們對(duì) Hadoop有了一定的認(rèn)識(shí)。本人有幾年 Hadoop的使用經(jīng)驗(yàn),從整體上看,Hadoop的使用屬于中等難度,Hadoop的生態(tài)比完善,學(xué)習(xí)難度比較大,但是,不得不說(shuō) Hadoop的設(shè)計(jì)思維很優(yōu)秀,值得我們花時(shí)間去學(xué)習(xí)。
2003年,Google發(fā)布 Google File System(GFS)、MapReduce和 Bigtable 三篇論文后,Doug Cutting和 Michael J. Cafarella抓住了機(jī)會(huì),共同創(chuàng)造了 Hadoop。Google的這三篇經(jīng)典論文是大數(shù)據(jù)領(lǐng)域的經(jīng)典之作,但它的影響力遠(yuǎn)不止大數(shù)據(jù)領(lǐng)域,因此,如果想成為一名優(yōu)秀的工程師,閱讀原滋原味的優(yōu)秀論文絕對(duì)是受益無(wú)窮的一種方式。
Hadoop展示了大數(shù)據(jù)領(lǐng)域一個(gè)優(yōu)秀的架構(gòu)模式:集中管理,分布式存儲(chǔ)與計(jì)算。這種優(yōu)秀的架構(gòu)模式同樣還運(yùn)用在 Spark、Kafka、Flink、HBase、Elasticsearch、Cassandra等這些優(yōu)秀的框架上,它在大數(shù)據(jù)領(lǐng)域展示了顯著的優(yōu)勢(shì)。
最近一年,我從事的項(xiàng)目有幸和 MIT,Standford這樣頂尖學(xué)府出來(lái)的工程師合作,他們強(qiáng)悍的數(shù)學(xué)建模能力以及對(duì)同一個(gè)問(wèn)題思考的深度確實(shí)讓我望塵莫及,在互聯(lián)網(wǎng)大廠(chǎng)卷了這么多年,每天都有寫(xiě)完的需求開(kāi)不完的會(huì),絕大多數(shù)程序員都被業(yè)務(wù)裹挾著,導(dǎo)致很多優(yōu)秀的人無(wú)法從業(yè)務(wù)中抽離出來(lái)去研究更深層領(lǐng)域的東西,陷入無(wú)盡的內(nèi)卷。
如何在這個(gè)內(nèi)卷的環(huán)境中讓自己立于不敗之地?基本功絕對(duì)是重中之重。
最后,因?yàn)?Hadoop的內(nèi)容太多,很難僅憑本文把 Hadoop講透,希望在分享我個(gè)人對(duì) Hadoop理解的同時(shí)也能拋磚引玉,激發(fā)同行寫(xiě)出更多優(yōu)秀的文章,對(duì)于技術(shù),對(duì)于行業(yè)產(chǎn)生共多思考的共鳴。