自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

spark 自己的分布式存儲系統(tǒng) - BlockManager

存儲 存儲軟件 大數(shù)據(jù) Spark 分布式
BlockManager 是 spark 中至關(guān)重要的一個組件, 在 spark的的運行過程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機制,你才能更加深入的理解 spark。 今天我們來揭開 BlockaManager 的底層原理和設計思路,

整體架構(gòu)

BlockManager 是 spark 中至關(guān)重要的一個組件, 在 spark的的運行過程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機制,你才能更加深入的理解 spark。 今天我們來揭開 BlockaManager 的底層原理和設計思路,

BlockManager 是一個嵌入在 spark 中的 key-value型分布式存儲系統(tǒng),是為 spark 量身打造的,

BlockManager 在一個 spark 應用中作為一個本地緩存運行在所有的節(jié)點上, 包括所有 driver 和 executor上。 BlockManager 對本地和遠程提供一致的 get 和set 數(shù)據(jù)塊接口, BlockManager 本身使用不同的存儲方式來存儲這些數(shù)據(jù), 包括 memory, disk, off-heap。

 

上面是一個整體的架構(gòu)圖, BlockManagerMaster擁有BlockManagerMasterEndpoint 的actor和所有BlockManagerSlaveEndpoint的ref, 可以通過這些引用對 slave 下達命令

executor 節(jié)點上的BlockManagerMaster 則擁有BlockManagerMasterEndpoint的ref和自身BlockManagerSlaveEndpoint的actor??梢酝ㄟ^ Master的引用注冊自己。

在master 和 slave 可以正常的通信之后, 就可以根據(jù)設計的交互協(xié)議進行交互, 整個分布式緩存系統(tǒng)也就運轉(zhuǎn)起來了,

初始化

我們知道, sparkEnv 啟動的時候會啟動各個組件, BlockManager 也不例外, 也是這個時候啟動的,

啟動的時候會根據(jù)自己是在 driver 還是 executor 上進行不同的啟動過程,

  1. def registerOrLookupEndpoint( 
  2.         name: String, endpointCreator: => RpcEndpoint): 
  3.       RpcEndpointRef = { 
  4.       if (isDriver) { 
  5.         logInfo("Registering " + name
  6.         rpcEnv.setupEndpoint(name, endpointCreator) 
  7.       } else { 
  8.         RpcUtils.makeDriverRef(name, conf, rpcEnv) 
  9.       } 
  10.     } 

上圖是 sparkEnv 在 master上啟動的時候, 構(gòu)造了一個 BlockManagerMasterEndpoint, 然后把這個Endpoint 注冊在 rpcEnv中, 同時也會啟動自己的 BlockManager

上圖是 sparkEnv 在executor上啟動的時候, 通過 setupEndpointRef 方法獲取到了  BlockManagerMaster的引用 BlockManagerMasterRef, 同時也會啟動自己的 BlockManager,

在 BlockManager 初始化自己的時候, 會向 BlockManagerMasterEndpoint 注冊自己, BlockManagerMasterEndpoint 發(fā)送 registerBlockManager消息,  BlockManagerMasterEndpoint 接受到消息, 把 BlockManagerSlaveEndpoint  的引用 保存在自己的  blockManagerInfo 數(shù)據(jù)結(jié)構(gòu)中以待后用。

分布式協(xié)議

下面的一個表格是 master 和 slave 接受到各種類型的消息, 以及接受到消息后,做的處理。

  • BlockManagerMasterEndpoint  接受的消息

  • BlockManagerSlaveEndpoint 接受的消息

根據(jù)以上的協(xié)議, 相信我們可以很清楚的猜測整個交互的流程, 一般過程應該是這樣的, slave的 BlockManager  在自己接的上存儲一個 Block, 然后把這個 BlockId 匯報到master的BlockManager , 經(jīng)過 cache, shuffle 或者 Broadcast后,別的節(jié)點需要上一步的Block的時候, 會到 master 獲取數(shù)據(jù)所在位置, 然后去相應節(jié)點上去 fetch。

存儲層

在RDD層面上我們了解到RDD是由不同的partition組成的,我們所進行的transformation和action是在partition上面進行的;而在storage模塊內(nèi)部,RDD又被視為由不同的block組成,對于RDD的存取是以block為單位進行的,本質(zhì)上partition和block是等價的,只是看待的角度不同。在Spark storage模塊中中存取數(shù)據(jù)的最小單位是block,所有的操作都是以block為單位進行的。

 

BlockManager對象被創(chuàng)建的時候會創(chuàng)建出MemoryStore和DiskStore對象用以存取block,如果內(nèi)存中擁有足夠的內(nèi)存, 就 使用 MemoryStore存儲,  如果 不夠, 就 spill 到 磁盤中, 通過 DiskStore進行存儲。

  • DiskStore 有一個DiskBlockManager,DiskBlockManager 主要用來創(chuàng)建并持有邏輯 blocks 與磁盤上的 blocks之間的映射,一個邏輯 block 通過 BlockId 映射到一個磁盤上的文件。 在 DiskStore 中會調(diào)用  diskManager.getFile 方法, 如果子文件夾不存在,會進行創(chuàng)建, 文件夾的命名方式為(spark-local-yyyyMMddHHmmss-xxxx, xxxx是一個隨機數(shù)), 所有的block都會存儲在所創(chuàng)建的folder里面。
  • MemoryStore 相對于DiskStore需要根據(jù)block id hash計算出文件路徑并將block存放到對應的文件里面,MemoryStore管理block就顯得非常簡單:MemoryStore內(nèi)部維護了一個hash map來管理所有的block,以block id為key將block存放到hash map中。而從MemoryStore中取得block則非常簡單,只需從hash map中取出block id對應的value即可。

BlockManager 的 PUT 和GET接口

BlockManager 提供了 Put 接口和 Get 接口, 這兩個 api 屏蔽了底層的細節(jié), 我們來看下底層是如何實現(xiàn)的

  • GET操作 如果 local 中存在就直接返回, 從本地獲取一個Block, 會先判斷如果是 useMemory, 直接從內(nèi)存中取出, 如果是 useDisk, 會從磁盤中取出返回, 然后根據(jù)useMemory判斷是否在內(nèi)存中緩存一下,方便下次獲取,  如果local 不存在, 從其他節(jié)點上獲取, 當然元信息是存在 drive上的,要根據(jù)我們上文中提到的 GETlocation 協(xié)議獲取 Block 所在節(jié)點位置, 然后到其他節(jié)點上獲取。
  • PUT操作 操作之前會加鎖來避免多線程的問題, 存儲的時候會根據(jù) 存儲級別, 調(diào)用對應的是 memoryStore 還是  diskStore, 然后在具體存儲器上面調(diào)用 存儲接口。 如果有 replication 需求, 會把數(shù)據(jù)備份到其他的機器上面。

blockManager 和 blockTransferService 關(guān)系

spark 歷史上使用過兩套網(wǎng)絡框架, 最開始的時候, rpc 調(diào)用使用的是 akka, 大文件傳輸使用的是 netty,  后面統(tǒng)一全部使用 netty,  這里的大文件傳輸其實走的是 netty,  在啟動 blockManager的時候會啟動一個 blockTransferService 服務, 這個服務就是用來傳輸大文件用的, 對應的具體類是  NettyBlockTransferService, 這個實例中也會有 BlocakManager的引用, 會啟動一個 NettyBlockRpcServer的 netty Handler, 也擁有 BlocakManager 的引用,  用來提供服務, BlocakManager 根據(jù) BlockId 獲取一個 Block 然后包裝為一個 ManagedBuffer 對象,

當我們需要從遠端獲取一個 Block的時候,就需要 blockTransferService 傳輸大的字節(jié)數(shù)組,

首先需要從 driver上獲取到 Block的真正存儲位置, 然后調(diào)用 blockTransferService 的 fetchBlocks方法, 去其他真正存儲節(jié)點上去fetch數(shù)據(jù), 會從 client 資源池中獲取一個client,  如果是一對一的進行fetch,  使用的是 OneForOneBlockFetcher, 這個Fetcher 是以 Chunks 為單位分別單獨fetch,  每個 Chunks 也就對應一個Block的數(shù)據(jù), 根據(jù)配置,會進行重試直到***重試次數(shù),發(fā)送 OpenBlocks消息,  里面會包裝對應的是哪個  BlockId,  其他節(jié)點服務端會根據(jù) BlockId 從 blockManager中拿到數(shù)據(jù), 然后用來傳輸, 使用的是 netty 的流式傳輸方式, 同時也會有回調(diào)函數(shù),

如果是備份的時候同步上傳一個 Block,  其他節(jié)點服務端會根據(jù),uploadBlock消息中包含的BlockId, 在本地的BlockManager 中冗余存儲一份,

ChunkFetch也有一個類似Stream的概念,ChunkFetch的對象是“一個內(nèi)存中的Iterator[ManagedBuffer]”,即一組Buffer,每一個Buffer對應一個chunkIndex,整個Iterator[ManagedBuffer]由一個StreamID標識。Client每次的ChunkFetch請求是由(streamId,chunkIndex)組成的唯一的StreamChunkId,Server端根據(jù)StreamChunkId獲取為一個Buffer并返回給Client; 不管是Stream還是ChunkFetch,在Server的內(nèi)存中都需要管理一組由StreamID與資源之間映射,即StreamManager類,它提供了getChunk和openStream兩個接口來分別響應ChunkFetch與Stream兩種操作,并且針對Server的ChunkFetch提供一個registerStream接口來注冊一組Buffer,比如可以將BlockManager中一組BlockID對應的Iterator[ManagedBuffer]注冊到StreamManager,從而支持遠程Block Fetch操作。

對于ExternalShuffleService(一種單獨shuffle服務進程,對其他計算節(jié)點提供本節(jié)點上面的所有shuffle map輸出),它為遠程Executor提供了一種OpenBlocks的RPC接口,即根據(jù)請求的appid,executorid,blockid(appid+executor對應本地一組目錄,blockid拆封出)從本地磁盤中加載一組FileSegmentManagedBuffer到內(nèi)存,并返回加載后的streamId返回給客戶端,從而支持后續(xù)的ChunkFetch的操作。

Partition 與 Block 的關(guān)系

我們都知道, RDD 的運算是基于 partition, 每個 task 代表一個 分區(qū)上一個 stage 內(nèi)的運算閉包, task 被分別調(diào)度到 多個 executor上去運行, 那么是在哪里變成了 Block 呢,  我們以 spark 2.11 源碼為準, 看看這個轉(zhuǎn)變過程,

一個 RDD 調(diào)度到 executor 上會運行調(diào)用 getOrCompute方法,

  1. SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { 
  2.       readCachedBlock = false 
  3.       computeOrReadCheckpoint(partition, context) 
  4.     }) 

如果 Block 在 BlockManager 中存在, 就會從 BlockManager 中獲取,如果不存在, 就進行計算這個Block, 然后在 BlockManager 中進行存儲持久化, 方便下次使用,

當然獲取的時候是先從本地的 BlockManager 中獲取, 如果本地沒有, 然后再 從 remote 獲取, 先從 driver 上獲取到元數(shù)據(jù) Block的位置, 然后去到真正的節(jié)點上fetch

如果沒有, 就進行計算, 然后根據(jù)存儲級別,存儲到計算節(jié)點本地的BlockManager 的內(nèi)存或磁盤中,

這樣RDD的transformation、action就和block數(shù)據(jù)建立了聯(lián)系,雖然抽象上我們的操作是在partition層面上進行的,但是partition最終還是被映射成為block,因此實際上我們的所有操作都是對block的處理和存取。

blockManager 在 spark 中扮演的角色

blockManager 是非常非常重要的一個 spark 組件, 我們隨便舉幾個例子, 你就知道 BlockManager 多重要了 ,

  • spark  shuffle 的過程總用到了 BlockManager 作為數(shù)據(jù)的中轉(zhuǎn)站
  • spark broadcast 調(diào)度 task 到多個 executor 的時候, broadCast 底層使用的數(shù)據(jù)存儲層
  • spark streaming  一個 ReceiverInputDStream 接受到的數(shù)據(jù)也是先放在 BlockManager 中, 然后封裝為一個 BlockRdd 進行下一步運算的
  • 如果我們 對一個 rdd 進行了cache, cacheManager 也是把數(shù)據(jù)放在了 blockmanager 中, 截斷了計算鏈依賴, 后續(xù)task 運行的時候可以直接從 cacheManager 中獲取到 cacherdd ,不用再從頭計算。

spark cache  與  spark   broadcast task

我隨便舉兩個例子, 看看具體 spark cache 和 spark  broadcast 調(diào)度 task 的時候怎么用的 blockManager的

spark cache

rdd 計算的時候, 首先根據(jù)RDD id和partition index構(gòu)造出block id (rdd_xx_xx), 接著從BlockManager中取出相應的block, 如果該block存在,表示此RDD在之前已經(jīng)被計算過和存儲在BlockManager中,因此取出即可,無需再重新計算。 如果 block 不存在我們可以 計算出來, 然后吧 block 通過   doPutIterator 函數(shù)存儲在 節(jié)點上的 BlockManager上面, 匯報block信息到 driver, 下次如果使用同一個 rdd, 就可以直接從分布式存儲中 直接取出相應的 block

下面看一下源碼

  1. final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 
  2.     if (storageLevel != StorageLevel.NONE) { 
  3.       getOrCompute(split, context) 
  4.     } else { 
  5.       computeOrReadCheckpoint(split, context) 
  6.     } 
  7.   } 

如果存儲級別不是 NONE類型就會調(diào)用 getOrCompute 這個我們已經(jīng)看過了,  里面實際調(diào)用  SparkEnv.get.blockManager.getOrElseUpdate 方法, 如果 Block 在 BlockManager 中存在, 就會從 BlockManager 中獲取,如果不存在, 就進行計算這個Block, 然后在 BlockManager 中進行存儲持久化, 方便下次使用,

在  BlockManager 進行存儲后, 會調(diào)用下面的代碼把 匯報block信息到 driver,

  1. private def tryToReportBlockStatus( 
  2.      blockId: BlockId, 
  3.      status: BlockStatus, 
  4.      droppedMemorySize: Long = 0L): Boolean = { 
  5.    val storageLevel = status.storageLevel 
  6.    val inMemSize = Math.max(status.memSize, droppedMemorySize) 
  7.    val onDiskSize = status.diskSize 
  8.    master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) 
  9.  } 

實際上上想  masterEndpoint 的引用發(fā)送一條 UpdateBlockInfo消息,  master 會把這個 blockId 對應的 location 放在 driver 上,

同樣的如果一個 Block已經(jīng)計算過了,會到 driver 上獲取到 location 信息

  1. private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { 
  2.    val locs = Random.shuffle(master.getLocations(blockId)) 
  3.    val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } 
  4.    preferredLocs ++ otherLocs 
  5.  } 

spark   broadcast task

這個調(diào)度 task 到多個 task 上面過程代碼太多,我就不貼了, 直接說一下流程,

  • DAGScheduler 在  submitMissingTasks 方法提交 task的時候, 會把 task 包裝為一個 Broadcast 類型, 里面使用 TorrentBroadcastFactory 創(chuàng)建一個 TorrentBroadcast 的類型, 使用的是p2p的協(xié)議, 會減輕 master 的壓力,  這個里面會 調(diào)用 writeBlocks 里面把taskBinary  通過 blockManager.putSingle 放在 BlockManager 緩存中
  • ShuffleMapTask 或者 ResultTask,然后調(diào)用 runTask 方法, 里面實際上會調(diào)用 Broadcast 的value 方法, 里面最終調(diào)用了 BlockManager 的 getLocalBytes 或者 getRemoteBytes 方法

blockManager 在  spark streaming 中的應用

  • ReceiverTracker 在啟動的時候,會運行一個 job, 這個job 就是到 各個executor上去啟動 ReceiverSupervisorImpl, 然后啟動各個具體的數(shù)據(jù)接收器,  如果是SocketInputDStream, 就會啟動一個 SocketReceiver,
  • Receiver 接收到數(shù)據(jù)后, 先在 BlockGenerator 中緩存, 等到達一定的大小后,  調(diào)用 BlockManagerBasedBlockHandler 的 storeBlock方法持久化到 BlockManager 中, 然后把數(shù)據(jù)信息匯報到 ReceiverTracker上, 最終 匯總到   ReceivedBlockTracker 中的 timeToAllocatedBlocks中,
  • ReceiverInputDStream compute的時候,  receivedBlockTracker 會根據(jù)時間獲取到  BlockManager 中的元信息,里面最終對應的還是 BlockManager 的存儲位置, 最終獲取到數(shù)據(jù)進行計算,

測試 blockManager

我們做一個簡單的測試,兩端代碼的區(qū)別就是 一個 進行了cache ,一個沒有進行cache。

  1. val file = sc.textFile("/fusionlog/midsourcenew/2017-03-13-18-15_2.gz" 
  2. file.count()  
  3. file.count() 

我們從日志可以觀察出來, ***段代碼, 兩個 job 中都從 hdfs 中讀取文件, 讀取了兩次,

  1. val file = sc.textFile("/fusionlog/midsourcenew/2017-03-13-18-15_2.gz").cache() 
  2. file.count() 
  3. file.count() 

有以下日志

  1. MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 1354.9 MB, free 4.9 GB) 
  2. BlockManager: Found block rdd_1_0 locally 

我們發(fā)現(xiàn)在***次讀取文件后, 把文件 cache 在了 blockManager 中, 下一個 job 運行的時候, 在本地 BlockManager 直接發(fā)現(xiàn)獲取到了 block , 沒有讀取 hdfs 文件 ,

在 spark ui 中也發(fā)現(xiàn)了 cache的 Block, 全部是在內(nèi)存中緩存的, 

責任編輯:武曉燕 來源: spark技術(shù)分享
相關(guān)推薦

2017-04-14 09:48:25

分布式存儲系統(tǒng)

2018-09-29 14:08:04

存儲系統(tǒng)分布式

2017-07-18 09:51:36

文件存儲系統(tǒng)

2017-10-16 10:24:47

LogDevice存儲系統(tǒng)

2017-10-12 09:36:54

分布式存儲系統(tǒng)

2017-10-19 08:45:15

存儲系統(tǒng)HBase

2018-11-20 09:19:58

存儲系統(tǒng)雪崩效應

2017-10-17 08:33:31

存儲系統(tǒng)分布式

2017-12-18 10:47:04

分布式存儲數(shù)據(jù)

2019-05-13 15:20:42

存儲系統(tǒng)算法

2019-10-15 10:59:43

分布式存儲系統(tǒng)

2021-07-04 07:07:06

Ceph分布式存儲架構(gòu)

2018-10-24 11:01:53

分布式存儲系統(tǒng)

2021-08-07 05:00:20

存儲系統(tǒng)

2010-07-02 10:08:12

BigtableGoogle

2013-12-27 10:56:42

分布式對象存儲Sheepdog性能測試

2014-02-19 11:37:57

分布式對象存儲Sheepdog

2018-03-13 08:45:08

存儲系統(tǒng)DHT算法

2018-10-29 12:42:23

Ceph分布式存儲

2025-01-26 11:54:39

分布式存儲系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號