自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

從源碼角度看Spark on yarn client & cluster模式的本質(zhì)區(qū)別

大數(shù)據(jù) Spark
對于yarn-client和yarn-cluster的唯一區(qū)別在于,yarn-client的Driver運行在本地,而AppMaster運行在yarn的一個節(jié)點上,他們之間進(jìn)行遠(yuǎn)程通信,AppMaster只負(fù)責(zé)資源申請和釋放(當(dāng)然還有DelegationToken的刷新),然后等待Driver的完成;而yarn-cluster的Driver則運行在AppMaster所在的container里,Driver和AppMaster是同一個進(jìn)程的兩個不同線程,它們之間也會進(jìn)行通信,AppMaster同樣等待Driver的完成,從而釋放資源。

首先區(qū)分下AppMaster和Driver,任何一個yarn上運行的任務(wù)都必須有一個AppMaster,而任何一個Spark任務(wù)都會有一個Driver,Driver就是運行SparkContext(它會構(gòu)建TaskScheduler和DAGScheduler)的進(jìn)程,當(dāng)然在Driver上你也可以做很多非Spark的事情,這些事情只會在Driver上面執(zhí)行,而由SparkContext上牽引出來的代碼則會由DAGScheduler分析,并形成Job和Stage交由TaskScheduler,再由TaskScheduler交由各Executor分布式執(zhí)行。

所以Driver和AppMaster是兩個完全不同的東西,Driver是控制Spark計算和任務(wù)資源的,而AppMaster是控制yarn app運行和任務(wù)資源的,只不過在Spark on Yarn上,這兩者就出現(xiàn)了交叉,而在standalone模式下,資源則由Driver管理。在Spark on Yarn上,Driver會和AppMaster通信,資源的申請由AppMaster來完成,而任務(wù)的調(diào)度和執(zhí)行則由Driver完成,Driver會通過與AppMaster通信來讓Executor的執(zhí)行具體的任務(wù)。

client與cluster的區(qū)別

對于yarn-client和yarn-cluster的唯一區(qū)別在于,yarn-client的Driver運行在本地,而AppMaster運行在yarn的一個節(jié)點上,他們之間進(jìn)行遠(yuǎn)程通信,AppMaster只負(fù)責(zé)資源申請和釋放(當(dāng)然還有DelegationToken的刷新),然后等待Driver的完成;而yarn-cluster的Driver則運行在AppMaster所在的container里,Driver和AppMaster是同一個進(jìn)程的兩個不同線程,它們之間也會進(jìn)行通信,AppMaster同樣等待Driver的完成,從而釋放資源。

Spark里AppMaster的實現(xiàn):org.apache.spark.deploy.yarn.ApplicationMaster Yarn里MapReduce的AppMaster實現(xiàn):org.apache.hadoop.mapreduce.v2.app.MRAppMaster

在yarn-client模式里,優(yōu)先運行的是Driver(我們寫的應(yīng)用代碼就是入口),然后在初始化SparkContext的時候,會作為client端向yarn申請AppMaster資源,當(dāng)AppMaster運行后,它會向yarn注冊自己并申請Executor資源,之后由本地Driver與其通信控制任務(wù)運行,而AppMaster則時刻監(jiān)控Driver的運行情況,如果Driver完成或意外退出,AppMaster會釋放資源并注銷自己。所以在該模式下,如果運行spark-submit的程序退出了,整個任務(wù)也就退出了

在yarn-cluster模式里,本地進(jìn)程則僅僅只是一個client,它會優(yōu)先向yarn申請AppMaster資源運行AppMaster,在運行AppMaster的時候通過反射啟動Driver(我們的應(yīng)用代碼),在SparkContext初始化成功后,再向yarn注冊自己并申請Executor資源,此時Driver與AppMaster運行在同一個container里,是兩個不同的線程,當(dāng)Driver運行完畢,AppMaster會釋放資源并注銷自己。所以在該模式下,本地進(jìn)程僅僅是一個client,如果結(jié)束了該進(jìn)程,整個Spark任務(wù)也不會退出,因為Driver是在遠(yuǎn)程運行的

下面從源碼的角度看看SparkSubmit的代碼調(diào)用(基于Spark2.0.0):

代碼公共部分

SparkSubmit#main =>

  1. val appArgs = new SparkSubmitArguments(args) 
  2. appArgs.action match { 
  3.   // normal spark-submit 
  4.   case SparkSubmitAction.SUBMIT => submit(appArgs) 
  5.   // use --kill specified 
  6.   case SparkSubmitAction.KILL => kill(appArgs) 
  7.   // use --status specified 
  8.   case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) 

SparkSubmit的main方法是在用戶使用spark-submit腳本提交Spark app的時候調(diào)用的,可以看到正常情況下,它會調(diào)用SparkSubmit#submit方法

SparkSubmit#submit =>

  1. val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) 
  2. // 此處省略掉代理賬戶,異常處理,提交失敗的重提交邏輯,只看主干代碼 
  3. runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 

在submit方法內(nèi)部,會先進(jìn)行提交環(huán)境相關(guān)的處理,調(diào)用的是SparkSubmit#prepareSubmitEnvironment方法,之后利用拿到的mainClass等信息,再調(diào)用SparkSubmit#runMain方法來執(zhí)行對于主函數(shù)

SparkSubmit#prepareSubmitEnvironment =>

主干相關(guān)的代碼如下:

  1. // yarn client mode 
  2. if (deployMode == CLIENT) { 
  3.   // client 模式下,運行的是 --class 后指定的mainClass,也即我們的代碼 
  4.   childMainClass = args.mainClass 
  5.   if (isUserJar(args.primaryResource)) { 
  6.     childClasspath += args.primaryResource 
  7.   } 
  8.   if (args.jars != null) { childClasspath ++= args.jars.split(",") } 
  9.   if (args.childArgs != null) { childArgs ++= args.childArgs } 
  10.  
  11. // yarn cluster mode 
  12. val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER 
  13. if (isYarnCluster) { 
  14.   // cluster 模式下,運行的是Client類 
  15.   childMainClass = "org.apache.spark.deploy.yarn.Client" 
  16.   if (args.isPython) { 
  17.     childArgs += ("--primary-py-file", args.primaryResource) 
  18.     childArgs += ("--class""org.apache.spark.deploy.PythonRunner"
  19.   } else if (args.isR) { 
  20.     val mainFile = new Path(args.primaryResource).getName 
  21.     childArgs += ("--primary-r-file", mainFile) 
  22.     childArgs += ("--class""org.apache.spark.deploy.RRunner"
  23.   } else { 
  24.     if (args.primaryResource != SparkLauncher.NO_RESOURCE) { 
  25.       childArgs += ("--jar", args.primaryResource) 
  26.     } 
  27.     // 這里 --class 指定的是AppMaster里啟動的Driver,也即我們的代碼 
  28.     childArgs += ("--class", args.mainClass) 
  29.   } 
  30.   if (args.childArgs != null) { 
  31.     args.childArgs.foreach { arg => childArgs += ("--arg", arg) } 
  32.   } 

在 prepareSubmitEnvironment 里,主要負(fù)責(zé)解析用戶參數(shù),設(shè)置環(huán)境變量env,處理python/R等依賴,然后針對不同的部署模式,匹配不同的運行主類,比如: yarn-client>args.mainClass,yarn-cluster>o.a.s.deploy.yarn.Client

SparkSubmit#runMain =>

骨干代碼如下

  1. try { 
  2.   mainClass = Utils.classForName(childMainClass) 
  3. } catch { 
  4.   // ... 
  5. val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) 
  6. try { 
  7.   // childArgs就是用戶自己傳給Spark應(yīng)用代碼的參數(shù) 
  8.   mainMethod.invoke(null, childArgs.toArray) 
  9. } catch { 
  10.   // ... 

在runMain方法里,會設(shè)置ClassLoader,根據(jù)用戶代碼優(yōu)先的設(shè)置(spark.driver.userClassPathFirst)來加載對應(yīng)的類,然后反射調(diào)用prepareSubmitEnvironment方法返回的主類,并調(diào)用其main方法

從所反射的不同主類,我們來看看具體調(diào)用方式的不同:

對于yarn-cluster

o.a.s.deploy.yarn.Client#main =>

  1. val sparkConf = new SparkConf  
  2. val args = new ClientArguments(argStrings) 
  3. new Client(args, sparkConf).run() 

在Client伴生對象里構(gòu)建了Client類的對象,然后調(diào)用了Client#run方法

o.a.s.deploy.yarn.Client#run =>

  1. this.appId = submitApplication() 
  2. // report application ... 

run方法核心的就是提交任務(wù)到y(tǒng)arn,其調(diào)用了Client#submitApplication方法,拿到提交完的appID后,監(jiān)控app的狀態(tài)

o.a.s.deploy.yarn.Client#submitApplication =>

  1. try { 
  2.   // 獲取提交用戶的Credentials,用于后面獲取delegationToken 
  3.   setupCredentials() 
  4.   yarnClient.init(yarnConf) 
  5.   yarnClient.start() 
  6.  
  7.   // Get a new application from our RM 
  8.   val newApp = yarnClient.createApplication() 
  9.   val newAppResponse = newApp.getNewApplicationResponse() 
  10.   // 拿到appID 
  11.   appId = newAppResponse.getApplicationId() 
  12.   // 報告狀態(tài) 
  13.   reportLauncherState(SparkAppHandle.State.SUBMITTED) 
  14.   launcherBackend.setAppId(appId.toString) 
  15.  
  16.   // Verify whether the cluster has enough resources for our AM 
  17.   verifyClusterResources(newAppResponse) 
  18.  
  19.   // 創(chuàng)建AppMaster運行的context,為其準(zhǔn)備運行環(huán)境,java options,以及需要運行的java命令,AppMaster通過該命令在yarn節(jié)點上啟動 
  20.   val containerContext = createContainerLaunchContext(newAppResponse) 
  21.   val appContext = createApplicationSubmissionContext(newApp, containerContext) 
  22.  
  23.   // Finally, submit and monitor the application 
  24.   logInfo(s"Submitting application $appId to ResourceManager"
  25.   yarnClient.submitApplication(appContext) 
  26.   appId 
  27. } catch { 
  28.   case e: Throwable => 
  29.     if (appId != null) { 
  30.       cleanupStagingDir(appId) 
  31.     } 
  32.     throw e 

在 submitApplication 里完成了app的申請,AppMaster context的創(chuàng)建,***完成了任務(wù)的提交,對于cluster模式而言,任務(wù)提交后本地進(jìn)程就只是一個client而已,Driver就運行在與AppMaster同一container里,對于client模式而言,執(zhí)行 submitApplication 方法時,Driver已經(jīng)在本地運行,這一步就只是提交任務(wù)到y(tǒng)arn而已

o.a.s.deploy.yarn.Client#createContainerLaunchContext

  1. val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) 
  2. // 非pySpark時,pySparkArchives為Nil 
  3. val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives) 
  4. // 這一步會進(jìn)行delegationtoken的獲取,存于Credentials,在AppMasterContainer構(gòu)建完的***將其存入到context里 
  5. val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives) 
  6.  
  7. val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) 
  8. // 設(shè)置AppMaster container運行的資源和環(huán)境 
  9. amContainer.setLocalResources(localResources.asJava) 
  10. amContainer.setEnvironment(launchEnv.asJava) 
  11. // 設(shè)置JVM參數(shù) 
  12. val javaOpts = ListBuffer[String]() 
  13. javaOpts += "-Djava.io.tmpdir=" + tmpDir 
  14. // other java opts setting... 
  15.  
  16. // 對于cluster模式,通過 --class 指定AppMaster運行我們的Driver端,對于client模式則純作為資源申請和分配的工具 
  17. val userClass = 
  18.   if (isClusterMode) { 
  19.     Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass)) 
  20.   } else { 
  21.     Nil 
  22.   } 
  23. // 設(shè)置AppMaster運行的主類 
  24. val amClass = 
  25.   if (isClusterMode) { 
  26.     Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName 
  27.   } else { 
  28.     // ExecutorLauncher只是ApplicationMaster的一個warpper 
  29.     Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName 
  30.   } 
  31.  
  32. val amArgs = 
  33.   Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ 
  34.     userArgs ++ Seq( 
  35.       "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), 
  36.         LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) 
  37.  
  38. // Command for the ApplicationMaster 
  39. val commands = prefixEnv ++ Seq( 
  40.     YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java""-server" 
  41.   ) ++ 
  42.   javaOpts ++ amArgs ++ 
  43.   Seq( 
  44.     "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
  45.     "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
  46.  
  47. val printableCommands = commands.map(s => if (s == null"null" else s).toList 
  48. // 設(shè)置需運行的命令 
  49. amContainer.setCommands(printableCommands.asJava) 
  50.  
  51. val securityManager = new SecurityManager(sparkConf) 
  52. // 設(shè)置應(yīng)用權(quán)限 
  53. amContainer.setApplicationACLs( 
  54.       YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava) 
  55. // 設(shè)置delegationToken 
  56. setupSecurityToken(amContainer) 

對于yarn-client

args.mainClass =>

在我們的Spark代碼里,需要創(chuàng)建一個SparkContext來執(zhí)行Spark任務(wù),而在其構(gòu)造器里創(chuàng)建TaskScheduler的時候,對于client模式就會向yarn申請資源提交任務(wù),如下

  1. // 調(diào)用createTaskScheduler方法,對于yarn模式,master=="yarn" 
  2. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 
  3. _schedulerBackend = sched 
  4. _taskScheduler = ts 
  5. // 創(chuàng)建DAGScheduler 
  6. _dagScheduler = new DAGScheduler(this) 

SparkContext#createTaskScheduler =>

這里會根據(jù)master匹配不同模式,比如local/standalone/yarn,在yarn模式下會利用ServiceLoader裝載YarnClusterManager,然后由它創(chuàng)建TaskScheduler和SchedulerBackend,如下:

  1. // 當(dāng)為yarn模式的時候 
  2. case masterUrl => 
  3.   // 利用當(dāng)前l(fā)oader裝載YarnClusterManager,masterUrl為"yarn" 
  4.   val cm = getClusterManager(masterUrl) match { 
  5.     case Some(clusterMgr) => clusterMgr 
  6.     case None => throw new SparkException("Could not parse Master URL: '" + master + "'"
  7.   } 
  8.   try { 
  9.     // 創(chuàng)建TaskScheduler,這里masterUrl并沒有用到 
  10.     val scheduler = cm.createTaskScheduler(sc, masterUrl) 
  11.     // 創(chuàng)建SchedulerBackend,對于client模式,這一步會向yarn申請AppMaster,提交任務(wù) 
  12.     val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) 
  13.     cm.initialize(scheduler, backend) 
  14.     (backend, scheduler) 
  15.   } catch { 
  16.     case se: SparkException => throw se 
  17.     case NonFatal(e) => 
  18.       throw new SparkException("External scheduler cannot be instantiated", e) 
  19.   } 

YarnClusterManager#createSchedulerBackend

  1. sc.deployMode match { 
  2.   case "cluster" => 
  3.     new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) 
  4.   case "client" => 
  5.     new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) 
  6.   case  _ => 
  7.     throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn"

可以看到y(tǒng)arn下的SchedulerBackend實現(xiàn)對于client和cluster模式是不同的,yarn-client模式為YarnClientSchedulerBackend,yarn-cluster模式為 YarnClusterSchedulerBackend,之所以不同,是因為在client模式下,YarnClientSchedulerBackend 相當(dāng)于 yarn application 的client,它會調(diào)用o.a.s.deploy.yarn.Client#submitApplication 來準(zhǔn)備環(huán)境,申請資源并提交yarn任務(wù),如下:

  1. val driverHost = conf.get("spark.driver.host"
  2. val driverPort = conf.get("spark.driver.port"
  3. val hostport = driverHost + ":" + driverPort 
  4. sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) } 
  5.  
  6. val argsArrayBuf = new ArrayBuffer[String]() 
  7. argsArrayBuf += ("--arg", hostport) 
  8.  
  9. val args = new ClientArguments(argsArrayBuf.toArray) 
  10. totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) 
  11. // 創(chuàng)建o.a.s.deploy.yarn.Client對象 
  12. client = new Client(args, conf) 
  13. // 調(diào)用submitApplication準(zhǔn)備環(huán)境,申請資源,提交任務(wù),并把a(bǔ)ppID保存下來 
  14. // 對于submitApplication,前文有詳細(xì)的分析,這里與前面是一致的 
  15. bindToYarn(client.submitApplication(), None) 

而在 YarnClusterSchedulerBackend 里,由于 AppMaster 已經(jīng)運行起來了,所以它并不需要再做申請資源等等工作,只需要保存appID和attemptID并啟動SchedulerBackend即可.

責(zé)任編輯:武曉燕 來源: oschina博客
相關(guān)推薦

2021-02-06 23:21:35

SaaS開發(fā)低代碼

2021-03-10 08:20:54

設(shè)計模式OkHttp

2011-05-25 13:10:40

SQL ServerOracle

2021-07-02 06:54:45

GoJavachannel

2019-04-28 16:10:50

設(shè)計Redux前端

2017-05-27 09:58:42

BGP動態(tài)靜態(tài)

2018-02-06 14:32:03

云服務(wù)器本質(zhì)區(qū)別

2014-04-16 13:47:43

SparkYarn

2023-03-13 07:43:51

PHP類型轉(zhuǎn)換

2021-08-26 11:21:34

技術(shù)代碼計算

2015-05-05 11:04:31

CoreOS自動化運維

2020-02-04 09:53:05

數(shù)據(jù)安全數(shù)據(jù)泄漏信息安全

2010-09-27 11:24:37

SQL聚簇索引

2009-07-12 13:55:29

2010-09-27 15:17:48

JVM client模式server模式

2022-03-08 11:29:06

Linux進(jìn)程系統(tǒng)

2012-04-29 10:37:28

APP

2010-07-16 09:00:20

開源RedOffice紅旗2000

2025-04-17 07:41:07

進(jìn)程線程窗口

2013-12-11 21:48:38

OpenStack
點贊
收藏

51CTO技術(shù)棧公眾號