字節(jié)跳動(dòng)基于 Ray 的大規(guī)模多模態(tài)數(shù)據(jù)處理框架
Ray Summit是Ray社區(qū)一年一度的全球盛會(huì),2024年于9月30日至10月2日在美國(guó)舊金山舉行,主題是"Where Builders Create the AI Future",聚焦于構(gòu)建人工智能的未來(lái),吸引了全球眾多AI開(kāi)發(fā)者和行業(yè)領(lǐng)袖。今年的Ray Summit不僅是一個(gè)技術(shù)交流的平臺(tái),更是一個(gè)展示最新AI技術(shù)和趨勢(shì)的舞臺(tái)。包括OpenAI、Meta、LangChain、Google、Nvidia、ByteDance在內(nèi)的多家頂尖科技公司和組織參與了此次盛會(huì),共同探討和分享了他們?cè)贏I領(lǐng)域的最新進(jìn)展和洞見(jiàn)。
來(lái)自ByteDance的Xiaohong Dong、Zhibei Ma、Liguang Xie分享了題為《How Bytedance Builds Large-Scale Data Processing Pipelines for Multimodal Models with Ray》的演講。
YouTube視頻鏈接:https://www.youtube.com/watch?v=f67SKoxR9H0&t=152s
我們來(lái)自字節(jié)跳動(dòng)Seed(語(yǔ)音、視覺(jué))和Data Infra團(tuán)隊(duì),致力于構(gòu)建高性能、可擴(kuò)展的分布式數(shù)據(jù)處理平臺(tái),通過(guò)數(shù)據(jù)驅(qū)動(dòng)的方法來(lái)提高多模態(tài)大模型能力。
當(dāng)前數(shù)據(jù)處理面臨三大挑戰(zhàn):1) 數(shù)據(jù)呈指數(shù)級(jí)增長(zhǎng),數(shù)據(jù)量達(dá)到PB量級(jí);2) GPU和CPU資源有限;3) 數(shù)據(jù)處理任務(wù)越來(lái)越復(fù)雜,有多個(gè)步驟和復(fù)雜的算法。
我們的目標(biāo)是在有限的資源下,提高數(shù)據(jù)處理效率。Ray就是答案,它可以處理大量數(shù)據(jù),優(yōu)化異構(gòu)資源分配,并具有靈活的編排能力。
接下來(lái)將介紹在字節(jié)跳動(dòng),如何使用Ray/RayData構(gòu)建Audio/Video數(shù)據(jù)處理Pipeline,以及在大規(guī)模不穩(wěn)定資源上運(yùn)行RayData所做的優(yōu)化工作。
首先,讓我們深入探究 Audio 數(shù)據(jù)處理平臺(tái)的細(xì)節(jié),了解 Ray 是怎樣解決上述提到的在數(shù)據(jù)處理中頗具挑戰(zhàn)性的三個(gè)問(wèn)題的。如圖所示,數(shù)據(jù)處理平臺(tái)架構(gòu)可分為三層:第一層為基礎(chǔ)設(shè)施層,它管理基礎(chǔ)的存儲(chǔ)資源與計(jì)算資源,以及任務(wù)調(diào)度,確??捎觅Y源的高效利用以及各類任務(wù)的順暢執(zhí)行。第二層是自主研發(fā)的數(shù)據(jù)處理Pipeline,專門(mén)用于處理各種Audio的數(shù)據(jù),執(zhí)行一系列的數(shù)據(jù)轉(zhuǎn)換與處理。頂層是應(yīng)用層,處理后的數(shù)據(jù)被用于各種業(yè)務(wù)場(chǎng)景,比如音樂(lè)生成等。接下來(lái)重點(diǎn)介紹數(shù)據(jù)處理Pipeline。
數(shù)據(jù)處理Pipeline中的第一個(gè)概念是node,它通常代表一個(gè)特定的任務(wù)或算子,例如過(guò)濾算子、打標(biāo)算子、去重算子。node可能需要 CPU資源也可能需要 GPU 資源。
第二個(gè)概念是flow,flow被定義為節(jié)點(diǎn)之間的數(shù)據(jù)或控制傳輸關(guān)系,是一種有向連接。一個(gè)node可能有多個(gè)輸入flow或輸出flow。最終數(shù)據(jù)處理Pipeline的DAG由node和flow定義,并通過(guò) YAML 組裝。
基于上述設(shè)計(jì),框架定義了很多常見(jiàn)的數(shù)據(jù)處理Pipeline,通過(guò)重新組合這些常見(jiàn)的Pipeline,可以滿足用戶模型訓(xùn)練的數(shù)據(jù)處理要求。這種方法極大地提高了整體數(shù)據(jù)處理的工作效率,并為數(shù)據(jù)處理和模型訓(xùn)練提供了更靈活的解決方案。
在構(gòu)建數(shù)據(jù)處理Pipeline的初始階段,遇到了一些問(wèn)題:1)可擴(kuò)展性不夠,很難從單個(gè)節(jié)點(diǎn)拓展至多個(gè)節(jié)點(diǎn),不能充分利用資源;2)任務(wù)調(diào)度與負(fù)載均衡問(wèn)題,需要手動(dòng)管理任務(wù)分配,復(fù)雜度很高且任務(wù)分配不合理;3)高可用性和容錯(cuò)性,缺乏任務(wù)重試和節(jié)點(diǎn)故障檢測(cè)的自動(dòng)化機(jī)制,在節(jié)點(diǎn)中斷時(shí),可能會(huì)有任務(wù)失敗和數(shù)據(jù)丟失的情況;4)數(shù)據(jù)傳輸與共享,需要手動(dòng)完成任務(wù)和節(jié)點(diǎn)之間高效的數(shù)據(jù)傳輸和同步。
在調(diào)研了Ray的基礎(chǔ)能力之后,開(kāi)始嘗試使用RayCore構(gòu)建Pipeline。因?yàn)镽ay對(duì)Python生態(tài)非常友好,使得用戶能夠更高效地進(jìn)行開(kāi)發(fā)以及和現(xiàn)存方案進(jìn)行集成,原有方案得以快速遷移到了Ray上。
RayCore 提供了強(qiáng)大的分布式計(jì)算能力,比如Actor、Task,使用RayCore可以方便的開(kāi)發(fā)分布式應(yīng)用程序,構(gòu)建數(shù)據(jù)處理pipeline。但是RayCore提供的是low level的API,直接使用它進(jìn)行開(kāi)發(fā)需要自行處理很多問(wèn)題,包括不限于:1) 數(shù)據(jù)切片和分片管理,需要手動(dòng)管理數(shù)據(jù)分片和分布,這無(wú)疑增加了復(fù)雜性;2)數(shù)據(jù)讀取和加載效率低的問(wèn)題,缺乏高效的自動(dòng)化數(shù)據(jù)讀取和加載機(jī)制,會(huì)影響整體效率;3)缺乏高級(jí)數(shù)據(jù)操作功能,需要手動(dòng)實(shí)現(xiàn)常見(jiàn)的數(shù)據(jù)操作,開(kāi)發(fā)成本高。
所以我們開(kāi)始在Pipeline中使用RayData,它提供了一系列開(kāi)箱即用的算子,和豐富的多模態(tài)數(shù)據(jù)DataSource支持,自動(dòng)管理數(shù)據(jù)分片能力,同時(shí)具有自動(dòng)擴(kuò)縮容的能力,極大的減少了開(kāi)發(fā)成本。
同時(shí)在Pipeline中也使用到了RayServe進(jìn)行高效的模型部署和服務(wù),RayServe 提供了易于使用的 API,使模型能夠快速轉(zhuǎn)化為可訪問(wèn)的服務(wù)。另一方面,在高可用性和容錯(cuò)性方面。RayServe 具有內(nèi)置的高可用性和容錯(cuò)機(jī)制,可以自動(dòng)檢測(cè)和從故障節(jié)點(diǎn)恢復(fù),以確保服務(wù)的穩(wěn)定性。
在構(gòu)建Audio數(shù)據(jù)處理Pipeline中,我們看到Ray的優(yōu)勢(shì)有:
- 良好的可擴(kuò)展性,從單機(jī)到大型集群的輕松無(wú)縫擴(kuò)展
- 靈活的 API,易于編寫(xiě)和管理復(fù)雜的數(shù)據(jù)處理任務(wù)
- 完善的數(shù)據(jù)生態(tài),為各種數(shù)據(jù)處理需求提供全面的解決方案
- 高性能,高效地分布式計(jì)算、數(shù)據(jù)傳輸
- 兼容性好,RayData 與現(xiàn)有的數(shù)據(jù)處理庫(kù)和框架(如 Pandas 和 Spark)兼容,可以輕松集成到現(xiàn)有工作流程中
接下來(lái)介紹如何使用Ray來(lái)增強(qiáng)Video數(shù)據(jù)處理Pipeline。大量高質(zhì)量的視頻數(shù)據(jù)是視頻生成基礎(chǔ)模型訓(xùn)練的關(guān)鍵,然而與文本圖像和音頻相比,視頻數(shù)據(jù)量龐大,而且與其他數(shù)據(jù)格式相比,處理視頻數(shù)據(jù)需要更多的計(jì)算資源和時(shí)間。比如,對(duì)于視頻數(shù)據(jù)經(jīng)常需要使用 ffmpeg 進(jìn)行視頻編碼和解碼,這需要很多計(jì)算資源和計(jì)算時(shí)間。因此,高效處理大量視頻數(shù)據(jù)并實(shí)現(xiàn)高吞吐量和可擴(kuò)展性是一個(gè)關(guān)鍵挑戰(zhàn)。
視頻數(shù)據(jù)處理流程一般有如下步驟:流程需要處理一系列的原始視頻,時(shí)長(zhǎng)可以從幾秒到幾小時(shí)不等。使用視頻分割算法,將視頻分割成不同的片段,每個(gè)片段的時(shí)長(zhǎng)可以從幾秒到 1 分鐘左右。然后,下一個(gè)步驟是視頻處理算法。首先裁剪視頻,以便只選擇需要的視頻,同時(shí)還會(huì)對(duì)視頻進(jìn)行評(píng)分。在這個(gè)過(guò)程中,需要將元數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)庫(kù)中,并將片段上傳到對(duì)象存儲(chǔ)中以備將來(lái)使用。下一步稱為打包,我們使用Ray構(gòu)建了打包過(guò)程,選擇需要的視頻片段,將來(lái)自對(duì)象存儲(chǔ)和元數(shù)據(jù)的所有片段放入一個(gè) Parquet 文件中,打包后的Parquet文件將被用于接下來(lái)的訓(xùn)練。
那么什么是視頻數(shù)據(jù)打包呢?正如剛才提到的,視頻數(shù)據(jù)打包就是將一組視頻剪輯存儲(chǔ)在 Parquet 文件中,以方便高效的數(shù)據(jù)管理和訪問(wèn)。這樣做的目的是避免在訓(xùn)練階段加載大量小文件,預(yù)先將多個(gè)小的視頻文件放入一個(gè) Parquet 文件中,然后在訓(xùn)練階段訓(xùn)練進(jìn)程直接加載parquet以提高加載效率。
首先調(diào)研直接使用RayData構(gòu)建打包流程。第一步是利用 RayData 從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù),依據(jù)不同條件進(jìn)行過(guò)濾操作篩選視頻,隨后把數(shù)據(jù)集重新劃分成不同的partition,以利于后續(xù)進(jìn)行并行處理。接著是視頻處理,從對(duì)象存儲(chǔ)中下載數(shù)據(jù),借助 ffmpeg 等框架處理視頻。再將數(shù)據(jù)打包到 Parquet 并上傳。在實(shí)驗(yàn)中我們發(fā)現(xiàn)這個(gè)方案有兩個(gè)問(wèn)題,1) 二進(jìn)制對(duì)象的序列化和反序列化,尤其對(duì)于大對(duì)象會(huì)非常耗時(shí);2) 一旦 ObjectStore 滿,Ray就會(huì)把 Object Spill 到磁盤(pán)上,從而影響整體性能。
所以,打包過(guò)程中將視頻數(shù)據(jù)像存儲(chǔ)在 objectstore中,特別是在大容量的情況下,效率不高。嘗試使用另一個(gè)解決方案,將所有操作融合到單個(gè) actor 中,以避免 actor 之間的數(shù)據(jù)傳輸。如上圖所示,actor內(nèi)部啟動(dòng)多個(gè)線程一起運(yùn)行,在每個(gè)線程中下載視頻并運(yùn)行視頻處理操作,然后寫(xiě)入 Parquet 文件并上傳到外部存儲(chǔ)。這個(gè)解決方案效果很好,可以實(shí)現(xiàn)高吞吐量,并且具有良好的線性擴(kuò)展性,增加更多的 CPU 資源,帶寬也會(huì)相應(yīng)地增加。
接下來(lái)分享一些使用Ray的經(jīng)驗(yàn)以及Ray的優(yōu)勢(shì):
1)Ray具有可擴(kuò)展性和靈活性,可以輕松地從本地 Python 腳本擴(kuò)展到大規(guī)模集群,在數(shù)千個(gè)工作節(jié)點(diǎn)的規(guī)模下也能運(yùn)行良好;
2)對(duì)Python友好,ML場(chǎng)景中大量使用 Python,Ray對(duì)python非常友好,非常方便進(jìn)行開(kāi)發(fā)調(diào)試,與現(xiàn)有ML生態(tài)也結(jié)合的比較好;
3)Ray Dashboard提供作業(yè)相關(guān)的Restful API,可以非常方便地將這些API集成到業(yè)務(wù)平臺(tái)中,包括提交和監(jiān)控Ray作業(yè)的運(yùn)行狀態(tài);
接下來(lái)介紹Ray相關(guān)的底層基礎(chǔ)設(shè)施。在字節(jié)跳動(dòng),Ray被應(yīng)用在很多業(yè)務(wù)場(chǎng)景中,包括但不限于Audio/Video數(shù)據(jù)處理、RLHF等。Ray支持非常靈活的編排和異構(gòu)資源(CPU/GPU)的調(diào)度能力,幫助用戶進(jìn)行靈活的多角色 DAG 編排和異構(gòu)計(jì)算,構(gòu)建大規(guī)模高性能的 ML 基礎(chǔ)設(shè)施。但是,像許多其他大規(guī)模分布式系統(tǒng)一樣,生產(chǎn)環(huán)境中使用Ray也面臨著巨大的挑戰(zhàn)。
LLMs的數(shù)據(jù)處理任務(wù)通常需要巨大的資源需求和相對(duì)較長(zhǎng)的處理時(shí)間,一般做為離線處理任務(wù)。為了降低成本,會(huì)使用大量不穩(wěn)定的Kubernetes Pod來(lái)運(yùn)行這些數(shù)據(jù)處理任務(wù),這些Pod可以隨時(shí)被搶占,也可能隨時(shí)重新添加進(jìn)來(lái)。我們希望Worker節(jié)點(diǎn)被搶占不會(huì)導(dǎo)致數(shù)據(jù)處理任務(wù)失敗或中斷。比如,如果一個(gè)Ray任務(wù)在 100 個(gè) GPU 上運(yùn)行,其中 40 個(gè)GPU被搶占,期望剩余的 60 個(gè) GPU 能夠繼續(xù)高效運(yùn)行,這是一個(gè)非常大的挑戰(zhàn)。雖然 RayCore 提供了強(qiáng)大的 Actor 和 Task 恢復(fù)機(jī)制,但在當(dāng)前的 RayData 設(shè)計(jì)中,當(dāng)一個(gè) Actor 異常退出,必須等待 Actor 重新啟動(dòng)才能繼續(xù)執(zhí)行 task,如果資源不足,Actor將處于Pending狀態(tài),因此RayData任務(wù)也會(huì)hang住,直到資源恢復(fù)。為了解決這些問(wèn)題,我們?cè)O(shè)計(jì)并開(kāi)發(fā)了RayData增強(qiáng)方案。
第一個(gè)增強(qiáng)是在RayData調(diào)度器重中進(jìn)行任務(wù)重新分配。簡(jiǎn)單來(lái)說(shuō)就是將RayData Actor Pool中失敗的task分派給Actor Pool中其他的actor運(yùn)行。具體做法是,將Actor Pool中actor的max_restarts設(shè)置為0,也就是完全由RayData掌控actor的生命周期,這樣在actor掛掉后RayCore不會(huì)再重啟它。當(dāng)RayData調(diào)度器檢測(cè)到actor異常退出時(shí),原先分配給它的未完成的task,會(huì)被重新分配給其他actor。RayData調(diào)度器重新創(chuàng)建一個(gè)map actor,如果沒(méi)有資源,actor處在pending狀態(tài),但是這不會(huì)阻礙RayData任務(wù)的正常運(yùn)行。一旦新的資源加入,actor就會(huì)變?yōu)閞unning,重新加入到actor pool中。
任務(wù)重新分配的策略非常簡(jiǎn)單,但會(huì)引入一個(gè)新問(wèn)題。由于任務(wù)重新分配需要將 actor 的 max_restarts 參數(shù)設(shè)置為 0,那么當(dāng)一個(gè)object丟失時(shí),就無(wú)法再依賴 RayCore 的血緣重建來(lái)重建object。例如,在右側(cè)的圖中,map算子1的輸出作為 map算子2 的輸入。如果運(yùn)行 map算子1的節(jié)點(diǎn)異常退出,它輸出對(duì)象就會(huì)丟失,當(dāng) map算子 2 嘗試讀取該對(duì)象時(shí),會(huì)出現(xiàn)object丟失的情況從而無(wú)法繼續(xù)處理。
為了解決這個(gè)問(wèn)題,進(jìn)一步提出了第二個(gè)優(yōu)化方案——由RayData 管理算子之間輸入輸出數(shù)據(jù)的血緣關(guān)系,而不是依賴 RayCore 的對(duì)象血緣關(guān)系。RayData 調(diào)度器引入了一個(gè)與血緣相關(guān)的數(shù)據(jù)結(jié)構(gòu)。每個(gè)算子都有一個(gè)表,用于記錄輸入和輸出數(shù)據(jù)之間的關(guān)系,其中key是輸出引用value是輸入引用,方便從輸出引用查找到輸入引用。當(dāng)下游算子(例如 Map-B)的 actor 遇到object丟失時(shí),意味著其上游(Map-A)的輸出丟失,RayData 調(diào)度器通過(guò)血緣表查找到input,并重新計(jì)算其輸出。當(dāng)一個(gè)block流經(jīng)所有算子產(chǎn)生輸出后,可以刪除其所有上游對(duì)應(yīng)的血緣表。目前該解決方案僅支持OneToOne算子,但是理論上也完全支持 AllToAll 算子。
對(duì)于大多數(shù)RayData用戶來(lái)說(shuō),一般使用的是穩(wěn)定資源,不會(huì)遇到前面介紹的問(wèn)題。但是我們認(rèn)為“RayData不穩(wěn)定資源下的問(wèn)題”其實(shí)是一個(gè)通用的問(wèn)題,這里介紹的解決方案具有一定的通用性。
隨著模型的逐漸增大,actor故障后恢復(fù)時(shí)間會(huì)比較長(zhǎng),下載+加載模型會(huì)達(dá)到分鐘級(jí),上游算子actor的故障對(duì)下游的影響會(huì)逐步放大。如上圖中,如果map op1的某個(gè)actor異常退出,短時(shí)間內(nèi)沒(méi)有恢復(fù),該actor的輸出已經(jīng)被調(diào)度給了map op2的兩個(gè)actor,map-op2-1和map-op2-2。map-op2-1和map-op2-2因?yàn)檩斎霐?shù)據(jù)沒(méi)有ready,會(huì)處于waiting空閑狀態(tài),等待上游恢復(fù),此時(shí)也無(wú)法繼續(xù)處理后續(xù)數(shù)據(jù),利用率降低。map op2給到下游map op3的輸出也減少,利用率也會(huì)降低。map op2和map op3變得空閑,也會(huì)觸發(fā)raydata的autoscale,這可能是不必要的,會(huì)有額外的抖動(dòng)。而任務(wù)重新分配和RayData血緣方案可以有效減小 actor 的異常退出對(duì)整個(gè)作業(yè)的影響。
總結(jié)一下今天分享的內(nèi)容。我們深入探討了如何使用Ray構(gòu)建可擴(kuò)展的Audio/Video數(shù)據(jù)處理Pipeline,分享了在不穩(wěn)定的 Kubernetes 節(jié)點(diǎn)上運(yùn)行 RayData 的經(jīng)驗(yàn),并提出了對(duì) RayData的改進(jìn)方案。