分布式計(jì)算引擎 Flink/Spark on k8s 的實(shí)現(xiàn)對比以及實(shí)踐
以 Flink 和 Spark 為代表的分布式流批計(jì)算框架的下層資源管理平臺逐漸從 Hadoop 生態(tài)的 YARN 轉(zhuǎn)向 Kubernetes 生態(tài)的 k8s 原生 scheduler 以及周邊資源調(diào)度器,比如 Volcano 和 Yunikorn 等。這篇文章簡單比較一下兩種計(jì)算框架在 Native Kubernetes 的支持和實(shí)現(xiàn)上的異同,以及對于應(yīng)用到生產(chǎn)環(huán)境我們還需要做些什么。
1. 什么是 Native
這里的 native 其實(shí)就是計(jì)算框架直接向 Kubernetes 申請資源。比如很多跑在 YARN 上面的計(jì)算框架,需要自己實(shí)現(xiàn)一個(gè) AppMaster 來想 YARN 的 ResourceManager 來申請資源。Native K8s 相當(dāng)于計(jì)算框架自己實(shí)現(xiàn)一個(gè)類似 AppMaster 的角色向 k8s 去申請資源,當(dāng)然和 AppMaster 還是有差異的 (AppMaster 需要按 YARN 的標(biāo)準(zhǔn)進(jìn)行實(shí)現(xiàn))。
2. Spark on k8s 使用
提交作業(yè)
向 k8s 集群提交作業(yè)和往 YARN 上面提交很類似,命令如下,主要區(qū)別包括:
--master 參數(shù)指定 k8s 集群的 ApiServer
需要通過參數(shù) spark.kubernetes.container.image 指定在 k8s 運(yùn)行作業(yè)的 image,
指定 main jar,需要 driver 進(jìn)程可訪問:如果 driver 運(yùn)行在 pod 中,jar 包需要包含在鏡像中;如果 driver 運(yùn)行在本地,那么 jar 需要在本地。
通過 --name 或者 spark.app.name 指定 app 的名字,作業(yè)運(yùn)行起來之后的 driver 命名會以 app 名字為前綴。當(dāng)然也可以通過參數(shù) spark.kubernetes.driver.pod.name 直接指定 dirver 的名字
- $ ./bin/spark-submit \ --master k8s://https://: \ --deploy-mode cluster \ --name spark-pi \ --class org.apache.spark.examples.SparkPi \ --conf spark.executor.instances=5 \ --conf spark.kubernetes.container.image= \ local:///path/to/examples.jar
提交完該命令之后,spark-submit 會創(chuàng)建一個(gè) driver pod 和一個(gè)對應(yīng)的 servcie,然后由 driver 創(chuàng)建 executor pod 并運(yùn)行作業(yè)。
deploy-mode
和在 YARN 上面使用 Spark 一樣,在 k8s 上面也支持 cluster 和 client 兩種模式:
cluster mode: driver 在 k8s 集群上面以 pod 形式運(yùn)行。
client mode: driver 運(yùn)行在提交作業(yè)的地方,然后 driver 在 k8s 集群上面創(chuàng)建 executor。為了保證 executor 能夠注冊到 driver 上面,還需要提交作業(yè)的機(jī)器可以和 k8s 集群內(nèi)部的 executor 網(wǎng)絡(luò)連通(executor 可以訪問到 driver,需要注冊)。
資源清理
這里的資源指的主要是作業(yè)的 driver 和 executor pod。spark 通過 k8s 的 onwer reference 機(jī)制將作業(yè)的各種資源連接起來,這樣當(dāng) driver pod 被刪除的時(shí)候,關(guān)聯(lián)的 executor pod 也會被連帶刪除。但是如果沒有 driver pod,也就是以 client 模式運(yùn)行作業(yè)的話,如下兩種情況涉及到資源清理:
作業(yè)運(yùn)行完成,driver 進(jìn)程退出,executor pod 運(yùn)行完自動(dòng)退出
driver 進(jìn)程被殺掉,executor pod 連不上 driver 也會自行退出
可以參考:https://kubernetes.io/docs/concepts/architecture/garbage-collection/
依賴管理
前面說到 main jar 包需要在 driver 進(jìn)程可以訪問到的地方,如果是 cluster 模式就需要將 main jar 打包到 spark 鏡像中。但是在日常開發(fā)和調(diào)試中,每次重新 build 一個(gè)鏡像的 effort 實(shí)在是太大了。spark 支持提交的時(shí)候使用本地的文件,然后使用 s3 等作為中轉(zhuǎn):先上傳上去,然后作業(yè)運(yùn)行的時(shí)候再從 s3 上面下載下來。下面是一個(gè)實(shí)例。
- ...--packages org.apache.hadoop:hadoop-aws:3.2.0--conf spark.kubernetes.file.upload.path=s3a:///path--conf spark.hadoop.fs.s3a.access.key=...--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem--conf spark.hadoop.fs.s3a.fast.upload=true--conf spark.hadoop.fs.s3a.secret.key=....--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmpfile:///full/path/to/app.jar
Pod Template
k8s 的 controller (比如 Deployment,Job)創(chuàng)建 Pod 的時(shí)候根據(jù) spec 中的 pod template 來創(chuàng)建。下面是一個(gè) Job 的示例。
- apiVersion: batch/v1kind: Jobmetadata: name: hellospec: template: # 下面的是一個(gè) pod template spec: containers: - name: hello image: busybox command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600'] restartPolicy: OnFailure # The pod template ends here
由于我們通過 spark-submit 提交 spark 作業(yè)的時(shí)候,最終的 k8s 資源(driver/executor pod)是由 spark 內(nèi)部邏輯構(gòu)建出來的。但是有的時(shí)候我們想要在 driver/executor pod 上做一些額外的工作,比如增加 sidecar 容器做一些日志收集的工作。這種場景下 PodTemplate 就是一個(gè)比較好的選擇,同時(shí) PodTemplate 也將 spark 和底層基礎(chǔ)設(shè)施(k8s)解耦開。比如 k8s 發(fā)布新版本支持一些新的特性,那么我們只要修改我們的 PodTemplate 即可,而不涉及到 spark 的內(nèi)部改動(dòng)。
RBAC
RBAC 全稱是 Role-based access control,是 k8s 中的一套權(quán)限控制機(jī)制。通俗來說:
RBAC 中包含了一系列的權(quán)限設(shè)置,比如 create/delete/watch/list pod 等,這些權(quán)限集合的實(shí)體叫 Role 或者 ClusterRole
同時(shí) RBAC 還包含了角色綁定關(guān)系(Role Binding),用于將 Role/ClusterRole 賦予一個(gè)或者一組用戶,比如 Service Account 或者 UserAccount
為了將 Spark 作業(yè)在 k8s 集群中運(yùn)行起來,我們還需要一套 RBAC 資源:
指定 namespace 下的 serviceaccount
定義了權(quán)限規(guī)則的 Role 或者 ClusterRole,我們可以使用常見的 ClusterRole "edit"(對幾乎所有資源具有操作權(quán)限,比如 create/delete/watch 等)
綁定關(guān)系
下面命令在 spark namespace 下為 serviceaccount spark 賦予了操作同 namespace 下其他資源的權(quán)限,那么只要 spark 的 driver pod 掛載了該 serviceaccount,它就可以創(chuàng)建 executor pod 了。
- $ kubectl create serviceaccount spark$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
下面做一個(gè)簡單的演示:
通過如下命令提交作業(yè) SparkPiSleep 到 k8s 集群中。
- $ spark-submit --master k8s://https://: --deploy-mode cluster --class org.apache.spark.examples.SparkPiSleep --conf spark.executor.memory=2g --conf spark.driver.memory=2g --conf spark.driver.core=1 --conf spark.app.name=test12 --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.executor.core=1 --conf spark.kubernetes.container.image= --conf spark.eventLog.enabled=false --conf spark.shuffle.service.enabled=false --conf spark.executor.instances=1 --conf spark.dynamicAllocation.enabled=false --conf sparkspark.kubernetes.namespace=spark --conf sparkspark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.executor.core=1 local:///path/to/main/jar
查看 k8s 集群中的資源
- $ kubectl get po -n sparkNAME READY STATUS RESTARTS AGEspark-pi-5b88a27b576050dd-exec-1 0/1 ContainerCreating 0 2stest12-9fd3c27b576039ae-driver 1/1 Running 0 8s
其中第一個(gè)就是 executor pod,第二個(gè)是 driver 的 pod。除此之外還創(chuàng)建了一個(gè) service,可以通過該 service 訪問到 driver pod,比如 Spark UI 都可以這樣訪問到。
- $ kubectl get svc -n sparkNAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGEtest12-9fd3c27b576039ae-driver-svc ClusterIP None 7078/TCP,7079/TCP,4040/TCP 110s
下面再看一下 service owner reference,executor pod 也是類似的。
- $ kubectl get svc test12-9fd3c27b576039ae-driver-svc -n spark -oyamlapiVersion: v1kind: Servicemetadata: creationTimestamp: "2021-08-18T03:48:50Z" name: test12-9fd3c27b576039ae-driver-svc namespace: spark # service 的 ownerReference 指向了 driver pod,只要 driver pod 被刪除,該 service 也會被刪除 ownerReferences: - apiVersion: v1 controller: true kind: Pod name: test12-9fd3c27b576039ae-driver uid: 56a50a66-68b5-42a0-b2f6-9a9443665d95 resourceVersion: "9975441" uid: 06c1349f-be52-4133-80d9-07af34419b1f
3. Flink on k8s 使用
Flink on k8s native 的實(shí)現(xiàn)支持兩種模式:
application mode:在遠(yuǎn)程 k8s 集群中啟動(dòng)一個(gè) flink 集群(jm 和 tm),driver 運(yùn)行在 jm 中,也就是只支持 detached 模式,不支持 attached 模式。
session mode:在遠(yuǎn)程 k8s 集群啟動(dòng)一個(gè)常駐的 flink 集群(只有 jm),然后向上面提交作業(yè),根據(jù)實(shí)際情況決定啟動(dòng)多少個(gè) tm。
在生產(chǎn)上面使用一般不太建議使用 session mode,所以下面主要討論的是 application mode。
Flink 的 native k8s 模式是不需要指定 tm 個(gè)數(shù)的,jm 會根據(jù)用戶的代碼計(jì)算需要多少 tm。
提交作業(yè)
下面是一個(gè)簡單的提交命令,需要包含:
參數(shù) run-application 指定是 application 模式
參數(shù) --target 指定運(yùn)行在 k8s 上
參數(shù) kubernetes.container.image 指定作業(yè)運(yùn)行使用的 flink 鏡像
最后需要指定 main jar,路徑是鏡像中的路徑
- $ ./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ local:///opt/flink/usrlib/my-flink-job.jar
資源清理
Flink 的 native 模式會先創(chuàng)建一個(gè) JobManager 的 deployment,并將其托管給 k8s。同一個(gè)作業(yè)所有的相關(guān)資源的 owner reference 都指向該 Deployment,也就是說刪除了該 deployment,所有相關(guān)的資源都會被清理掉。下面根據(jù)作業(yè)的運(yùn)行情況討論一下資源如何清理。
作業(yè)運(yùn)行到終態(tài)(SUCCESS,F(xiàn)AILED,CANCELED 等)之后,F(xiàn)link 會清理掉所有作業(yè)
JobManager 進(jìn)程啟動(dòng)失?。╬od 中的 jm 容器啟動(dòng)失?。?,由于控制器是 Deployment,所以會一直重復(fù)拉起
運(yùn)行過程中,如果 JobManager 的 pod 被刪除,Deployment 會重新拉起
運(yùn)行過程中,如果 JobManager 的 Deployment 被刪除,那么關(guān)聯(lián)的所有 k8s 資源都會被刪除
Pod Template
Flink native 模式也支持 Pod Template,類似 Spark。
RBAC
類似 Spark。
依賴文件管理
Flink 暫時(shí)只支持 main jar 以及依賴文件在鏡像中。也就是說用戶要提交作業(yè)需要自己定制化鏡像,體驗(yàn)不是很好。一種 workaroud 的方式是結(jié)合 PodTemplate:
如果依賴是本地文件,需要 upload 到一個(gè) remote 存儲做中轉(zhuǎn),比如各大云廠商的對象存儲。
如果依賴是遠(yuǎn)端文件,不需要 upload。
運(yùn)行時(shí)在 template 中使用 initContainer 將用戶的 jar 以及依賴文件下載到 Flink 容器中,并加到 classpath 下運(yùn)行。
Flink 的作業(yè) demo 就不在演示了。
4. Spark on Kubernetes 實(shí)現(xiàn)
Spark on Kubernetes 的實(shí)現(xiàn)比較簡單:
Spark Client 創(chuàng)建一個(gè) k8s pod 運(yùn)行 driver
driver 創(chuàng)建 executor pod,然后開始運(yùn)行作業(yè)
作業(yè)運(yùn)行結(jié)束之后 driver pod 進(jìn)入到 Completed 狀態(tài),executor pod 會被清理掉。作業(yè)結(jié)束之后通過 driver pod 我們還是可以查看 driver pod 的。
代碼實(shí)現(xiàn)
Spark 的 native k8s 實(shí)現(xiàn)代碼在 resource-managers/kubernetes module 中。我們可以從 SparkSubmit 的代碼開始分析。我們主要看一下 deploy-mode 為 cluster 模式的代碼邏輯。
- // Set the cluster manager val clusterManager: Int = args.master match { case "yarn" => YARN case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS case m if m.startsWith("k8s") => KUBERNETES case m if m.startsWith("local") => LOCAL case _ => error("Master must either be yarn or start with spark, mesos, k8s, or local") -1 }
首先根據(jù) spark.master 配置中 scheme 來判斷是不是 on k8s。我們上面也看到這個(gè)配置的形式為 --master k8s://https://: 。如果是 on k8s 的 cluster 模式,則去加載 Class org.apache.spark.deploy.k8s.submit.KubernetesClientApplication,并運(yùn)行其中的 start 方法。childArgs 方法的核心邏輯簡單來說就是根據(jù) spark-submit 提交的參數(shù)構(gòu)造出 driver pod 提交到 k8s 運(yùn)行。
- private[spark] class KubernetesClientApplication extends SparkApplication { override def start(args: Array[String], conf: SparkConf): Unit = { val parsedArguments = ClientArguments.fromCommandLineArgs(args) run(parsedArguments, conf) } private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = { // For constructing the app ID, we can't use the Spark application name, as the app ID is going // to be added as a label to group resources belonging to the same application. Label values are // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate // a unique app ID (captured by spark.app.id) in the format below. val kubernetesAppId = KubernetesConf.getKubernetesAppId() val kubernetesConf = KubernetesConf.createDriverConf( sparkConf, kubernetesAppId, clientArguments.mainAppResource, clientArguments.mainClass, clientArguments.driverArgs, clientArguments.proxyUser) // The master URL has been checked for validity already in SparkSubmit. // We just need to get rid of the "k8s://" prefix here. val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, Some(kubernetesConf.namespace), KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, SparkKubernetesClientFactory.ClientType.Submission, sparkConf, None, None)) { kubernetesClient => val client = new Client( kubernetesConf, new KubernetesDriverBuilder(), kubernetesClient, watcher) client.run() } }}
上面的代碼的核心就是最后創(chuàng)建 Client 并運(yùn)行。這個(gè) Client 是 Spark 封裝出來的 Client,內(nèi)置了 k8s client。
- private[spark] class Client( conf: KubernetesDriverConf, builder: KubernetesDriverBuilder, kubernetesClient: KubernetesClient, watcher: LoggingPodStatusWatcher) extends Logging { def run(): Unit = { // 構(gòu)造 Driver 的 Pod val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient) val configMapName = KubernetesClientUtils.configMapNameDriver val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName, conf.sparkConf, resolvedDriverSpec.systemProperties) val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap) // 修改 Pod 的 container spec:增加 SPARK_CONF_DIR val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container) .addNewEnv() .withName(ENV_SPARK_CONF_DIR) .withValue(SPARK_CONF_DIR_INTERNAL) .endEnv() .addNewVolumeMount() .withName(SPARK_CONF_VOLUME_DRIVER) .withMountPath(SPARK_CONF_DIR_INTERNAL) .endVolumeMount() .build() val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod) .editSpec() .addToContainers(resolvedDriverContainer) .addNewVolume() .withName(SPARK_CONF_VOLUME_DRIVER) .withNewConfigMap() .withItems(KubernetesClientUtils.buildKeyToPathObjects(confFilesMap).asJava) .withName(configMapName) .endConfigMap() .endVolume() .endSpec() .build() val driverPodName = resolvedDriverPod.getMetadata.getName var watch: Watch = null var createdDriverPod: Pod = null try { // 通過 k8s client 創(chuàng)建 Driver Pod createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) } catch { case NonFatal(e) => logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.") throw e } try { // 創(chuàng)建其他資源,修改 owner reference 等 val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) addOwnerReference(createdDriverPod, otherKubernetesResources) kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() } catch { case NonFatal(e) => kubernetesClient.pods().delete(createdDriverPod) throw e } val sId = Seq(conf.namespace, driverPodName).mkString(":") // watch pod breakable { while (true) { val podWithName = kubernetesClient .pods() .withName(driverPodName) // Reset resource to old before we start the watch, this is important for race conditions watcher.reset() watch = podWithName.watch(watcher) // Send the latest pod state we know to the watcher to make sure we didn't miss anything watcher.eventReceived(Action.MODIFIED, podWithName.get()) // Break the while loop if the pod is completed or we don't want to wait // 根據(jù)參數(shù) "spark.kubernetes.submission.waitAppCompletion" 判斷是否需要退出 if(watcher.watchOrStop(sId)) { watch.close() break } } } }
下面再簡單介紹一下 Driver 如何管理 Executor 的流程。當(dāng) Spark Driver 運(yùn)行 main 函數(shù)時(shí),會創(chuàng)建一個(gè) SparkSession,SparkSession 中包含了 SparkContext,SparkContext 需要?jiǎng)?chuàng)建一個(gè) SchedulerBackend 會管理 Executor 的生命周期。對應(yīng)到 k8s 上的 SchedulerBackend 其實(shí)就是 KubernetesClusterSchedulerBackend,下面主要看一下這個(gè) backend 是如何創(chuàng)建出來的。大膽猜想一下,大概率也是根據(jù) spark.master 的 url 的 scheme "k8s" 創(chuàng)建的。
下面是 SparkContext 創(chuàng)建 SchedulerBackend 的核心代碼邏輯。
- private def createTaskScheduler(...) = { case masterUrl => // 創(chuàng)建出 KubernetesClusterManager val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr case None => throw new SparkException("Could not parse Master URL: '" + master + "'") } try { val scheduler = cm.createTaskScheduler(sc, masterUrl) // 上面創(chuàng)建出來的 KubernetesClusterManager 這里會創(chuàng)建出 KubernetesClusterSchedulerBackend val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler) } catch { case se: SparkException => throw se case NonFatal(e) => throw new SparkException("External scheduler cannot be instantiated", e) }}// 方法 getClsuterManager 會通過 ServiceLoader 加載所有實(shí)現(xiàn) ExternalClusterManager 的 ClusterManager (KubernetesClusterManager 和 YarnClusterManager),然后通過 master url 進(jìn)行 filter,選出 KubernetesClusterManagerprivate def getClusterManager(url: String): Option[ExternalClusterManager] = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) if (serviceLoaders.size > 1) { throw new SparkException( s"Multiple external cluster managers registered for the url $url: $serviceLoaders") } serviceLoaders.headOption}
后面就是 KubernetesClusterSchedulerBackend 管理 Executor 的邏輯了。
可以簡單看一下創(chuàng)建 Executor 的代碼邏輯。
- private def requestNewExecutors( expected: Int, running: Int, applicationId: String, resourceProfileId: Int, pvcsInUse: Seq[String]): Unit = { val numExecutorsToAllocate = math.min(expected - running, podAllocationSize) logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " + s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.") // Check reusable PVCs for this executor allocation batch val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse) for ( _ <- 0 until numExecutorsToAllocate) { val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() val executorConf = KubernetesConf.createExecutorConf( conf, newExecutorId.toString, applicationId, driverPod, resourceProfileId) // 構(gòu)造 Executor 的 Pod Spec val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr, kubernetesClient, rpIdToResourceProfile(resourceProfileId)) val executorPod = resolvedExecutorSpec.pod val podWithAttachedContainer = new PodBuilder(executorPod.pod) .editOrNewSpec() .addToContainers(executorPod.container) .endSpec() .build() val resources = replacePVCsIfNeeded( podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs) // 創(chuàng)建 Executor Pod val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer) try { // 增加 owner reference addOwnerReference(createdExecutorPod, resources) resources .filter(_.getKind == "PersistentVolumeClaim") .foreach { resource => if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) { addOwnerReference(driverPod.get, Seq(resource)) } val pvc = resource.asInstanceOf[PersistentVolumeClaim] logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " + s"StorageClass ${pvc.getSpec.getStorageClassName}") kubernetesClient.persistentVolumeClaims().create(pvc) } newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis()) logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") } catch { case NonFatal(e) => kubernetesClient.pods().delete(createdExecutorPod) throw e } } }
5. Flink on Kubernetes 實(shí)現(xiàn)
Flink 的 Native K8s 實(shí)現(xiàn):
Flink Client 創(chuàng)建 JobManager 的 Deployment,然后將 Deployment 托管給 k8s
k8s 的 Deployment Controller 創(chuàng)建 JobManager 的 Pod
JobManager 內(nèi)的 ResourceManager 負(fù)責(zé)先 Kubernetes Scheduler 請求資源并創(chuàng)建 TaskManager 等相關(guān)資源并創(chuàng)建相關(guān)的 TaskManager Pod 并開始運(yùn)行作業(yè)
當(dāng)作業(yè)運(yùn)行到終態(tài)之后所有相關(guān)的 k8s 資源都被清理掉
代碼(基于分支 release-1.13)實(shí)現(xiàn)主要如下:
CliFrontend 作為 Flink Client 的入口根據(jù)命令行參數(shù) run-application 判斷通過方法 runApplication 去創(chuàng)建 ApplicationCluster
KubernetesClusterDescriptor 通過方法 deployApplicationCluster 創(chuàng)建 JobManager 相關(guān)的 Deployment 和一些必要的資源
JobManager 的實(shí)現(xiàn)類 JobMaster 通過 ResourceManager 調(diào)用類 KubernetesResourceManagerDriver 中的方法 requestResource 創(chuàng)建 TaskManager 等資源
其中 KubernetesClusterDescriptor 實(shí)現(xiàn)自 interface ClusterDescriptor ,用來描述對 Flink 集群的操作。根據(jù)底層的資源使用不同, ClusterDescriptor 有不同的實(shí)現(xiàn),包括 KubernetesClusterDescriptor、YarnClusterDescriptor、StandaloneClusterDescriptor。
- public interface ClusterDescriptor<T> extends AutoCloseable { /* Returns a String containing details about the cluster (NodeManagers, available memory, ...). */ String getClusterDescription(); /* 查詢已存在的 Flink 集群. */ ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException; /** 創(chuàng)建 Flink Session 集群 */ ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException; /** 創(chuàng)建 Flink Application 集群 **/ ClusterClientProvider<T> deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException; /** 創(chuàng)建 Per-job 集群 **/ ClusterClientProvider<T> deployJobCluster( final ClusterSpecification clusterSpecification, final JobGraph jobGraph, final boolean detached) throws ClusterDeploymentException; /** 刪除集群 **/ void killCluster(T clusterId) throws FlinkException; @Override void close();}
下面簡單看一下 KubernetesClusterDescriptor 的核心邏輯:創(chuàng)建 Application 集群。
- public class KubernetesClusterDescriptor implements ClusterDescriptor<String> { private final Configuration flinkConfig; // 內(nèi)置 k8s client private final FlinkKubeClient client; private final String clusterId; @Override public ClusterClientProvider<String> deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException { // 查詢 flink 集群在 k8s 中是否存在 if (client.getRestService(clusterId).isPresent()) { throw new ClusterDeploymentException( "The Flink cluster " + clusterId + " already exists."); } final KubernetesDeploymentTarget deploymentTarget = KubernetesDeploymentTarget.fromConfig(flinkConfig); if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) { throw new ClusterDeploymentException( "Couldn't deploy Kubernetes Application Cluster." + " Expected deployment.target=" + KubernetesDeploymentTarget.APPLICATION.getName() + " but actual one was \"" + deploymentTarget + "\""); } // 設(shè)置 application 參數(shù):$internal.application.program-args 和 $internal.application.main applicationConfiguration.applyToConfiguration(flinkConfig); // 創(chuàng)建集群 final ClusterClientProvider<String> clusterClientProvider = deployClusterInternal( KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false); try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) { LOG.info( "Create flink application cluster {} successfully, JobManager Web Interface: {}", clusterId, clusterClient.getWebInterfaceURL()); } return clusterClientProvider; } // 創(chuàng)建集群邏輯 private ClusterClientProvider<String> deployClusterInternal( String entryPoint, ClusterSpecification clusterSpecification, boolean detached) throws ClusterDeploymentException { final ClusterEntrypoint.ExecutionMode executionMode = detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; flinkConfig.setString( ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString()); flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint); // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. // 將端口指定為固定值,方便 k8s 的資源構(gòu)建。因?yàn)?nbsp;pod 的隔離性,所以沒有端口沖突 KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); // HA 配置 if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, flinkConfig.get(JobManagerOptions.PORT)); } try { final KubernetesJobManagerParameters kubernetesJobManagerParameters = new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); // 補(bǔ)充 PodTemplate 邏輯 final FlinkPod podTemplate = kubernetesJobManagerParameters .getPodTemplateFilePath() .map( file -> KubernetesUtils.loadPodFromTemplateFile( client, file, Constants.MAIN_CONTAINER_NAME)) .orElse(new FlinkPod.Builder().build()); final KubernetesJobManagerSpecification kubernetesJobManagerSpec = KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( podTemplate, kubernetesJobManagerParameters); // 核心邏輯:在 k8s 中創(chuàng)建包括 JobManager Deployment 在內(nèi) k8s 資源,比如 Service 和 ConfigMap client.createJobManagerComponent(kubernetesJobManagerSpec); return createClusterClientProvider(clusterId); } catch (Exception e) { //... } }}
上面代碼中需要說的在構(gòu)建 JobManager 的時(shí)候補(bǔ)充 PodTemplate。簡單來說 PodTemplate 就是一個(gè) Pod 文件。
第三步的 TaskManager 創(chuàng)建就不再贅述了。
7. 生態(tài)
這里生態(tài)這個(gè)詞可能也不太合適,這里主要指的的如果要在生產(chǎn)上面使用該功能還有哪些可以做的。下面主要討論在生產(chǎn)環(huán)境上面用來做 trouble-shooting 的兩個(gè)功能:日志和監(jiān)控。
日志
日志收集對于線上系統(tǒng)是非常重要的一環(huán),毫不夸張地說,80% 的故障都可以通過日志查到原因。但是前面也說過,F(xiàn)link 作業(yè)在作業(yè)運(yùn)行到終態(tài)之后會清理掉所有資源,Spark 作業(yè)運(yùn)行完只會保留 Driver Pod 的日志,那么我們?nèi)绾问占酵暾淖鳂I(yè)日志呢?
有幾種方案可供選擇:
DaemonSet。每個(gè) k8s 的 node 上面以 DaemonSet 形式部署日志收集 agent,對 node 上面運(yùn)行的所有容器日志進(jìn)行統(tǒng)一收集,并存儲到類似 ElasticSearch 的統(tǒng)一日志搜索平臺。
SideCar。使用 Flink/Spark 提供的 PodTemplate 功能在主容器側(cè)配置一個(gè) SideCar 容器用來進(jìn)行日志收集,最后存儲到統(tǒng)一的日志服務(wù)里面。
這兩種方式都有一個(gè)前提是有其他的日志服務(wù)提供存儲、甚至搜索的功能,比如 ELK,或者各大云廠商的日志服務(wù)。
除此之外還有一種簡易的方式可以考慮:利用 log4j 的擴(kuò)展機(jī)制,自定義 log appender,在 appender 中定制化 append 邏輯,將日志直接收集并存儲到 remote storage,比如 hdfs,對象存儲等。這種方案需要將自定義的 log appender 的 jar 包放到運(yùn)行作業(yè)的 ClassPath 下,而且這種方式有可能會影響作業(yè)主流程的運(yùn)行效率,對性能比較敏感的作業(yè)并不太建議使用這種方式。
監(jiān)控
目前 Prometheus 已經(jīng)成為 k8s 生態(tài)的監(jiān)控事實(shí)標(biāo)準(zhǔn),下面我們的討論也是討論如何將 Flink/Spark 的作業(yè)的指標(biāo)對接到 Prometheus。下面先看一下 Prometheus 的架構(gòu)。
其中的核心在于 Prometheus Servier 收集指標(biāo)的方式是 pull 還是 push:
對于常駐的進(jìn)程,比如在線服務(wù),一般由 Prometheus Server 主動(dòng)去進(jìn)程暴露出來的 api pull 指標(biāo)。
對于會結(jié)束的進(jìn)程指標(biāo)收集,比如 batch 作業(yè),一般使用進(jìn)程主動(dòng) push 的方式。詳細(xì)流程是進(jìn)程將指標(biāo) push 到常駐的 PushGateway,然后 Prometheus Server 去 PushGateway pull 指標(biāo)。
上面兩種使用方式也是 Prometheus 官方建議的使用方式,但是看完描述不難發(fā)現(xiàn)其實(shí)第一種場景也可以使用第二種處理方式。只不過第二種方式由于 PushGateway 是常駐的,對其穩(wěn)定性要求會比較高。
Flink
Flink 同時(shí)提供了 PrometheusReporter (將指標(biāo)通過 api 暴露,由 Prometheus Server 來主動(dòng) pull 數(shù)據(jù)) 和 PrometheusPushGatewayReporter (將指標(biāo)主動(dòng) push 給 PushGateway,Prometheus Server 不需要感知 Flink 作業(yè))。
這兩種方式中 PrometheusPushGatewayReporter 會更簡單一點(diǎn),但是 PushGateway 可能會成為瓶頸。如果使用 PrometheusReporter 的方式,需要引入服務(wù)發(fā)現(xiàn)機(jī)制幫助 Prometheus Server 自動(dòng)發(fā)現(xiàn)運(yùn)行的 Flink 作業(yè)的 Endpoint。Prometheus 目前支持的主流的服務(wù)發(fā)現(xiàn)機(jī)制主要有:
基于 Consul。Consul 是基于 etcd 的一套完整的服務(wù)注冊與發(fā)現(xiàn)解決方案,要使用這種方式,我們需要 Flink 對接 Consul。比如我們在提交作業(yè)的時(shí)候,將作業(yè)對應(yīng)的 Service 進(jìn)行捕獲并寫入 Consul。
基于文件。文件也就是 Prometheus 的配置文件,里面配置需要拉取 target 的 endpoint。文件這種方式本來是比較雞肋的,因?yàn)樗枰?Prometheus Server 和 Flink 作業(yè)同時(shí)都可以訪問,但是需要文件是 local 的。但是在 k8s 環(huán)境中,基于文件反而變的比較簡單,我們可以將 ConfigMap 掛載到 Prometheus Server 的 Pod 上面,F(xiàn)link 作業(yè)修改 ConfigMap 就可以了。
基于 Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制。Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制簡單來說就是 label select??梢詤⒖?/p>
- https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
關(guān)于 Prometheus 支持的更多服務(wù)發(fā)現(xiàn)機(jī)制,可以參考:https://prometheus.io/docs/prometheus/latest/configuration/configuration/ ,簡單羅列包括:
azure
consul
digitalocean
docker
dockerswarm
dns
ec2
eureka
file
gce
hetzner
http
kubernetes
...
Spark
以批計(jì)算為代表的 Spark 使用 PushGateway 的方式來對接 Prometheus 是比較好的方式,但是 Spark 官方并沒有提供對 PushGateway 的支持,只支持了 Prometheus 的 Exporter,需要 Prometheus Server 主動(dòng)去 pull 數(shù)據(jù)。
這里推薦使用基于 Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制。
需要注意的是 Prometheus Server 拉取指標(biāo)是按固定時(shí)間間隔進(jìn)行拉取的,對于持續(xù)時(shí)間比較短的批作業(yè),有可能存在還沒有拉取指標(biāo),作業(yè)就結(jié)束的情況。
8. 缺陷
雖然 Spark 和 Flink 都實(shí)現(xiàn)了 native k8s 的模式,具體實(shí)現(xiàn)略有差異。但是在實(shí)際使用上發(fā)現(xiàn)兩者的實(shí)現(xiàn)在某些場景下還是略有缺陷的。
Spark
pod 不具有容錯(cuò)性 spark-submit 會先構(gòu)建一個(gè) k8s 的 driver pod,然后由 driver pod 啟動(dòng) executor 的 pod。但是在 k8s 環(huán)境中并不太建議直接構(gòu)建 pod 資源,因?yàn)?pod 不具有容錯(cuò)性,pod 所在節(jié)點(diǎn)掛了之后 pod 就掛了。熟悉 k8s scheduler 的同學(xué)應(yīng)該知道 pod 有一個(gè)字段叫 podName,scheduler 的核心是為 pod 填充這個(gè)字段,也就是為 pod 選擇一個(gè)合適的 node。一旦調(diào)度完成之后 pod 的該字段就固定下來了。這也是 pod 不具有 node 容錯(cuò)的原因。
Flink
Deployment 語義。 Deployment 可以認(rèn)為是 ReplicaSet 的增強(qiáng)版,而 ReplicaSet 的官方定義如下。
A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
簡單來說,ReplicaSet 的目的是保證幾個(gè)相同的 Pod 副本可以不間斷的運(yùn)行,說是為了線上服務(wù)量身定制的也不為過(線上服務(wù)最好是無狀態(tài)且支持原地重啟,比如 WebService)。但是盡管 Flink 以流式作業(yè)為主,但是我們并不能簡單地將流式作業(yè)等同于無狀態(tài)的 WebService。比如 Flink 作業(yè)的 Main Jar 如果寫的有問題,會導(dǎo)致 JobManager 的 Pod 一直啟動(dòng)失敗,但是由于是 Deployment 語義的問題會不斷被重啟。這個(gè)可能是 ByDesign 的,但是感覺并不太好。
Batch 作業(yè)處理。 由于 Flink 作業(yè)運(yùn)行完所有資源包括 Deployment 都會被清理掉,拿不到最終的作業(yè)狀態(tài),不知道成功有否(流作業(yè)的話停止就可以認(rèn)為是失敗了)。對于這個(gè)問題可以利用 Flink 本身的歸檔功能,將結(jié)果歸檔到外部的文件系統(tǒng)(兼容 s3 協(xié)議,比如阿里云對象存儲 oss)中。涉及到的配置如下:
s3.access-key
s3.secret-key
s3.region
s3.endpoint
jobmanager.archive.fs.dir
如果不想引入外部系統(tǒng)的話,需要改造 Flink 代碼在作業(yè)運(yùn)行完成之后將數(shù)據(jù)寫到 k8s 的 api object 中,比如 ConfigMap 或者 Secret。
作業(yè)日志。 Spark 作業(yè)運(yùn)行結(jié)束之后 Executor Pod 被清理掉,Driver Pod 被保留,我們可以通過它查看到 Driver 的日志。Flink 作業(yè)結(jié)束之后就什么日志都查看不到了。
9. 總結(jié)
本文從使用方式、源碼實(shí)現(xiàn)以及在生產(chǎn)系統(tǒng)上面如何補(bǔ)足周邊系統(tǒng)地介紹了 Spark 和 Flink 在 k8s 生態(tài)上的實(shí)現(xiàn)、實(shí)踐以及對比。但是限于篇幅,很多內(nèi)容來不及討論了,比如 shuffle 如何處理。如果你們公司也在做這方面的工作,相信還是有很多參考價(jià)值的,也歡迎留言交流。
另外,YARN 的時(shí)代已經(jīng)過去了,以后 on k8s scheduler 將成為大數(shù)據(jù)計(jì)算以及 AI 框架的標(biāo)配。但是 k8s scheduler 這種天生為在線服務(wù)設(shè)計(jì)的調(diào)度器在吞吐上面有很大的不足,并不是很契合大數(shù)據(jù)作業(yè)。k8s 社區(qū)的批調(diào)度器 kube-batch,以及基于 kube-batch 衍生出來的 Volcano 調(diào)度器,基于 YARN 的調(diào)度算法實(shí)現(xiàn)的 k8s 生態(tài)調(diào)度器 Yunikorn 也逐漸在大數(shù)據(jù) on k8s 場景下嶄露頭角,不過這些都是后話了,后面有時(shí)間再專門寫文章進(jìn)行分析對比。