大意了!一早這樣搭實(shí)時(shí)數(shù)倉(cāng)能少走很多彎路……
一、平臺(tái)建設(shè)現(xiàn)狀
美團(tuán)于 2018 年首次引入 Flink 實(shí)時(shí)計(jì)算引擎,當(dāng)時(shí)的實(shí)時(shí)數(shù)倉(cāng)概念還不太普及,平臺(tái)只提供了 Flink Jar 任務(wù)的生命周期管理和監(jiān)控報(bào)警。
2019 年,我們注意到實(shí)時(shí)計(jì)算的主要應(yīng)用場(chǎng)景是解決離線數(shù)倉(cāng)時(shí)效性低的問題。離線數(shù)倉(cāng)已經(jīng)比較成熟,通過 SQL 方式開發(fā)很簡(jiǎn)單,而數(shù)倉(cāng)的實(shí)時(shí)部分主要通過 Flink DataStream API 來開發(fā),門檻比較高,而且與離線數(shù)倉(cāng)的開發(fā)方式相比較為割裂。因此,我們開始調(diào)研實(shí)時(shí)數(shù)倉(cāng)的解決方案,目標(biāo)是降低開發(fā)門檻,并嘗試推廣 FlinkSQL,最終將美團(tuán)的實(shí)時(shí)數(shù)倉(cāng)平臺(tái)取名為 NAU。
2020 年,美團(tuán)實(shí)時(shí)數(shù)倉(cāng)平臺(tái)正式上線。它向業(yè)務(wù)提供 FlinkSQL 作業(yè)開發(fā)入口,主要負(fù)責(zé)兩個(gè)方面的工作:
- 首先,將實(shí)時(shí)數(shù)倉(cāng)常見的數(shù)據(jù)源與離線表概念對(duì)齊,用數(shù)據(jù)模型進(jìn)行管理;
- 其次,提供 FlinkSQL 開發(fā)配套的效率工具,比如校驗(yàn)和調(diào)試功能。
但是在實(shí)際推廣過程中,我們發(fā)現(xiàn)業(yè)務(wù)在 FlinkSQL 的運(yùn)維方面門檻依然比較高,因此,我們將接下來的工作重點(diǎn)轉(zhuǎn)向了運(yùn)維中心。
FlinkSQL 作業(yè)運(yùn)維的痛點(diǎn)主要集中在兩個(gè)方面:有狀態(tài) SQL 作業(yè)部署的斷流問題和 SQL 作業(yè)的異常定位問題。為此,我們通過 Checkpoint 持久化和狀態(tài)生成的異步化來解決第一個(gè)問題,并通過提供作業(yè)的自動(dòng)診斷來解決第二個(gè)問題。目前,整個(gè)實(shí)時(shí)數(shù)倉(cāng)的平臺(tái)化建設(shè)已經(jīng)初步完備,未來我們會(huì)在開發(fā)和運(yùn)維能力上不斷精細(xì)化,并且繼續(xù)推動(dòng)公司業(yè)務(wù)數(shù)倉(cāng)架構(gòu)的進(jìn)化,比如流批生產(chǎn)的一體化、生產(chǎn)服務(wù)的一體化。
實(shí)時(shí)數(shù)倉(cāng)目前已基本覆蓋了公司的全部業(yè)務(wù),為 100 多個(gè)業(yè)務(wù)團(tuán)隊(duì)提供了支持,比如美團(tuán)優(yōu)選、美團(tuán)買菜、金融、騎行等業(yè)務(wù)。托管了 7000 多個(gè)實(shí)時(shí)數(shù)據(jù)模型,主要為 Kafka 表和 KV 表模型。線上運(yùn)行 FlinkSQL 作業(yè) 4000+,新增的實(shí)時(shí) SQL 作業(yè)占比已經(jīng)達(dá)到 70% 以上。從數(shù)據(jù)上看,F(xiàn)linkSQL 已經(jīng)可以解決美團(tuán)實(shí)時(shí)數(shù)倉(cāng)大部分流處理的問題。
接下來以美團(tuán)業(yè)務(wù)中的兩個(gè)實(shí)時(shí)數(shù)倉(cāng)生產(chǎn)鏈路為例,具體分享 FlinkSQL 的實(shí)際應(yīng)用。
應(yīng)用場(chǎng)景 1 是基于 FlinkSQL + OLAP 的實(shí)時(shí)生產(chǎn)鏈路。這個(gè)業(yè)務(wù)鏈路的實(shí)時(shí)數(shù)據(jù)源有兩個(gè),分別是業(yè)務(wù) DB 的變更事件和業(yè)務(wù)服務(wù)的日志事件,這些事件首先會(huì)被收集到 Kafka 中,然后 DB 事件會(huì)按表名分發(fā)到新的 Kafka 中,DB 和日志的數(shù)據(jù)也會(huì)在這一層進(jìn)行格式上的統(tǒng)一并完成實(shí)時(shí)數(shù)倉(cāng)的 ODS 層。然后業(yè)務(wù)會(huì)使用 FlinkSQL 來清洗和關(guān)聯(lián) ODS 層的數(shù)據(jù),生成實(shí)時(shí)數(shù)倉(cāng)的主題寬表,最后寫入 OLAP 查詢引擎做實(shí)時(shí)分析。對(duì)于時(shí)效性要求不高的場(chǎng)景,部分業(yè)務(wù)還會(huì)在 OLAP 引擎上配置分鐘級(jí)別的調(diào)度來減少相同查詢的壓力。
應(yīng)用場(chǎng)景 2 與場(chǎng)景 1 的不同點(diǎn)在于,業(yè)務(wù)實(shí)時(shí)數(shù)倉(cāng)的主題寬表數(shù)據(jù)并不是直接寫入 OLAP 查詢引擎,而是繼續(xù)寫入 Kafka,使用 FlinkSQL 做 APP 層的指標(biāo)聚合,最終把預(yù)計(jì)算的指標(biāo)數(shù)據(jù)寫入 OLAP、DB 或 KV 這類應(yīng)用層的存儲(chǔ)。這種方式更適合對(duì)接數(shù)據(jù)服務(wù),因?yàn)樗骖櫫藬?shù)據(jù)的時(shí)效性和高 QPS 的查詢。
上圖是實(shí)時(shí)數(shù)倉(cāng)平臺(tái)的架構(gòu),分為集成、開發(fā)、運(yùn)維、治理、安全 5 個(gè)模塊分別建設(shè)。
集成模塊主要關(guān)注的是數(shù)據(jù)模型的管理,具體包括 Kafka 和 KV 兩種模型管理,管理的內(nèi)容有數(shù)據(jù)源的 schema 信息和連接信息等。
開發(fā)模塊主要關(guān)注的是 FlinkSQL 轉(zhuǎn)化業(yè)務(wù)需求,比如提供版本管理來記錄業(yè)務(wù)需求的迭代過程,提供 FlinkSQL 的校驗(yàn)和調(diào)試,來確保開發(fā)的 SQL 正確表達(dá)了業(yè)務(wù)邏輯,支持業(yè)務(wù)使用自定義的 Flink UDF 函數(shù)和自定義的 Format 解析,讓 FlinkSQL 可以擴(kuò)展?jié)M足更多業(yè)務(wù)需求場(chǎng)景。
運(yùn)維模塊關(guān)注的是 SQL 作業(yè)的部署和運(yùn)行時(shí)的監(jiān)控。在監(jiān)控方面,我們提供了 SQL 作業(yè)的監(jiān)控報(bào)警、異常日志和作業(yè)診斷,能夠幫助業(yè)務(wù)快速發(fā)現(xiàn)和定位作業(yè)的異常;部署方面,我們提供 SQL 作業(yè)的快照管理、AB 部署和參數(shù)調(diào)優(yōu),來幫助業(yè)務(wù)解決 SQL 作業(yè)變更時(shí)的問題。
治理模塊關(guān)注的是實(shí)時(shí)數(shù)倉(cāng)的數(shù)據(jù)質(zhì)量、資源成本,通過建設(shè)實(shí)時(shí)數(shù)倉(cāng)的 DQC 監(jiān)控,幫助業(yè)務(wù)發(fā)現(xiàn)上游數(shù)據(jù)或產(chǎn)出數(shù)據(jù)的異常值/異常波動(dòng);通過鏈路血緣和資源計(jì)費(fèi),讓業(yè)務(wù)可以量化實(shí)時(shí)數(shù)倉(cāng)的生產(chǎn)成本,方便進(jìn)行成本治理。
安全模塊主要關(guān)注的是對(duì)數(shù)據(jù)流向的管控,提供數(shù)據(jù)源讀寫權(quán)限的管理和受限域機(jī)制,保證公司業(yè)務(wù)數(shù)據(jù)的安全性。
二、遇到的問題及解決
在實(shí)際推廣 FlinkSQL 的過程中,我們也面臨了不少挑戰(zhàn)。
1、雙流關(guān)聯(lián)大狀態(tài)問題
首先是雙流關(guān)聯(lián)的大狀態(tài)問題,F(xiàn)linkSQL 的雙流關(guān)聯(lián)會(huì)保留左右流的歷史數(shù)據(jù)來互相關(guān)聯(lián),需要關(guān)聯(lián)的時(shí)間間隔越長(zhǎng),保存的歷史數(shù)據(jù)就會(huì)越多,狀態(tài)也就會(huì)越大。比如,要關(guān)聯(lián)訂單的下單事件和退款事件,并保證計(jì)算結(jié)果的正確性,需要考慮這兩個(gè)事件發(fā)生的間隔,可能是一個(gè)月甚至更久。
上圖左側(cè)是一個(gè)雙流關(guān)聯(lián)的有狀態(tài) SQL 作業(yè),圖中的 Mem 和 Disk 組成了 SQL 作業(yè)的 TaskManager 節(jié)點(diǎn),SQL 作業(yè)狀態(tài)后端使用 RocksDB,狀態(tài)持久化在 HDFS 文件系統(tǒng)上。一開始我們嘗試把 SQL 作業(yè)的狀態(tài)設(shè)置為保留一個(gè)月,但 SQL 作業(yè)會(huì)變得不穩(wěn)定,出現(xiàn)內(nèi)存超限、狀態(tài)讀取性能下降等問題,只能不斷增加作業(yè)的 TM 數(shù)和內(nèi)存大小來緩解。
即使這樣,業(yè)務(wù)上仍然存在兩個(gè)痛點(diǎn)。首先是關(guān)聯(lián)數(shù)據(jù)初始化難,目前公司 Kafka 數(shù)據(jù)源對(duì)歷史回溯有限制,因此業(yè)務(wù)不能構(gòu)建出完整的歷史狀態(tài),即使 Kafka 支持了更久的回溯,狀態(tài)初始化的效率也依然是一個(gè)問題。其次,內(nèi)存資源開銷大,特別是當(dāng)多個(gè) SQL 作業(yè)關(guān)聯(lián)相同的數(shù)據(jù)源時(shí),需要為每個(gè) SQL 作業(yè)都分配相應(yīng)的內(nèi)存資源,不同 SQL 作業(yè)間的狀態(tài)是隔離的,作業(yè)間相同的關(guān)聯(lián)數(shù)據(jù)不能復(fù)用。
- 解決方案
對(duì)于上述問題,我們提出了冷熱關(guān)聯(lián)分離的解決方案。假設(shè)關(guān)聯(lián)兩天前的數(shù)據(jù)是相對(duì)低頻的且狀態(tài)回滾不會(huì)超過兩天,那么可以定義兩天前的數(shù)據(jù)為冷數(shù)據(jù),兩天之內(nèi)的數(shù)據(jù)為熱數(shù)據(jù)。
如上圖所示,左側(cè)的 SQL 作業(yè)通過設(shè)置狀態(tài)保留時(shí)長(zhǎng),只保留 T+0 和 T+1 這兩天的熱數(shù)據(jù),而 T+2 及更久以前的冷數(shù)據(jù)則通過批任務(wù)每天從 Hive 同步到外存 KV 中。關(guān)聯(lián)時(shí),若狀態(tài)中的熱數(shù)據(jù)不存在,則再通過訪問外存 KV 來關(guān)聯(lián)冷數(shù)據(jù)。右側(cè)是另外一個(gè) SQL 作業(yè)需要關(guān)聯(lián)相同的數(shù)據(jù)源,它與左側(cè)的 SQL 作業(yè)共享外層 KV 中的冷數(shù)據(jù)。
對(duì)于第一個(gè)痛點(diǎn),因?yàn)闋顟B(tài)控制在了兩天內(nèi),SQL 作業(yè)上線時(shí),關(guān)聯(lián)數(shù)據(jù)初始化的數(shù)據(jù)量得到了控制。對(duì)于第二個(gè)痛點(diǎn),因?yàn)閮商烨暗拇蟛糠謹(jǐn)?shù)據(jù)都保存在外層KV中,不同的 SQL 作業(yè)都可以查詢外存 KV,從而可以節(jié)省大量?jī)?nèi)存資源。
2、SQL 變更狀態(tài)恢復(fù)問題
第二個(gè)問題是有狀態(tài) SQL 邏輯變更后狀態(tài)如何恢復(fù)?FlinkSQL 支持有狀態(tài)的增量計(jì)算,狀態(tài)是增量計(jì)算的歷史累計(jì),實(shí)際上業(yè)務(wù)需要修改邏輯的情況很多,上圖右側(cè)列出了一些常見的 SQL 變更情況,比如新增聚合指標(biāo)、修改原指標(biāo)口徑、增加過濾條件、新增數(shù)據(jù)流關(guān)聯(lián)、增加聚合維度等。
舉個(gè)例子,業(yè)務(wù)增加了更多服務(wù)維度,在數(shù)據(jù)產(chǎn)品上就需要擴(kuò)展分析的維度,因此也需要修改 FlinkSQL 增加聚合維度。但是上述 SQL 邏輯變化后卻不能從之前的狀態(tài)恢復(fù),因?yàn)闅v史狀態(tài)對(duì)于變更后的 SQL 不能保證其完整性,即使恢復(fù)后也不能百分百保證后續(xù)計(jì)算的正確性。這種情況下,業(yè)務(wù)為了保證數(shù)據(jù)的正確性,需要從歷史回溯重新計(jì)算,回溯的過程會(huì)導(dǎo)致線上斷流,但業(yè)務(wù)又不希望犧牲太多的時(shí)效性。
- 解決方案?
針對(duì)這個(gè)問題,我們給出了三種解決方案。
① 解法 1
雙鏈路切換。此解法的關(guān)鍵是再搭建一條相同的實(shí)時(shí)鏈路作為備用鏈路,當(dāng)變更有狀態(tài) SQL 時(shí),可以在備用鏈路上做回溯,重新計(jì)算歷史數(shù)據(jù),回溯完成后先驗(yàn)證備用鏈路的結(jié)果數(shù)據(jù),確保沒問題后再在鏈路最下游的數(shù)據(jù)服務(wù)層切換讀取的表,完成整個(gè)變更流程。
② 解法 2
旁路狀態(tài)生成。與雙鏈路切換不同點(diǎn)在于,這里變更的是鏈路上的單個(gè)作業(yè),思路是臨時(shí)啟動(dòng)一個(gè)旁路作業(yè)來回溯,構(gòu)建出新邏輯的狀態(tài),驗(yàn)證數(shù)據(jù)完成后再重啟線上作業(yè),以此完成 SQL 和狀態(tài)的同時(shí)切換。
③ 解法 3
歷史狀態(tài)遷移,前兩個(gè)方法的思路比較類似,都是基于歷史數(shù)據(jù)重新計(jì)算,構(gòu)建出新狀態(tài)。但這個(gè)思路是基于歷史狀態(tài)遷移出新狀態(tài),這種方法構(gòu)建出的新狀態(tài)雖然不能保證完整性,但在某些情況下,業(yè)務(wù)也是可以接受的。目前我們通過改造 State Process API 支持在 SQL 算子及其上下游關(guān)系不變的情況下,允許 Join 和 Agg 算子來新增列。
上述三種方式各有優(yōu)點(diǎn),可以從普適性、資源成本、線上斷流、等待時(shí)長(zhǎng)四個(gè)維度來對(duì)以上三個(gè)解決方案進(jìn)行橫向比較。
普適性是指在保證數(shù)據(jù)正確的前提下支持的 SQL 變更范圍,前兩個(gè)方法都是重新計(jì)算,狀態(tài)是完整的,因此比方案 3 的普適性更高。
資源成本是指完成 SQL 變更所需要的額外 Flink 或 Kafka 資源,方法 1 需要構(gòu)建整條鏈路,需要更多的 Flink 和 Kafka 資源,因此成本最高。
線上斷流指的是在變更過程中導(dǎo)致下游數(shù)據(jù)延遲的時(shí)長(zhǎng),方法 1 是在數(shù)據(jù)服務(wù)層做切換,幾乎沒有斷流;方法 2 的斷流時(shí)長(zhǎng)取決于作業(yè)從狀態(tài)恢復(fù)的速度;方法 3 除了狀態(tài)恢復(fù),還需要考慮狀態(tài)遷移的速度。
等待時(shí)長(zhǎng)指的是完成整個(gè)變更流程需要的時(shí)間,前兩個(gè)方法都需要重新計(jì)算,因此比方法 3 的等待時(shí)間更長(zhǎng)。
上圖是方法 2 的平臺(tái)自動(dòng)化流程。流程分為七個(gè)階段,變更流程執(zhí)行的時(shí)間較長(zhǎng),可能需要幾十分鐘,通過流程條以及圖中每個(gè)階段的執(zhí)行日志可以讓用戶感受到變更的進(jìn)度和狀態(tài)。我們還為用戶做了自動(dòng)化指標(biāo)檢查,比如在第 2 個(gè)階段的旁路數(shù)據(jù)回溯中,我們會(huì)檢查作業(yè)消費(fèi) Kafka 的積壓指標(biāo),來判斷回溯是否完成,完成后自動(dòng)制作新邏輯狀態(tài)。再比如在第 6 個(gè)階段,原作業(yè)從旁路作業(yè)啟動(dòng)時(shí)會(huì)比較 Kafka Offset 指標(biāo)來比較兩個(gè)作業(yè)的消費(fèi)進(jìn)度,確保線上作業(yè)重啟后不會(huì)少發(fā)數(shù)據(jù)。
3、FlinkSQL 調(diào)試繁瑣問題
遇到的第 3 個(gè)問題是 FlinkSQL 調(diào)試繁瑣,操作步驟多,業(yè)務(wù)需要?jiǎng)?chuàng)建額外的作業(yè)和 Kafka,還要將導(dǎo)出的結(jié)果進(jìn)行存儲(chǔ)。此外,輸入構(gòu)造復(fù)雜,為了針對(duì)性地調(diào)試某種輸入場(chǎng)景,業(yè)務(wù)需要寫代碼來構(gòu)建消息并寫入數(shù)據(jù)源,甚至需要對(duì)多個(gè)不同數(shù)據(jù)源消息到來的順序進(jìn)行控制。上圖左側(cè)可以看到,為了做 FlinkSQL 調(diào)試,需要手動(dòng)搭建一條與線上隔離的調(diào)試鏈路,然后寫入 Mock 數(shù)據(jù)。
- 解決方案?
針對(duì)上述問題的解法是:基于文件調(diào)試一鍵化。首先業(yè)務(wù)在 Web 端可以在線編輯 Mock 數(shù)據(jù),Mock 數(shù)據(jù)是有界的消息序列,它的初始化可以先從線上抽樣,然后再由業(yè)務(wù)進(jìn)行修改。業(yè)務(wù)構(gòu)建完 Mock 數(shù)據(jù)后,會(huì)將 SQL 作業(yè)的 Mock 數(shù)據(jù)持久化到右側(cè)的 S3 文件對(duì)象系統(tǒng)上。業(yè)務(wù)在 Web 端點(diǎn)擊調(diào)試,左側(cè)發(fā)起的調(diào)試任務(wù)會(huì)在與線上隔離的服務(wù)器上單進(jìn)程執(zhí)行,執(zhí)行時(shí)會(huì)從 S3 獲取之前上傳的 Mock 數(shù)據(jù),而且可以根據(jù) Mock 數(shù)據(jù)指定的多源消息之間的到達(dá)順序和消息之間的發(fā)送間隔來執(zhí)行,執(zhí)行完成后會(huì)將輸出結(jié)果也持久化到 S3,最后在 Web 端查詢 S3 呈現(xiàn)給業(yè)務(wù)。
更多情況下業(yè)務(wù)不需要修改 Mock 數(shù)據(jù),只需要做抽樣和執(zhí)行兩步操作。另外我們也支持了一些調(diào)試的高級(jí)功能,比如支持控制消息的順序和間隔。
上圖是基于以上解法的調(diào)試工具。業(yè)務(wù)會(huì)為 SQL 作業(yè)創(chuàng)建多個(gè)測(cè)試用例,其中包括了 Source 的 Mock 數(shù)據(jù)和 Sink 的預(yù)期結(jié)果。執(zhí)行調(diào)試后,會(huì)檢查所有測(cè)試用例的通過情況,通過的條件是要保證結(jié)果流 Merge 之后的表與預(yù)期表數(shù)據(jù)一致。
4、SQL 作業(yè)異常定位問題
第 4 個(gè)問題是 FlinkSQL 作業(yè)的異常定位。作業(yè)異常是指作業(yè)消費(fèi) Kafka 出現(xiàn)了積壓,為了解決這個(gè)問題,需要定位出產(chǎn)生積壓的原因。而定位原因時(shí),歸因的路徑比較復(fù)雜,排查門檻比較高。另外由于歸因的路徑缺少系統(tǒng)化的沉淀,定位花費(fèi)的時(shí)間也比較長(zhǎng)。隨著 SQL 作業(yè)的數(shù)量越來越多,如果完全依賴人工排查,工作量將會(huì)非常巨大。
- 解決方案?
針對(duì)上述為的解決方法是實(shí)現(xiàn) SQL 作業(yè)的自動(dòng)化異常診斷。通過 Flink Reporter 上報(bào) SQL 作業(yè)的運(yùn)行指標(biāo),并持久化到 TSDB 中用于歷史查詢。同時(shí)也會(huì)持久化 SQL 作業(yè)的運(yùn)行日志,報(bào)警服務(wù)會(huì)根據(jù)規(guī)則監(jiān)控 SQL 作業(yè)上報(bào)的 Kafka Offset 指標(biāo),當(dāng)消費(fèi)的 Offset 落后于生產(chǎn)的 Offset 時(shí),會(huì)判定位作業(yè)發(fā)生消費(fèi)積壓,然后發(fā)出報(bào)警并下發(fā)異常事件,診斷服務(wù)會(huì)監(jiān)聽報(bào)警服務(wù)的異常事件。
異常發(fā)生時(shí),根據(jù)異常時(shí)間窗口內(nèi)作業(yè)日志和作業(yè)指標(biāo)分析異常原因,診斷服務(wù)可以通過增加規(guī)則來沉淀人工排查的經(jīng)驗(yàn)。比如發(fā)生了 Restart,就會(huì)從日志中根據(jù)關(guān)鍵字來提取異常信息,未發(fā)生 Restart 則會(huì)根據(jù)反壓指標(biāo)找出瓶頸節(jié)點(diǎn),然后結(jié)合 GC 指標(biāo)、數(shù)據(jù)傾斜、火焰圖等來分析瓶頸的原因,最后提出調(diào)優(yōu)建議。
上圖展示了診斷出業(yè)務(wù)消息臟數(shù)據(jù)的例子。圖中的運(yùn)行概況一欄會(huì)給出 SQL 作業(yè)在每個(gè)時(shí)間檢查點(diǎn)的診斷情況,綠色表明運(yùn)行正常,紅色表明作業(yè)存在異常,通過這個(gè)時(shí)間線可以清楚看到異常發(fā)生的時(shí)間點(diǎn)。診斷結(jié)果欄中可以看到異常的原因、詳情和建議。比如在這個(gè)事例中,原因是業(yè)務(wù)消息存在臟數(shù)據(jù),在詳情中可以看到導(dǎo)致作業(yè)異常的原始消息內(nèi)容,在建議中會(huì)提示業(yè)務(wù)配置臟數(shù)據(jù)的處理策略。
三、未來規(guī)劃
未來,美團(tuán)實(shí)時(shí)數(shù)倉(cāng)平臺(tái)的規(guī)劃主要包括以下兩個(gè)方面。
首先,是流批一體開發(fā)運(yùn)維,我們即將在實(shí)時(shí)數(shù)倉(cāng)平臺(tái)集成數(shù)據(jù)湖存儲(chǔ),并開放 FlinkSQL 的批作業(yè),在存儲(chǔ)和計(jì)算層都做到流批統(tǒng)一,提高工作效率。
其次,是作業(yè)的自動(dòng)調(diào)優(yōu),繼續(xù)提升作業(yè)診斷的準(zhǔn)確率以及作業(yè)重啟的效率。?