自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Master 分配資源并在 Worker上啟動 Executor ,逐行代碼注釋版

開發(fā) 前端
這里有個假設(shè)是:Spark 集群以 Standalone 的方式來啟動的,作業(yè)也是提交到 Spark standalone 集群。

[[432016]]

本文轉(zhuǎn)載自微信公眾號「KK架構(gòu)」,作者wangkai。轉(zhuǎn)載本文請聯(lián)系KK架構(gòu)公眾號。

一、回顧一下之前的內(nèi)容

上一次閱讀到了 SparkContext 初始化,繼續(xù)往下之前,先溫故一下之前的內(nèi)容。

這里有個假設(shè)是:Spark 集群以 Standalone 的方式來啟動的,作業(yè)也是提交到 Spark standalone 集群。

首先需要啟動 Spark 集群,使用 start-all.sh 腳本依次啟動 Master (主備) 和多個 Worker。

啟動好之后,開始提交作業(yè),使用 spark-submit 命令來提交。

  • 首先在提交任務(wù)的機(jī)器上使用 java 命令啟動了一個虛擬機(jī),并且執(zhí)行了主類 SparkSubmit 的 main 方法作為入口。
  • 然后根據(jù)提交到不同的集群,來 new 不同的客戶端類,如果是 standalone 的話,就 new 了一個 ClientApp;然后把 java DriverWrapper 這個命令封裝到 RequestSubmmitDriver 消息中,把這個消息發(fā)送給 Master;
  • Master 隨機(jī)找一個滿足資源條件的 Worker 來啟動 Driver,實(shí)際上是在虛擬機(jī)里執(zhí)行 DriverWrapper 的 main 方法;
  • 然后 Worker 開始啟動 Driver,啟動的時候會執(zhí)行用戶提交的 java 包里的 main 方法,然后開始執(zhí)行 SparkContext 的初始化,依次在 Driver 中創(chuàng)建了 DAGScheduler、TaskScheduler、SchedulerBackend 三個重要的實(shí)例。并且啟動了 DriverEndpoint 和 ClientEndpoint ,用來和 Worker、Master 通信。

二、Master 處理應(yīng)用的注冊

接著上次 ClientEndpoint 啟動之后,會向 Master 發(fā)送一個 RegisterApplication 消息,Master 開始處理這個消息。

然后看到 Matster 類處理 RegisterApplication 消息的地方:

可以看到,用應(yīng)用程序的描述和 Driver 的引用創(chuàng)建了一個 Application,然后開始注冊這個 Application。

注冊 Application 很簡單,就是往 Master 的內(nèi)存中加入各種信息,重點(diǎn)來了,把 ApplicationInfo 加入到了 waitingApps 這個結(jié)構(gòu)里,然后 schedule() 方法會遍歷這個列表,為 Application 分配資源,并調(diào)度起來。

然后往 zk 中寫入了 Application 的信息,并且往 Driver 發(fā)送了一個 RegisteredApplication 應(yīng)用已經(jīng)注冊的消息。

接著開始 schedule(),這個方法上次講過,它會遍歷兩個列表,一個是遍歷 waitingDrivers 來啟動 Driver,一個是遍歷 waitingApps,來啟動 Application。

waitingDrivers 列表在客戶端請求啟動 Driver 的時候就處理過了,本次重點(diǎn)看這個方法:

  1. startExecutorsOnWorkers() 

三、Master 對資源的調(diào)度

有以下幾個步驟:

  • 遍歷 waitingApps 的所有 app;
  • 如果 app 需要的核數(shù)小于一個 Executor 可以提供的核數(shù),就不為 app 分配新的 Executor;
  • 過濾出還有可供調(diào)度的 cpu 和 memory 的 workers,并按照 cores 的大小降序排序,作為 usableWorkers;
  • 計算所有 usableWorkers 上要分配多少 CPU;
  • 然后遍歷可用的 Workers,分配資源并執(zhí)行調(diào)度,啟動 Executor。

源碼從 Master 類的 schedule() 方法的最后一行 startExecutorsOnWorkers() 開始:

這個方法主要作用是計算 worker 的 executor 數(shù)量和分配的資源并啟動 executor。

  1. /** 
  2.  * Schedule and launch executors on workers 
  3.  */ 
  4. private def startExecutorsOnWorkers(): Unit = { 
  5.     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app 
  6.     // in the queue, then the second app, etc. 
  7.  
  8.     for (app <- waitingApps) { 
  9.         val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) 
  10.         // If the cores left is less than the coresPerExecutor,the cores left will not be allocated 
  11.         if (app.coresLeft >= coresPerExecutor) { 
  12.             // 1. 剩余內(nèi)存大于單個 executor 需要的內(nèi)存 
  13.             // 2. 剩余的內(nèi)核數(shù)大于單個 executor 需要的內(nèi)核數(shù) 
  14.             // 3. 按照內(nèi)核數(shù)從大到小排序 
  15.             // Filter out workers that don't have enough resources to launch an executor 
  16.             val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) 
  17.                 .filter(canLaunchExecutor(_, app.desc)) 
  18.                 .sortBy(_.coresFree).reverse 
  19.             val appMayHang = waitingApps.length == 1 && 
  20.                 waitingApps.head.executors.isEmpty && usableWorkers.isEmpty 
  21.             if (appMayHang) { 
  22.                 logWarning(s"App ${app.id} requires more resource than any of Workers could have."
  23.             } 
  24.             // 計算每個 Worker 上可用的 cores 
  25.             val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) 
  26.              
  27.             // Now that we've decided how many cores to allocate on each worker, let's allocate them 
  28.             for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 
  29.                 allocateWorkerResourceToExecutors( 
  30.                     app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos)) 
  31.             } 
  32.         } 
  33.     } 

(1)遍歷 waitingApps,如果 app 還需要的 cpu 核數(shù)大于每個執(zhí)行器的核數(shù),才繼續(xù)分配。

(2)過濾可用的 worker,條件一:該 worker 剩余內(nèi)存大于單個 executor 需要的內(nèi)存;條件二:該 worker 剩余 cpu 核數(shù)大于單個 executor 需要的核數(shù);然后按照可用 cpu核數(shù)從大到小排序。

(3)下面兩個方法是關(guān)鍵的方法

scheduleExecutorsOnWorkers(),用來計算每個 Worker 上可用的 cpu 核數(shù);

allocateWorkerResourceToExecutors() 用來真正在 Worker 上分配 Executor。

四、scheduleExecutorsOnWorkers 計算每個 Worker 可用的核數(shù)

這個方法很長,首先看方法注釋,大致翻譯了一下:

當(dāng)執(zhí)行器分配的 cpu 核數(shù)(spark.executor.cores)被顯示設(shè)置的時候,如果這個 worker 上有足夠的核數(shù)和內(nèi)存的話,那么每個 worker 上可以執(zhí)行多個執(zhí)行器;反之,沒有設(shè)置的時候,每個 worker 上只能啟動一個執(zhí)行器;并且,這個執(zhí)行器會使用 worker 能提供出來的盡可能多的核數(shù);

appA 和 appB 都有一個執(zhí)行器運(yùn)行在 worker1 上。但是 appA 還需要一些 cpu 核,當(dāng) appB 執(zhí)行結(jié)束,釋放了它在 worker1 上的核數(shù)時, 下一次調(diào)度的時候,appA 會新啟動一個 executor 獲得了 worker1 上所有的可用的核心,因此 appA 就在 worker1 上啟動了多個執(zhí)行器。

設(shè)置 coresPerExecutor (spark.executor.cores)很重要,考慮下面的例子:集群有4個worker,每個worker有16核;用戶請求 3 個執(zhí)行器(spark.cores.max = 48,spark.executor.cores=16)。如果不設(shè)置這個參數(shù),那么每次分配 1 個 cpu核心,每個 worker 輪流分配一個 cpu核,最終 4 個執(zhí)行器分配 12 個核心給每個 executor,4 個 worker 也同樣分配了48個核心,但是最終每個 executor 只有 12核 < 16 核,所以最終沒有執(zhí)行器被啟動。

如果看我的翻譯還是很費(fèi)勁,我就再精簡下:

  • 如果沒有設(shè)置 spark.executor.cores,那么每個 Worker 只能啟動一個 Executor,并且這個 Executor 會占用所有 Worker 能提供的 cpu核數(shù);
  • 如果顯示設(shè)置了,那么每個 Worker 可以啟動多個 Executor;

下面是源碼,每句都有挨個注釋過,中間有一個方法是判斷這個 Worker 上還能不能再分配 Executor 了。

重點(diǎn)是中間方法后面那一段,遍歷每個 Worker 分配 cpu,如果不是 Spend Out 模式,則在一個 Worker 上一直分配,直到 Worker 資源分配完畢。

  1. private def scheduleExecutorsOnWorkers( 
  2.     app: ApplicationInfo, 
  3.     usableWorkers: Array[WorkerInfo], 
  4.     spreadOutApps: Boolean): Array[Int] = { 
  5.     // 每個 executor 的核數(shù) 
  6.     val coresPerExecutor = app.desc.coresPerExecutor 
  7.     // 每個 executor 的最小核數(shù) 為1 
  8.     val minCoresPerExecutor = coresPerExecutor.getOrElse(1) 
  9.     //  每個Worker分配一個Executor? 這個參數(shù)可以控制這個行為 
  10.     val oneExecutorPerWorker = coresPerExecutor.isEmpty 
  11.     //  每個Executor的內(nèi)存 
  12.     val memoryPerExecutor = app.desc.memoryPerExecutorMB 
  13.     val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor 
  14.     // 可用 Worker 的總數(shù) 
  15.      
  16.     val numUsable = usableWorkers.length 
  17.     // 給每個Worker的cores數(shù) 
  18.     val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker 
  19.     // 給每個Worker上新的Executor數(shù) 
  20.     val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker 
  21.     // app 需要的核心數(shù) 和 所有 worker 能提供的核心總數(shù),取最小值 
  22.     var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum
  23.  
  24.     //  判斷指定的worker是否可以為這個app啟動一個executor 
  25.     /** Return whether the specified worker can launch an executor for this app. */ 
  26.     def canLaunchExecutorForApp(pos: Int): Boolean = { 
  27.         // 如果能提供的核心數(shù) 大于等 executor 需要的最小核心數(shù),則繼續(xù)分配 
  28.         val keepScheduling = coresToAssign >= minCoresPerExecutor 
  29.         // 是否有足夠的核心:當(dāng)前 worker 能提供的核數(shù) 減去 每個 worker 已分配的核心數(shù) ,大于每個 executor最小的核心數(shù) 
  30.         val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor 
  31.         // 當(dāng)前 worker 新分配的 executor 個數(shù) 
  32.         val assignedExecutorNum = assignedExecutors(pos) 
  33.  
  34.         //  如果每個worker允許多個executor,就能一直在啟動新的的executor 
  35.         //  如果在這個worker上已經(jīng)有executor,則給這個executor更多的core 
  36.         // If we allow multiple executors per worker, then we can always launch new executors. 
  37.         // Otherwise, if there is already an executor on this worker, just give it more cores. 
  38.  
  39.         // 如果一個 worker 上可以啟動多個 executor  或者 這個 worker 還沒分配 executor 
  40.         val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0 
  41.         if (launchingNewExecutor) { 
  42.             // 總共已經(jīng)分配的內(nèi)存 
  43.             val assignedMemory = assignedExecutorNum * memoryPerExecutor 
  44.             // 是否有足夠的內(nèi)存:當(dāng)前worker 的剩余內(nèi)存 減去 已分配的內(nèi)存 大于每個 executor需要的內(nèi)存 
  45.             val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor 
  46.             // 
  47.             val assignedResources = resourceReqsPerExecutor.map { 
  48.                 req => req.resourceName -> req.amount * assignedExecutorNum 
  49.             }.toMap 
  50.             val resourcesFree = usableWorkers(pos).resourcesAmountFree.map { 
  51.                 case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0)) 
  52.             } 
  53.             val enoughResources = ResourceUtils.resourcesMeetRequirements( 
  54.                 resourcesFree, resourceReqsPerExecutor) 
  55.             // 所有已分配的核數(shù)+app需要的核數(shù)  小于 app的核數(shù)限制 
  56.             val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit 
  57.             keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit 
  58.         } else { 
  59.             // We're adding cores to an existing executor, so no need 
  60.             // to check memory and executor limits 
  61.             keepScheduling && enoughCores 
  62.         } 
  63.     } 
  64.  
  65.     // 不斷的啟動executor,直到不再有Worker可以容納任何Executor,或者達(dá)到了這個Application的要求 
  66.     // Keep launching executors until no more workers can accommodate any 
  67.     // more executors, or if we have reached this application's limits 
  68.     // 過濾出可以啟動 executor 的 workers 
  69.     var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp) 
  70.  
  71.     while (freeWorkers.nonEmpty) { 
  72.         // 遍歷每個 worker 
  73.         freeWorkers.foreach { pos => 
  74.             var keepScheduling = true 
  75.             while (keepScheduling && canLaunchExecutorForApp(pos)) { 
  76.                 coresToAssign -= minCoresPerExecutor 
  77.                 assignedCores(pos) += minCoresPerExecutor 
  78.  
  79.                 //  如果我們在每個worker上啟動一個executor,每次迭代為每個executor增加一個core 
  80.                 //  否則,每次迭代都會為新的executor分配cores 
  81.                 // If we are launching one executor per worker, then every iteration assigns 1 core 
  82.                 // to the executor. Otherwise, every iteration assigns cores to a new executor. 
  83.                 if (oneExecutorPerWorker) { 
  84.                     assignedExecutors(pos) = 1 
  85.                 } else { 
  86.                     assignedExecutors(pos) += 1 
  87.                 } 
  88.  
  89.                 //  如果不使用Spreading out方法,我們會在這個worker上繼續(xù)調(diào)度executor,直到使用它所有的資源 
  90.                 //  否則,就跳轉(zhuǎn)到下一個worker 
  91.                 // Spreading out an application means spreading out its executors across as 
  92.                 // many workers as possible. If we are not spreading outthen we should keep 
  93.                 // scheduling executors on this worker until we use all of its resources. 
  94.                 // Otherwise, just move on to the next worker. 
  95.                 if (spreadOutApps) { 
  96.                     keepScheduling = false 
  97.                 } 
  98.             } 
  99.         } 
  100.         freeWorkers = freeWorkers.filter(canLaunchExecutorForApp) 
  101.     } 
  102.     assignedCores 

接著真正開始在 Worker 上啟動 Executor:

在 launchExecutor 在方法里:

  1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { 
  2.     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) 
  3.     worker.addExecutor(exec
  4.     worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, 
  5.         exec.application.descexec.cores, exec.memory, exec.resources)) 
  6.     exec.application.driver.send( 
  7.         ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) 

給 Worker 發(fā)送了一個 LaunchExecutor 消息。

然后給執(zhí)行器對應(yīng)的 Driver 發(fā)送了 ExecutorAdded 消息。

五、總結(jié)

本次我們講了 Master 處理應(yīng)用的注冊,重點(diǎn)是把 app 信息加入到 waitingApps 列表中,然后調(diào)用 schedule() 方法,計算每個 Worker 可用的 cpu核數(shù),并且在 Worker 上啟動執(zhí)行器。

 

責(zé)任編輯:武曉燕 來源: KK架構(gòu)
相關(guān)推薦

2011-04-19 13:32:52

2009-12-24 11:04:59

固定分配資源動態(tài)分配資源

2021-08-31 23:09:27

Spark資源分配

2015-04-17 10:28:02

無線頻譜移動通信頻譜

2012-03-09 17:38:17

ibmdw

2012-06-05 08:59:35

Hadoop架構(gòu)服務(wù)器

2010-04-07 15:55:17

無線接入頻段

2014-12-26 10:58:35

托管云托管私有云公共云

2022-06-06 12:02:23

代碼注釋語言

2022-12-12 08:42:06

Java對象棧內(nèi)存

2013-04-17 15:10:07

銳捷寬帶寬帶網(wǎng)絡(luò)

2011-04-19 13:48:55

vCloud Dire

2013-05-21 09:08:24

服務(wù)器虛擬化網(wǎng)卡

2024-10-09 14:25:21

2019-12-20 08:50:21

LinuxKsnip截圖

2021-06-22 16:40:32

鴻蒙HarmonyOS應(yīng)用

2016-03-21 18:56:54

物聯(lián)網(wǎng)IoTIT基礎(chǔ)架構(gòu)

2023-10-24 07:25:10

容器資源云分級

2011-01-26 11:01:37

虛擬機(jī)負(fù)載管理資源分配

2022-04-19 07:47:13

數(shù)據(jù)中心末端資源分配
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號