自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

NameNode是用了什么神秘技術(shù)來支撐元數(shù)據(jù)百萬并發(fā)讀寫的

大數(shù)據(jù)
我們都知道,HDFS 是大數(shù)據(jù)存儲(chǔ)的基石,所有的離線數(shù)據(jù)都存儲(chǔ)在 HDFS 上,而 NameNode 是存儲(chǔ)所有元數(shù)據(jù)的地方(所謂元數(shù)據(jù)就是描述數(shù)據(jù)的數(shù)據(jù),比如文件的大小,文件都存儲(chǔ)在哪些 DataNode 上,文件在目錄樹的位置等),所以 NameNode 便成為了 HDFS 最關(guān)鍵的部分。

 [[390083]]

本文轉(zhuǎn)載自微信公眾號(hào)「KK架構(gòu)師」,作者wangkai。轉(zhuǎn)載本文請(qǐng)聯(lián)系KK架構(gòu)師公眾號(hào)。  

本文大綱 

一、HDFS 是大數(shù)據(jù)的基石

我們都知道,HDFS 是大數(shù)據(jù)存儲(chǔ)的基石,所有的離線數(shù)據(jù)都存儲(chǔ)在 HDFS 上,而 NameNode 是存儲(chǔ)所有元數(shù)據(jù)的地方(所謂元數(shù)據(jù)就是描述數(shù)據(jù)的數(shù)據(jù),比如文件的大小,文件都存儲(chǔ)在哪些 DataNode 上,文件在目錄樹的位置等),所以 NameNode 便成為了 HDFS 最關(guān)鍵的部分。

在離線數(shù)倉中,會(huì)存在很多離線任務(wù),這些離線任務(wù)都要往 HDFS 寫數(shù)據(jù),每次寫數(shù)據(jù)都會(huì)經(jīng)過 NameNode 來保存元數(shù)據(jù)信息,那么 NameNode 勢必會(huì)承擔(dān)非常多的請(qǐng)求。NameNode 作為 HDFS 的核心,肯定自身要保證高可用,數(shù)據(jù)不能一直在內(nèi)存中,要寫到磁盤里。

所以一個(gè)關(guān)鍵的問題來了,NameNode 是用了什么神秘的技術(shù),在保證自身高可用的同時(shí),還能承擔(dān)巨額的讀寫請(qǐng)求?

二、NameNode 高可用是如何實(shí)現(xiàn)的

下面直接來一個(gè) NameNode 高可用的架構(gòu)圖:

然后解釋下如何保證高可用的:

(1)NameNode 只有一個(gè)時(shí)的單點(diǎn)故障問題

如果我們只部署了一個(gè) NameNode,那么這個(gè) NameNode 是有單點(diǎn)故障的問題的。如何解決,再加一個(gè) NameNode 即可;

(2)當(dāng)有兩個(gè) NameNode ,切換時(shí),數(shù)據(jù)如何保持同步

兩個(gè) NameNode 一起工作,某一個(gè) NameNode 掛掉了,另一個(gè) NameNode 接替工作,這件事成立的必要前提是,兩個(gè) NameNode 的數(shù)據(jù)得時(shí)時(shí)刻刻保持一致。

那么如何保持?jǐn)?shù)據(jù)一致,是不是可以在兩個(gè) NameNode 之間搞個(gè)共享的文件系統(tǒng)?仔細(xì)想想也不行,這樣的話,單點(diǎn)故障問題就轉(zhuǎn)移到這個(gè)文件系統(tǒng)上了。

(3)使用多節(jié)點(diǎn)的 JournalNode 作為主備 NameNode 的數(shù)據(jù)同步介質(zhì)

這里引入了 JournalNode 集群,JournalNode 的每個(gè)節(jié)點(diǎn)的數(shù)據(jù)都是一樣的,并且時(shí)刻保持一致。并且只要超過半數(shù)的節(jié)點(diǎn)存活,整個(gè) JournalNode 集群都可以正常提供服務(wù)。

所以,一般會(huì)使用奇數(shù)個(gè)節(jié)點(diǎn)來搭建。(為什么一般不用偶數(shù)個(gè)呢?因?yàn)?3 個(gè)節(jié)點(diǎn)構(gòu)成的集群,可以容忍掛掉一臺(tái)機(jī)器;而 4 個(gè)節(jié)點(diǎn)構(gòu)成的集群,也只能容忍掛掉一臺(tái)機(jī)器。同樣是只能掛掉一臺(tái),為何不選 3 個(gè)節(jié)點(diǎn)的呢,還能節(jié)省資源)。

使用 JournalNode 集群,一個(gè) NameNode 實(shí)時(shí)的往集群寫,另一個(gè) NameNode 也實(shí)時(shí)的讀集群數(shù)據(jù),這樣兩個(gè) NameNode 數(shù)據(jù)就可以保持一致了。

(4)一個(gè) NameNode 掛掉,另一個(gè) NameNode 如何立馬感知并接替工作

首先不能人工參與切換。那如何實(shí)時(shí)監(jiān)聽呢?

首先要再引入一個(gè)關(guān)鍵組件:Zookeeper。當(dāng)兩個(gè) NameNode 同時(shí)啟動(dòng)后,他們都會(huì)去 Zookeeper 上注冊(cè),誰注冊(cè)成功了,誰就能獲得鎖,成為 Active 狀態(tài)的 NameNode。

另外還需要一個(gè)組件:ZKFC,它會(huì)實(shí)時(shí)監(jiān)控 Zookeeper 進(jìn)程狀態(tài),并且會(huì)以心跳的方式實(shí)時(shí)的告訴 Zookeeper NameNode 的狀態(tài)。如果一個(gè) NameNode 進(jìn)程掛了,就會(huì)把 Zookeeper 上的鎖給另外一個(gè) NameNode,讓它成為 Active 的來工作。

三、NameNode 如何既高可用,還能高并發(fā)

1、雙緩沖技術(shù)

NameNode 為了實(shí)現(xiàn)高可用,首先自己內(nèi)存里的數(shù)據(jù)需要寫到磁盤,然后還需要往 JournalNode 里寫數(shù)據(jù)。

所以既然要寫磁盤,還是往兩個(gè)地方寫磁盤,那必然性能會(huì)跟不上的。

所以這里 NameNode 引入了一個(gè)技術(shù),也是本篇文章的重點(diǎn):雙緩沖技術(shù)。

雙緩沖的設(shè)計(jì)理念如下圖:

客戶端不是直接寫磁盤,而是往一個(gè)內(nèi)存結(jié)構(gòu)(Buffer1)里面寫數(shù)據(jù)。當(dāng) Buffer1 達(dá)到一定閾值后,Buffer 1 和 Buffer 2 交換內(nèi)存數(shù)據(jù)。此時(shí) Buffer1 數(shù)據(jù)為空,Buffer2 開始在后臺(tái)默默寫磁盤。

這樣的好處很明顯的,前端只需要進(jìn)行內(nèi)存寫 Buffer1 就行,性能特別高;而 Buffer2 在后臺(tái)默默的同步日志到磁盤即可。

這樣磁盤寫,就轉(zhuǎn)化成為了內(nèi)存寫,速度大大提高了。

2、如何實(shí)現(xiàn)雙緩沖

然而,在真實(shí)環(huán)境不只一個(gè)客戶端是這樣子的:

大數(shù)據(jù)情況下是 N 個(gè)客戶端同時(shí)并發(fā)寫的,在高并發(fā)的情況下,我們必然要去協(xié)調(diào)多個(gè)線程動(dòng)作的一致性,比如往 Buffer1 的寫動(dòng)作,Buffer1 與 Buffer2 數(shù)據(jù)交換的動(dòng)作,Buffer2 寫磁盤的動(dòng)作。

那么我們?cè)撊绾螌?shí)現(xiàn)這樣一個(gè)巧妙的雙緩沖呢?下面的代碼是我從 Hadoop 源碼里抽離出來的關(guān)鍵實(shí)現(xiàn):

  1. package org.apache.hadoop; 
  2.  
  3. import java.util.LinkedList; 
  4.  
  5. public class FSEditLog2 { 
  6.     private long txid=0L; 
  7.     private DoubleBuffer editLogBuffer=new DoubleBuffer(); 
  8.     //是否正在刷寫磁盤 
  9.     private volatile Boolean isSyncRunning = false
  10.     private volatile Boolean isWaitSync = false
  11.  
  12.     private volatile Long syncMaxTxid = 0L; 
  13.  
  14.     //每個(gè)線程都對(duì)應(yīng)自己的一個(gè)副本 
  15.     private ThreadLocal<Long> localTxid=new ThreadLocal<Long>(); 
  16.  
  17.     public void logEdit(String content){//mkdir /a 
  18.         synchronized (this){//加鎖的目的就是為了事務(wù)ID的唯一,而且是遞增 
  19.             txid++; 
  20.             localTxid.set(txid); 
  21.             EditLog log = new EditLog(txid, content); 
  22.             editLogBuffer.write(log); 
  23.         } 
  24.         logSync(); 
  25.     } 
  26.  
  27.     private  void logSync(){ 
  28.         synchronized (this){ 
  29.             if(isSyncRunning){ //是否有人正在把數(shù)據(jù)同步到磁盤上面 
  30.                 long txid = localTxid.get(); 
  31.                 if(txid <= syncMaxTxid){ 
  32.                     //直接return,不接著干了? 
  33.                     return
  34.                 } 
  35.                 if(isWaitSync){ 
  36.                     return
  37.                 } 
  38.                 isWaitSync = true
  39.  
  40.                 while(isSyncRunning){ 
  41.                     try { 
  42.                         wait(2000); 
  43.                     }catch (Exception e){ 
  44.                         e.printStackTrace(); 
  45.                     } 
  46.                 } 
  47.                 isWaitSync = false
  48.             } 
  49.  
  50.             editLogBuffer.setReadyToSync(); 
  51.             if(editLogBuffer.syncBuffer.size() > 0) { 
  52.                 syncMaxTxid = editLogBuffer.getSyncMaxTxid(); 
  53.             } 
  54.  
  55.             isSyncRunning = true
  56.  
  57.         } //釋放鎖 
  58.  
  59.         editLogBuffer.flush(); 
  60.         synchronized (this) { 
  61.             isSyncRunning = false
  62.             notify(); 
  63.         } //釋放鎖 
  64.     } 
  65.  
  66.  
  67.     /** 
  68.      * 把日志抽象成類 
  69.      */ 
  70.     class EditLog{ 
  71.         //順序遞增 
  72.         long txid; 
  73.         //操作內(nèi)容  mkdir /a 
  74.         String content; 
  75.  
  76.         //構(gòu)造函數(shù) 
  77.         public EditLog(long txid,String content){ 
  78.             this.txid = txid; 
  79.             this.content = content; 
  80.         } 
  81.  
  82.         //為了測試方便 
  83.         @Override 
  84.         public String toString() { 
  85.             return "EditLog{" + 
  86.                     "txid=" + txid + 
  87.                     ", content='" + content + '\'' + 
  88.                     '}'
  89.         } 
  90.     } 
  91.  
  92.  
  93.     /** 
  94.      * 雙緩存方案 
  95.      */ 
  96.     class DoubleBuffer{ 
  97.         //內(nèi)存1 
  98.         LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>(); 
  99.         //內(nèi)存2 
  100.         LinkedList<EditLog> syncBuffer= new LinkedList<EditLog>(); 
  101.  
  102.         /** 
  103.          * 把數(shù)據(jù)寫到當(dāng)前內(nèi)存1 
  104.          * @param log 
  105.          */ 
  106.         public void write(EditLog log){ 
  107.             currentBuffer.add(log); 
  108.         } 
  109.  
  110.         /** 
  111.          * 交換內(nèi)存 
  112.          */ 
  113.         public void setReadyToSync(){ 
  114.             LinkedList<EditLog> tmp= currentBuffer; 
  115.             currentBuffer = syncBuffer; 
  116.             syncBuffer = tmp; 
  117.         } 
  118.  
  119.         /** 
  120.          * 獲取內(nèi)存2里面的日志的最大的事務(wù)編號(hào) 
  121.          * @return 
  122.          */ 
  123.         public Long getSyncMaxTxid(){ 
  124.             return syncBuffer.getLast().txid; 
  125.         } 
  126.  
  127.  
  128.         /** 
  129.          * 刷寫磁盤 
  130.           */ 
  131.         public void flush(){ 
  132.             for(EditLog log:syncBuffer){ 
  133.                 //把數(shù)據(jù)寫到磁盤上 
  134.                 System.out.println("存入磁盤日志信息:"+log); 
  135.             } 
  136.  
  137.             //把內(nèi)存2里面的數(shù)據(jù)要清空 
  138.             syncBuffer.clear(); 
  139.         } 
  140.     } 

主要的業(yè)務(wù)邏輯就是 40 行,但是真的很巧妙。

1、EditLog

我們先看這個(gè) EditLog 內(nèi)部類,這是對(duì) EditLog 日志的一個(gè)封裝,就兩個(gè)屬性 txid 和 content,分別是日志的事務(wù)id(保證唯一性)和 內(nèi)容。

2、DoubleBuffer

再看這個(gè) DoubleBuffer 雙緩沖類,很簡單,就是在內(nèi)存里面維護(hù)了兩個(gè)有序的 LinkedList,分別是當(dāng)前寫編輯日志的緩沖和同步到磁盤的緩沖,其中的元素就是 EditLog 類。

write 方法就是把一條編輯日志寫到當(dāng)前緩沖里。

setReadyToSync 方法,就是交換兩個(gè)緩沖,也是最簡單的剛學(xué)習(xí) Java 就學(xué)習(xí)過的兩個(gè)變量交換值的方法。

getSyncMaxTxid 方法,獲得正在同步的那個(gè)緩沖去里的最大的事務(wù)id。

flush 方法,遍歷同步的緩沖的每一條編輯日志,寫到磁盤,并最終清空緩沖區(qū)內(nèi)容。

3、主類的一些屬性說明

(1)全局的事務(wù)id

private long txid=0L;

(2)雙緩沖結(jié)構(gòu)

private DoubleBuffer editLogBuffer=new DoubleBuffer();

(3)控制變量

private volatile Boolean isSyncRunning = false; // 是否正在同步數(shù)據(jù)到磁盤

private volatile Boolean isWaitSync = false; // 是否有線程在等待同步數(shù)據(jù)到磁盤完成

private volatile Long syncMaxTxid = 0L; // 當(dāng)前同步的最大日志事務(wù)id

private ThreadLocallocalTxid=new ThreadLocal(); // 每個(gè)線程的線程副本,用來放本線程當(dāng)前寫入的日志事務(wù)id

(4)主邏輯 logEdit 方法

這個(gè)方法是對(duì)外暴露的方法,客戶端往雙緩沖寫數(shù)據(jù)就是用的這個(gè)方法。

假設(shè)當(dāng)前有一個(gè)線程1 進(jìn)到了 logEdit 方法,首先直接把當(dāng)前類實(shí)例加鎖,避免別的線程進(jìn)來,以此來保證編輯日志事務(wù)id的唯一自增性。

  1. // 全局事務(wù)遞增 
  2. txid++; 
  3. // 往線程本身的變量里設(shè)置事務(wù)id值 
  4. localTxid.set(txid); 
  5. // 構(gòu)造 EditLog 變量 
  6. EditLog log = new EditLog(txid, content); 
  7. // 寫入當(dāng)前的 Buffer 
  8. editLogBuffer.write(log); 

當(dāng)它執(zhí)行完了這些之后,釋放鎖,開始執(zhí)行 logSync() 方法。此時(shí)由于釋放了鎖,于是很多線程開始拿到鎖,進(jìn)入了這個(gè)方法中。

假設(shè)有五個(gè)線程進(jìn)來了分別寫了一條日志,于是現(xiàn)在雙緩沖是這樣子的:

好,然后線程1 開始進(jìn)入 logSync 方法,第一步就是使用當(dāng)前類的實(shí)例加了鎖,保證只有一個(gè)線程進(jìn)來。

檢查 isSyncRunning 變量是否為 true,目前是 false,跳過這個(gè)方法。

開始執(zhí)行這個(gè) editLogBuffer.setReadyToSync(); 方法,于是雙緩沖的數(shù)據(jù)直接被交換了。

然后獲得了全局最大的id,當(dāng)前是 5,賦值給了 syncMaxTxid 變量

  1. if(editLogBuffer.syncBuffer.size() > 0) { 
  2.     syncMaxTxid = editLogBuffer.getSyncMaxTxid(); 

然后 isSyncRunning = true; 把這個(gè)變量置為 true,表示正在同步數(shù)據(jù)到磁盤。此時(shí)釋放鎖。

然后 線程 1 開始執(zhí)行數(shù)據(jù)同步到磁盤的動(dòng)作:editLogBuffer.flush() ,這個(gè)動(dòng)作肯定耗費(fèi)的時(shí)間比較久,基本是在 ms 級(jí)別。

此時(shí)我們假設(shè) 線程2 爭搶到了鎖,進(jìn)入到了 logSync 方法。

  1. // 線程2 判斷 是否有人正在把數(shù)據(jù)同步到磁盤上面,這個(gè)值被線程 1 改為 true 了 
  2. // 進(jìn)入到 if 方法 
  3. if(isSyncRunning){  
  4.     // 獲得到自己線程的事務(wù)id,為 2 
  5.     long txid = localTxid.get(); 
  6.     // 2 是否小于 5 ?小于,直接返回,因?yàn)榇藭r(shí) 5 已經(jīng)正在被同步到磁盤了 
  7.     if(txid <= syncMaxTxid){ 
  8.         return
  9.     } 
  10.     if(isWaitSync){ 
  11.         return
  12.     } 
  13.     isWaitSync = true
  14.  
  15.     while(isSyncRunning){ 
  16.         try { 
  17.             wait(2000); 
  18.         }catch (Exception e){ 
  19.             e.printStackTrace(); 
  20.         } 
  21.     } 
  22.     isWaitSync = false

線程2 由于自身的編輯日志的事務(wù)id 小于當(dāng)前正在同步的最大的事務(wù)id,所以直接返回了,然后線程3 ,線程4,線程5 進(jìn)來都是這樣,直接 return 返回。

假設(shè)線程6 此時(shí)進(jìn)來,當(dāng)前雙緩沖狀態(tài)是這樣的

下面線程 6 干的活,參考下面代碼里的注釋:

  1. // 線程6 判斷是否有人正在把數(shù)據(jù)同步到磁盤上面,這個(gè)值被線程 1 改為 true 了 
  2. // 進(jìn)入到 if 方法 
  3. if(isSyncRunning){  
  4.     // 獲得到自己線程的事務(wù)id,為 6 
  5.     long txid = localTxid.get(); 
  6.     // 6 是否小于 5 ,不小于繼續(xù)執(zhí)行 
  7.     if(txid <= syncMaxTxid){ 
  8.         return
  9.     } 
  10.     // 這個(gè)值為 false,繼續(xù)執(zhí)行 
  11.     if(isWaitSync){ 
  12.         return
  13.     } 
  14.     // 把 isWaitSync 設(shè)置為 true 
  15.     isWaitSync = true
  16.  
  17.     // 這個(gè)值被線程1置為了 true,所以這里在死循環(huán) 
  18.     while(isSyncRunning){ 
  19.         try { 
  20.             // 等待 2s,wait 會(huì)釋放鎖,同時(shí)線程 6 進(jìn)入睡眠中 
  21.             wait(2000); 
  22.         }catch (Exception e){ 
  23.             e.printStackTrace(); 
  24.         } 
  25.     } 
  26.     isWaitSync = false

可以看到 線程 6 在 while 循環(huán)里無限等待數(shù)據(jù)同步到磁盤完畢。然后由于線程 6 把 isWaitSync 值改為了 true,線程 6 在等待期間釋放鎖,被其他線程搶到之后,其他線程由于 isWaitSync 為true,直接返回了。

當(dāng)過了一會(huì)兒,線程1 把第二個(gè) Buffer 同步到磁盤完畢后,線程1 會(huì)執(zhí)行這些代碼

  1. synchronized (this) { 
  2.     isSyncRunning = false
  3.     notify(); 
  4. } //釋放鎖 

把 isSyncRunning 變量置為 false,同時(shí)調(diào)用 notify(),通知線程 6 ,你可以繼續(xù)參與鎖的競爭了。

然后線程6 ,從 wait 中醒來,重新參與鎖競爭,繼續(xù)執(zhí)行接下來的代碼。此時(shí) isSyncRunning 已經(jīng)為 false,所以它跳出了 while 循環(huán),把 isWaitSync 置為了 false。

然后它開始執(zhí)行:交換緩沖區(qū),給最大的事務(wù)id(此時(shí)為6 )賦值,把 isSyncRunning 賦值為 true。

  1. editLogBuffer.setReadyToSync(); 
  2. if(editLogBuffer.syncBuffer.size() > 0) { 
  3.     syncMaxTxid = editLogBuffer.getSyncMaxTxid(); 
  4.  
  5. isSyncRunning = true

執(zhí)行完了之后,釋放鎖,開始執(zhí)行Buffer2 的同步。然后所有的線程就按照上面的方式有序的工作。

這段幾十行的代碼很精煉,值得反復(fù)推敲,總結(jié)下來如下:

(1)寫緩沖到內(nèi)存 和 同步數(shù)據(jù)到磁盤分開,互不影響和干擾;

(2)使用 synchronize ,wait 和 notify 來保證多線程有序進(jìn)行工作;

(3)當(dāng)在同步數(shù)據(jù)到磁盤中的時(shí)候,其他爭搶到鎖進(jìn)來準(zhǔn)備同步數(shù)據(jù)的線程只能等待;

(4)線程使用 ThreadLocal 變量,來記錄自身當(dāng)前的事務(wù)id,如果小于當(dāng)前正在同步的最大事務(wù)id,則不同步;

(5)有線程在等待同步數(shù)據(jù)的時(shí)候,其他線程寫完 editLog 到內(nèi)存后直接返回;

四、最后的總結(jié)

本文詳細(xì)探討了 HDFS 在大數(shù)據(jù)中基石的地位,以及如何保障 NameNode 高可用的運(yùn)行。

NameNode 在高可用運(yùn)行時(shí),同時(shí)是如何保證高并發(fā)讀寫操作的。雙緩沖在其中起到了核心的作用,把寫數(shù)據(jù)和同步數(shù)據(jù)到磁盤分離開,互不影響。

同時(shí)我們還剝離了一段核心雙緩沖的實(shí)現(xiàn)代碼,仔細(xì)分析了實(shí)現(xiàn)原理。這短短的幾十行代碼,可謂綜合利用了多線程高并發(fā)的知識(shí),耐人尋味。

 

責(zé)任編輯:武曉燕 來源: KK架構(gòu)師
相關(guān)推薦

2019-09-23 08:46:04

零拷貝 CPU內(nèi)存

2019-02-27 09:46:05

數(shù)據(jù)庫架構(gòu)并發(fā)

2011-08-23 17:12:22

MySQL支撐百萬級(jí)流

2019-09-23 13:03:42

NameNode元數(shù)據(jù)文件

2019-05-06 11:12:18

Redis高并發(fā)單線程

2021-04-25 19:00:55

大數(shù)據(jù)視頻分析人工智能

2017-11-10 09:16:07

直播彈幕系統(tǒng)

2019-12-31 10:33:57

Netty高性能內(nèi)存

2023-10-25 11:20:09

快手電商混合云容器云

2019-10-16 17:03:22

架構(gòu)技術(shù)棧微信半月刊

2019-07-18 08:10:01

Java開發(fā)代碼

2022-08-19 06:42:11

數(shù)據(jù)庫高并系統(tǒng)

2019-10-11 10:23:13

ClassLoaderJavaJVM

2019-05-07 09:44:45

Redis高并發(fā)模型

2025-02-10 08:20:09

2022-01-14 11:54:15

區(qū)塊鏈元宇宙技術(shù)

2020-11-01 19:00:55

開源區(qū)塊鏈區(qū)塊鏈技術(shù)

2020-10-30 16:20:38

Redis單線程高并發(fā)

2023-03-21 11:10:27

2022-07-03 14:06:27

元宇宙交互技術(shù)AR
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)