NameNode是用了什么神秘技術(shù)來支撐元數(shù)據(jù)百萬并發(fā)讀寫的
本文轉(zhuǎn)載自微信公眾號(hào)「KK架構(gòu)師」,作者wangkai。轉(zhuǎn)載本文請(qǐng)聯(lián)系KK架構(gòu)師公眾號(hào)。
本文大綱
一、HDFS 是大數(shù)據(jù)的基石
我們都知道,HDFS 是大數(shù)據(jù)存儲(chǔ)的基石,所有的離線數(shù)據(jù)都存儲(chǔ)在 HDFS 上,而 NameNode 是存儲(chǔ)所有元數(shù)據(jù)的地方(所謂元數(shù)據(jù)就是描述數(shù)據(jù)的數(shù)據(jù),比如文件的大小,文件都存儲(chǔ)在哪些 DataNode 上,文件在目錄樹的位置等),所以 NameNode 便成為了 HDFS 最關(guān)鍵的部分。
在離線數(shù)倉中,會(huì)存在很多離線任務(wù),這些離線任務(wù)都要往 HDFS 寫數(shù)據(jù),每次寫數(shù)據(jù)都會(huì)經(jīng)過 NameNode 來保存元數(shù)據(jù)信息,那么 NameNode 勢必會(huì)承擔(dān)非常多的請(qǐng)求。NameNode 作為 HDFS 的核心,肯定自身要保證高可用,數(shù)據(jù)不能一直在內(nèi)存中,要寫到磁盤里。
所以一個(gè)關(guān)鍵的問題來了,NameNode 是用了什么神秘的技術(shù),在保證自身高可用的同時(shí),還能承擔(dān)巨額的讀寫請(qǐng)求?
二、NameNode 高可用是如何實(shí)現(xiàn)的
下面直接來一個(gè) NameNode 高可用的架構(gòu)圖:
然后解釋下如何保證高可用的:
(1)NameNode 只有一個(gè)時(shí)的單點(diǎn)故障問題
如果我們只部署了一個(gè) NameNode,那么這個(gè) NameNode 是有單點(diǎn)故障的問題的。如何解決,再加一個(gè) NameNode 即可;
(2)當(dāng)有兩個(gè) NameNode ,切換時(shí),數(shù)據(jù)如何保持同步
兩個(gè) NameNode 一起工作,某一個(gè) NameNode 掛掉了,另一個(gè) NameNode 接替工作,這件事成立的必要前提是,兩個(gè) NameNode 的數(shù)據(jù)得時(shí)時(shí)刻刻保持一致。
那么如何保持?jǐn)?shù)據(jù)一致,是不是可以在兩個(gè) NameNode 之間搞個(gè)共享的文件系統(tǒng)?仔細(xì)想想也不行,這樣的話,單點(diǎn)故障問題就轉(zhuǎn)移到這個(gè)文件系統(tǒng)上了。
(3)使用多節(jié)點(diǎn)的 JournalNode 作為主備 NameNode 的數(shù)據(jù)同步介質(zhì)
這里引入了 JournalNode 集群,JournalNode 的每個(gè)節(jié)點(diǎn)的數(shù)據(jù)都是一樣的,并且時(shí)刻保持一致。并且只要超過半數(shù)的節(jié)點(diǎn)存活,整個(gè) JournalNode 集群都可以正常提供服務(wù)。
所以,一般會(huì)使用奇數(shù)個(gè)節(jié)點(diǎn)來搭建。(為什么一般不用偶數(shù)個(gè)呢?因?yàn)?3 個(gè)節(jié)點(diǎn)構(gòu)成的集群,可以容忍掛掉一臺(tái)機(jī)器;而 4 個(gè)節(jié)點(diǎn)構(gòu)成的集群,也只能容忍掛掉一臺(tái)機(jī)器。同樣是只能掛掉一臺(tái),為何不選 3 個(gè)節(jié)點(diǎn)的呢,還能節(jié)省資源)。
使用 JournalNode 集群,一個(gè) NameNode 實(shí)時(shí)的往集群寫,另一個(gè) NameNode 也實(shí)時(shí)的讀集群數(shù)據(jù),這樣兩個(gè) NameNode 數(shù)據(jù)就可以保持一致了。
(4)一個(gè) NameNode 掛掉,另一個(gè) NameNode 如何立馬感知并接替工作
首先不能人工參與切換。那如何實(shí)時(shí)監(jiān)聽呢?
首先要再引入一個(gè)關(guān)鍵組件:Zookeeper。當(dāng)兩個(gè) NameNode 同時(shí)啟動(dòng)后,他們都會(huì)去 Zookeeper 上注冊(cè),誰注冊(cè)成功了,誰就能獲得鎖,成為 Active 狀態(tài)的 NameNode。
另外還需要一個(gè)組件:ZKFC,它會(huì)實(shí)時(shí)監(jiān)控 Zookeeper 進(jìn)程狀態(tài),并且會(huì)以心跳的方式實(shí)時(shí)的告訴 Zookeeper NameNode 的狀態(tài)。如果一個(gè) NameNode 進(jìn)程掛了,就會(huì)把 Zookeeper 上的鎖給另外一個(gè) NameNode,讓它成為 Active 的來工作。
三、NameNode 如何既高可用,還能高并發(fā)
1、雙緩沖技術(shù)
NameNode 為了實(shí)現(xiàn)高可用,首先自己內(nèi)存里的數(shù)據(jù)需要寫到磁盤,然后還需要往 JournalNode 里寫數(shù)據(jù)。
所以既然要寫磁盤,還是往兩個(gè)地方寫磁盤,那必然性能會(huì)跟不上的。
所以這里 NameNode 引入了一個(gè)技術(shù),也是本篇文章的重點(diǎn):雙緩沖技術(shù)。
雙緩沖的設(shè)計(jì)理念如下圖:
客戶端不是直接寫磁盤,而是往一個(gè)內(nèi)存結(jié)構(gòu)(Buffer1)里面寫數(shù)據(jù)。當(dāng) Buffer1 達(dá)到一定閾值后,Buffer 1 和 Buffer 2 交換內(nèi)存數(shù)據(jù)。此時(shí) Buffer1 數(shù)據(jù)為空,Buffer2 開始在后臺(tái)默默寫磁盤。
這樣的好處很明顯的,前端只需要進(jìn)行內(nèi)存寫 Buffer1 就行,性能特別高;而 Buffer2 在后臺(tái)默默的同步日志到磁盤即可。
這樣磁盤寫,就轉(zhuǎn)化成為了內(nèi)存寫,速度大大提高了。
2、如何實(shí)現(xiàn)雙緩沖
然而,在真實(shí)環(huán)境不只一個(gè)客戶端是這樣子的:
大數(shù)據(jù)情況下是 N 個(gè)客戶端同時(shí)并發(fā)寫的,在高并發(fā)的情況下,我們必然要去協(xié)調(diào)多個(gè)線程動(dòng)作的一致性,比如往 Buffer1 的寫動(dòng)作,Buffer1 與 Buffer2 數(shù)據(jù)交換的動(dòng)作,Buffer2 寫磁盤的動(dòng)作。
那么我們?cè)撊绾螌?shí)現(xiàn)這樣一個(gè)巧妙的雙緩沖呢?下面的代碼是我從 Hadoop 源碼里抽離出來的關(guān)鍵實(shí)現(xiàn):
- package org.apache.hadoop;
- import java.util.LinkedList;
- public class FSEditLog2 {
- private long txid=0L;
- private DoubleBuffer editLogBuffer=new DoubleBuffer();
- //是否正在刷寫磁盤
- private volatile Boolean isSyncRunning = false;
- private volatile Boolean isWaitSync = false;
- private volatile Long syncMaxTxid = 0L;
- //每個(gè)線程都對(duì)應(yīng)自己的一個(gè)副本
- private ThreadLocal<Long> localTxid=new ThreadLocal<Long>();
- public void logEdit(String content){//mkdir /a
- synchronized (this){//加鎖的目的就是為了事務(wù)ID的唯一,而且是遞增
- txid++;
- localTxid.set(txid);
- EditLog log = new EditLog(txid, content);
- editLogBuffer.write(log);
- }
- logSync();
- }
- private void logSync(){
- synchronized (this){
- if(isSyncRunning){ //是否有人正在把數(shù)據(jù)同步到磁盤上面
- long txid = localTxid.get();
- if(txid <= syncMaxTxid){
- //直接return,不接著干了?
- return;
- }
- if(isWaitSync){
- return;
- }
- isWaitSync = true;
- while(isSyncRunning){
- try {
- wait(2000);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- isWaitSync = false;
- }
- editLogBuffer.setReadyToSync();
- if(editLogBuffer.syncBuffer.size() > 0) {
- syncMaxTxid = editLogBuffer.getSyncMaxTxid();
- }
- isSyncRunning = true;
- } //釋放鎖
- editLogBuffer.flush();
- synchronized (this) {
- isSyncRunning = false;
- notify();
- } //釋放鎖
- }
- /**
- * 把日志抽象成類
- */
- class EditLog{
- //順序遞增
- long txid;
- //操作內(nèi)容 mkdir /a
- String content;
- //構(gòu)造函數(shù)
- public EditLog(long txid,String content){
- this.txid = txid;
- this.content = content;
- }
- //為了測試方便
- @Override
- public String toString() {
- return "EditLog{" +
- "txid=" + txid +
- ", content='" + content + '\'' +
- '}';
- }
- }
- /**
- * 雙緩存方案
- */
- class DoubleBuffer{
- //內(nèi)存1
- LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
- //內(nèi)存2
- LinkedList<EditLog> syncBuffer= new LinkedList<EditLog>();
- /**
- * 把數(shù)據(jù)寫到當(dāng)前內(nèi)存1
- * @param log
- */
- public void write(EditLog log){
- currentBuffer.add(log);
- }
- /**
- * 交換內(nèi)存
- */
- public void setReadyToSync(){
- LinkedList<EditLog> tmp= currentBuffer;
- currentBuffer = syncBuffer;
- syncBuffer = tmp;
- }
- /**
- * 獲取內(nèi)存2里面的日志的最大的事務(wù)編號(hào)
- * @return
- */
- public Long getSyncMaxTxid(){
- return syncBuffer.getLast().txid;
- }
- /**
- * 刷寫磁盤
- */
- public void flush(){
- for(EditLog log:syncBuffer){
- //把數(shù)據(jù)寫到磁盤上
- System.out.println("存入磁盤日志信息:"+log);
- }
- //把內(nèi)存2里面的數(shù)據(jù)要清空
- syncBuffer.clear();
- }
- }
- }
主要的業(yè)務(wù)邏輯就是 40 行,但是真的很巧妙。
1、EditLog
我們先看這個(gè) EditLog 內(nèi)部類,這是對(duì) EditLog 日志的一個(gè)封裝,就兩個(gè)屬性 txid 和 content,分別是日志的事務(wù)id(保證唯一性)和 內(nèi)容。
2、DoubleBuffer
再看這個(gè) DoubleBuffer 雙緩沖類,很簡單,就是在內(nèi)存里面維護(hù)了兩個(gè)有序的 LinkedList,分別是當(dāng)前寫編輯日志的緩沖和同步到磁盤的緩沖,其中的元素就是 EditLog 類。
write 方法就是把一條編輯日志寫到當(dāng)前緩沖里。
setReadyToSync 方法,就是交換兩個(gè)緩沖,也是最簡單的剛學(xué)習(xí) Java 就學(xué)習(xí)過的兩個(gè)變量交換值的方法。
getSyncMaxTxid 方法,獲得正在同步的那個(gè)緩沖去里的最大的事務(wù)id。
flush 方法,遍歷同步的緩沖的每一條編輯日志,寫到磁盤,并最終清空緩沖區(qū)內(nèi)容。
3、主類的一些屬性說明
(1)全局的事務(wù)id
private long txid=0L;
(2)雙緩沖結(jié)構(gòu)
private DoubleBuffer editLogBuffer=new DoubleBuffer();
(3)控制變量
private volatile Boolean isSyncRunning = false; // 是否正在同步數(shù)據(jù)到磁盤
private volatile Boolean isWaitSync = false; // 是否有線程在等待同步數(shù)據(jù)到磁盤完成
private volatile Long syncMaxTxid = 0L; // 當(dāng)前同步的最大日志事務(wù)id
private ThreadLocallocalTxid=new ThreadLocal(); // 每個(gè)線程的線程副本,用來放本線程當(dāng)前寫入的日志事務(wù)id
(4)主邏輯 logEdit 方法
這個(gè)方法是對(duì)外暴露的方法,客戶端往雙緩沖寫數(shù)據(jù)就是用的這個(gè)方法。
假設(shè)當(dāng)前有一個(gè)線程1 進(jìn)到了 logEdit 方法,首先直接把當(dāng)前類實(shí)例加鎖,避免別的線程進(jìn)來,以此來保證編輯日志事務(wù)id的唯一自增性。
- // 全局事務(wù)遞增
- txid++;
- // 往線程本身的變量里設(shè)置事務(wù)id值
- localTxid.set(txid);
- // 構(gòu)造 EditLog 變量
- EditLog log = new EditLog(txid, content);
- // 寫入當(dāng)前的 Buffer
- editLogBuffer.write(log);
當(dāng)它執(zhí)行完了這些之后,釋放鎖,開始執(zhí)行 logSync() 方法。此時(shí)由于釋放了鎖,于是很多線程開始拿到鎖,進(jìn)入了這個(gè)方法中。
假設(shè)有五個(gè)線程進(jìn)來了分別寫了一條日志,于是現(xiàn)在雙緩沖是這樣子的:
好,然后線程1 開始進(jìn)入 logSync 方法,第一步就是使用當(dāng)前類的實(shí)例加了鎖,保證只有一個(gè)線程進(jìn)來。
檢查 isSyncRunning 變量是否為 true,目前是 false,跳過這個(gè)方法。
開始執(zhí)行這個(gè) editLogBuffer.setReadyToSync(); 方法,于是雙緩沖的數(shù)據(jù)直接被交換了。
然后獲得了全局最大的id,當(dāng)前是 5,賦值給了 syncMaxTxid 變量
- if(editLogBuffer.syncBuffer.size() > 0) {
- syncMaxTxid = editLogBuffer.getSyncMaxTxid();
- }
然后 isSyncRunning = true; 把這個(gè)變量置為 true,表示正在同步數(shù)據(jù)到磁盤。此時(shí)釋放鎖。
然后 線程 1 開始執(zhí)行數(shù)據(jù)同步到磁盤的動(dòng)作:editLogBuffer.flush() ,這個(gè)動(dòng)作肯定耗費(fèi)的時(shí)間比較久,基本是在 ms 級(jí)別。
此時(shí)我們假設(shè) 線程2 爭搶到了鎖,進(jìn)入到了 logSync 方法。
- // 線程2 判斷 是否有人正在把數(shù)據(jù)同步到磁盤上面,這個(gè)值被線程 1 改為 true 了
- // 進(jìn)入到 if 方法
- if(isSyncRunning){
- // 獲得到自己線程的事務(wù)id,為 2
- long txid = localTxid.get();
- // 2 是否小于 5 ?小于,直接返回,因?yàn)榇藭r(shí) 5 已經(jīng)正在被同步到磁盤了
- if(txid <= syncMaxTxid){
- return;
- }
- if(isWaitSync){
- return;
- }
- isWaitSync = true;
- while(isSyncRunning){
- try {
- wait(2000);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- isWaitSync = false;
- }
線程2 由于自身的編輯日志的事務(wù)id 小于當(dāng)前正在同步的最大的事務(wù)id,所以直接返回了,然后線程3 ,線程4,線程5 進(jìn)來都是這樣,直接 return 返回。
假設(shè)線程6 此時(shí)進(jìn)來,當(dāng)前雙緩沖狀態(tài)是這樣的
下面線程 6 干的活,參考下面代碼里的注釋:
- // 線程6 判斷是否有人正在把數(shù)據(jù)同步到磁盤上面,這個(gè)值被線程 1 改為 true 了
- // 進(jìn)入到 if 方法
- if(isSyncRunning){
- // 獲得到自己線程的事務(wù)id,為 6
- long txid = localTxid.get();
- // 6 是否小于 5 ,不小于繼續(xù)執(zhí)行
- if(txid <= syncMaxTxid){
- return;
- }
- // 這個(gè)值為 false,繼續(xù)執(zhí)行
- if(isWaitSync){
- return;
- }
- // 把 isWaitSync 設(shè)置為 true
- isWaitSync = true;
- // 這個(gè)值被線程1置為了 true,所以這里在死循環(huán)
- while(isSyncRunning){
- try {
- // 等待 2s,wait 會(huì)釋放鎖,同時(shí)線程 6 進(jìn)入睡眠中
- wait(2000);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- isWaitSync = false;
- }
可以看到 線程 6 在 while 循環(huán)里無限等待數(shù)據(jù)同步到磁盤完畢。然后由于線程 6 把 isWaitSync 值改為了 true,線程 6 在等待期間釋放鎖,被其他線程搶到之后,其他線程由于 isWaitSync 為true,直接返回了。
當(dāng)過了一會(huì)兒,線程1 把第二個(gè) Buffer 同步到磁盤完畢后,線程1 會(huì)執(zhí)行這些代碼
- synchronized (this) {
- isSyncRunning = false;
- notify();
- } //釋放鎖
把 isSyncRunning 變量置為 false,同時(shí)調(diào)用 notify(),通知線程 6 ,你可以繼續(xù)參與鎖的競爭了。
然后線程6 ,從 wait 中醒來,重新參與鎖競爭,繼續(xù)執(zhí)行接下來的代碼。此時(shí) isSyncRunning 已經(jīng)為 false,所以它跳出了 while 循環(huán),把 isWaitSync 置為了 false。
然后它開始執(zhí)行:交換緩沖區(qū),給最大的事務(wù)id(此時(shí)為6 )賦值,把 isSyncRunning 賦值為 true。
- editLogBuffer.setReadyToSync();
- if(editLogBuffer.syncBuffer.size() > 0) {
- syncMaxTxid = editLogBuffer.getSyncMaxTxid();
- }
- isSyncRunning = true;
執(zhí)行完了之后,釋放鎖,開始執(zhí)行Buffer2 的同步。然后所有的線程就按照上面的方式有序的工作。
這段幾十行的代碼很精煉,值得反復(fù)推敲,總結(jié)下來如下:
(1)寫緩沖到內(nèi)存 和 同步數(shù)據(jù)到磁盤分開,互不影響和干擾;
(2)使用 synchronize ,wait 和 notify 來保證多線程有序進(jìn)行工作;
(3)當(dāng)在同步數(shù)據(jù)到磁盤中的時(shí)候,其他爭搶到鎖進(jìn)來準(zhǔn)備同步數(shù)據(jù)的線程只能等待;
(4)線程使用 ThreadLocal 變量,來記錄自身當(dāng)前的事務(wù)id,如果小于當(dāng)前正在同步的最大事務(wù)id,則不同步;
(5)有線程在等待同步數(shù)據(jù)的時(shí)候,其他線程寫完 editLog 到內(nèi)存后直接返回;
四、最后的總結(jié)
本文詳細(xì)探討了 HDFS 在大數(shù)據(jù)中基石的地位,以及如何保障 NameNode 高可用的運(yùn)行。
NameNode 在高可用運(yùn)行時(shí),同時(shí)是如何保證高并發(fā)讀寫操作的。雙緩沖在其中起到了核心的作用,把寫數(shù)據(jù)和同步數(shù)據(jù)到磁盤分離開,互不影響。
同時(shí)我們還剝離了一段核心雙緩沖的實(shí)現(xiàn)代碼,仔細(xì)分析了實(shí)現(xiàn)原理。這短短的幾十行代碼,可謂綜合利用了多線程高并發(fā)的知識(shí),耐人尋味。