Presto+Alluxio 加速 Iceberg 數(shù)據(jù)湖訪問
一、Presto & Alluxio
1、Presto Overview
?Presto 是一個(gè)里程碑式的產(chǎn)品,它能夠讓我們很簡單的不需要數(shù)據(jù)的導(dǎo)入和導(dǎo)出,就可以使用標(biāo)準(zhǔn)的 SQL 來查詢數(shù)據(jù)湖倉上的數(shù)據(jù)。早先是數(shù)據(jù)倉庫 data warehouse 即 Hive 數(shù)據(jù)倉庫,之后出現(xiàn)了 Hudi 和 Iceberg,有一些公司用 Presto 查詢 Kafka ,還有 Druid 等等。Druid 很快,但是可能對 Join 支持不好,可以用 Presto 直接查詢 Druid 一步到位,然后通過一些計(jì)算的 pushdown,能夠讓 Druid 中有些跑得比較困難的任務(wù)得到很好的運(yùn)行。
Presto 中有一個(gè)概念叫做交互式的查詢,即在幾秒種最多幾分鐘返回一個(gè)結(jié)果?,F(xiàn)實(shí)中很多人用 Presto 來做秒級查詢,即 subsecond 的查詢,一秒鐘返回結(jié)果,得出一些很快很高效的 dashboard。也有人用 Presto 來處理一些幾小時(shí)的 job,甚至用 Presto 來部分取代 ETL,通過 SQL 語句就能直接處理數(shù)據(jù),簡單易用。Presto 處理的數(shù)據(jù)量為 PB 級,在日常的使用中,一般一個(gè) Presto 集群,一天處理幾十個(gè) PB 的數(shù)據(jù),還是很容易的。當(dāng)然,集群越多,處理的數(shù)據(jù)量也越大。
目前 Presto 有兩個(gè)開源的社區(qū),一個(gè)是 prestodb,此社區(qū)主要是由 Facebook 領(lǐng)導(dǎo)的社區(qū),包括 uber、Twitter,以及國內(nèi)公司 TikTok,騰訊都有參與。
另一個(gè)社區(qū)是 trinodb,prestodb 分出去之后,新建的開源社區(qū)更加的活躍。此社區(qū)背后的商用公司叫 starburst,這個(gè)社區(qū)更加活躍,用戶會(huì)更多一些,但是 prestodb 背后的大廠多一些。
Presto 目前的使用場景有很多,很多數(shù)據(jù)科學(xué)家和數(shù)據(jù)工程師通過 SQL 查詢想要的數(shù)據(jù);一些公司決策使用的 BI 工具,比如 tableau 和 zeppelin;公司決策需要報(bào)表和 dashboard,這些 query 可能需要在幾秒鐘快速地完成,將數(shù)據(jù)展示出來,比如廣告的轉(zhuǎn)化率和活躍用戶,這些數(shù)據(jù)需要實(shí)時(shí)或準(zhǔn)實(shí)時(shí)的反饋出來;還有一個(gè)場景就是 A/B testing,因?yàn)樗暮锰幘褪呛芸?,結(jié)果能夠很快的反饋回來;最后一個(gè)是 ETL,ETL 是很多公司的數(shù)據(jù)倉庫或者數(shù)據(jù)平臺(tái)的基石,非常重要,但是 Presto 并不是特別適合在這個(gè)領(lǐng)域,雖然很多人使用 Presto 來處理一些 ETL 的 job,但是 Presto 并不是一個(gè)很容錯(cuò)的系統(tǒng),如果計(jì)算過程中間壞掉,整個(gè)查詢可能就要從頭開始了。?
下圖展示了 Presto 發(fā)展的歷史。
2、Presto 主體架構(gòu)
上圖是 Presto 的主體架構(gòu),coordinator 如同一個(gè) master,負(fù)責(zé)調(diào)度,當(dāng)有一個(gè)查詢進(jìn)來時(shí),把 SQL 解析生成查詢的 plan,然后根據(jù) plan 分發(fā)給若干個(gè) worker 執(zhí)行。根據(jù)不同的運(yùn)算性質(zhì),每個(gè) worker 去查對應(yīng)的數(shù)據(jù)源,數(shù)據(jù)源可能是 Hive 數(shù)倉,也可能是數(shù)據(jù)湖 Iceberg 或者 Hudi,不同的數(shù)據(jù)源對應(yīng)不同的 connector。connector 在使用的時(shí)候,其實(shí)在 Presto 里就像一個(gè) catalog 一個(gè) namespace。比如在 SQL 中查詢 Hive 數(shù)據(jù)倉庫中的部門表,通過 hive.ADS.tablename 就可以把這個(gè) table 找到。
由于 Presto 有著多個(gè) connector 和 catalog,天生能夠提供數(shù)據(jù)的 federation,即聯(lián)合??梢栽?Presto 中聯(lián)合不同的數(shù)據(jù)源,可以來自 Hive 、Iceberg 、Kafka 、Druid、mysql 等各式各樣的數(shù)據(jù)源,并把來自多個(gè)數(shù)據(jù)源的數(shù)據(jù) join 到一起。Presto很靈活,如很多人還把 Hive 的表跟 Google 的 spreadsheet 表格 join 到一起。
目前 presto 主要的數(shù)據(jù)來源可能 95% 甚至 99% 是來自 Hive 。當(dāng)然現(xiàn)在也有些變化了,由于數(shù)據(jù)湖的崛起,可能越來越多流量會(huì)轉(zhuǎn)向數(shù)據(jù)湖 Iceberg 和 Hudi。
3、Presto + Alluxio Overview
Presto 訪問數(shù)據(jù)源就是通過直連的方式,比如要訪問 HDFS 就連到 HDFS 上。有的公司可能數(shù)據(jù)源太多,可能有十幾個(gè) HDFS 的集群,這時(shí)候 presto 需要一個(gè)統(tǒng)一的命名空間,此時(shí) Presto 可以提供一個(gè)聯(lián)合,在物理的數(shù)據(jù)層上面提供一個(gè)抽象層,看起來就像是一個(gè) cluster,然后在 Presto 中呈現(xiàn)出來的就是一個(gè)統(tǒng)一的命名空間,這個(gè)功能還是挺方便的。
4、Presto 與 Alluxio 結(jié)合
Presto 查數(shù)據(jù)并不是把數(shù)據(jù)給吃進(jìn)來,而是訪問數(shù)據(jù)的原始的存儲(chǔ),數(shù)據(jù)存儲(chǔ)在 HDFS 就訪問 HDFS,當(dāng) SQL 查詢進(jìn)來后翻譯完,去到這個(gè) Hive Metastore 中拿到元數(shù)據(jù),通過元數(shù)據(jù)找到表數(shù)據(jù)存儲(chǔ)在哪個(gè)目錄中,將該目錄分開,然后讓每個(gè) worker 讀取若干的文件去計(jì)算結(jié)果。在結(jié)合 Alluxio 的工作時(shí),改變了緩存路徑。
?其實(shí)在商用版本有更好的一個(gè)功能。可以不改變這個(gè)路徑,還是這個(gè) S3 路徑,但它其實(shí)使用了本地的 Alluxio,當(dāng)然這在我們數(shù)據(jù)庫中遇到一些麻煩,因?yàn)閿?shù)據(jù)庫中 expert 文件里邊是 hard code 而不是死的路徑,為緩存帶來了一些麻煩,我們通過轉(zhuǎn)換,讓本來是訪問原始數(shù)據(jù)的存儲(chǔ),通過 election 變成訪問本地的數(shù)據(jù)源,得到提速的效果。
5、Co-located deployment
我們提出提供了另外一種部署的方式。我們把 Presto worker 和 Alluxio worker 部署在同一臺(tái)物理機(jī)上。這樣保證了數(shù)據(jù)的本地性。確保數(shù)據(jù)加載到了 Presto worker 的本地。這里 Presto DB 上有更精簡的實(shí)現(xiàn)方式 ,在 to local cache 項(xiàng)目中,有 local cache 實(shí)現(xiàn)數(shù)據(jù)的本地化,通過數(shù)據(jù)本地化省掉網(wǎng)絡(luò)傳輸。對于 Alluxio 就是 Co-located 的部署方式。它跟 HDFS 相比也省掉了一次網(wǎng)絡(luò)的傳輸。
6、Disaggregated deployment
國內(nèi)很多公司使用數(shù)據(jù)一體機(jī),將 Presto、Spark、HDFS、 ClickHouse 等都放到一起。針對這種情況,推薦的實(shí)現(xiàn)就是用 in memory 的 Lark show 的 local cache,會(huì)有非常好的提速,即 local cache 結(jié)合 Alluxio worker ,能有百分之四五十的提速。缺點(diǎn)在于這種實(shí)現(xiàn)需要使用很多的內(nèi)存,數(shù)據(jù)緩存在內(nèi)存中,通過 SSD 或者內(nèi)存來給 HDD 或者慢速的 SSD 做一個(gè)提速。這種方式即 Alluxio worker 跟 Presto worker 捆綁到了一起,200 個(gè) Presto worker節(jié)點(diǎn),就需要 200 個(gè) Alluxio worker,這種方式會(huì)導(dǎo)致拓展的時(shí)候可能出現(xiàn)問題。
所以當(dāng)數(shù)據(jù)量特別巨大,且跨數(shù)據(jù)中心訪問的時(shí)候,更推薦分離式 disaggregated 的部署方式。
二、Alluxio & Iceberg
Hive 數(shù)據(jù)倉庫已經(jīng)有十幾年的歷史了?,但是一直存在著一些問題,對于一個(gè)表的 Schema 經(jīng)常有多人的改動(dòng),且改動(dòng)往往不按規(guī)律改,原來是簡單類型,改成了復(fù)雜類型,導(dǎo)致無法保證數(shù)據(jù)的一致性,如果一條 SQL 查詢兩年的數(shù)據(jù),這個(gè)表很可能兩年中改了好幾次,可能很多列的類型也改了,名字也改了,甚至可能刪掉或者又加回來,這就會(huì)導(dǎo)致 Presto 報(bào)錯(cuò),即使 Spark 也很難在數(shù)據(jù) Schema 修改的過程中做到完全兼容。這是各個(gè)計(jì)算引擎的通病。
其實(shí)最早我們討論 Iceberg 這個(gè)方案的時(shí)候,最想解決的就是 Schema 的升級變化問題,另外想解決的就是數(shù)據(jù)版本的一致性問題。眾所周知,數(shù)據(jù)可能中間會(huì)出錯(cuò),此時(shí)需要數(shù)據(jù)回滾從而查看上一個(gè)版本的數(shù)據(jù),也可能要做一些 time travel 查指定時(shí)間版本的數(shù)據(jù)。有些數(shù)據(jù)是追加的,可以通過 partition 按時(shí)間來分區(qū),通過 partition 查詢指定時(shí)間分區(qū)數(shù)據(jù)。有的數(shù)據(jù)集是快照數(shù)據(jù)集,數(shù)據(jù)后一天覆蓋前一天,歷史數(shù)據(jù)無法保留,而 Iceberg 能解決這個(gè)問題。
其實(shí) Iceberg 并沒有提供一個(gè)新的數(shù)據(jù)存儲(chǔ),它更多的是提供一個(gè)數(shù)據(jù)的組織方式。數(shù)?據(jù)的存儲(chǔ)還是像 Hive 的數(shù)倉一樣,存在 parquet 或者 ORC 中,Iceberg 支持這兩種數(shù)據(jù)格式。
當(dāng)然很多時(shí)候?yàn)榱四苁褂?export table,我們會(huì)把一些原始的數(shù)據(jù) CSV 或者其他格式導(dǎo)進(jìn)來變成一個(gè) expert table,根據(jù)分區(qū)重新組織寫入 parquet 或者 ORC 文件。
關(guān)于 Schema 的 evolution 是一個(gè)痛點(diǎn),Presto 支持讀和寫,但是目前用 Presto 寫 Iceberg 的不多,主要還是用 Presto 讀,用 Spark 來寫,這給我們的 Alluxio to Iceberg 結(jié)合造成了一定的麻煩。
1、Alluxio + Iceberg Architecture 方案
- 方案一:
所有的操作都通過 Alluxio 寫,Spark 和 Presto 將 Alluxio 作為一個(gè)底層存儲(chǔ),從而充分保證數(shù)據(jù)的一致性。
弊端是,實(shí)施該方案的公司稍微大了之后,數(shù)據(jù)直接往 S3 或 HDFS 寫,不通過 Alluxio。
- 方案二:
讀寫都通過 Alluxio,通過自動(dòng)同步元數(shù)據(jù),保證拿到最新數(shù)據(jù),此方案基本可用,不過還需 Spark 社區(qū)、Iceberg 社區(qū)以及 Presto 社區(qū)繼續(xù)合作來把數(shù)據(jù)一致性做得更好。
三、最佳實(shí)踐
?1、Iceberg Native Catalog
目前,與 cache 結(jié)合比較好的是使用 Iceberg native catalog,在 Iceberg 叫 Hadoop catalog,在 Presto 中叫 native catalog,如果使用最原始的 Hive catalog,則 table 的元數(shù)據(jù),即 table 位置的數(shù)據(jù)是放在 Hive-Metastore 中,Presto 或者 Spark 訪問表的時(shí)候先去查詢 Hive-Metastore 獲取表的存儲(chǔ)路徑,然后通過 Iceberg 將數(shù)據(jù)文件加載進(jìn)來,但是實(shí)際上,table 會(huì)有變更,此時(shí)需要將 Hive-Metastore 上鎖,這種方案在只有一個(gè) Hive-Metastore 的時(shí)候才有效,如果面臨多個(gè) Hive-Metastore 會(huì)出現(xiàn)鎖失效的問題。?
更好的一個(gè)方案是 Iceberg native catalog,即完全拋棄 Hive-Metastore,使用一個(gè)目錄來存儲(chǔ)這個(gè) table 的列表,這個(gè)目錄可以在 HDFS 上或者 S3 上,我們更加推薦 HDFS,因?yàn)?HDFS 效果好一些,一致性也強(qiáng)一些。這一方案避免了 Hive-Metastore service 本身的很多問題,如 scalability 、延時(shí)。此方案對 cache 也比較友好,不需要做一個(gè) metadata 的 cache,而是直接 cache 存放 metadata 的目錄。
2、Iceberg Local Cache
Local Cache 的實(shí)現(xiàn)是 Presto DB 的 RaptorX 項(xiàng)目,是給 Hive connector 做 Local Cache,很容易就可以給 Iceberg connector 也來打開這個(gè) Local Cache。相當(dāng)于是 cache 了 parquet 的文件到 local 的 SSD 上,Prestoworker,worker 上的 SSD 其實(shí)本來是閑置的,通過它來緩存數(shù)據(jù)效果還是挺好的。它可以提速,但我們目前還沒有特別好的官方 benchmark。
目前只是對 worker 進(jìn)行 cache,metadata coordinator 是不開的,打開的話可能會(huì)有數(shù)據(jù)一致性的問題。
3、數(shù)據(jù)加密
早先 parquet 文件是不加密的,cache 了 parquet 文件,雖然不是明文,但只要你知道怎么讀取這個(gè) parquet 文件格式就能把所有數(shù)據(jù)讀取出來。其 magic number 原來是 pare 1 就代表第一個(gè)版本,現(xiàn)在增加了一個(gè) magic number 即 pare 加密的版本,這個(gè)加密版本把一些加密的信和 metadata 存在 footer 里邊,它可以選擇對一些 column 和配置進(jìn)行加密。加密好后,數(shù)據(jù)便不再是明文的了,如果沒有對應(yīng)的 key,就無法讀取出數(shù)據(jù)。
通過對 parquet 加密,我們不再需要第三方的加密,也不需要對整個(gè)文件加密,可以只對需要加密的一些數(shù)據(jù)進(jìn)行加密,這個(gè)方案也解決了另外一個(gè)重要的問題,就是有的公司其實(shí)是整個(gè)文件來加密存放在 HDFS,然后 Presto 讀之前把它解密好,很多文件存儲(chǔ)系統(tǒng)就是存的時(shí)候是加密的。讀取的時(shí)候確實(shí)拿到的解密好的數(shù)據(jù),當(dāng) Presto 再通過 Local Cache 緩存數(shù)據(jù)的時(shí)候,cache 里存儲(chǔ)還是明文數(shù)據(jù),這破壞了數(shù)據(jù)加密的管理。但是采用 parquet 內(nèi)部加密,local cache 就可以滿足數(shù)據(jù)加密的要求了。
4、謂詞下推
Iceberg 通過謂詞下推(Predicate Pushdown)可以減少查詢的數(shù)據(jù)量。
原來 Presto 的暴力查詢,根據(jù)條件把符合條件的一條條數(shù)據(jù)挑出來,但是中間有優(yōu)化。其實(shí)很多查詢條件可以直接 push 到 Iceberg,Iceberg 讀取文件的范圍就小了。
下面是一個(gè) benchmark,可以看到?jīng)]有謂詞下推前掃到了 200 萬條記錄,CPU time 是 62 毫秒。謂詞下推后,掃到了一條記錄,查詢時(shí)間極大的縮短,這也是對緩存的一個(gè)優(yōu)化。開謂詞下推(Predicate Pushdown)功能后,我們發(fā)現(xiàn),緩存層次夠用,掃的文件少了很多,這意味著我們都可以緩存的下了,命中率有一個(gè)提高。
四、未來的工作
在前面的工作中我們發(fā)現(xiàn)系統(tǒng)的瓶頸在 CPU。此瓶體現(xiàn)在很多地方,其中很大一部分是對 parquet 文件的解析,parquet 文件解析任務(wù)太重了。由于 parquet 很節(jié)約資源,很難將 parquet 轉(zhuǎn)換為更好的格式。此時(shí),一種解決方案是將數(shù)據(jù)分為冷熱數(shù)據(jù),將較熱的數(shù)據(jù)轉(zhuǎn)換為更加輕量,序列化低的格式存到緩存中,通過實(shí)驗(yàn),將 parquet 文件反序列好的數(shù)據(jù)直接放到內(nèi)存中,效率提升 8% 到 10% 。
但這有一個(gè)問題,此方案對 Java 的 GC 壓力非常大,因?yàn)榫彺骈L時(shí)間存在。我們發(fā)現(xiàn)此方案并不是那么好實(shí)施,所以我們更加想用 off heap 的方式,將數(shù)據(jù)存在 heap 之外。此時(shí)不能 cache object 本身,需要 cache Arrow 或者 flat buffer 格式,這兩種格式反序列成本極低,又是二進(jìn)制的流存在內(nèi)存中,通過 off heap 把它裝進(jìn)來,然后在 Java 中再反序列化,這樣可以達(dá)到一個(gè)很好的提速效果。
另外我們也可以把一些算子 pushdown 到 native 實(shí)現(xiàn)存儲(chǔ)。比如說 Alluxio 再增加一些實(shí)現(xiàn) native 的 worker 和客戶端的 cache 實(shí)現(xiàn),我們將算子直接 pushdown 過去,就像前面 Iceberg pushdown 一樣,有些計(jì)算 push 到存儲(chǔ),存儲(chǔ)返回來的結(jié)果特別少,它幫你計(jì)算,而且格式更好,它是 Arrow 并可以有 native 的實(shí)現(xiàn),也可以向量化的計(jì)算。
Java 也能向量化計(jì)算。但問題在于 Java 的版本要求比較高,需要 Java16 或 17,而現(xiàn)在 Presto DB 還在 Java 11,trainer 倒是可以了,但是這個(gè)效果也不是特別好,因?yàn)?nbsp; Presto 和 trainer 內(nèi)存中的格式對性能化計(jì)算不友好,而且這個(gè)格式基本上是不能動(dòng)的,如果要?jiǎng)?,基本上全都要重新?shí)現(xiàn),這也是為什么會(huì)有這個(gè) vlogs 在那里的原因。
可能這個(gè) Presto 以后會(huì)有格式轉(zhuǎn)換,但是不在眼前,但是我們可以 off heap 的緩存,可以把這個(gè) Arrow 緩存到 off heap 上,然后在那里邊需要的時(shí)候把它拿出來。然后反序列化成 page,然后給 Presto 進(jìn)行進(jìn)一步的計(jì)算。這個(gè)開發(fā)正在進(jìn)行,可能在將來會(huì)給大家展現(xiàn)一部分的工作。其實(shí)就是為了降低 CPU 的使用和系統(tǒng)的延時(shí),降低 GC 的開銷,讓系統(tǒng)變得更加的穩(wěn)定。
今天的分享就到這里,謝謝大家。