HDFS 底層交互原理,看這篇就夠了
前言
大家好,我是林哥!
HDFS全稱是 Hadoop Distribute File System,是 Hadoop最重要的組件之一,也被稱為分步式存儲(chǔ)之王。本文主要從 HDFS 高可用架構(gòu)組成、HDFS 讀寫(xiě)流程、如何保證可用性以及高頻面試題出發(fā),提高大家對(duì) HDFS 的認(rèn)識(shí),掌握一些高頻的 HDFS 面試題。本篇文章概覽如下圖:
本篇文章概覽
1.HA 架構(gòu)組成
1.1HA架構(gòu)模型
在 HDFS 1.X 時(shí),NameNode 是 HDFS 集群中可能發(fā)生單點(diǎn)故障的節(jié)點(diǎn),集群中只有一個(gè) NameNode,一旦 NameNode 宕機(jī),整個(gè)集群將處于不可用的狀態(tài)。
在 HDFS 2.X 時(shí),HDFS 提出了高可用(High Availability, HA)的方案,解決了 HDFS 1.X 時(shí)的單點(diǎn)問(wèn)題。在一個(gè) HA 集群中,會(huì)配置兩個(gè) NameNode ,一個(gè)是 Active NameNode(主),一個(gè)是 Stadby NameNode(備)。主節(jié)點(diǎn)負(fù)責(zé)執(zhí)行所有修改命名空間的操作,備節(jié)點(diǎn)則執(zhí)行同步操作,以保證與主節(jié)點(diǎn)命名空間的一致性。HA 架構(gòu)模型如下圖所示:
HA 架構(gòu)組成2
HA 集群中所包含的進(jìn)程的職責(zé)各不相同。為了使得主節(jié)點(diǎn)和備用節(jié)點(diǎn)的狀態(tài)一致,采用了 Quorum Journal Manger (QJM)方案解決了主備節(jié)點(diǎn)共享存儲(chǔ)問(wèn)題,如圖 JournalNode 進(jìn)程,下面依次介紹各個(gè)進(jìn)程在架構(gòu)中所起的作用:
- Active NameNode:它負(fù)責(zé)執(zhí)行整個(gè)文件系統(tǒng)中命名空間的所有操作;維護(hù)著數(shù)據(jù)的元數(shù)據(jù),包括文件名、副本數(shù)、文件的 BlockId 以及 Block 塊所對(duì)應(yīng)的節(jié)點(diǎn)信息;另外還接受 Client 端讀寫(xiě)請(qǐng)求和 DataNode 匯報(bào) Block 信息。
- Standby NameNode:它是 Active NameNode 的備用節(jié)點(diǎn),一旦主節(jié)點(diǎn)宕機(jī),備用節(jié)點(diǎn)會(huì)切換成主節(jié)點(diǎn)對(duì)外提供服務(wù)。它主要是監(jiān)聽(tīng) JournalNode Cluster 上 editlog 變化,以保證當(dāng)前命名空間盡可能的與主節(jié)點(diǎn)同步。任意時(shí)刻,HA 集群只有一臺(tái) Active NameNode,另一個(gè)節(jié)點(diǎn)為 Standby NameNode。
- JournalNode Cluster: 用于主備節(jié)點(diǎn)間共享 editlog 日志文件的共享存儲(chǔ)系統(tǒng)。負(fù)責(zé)存儲(chǔ) editlog 日志文件, 當(dāng) Active NameNode 執(zhí)行了修改命名空間的操作時(shí),它會(huì)定期將執(zhí)行的操作記錄在 editlog 中,并寫(xiě)入 JournalNode Cluster 中。Standby NameNode 會(huì)一直監(jiān)聽(tīng) JournalNode Cluster 上 editlog 的變化,如果發(fā)現(xiàn) editlog 有改動(dòng),備用節(jié)點(diǎn)會(huì)讀取 JournalNode 上的 editlog 并與自己當(dāng)前的命名空間合并,從而實(shí)現(xiàn)了主備節(jié)點(diǎn)的數(shù)據(jù)一致性。
注意:QJM 方案是基于 Paxos 算法實(shí)現(xiàn)的,集群由 2N + 1 JouranlNode 進(jìn)程組成,最多可以容忍 N 臺(tái) JournalNode 宕機(jī),宕機(jī)數(shù)大于 N 臺(tái),這個(gè)算法就失效了!
- ZKFailoverController: ZKFC 以獨(dú)立進(jìn)程運(yùn)行,每個(gè) ZKFC 都監(jiān)控自己負(fù)責(zé)的 NameNode,它可以實(shí)現(xiàn) NameNode 自動(dòng)故障切換:即當(dāng)主節(jié)點(diǎn)異常,監(jiān)控主節(jié)點(diǎn)的 ZKFC 則會(huì)斷開(kāi)與 ZooKeeper 的連接,釋放分步式鎖,監(jiān)控備用節(jié)點(diǎn)的 ZKFC 進(jìn)程會(huì)去獲取鎖,同時(shí)把備用 NameNode 切換成 主 NameNode。
- ZooKeeper: 為 ZKFC 進(jìn)程實(shí)現(xiàn)自動(dòng)故障轉(zhuǎn)移提供統(tǒng)一協(xié)調(diào)服務(wù)。通過(guò) ZooKeeper 中 Watcher 監(jiān)聽(tīng)機(jī)制,通知 ZKFC 異常NameNode 下線;保證同一時(shí)刻只有一個(gè)主節(jié)點(diǎn)。
- DataNode: DataNode 是實(shí)際存儲(chǔ)文件 Block 塊的地方,一個(gè) Block 塊包含兩個(gè)文件:一個(gè)是數(shù)據(jù)本身,一個(gè)是元數(shù)據(jù)(數(shù)據(jù)塊長(zhǎng)度、塊數(shù)據(jù)的校驗(yàn)和、以及時(shí)間戳),DataNode 啟動(dòng)后會(huì)向 NameNode 注冊(cè),每 6 小時(shí)同時(shí)向主備兩個(gè) NameNode 上報(bào)所有的塊信息,每 3 秒同時(shí)向主備兩個(gè) NameNode 發(fā)送一次心跳。
DataNode 向 NameNode 匯報(bào)當(dāng)前塊信息的時(shí)間間隔,默認(rèn) 6 小時(shí),其配置參數(shù)名如下:
- <property>
- <name>dfs.blockreport.intervalMsec</name>
- <value>21600000</value>
- <description>Determines block reporting interval in
- milliseconds.</description>
- </property>
1.2HA主備故障切換流程
HA 集群剛啟動(dòng)時(shí),兩個(gè) NameNode 節(jié)點(diǎn)狀態(tài)均為 Standby,之后兩個(gè) NameNode 節(jié)點(diǎn)啟動(dòng) ZKFC 進(jìn)程后會(huì)去 ZooKeeper 集群搶占分步式鎖,成功獲取分步式鎖,ZooKeeper 會(huì)創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn),成功搶占分步式鎖的 NameNode 會(huì)成為 Active NameNode,ZKFC 便會(huì)實(shí)時(shí)監(jiān)控自己的 NameNode。
HDFS 提供了兩種 HA 狀態(tài)切換方式:一種是管理員手動(dòng)通過(guò)DFSHAAdmin -faieover執(zhí)行狀態(tài)切換;另一種則是自動(dòng)切換。下面分別從兩種情況分析故障的切換流程:
1.主 NameNdoe 宕機(jī)后,備用 NameNode 如何升級(jí)為主節(jié)點(diǎn)?
當(dāng)主 NameNode 宕機(jī)后,對(duì)應(yīng)的 ZKFC 進(jìn)程檢測(cè)到 NameNode 狀態(tài),便向 ZooKeeper 發(fā)生刪除鎖的命令,鎖刪除后,則觸發(fā)一個(gè)事件回調(diào)備用 NameNode 上的 ZKFC
ZKFC 得到消息后先去 ZooKeeper 爭(zhēng)奪創(chuàng)建鎖,鎖創(chuàng)建完成后會(huì)檢測(cè)原先的主 NameNode 是否真的掛掉(有可能由于網(wǎng)絡(luò)延遲,心跳延遲),掛掉則升級(jí)備用 NameNode 為主節(jié)點(diǎn),沒(méi)掛掉則將原先的主節(jié)點(diǎn)降級(jí)為備用節(jié)點(diǎn),將自己對(duì)應(yīng)的 NameNode 升級(jí)為主節(jié)點(diǎn)。
2.主 NameNode 上的 ZKFC 進(jìn)程掛掉,主 NameNode 沒(méi)掛,如何切換?
ZKFC 掛掉后,ZKFC 和 ZooKeeper 之間 TCP 鏈接會(huì)隨之?dāng)嚅_(kāi),session 也會(huì)隨之消失,鎖被刪除,觸發(fā)一個(gè)事件回調(diào)備用 NameNode ZKFC,ZKFC 得到消息后會(huì)先去 ZooKeeper 爭(zhēng)奪創(chuàng)建鎖,鎖創(chuàng)建完成后也會(huì)檢測(cè)原先的主 NameNode 是否真的掛掉,掛掉則升級(jí) 備用 NameNode 為主節(jié)點(diǎn),沒(méi)掛掉則將主節(jié)點(diǎn)降級(jí)為備用節(jié)點(diǎn),將自己對(duì)應(yīng)的 NameNode 升級(jí)為主節(jié)點(diǎn)。
1.3Block、packet及chunk 概念
在 HDFS 中,文件存儲(chǔ)是按照數(shù)據(jù)塊(Block)為單位進(jìn)行存儲(chǔ)的,在讀寫(xiě)數(shù)據(jù)時(shí),DFSOutputStream使用 Packet 類來(lái)封裝一個(gè)數(shù)據(jù)包。每個(gè) Packet 包含了若干個(gè) chunk 和對(duì)應(yīng)的 checksum。
- Block: HDFS 上的文件都是分塊存儲(chǔ)的,即把一個(gè)文件物理劃分為一個(gè) Block 塊存儲(chǔ)。Hadoop 2.X/3.X 默認(rèn)塊大小為 128 M,1.X 為 64M.
- Packet: 是 Client 端向 DataNode 或 DataNode 的 Pipline 之間傳輸數(shù)據(jù)的基本單位,默認(rèn) 64 KB
- Chunk: Chunk 是最小的單位,它是 Client 向 DataNode 或 DataNode PipLine 之間進(jìn)行數(shù)據(jù)校驗(yàn)的基本單位,默認(rèn) 512 Byte ,因?yàn)橛米餍r?yàn),所以每個(gè) Chunk 需要帶有 4 Byte 的校驗(yàn)位,實(shí)際上每個(gè) Chunk 寫(xiě)入 Packtet 的大小為 516 Byte。
2.源碼級(jí)讀寫(xiě)流程
2.1HDFS 讀流程
HDFS讀流程
我們以從 HDFS 讀取一個(gè) information.txt 文件為例,其讀取流程如上圖所示,分為以下幾個(gè)步驟:
1.打開(kāi) information.txt 文件:首先客戶端調(diào)用 DistributedFileSystem.open() 方法打開(kāi)文件,這個(gè)方法在底層會(huì)調(diào)用DFSclient.open() 方法,該方法會(huì)返回一個(gè) HdfsDataInputStream 對(duì)象用于讀取數(shù)據(jù)塊。但實(shí)際上真正讀取數(shù)據(jù)的是 DFSInputStream ,而 HdfsDataInputStream 是 DFSInputStream 的裝飾類(new HdfsDataInputStream(DFSInputStream))。
2.從 NameNode 獲取存儲(chǔ) information.txt 文件數(shù)據(jù)塊的 DataNode 地址:即獲取組成 information.txt block 塊信息。在構(gòu)造輸出流 DFSInputStream 時(shí),會(huì)通過(guò)調(diào)用 getBlockLocations() 方法向 NameNode 節(jié)點(diǎn)獲取組成 information.txt 的 block 的位置信息,并且 block 的位置信息是按照與客戶端的距離遠(yuǎn)近排好序。
3.連接 DataNode 讀取數(shù)據(jù)塊: 客戶端通過(guò)調(diào)用 DFSInputStream.read() 方法,連接到離客戶端最近的一個(gè) DataNode 讀取 Block 塊,數(shù)據(jù)會(huì)以數(shù)據(jù)包(packet)為單位從 DataNode 通過(guò)流式接口傳到客戶端,直到一個(gè)數(shù)據(jù)塊讀取完成;DFSInputStream會(huì)再次調(diào)用 getBlockLocations() 方法,獲取下一個(gè)最優(yōu)節(jié)點(diǎn)上的數(shù)據(jù)塊位置。
4.直到所有文件讀取完成,調(diào)用 close() 方法,關(guān)閉輸入流,釋放資源。
從上述流程可知,整個(gè)過(guò)程最主要涉及到 open()、read()兩個(gè)方法(其它方法都是在這兩個(gè)方法的調(diào)用鏈中調(diào)用,如getBlockLocations()),下面依次介紹這2個(gè)方法的實(shí)現(xiàn)。
注:本文是以 hadoop-3.1.3 源碼為基礎(chǔ)!
- open()方法
事實(shí)上,在調(diào)用 DistributedFileSystem.open()方法時(shí),底層調(diào)用的是 DFSClient.open()方法打開(kāi)文件,并構(gòu)造 DFSInputStream 輸入流對(duì)象。
- public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
- throws IOException {
- //檢查DFSClicent 的運(yùn)行狀況
- checkOpen();
- // 從 namenode 獲取 block 位置信息,并存到 LocatedBlocks 對(duì)象中,最終傳給 DFSInputStream 的構(gòu)造方法
- try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
- LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
- //調(diào)用 openInternal 方法,獲取輸入流
- return openInternal(locatedBlocks, src, verifyChecksum);
- }
- }
整個(gè) open()方法分為兩部分:
第一部分是,調(diào)用 checkOpen()方法檢查 DFSClient 的運(yùn)行狀況,調(diào)用getLocateBlocks()方法,獲取 block 的位置消息
第二部分是,調(diào)用openInternal()方法,獲取輸入流。
- openInternal( )方法
- private DFSInputStream openInternal(LocatedBlocks locatedBlocks, String src,
- boolean verifyChecksum) throws IOException {
- if (locatedBlocks != null) {
- //獲取糾刪碼策略,糾刪碼是 Hadoop 3.x 的新特性,默認(rèn)不啟用糾刪碼策略
- ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
- if (ecPolicy != null) {
- //如果用戶指定了糾刪碼策略,將返回一個(gè) DFSStripedInputStream 對(duì)象
- //DFSStripedInputStream 會(huì)將數(shù)據(jù)邏輯字節(jié)范圍的請(qǐng)求轉(zhuǎn)換為存儲(chǔ)在 DataNode 上的內(nèi)部塊
- return new DFSStripedInputStream(this, src, verifyChecksum, ecPolicy,
- locatedBlocks);
- }
- //如果未指定糾刪碼策略,調(diào)用 DFSInputStream 的構(gòu)造方法,并且返回該 DFSInputStream 的對(duì)象
- return new DFSInputStream(this, src, verifyChecksum, locatedBlocks);
- } else {
- throw new IOException("Cannot open filename " + src);
- }
- }
- DFSInputStream 構(gòu)造方法
- DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
- LocatedBlocks locatedBlocks) throws IOException {
- this.dfsClient = dfsClient;
- this.verifyChecksum = verifyChecksum;
- this.src = src;
- synchronized (infoLock) {
- this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
- }
- this.locatedBlocks = locatedBlocks;
- //調(diào)用 openInfo 方法,參數(shù):refreshLocatedBlocks,是否要更新 locateBlocks 屬性。
- openInfo(false);
- }
構(gòu)造方法做了2件事:
第一部分是初始化 DFSInputStream 屬性,其中 verifyChecksum 含義是:讀取數(shù)據(jù)時(shí)是否進(jìn)行校驗(yàn),cachingStrategy,指的是緩存策略。
第二部分,調(diào)用 openInfo()方法。
思考:為甚么要更新最后一個(gè)數(shù)據(jù)塊長(zhǎng)度?
因?yàn)榭赡軙?huì)有這種情況出現(xiàn),當(dāng)客戶端在讀取文件時(shí),最后一個(gè)文件塊可能還在構(gòu)建的狀態(tài)(正在被寫(xiě)入),Datanode 還未上報(bào)最后一個(gè)文件塊,那么 namenode 所保存的數(shù)據(jù)塊長(zhǎng)度有可能小于 Datanode實(shí)際存儲(chǔ)的數(shù)據(jù)塊長(zhǎng)度,所以需要與 Datanode 通信以確認(rèn)最后一個(gè)數(shù)據(jù)塊的真實(shí)長(zhǎng)度。
獲取到 DFSInputStream 流對(duì)象后,并且得到了文件的所有 Block 塊的位置信息,接下來(lái)調(diào)用read()方法,從 DataNode 讀取數(shù)據(jù)塊。
注:在openInfo() 方法
在openInfp()中,會(huì)從 namenode 獲取當(dāng)前正在讀取文件的最后一個(gè)數(shù)據(jù)塊的長(zhǎng)度 lastBlockBeingWrittenLength,如果返回的最后一個(gè)數(shù)據(jù)塊的長(zhǎng)度為 -1 ,這是一種特殊情況:即集群剛重啟,DataNode 可能還沒(méi)有向 NN 進(jìn)行完整的數(shù)據(jù)塊匯報(bào),這時(shí)部分?jǐn)?shù)據(jù)塊位置信息還獲取不到,也獲取不到這些塊的長(zhǎng)度,則默認(rèn)會(huì)重試 3 次,默認(rèn)每次等待 4 秒,重新去獲取文件對(duì)應(yīng)的數(shù)據(jù)塊的位置信息以及最后數(shù)據(jù)塊長(zhǎng)度;如果最后一個(gè)數(shù)據(jù)塊的長(zhǎng)度不為 -1,則表明,最后一個(gè)數(shù)據(jù)塊已經(jīng)是完整狀態(tài)。
- read()方法
- public synchronized int read(@Nonnull final byte buf[], int off, int len)
- throws IOException {
- //驗(yàn)證輸入的參數(shù)是否可用
- validatePositionedReadArgs(pos, buf, off, len);
- if (len == 0) {
- return 0;
- }
- //構(gòu)造字節(jié)數(shù)組作為容器
- ReaderStrategy byteArrayReader =
- new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
- //調(diào)用 readWithStrategy()方法讀取數(shù)據(jù)
- return readWithStrategy(byteArrayReader);
- }
當(dāng)用戶代碼調(diào)用read()方法時(shí),其底層調(diào)用的是 DFSInputStream.read()方法。該方法從輸入流的 off 位置開(kāi)始讀取,讀取 len 個(gè)字節(jié),然后存入 buf 字節(jié)數(shù)組中。源碼中構(gòu)造了一個(gè) ByteArrayStrategy 對(duì)象,該對(duì)象封裝了 5 個(gè)屬性,分別是:字節(jié)數(shù)組 buf,讀取到的字節(jié)存入該字節(jié)數(shù)組;off,讀取的偏移量;len,將要讀取的目標(biāo)長(zhǎng)度;readStatistics,統(tǒng)計(jì)計(jì)數(shù)器,客戶端。最后通過(guò)調(diào)用 readWithStrategy()方法去讀取文件數(shù)據(jù)塊的數(shù)據(jù)。
總結(jié):HDFS 讀取一個(gè)文件,調(diào)用流程如下:(中間涉及到的部分方法未列出)
usercode 調(diào)用 open() ---> DistributedFileSystem.open() ---> DFSClient.open() ---> 返回一個(gè) DFSInputStream 對(duì)象給 DistributedFileSystem ---> new hdfsDataInputStream(DFSInputStream) 并返回給用戶;
usercode 調(diào)用 read() ---> 底層DFSIputStream.read() ---> readWithStrategy(bytArrayReader)
2.2HDFS 寫(xiě)流程
介紹完 HDFS 讀的流程,接下來(lái)看看一個(gè)文件的寫(xiě)操作的實(shí)現(xiàn)。從下圖中可以看出,HDFS 寫(xiě)流程涉及的方法比較多,過(guò)程也比較復(fù)雜。
1.在 namenode 創(chuàng)建文件: 當(dāng) client 寫(xiě)一個(gè)新文件時(shí),首先會(huì)調(diào)用 DistributeedFileSytem.creat() 方法,DistributeFileSystem 是客戶端創(chuàng)建的一個(gè)對(duì)象,在收到 creat 命令之后,DistributeFileSystem 通過(guò) RPC 與 NameNode 通信,讓它在文件系統(tǒng)的 namespace 創(chuàng)建一個(gè)獨(dú)立的新文件;namenode 會(huì)先確認(rèn)文件是否存在以及客戶端是否有權(quán)限,確認(rèn)成功后,會(huì)返回一個(gè) HdfsDataOutputStream 對(duì)象,與讀流程類似,這個(gè)對(duì)象底層包裝了一個(gè) DFSOutputStream 對(duì)象,它才是寫(xiě)數(shù)據(jù)的真正執(zhí)行者。
2.建立數(shù)據(jù)流 pipeline 管道: 客戶端得到一個(gè)輸出流對(duì)象,還需要通過(guò)調(diào)用 ClientProtocol.addBlock()向 namenode 申請(qǐng)新的空數(shù)據(jù)塊,addBlock( ) 會(huì)返回一個(gè) LocateBlock 對(duì)象,該對(duì)象保存了可寫(xiě)入的 DataNode 的信息,并構(gòu)成一個(gè) pipeline,默認(rèn)是有三個(gè) DataNode 組成。
3.通過(guò)數(shù)據(jù)流管道寫(xiě)數(shù)據(jù): 當(dāng) DFSOutputStream調(diào)用 write()方法把數(shù)據(jù)寫(xiě)入時(shí),數(shù)據(jù)會(huì)先被緩存在一個(gè)緩沖區(qū)中,寫(xiě)入的數(shù)據(jù)會(huì)被切分成多個(gè)數(shù)據(jù)包,每當(dāng)達(dá)到一個(gè)數(shù)據(jù)包長(zhǎng)度(默認(rèn)65536字節(jié))時(shí),
DFSOutputStream會(huì)構(gòu)造一個(gè) Packet 對(duì)象保存這個(gè)要發(fā)送的數(shù)據(jù)包;新構(gòu)造的 Packet 對(duì)象會(huì)被放到 DFSOutputStream維護(hù)的 dataQueue 隊(duì)列中,DataStreamer 線程會(huì)從 dataQueue 隊(duì)列中取出 Packet 對(duì)象,通過(guò)底層 IO 流發(fā)送到 pipeline 中的第一個(gè) DataNode,然后繼續(xù)將所有的包轉(zhuǎn)到第二個(gè) DataNode 中,以此類推。發(fā)送完畢后,
這個(gè) Packet 會(huì)被移出 dataQueue,放入 DFSOutputStream 維護(hù)的確認(rèn)隊(duì)列 ackQueue 中,該隊(duì)列等待下游 DataNode 的寫(xiě)入確認(rèn)。當(dāng)一個(gè)包已經(jīng)被 pipeline 中所有的 DataNode 確認(rèn)了寫(xiě)入磁盤(pán)成功,這個(gè)數(shù)據(jù)包才會(huì)從確認(rèn)隊(duì)列中移除。
4.關(guān)閉輸入流并提交文件: 當(dāng)客戶端完成了整個(gè)文件中所有的數(shù)據(jù)塊的寫(xiě)操作之后,會(huì)調(diào)用 close() 方法關(guān)閉輸出流,客戶端還會(huì)調(diào)用 ClientProtoclo.complete( ) 方法通知 NameNode 提交這個(gè)文件中的所有數(shù)據(jù)塊,
NameNode 還會(huì)確認(rèn)該文件的備份數(shù)是否滿足要求。對(duì)于 DataNode 而言,它會(huì)調(diào)用 blockReceivedAndDelete() 方法向 NameNode 匯報(bào),NameNode 會(huì)更新內(nèi)存中的數(shù)據(jù)塊與數(shù)據(jù)節(jié)點(diǎn)的對(duì)應(yīng)關(guān)系。
從上述流程來(lái)看,整個(gè)寫(xiě)流程主要涉及到了 creat()、write()這些方法,下面著重介紹下這兩個(gè)方法的實(shí)現(xiàn)。當(dāng)調(diào)用 DistributeedFileSytem.creat() 方法時(shí),其底層調(diào)用的其實(shí)是 DFSClient.create()方法,其源碼如下:
- create( )方法
- public DFSOutputStream create(String src, FsPermission permission,
- EnumSet<CreateFlag> flag, boolean createParent,
- short replication,long blockSize,
- Progressable progress, int buffersize,
- ChecksumOpt checksumOpt,
- InetSocketAddress[] favoredNodes,
- String ecPolicyName) throws IOException {
- //檢查客戶端是否已經(jīng)打開(kāi)
- checkOpen();
- final FsPermission masked = applyUMask(permission);
- LOG.debug("{}: masked={}", src, masked);
- //調(diào)用 DFSOutputStream.newStreamForCreate()創(chuàng)建輸出流對(duì)象
- final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
- src, masked, flag, createParent, replication, blockSize, progress,
- dfsClientConf.createChecksum(checksumOpt),
- getFavoredNodesStr(favoredNodes), ecPolicyName);
- //獲取 HDFS 文件的租約
- beginFileLease(result.getFileId(), result);
- return result;
- }
DistributeFileSystem.create()在底層會(huì)調(diào)用 DFSClient.create()方法。該方法主要完成三件事:
租約:指的是租約持有者在規(guī)定時(shí)間內(nèi)獲得該文件權(quán)限(寫(xiě)文件權(quán)限)的合同
第一,檢查客戶端是否已經(jīng)打開(kāi)
第二,調(diào)用靜態(tài)的 newStreamForCreate() 方法,通過(guò) RPC 與 NameNode 通信創(chuàng)建新文件,并構(gòu)建出 DFSOutputStream流
第三,執(zhí)行 beginFileLease() 方法,獲取新J建文件的租約
- newStreamForCreate() 方法
- static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
- FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
- short replication, long blockSize, Progressable progress,
- DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
- throws IOException {
- try (TraceScope ignored =
- dfsClient.newPathTraceScope("newStreamForCreate", src)) {
- HdfsFileStatus stat = null;
- // 如果發(fā)生異常,并且異常為 RetryStartFileException ,便重新調(diào)用create()方法,重試次數(shù)為 10
- boolean shouldRetry = true;
- //重試次數(shù)為 10
- int retryCount = CREATE_RETRY_COUNT;
- while (shouldRetry) {
- shouldRetry = false;
- try {
- //調(diào)用 ClientProtocol.create() 方法,在命名空間中創(chuàng)建 HDFS 文件
- stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
- new EnumSetWritable<>(flag), createParent, replication,
- blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
- break;
- } catch (RemoteException re) {
- IOException e = re.unwrapRemoteException(AccessControlException.class,
- //....此處省略了部分異常類型
- UnknownCryptoProtocolVersionException.class);
- if (e instanceof RetryStartFileException) {//如果發(fā)生異常,判斷異常是否為 RetryStartFileException
- if (retryCount > 0) {
- shouldRetry = true;
- retryCount--;
- } else {
- throw new IOException("Too many retries because of encryption" +
- " zone operations", e);
- }
- } else {
- throw e;
- }
- }
- }
- Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
- final DFSOutputStream out;
- if(stat.getErasureCodingPolicy() != null) {
- //如果用戶指定了糾刪碼策略,將創(chuàng)建一個(gè) DFSStripedOutputStream 對(duì)象
- out = new DFSStripedOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
- } else {
- //如果沒(méi)指定糾刪碼策略,調(diào)用構(gòu)造方法創(chuàng)建一個(gè) DFSOutputStream 對(duì)象
- out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes, true);
- }
- //啟動(dòng)輸出流對(duì)象的 Datastreamer 線程
- out.start();
- return out;
- }
- }
newStreamForCreate()方法總共涉及三個(gè)部分:
當(dāng)構(gòu)建完 DFSOutputStream 輸出流時(shí),客戶端調(diào)用 write() 方法把數(shù)據(jù)包寫(xiě)入 dataQueue 隊(duì)列,在將數(shù)據(jù)包發(fā)送到 DataNode 之前,DataStreamer會(huì)向 NameNode 申請(qǐng)分配一個(gè)新的數(shù)據(jù)塊
然后建立寫(xiě)這個(gè)數(shù)據(jù)塊的數(shù)據(jù)流管道(pipeline),之后DataStreamer 會(huì)從 dataQueue 隊(duì)列取出數(shù)據(jù)包,通過(guò) pipeline 依次發(fā)送給各個(gè) DataNode。每個(gè)數(shù)據(jù)包(packet)都有對(duì)應(yīng)的序列號(hào),當(dāng)一個(gè)數(shù)據(jù)塊中所有的數(shù)據(jù)包都發(fā)送完畢,
并且都得到了 ack 消息確認(rèn)后,Datastreamer會(huì)將當(dāng)前數(shù)據(jù)塊的 pipeline 關(guān)閉。通過(guò)不斷循環(huán)上述過(guò)程,直到該文件(一個(gè)文件會(huì)被切分為多個(gè) Block)的所有數(shù)據(jù)塊都寫(xiě)完成。
調(diào)用 ClientProtocol.create()方法,創(chuàng)建文件,如果發(fā)生異常為 RetryStartFileException ,則默認(rèn)重試10次
調(diào)用 DFSStripedOutputStream 或 DFSOutputStream 構(gòu)造方法,構(gòu)造輸出流對(duì)象
啟動(dòng) Datastreamer 線程,Datastreamer 是 DFSOutputStream 中的一個(gè)內(nèi)部類,負(fù)責(zé)構(gòu)建 pipeline 管道,并將數(shù)據(jù)包發(fā)送到 pipeline 中的第一個(gè) DataNode
- writeChunk()方法
- protected synchronized void writeChunk(ByteBuffer buffer, int len,
- byte[] checksum, int ckoff, int cklen) throws IOException {
- writeChunkPrepare(len, ckoff, cklen);
- //將當(dāng)前校驗(yàn)數(shù)據(jù)、校驗(yàn)塊寫(xiě)入數(shù)據(jù)包中
- currentPacket.writeChecksum(checksum, ckoff, cklen);
- currentPacket.writeData(buffer, len);
- currentPacket.incNumChunks();
- getStreamer().incBytesCurBlock(len);
- // 如果當(dāng)前數(shù)據(jù)包已經(jīng)滿了,或者寫(xiě)滿了一個(gè)數(shù)據(jù)塊,則將當(dāng)前數(shù)據(jù)包放入發(fā)送隊(duì)列中
- if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
- getStreamer().getBytesCurBlock() == blockSize) {
- enqueueCurrentPacketFull();
- }
- }
最終寫(xiě)數(shù)據(jù)調(diào)用都的是 writeChunk ()方法,其會(huì)首先調(diào)用 checkChunkPrepare()構(gòu)造一個(gè) Packet 對(duì)象保存數(shù)據(jù)包,
然后調(diào)用writeCheckSum()和writeData()方法,將校驗(yàn)塊數(shù)據(jù)和校驗(yàn)和寫(xiě)入 Packet 對(duì)象中。
當(dāng) Packet 對(duì)象寫(xiě)滿時(shí)(每個(gè)數(shù)據(jù)包都可以寫(xiě)入 maxChunks 個(gè)校驗(yàn)塊),則調(diào)用 enqueueCurrentPacketFull()方法,將當(dāng)前的 Packet 對(duì)象放入 dataQueue 隊(duì)列中,等待 DataStreamer 線程的處理。
如果當(dāng)前數(shù)據(jù)塊中的所有數(shù)據(jù)都已經(jīng)發(fā)送完畢,則發(fā)送一個(gè)空數(shù)據(jù)包標(biāo)識(shí)所有數(shù)據(jù)已經(jīng)發(fā)送完畢。
3.HDFS 如何保證可用性?
在 1.1 節(jié)中已經(jīng)闡述了 HDFS 的高可用的架構(gòu),分別涉及到 NameNode,DataNode,Journalnode,ZKFC等組件。所以,在談及 HDFS 如何保證可用性,要從多個(gè)方面去回答。
- 在 Hadoop 2.X 時(shí),主備 NameNode 節(jié)點(diǎn)通過(guò) JournalNode 的數(shù)據(jù)同步,來(lái)保證數(shù)據(jù)一致性,2個(gè) ZKFC 進(jìn)程負(fù)責(zé)各自的 NameNode 健康監(jiān)控,從而實(shí)現(xiàn)了 NameNode 的高可用。Hadoop 3.X 時(shí),NameNode 數(shù)量可以大于等于 2。
- 對(duì)于 JournalNode 來(lái)講,也是分布式的,保證了可用性。因?yàn)橛羞x舉機(jī)制,所以 JournalNode 個(gè)數(shù) 一般都為 2N+1 個(gè)。在 主NameNode向 JournalNode寫(xiě)入 editlog 文件時(shí),當(dāng)有一半以上的(≥N+1) JournalNode返回寫(xiě)操作成功時(shí)即認(rèn)為該次寫(xiě)成功。所以 JournalNode集群能容忍最多 N 臺(tái)節(jié)點(diǎn)宕掉,如果多于 N 臺(tái)機(jī)器掛掉,服務(wù)才不可用。
- ZKFC 主要輔助 ZooKeeper 做 Namenode 的健康監(jiān)控,能夠保證故障自動(dòng)轉(zhuǎn)移,它是部署在兩臺(tái) NameNode 節(jié)點(diǎn)上的獨(dú)立的進(jìn)程。此外,ZooKeeper 集群也是一個(gè)獨(dú)立的分布式系統(tǒng),它通過(guò) Zab 協(xié)議來(lái)保證數(shù)據(jù)一致,和主備節(jié)點(diǎn)的選舉切換等機(jī)制來(lái)保證可用性。
- DataNode 節(jié)點(diǎn)主要負(fù)責(zé)存儲(chǔ)數(shù)據(jù),通過(guò) 3 副本策略來(lái)保證數(shù)據(jù)的完整性,所以其最大可容忍 2 臺(tái) DataNode 掛掉,同時(shí) NameNode 會(huì)保證副本的數(shù)量。
- 最后,關(guān)于數(shù)據(jù)的可用性保證,HDFS 提供了數(shù)據(jù)完整性校驗(yàn)的機(jī)制。當(dāng)客戶端創(chuàng)建文件時(shí),它會(huì)計(jì)算每個(gè)文件的數(shù)據(jù)塊的checknums,也就是校驗(yàn)和,并存儲(chǔ)在 NameNode 中。當(dāng)客戶端去讀取文件時(shí),會(huì)驗(yàn)證從 DataNode 接收的數(shù)據(jù)塊的校驗(yàn)和,如果校驗(yàn)和不一致,說(shuō)明該數(shù)據(jù)塊已經(jīng)損壞,此時(shí)客戶端會(huì)選擇從其它 DataNode 獲取該數(shù)據(jù)塊的可用副本。
4.HDFS 高頻面試題
- HDFS 客戶端是如何與 DataNode 、NameNode 交互的?
- ZKFC 是如何實(shí)現(xiàn)主 NameNode 故障自動(dòng)轉(zhuǎn)移的?
- NameNode 存儲(chǔ)了哪些數(shù)據(jù)?
- Zookeeper 在故障轉(zhuǎn)移過(guò)程中是如何起作用的?
- HDFS 的讀寫(xiě)流程?
- 在 Hadoop 2.X 時(shí),HDFS block 塊為什么設(shè)置為 128 M?
本文轉(zhuǎn)載自微信公眾號(hào)「小林玩大數(shù)據(jù)」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系小林玩大數(shù)據(jù)公眾號(hào)。