美團(tuán)基于 Flink 的實(shí)時數(shù)倉平臺建設(shè)新進(jìn)展
?摘要:本文整理自美團(tuán)實(shí)時數(shù)倉平臺負(fù)責(zé)人姚冬陽在 Flink Forward Asia 2021 實(shí)時數(shù)倉專場的演講。主要內(nèi)容包括:
- 平臺建設(shè)現(xiàn)狀
- 遇到的問題及解決
- 未來規(guī)劃
01平臺建設(shè)現(xiàn)狀
美團(tuán)于 2018 年首次引入 Flink 實(shí)時計算引擎,當(dāng)時的實(shí)時數(shù)倉概念還不太普及,平臺只提供了 Flink Jar 任務(wù)的生命周期管理和監(jiān)控報警。
2019 年,我們注意到實(shí)時計算的主要應(yīng)用場景是解決離線數(shù)倉時效性低的問題。離線數(shù)倉已經(jīng)比較成熟,通過 SQL 方式開發(fā)很簡單,而數(shù)倉的實(shí)時部分主要通過 Flink DataStream API 來開發(fā),門檻比較高,而且與離線數(shù)倉的開發(fā)方式相比較為割裂。因此,我們開始調(diào)研實(shí)時數(shù)倉的解決方案,目標(biāo)是降低開發(fā)門檻,并嘗試推廣 FlinkSQL,最終將美團(tuán)的實(shí)時數(shù)倉平臺取名為 NAU。
2020 年,美團(tuán)實(shí)時數(shù)倉平臺正式上線。它向業(yè)務(wù)提供 FlinkSQL 作業(yè)開發(fā)入口,主要負(fù)責(zé)兩個方面的工作:
- 首先,將實(shí)時數(shù)倉常見的數(shù)據(jù)源與離線表概念對齊,用數(shù)據(jù)模型進(jìn)行管理;
- 其次,提供 FlinkSQL 開發(fā)配套的效率工具,比如校驗(yàn)和調(diào)試功能。
但是在實(shí)際推廣過程中,我們發(fā)現(xiàn)業(yè)務(wù)在 FlinkSQL 的運(yùn)維方面門檻依然比較高,因此,我們將接下來的工作重點(diǎn)轉(zhuǎn)向了運(yùn)維中心。
FlinkSQL 作業(yè)運(yùn)維的痛點(diǎn)主要集中在兩個方面:有狀態(tài) SQL 作業(yè)部署的斷流問題和 SQL 作業(yè)的異常定位問題。為此,我們通過 Checkpoint 持久化和狀態(tài)生成的異步化來解決第一個問題,并通過提供作業(yè)的自動診斷來解決第二個問題。目前,整個實(shí)時數(shù)倉的平臺化建設(shè)已經(jīng)初步完備,未來我們會在開發(fā)和運(yùn)維能力上不斷精細(xì)化,并且繼續(xù)推動公司業(yè)務(wù)數(shù)倉架構(gòu)的進(jìn)化,比如流批生產(chǎn)的一體化、生產(chǎn)服務(wù)的一體化。
實(shí)時數(shù)倉目前已基本覆蓋了公司的全部業(yè)務(wù),為 100 多個業(yè)務(wù)團(tuán)隊(duì)提供了支持,比如美團(tuán)優(yōu)選、美團(tuán)買菜、金融、騎行等業(yè)務(wù)。托管了 7000 多個實(shí)時數(shù)據(jù)模型,主要為 Kafka 表和 KV 表模型。線上運(yùn)行 FlinkSQL 作業(yè) 4000+,新增的實(shí)時 SQL 作業(yè)占比已經(jīng)達(dá)到 70% 以上。從數(shù)據(jù)上看,F(xiàn)linkSQL 已經(jīng)可以解決美團(tuán)實(shí)時數(shù)倉大部分流處理的問題。
接下來以美團(tuán)業(yè)務(wù)中的兩個實(shí)時數(shù)倉生產(chǎn)鏈路為例,具體分享 FlinkSQL 的實(shí)際應(yīng)用。
應(yīng)用場景 1 是基于 FlinkSQL + OLAP 的實(shí)時生產(chǎn)鏈路。這個業(yè)務(wù)鏈路的實(shí)時數(shù)據(jù)源有兩個,分別是業(yè)務(wù) DB 的變更事件和業(yè)務(wù)服務(wù)的日志事件,這些事件首先會被收集到 Kafka 中,然后 DB 事件會按表名分發(fā)到新的 Kafka 中,DB 和日志的數(shù)據(jù)也會在這一層進(jìn)行格式上的統(tǒng)一并完成實(shí)時數(shù)倉的 ODS 層。然后業(yè)務(wù)會使用 FlinkSQL 來清洗和關(guān)聯(lián) ODS 層的數(shù)據(jù),生成實(shí)時數(shù)倉的主題寬表,最后寫入 OLAP 查詢引擎做實(shí)時分析。對于時效性要求不高的場景,部分業(yè)務(wù)還會在 OLAP 引擎上配置分鐘級別的調(diào)度來減少相同查詢的壓力。
應(yīng)用場景 2 與場景 1 的不同點(diǎn)在于,業(yè)務(wù)實(shí)時數(shù)倉的主題寬表數(shù)據(jù)并不是直接寫入 OLAP 查詢引擎,而是繼續(xù)寫入 Kafka,使用 FlinkSQL 做 APP 層的指標(biāo)聚合,最終把預(yù)計算的指標(biāo)數(shù)據(jù)寫入 OLAP、DB 或 KV 這類應(yīng)用層的存儲。這種方式更適合對接數(shù)據(jù)服務(wù),因?yàn)樗骖櫫藬?shù)據(jù)的時效性和高 QPS 的查詢。
上圖是實(shí)時數(shù)倉平臺的架構(gòu),分為集成、開發(fā)、運(yùn)維、治理、安全 5 個模塊分別建設(shè)。
集成模塊主要關(guān)注的是數(shù)據(jù)模型的管理,具體包括 Kafka 和 KV 兩種模型管理,管理的內(nèi)容有數(shù)據(jù)源的 schema 信息和連接信息等。
開發(fā)模塊主要關(guān)注的是 FlinkSQL 轉(zhuǎn)化業(yè)務(wù)需求,比如提供版本管理來記錄業(yè)務(wù)需求的迭代過程,提供 FlinkSQL 的校驗(yàn)和調(diào)試,來確保開發(fā)的 SQL 正確表達(dá)了業(yè)務(wù)邏輯,支持業(yè)務(wù)使用自定義的 Flink UDF 函數(shù)和自定義的 Format 解析,讓 FlinkSQL 可以擴(kuò)展?jié)M足更多業(yè)務(wù)需求場景。
運(yùn)維模塊關(guān)注的是 SQL 作業(yè)的部署和運(yùn)行時的監(jiān)控。在監(jiān)控方面,我們提供了 SQL 作業(yè)的監(jiān)控報警、異常日志和作業(yè)診斷,能夠幫助業(yè)務(wù)快速發(fā)現(xiàn)和定位作業(yè)的異常;部署方面,我們提供 SQL 作業(yè)的快照管理、AB 部署和參數(shù)調(diào)優(yōu),來幫助業(yè)務(wù)解決 SQL 作業(yè)變更時的問題。
治理模塊關(guān)注的是實(shí)時數(shù)倉的數(shù)據(jù)質(zhì)量、資源成本,通過建設(shè)實(shí)時數(shù)倉的 DQC 監(jiān)控,幫助業(yè)務(wù)發(fā)現(xiàn)上游數(shù)據(jù)或產(chǎn)出數(shù)據(jù)的異常值/異常波動;通過鏈路血緣和資源計費(fèi),讓業(yè)務(wù)可以量化實(shí)時數(shù)倉的生產(chǎn)成本,方便進(jìn)行成本治理。
安全模塊主要關(guān)注的是對數(shù)據(jù)流向的管控,提供數(shù)據(jù)源讀寫權(quán)限的管理和受限域機(jī)制,保證公司業(yè)務(wù)數(shù)據(jù)的安全性。
02遇到的問題及解決
在實(shí)際推廣 FlinkSQL 的過程中,我們也面臨了不少挑戰(zhàn)。
2.1 雙流關(guān)聯(lián)大狀態(tài)問題
首先是雙流關(guān)聯(lián)的大狀態(tài)問題,F(xiàn)linkSQL 的雙流關(guān)聯(lián)會保留左右流的歷史數(shù)據(jù)來互相關(guān)聯(lián),需要關(guān)聯(lián)的時間間隔越長,保存的歷史數(shù)據(jù)就會越多,狀態(tài)也就會越大。比如,要關(guān)聯(lián)訂單的下單事件和退款事件,并保證計算結(jié)果的正確性,需要考慮這兩個事件發(fā)生的間隔,可能是一個月甚至更久。
上圖左側(cè)是一個雙流關(guān)聯(lián)的有狀態(tài) SQL 作業(yè),圖中的 Mem 和 Disk 組成了 SQL 作業(yè)的 TaskManager 節(jié)點(diǎn),SQL 作業(yè)狀態(tài)后端使用 RocksDB,狀態(tài)持久化在 HDFS 文件系統(tǒng)上。一開始我們嘗試把 SQL 作業(yè)的狀態(tài)設(shè)置為保留一個月,但 SQL 作業(yè)會變得不穩(wěn)定,出現(xiàn)內(nèi)存超限、狀態(tài)讀取性能下降等問題,只能不斷增加作業(yè)的 TM 數(shù)和內(nèi)存大小來緩解。
即使這樣,業(yè)務(wù)上仍然存在兩個痛點(diǎn)。首先是關(guān)聯(lián)數(shù)據(jù)初始化難,目前公司 Kafka 數(shù)據(jù)源對歷史回溯有限制,因此業(yè)務(wù)不能構(gòu)建出完整的歷史狀態(tài),即使 Kafka 支持了更久的回溯,狀態(tài)初始化的效率也依然是一個問題。其次,內(nèi)存資源開銷大,特別是當(dāng)多個 SQL 作業(yè)關(guān)聯(lián)相同的數(shù)據(jù)源時,需要為每個 SQL 作業(yè)都分配相應(yīng)的內(nèi)存資源,不同 SQL 作業(yè)間的狀態(tài)是隔離的,作業(yè)間相同的關(guān)聯(lián)數(shù)據(jù)不能復(fù)用。
解決方案對于上述問題,我們提出了冷熱關(guān)聯(lián)分離的解決方案。假設(shè)關(guān)聯(lián)兩天前的數(shù)據(jù)是相對低頻的且狀態(tài)回滾不會超過兩天,那么可以定義兩天前的數(shù)據(jù)為冷數(shù)據(jù),兩天之內(nèi)的數(shù)據(jù)為熱數(shù)據(jù)。
如上圖所示,左側(cè)的 SQL 作業(yè)通過設(shè)置狀態(tài)保留時長,只保留 T+0 和 T+1 這兩天的熱數(shù)據(jù),而 T+2 及更久以前的冷數(shù)據(jù)則通過批任務(wù)每天從 Hive 同步到外存 KV 中。關(guān)聯(lián)時,若狀態(tài)中的熱數(shù)據(jù)不存在,則再通過訪問外存 KV 來關(guān)聯(lián)冷數(shù)據(jù)。右側(cè)是另外一個 SQL 作業(yè)需要關(guān)聯(lián)相同的數(shù)據(jù)源,它與左側(cè)的 SQL 作業(yè)共享外層 KV 中的冷數(shù)據(jù)。
對于第一個痛點(diǎn),因?yàn)闋顟B(tài)控制在了兩天內(nèi),SQL 作業(yè)上線時,關(guān)聯(lián)數(shù)據(jù)初始化的數(shù)據(jù)量得到了控制。對于第二個痛點(diǎn),因?yàn)閮商烨暗拇蟛糠謹(jǐn)?shù)據(jù)都保存在外層KV中,不同的 SQL 作業(yè)都可以查詢外存 KV,從而可以節(jié)省大量內(nèi)存資源。
2.2 SQL 變更狀態(tài)恢復(fù)問題
第二個問題是有狀態(tài) SQL 邏輯變更后狀態(tài)如何恢復(fù)?FlinkSQL 支持有狀態(tài)的增量計算,狀態(tài)是增量計算的歷史累計,實(shí)際上業(yè)務(wù)需要修改邏輯的情況很多,上圖右側(cè)列出了一些常見的 SQL 變更情況,比如新增聚合指標(biāo)、修改原指標(biāo)口徑、增加過濾條件、新增數(shù)據(jù)流關(guān)聯(lián)、增加聚合維度等。
舉個例子,業(yè)務(wù)增加了更多服務(wù)維度,在數(shù)據(jù)產(chǎn)品上就需要擴(kuò)展分析的維度,因此也需要修改 FlinkSQL 增加聚合維度。但是上述 SQL 邏輯變化后卻不能從之前的狀態(tài)恢復(fù),因?yàn)闅v史狀態(tài)對于變更后的 SQL 不能保證其完整性,即使恢復(fù)后也不能百分百保證后續(xù)計算的正確性。這種情況下,業(yè)務(wù)為了保證數(shù)據(jù)的正確性,需要從歷史回溯重新計算,回溯的過程會導(dǎo)致線上斷流,但業(yè)務(wù)又不希望犧牲太多的時效性。
解決方案針對這個問題,我們給出了三種解決方案。
解法 1:雙鏈路切換。此解法的關(guān)鍵是再搭建一條相同的實(shí)時鏈路作為備用鏈路,當(dāng)變更有狀態(tài) SQL 時,可以在備用鏈路上做回溯,重新計算歷史數(shù)據(jù),回溯完成后先驗(yàn)證備用鏈路的結(jié)果數(shù)據(jù),確保沒問題后再在鏈路最下游的數(shù)據(jù)服務(wù)層切換讀取的表,完成整個變更流程。
解法 2:旁路狀態(tài)生成。與雙鏈路切換不同點(diǎn)在于,這里變更的是鏈路上的單個作業(yè),思路是臨時啟動一個旁路作業(yè)來回溯,構(gòu)建出新邏輯的狀態(tài),驗(yàn)證數(shù)據(jù)完成后再重啟線上作業(yè),以此完成 SQL 和狀態(tài)的同時切換。
解法 3:歷史狀態(tài)遷移,前兩個方法的思路比較類似,都是基于歷史數(shù)據(jù)重新計算,構(gòu)建出新狀態(tài)。但這個思路是基于歷史狀態(tài)遷移出新狀態(tài),這種方法構(gòu)建出的新狀態(tài)雖然不能保證完整性,但在某些情況下,業(yè)務(wù)也是可以接受的。目前我們通過改造 State Process API 支持在 SQL 算子及其上下游關(guān)系不變的情況下,允許 Join 和 Agg 算子來新增列。
上述三種方式各有優(yōu)點(diǎn),可以從普適性、資源成本、線上斷流、等待時長四個維度來對以上三個解決方案進(jìn)行橫向比較。
普適性是指在保證數(shù)據(jù)正確的前提下支持的 SQL 變更范圍,前兩個方法都是重新計算,狀態(tài)是完整的,因此比方案 3 的普適性更高。
資源成本是指完成 SQL 變更所需要的額外 Flink 或 Kafka 資源,方法 1 需要構(gòu)建整條鏈路,需要更多的 Flink 和 Kafka 資源,因此成本最高。
線上斷流指的是在變更過程中導(dǎo)致下游數(shù)據(jù)延遲的時長,方法 1 是在數(shù)據(jù)服務(wù)層做切換,幾乎沒有斷流;方法 2 的斷流時長取決于作業(yè)從狀態(tài)恢復(fù)的速度;方法 3 除了狀態(tài)恢復(fù),還需要考慮狀態(tài)遷移的速度。
等待時長指的是完成整個變更流程需要的時間,前兩個方法都需要重新計算,因此比方法 3 的等待時間更長。
上圖是方法 2 的平臺自動化流程。流程分為七個階段,變更流程執(zhí)行的時間較長,可能需要幾十分鐘,通過流程條以及圖中每個階段的執(zhí)行日志可以讓用戶感受到變更的進(jìn)度和狀態(tài)。我們還為用戶做了自動化指標(biāo)檢查,比如在第 2 個階段的旁路數(shù)據(jù)回溯中,我們會檢查作業(yè)消費(fèi) Kafka 的積壓指標(biāo),來判斷回溯是否完成,完成后自動制作新邏輯狀態(tài)。再比如在第 6 個階段,原作業(yè)從旁路作業(yè)啟動時會比較 Kafka Offset 指標(biāo)來比較兩個作業(yè)的消費(fèi)進(jìn)度,確保線上作業(yè)重啟后不會少發(fā)數(shù)據(jù)。
2.3 FlinkSQL 調(diào)試繁瑣問題
遇到的第 3 個問題是 FlinkSQL 調(diào)試繁瑣,操作步驟多,業(yè)務(wù)需要創(chuàng)建額外的作業(yè)和 Kafka,還要將導(dǎo)出的結(jié)果進(jìn)行存儲。此外,輸入構(gòu)造復(fù)雜,為了針對性地調(diào)試某種輸入場景,業(yè)務(wù)需要寫代碼來構(gòu)建消息并寫入數(shù)據(jù)源,甚至需要對多個不同數(shù)據(jù)源消息到來的順序進(jìn)行控制。上圖左側(cè)可以看到,為了做 FlinkSQL 調(diào)試,需要手動搭建一條與線上隔離的調(diào)試鏈路,然后寫入 Mock 數(shù)據(jù)。
解決方案?
針對上述問題的解法是:基于文件調(diào)試一鍵化。首先業(yè)務(wù)在 Web 端可以在線編輯 Mock 數(shù)據(jù),Mock 數(shù)據(jù)是有界的消息序列,它的初始化可以先從線上抽樣,然后再由業(yè)務(wù)進(jìn)行修改。業(yè)務(wù)構(gòu)建完 Mock 數(shù)據(jù)后,會將 SQL 作業(yè)的 Mock 數(shù)據(jù)持久化到右側(cè)的 S3 文件對象系統(tǒng)上。業(yè)務(wù)在 Web 端點(diǎn)擊調(diào)試,左側(cè)發(fā)起的調(diào)試任務(wù)會在與線上隔離的服務(wù)器上單進(jìn)程執(zhí)行,執(zhí)行時會從 S3 獲取之前上傳的 Mock 數(shù)據(jù),而且可以根據(jù) Mock 數(shù)據(jù)指定的多源消息之間的到達(dá)順序和消息之間的發(fā)送間隔來執(zhí)行,執(zhí)行完成后會將輸出結(jié)果也持久化到 S3,最后在 Web 端查詢 S3 呈現(xiàn)給業(yè)務(wù)。
更多情況下業(yè)務(wù)不需要修改 Mock 數(shù)據(jù),只需要做抽樣和執(zhí)行兩步操作。另外我們也支持了一些調(diào)試的高級功能,比如支持控制消息的順序和間隔。
上圖是基于以上解法的調(diào)試工具。業(yè)務(wù)會為 SQL 作業(yè)創(chuàng)建多個測試用例,其中包括了 Source 的 Mock 數(shù)據(jù)和 Sink 的預(yù)期結(jié)果。執(zhí)行調(diào)試后,會檢查所有測試用例的通過情況,通過的條件是要保證結(jié)果流 Merge 之后的表與預(yù)期表數(shù)據(jù)一致。
2.4 SQL 作業(yè)異常定位問題
第 4 個問題是 FlinkSQL 作業(yè)的異常定位。作業(yè)異常是指作業(yè)消費(fèi) Kafka 出現(xiàn)了積壓,為了解決這個問題,需要定位出產(chǎn)生積壓的原因。而定位原因時,歸因的路徑比較復(fù)雜,排查門檻比較高。另外由于歸因的路徑缺少系統(tǒng)化的沉淀,定位花費(fèi)的時間也比較長。隨著 SQL 作業(yè)的數(shù)量越來越多,如果完全依賴人工排查,工作量將會非常巨大。解決方案
針對上述為的解決方法是實(shí)現(xiàn) SQL 作業(yè)的自動化異常診斷。通過 Flink Reporter 上報 SQL 作業(yè)的運(yùn)行指標(biāo),并持久化到 TSDB 中用于歷史查詢。同時也會持久化 SQL 作業(yè)的運(yùn)行日志,報警服務(wù)會根據(jù)規(guī)則監(jiān)控 SQL 作業(yè)上報的 Kafka Offset 指標(biāo),當(dāng)消費(fèi)的 Offset 落后于生產(chǎn)的 Offset 時,會判定位作業(yè)發(fā)生消費(fèi)積壓,然后發(fā)出報警并下發(fā)異常事件,診斷服務(wù)會監(jiān)聽報警服務(wù)的異常事件。
異常發(fā)生時,根據(jù)異常時間窗口內(nèi)作業(yè)日志和作業(yè)指標(biāo)分析異常原因,診斷服務(wù)可以通過增加規(guī)則來沉淀人工排查的經(jīng)驗(yàn)。比如發(fā)生了 Restart,就會從日志中根據(jù)關(guān)鍵字來提取異常信息,未發(fā)生 Restart 則會根據(jù)反壓指標(biāo)找出瓶頸節(jié)點(diǎn),然后結(jié)合 GC 指標(biāo)、數(shù)據(jù)傾斜、火焰圖等來分析瓶頸的原因,最后提出調(diào)優(yōu)建議。
上圖展示了診斷出業(yè)務(wù)消息臟數(shù)據(jù)的例子。圖中的運(yùn)行概況一欄會給出 SQL 作業(yè)在每個時間檢查點(diǎn)的診斷情況,綠色表明運(yùn)行正常,紅色表明作業(yè)存在異常,通過這個時間線可以清楚看到異常發(fā)生的時間點(diǎn)。診斷結(jié)果欄中可以看到異常的原因、詳情和建議。比如在這個事例中,原因是業(yè)務(wù)消息存在臟數(shù)據(jù),在詳情中可以看到導(dǎo)致作業(yè)異常的原始消息內(nèi)容,在建議中會提示業(yè)務(wù)配置臟數(shù)據(jù)的處理策略。
03未來規(guī)劃
未來,美團(tuán)實(shí)時數(shù)倉平臺的規(guī)劃主要包括以下兩個方面。
- 首先,是流批一體開發(fā)運(yùn)維,我們即將在實(shí)時數(shù)倉平臺集成數(shù)據(jù)湖存儲,并開放 FlinkSQL 的批作業(yè),在存儲和計算層都做到流批統(tǒng)一,提高工作效率。
- 其次,是作業(yè)的自動調(diào)優(yōu),繼續(xù)提升作業(yè)診斷的準(zhǔn)確率以及作業(yè)重啟的效率。?