Arctic助力傳媒實(shí)現(xiàn)低成本的大數(shù)據(jù)準(zhǔn)實(shí)時(shí)計(jì)算
網(wǎng)易傳媒大數(shù)據(jù)實(shí)際業(yè)務(wù)中,存在著大量的準(zhǔn)實(shí)時(shí)計(jì)算需求場(chǎng)景,業(yè)務(wù)方對(duì)于數(shù)據(jù)的實(shí)效性要求一般是分鐘級(jí);這種場(chǎng)景下,用傳統(tǒng)的離線數(shù)倉(cāng)方案不能滿足用戶在實(shí)效性方面的要求,而使用全鏈路的實(shí)時(shí)計(jì)算方案又會(huì)帶來(lái)較高的資源占用。
基于對(duì)開(kāi)源數(shù)據(jù)湖方案的調(diào)研,我們注意到了網(wǎng)易數(shù)帆開(kāi)源的基于 Apache Iceberg 構(gòu)建的 Arctic 數(shù)據(jù)湖解決方案。Arctic 能相對(duì)較好地支持與服務(wù)于流批混用的場(chǎng)景,其開(kāi)放的疊加式架構(gòu),可以幫助我們非常平滑地過(guò)渡與實(shí)現(xiàn) Hive 到數(shù)據(jù)湖的升級(jí)改造,且由于傳媒離線數(shù)倉(cāng)已接入有數(shù),通過(guò) Arctic 來(lái)改造現(xiàn)有業(yè)務(wù)的成本較低,于是我們準(zhǔn)備通過(guò)引入 Arctic ,嘗試解決 push 業(yè)務(wù)場(chǎng)景下的痛點(diǎn)。
1、項(xiàng)目背景
以傳媒 push 實(shí)時(shí)數(shù)倉(cāng)為例,新聞推送在地域、時(shí)間、頻次等因素上有較高的不確定性,非常容易出現(xiàn)偶發(fā)的流量洪峰,尤其是在出現(xiàn)突發(fā)性社會(huì)熱點(diǎn)新聞的時(shí)候。如果采用全鏈路的實(shí)時(shí)計(jì)算方案來(lái)處理,則需要預(yù)留出較多的資源 buffer 來(lái)應(yīng)對(duì)。
由于推送時(shí)機(jī)的不確定性,push 業(yè)務(wù)的數(shù)據(jù)指標(biāo)一般不是增量型的,而是以當(dāng)天截止到當(dāng)前的各種累計(jì)型指標(biāo)為主,計(jì)算窗口通常為十五分鐘到半小時(shí)不等,統(tǒng)計(jì)維度區(qū)分發(fā)送類型、內(nèi)容分類、發(fā)送票數(shù)、發(fā)送廠商、首啟方式、用戶活躍度、AB 實(shí)驗(yàn)等,具有流量波動(dòng)大和數(shù)據(jù)口徑繁多等特點(diǎn)。
此前采用的全鏈路 Flink 實(shí)時(shí)計(jì)算方案中,主要遇到以下問(wèn)題:
(1)資源占用成本高
為應(yīng)對(duì)流量洪峰,需要為實(shí)時(shí)任務(wù)分配預(yù)留出較高的資源,且多個(gè)聚合任務(wù)需要消費(fèi)同一個(gè)上游數(shù)據(jù),存在讀放大問(wèn)題。push 相關(guān)的實(shí)時(shí)計(jì)算流程占到了實(shí)時(shí)任務(wù)總量的 18+%,而資源使用量占到了實(shí)時(shí)資源總使用量的近 25%。
(2)大狀態(tài)帶來(lái)的任務(wù)穩(wěn)定性下降
push 業(yè)務(wù)場(chǎng)景下進(jìn)行窗口計(jì)算時(shí),大流量會(huì)帶來(lái)大狀態(tài)的問(wèn)題,而大狀態(tài)的維護(hù)在造成資源開(kāi)支的同時(shí)比較容易影響任務(wù)的穩(wěn)定性。
(3)任務(wù)異常時(shí)難以及時(shí)的進(jìn)行數(shù)據(jù)修復(fù)?
實(shí)時(shí)任務(wù)出現(xiàn)異常時(shí),以實(shí)時(shí)方式來(lái)回溯數(shù)據(jù)時(shí)效慢且流程復(fù)雜;而以離線流程來(lái)修正,則會(huì)帶來(lái)雙倍的人力和存儲(chǔ)成本。
2、項(xiàng)目思路和方案
2.1 項(xiàng)目思路?
我們通過(guò)對(duì)數(shù)據(jù)湖的調(diào)研,期望利用數(shù)據(jù)實(shí)時(shí)入湖的特點(diǎn),同時(shí)使用 Spark 等離線資源完成計(jì)算,用較低的成本滿足業(yè)務(wù)上對(duì)準(zhǔn)實(shí)時(shí)計(jì)算場(chǎng)景的需求。我們以 push 業(yè)務(wù)場(chǎng)景作為試點(diǎn)進(jìn)行方案的探索落地,再逐漸將方案推廣至更多類似業(yè)務(wù)場(chǎng)景。
基于對(duì)開(kāi)源數(shù)據(jù)湖方案的調(diào)研,我們注意到了網(wǎng)易數(shù)帆開(kāi)源的基于 Apache Iceberg 構(gòu)建的 Arctic 數(shù)據(jù)湖解決方案。Arctic 能相對(duì)較好地支持與服務(wù)于流批混用的場(chǎng)景,其開(kāi)放的疊加式架構(gòu),可以幫助我們非常平滑地過(guò)渡與實(shí)現(xiàn) Hive 到數(shù)據(jù)湖的升級(jí)改造,且由于傳媒離線數(shù)倉(cāng)已接入有數(shù),通過(guò) Arctic 來(lái)改造現(xiàn)有業(yè)務(wù)的成本較低,于是我們準(zhǔn)備通過(guò)引入 Arctic ,嘗試解決 push 業(yè)務(wù)場(chǎng)景下的痛點(diǎn)。
Arctic 是由網(wǎng)易數(shù)帆開(kāi)源的流式湖倉(cāng)系統(tǒng),在 Iceberg 和 Hive 之上添加了更多實(shí)時(shí)場(chǎng)景的能力。通過(guò) Arctic,用戶可以在 Flink、Spark、Trino、Impala 等引擎上實(shí)現(xiàn)更加優(yōu)化的 CDC、流式更新、OLAP 等功能。
實(shí)現(xiàn) push 業(yè)務(wù)場(chǎng)景下的數(shù)據(jù)湖改造,只需要使用 Arctic 提供的 Flink Connector,便可快速地實(shí)現(xiàn) push 明細(xì)數(shù)據(jù)的實(shí)時(shí)入湖。
此時(shí)需要我們關(guān)注的重點(diǎn)是,數(shù)據(jù)產(chǎn)出需要滿足分鐘級(jí)業(yè)務(wù)需求。數(shù)據(jù)產(chǎn)出延遲由兩部分組成:
- 數(shù)據(jù)就緒延遲,取決于 Flink 實(shí)時(shí)任務(wù)的 Commit 間隔,一般為分鐘級(jí)別;
- 數(shù)據(jù)計(jì)算耗時(shí),取決于計(jì)算引擎和業(yè)務(wù)邏輯:數(shù)據(jù)產(chǎn)出延遲 = 數(shù)據(jù)就緒延遲 + 數(shù)據(jù)計(jì)算耗時(shí)?
2.2 解決方案
2.2.1 數(shù)據(jù)實(shí)時(shí)入湖
Arctic 能夠兼容已有的存儲(chǔ)介質(zhì)(如 HDFS)和表結(jié)構(gòu)(如 Hive、Iceberg),并在之上提供透明的流批一體表服務(wù)。存儲(chǔ)結(jié)構(gòu)上主要為 Basestore 和 Changestore 兩部分:
(1)Basestore 中存儲(chǔ)了表的存量數(shù)據(jù)。它通常由 Spark/Flink 等引擎完成第一次寫(xiě)入,再之后則通過(guò)自動(dòng)的結(jié)構(gòu)優(yōu)化過(guò)程將 Changestore 中的數(shù)據(jù)轉(zhuǎn)化之后寫(xiě)入。
(2)Changestore 中存儲(chǔ)了表上最近的變更數(shù)據(jù)。Changestore 中存儲(chǔ)了表上最近的變更數(shù)據(jù)。它通常由 Apache Flink 任務(wù)實(shí)時(shí)寫(xiě)入,并用于下游 Flink 任務(wù)進(jìn)行準(zhǔn)實(shí)時(shí)的流式消費(fèi)。同時(shí)也可以對(duì)它直接進(jìn)行批量計(jì)算或聯(lián)合 Basestore 里的數(shù)據(jù)一起通過(guò) Merge-On-Read(以下簡(jiǎn)稱為MOR) 的查詢方式提供分鐘級(jí)延遲的批量查詢能力。
Arctic 表支持實(shí)時(shí)數(shù)據(jù)的流式寫(xiě)入,數(shù)據(jù)寫(xiě)入過(guò)程中為了保證數(shù)據(jù)的實(shí)效性,寫(xiě)入側(cè)需要頻繁的進(jìn)行數(shù)據(jù)提交,但因此會(huì)產(chǎn)生大量的小文件,積壓的小文件一方面會(huì)影響數(shù)據(jù)的查詢性能,另一方面也會(huì)對(duì)文件系統(tǒng)帶來(lái)壓力。這方面,Arctic 支持基于主鍵的行級(jí)更新,提供了 Optimizer 來(lái)進(jìn)行數(shù)據(jù) Update 和自動(dòng)的結(jié)構(gòu)優(yōu)化,以幫助用戶解決數(shù)據(jù)湖常見(jiàn)的小文件、讀放大、寫(xiě)放大等問(wèn)題。
以傳媒 push 數(shù)倉(cāng)場(chǎng)景為例,push 發(fā)送、送達(dá)、點(diǎn)擊、展示等明細(xì)數(shù)據(jù)需要通過(guò) Flink 作業(yè)實(shí)時(shí)寫(xiě)入到 Arctic 中。由于上游已經(jīng)做了 ETL 清洗,此階段只需要通過(guò) FlinkSQL 即可方便地將上游數(shù)據(jù)寫(xiě)入 Changestore。Changestore 內(nèi)包含了存儲(chǔ)插入數(shù)據(jù)的 insert 文件和存儲(chǔ)刪除數(shù)據(jù)的 equality delete 文件,更新數(shù)據(jù)會(huì)被拆分為更新前項(xiàng)和更新后項(xiàng)分別存儲(chǔ)在 delete 文件與 insert 文件中。
具體的,對(duì)于有主鍵場(chǎng)景,insert/update_after 消息會(huì)寫(xiě)入 Changestore 的 insert 文件,delete/update_before 會(huì)寫(xiě)入 Arctic 的 delete 文件。當(dāng)進(jìn)行 Optimize 的時(shí)候,會(huì)先把 delete 文件讀到內(nèi)存中形成一個(gè) delete map, map 的 key 是記錄的主鍵,value 是 record_lsn。然后 再讀取 Basestore 和 Changestore 中的 insert 文件, 對(duì)主鍵相同的 row 進(jìn)行 record_lsn 的對(duì)比,如果 insert 記錄中 record_lsn 比 deletemap 中相同主鍵的 record_lsn 小,則認(rèn)為這條記錄已經(jīng)被刪除了,不會(huì)再追加到 base 里;否則把數(shù)據(jù)寫(xiě)入到新文件里,最終實(shí)現(xiàn)了行級(jí)的更新。
2.2.2 湖水位感知
傳統(tǒng)的離線計(jì)算在調(diào)度方面需要有一個(gè)觸發(fā)機(jī)制,一般由作業(yè)調(diào)度系統(tǒng)按照任務(wù)之間的依賴關(guān)系來(lái)處理,當(dāng)上游任務(wù)全部成功后自動(dòng)調(diào)起下游的任務(wù)。但在實(shí)時(shí)入湖的場(chǎng)景下,下游任務(wù)缺乏一個(gè)感知數(shù)據(jù)是否就緒的途徑。以 push 場(chǎng)景為例,需要產(chǎn)出的指標(biāo)主要為按照指定的時(shí)間粒度來(lái)計(jì)算一次當(dāng)天累計(jì)的各種統(tǒng)計(jì)值,此時(shí)下游如果沒(méi)法感知當(dāng)前湖表水位的話,要么需要留出一個(gè)較冗余的緩沖時(shí)間來(lái)保證數(shù)據(jù)就緒,要么則有漏數(shù)據(jù)的可能,畢竟 push 場(chǎng)景的流量變化是非常起伏不定的。
傳媒大數(shù)據(jù)團(tuán)隊(duì)和 Arctic 團(tuán)隊(duì)借鑒了 Flink Watermark 的處理機(jī)制和 Iceberg 社區(qū)討論的方案,將 Watermark 信息寫(xiě)入到 Iceberg 表的 metadata 文件里,然后由 Arctic 通過(guò)消息隊(duì)列或者 API 暴露出來(lái),從而做到下游任務(wù)的主動(dòng)感知,盡可能地降低了啟動(dòng)延遲。具體方案如下:
(1)Arctic 表水位感知
當(dāng)前只考慮 Flink 寫(xiě)入的場(chǎng)景,業(yè)務(wù)在 Flink 的 source 定義事件時(shí)間和 Watermark。ArcticSinkConnector 包含兩個(gè)算子,一個(gè)是負(fù)責(zé)寫(xiě)文件的多并發(fā)的 ArcticWriter, 一個(gè)是負(fù)責(zé)提交文件的的單并發(fā)的 ArcticFileCommitter。當(dāng)執(zhí)行 checkpoint 時(shí),ArcticFileCommitter 算子會(huì)進(jìn)行 Watermark 對(duì)齊之后取最小的 Watermark。會(huì)新建一個(gè)類似于 Iceberg 事務(wù)的 AMS Transaction,在這個(gè)事務(wù)里除了 AppendFiles 到 Iceberg,同時(shí)把 TransactionID,以及 Watermark 通過(guò) AMS 的 thrift 接口上報(bào)給 AMS。
(2)Hive 表水位感知
Hive表里可見(jiàn)的數(shù)據(jù)是經(jīng)過(guò) Optimize 過(guò)后的數(shù)據(jù),Optimize 由 AMS 來(lái)調(diào)度,F(xiàn)link 任務(wù)異常執(zhí)行文件的讀寫(xiě)合并,并且把 Metric 上報(bào)給 AMS, 由 AMS 來(lái)把這一次 Optimize 執(zhí)行的結(jié)果 Commit,AMS 天然知道這一次 Optimize 推進(jìn)到了哪次 Transaction, 并且 AMS 本身也存儲(chǔ)了 Transaction 對(duì)應(yīng)的 Watermark,也就知道 Hive 表水位推進(jìn)到了哪里。
2.2.3 數(shù)據(jù)湖查詢
Arctic 提供了 Spark/Flink/Trino/Impala 等計(jì)算引擎的 Connector 支持。通過(guò)使用Arctic數(shù)據(jù)源,各計(jì)算引擎都可以實(shí)時(shí)讀取到已經(jīng) Commit 的文件,Commit 的間隔按照業(yè)務(wù)的需求一般為分鐘級(jí)別。下面以 push 業(yè)務(wù)為例介紹幾種場(chǎng)景下的查詢方案和相應(yīng)成本:
(1)Arctic + Trino/Impala 滿足秒級(jí) OLAP 查詢
OLAP 場(chǎng)景下,用戶一般更關(guān)注計(jì)算上的耗時(shí),對(duì)數(shù)據(jù)就緒的敏感度相對(duì)不高。針對(duì)中小規(guī)模數(shù)據(jù)量的 Arctic 表或較簡(jiǎn)單的查詢,通過(guò) Trino/Impala 進(jìn)行 OLAP 查詢是一個(gè)相對(duì)高效的方案,基本上可以做到秒級(jí) MOR 查詢耗時(shí)。成本上,需要搭建 Trino/Impala 集群,如果團(tuán)隊(duì)中已有在使用的話,則可以根據(jù)負(fù)載情況考慮復(fù)用。
Arctic 在開(kāi)源發(fā)布會(huì)上發(fā)布了自己的 benchmark 數(shù)據(jù),在數(shù)據(jù)庫(kù) CDC 持續(xù)流式攝取的場(chǎng)景下,對(duì)比各個(gè)數(shù)據(jù)湖 Format 的 OLAP benchmark 性能, 整體上帶 Optimize 的 Arctic 的性能優(yōu)于 Hudi,這主要得益于 Arctic 內(nèi)部有一套高效的文件索引 Arctic Tree,在 MOR 場(chǎng)景下可以做到更細(xì)粒度、精確地 merge。詳細(xì)的對(duì)比報(bào)告可以參考:https://arctic.netease.com/ch/benchmark/。
(2)Arctic + Spark 滿足分鐘級(jí)預(yù)聚合查詢
針對(duì)提供下游數(shù)據(jù)報(bào)表展示的場(chǎng)景,一般需要走預(yù)計(jì)算的流程將結(jié)果持久化下來(lái),對(duì)數(shù)據(jù)就緒和計(jì)算耗時(shí)的敏感度都較高,而且查詢邏輯相對(duì)復(fù)雜,Trino/Impala 集群規(guī)模相對(duì)較小,執(zhí)行容易失敗,導(dǎo)致穩(wěn)定性欠佳。這個(gè)場(chǎng)景下我們使用了集群部署規(guī)模最大的 Spark 引擎來(lái)處理,在不引入新的資源成本的情況下,做到了離線計(jì)算資源的復(fù)用。
數(shù)據(jù)就緒方面,通過(guò) Arctic 表水位感知方案,可以做到較低的分鐘級(jí)就緒延遲。
計(jì)算方面,Arctic 對(duì) Spark Connector 提供了一些讀取優(yōu)化,用戶可以通過(guò)配置 Arctic 表的 read.split.planning-parallelism 和 read.split.planning-parallelism-factor 這兩個(gè)參數(shù)值,來(lái)調(diào)整 Arctic Combine Task 的數(shù)量,進(jìn)而控制計(jì)算任務(wù)的并發(fā)度。由于 Spark 離線計(jì)算的資源相對(duì)靈活和充足,我們可以通過(guò)上述調(diào)整并發(fā)度的方式來(lái)保證在 2~3 分鐘內(nèi)完成業(yè)務(wù)的計(jì)算需求。
(3)Hive + Spark 滿足傳統(tǒng)離線數(shù)倉(cāng)生產(chǎn)鏈路的調(diào)度
Arctic 支持將 Hive 表作為 Basestore,F(xiàn)ull Optimize 時(shí)會(huì)將文件寫(xiě)入到 Hive 數(shù)據(jù)目錄下,以達(dá)到更新 Hive 原生讀取內(nèi)容的目的,通過(guò)存儲(chǔ)架構(gòu)上的流批一體來(lái)降低成本。因此傳統(tǒng)的離線數(shù)倉(cāng)生產(chǎn)鏈路,可以直接使用對(duì)應(yīng)的 Hive 表來(lái)作為離線數(shù)倉(cāng)鏈路的一部分,時(shí)效性上相較于 Arctic 表雖缺少了 MOR,但通過(guò) Hive 表水位感知方案,可以做到業(yè)務(wù)能接受的就緒延遲,從而滿足傳統(tǒng)離線數(shù)倉(cāng)生產(chǎn)鏈路的調(diào)度。
3、項(xiàng)目影響力與產(chǎn)出價(jià)值
3.1 項(xiàng)目影響力
通過(guò) Arctic + X 方案在傳媒的探索和落地,為傳媒準(zhǔn)實(shí)時(shí)計(jì)算場(chǎng)景提供了一個(gè)新的解決思路。該思路不但減輕了全鏈路 Flink 實(shí)時(shí)計(jì)算方案所帶來(lái)的實(shí)時(shí)資源壓力和開(kāi)發(fā)運(yùn)維負(fù)擔(dān),而且還能較好地復(fù)用現(xiàn)有的 HDFS 和 Spark 等存儲(chǔ)計(jì)算資源,做到了降本增效。
此外 Arctic 在音樂(lè)、有道等多個(gè) BU 也有落地,比如在音樂(lè)公技,用于 ES 冷數(shù)據(jù)的存儲(chǔ),降低了用戶 ES 的存儲(chǔ)成本;而有道精品課研發(fā)團(tuán)隊(duì)也在積極探索和使用 Arctic 作為其部分業(yè)務(wù)場(chǎng)景下的解決方案。
目前 Arctic 已經(jīng)在 github 上開(kāi)源,受到了開(kāi)源社區(qū)與外部用戶的持續(xù)關(guān)注,在 Arctic 的建設(shè)與發(fā)展中,也收到了不少外部用戶提交的高質(zhì)量 PR 。
3.2 項(xiàng)目產(chǎn)出價(jià)值
通過(guò)上述方案我們將 push ETL 明細(xì)數(shù)據(jù)通過(guò) Flink 實(shí)時(shí)入湖到 Arctic,然后在調(diào)度平臺(tái)上配置分鐘級(jí)的調(diào)度任務(wù),按照不同交叉維度進(jìn)行計(jì)算后將累計(jì)型指標(biāo)后寫(xiě)入關(guān)系數(shù)據(jù)庫(kù),最后通過(guò)有數(shù)直連進(jìn)行數(shù)據(jù)展示,做到了業(yè)務(wù)方要求的分鐘級(jí)時(shí)效數(shù)據(jù)產(chǎn)出。改造后的方案,同原來(lái)的全鏈路 Flink 實(shí)時(shí)計(jì)算方案相比:
(1)充分復(fù)用離線空閑算力,降低了實(shí)時(shí)計(jì)算資源開(kāi)支
方案利用了空閑狀態(tài)下的離線計(jì)算資源,且基本不會(huì)帶來(lái)新的資源開(kāi)支。離線計(jì)算業(yè)務(wù)場(chǎng)景注定了資源使用的高峰在凌晨,而新聞 push 推送及熱點(diǎn)新聞產(chǎn)生的場(chǎng)景大多為非凌晨時(shí)段,在滿足準(zhǔn)實(shí)時(shí)計(jì)算時(shí)效的前提下,通過(guò)復(fù)用提升了離線計(jì)算集群的綜合利用率。另外,該方案能幫我們釋放大約 2.4T 左右的實(shí)時(shí)計(jì)算內(nèi)存資源。
(2)降低任務(wù)維護(hù)成本,提升任務(wù)穩(wěn)定性
Arctic + Spark 水位感知觸發(fā)調(diào)度的方案可減少 17+ 實(shí)時(shí)任務(wù)的維護(hù)成本,減少了 Flink 實(shí)時(shí)計(jì)算任務(wù)大狀態(tài)所帶來(lái)的穩(wěn)定性問(wèn)題。通過(guò) Spark 離線調(diào)度任務(wù)可充分利用離線資源池調(diào)整計(jì)算并行度,有效提升了應(yīng)對(duì)突發(fā)熱點(diǎn)新聞流量洪峰時(shí)的健壯性。
(3)提升數(shù)據(jù)異常時(shí)的修復(fù)能力,降低數(shù)據(jù)修復(fù)時(shí)間開(kāi)支
通過(guò)流批一體的 Arctic 數(shù)據(jù)湖存儲(chǔ)架構(gòu),當(dāng)數(shù)據(jù)出現(xiàn)異常需要修正時(shí),可靈活地對(duì)異常數(shù)據(jù)進(jìn)行修復(fù),降低修正成本;而如果通過(guò)實(shí)時(shí)計(jì)算鏈路回溯數(shù)據(jù)或通過(guò)額外的離線流程來(lái)修正,則需要重新進(jìn)行狀態(tài)累計(jì)或復(fù)雜的 ETL 流程。
4、項(xiàng)目未來(lái)規(guī)劃和展望
當(dāng)前還有一些場(chǎng)景 Arctic 不能做到較好的支持,傳媒大數(shù)據(jù)團(tuán)隊(duì)將和 Arctic 團(tuán)隊(duì)繼續(xù)對(duì)以下場(chǎng)景下的解決方案進(jìn)行探索和落地:
(1)當(dāng)前入湖前的 push 明細(xì)數(shù)據(jù)是通過(guò)上游多條數(shù)據(jù)流 join 生成的,也同樣會(huì)存在大狀態(tài)的問(wèn)題。而 Arctic 當(dāng)前只能支持行級(jí)的更新能力,如果能落地有主鍵表的部分列更新能力,則可以幫助業(yè)務(wù)在入湖的時(shí)候,以較低的成本直接實(shí)現(xiàn)多流 join。
(2)進(jìn)一步完善 Arctic 表和 Hive 表的水位定義和感知方案,提升時(shí)效,并推廣到更多的業(yè)務(wù)場(chǎng)景中。當(dāng)前的方案只支持單 Spark/Flink 任務(wù)寫(xiě)入的場(chǎng)景,對(duì)于多個(gè)任務(wù)并發(fā)寫(xiě)表的場(chǎng)景,還需要再完善。