自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Native Flink on Kubernetes 在小紅書的實踐

大數(shù)據(jù)
本文介紹了小紅書基于 K8s 管理 Flink 任務(wù)的建設(shè)過程,以及往 Native Flink on K8s 方案遷移過程的一些實踐經(jīng)驗。

摘要:本文整理自小紅書數(shù)據(jù)流團(tuán)隊資深研發(fā)工程師何軍在 Flink Forward Asia 2021 平臺建設(shè)專場的演講,介紹了小紅書基于 K8s 管理 Flink 任務(wù)的建設(shè)過程,以及往 Native Flink on K8s 方案遷移過程的一些實踐經(jīng)驗。主要內(nèi)容包括:

  • 多云部署架構(gòu)
  • 業(yè)務(wù)場景
  • Helm 集群管理模式
  • Native Flink on Kubernetes
  • 流批一體作業(yè)管控平臺
  • 未來展望

一、多云部署架構(gòu)

上圖是當(dāng)前 Flink 集群多云部署模式圖。業(yè)務(wù)數(shù)據(jù)分散在各個云廠商之上,為了適配業(yè)務(wù)數(shù)據(jù)處理,F(xiàn)link 集群自然也進(jìn)行了多云部署。這些云存儲產(chǎn)品一方面用于內(nèi)部的離線數(shù)據(jù)存儲,另外一方面會用于 Flink 做 checkpoint 存儲使用。

在這些云基礎(chǔ)設(shè)施之上,我們搭建了 Flink 引擎支持 SQL 及 JAR 任務(wù)的運(yùn)行,得益于之前做的一項推動任務(wù) SQL 化的工作,當(dāng)前內(nèi)部 SQL 任務(wù)和 JAR 任務(wù)比例已經(jīng)達(dá)到了 9:1。

在此之上是流批一體作業(yè)管控平臺,它主要有以下幾個功能:作業(yè)開發(fā)運(yùn)維、任務(wù)監(jiān)控報警、任務(wù)版本管理、數(shù)據(jù)血緣分析、元數(shù)據(jù)管理、資源管理等。

平臺數(shù)據(jù)輸入主要有以下三個部分,第一部分是業(yè)務(wù)數(shù)據(jù),存在于業(yè)務(wù)內(nèi)部的 DB 系統(tǒng)里比如 MySQL 或者 MongoDB,還有一部分是前后端打點數(shù)據(jù),前端打點主要是用戶在小紅書 APP 端的行為日志,后端打點主要是 APP 內(nèi)部應(yīng)用程序性能指標(biāo)相關(guān)的數(shù)據(jù)。這些數(shù)據(jù)經(jīng)過 Flink 集群處理之后,會輸出到三個主要業(yè)務(wù)場景中,首先是消息總線,比如 Kafka 集群以及 RocketMQ 集群,其次會輸出到 olap 引擎中,比如 StarRocks 或 Clickhouse,最后會輸出到在線系統(tǒng),比如 Redkv 或者 ES 供一些在線查詢使用。

二、業(yè)務(wù)場景

Flink 在小紅書內(nèi)部的應(yīng)用場景有很多,比如實時反欺詐監(jiān)控、實時數(shù)倉、實時算法推薦、實時數(shù)據(jù)傳輸。本章會著重介紹一下其中兩個場景。

第一個是實時推薦算法訓(xùn)練。上圖是推薦算法訓(xùn)練的執(zhí)行流程。

Flink 集群先接收打點服務(wù)采集過來的原始數(shù)據(jù),對這一部分?jǐn)?shù)據(jù)進(jìn)行歸因并將它寫入到 Kafka 集群,之后會再有一個 Flink 任務(wù)對這部分?jǐn)?shù)據(jù)再做一次匯總,然后得到一個 Summary 的標(biāo)簽數(shù)據(jù),針對這個標(biāo)簽數(shù)據(jù),后面還有三條實時處理路徑:

  • 第一,Summary 標(biāo)簽數(shù)據(jù)會和推薦引擎推薦出來筆記的特征數(shù)據(jù)進(jìn)行關(guān)聯(lián),這個關(guān)聯(lián)也是在 Flink 任務(wù)中進(jìn)行的,內(nèi)部稱其為 FeatureJoiner 任務(wù)。接著會產(chǎn)出一個算法訓(xùn)練的樣本,這個樣本經(jīng)過算法訓(xùn)練之后產(chǎn)出一個推薦模型,而這個模型最終會反饋到實時推薦引擎中。
  • 第二,Summary 標(biāo)簽數(shù)據(jù)會通過 Flink 實時寫到 OLAP 引擎中,比如寫到 Hologres 或 Clickhouse 中。
  • 最后, Summary 標(biāo)簽數(shù)據(jù)會通過 Flink 寫入到離線 Hive 表中,提供給后續(xù)離線報表使用。

第二個場景是實時數(shù)倉。業(yè)務(wù)數(shù)據(jù)包括前后端打點的數(shù)據(jù),按照業(yè)務(wù)分流規(guī)則進(jìn)行處理之后會寫入到 Kafka 或者 RocketMQ 中,后續(xù) Flink 會對這部分?jǐn)?shù)據(jù)做實時 ETL 業(yè)務(wù)處理,最終進(jìn)入實時數(shù)據(jù)中心。目前實時數(shù)據(jù)中心主要是基于 StarRocks 實現(xiàn)的,StarRocks 是一個性能十分強(qiáng)大的 OLAP 引擎,它承載了公司很多實時相關(guān)業(yè)務(wù)。在數(shù)據(jù)中心之上,我們還支撐了很多重要實時指標(biāo),比如實時 DAU、實時 GMV、實時直播歸因、實時廣告計費等。

三、Helm 集群管理模式

在正式遷入到 Native Flink on K8s 之前很長一段時間內(nèi),都是基于 Helm 來進(jìn)行集群管理的。Helm 是一個 K8s 上的包管理器,它可以定義、安裝和升級 K8s 應(yīng)用和服務(wù),同時具有以下幾個特點:

第一,可以管理比較復(fù)雜的 K8s 應(yīng)用,創(chuàng)建 Flink 集群時會創(chuàng)建很多 K8s 相關(guān)的資源,例如 service 或者 config map 以及 Deployment 等, Helm 可以將這些資源統(tǒng)一打包成一個 Helm chart,然后進(jìn)行統(tǒng)一管理,從而不需要感知每一種資源對應(yīng)的底層描述文件。

第二,比較方便升級和回滾,只需要執(zhí)行一條簡單命令就可以進(jìn)行升級或者回滾。同時因為它的代碼是和 Flink Client 的代碼做了隔離,因此在升級過程中不需要去修改 Flink Client 的代碼,實現(xiàn)了代碼解耦。

第三,非常易于共享,將 Helm chart 部署在公司私有服務(wù)器上之后,已經(jīng)可以同時支持多個云產(chǎn)品的 Flink 集群管理。

上圖是基于 Helm 管理的 Flink 任務(wù)生命周期,主要分為啟動任務(wù)和停止任務(wù)兩個階段。這里有三個角色,第一個是 Client,它可以是一個 API 請求,也可以是用戶在界面上的一次點擊行為。啟動任務(wù)時,百川平臺接收到 API 請求后,會通過 Helm Client 命令去執(zhí)行 install 指令,創(chuàng)建對應(yīng)的集群資源,同時內(nèi)部集成的 Flink Client 也會去檢查當(dāng)前集群的 JobManager 是否啟動,如果已經(jīng)啟動就進(jìn)行 job 提交。job 提交到集群運(yùn)行起來之后,F(xiàn)link Client 也會不斷地檢查當(dāng)前 job 的運(yùn)行狀態(tài),這也是 Helm 管理模式下作業(yè)狀態(tài)的維護(hù)機(jī)制。

第二個階段是任務(wù)停止階段,Client 會向百川平臺發(fā)起一個 stop 命令,接收到 stop 命令之后百川平臺會通過 Flink Client 向 JobManager 發(fā)起 cancel 指令,同時檢查這個 cancel 指令有沒有執(zhí)行成功,發(fā)現(xiàn) job 被 cancel 之后,會通過 Helm Client 去執(zhí)行 delete 指令,完成集群資源的銷毀。

上圖展示了通過 Helm 創(chuàng)建了哪些 K8s 資源。

首先是最基礎(chǔ)的 JobManager 和 TaskManager Deployment;

第二部分是 ConfigMap,主要是針對 log4j 的配置和各大云廠商提供的云存儲產(chǎn)品相關(guān)的配置;

第三部分是 Ingress,目前主要用于 Flink web UI 使用以及訪問 JobManager 當(dāng)前任務(wù)狀態(tài);

第四部分是 Nodeport Service,每啟動一個 JobManager,就會在 JM 上啟動一個 Nodeport Service,并與 Ingress 做綁定;

第五部分是指磁盤資源,主要有以下兩個應(yīng)用場景:使用 RocksDB Backend 的時候需要去掛載高效云盤、批處理任務(wù)需要掛載磁盤做中間數(shù)據(jù)交換;

最后一部分是 ServiceMesh,TaskManager 內(nèi)部會通過 sidecar 形式去訪問第三方服務(wù),比如說 Redkv service,這些 service 的配置也是在這里面創(chuàng)建的。

上圖可以看到 Helm Client 里面是集成了各大云廠商提供了 K8s 相關(guān)的配置,當(dāng)它接收到創(chuàng)建任務(wù)的參數(shù)時,會根據(jù)這些參數(shù)去渲染出不同的 Helm 模板,并提交到不同的云上執(zhí)行,創(chuàng)建出對應(yīng)的集群資源。

目前的集群管理模式下,在實際生產(chǎn)過程中還是遇到了不少問題:

第一是 K8s 資源瓶頸問題。因為每啟動一個 JobManager 就會創(chuàng)建一個 NodePort Service,而這個 Service 會在整個集群范圍內(nèi)占用一個端口和一個 ClusterIP。當(dāng)作業(yè)規(guī)模達(dá)到一定程度的時候,這些端口資源以及 IP 資源就會遇到性能瓶頸了。

第二個是 ServiceMesh 配置成本過高。上文提到 TaskManager 內(nèi)部會訪問第三方服務(wù),比如說 redkv service,那么每增加一個 redkv service,就需要去修改對應(yīng)的配置并完成發(fā)版,過程的成本是比較高的。

第三個是存在一定的資源泄露問題。所有的資源創(chuàng)建以及銷毀都是通過執(zhí)行 Helm 命令來完成的,在某些異常情況下,job 失敗會導(dǎo)致 Helm delete 命令沒有被執(zhí)行,這個時候就有可能會存在資源泄露的問題。

第四個是鏡像版本比較難以收斂。在日常的生產(chǎn)過程中,某些線上任務(wù)出現(xiàn)了問題,會臨時出一個 hotfix 版本鏡像并上線運(yùn)行,久而久之線上就會存在很多版本鏡像在運(yùn)行,這對于后面的運(yùn)維工作以及問題排查產(chǎn)生了非常大的挑戰(zhàn)。

最后一個問題是 UDF 管理復(fù)雜度比較高,這是任何分布式計算平臺都會遇到的一個問題。

針對上述這些問題,我們在 Native Flink on K8s 模式下一一進(jìn)行了優(yōu)化解決。

四、Native Flink on Kubernetes

首先,為什么會選擇這種部署模式?因為它具有以下三個特征:

  • 更短的 Failover 時間;
  • 可以實現(xiàn)資源托管,不需要手動創(chuàng)建 TaskManager 的 pod,也可以自動完成銷毀;
  • 具有更加便捷的 HA。在 Flink 1.12 之前,實現(xiàn) JobManager HA 還是依賴于第三方的 zookeeper。但在 Native Flink on K8s 模式下,可以依賴于原生 K8s 的 leader 選舉機(jī)制來完成 JobManager 的 HA。

上圖是 Native Flink on K8s 的體系架構(gòu)圖。Flink Client 里面集成了一個 K8s Client,它可以直接和 K8s API server 進(jìn)行通訊,完成 JobManager Deployment 以及 ConfigMap 的創(chuàng)建。JobManager development 創(chuàng)建完成之后,它里面的 resource manager 模塊可以直接和 K8s API server 進(jìn)行通訊,完成 TaskManager pod 的創(chuàng)建和銷毀工作,這也是它與傳統(tǒng) session Cluster 模式比較大的不同之處。

內(nèi)部將 UDF 分為兩類:

第一類是平臺內(nèi)置的,將平時的生產(chǎn)工作中經(jīng)常使用到的 UDF 進(jìn)行抽象歸納總結(jié),并內(nèi)置到鏡像里面。鏡像里有關(guān)于 UDF 的配置文件,其中有 UDF 的名稱以及類型,同時指定了它對應(yīng)的實現(xiàn)類。

另外一類是 User-defined UDF,在 Helm 管理模式下,針對用戶自定義的 UDF 管理是比較粗放的,將用戶 project 下所有 UDF 相關(guān)的 JAR 包統(tǒng)一加載到 classloader 下,這會導(dǎo)致類沖突問題。而在 Native Flink 模式下,實現(xiàn)了一個 create function using JAR 的語法,可以按需加載用戶所需要的 UDF 對應(yīng)的 JAR 包,可以極大地緩解類沖突的問題。

在原有的模式上,鏡像管理是通過將所有代碼統(tǒng)一打包到一個大的 image 里,但這樣會存在一個問題,對任何模塊的修改都需要對整個代碼庫進(jìn)行一次編譯打包,而這個過程是非常耗時的。

在 Native Flink 版本下,針對鏡像版本管理做了一些優(yōu)化,主要是將 Flink 的 image 拆分為了三個部分,分為 Flink engine、connector 以及第三方插件。這三個部分都有各自版本號,并且可以自由進(jìn)行拼裝組合。這項優(yōu)化降低了引擎打包的頻率,也意味著可以提升發(fā)版效率。

拆分之后,F(xiàn)link 如何將這些鏡像組合成一個可以運(yùn)行的鏡像呢?下面以加載一個 Kafka SDK 插件為例來進(jìn)行闡述。job 運(yùn)行時會從一個動態(tài)配置倉庫中獲取當(dāng)前這個 job 應(yīng)該使用的 Kafka SDK 版本,并將其傳遞給百川的后端,這個 SDK 版本對應(yīng)了 docker 倉庫里面的一個鏡像,鏡像只包含一個 SDK 對應(yīng)的 JAR 包,百川的后端在渲染 pod 模板的時候,會在 InitContainer 階段將 image 加載進(jìn)來,同時將它 Kafka 的 JAR 包移動到 Flink container 某個指定的目錄下去,以此完成加載。

在新的模式下,對 job 狀態(tài)維護(hù)機(jī)制做了一次重構(gòu),引入了一個 headless 類型的 service 以及一個 status DB。在 JobManager 模塊,通過 JobManager status listener 不斷監(jiān)聽 job 狀態(tài)變化,并將這個變化上傳到 job ststusDB 中,百川平臺可以通過 Query DB 來獲取任務(wù)的狀態(tài)。另外在某些場景下,可能因為 job 狀態(tài)上傳失敗導(dǎo)致百川無法獲取到任務(wù)的狀態(tài),百川還是可以走原來的路徑,通過 Ingress 去訪問 JobManager 來獲取任務(wù)的狀態(tài)。此時的 Ingress 和之前不同之處在于它綁定的是一個 headless service,不需要占用集群的 Cluster IP,這就解決了之前模式下 K8s ClusterIP 以及 nodePort 不足的問題。

完成上述優(yōu)化工作以后,面臨的最大的問題就是如何將老版本的任務(wù)平滑地遷移到新版本 Flink 1.13 上,這其實是一項非常具有挑戰(zhàn)性的工作。主要做了以下 4 個方面的工作:

第一,兼容轉(zhuǎn)化工具。這個工具會對 SQL 進(jìn)行轉(zhuǎn)化,保證 SQL 在 1.13 運(yùn)行的語法校驗不會出錯。1.10 到 1.13 經(jīng)歷過幾個大版本的變更, SQL 的定義在眾多方面已經(jīng)不兼容,比如在 1.10 和 1.11 的時候,Kafka connector 的取值是 0.11,到 1.13 之后,對應(yīng)取值已經(jīng)變成 universal,如果不做任何轉(zhuǎn)化,原始 SQL 肯定在 1.13 上沒有辦法運(yùn)行。

第二,兼容檢測工具。這個工具的目的是為了檢查 SQL 運(yùn)行在 1.13 的時候能不能從一個低版本的 savepoint 去進(jìn)行恢復(fù)。主要從以下幾個方面去做了檢查:operator ID 升級之后,名稱有沒有發(fā)生變化;新舊兩個版本對應(yīng)的 max parallelism 有沒有發(fā)生變化,因為 max parallelism 發(fā)生變化的時候,在某部分場景下是沒有辦法從一個老的 savepoint 來恢復(fù)的。

第三,預(yù)編譯。在 1.13 上對轉(zhuǎn)換之后的 SQL 進(jìn)行預(yù)編譯,看編譯的結(jié)果是否能夠正常通過。在兼容檢測工具的過程中,也發(fā)現(xiàn)了很多從低版本到高版本不兼容的地方,引入了新的數(shù)據(jù)類型機(jī)制,1.11 沒有使用 ExternalSerializer,而 1.12 及以后使用 ExternalSerializer 進(jìn)行包裝;BaseRowSerializer 已經(jīng)在 Flink 1.11 時候改名成了 RowDataSerializer;數(shù)據(jù)類型里面有一個 seriaVersionUID,之前它是一個隨機(jī)的 long 類型的數(shù)字,而在 1.13 統(tǒng)一固定成了 1。上述種種不兼容會導(dǎo)致 1.13 沒有辦法直接從一個低版本的 savepoint 來恢復(fù)的。因此針對這些問題,在引擎?zhèn)茸隽艘恍└脑臁?/p>

第四,遷移工具。這個工具的目標(biāo)主要有以下三點:

首先,對用戶作業(yè)的影響時間盡可能降到最低,為了達(dá)成這個目標(biāo),我們對 Native Flink on K8s 的 application mode 做了比較大的改造。原生的 application mode 是一邊調(diào)度一邊申請資源,為了在升級過程中降低對用戶作業(yè)的影響,實現(xiàn)了 application mode 下可以提前申請好資源并完成 SQL 的編譯 (即 JobManager 的預(yù)啟動),這個過程完成之后,將舊的 job 停掉然后啟動新的 job,整個過程對用戶作業(yè)的影響能夠控制在 30 秒以內(nèi) (中等規(guī)模任務(wù))。

其次,在遷移的過程中要保證狀態(tài)不丟失,因為所有遷移都是基于 savepoint 來啟動的,所以這塊的數(shù)據(jù)是不會有任何丟失的。

最后,如果在升級過程中發(fā)生了異常,可以支持異常情況下自動完成回滾。

在實際 Application mode 應(yīng)用過程中,也發(fā)現(xiàn)了原生 Flink 的一些問題,并做了對應(yīng)的處理方案。

例如 JobManager 在 failover 的時候會重新拉起一批新的 TM,會導(dǎo)致 TaskManager 的資源翻倍。如果資源池的資源不足以滿足 double 的需求,就有可能導(dǎo)致 failover 失敗。此外,即使這一次 failover 成功了,但是新啟動的 job 會基于首次啟動時指定的 recover path 來進(jìn)行恢復(fù),這個時候的位點可能已經(jīng)是一個十天以前的位點了,這會導(dǎo)致數(shù)據(jù)重復(fù)消費的問題。針對這個問題,在檢測到 JobManager 發(fā)生 failover 的時候就會在引擎?zhèn)戎苯訉?job fail 掉并告警,然后通過人工手動介入來處理。

五、流批一體作業(yè)管控平臺

流批一體作業(yè)管控平臺主要提供了以下幾個模塊的功能:作業(yè)開發(fā)及運(yùn)維、版本管理、監(jiān)控報警、資源管理、數(shù)據(jù)血緣、元數(shù)據(jù)管理以及 SDK。其中資源管理主要分為資源隔離和資源推薦,數(shù)據(jù)血緣主要用于展示 Flink 任務(wù)上下游之間的關(guān)系,元數(shù)據(jù)管理主要是針對用戶 catalog 表。

上圖上半部分是 SQL 開發(fā)界面,頁面的主體部分 SQL 編輯器,右側(cè)有任務(wù)的基本信息、版本信息、作業(yè)參數(shù)以及一些資源配置相關(guān)的界面元素。

下半部分是任務(wù)運(yùn)維界面,上面提供了很多常規(guī)操作,比如停止任務(wù),或先打 savepoint 再停止任務(wù)等。

作業(yè)版本管理分為 Flink SQL 任務(wù)以及 Flink JAR 任務(wù)。在 SQL 任務(wù)界面上可以看到 SQL 經(jīng)歷過很多次發(fā)版,“更多” 按鈕提供了回滾操作。針對 Flink JAR 任務(wù),目前有兩種提交 JAR 任務(wù)的方法,可以直接將用戶的 JAR 包上傳到一個分布式存儲路徑,也可以通過指定代碼倉庫 tag 來指定 JAR 包的版本。

資源管理主要分為資源隔離和資源推薦。這里引入了資源池的概念,并基于以下幾個維度做了切分:

  • 第一個因素是它運(yùn)行所屬的云環(huán)境;
  • 第二個因素是業(yè)務(wù)類型;
  • 第三個因素是資源池提供給流還是批任務(wù)使用。

另外,針對已經(jīng)運(yùn)行一段時間的任務(wù),會結(jié)合它歷史運(yùn)行期間的 CPU、內(nèi)存、延遲 lag 等指標(biāo)信息,給出當(dāng)前任務(wù)所需要的最佳 K8s 資源配置推薦結(jié)果。

Rugal 調(diào)度平臺是公司內(nèi)部一個對標(biāo) airflow 的產(chǎn)品,它可以通過百川提供的 SDK 定時創(chuàng)建任務(wù)提交到百川平臺。上圖左側(cè)是一個 SQL 編輯模板,其中的很多參數(shù)信息都是通過變量的形式來展示。調(diào)用 SDK 的時候,可以將這些變量對應(yīng)的實際值傳入進(jìn)來,并用這些值渲染出具體要執(zhí)行的 SQL,從而生成具體的執(zhí)行實例。

六、未來展望

最后是對未來工作的規(guī)劃。

第一,動態(tài)資源調(diào)整。目前, Flink job 一旦提交運(yùn)行,就無法在運(yùn)行期間修改某個 operator 占用的資源。所以希望未來能夠在 job 不進(jìn)行 restart 的情況下,調(diào)整某個算子所占用的資源。

第二,跨云多活方案。目前公司核心 P0 作業(yè)基本都是雙鏈路的,但都僅限于在單朵云上。希望針對這些核心任務(wù),實現(xiàn)跨云雙活方案,其中一個云上任務(wù)出現(xiàn)問題的時候,能夠穩(wěn)定切換到另外一朵云上。

第三,批任務(wù)資源調(diào)度優(yōu)化。因為批任務(wù)大多是在凌晨以后開始執(zhí)行,同時會調(diào)度很多任務(wù),有的任務(wù)可能因為搶占不到資源導(dǎo)致無法及時運(yùn)行,在任務(wù)調(diào)度執(zhí)行策略上仍有可以優(yōu)化的空間。

責(zé)任編輯:未麗燕 來源: Apache Flink
相關(guān)推薦

2024-09-10 09:36:26

2024-10-23 20:09:47

2024-10-10 08:19:50

2017-10-25 09:15:46

鏡像部署容器

2024-09-25 16:08:52

2023-04-18 07:49:06

2021-05-06 11:54:40

大數(shù)據(jù)Flink

2018-11-14 13:49:16

Apache Flin唯品會架構(gòu)

2020-11-26 18:30:33

機(jī)器學(xué)習(xí)Kubernetes開發(fā)

2022-06-10 15:21:15

MySQL CDCSqlServer數(shù)據(jù)庫

2016-12-23 09:09:54

TensorFlowKubernetes框架

2021-06-05 06:52:16

Kubernetes

2024-06-19 07:45:20

2024-12-19 21:09:38

2023-09-07 13:34:00

云原生數(shù)據(jù)倉庫

2025-01-15 11:36:28

2024-08-02 14:56:00

2022-04-07 16:50:28

FlinkB站Kafka

2022-09-16 08:23:22

Flink數(shù)據(jù)湖優(yōu)化

2021-05-20 09:55:23

Apache Flin阿里云大數(shù)據(jù)
點贊
收藏

51CTO技術(shù)棧公眾號