理想汽車基于Flink on K8s的數(shù)據(jù)集成實(shí)踐
一、數(shù)據(jù)集成的發(fā)展與現(xiàn)狀
理想汽車數(shù)據(jù)集成的發(fā)展經(jīng)歷了四個(gè)階段:
第一階段:在 2020 年 7 月基于 DataX 構(gòu)建了離線數(shù)據(jù)交換能力。
第二階段:在 2021 年 7 月,構(gòu)建了基于 Flink 的實(shí)時(shí)處理平臺,在這兩個(gè)階段,還沒有一個(gè)真正的數(shù)據(jù)集成的產(chǎn)品。
第三階段:2022 年 9 月,開始建設(shè)數(shù)據(jù)集成平臺,構(gòu)建了第一個(gè)數(shù)據(jù)集成鏈路,實(shí)現(xiàn) Kafka 到 Hive 的數(shù)據(jù)鏈路。
第四階段:2023 年 4 月,在原來實(shí)時(shí)處理能力的基礎(chǔ)上擴(kuò)展了離線集成能力,實(shí)現(xiàn)了批流數(shù)據(jù)的統(tǒng)一。
早期,理想還沒有統(tǒng)一的數(shù)據(jù)集成平臺,數(shù)據(jù)產(chǎn)品紛雜。
TiDB、MySQL、StarRocks、MongoDB 等數(shù)據(jù)傳輸?shù)较掠?,是通過 DataX 來實(shí)現(xiàn)的;而 Kafka、Oracle 等數(shù)據(jù)傳輸具備流特性的數(shù)據(jù)又是通過 Flink 實(shí)現(xiàn)的;同時(shí),比如 Hive 等一些數(shù)據(jù)傳輸是通過寫 Spark SQL 來實(shí)現(xiàn)的;還有一些數(shù)據(jù)庫,如 TiDB、Oracle 是通過數(shù)據(jù)庫自己的引擎進(jìn)行數(shù)據(jù)傳輸。
基于以上數(shù)據(jù)形態(tài),業(yè)務(wù)方在使用產(chǎn)品時(shí)存在如下一些痛點(diǎn):
- 產(chǎn)品能力缺失。需要在多套平臺之間切換,沒有直接可落地的產(chǎn)品,需要開發(fā)團(tuán)隊(duì)寫代碼。
- 多套開發(fā)語言。無論是 Flink、Spark 還是 DataX 等等,每一個(gè)引擎都有其特有的配置,需要同時(shí)了解多套開發(fā)語音和不同的開發(fā)細(xì)節(jié)。
- 資源共享難。由于批流使用不同的引擎,由不同的團(tuán)隊(duì)開發(fā),底層的計(jì)算和存儲資源很難共享。
- 資源利用率低。正是由于資源共享難,又引發(fā)了另外一個(gè)問題,就是資源利用率低,且存在不均衡的狀況。比如實(shí)時(shí)計(jì)算集群,是長期運(yùn)行的任務(wù),其計(jì)算資源經(jīng)常吃緊,但是存儲資源基本上處于沒有使用的狀態(tài)。
根據(jù)業(yè)務(wù)痛點(diǎn),我們總結(jié)出了三大需求:
首先是統(tǒng)一平臺,屏蔽各種異構(gòu)數(shù)據(jù)源之間不同傳輸引擎的差異。
第二是統(tǒng)一計(jì)算引擎,將批式、流式用一套引擎實(shí)現(xiàn)。
第三是存算分離,能夠在計(jì)算層和存儲層,按照業(yè)務(wù)的需求進(jìn)行獨(dú)立的彈性伸縮。
為了滿足上述需求,我們選擇使用 Fink 作為計(jì)算引擎,其批流一體的計(jì)算引擎,讓我們在處理批式、流式時(shí)可以做到無縫切換。同時(shí),F(xiàn)link 基于 K8s 的云原生的能力也能夠幫助我們實(shí)現(xiàn)計(jì)算資源和存儲資源的彈性擴(kuò)縮容。
我們的產(chǎn)品已經(jīng)在業(yè)務(wù)上有很多應(yīng)用實(shí)例。對接了包括服務(wù)器端的日志類數(shù)據(jù),比如 Oracle、MySQL、TiDB 等有 binlog 的業(yè)務(wù)數(shù)據(jù)傳輸?shù)较掠巍M瑫r(shí),車端、云端還有工廠的一些埋點(diǎn)和信號的數(shù)據(jù),也通過集成平臺實(shí)現(xiàn)了數(shù)據(jù)傳輸。
計(jì)算方面,在傳輸能力上,可以實(shí)現(xiàn)流式和批式的數(shù)據(jù)處理轉(zhuǎn)換,支持并行讀取和并行寫入的能力,以及對于異構(gòu)數(shù)據(jù)源不同類型數(shù)據(jù)轉(zhuǎn)換的能力,業(yè)務(wù)用戶不再需要去了解不同產(chǎn)品的細(xì)節(jié)。在產(chǎn)品運(yùn)維能力上,包括任務(wù)的管理、權(quán)限控制、監(jiān)控告警、日志采集等等,覆蓋了任務(wù)的全部生命周期。最終落到下游各種數(shù)據(jù)存儲。
二、數(shù)據(jù)集成的落地實(shí)踐
1、數(shù)據(jù)集成平臺架構(gòu)
存儲層:采用 JuiceFS+BOS 的方式,同時(shí)借助 K8s 本身 node 節(jié)點(diǎn)的一些本地存儲能力,為計(jì)算引擎提供相應(yīng)的存儲能力。
計(jì)算層:基于 Flink 的內(nèi)核,擴(kuò)展了各種 connector,最終封裝成了一個(gè)標(biāo)準(zhǔn)化鏡像,通過 Flink Operator 來調(diào)取鏡像,把任務(wù)提交到 K8s 的集群當(dāng)中。同時(shí),配備了 Flink 的 history 的服務(wù),這樣用戶可以在任務(wù)出現(xiàn)異?;蛉蝿?wù)結(jié)束時(shí)對任務(wù)的歷史狀況進(jìn)行分析。Flink Operator 是一個(gè)定義在 Flink 上的 CRD,它通過 K8s 的 API server 對外提供標(biāo)準(zhǔn)化的 API。上面封裝了一個(gè)中間層,對下使用 K8s 的 API server,對產(chǎn)品則封裝了標(biāo)準(zhǔn)化的 API,屏蔽各個(gè)層的依賴關(guān)系,還承接了任務(wù)的編排和任務(wù)生命周期的管理。
2、設(shè)計(jì)模型
數(shù)據(jù)集成的設(shè)計(jì)模型如下圖所示,通過定義各種 source 和 sink 的插件來實(shí)現(xiàn)數(shù)據(jù)傳輸?shù)霓D(zhuǎn)換。
下面封裝的 API,用戶只需要定義 source 和要傳輸?shù)臄?shù)據(jù)內(nèi)容,以及寫到哪一個(gè) sink,就可以完成一個(gè) transform 的過程。
例如,假設(shè)我們已經(jīng)有了 TiDB、OceanBase、Hive、Kafka 等 sink 的鏈路,當(dāng)增加一個(gè)新的 MySQL 的 connector 時(shí),就創(chuàng)建了這一套插件的一組數(shù)據(jù)傳輸能力。這樣就可以快速實(shí)現(xiàn)各種場景的數(shù)據(jù)落地。
3、典型場景
在離線集成場景中,首先獲取庫表的關(guān)聯(lián)關(guān)系。數(shù)據(jù)之間的增量和全量的數(shù)據(jù)同步,通過調(diào)度平臺進(jìn)行統(tǒng)一的調(diào)度處理。
在過去使用 OceanBase 到 Hive 的鏈路,數(shù)據(jù)量大時(shí) OB 經(jīng)常出現(xiàn) time out,因?yàn)?OB 本身設(shè)置了 time out 時(shí)間。我們的解決方法是,首先獲取 OB 的數(shù)據(jù)結(jié)構(gòu),分析主鍵及分區(qū)選擇分片字段,計(jì)算出這個(gè)字段的最大值、最小值,以及這一批次的數(shù)據(jù)量,然后使用這三個(gè)信息,合理設(shè)置拉取這個(gè)數(shù)據(jù)的 size。之后, Flink 就可以基于這個(gè) size 的大小并行地去拉取。保證每一次拉取的數(shù)據(jù)量不會特別的多,這樣就解決了數(shù)據(jù)出現(xiàn) time out 的問題
過去的實(shí)時(shí)傳輸鏈路,用戶需要跨多個(gè)平臺去做,開發(fā)流程長。并且,用戶需要手動創(chuàng)建表,開發(fā)復(fù)雜。
有了數(shù)據(jù)集成平臺之后,就省去了上面一系列的人工過程。通過集成平臺配置 source、sink,就可以實(shí)現(xiàn)數(shù)據(jù)流轉(zhuǎn)。
對于 Hive 的表,經(jīng)常會有數(shù)據(jù)分區(qū)。這里提供了幾種方式來生成 Hive 的分區(qū),可以基于數(shù)據(jù)、基于處理或基于元數(shù)據(jù)時(shí)間來進(jìn)行分區(qū)。基于元數(shù)據(jù)時(shí)間分區(qū)的好處是可以避免生成太多的 Hive 的小碎文件,因?yàn)橄M(fèi)數(shù)據(jù)在不出現(xiàn)延遲的情況下,基本上一個(gè)分區(qū)的數(shù)據(jù)都會寫到同一個(gè) Hive 的 partition 里面。同時(shí),開啟了 Kafka 的自動感知分區(qū)的能力,比如當(dāng) Kafka 數(shù)據(jù)暴增時(shí),Kafka 的 topic 的分區(qū)進(jìn)行增加,自動感知就非常有必要。
上圖展示的是一個(gè) Oracle 傳輸入流的場景。借助 Flink CDC 的能力,在全量階段,通過設(shè)置多個(gè)并行度來讀全量數(shù)據(jù),當(dāng)全量數(shù)據(jù)讀取完成后,F(xiàn)link 會通過自動切換能力切換到增量模式。增量模式會選擇其中的某一個(gè) task manager,去讀取增量的數(shù)據(jù)。
4、異構(gòu)數(shù)據(jù)源
不同類型的數(shù)據(jù)庫支持的數(shù)據(jù)類型存在差異,我們很難在這個(gè)過程當(dāng)一一記住該把哪個(gè)類型轉(zhuǎn)換到哪個(gè)類型。因此,在數(shù)據(jù)集成平臺上,我們把數(shù)據(jù)源的類型映射成 Flink 類型,把數(shù)據(jù)目標(biāo)的類型也映射成 Flink 類型。最終,都通過 Flink 的類型進(jìn)行統(tǒng)一的處理轉(zhuǎn)換。映射過程用戶是不需要關(guān)注的。
5、SQL 形式的過濾條件
這個(gè)轉(zhuǎn)換過程中,需要過濾一些常用的 where 條件,這里提供了常用的一些函數(shù)。
三、數(shù)據(jù)集成云原生的落地實(shí)踐
K8s 云原生方案的落地主要考慮了四大關(guān)鍵點(diǎn),接下來逐一展開介紹。
1、方案選型
選型方面,選擇使用 Flink Operator 進(jìn)行任務(wù)管理。首先,F(xiàn)link Operator 可以方便地進(jìn)行管理集群。它封裝了 K8s 的一個(gè)應(yīng)用,可以擴(kuò)展 API 來實(shí)現(xiàn)配置和創(chuàng)建應(yīng)用實(shí)例。采用聲明式地提交。同時(shí)配備了集成的 ingress,可以通過 ingress 來實(shí)現(xiàn)配置 Flink 的 web UI,在運(yùn)行過程中通過 web UI 監(jiān)控任務(wù)的狀態(tài),或者查看運(yùn)行日志。Flink Operator 實(shí)現(xiàn)了作業(yè)全生命周期的管理,可以實(shí)現(xiàn)運(yùn)行和暫停應(yīng)用程序,有狀態(tài)、無狀態(tài)的應(yīng)用升級,以及定時(shí)觸發(fā)和管理 CheckPoint 點(diǎn)。還可以做到回滾。
上圖展示了 Flink Operator 的處理過程。首先,在平臺上注冊 Flink Operator,也就是在 K8s 集群中創(chuàng)建一個(gè) Flink deployment 的 CRD。之后就可以使用這個(gè) CRD 去創(chuàng)建相應(yīng)的資源。一個(gè) yaml 文件提交到 K8s 集群之后,K8s 的 API 調(diào)用 CRD 創(chuàng)建 FlinkDeployment。然后由 Flink Operator 創(chuàng)建 Flink 的 Deployment,并創(chuàng)建相應(yīng)的 TaskManager。同時(shí),Operator 會監(jiān)聽 FlinkDeployment 的狀態(tài),其實(shí)質(zhì)上是監(jiān)聽 JobManagerPod 的狀態(tài),并更新到 Operator 中。如果任務(wù)失敗,會嘗試重新調(diào)起。
2、狀態(tài)判斷及日志采集
各種任務(wù)的狀態(tài)已經(jīng)被記錄到了 FlinkDeployment 中,通過 watch 的方式去監(jiān)聽 Flink K8s API server,就可以捕獲任務(wù)各個(gè)事件的狀態(tài)。同時(shí),還會 watch 每一個(gè) JobManager 和 TaskManager 的 pod,將 pod 狀態(tài)和名稱來作為日志標(biāo)題。已經(jīng)有了任務(wù)運(yùn)行的狀態(tài),為什么還要采集 pod 的狀態(tài)呢?因?yàn)閷?shí)時(shí)任務(wù)是一個(gè)持久化運(yùn)行的任務(wù),pod 可能會在某一個(gè)時(shí)間節(jié)點(diǎn),由于一些原因?qū)е铝怂劳?。對于已?jīng)死亡的 pod,不需要看到所有的日志的狀態(tài)。通過標(biāo)記 pod 的狀態(tài),來描述這一個(gè)日志是一個(gè)有效還是一個(gè)無效的日志。在每一個(gè) K8s 的 node 節(jié)點(diǎn)上部署了 Agent,通過 Agent 采集某每一個(gè) pod 的日志作為下游的轉(zhuǎn)換日志。
上圖描繪出了狀態(tài)轉(zhuǎn)換關(guān)系。Failed、Finished、Canceled 和 Suspended 這四個(gè)狀態(tài)是最終任務(wù)結(jié)束的狀態(tài)類型。如果出現(xiàn) Failed,下游會進(jìn)行相應(yīng)的告警。
3、監(jiān)控告警
我們對每一個(gè)任務(wù)都給用戶提供了配置告警的方式,當(dāng)用戶啟動任務(wù)的時(shí)候,任務(wù)會把相應(yīng)的指標(biāo)上報(bào)到 Prometheus,Prometheus 會周期性地去采集和運(yùn)算,如果告警指標(biāo)沒有被觸發(fā),就會處于靜默的狀態(tài)。如果告警指標(biāo)被觸發(fā),就會觸發(fā)一條告警發(fā)送給相應(yīng)的用戶。用戶就可以基于告警采取相應(yīng)的處理。
4、共享存儲
共享存儲使用了 JuiceFS。每一個(gè) pod 都會通過掛載本地 CSI 的方式把 JuiceFS 掛載到本地,形成一個(gè)本地的存儲目錄。
Flink 的任務(wù)需要去做周期性的 checkpoint,checkpoint 會被持久化到 JuiceFS 里面。在任務(wù)運(yùn)行時(shí),F(xiàn)link 配置了重啟的策略,Operator 也會有一些重啟的策略,當(dāng)任務(wù)出現(xiàn)異常的時(shí)候會進(jìn)行任務(wù)的重啟,在重啟時(shí)會找到最近一次 checkpoint 點(diǎn)進(jìn)行重啟。另外,F(xiàn)link Operator 可以實(shí)現(xiàn)任務(wù)的無狀態(tài)和有狀態(tài)的升級,升級時(shí),如果 yaml 狀態(tài)發(fā)生了變更,就會去找到最新的 checkpoint 點(diǎn)進(jìn)行任務(wù)重啟。
Flink 運(yùn)行期間的狀態(tài)信息和存檔信息也會記錄在 JuiceFS 里面,會由 Flink 的 history 來提供查看。
四、未來規(guī)劃
首先,支持更多的數(shù)據(jù)源,實(shí)現(xiàn)更多異構(gòu)數(shù)據(jù)源之間的轉(zhuǎn)換。
第二,彈性伸縮能力方面,目前雖然使用了 K8s 的能力,但是對資源的彈性伸縮等問題還沒有進(jìn)行完整的落地,后續(xù)希望在彈性伸縮能力上進(jìn)行一些增強(qiáng)。
第三,進(jìn)一步提升海量數(shù)據(jù)的傳輸性能。
最后,對于批處理任務(wù),目前 Flink 存在一個(gè)缺陷,無法進(jìn)行謂詞下推,導(dǎo)致在做有 where 條件的任務(wù)時(shí)會把全量數(shù)據(jù)拉到 Flink 內(nèi)存里面,再進(jìn)行 where 條件的過濾。