新一代數(shù)據(jù)架構(gòu)的性能與成本平衡之道
Alluxio 大致可分為兩個(gè)部分:Alluxio Service 和 Alluxio Local Cache。Alluxio Local Cache 為計(jì)算存儲(chǔ)分離的計(jì)算環(huán)節(jié)實(shí)現(xiàn)了數(shù)據(jù)本地化,通過這種方式來加速查詢,同時(shí)減少對 underline 的 FS 的 request 和對應(yīng)的數(shù)據(jù)的出口,從而提高性能并節(jié)省成本。
NewsBreak 是美國的一家新聞資訊企業(yè)。文章將通過該公司案例,介紹Alluxio Local Cache for Presto 的應(yīng)用。
一、NewsBreak 的架構(gòu)
首先來介紹一下 NewsBreak 的整體架構(gòu)。
從下往上看,有很多不同的數(shù)據(jù)源,通過 DIP(Data Engine Pipeline) 的 model 做到數(shù)據(jù)入湖和入倉。引入了 Schema Registry 來管理大部分的 schema,還有 Hudi 這種流行的 open table format,以及其它如 Mongo、Scylla、MySQL 等不同的 transactional Database。通過 Managed ETL 層,經(jīng)過 Airflow,load 到 Data & Service。Data & Service 分兩大塊:偏 raw data 的數(shù)據(jù)湖和偏 ETL 的數(shù)倉。Query engine 是建構(gòu)在 Presto 之上的,提供 ad-hoc 查詢和 BI 分析。同時(shí),對敏感數(shù)據(jù),利用 SnowFlake 做更精細(xì)化的管理,尤其是對 PRI 的信息。
在此之上構(gòu)建了不同的數(shù)據(jù)產(chǎn)品,如內(nèi)部的 Self-service ETL,偏向產(chǎn)品、工程、數(shù)分的 Log Query Service,幫助用戶獲得公司的原始數(shù)據(jù)以進(jìn)行各種分析,還有面向運(yùn)營或者 CXO 的不同的 data products 和 BI 工具,例如 AB 系統(tǒng)等。
二、Presto at NewBreak
今天主要介紹的是 Data & Service 部分。在不同的 log 之上,通過 Presto 來做計(jì)算、存儲(chǔ)和查詢。我們的數(shù)據(jù)比較雜也比較多,所以方案需要能夠加速 query 的整體性能,同時(shí)減少 S3 的 cost。
S3 存儲(chǔ)的 cost 跟 query 沒有太大關(guān)系,最大部分的查詢 cost 是按照 request 的請求和數(shù)據(jù)的出口收費(fèi)。
1. Presto 在 NewsBreak 的使用方式和架構(gòu)
Presto 是典型的計(jì)算存儲(chǔ)分離,我們使用了其很多功能,比如不同的 connecter,利用聯(lián)邦查詢,連接 Scylla、Mongo、Iceberg、Hive、MySQL、Hudi 等。底下的存儲(chǔ),包括以 S3 為主的 Hive、Iceberg,還有比較偏 OLTP 的 MySQL、Scylla。
上面是一些比較常見的產(chǎn)品,如 Feast 支持 feature store,還有自建的 CMS以及 Mode 等第三方 SaaS 系統(tǒng)。
通過 Presto 的 CTAS 對數(shù)分或者偏內(nèi)部的開發(fā),對數(shù)據(jù)回流處理,再導(dǎo)入數(shù)據(jù)庫。
另外,引入了 Presto 的一個(gè)插件 event stream。它是一個(gè) listener,將所有的 SQL 結(jié)果、運(yùn)行狀態(tài)等發(fā)給 Kafka stream,通過 Hudi 落回到存儲(chǔ)。
底部存儲(chǔ)以 S3 加上一些 OLTP 的方式為主。
我們期望在整個(gè)數(shù)據(jù)生命周期內(nèi)得到性能加速,數(shù)據(jù)無論是通過 CTS 產(chǎn)生,還是通過傳統(tǒng) ETL 產(chǎn)生,能夠自動(dòng)支持 Cache。
2. Cache Considerations
這里列出了 Cache 相關(guān)的主要考慮點(diǎn):
首先,需要支持 Presto On S3。Alluxio Local Cache 很早就有,在Facebook 和 Uber 都有實(shí)踐,但支持的是 Presto on HDFS,On S3 是在今年 3 月份剛剛發(fā)布的 2.2.9.3 版本中才支持。
第二,希望能最小化對現(xiàn)有系統(tǒng)的影響。
第三,提速 query,同時(shí)通過減少 S3 request 來減少 S3 的 cost。
第四,由于康威定律,我們的架構(gòu)比較復(fù)雜,因此希望支持 multiple 的 Hive metastores。
第五,cache 的 storage 是非常小的,只有整體的 1% 左右,所以希望支持 cache filter,來指定哪些表或哪些形式的內(nèi)容需要被 cache,從而提高整體的命中率。
第六,支持 Hudi,或有版本的文件。
最后,希望有詳細(xì)的 monitor 可以監(jiān)控和衡量整個(gè)系統(tǒng)的效果。
經(jīng)過評估,我們借鑒了 Uber 去年的類似實(shí)驗(yàn)。在 Uber 的架構(gòu)中,Alluxio 部分整體向外提供了一個(gè) HDFS 的 API,在訪問遠(yuǎn)程文件的時(shí)候會(huì)判斷是否有 cache 能被 hit,如果有,就直接走 local disk,如果沒有就到外部找 external storage。
這樣做的優(yōu)點(diǎn)在于其強(qiáng)一致性,如果 local disk 沒有到遠(yuǎn)程去找,或者遠(yuǎn)程的文件已經(jīng)被修改,被改的信息會(huì)傳遞到整個(gè)系統(tǒng)當(dāng)中,會(huì)認(rèn)為這個(gè)文件是不命中的。
在此之上我們做了幾點(diǎn)簡單的改動(dòng):
首先,支持了 S3。這是通過修改 Presto 0.275,再結(jié)合最新的 release 0.292 的 Alluxio code 實(shí)現(xiàn)的。
第二,將 cache filter 從 global 的粒度降低到 catalog level,因?yàn)槭怯捎诠炯軜?gòu)原因?qū)е碌?,因此要支?multiple catalog。
第三,在最下面為整個(gè) Alluxio Cluster 配置了一個(gè) shadow cache 來衡量整體的性能效果。
三、ALC4PS3 at NewsBreak
我們用實(shí)體數(shù)據(jù)進(jìn)行了測試。上圖展示了 S3 的 prefix 效果:
可以看到,在某一天有一個(gè)非常大的 burst,從平時(shí)不到幾十 million request,burst 到 900 million request。通過 SQL 方式訪問數(shù)據(jù)容易產(chǎn)生重復(fù)的訪問數(shù)據(jù),也就帶來了更多的 cost,我們希望盡量避免這種情況。
通過與 Alluxio 的集成可以基本上把數(shù)據(jù)量控制在每天 10 million 以下。因此得出 Alluxio 符合我們的場景需求,一方面可以降低整體的平均 access 的 request,同時(shí)還可以砍掉異常的峰值。
1. Cache filters
接下來的問題是,如何從選中的幾個(gè) bucket 或幾個(gè) prefix,scale 到整個(gè)公司的十幾個(gè) PB 的數(shù)據(jù)量之上。這里就用到了 cache filter 機(jī)制。
Uber 提供了比較復(fù)雜的機(jī)制,可以根據(jù) database、namespace 上的 table partition 來進(jìn)行配置。但這不適用于我們的場景,因?yàn)槲覀兪菙?shù)據(jù)湖+數(shù)倉的復(fù)雜模型,用戶有比較多的 ad-hoc 需求,因此我們需要更通用的方法。
最終選擇了最原始的、對 cache 最基本的需求,采用了 mtime 這種 genernal 的方式來做處理,通過 monitor 的方式來看最終的效果。整體機(jī)制為,對當(dāng)前的時(shí)間給一個(gè) lookback window,通過 window 的數(shù)據(jù)才放到 cache 上。
Alluxio Local Cache 提供了 cache filter 的 overwrite 的機(jī)制。這里簡單定義一個(gè) Latest21DayCacheFilter,可以得到每一個(gè)文件的 modified time,跟 window time 做對比。
另外,我們發(fā)現(xiàn)小文件比較多,而小文件會(huì)浪費(fèi) local cache 的 disk,所以增加了進(jìn)一步的過濾機(jī)制,文件大小要大于 patch size,默認(rèn) patch size 是 2MB。
2. Multi HMS
第二個(gè)場景是支持 Multiple Hive Meta Store。由于公司組織架構(gòu)復(fù)雜,業(yè)務(wù)繁多,造成了比較多的 Hive metastore,包括 Glue、Iceberg 等,各業(yè)務(wù)線處理自己的數(shù)據(jù)時(shí)需要跨 catalog 做 Spark 或 Flink 的處理,需要把元數(shù)據(jù)重新在 remote 注冊一遍。
Alluxio 假設(shè)自己是 Singleton,在初始化 manager、monitor 的時(shí)候是 singleton 的,但在 filter 級別支持 per catalog 的配置,所以利用這一特性,在每個(gè) catalog 上面都設(shè)置一模一樣的配置,除了 cache filter 之外。這樣做的好處是,由于文件是共享的,因此只要在任何 catalog 被 cache 后,其它查詢也會(huì)得到相似的數(shù)據(jù)。
四、Presto event stream
1. Query level monitor
以上介紹了整體的機(jī)制。接下來看是如何對效果進(jìn)行評估。
Alluxio 原有的 monitor 數(shù)據(jù)是比較粗的,只有 cluster 級別的最基本的信息。我們引入了塊 level 的 monitor,利用了 Presto event stream 組件。它借鑒了 Trino event stream 的基本想法,Presto 產(chǎn)生的 query event 都會(huì)發(fā)到 Kafka 里,通過 Hudi 把所有數(shù)據(jù)重新引入到數(shù)據(jù)平臺(tái),再通過 Presto 查詢,中間通過簡單的 Schema 管理起來。
有了這個(gè)組件,在 Presto cluster上面就可以簡單配置,加一個(gè) event listener,指定名字到 Kafka 中簡單配置即可。
需要強(qiáng)調(diào)的是,要把運(yùn)行時(shí)的 detail 信息暴露出來,因?yàn)?Alluxio 的 cache 命中率、命中 cache size 等都會(huì)通過這個(gè)配置 enable。
2. Query/storage coverage and hit rate
有了 detail 的 query level 的 monitor 后,就可以拿到很多的 metrics。這里列出了一些常用的 metrics:
第一個(gè)是塊級別的命中率,最開始時(shí)是 70~80%,后來加上簡單的過濾條件,比如只 cache 最近幾十天的數(shù)據(jù)、只 cache 文件大小大于某個(gè) size,可以降到大約 20~30%。但整體的 storage 的 coverage 還是比較高的,在 70~80% 左右。
第二個(gè)是每個(gè) query 涉及到的 storage 有多少命中率。
第三個(gè)是 storage 中多少是從 Alluxio cache中讀取,多少是從 underline remote 中讀取。
3. Metrics
在 2 clusters、1600 核,針對 P95,整體從 9 秒減少到了 8 秒。每月 scan 的 storage 約 6PB,其中大約有 3PB 從 Alluxio 讀取,后續(xù)有可能會(huì)更高。
五、ALC4PS4 Next
性能提升其實(shí)并不是特別明顯,但我們目的是提升性能的同時(shí)減少 cost,在過程中我們也發(fā)現(xiàn)了很多問題,比如 SQL 命中率大約只有 30%,系統(tǒng)中還有很多小文件,甚至很多文件不是列存。因此要進(jìn)一步提升性能,還需要做一些傳統(tǒng)的 data governance,如列存、壓縮、處理小文件等等。Local cache filter 還需要 fine tuning,比如,現(xiàn)在storage 比較小,每個(gè) worker 上只配置大約500GB。Cache filter 也要繼續(xù)調(diào)優(yōu)。后續(xù)也考慮將用在 Presto 的這種機(jī)制擴(kuò)展到 Flink、Spark、Hudi、Iceberg 等。
六、Q&A
Q:怎么用 event stream 機(jī)制得到 metrics?它與緩存、命中率等是怎么結(jié)合起來的?
A:Presto支持開發(fā) listener plugin,可以對每個(gè) event 如 query event 開啟、結(jié)束或者失敗了,trigger event。Event stream plugin 可以到 github 查到,它可以把成功的事件,包括事件的 query、當(dāng)時(shí)運(yùn)行狀態(tài) status 發(fā)到 Kafka ,經(jīng)過 Hudi 落盤,進(jìn)而即可查詢。
Monitor 輔助評估整個(gè)效果。如果要做精細(xì)化的 monitor,需要在 presto 做更細(xì)致的處理。現(xiàn)在沒辦法做到 request 級別,所以退而求其次,用 query 的端到端性能和最后訪問 storage 的 hit rate、hit reach、hit ratio 來反向評估。