自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink 在 B 站的多元化探索與實(shí)踐

大數(shù)據(jù)
在過去的一年里,B站圍繞 Flink 主要做了三個方面的工作:平臺建設(shè)、增量化和 AI on Flink。實(shí)時平臺是實(shí)時業(yè)務(wù)的技術(shù)底座,也是 Flink 面向用戶的窗口,需要堅持持續(xù)迭代優(yōu)化,不斷增強(qiáng)功能,提升用戶效率。

本文整理自嗶哩嗶哩基礎(chǔ)架構(gòu)部資深研發(fā)工程師張楊在 Flink Forward Asia 2021 平臺建設(shè)專場的演講。主要內(nèi)容包括:

  • 平臺建設(shè)
  • 增量化
  • AI On Flink

在過去的一年里,B站圍繞 Flink 主要做了三個方面的工作:平臺建設(shè)、增量化和 AI on Flink。實(shí)時平臺是實(shí)時業(yè)務(wù)的技術(shù)底座,也是 Flink 面向用戶的窗口,需要堅持持續(xù)迭代優(yōu)化,不斷增強(qiáng)功能,提升用戶效率。增量化是我們在增量化數(shù)倉和流批一體上的嘗試,在實(shí)時和離線之間找到一個更好的平衡,加速數(shù)倉效率,解決計算口徑問題。AI 方向,我們也正在結(jié)合業(yè)務(wù)做進(jìn)一步的探索,與 AIFlow 社區(qū)進(jìn)行合作,完善優(yōu)化機(jī)器學(xué)習(xí)工作流。

一、 平臺建設(shè)

1.1 基礎(chǔ)功能完善

在平臺的基礎(chǔ)功能方面,我們做了很多新的功能和優(yōu)化。其中兩個重點(diǎn)的是支持 Kafka 的動態(tài) sink 和任務(wù)提交引擎的優(yōu)化。

我們遇到了大量這樣的 ETL 場景,業(yè)務(wù)的原始實(shí)時數(shù)據(jù)流是一條較大的混合數(shù)據(jù)流,包含了數(shù)個子業(yè)務(wù)數(shù)據(jù)。數(shù)據(jù)通過 Kafka 傳輸,末端的每個子業(yè)務(wù)都對應(yīng)單獨(dú)的處理邏輯,每個子業(yè)務(wù)都去消費(fèi)全量數(shù)據(jù),再進(jìn)行過濾,這樣的資源消耗對業(yè)務(wù)來說是難以接受的,Kafka 的 IO 壓力也很大。因此我們會開發(fā)一個 Flink 任務(wù),對混合數(shù)據(jù)流按照子業(yè)務(wù)進(jìn)行拆分,寫到子業(yè)務(wù)對應(yīng)的 topic 里,讓業(yè)務(wù)使用。

技術(shù)實(shí)現(xiàn)上,早期 Flink SQL 的寫法就是寫一個 source 再寫多個 sink,每個 sink 對應(yīng)一個業(yè)務(wù)的 topic,這確實(shí)可以滿足短期的業(yè)務(wù)訴求,但存在的問題也較多:

第一是數(shù)據(jù)的傾斜,不同的子業(yè)務(wù)數(shù)據(jù)量不同,數(shù)據(jù)拆分后,不同 sink 之處理的數(shù)據(jù)量也存在較大差別,而且 sink 都是獨(dú)立的 Kafka producer,高峰期間會造成 sink 之間資源的爭搶,對性能會有明顯的影響;

第二是無法動態(tài)增減 sink,需要改變 Flink SQL 代碼,然后重啟任務(wù)才能完成增減 sink。過程中,不僅所有下游任務(wù)都會抖動,還有一個嚴(yán)重的問題就是無法從 savepoint 恢復(fù),也就意味著數(shù)據(jù)的一致性無法保證;

第三是維護(hù)成本高,部分業(yè)務(wù)存在上百個子分流需求,會導(dǎo)致 SQL 太長,維護(hù)成本極高。

基于以上原因,我們開發(fā)了一套 Kafka 動態(tài) sink 的功能,支持在一個 Kafka sink 里面動態(tài)地寫多個 topic 數(shù)據(jù),架構(gòu)如上圖。我們對 Kafka 表的 DDL 定義進(jìn)行了擴(kuò)展,在 topic 屬性里支持了 UDF 功能,它會根據(jù)入倉的數(shù)據(jù)計算出這條數(shù)據(jù)應(yīng)該寫入哪個 Kafka 集群和 topic。sink 收到數(shù)據(jù)后會先調(diào)用 UDF 進(jìn)行計算,拿到結(jié)果后再進(jìn)行目標(biāo)集群和 topic 數(shù)據(jù)的寫入,這樣業(yè)務(wù)就不需要在 SQL 里編寫多個 sink,代碼很干凈,也易于維護(hù),并且這個 sink 被所有 topic 共用,不會產(chǎn)生傾斜問題。UDF 直接面向業(yè)務(wù)系統(tǒng),分流規(guī)則也會平臺化,業(yè)務(wù)方配置好規(guī)則后,分流實(shí)施自動生效,任務(wù)不需要做重啟。而且為了避免 UDF 的性能問題,避免用戶自己去開發(fā) UDF,我們提供了一套標(biāo)準(zhǔn)的分流,做了大量的緩存優(yōu)化,只要按照規(guī)范定義好分流,規(guī)則的業(yè)務(wù)表就可以直接使用 UDF。

目前內(nèi)部幾個千億級別的分流場景,都在這套方案下高效運(yùn)行中。

基礎(chǔ)功能上做的第二個優(yōu)化就是任務(wù)的提交引擎優(yōu)化。做提交器的優(yōu)化主要是因為存在以下幾個問題:

第一,本地編譯問題。Flink SQL 任務(wù)在 Yarn 上的部署有三種模式:per-job、application 和 yarn-session。早前我們一直沿用 per-job 模式,但是隨著任務(wù)規(guī)模變大,這個模式出現(xiàn)了很多的問題。per-job 模式下,任務(wù)的編譯是在本地進(jìn)行再提交到遠(yuǎn)程 app master,編譯消耗提交引擎的服務(wù)性能,在短時批量操作時很容易導(dǎo)致性能不足;

第二,多版本的支持問題。我們支持多個 Flink 版本,因此在版本與提交引擎耦合的情況下,需要維護(hù)多個不同代碼版本的提交引擎,維護(hù)成本高;

第三,UDF 的加載。我們一直使用 Flink 命令里的 -c 命令進(jìn)行 UDF 傳遞,UDF 代碼包存在 UDFS 上,通過 Hadoop 的 web HDFS 協(xié)議進(jìn)行 cluster 加載,一些大的任務(wù)啟動時,web HDFS 的 HTTP 端口壓力會瞬間增大,存在很大的穩(wěn)定隱患;

第四,代碼包的傳輸效率。用戶代碼包或者 Flink 引擎代碼包都要做多次的上傳下載操作,遇到 HDFS 反應(yīng)較慢的場景,耗時較長,而實(shí)時任務(wù)希望做到極致的快速上下線。

因此我們做了提交器的優(yōu)化:

首先引入了 1.11 版本以上支持的 application 模式,這個模式與 per-job 最大的區(qū)別就是 Flink 任務(wù)的編譯全部移到了 APP master 里做,這樣就解決了提交引擎的瓶頸問題;

在多版本的支持上面,我們對提交引擎也做了改造,把提交器與 Flink 的代碼徹底解耦,所有依賴 Flink 代碼的操作全部抽象了標(biāo)準(zhǔn)的接口放到了 Flink 源碼側(cè),并在 Flink 源碼側(cè)增加了一個模塊,這個模塊會隨著 Flink 的版本一起升級提交引擎,對通用接口的調(diào)用全部進(jìn)行反射和緩存,在性能上也是可接受的;

而且 Flink 的多版本源碼全部按照 maled 模式進(jìn)行管理,存放在 HDFS。按照業(yè)務(wù)指定的任務(wù)版本,提交引擎會從遠(yuǎn)程下載 Flink 相關(guān)的版本包緩存到本地,所以只需要維護(hù)一套提交器的引擎。Flink 任何變更完全和引擎無關(guān),升級版本提交引擎也不需要參與;

完成 application 模式升級后,我們對 UDF 和其他資源包的上傳下載機(jī)制也進(jìn)行了修改,通過 HDFS 遠(yuǎn)程直接分發(fā)到 GM/TM 上,減少了上傳下載次數(shù),同時也避免了 cluster 的遠(yuǎn)程加載。

1.2 新任務(wù)構(gòu)建模式

平臺之前支持 Flink 的構(gòu)建模式主要有兩種, SQL 和 JAR 包。兩者的優(yōu)劣勢都很明顯,SQL 簡單易用門檻低,但是不夠靈活,比如一些定時操作在 SQL 里面無法進(jìn)行。JAR 包功能完善也靈活,但是門檻高,需要學(xué)習(xí) Flink datastream 一整套 API 的概念,非開發(fā)人員難以掌握,而我們大量的用戶是數(shù)倉,這種JAR包的任務(wù)難以標(biāo)準(zhǔn)化管理。業(yè)務(wù)方大多希望使用 SQL,避免使用 JAR 包。

我們調(diào)研了平臺已有的 Datastream JAR 包任務(wù),發(fā)現(xiàn)大部分的 JAR 包任務(wù)還是以 Table API 為主,只有少量過程用 Datastream 做了一些數(shù)據(jù)的轉(zhuǎn)換,完成之后還是注冊成了 Table 進(jìn)行 Table 操作。如果平臺可以支持在 SQL 里面做一些復(fù)雜的自定義轉(zhuǎn)換,業(yè)務(wù)其實(shí)完全不需要編寫代碼。

因此我們支持了一種新的任務(wù)構(gòu)建模式——算子化,模塊化地構(gòu)建一個 Flink 任務(wù),混合 JAR 包與 SQL,在進(jìn)行任務(wù)構(gòu)建時,先定義一段 SQL,再定義一個 JAR 包,再接一段 SQL,每段都稱為算子,算子之間相互串聯(lián),構(gòu)成一個完整的任務(wù)。

采用 Flink 標(biāo)準(zhǔn)的 SQL 語法,對 JAR 包進(jìn)行了接口的限制,必須繼承平臺的接口定義進(jìn)行開發(fā)。輸入輸出都是定義好的 Datastream。它比 UDF 的擴(kuò)展性更強(qiáng),靈活性也更好。而且整個任務(wù)的輸入輸出基本可以做到和 SQL 同級別的管控力,算子的開發(fā)也比純 JAR 包簡單得多,不需要學(xué)習(xí)太多 Flink API 的操作,只需要對 Datastream 進(jìn)行變換。而且對于一些常用的公共算子,平臺可以統(tǒng)一開發(fā)提供,擁有更專業(yè)的性能優(yōu)化,業(yè)務(wù)方只要引用即可。

目前在實(shí)時數(shù)倉等一些偏固定業(yè)務(wù)的場景,我們都在嘗試進(jìn)行標(biāo)準(zhǔn)化算子的推廣和使用。

1.3 智能診斷

平臺建設(shè)的第三點(diǎn)是流任務(wù)的智能診斷。目前實(shí)時支持的業(yè)務(wù)場景包括 ETL、AI、數(shù)據(jù)集成等,且任務(wù)規(guī)模增長速度很快。越來越大的規(guī)模對平臺的服務(wù)能力也提出了更高的要求。

此前,平臺人員需要花費(fèi)很多的時間在協(xié)助業(yè)務(wù)解決資源或各種業(yè)務(wù)問題上,主要存在以下幾個方面的問題:

  • 資源配置:初始資源確認(rèn)困難,碎片化嚴(yán)重,使用資源周期性變化;
  • 性能調(diào)優(yōu):數(shù)據(jù)傾斜,網(wǎng)絡(luò)資源優(yōu)化,state 性能調(diào)優(yōu),gc 性能調(diào)優(yōu);
  • 錯誤診斷:任務(wù)失敗原因分析,修復(fù)建議。

這些問題日常都靠平臺人員兜底,規(guī)模小的時候大家勉強(qiáng)可以負(fù)擔(dān),但是規(guī)模快速變大后已經(jīng)完全無力消化,需要一套自動化的系統(tǒng)來解決這些問題。

因此我們做了一套流任務(wù)的智能診斷系統(tǒng),架構(gòu)如上圖。

系統(tǒng)會持續(xù)抓取任務(wù)運(yùn)行時的 metrics 進(jìn)行性能分析,分析完成后推給用戶,讓用戶自己執(zhí)行具體的優(yōu)化改進(jìn)操作;也會實(shí)時抓取任務(wù)失敗的日志,并與詞庫進(jìn)行匹配,將錯誤進(jìn)行翻譯,使用戶更容易理解,同時也會給出更好理解的解決方案,讓用戶自行進(jìn)行故障處理;同時還會根據(jù)任務(wù)的歷史運(yùn)行資源進(jìn)行自動化縮容處理,解決資源浪費(fèi)和資源不足的問題。

目前此功能已經(jīng)節(jié)省了整個隊列 10% 的資源左右,分擔(dān)了相當(dāng)一部分平臺的運(yùn)維壓力,在未來我們會持續(xù)進(jìn)行優(yōu)化迭代,更進(jìn)一步提高這套系統(tǒng)在自動化運(yùn)維上面的能力以及覆蓋度。

未來,在提交引擎方面,我們希望融合 Yarn session 模式與 application 模式做 session 的復(fù)用,解決任務(wù)上線的資源申請效率問題。同時希望大 state 任務(wù)也能夠在 session 的基礎(chǔ)上復(fù)用本地的 state,啟動時無需重新下載 state。

智能診斷方面,我們希望實(shí)現(xiàn)更多自動化的操作,實(shí)現(xiàn)自動進(jìn)行優(yōu)化改進(jìn),而不需要用戶手動操作,做到用戶低感知;擴(kuò)容縮容也會持續(xù)提速,目前縮容的頻率只在天級,擴(kuò)容還未實(shí)現(xiàn)自動化。未來我們希望整個操作的周期和頻率做到分鐘級的自動化。

算子方面,我們希望能統(tǒng)一目前的 SQL 和 JAR 包兩種模式,統(tǒng)一任務(wù)構(gòu)建方式,讓用戶以更低的成本更多復(fù)雜的操作,平臺也更方便管理。

二、增量化

上圖是我們早期的數(shù)據(jù)架構(gòu),是典型的 Lambda 架構(gòu)。實(shí)時和離線從源頭上就完全分離、互不干涉,實(shí)時占較低,離線數(shù)倉是核心的數(shù)倉模型,占主要的比例,但它存在幾個明顯的問題。

第一,時效性。數(shù)倉模型是分層架構(gòu),層與層之間的轉(zhuǎn)換靠調(diào)度系統(tǒng)驅(qū)動,而調(diào)度系統(tǒng)是有周期的,常見的基本都是天或小時。源頭生產(chǎn)的數(shù)據(jù),數(shù)倉各層基本需要隔一天或幾個小時才可見,無法滿足實(shí)時性要求稍高的場景;

第二,數(shù)據(jù)的使用效率低。ETL 和 adhoc 的數(shù)據(jù)使用完全一樣,沒有針對性的讀寫優(yōu)化,也沒有按照用戶的查詢習(xí)慣進(jìn)行重新組織,缺乏數(shù)據(jù)布局優(yōu)化的能力。

針對第一個問題,是否全部實(shí)時化即可?但是實(shí)時數(shù)倉的成本高,而且不太好做大規(guī)模的數(shù)據(jù)回溯。大部分業(yè)務(wù)也不需要做到 Kafka 的秒級時效。第二個問題也不好解決,流式寫入為了追求效率,對數(shù)據(jù)的布局能力較弱,不具備數(shù)據(jù)的重新組織能力。因此我們在實(shí)時和離線之間找到了一個平衡——做分鐘級的增量化。

我們采用 Flink 作為計算引擎,它的 checkpoint 是一個天然的增量化機(jī)制,實(shí)時任務(wù)進(jìn)行一次 checkpoint,產(chǎn)出一批增量數(shù)據(jù)進(jìn)行增量化處理。數(shù)倉來源主要有日志數(shù)據(jù)和 binlog 數(shù)據(jù),日志數(shù)據(jù)使用 Append 傳統(tǒng)的 HDFS 存儲即可做到增量化的生產(chǎn);binlog 數(shù)據(jù)是 update 模式,但 HDFS 對 update 的支持并不好,因此我們引入了 Hudi 存儲,它能夠支持 update 操作,并且具備一定的數(shù)據(jù)布局能力,同時它也可以做 Append 存儲,并且能夠解決 HDFS 的一些小文件問題。因此日志數(shù)據(jù)也選擇了 Hudi 存儲,采用 Append 模式。

最終我們的增量化方案由 Flink 計算引擎 + Hudi 存儲引擎構(gòu)成。

增量化場景的落地上,考慮到落地的復(fù)雜性,我們先選取了業(yè)務(wù)邏輯相對簡單、沒有復(fù)雜聚合邏輯的 ODS 和 DWD 層進(jìn)行落地。目前的數(shù)據(jù)是由 Flink 直接寫到 Hive 的 ODS 層,我們對此進(jìn)行了針對性的適配,支持了 Hive 表的增量化讀取,開發(fā)了 HDFSStreamingSource,同時為了避免對 HDFS 路徑頻繁掃描的壓力,ODS 層寫入時會進(jìn)行索引創(chuàng)建,記錄寫入的文件路徑和時間,只需要追蹤索引文件即可。

source 也是分層架構(gòu),有文件分發(fā)層和讀取層,文件分發(fā)層進(jìn)行協(xié)調(diào),分配讀取文件數(shù),防止讀取層某個文件讀取過慢堆積過多文件,中間的轉(zhuǎn)換能夠支持 FlinkSQL 操作,具備完整的實(shí)時數(shù)倉的能力。

sink 側(cè)我們引入了 Hudi connector,支持?jǐn)?shù)據(jù) Append 寫入 Hudi,我們還對 Hudi 的 compaction 機(jī)制進(jìn)行了一些擴(kuò)展,主要有三個:DQC 檢測、數(shù)據(jù)布局的優(yōu)化以及映射到 Hive 表的分區(qū)目錄。目前數(shù)據(jù)的布局依舊還很弱,主要依賴 Hudi 本身的 min、max 和 bloom 的優(yōu)化。

完成所有上述操作后,ODS 到 DWD 的數(shù)據(jù)時效性有了明顯提升。

從數(shù)據(jù)生產(chǎn)到 DWD 可見,提高到了分鐘級別;DWD 層的生產(chǎn)完成時間也從傳統(tǒng)的 2:00~5:00 提前到了凌晨 1 點(diǎn)之前。此外,采用 Hudi 存儲也為日后的湖倉一體打下了以一個好的基礎(chǔ)。

除了日志數(shù)據(jù),我們對 CDC 也采用這套方案進(jìn)行加速。基于 Flink 的 CDC 能力,針對 MySQL 的數(shù)據(jù)同步實(shí)現(xiàn)了全增量一體化操作。依賴 Hudi 的 update 能力,單任務(wù)完成了 MySQL 的數(shù)據(jù)同步工作,并且數(shù)據(jù)只延遲了一個 checkpoint 周期。CDC 暫時不支持全量拉取,需要額外進(jìn)行一次全量的初始化操作,其他的流程則完全一致。

Hudi 本身的模型和離線的分區(qū)全量有較大的區(qū)別,為了兼容離線調(diào)度需要的分區(qū)全量數(shù)據(jù),我們也修改了 Hudi 的 compaction 機(jī)制。在做劃分區(qū)的 compaction 時會做一次數(shù)據(jù)的全量拷貝,生成全量的歷史數(shù)據(jù)分區(qū),映射到 Hive 表的對應(yīng)分區(qū)。同時對于 CDC 場景下的數(shù)據(jù)質(zhì)量,我們也做了很多的保障工作。

為了保證 CDC 數(shù)據(jù)的一致性,我們從以下 4 個方面進(jìn)行了完善和優(yōu)化:

第一,binlog 條數(shù)的一致性。按照時間窗口進(jìn)行 binlog 生產(chǎn)側(cè)和消費(fèi)側(cè)的條數(shù)校驗,避免中間件丟數(shù)據(jù);

第二,數(shù)據(jù)內(nèi)容抽樣檢測。考慮到成本,我們在 DB 端和源端、Hudi 存儲端抽樣增量數(shù)據(jù)進(jìn)行內(nèi)容的精確比較,避免 update 出錯;

第三,全鏈路的黑盒測試。測試庫表模擬了線上情況,進(jìn)行 7×24 小時不間斷的 Kafka 生產(chǎn) MySQL 數(shù)據(jù),然后串通整套流程防止鏈路故障;

第四,定期的全量對比。業(yè)務(wù)的庫表一般比較大,歷史數(shù)據(jù)會低頻地定期進(jìn)行全量比對,防止抽樣觀測漏掉的錯誤。

剛開始使用 Hudi 的時候,Hudi on Flink 還是處于初級的階段,因此存在大量問題,我們也一起和 Hudi 社區(qū)做了大量優(yōu)化工作,主要有 4 個方面:Hudi 表的冷啟動優(yōu)化、checkpoint 一致性問題解決、Append 效率低的優(yōu)化以及 get list 的性能問題。

首先是冷啟動的問題。Hudi 的索引存儲在 Flink state 里,一張存在的 Hudi 表如果要通過 Flink 進(jìn)行增量化更新寫入,就必然面臨一個問題:如何把 Hudi 表已有的信息寫入到 Flink state 里。

MySQL 可以借助 Flink CDC 完成全量 + 增量的過程構(gòu)建,可以繞開從已有 Hudi 表冷啟動的過程,但是 TiDB 不行,它的存量表在借助別的手段構(gòu)建完之后,想要增量化就會面臨如何從 FlinkSQL 冷啟動的問題。

社區(qū)有個原始方案,在記錄所有的算子 BucketAssigner 里面讀取全部的 Hudi 表數(shù)據(jù),然后進(jìn)行 state 構(gòu)建,從功能上是可行的,但是在性能上根本無法接受,尤其是大表,由于 Flink 的 key state 機(jī)制原理,BucketAssigner 每個并發(fā)度都要讀取全表數(shù)據(jù),然后挑選出屬于當(dāng)前這個并發(fā)的數(shù)據(jù)存儲到自己的 state 里面,每個并方案都要去讀全量的表,這在性能上難以滿足。

業(yè)務(wù)能啟動的時間太長了,很多百億級別的表能啟動的時間可能是在幾個小時,而且讀取的數(shù)據(jù)太多,很容易失敗。

和社區(qū)進(jìn)行了溝通交流后,他們提供了一套全新的方案,新增了獨(dú)立的 Bootstrap 機(jī)制,專門負(fù)責(zé)冷啟動過程。Bootstrap 由 coordinator 和 IndexBootstrap 兩個算子組成,IndexBootstrap 負(fù)責(zé)讀取工作,coordinator 負(fù)責(zé)協(xié)調(diào)分配文件讀取,防止單個 IndexBootstrap 讀取速度慢而降低整個初始化流程的效率。

IndexBootstrap 算子讀取到數(shù)據(jù)后,會按照與業(yè)務(wù)數(shù)據(jù)一樣的 Keyby 規(guī)則,Keyby 到對應(yīng)的 BucketAssigner 算子上,并在數(shù)據(jù)上面打標(biāo),告知 BucketAssigner 這條數(shù)據(jù)是有 Bootstrap 的,不需要往下游 writer 發(fā)送。整個流程里,原始數(shù)據(jù)只需讀取一遍,而且是多并發(fā)一起讀,效率獲得了極大的提升。而且 BucketAssigner 只需要處理自己應(yīng)該處理的數(shù)據(jù),不再需要處理全表的數(shù)據(jù)。

其次是 Hudi 的 checkpoint 一致性問題。Hudi on checkpoint 在每次 checkpoint 完成的時候會進(jìn)行一次 commit 操作,具體流程是 writer 算子在 checkpoint 的時候 flush 內(nèi)存數(shù)據(jù),然后給 writer coordinator 算子匯報匯總信息,writer coordinor 算子收到匯報信息時會將其緩存起來,checkpoin 完成后,收到 notification 信息時會進(jìn)行一次 commit 操作。

但是在 Flink 的 checkpoint 機(jī)制里,notification 無法保證一定成功,因為它并不在 checkpoint 的生命周期里,而是一個回調(diào)操作,是在 checkpoin 成功后執(zhí)行。checkpoin 成功后,如果這個接口還沒有執(zhí)行完成,commit 操作就會丟失,也就意味著 checkpoint 周期內(nèi)的數(shù)據(jù)會丟失。

針對上述問題,我們進(jìn)行了重構(gòu)。Writer 算子在 cehckpoint 時,會對匯報的 writer coordinator 的信息進(jìn)行 state 持久化,任務(wù)重啟后重新匯報給 writer coordinator 算子。writer coordinator 算子再收集所有 writer 算子信息并做一次 commit 判斷,確保對應(yīng)的 commit 已經(jīng)完成。此時,Writer 算子也會保持阻塞,確保上次持久化的 commit 完成之后才會處理最新的數(shù)據(jù),這樣就對齊了 Hudi 與 Flink 的 checkpoint 機(jī)制,保證了邊界場景數(shù)據(jù)的一致性。

第三是針對 Hudi 在 Append 寫入場景下的優(yōu)化。

由于 Append 模式是復(fù)用 update 模式的代碼,所以在沒有重復(fù) key 的 Append 場景下,很多操作是可以簡化的,因為 update 為了處理重復(fù),需要做很多額外的操作。如果能夠簡化這些操作,吞吐能力可以有較大的提升。

第一個操作是小文件的查找,每次 checkpoint 后,update 都會重新 list 文件,然后從文件中找到大小不達(dá)標(biāo)的文件繼續(xù) open 并寫入。update 場景存在傾斜,會造成很多文件大小不均勻,但是 Append 場景不存在這種問題,它所有的文件大小都很均勻;

第二個是 keyby。在 update 的模式下面,單個 key 只能被一個節(jié)點(diǎn)處理,因此上游需要按照 Hudi key 進(jìn)行 keyby 操作。但是 Append 場景沒有重復(fù) key,可以直接用 chain 代替 keyby,大大減少了節(jié)點(diǎn)之間序列化傳輸?shù)拈_銷。同時 Append 場景下不存在內(nèi)存合并,整體效率也會更高。

最后一個是 GetListing 的優(yōu)化。Hudi 表與底層 HDFS 文件的映射是通過 ViewManager 來做的,Hudi table 對象和 TimelineService 都會自己去初始化一個 ViewManager,每個 ViewManager 在初始化的時候都會進(jìn)行 HDFS 目錄的 list 操作,由于每個并發(fā)都持有多個 Hudi table 或 TimelineService,會造成大并發(fā)任務(wù)啟動時 HDFS 的壓力很大。我們對 TimelineService 進(jìn)行了單例化的優(yōu)化,保證每個進(jìn)程只有一 TimelineService,能夠數(shù)倍地降低 HDFS list 的壓力。后續(xù)我們還會基于 Flink 的 coordinator 機(jī)制做任務(wù)級別的單例化。

未來,我們會繼續(xù)挖掘增量的能力,給業(yè)務(wù)帶來更多的價值。

三、AI on Flink

傳統(tǒng)的機(jī)器學(xué)習(xí)鏈路里數(shù)據(jù)的傳輸、特征的計算以及模型的訓(xùn)練,都是離線處理的,存在兩個大的問題。

第一個是時效性低,模型和特征的更新周期基本是 t+1 天或者 t+1 小時,在追求時效性的場景下體驗并不好。第二個是計算訓(xùn)練的效率很低,必須等天或小時的分區(qū)數(shù)據(jù)全部準(zhǔn)備好之后才能開始特征計算和訓(xùn)練。全量分區(qū)數(shù)據(jù)導(dǎo)致計算和訓(xùn)練的壓力大。

在實(shí)時技術(shù)成熟后,大部分模型訓(xùn)練流程都切換到實(shí)時架構(gòu)上,數(shù)據(jù)傳輸、特征計算和訓(xùn)練都可以做到幾乎實(shí)時,從全量變成了短時的小批量增量進(jìn)行,訓(xùn)練的壓力也大大減輕。同時由于實(shí)時對離線的兼容性,在很多場景比如特征回補(bǔ)上,也可以嘗試使用 Flink 的流批一體進(jìn)行落地。

上圖是我們典型的機(jī)器學(xué)習(xí)鏈路圖。從圖上可以看出,樣本數(shù)據(jù)生產(chǎn)特征的計算、模型的訓(xùn)練和效果的評估都大量實(shí)時化,中間也夾雜著少量離線過程,比如一些超長周期的特征計算。

同時也可以看出,完整的業(yè)務(wù)的模型訓(xùn)練鏈路長,需要管理和維護(hù)大量的實(shí)時任務(wù)和離線任務(wù)。出現(xiàn)故障的時候,具體問題的定位也異常艱難。如何在整個機(jī)器學(xué)習(xí)的鏈路中同時管理號這么多實(shí)時和離線任務(wù),并且讓任務(wù)之間的協(xié)同和調(diào)度有序進(jìn)行、高效運(yùn)維,是我們一直在思考的問題。

因此我們引入了 Flink 生態(tài)下 AIFlow 系統(tǒng)。AIFlow 本身的定位就是做機(jī)器學(xué)習(xí)鏈路的管理,核心的機(jī)器計算引擎是 Flink,這和我們的訴求不謀而合。這套系統(tǒng)有三個主要的特性符合我們的業(yè)務(wù)需求。

第一,流批的混合調(diào)度。在我們實(shí)際的業(yè)務(wù)生產(chǎn)上,一套完整的實(shí)時鏈路都會夾雜著實(shí)時和離線兩種類型的任務(wù)。AIFlow 支持流批的混合調(diào)度,支持?jǐn)?shù)據(jù)依賴與控制依賴,能夠很好地支持我們現(xiàn)有的業(yè)務(wù)形態(tài)。并且未來在 Flink 流批一體方面也會有更多的發(fā)揮空間;

第二,元數(shù)據(jù)的管理,AIFlow 對所有數(shù)據(jù)和模型都支持版本管理。有了版本管理,各種實(shí)驗效果和實(shí)驗參數(shù)就都可追溯;

第三,開放的 notification 機(jī)制。整個鏈路中存在很多的外部系統(tǒng)節(jié)點(diǎn),難以歸納到平臺內(nèi)部,但是通過 notification 機(jī)制,可以打通 AIFlow 內(nèi)部節(jié)點(diǎn)與外部節(jié)點(diǎn)的依賴。整套系統(tǒng)的部署分為三部分,notification service、 meta service 以及 scheduler,擴(kuò)展性也很好,我們在內(nèi)部化的過程中實(shí)現(xiàn)了很多自己的擴(kuò)展。

實(shí)時平臺在今年引入 AIFlow 的之后已經(jīng)經(jīng)歷了兩個版本的迭代,V2 版本是社區(qū) release 之前的一個內(nèi)部版本,我們進(jìn)行了分裝提供試用。V3 版本是今年 7 月社區(qū)正式 release 之后,我們進(jìn)行了版本的對接。

AIFlow 的構(gòu)建使用 Python 進(jìn)行描述,運(yùn)行時會有可視化的節(jié)點(diǎn)展示,可以很方便地追蹤各個節(jié)點(diǎn)的狀態(tài),運(yùn)維也可以做到節(jié)點(diǎn)級的管理,不需要做整個鏈路級別的運(yùn)維。

未來我們會對這套系統(tǒng)在流批一體、特征管理以及模型訓(xùn)練三個方向進(jìn)行重點(diǎn)的迭代與開發(fā),更好地發(fā)揮它的價值。

責(zé)任編輯:未麗燕 來源: Apache Flink
相關(guān)推薦

2021-05-20 09:55:23

Apache Flin阿里云大數(shù)據(jù)

2024-05-13 10:44:22

云計算

2023-11-03 12:54:00

KAFKA探索中間件

2024-07-08 14:41:51

2011-05-05 14:52:10

無縫拼接拼接大屏幕

2025-02-26 01:17:57

2023-02-28 12:12:21

語音識別技術(shù)解碼器

2022-04-24 11:27:05

邊緣計算數(shù)據(jù)自動駕駛

2019-04-30 09:00:33

SQL數(shù)據(jù)庫Apache Flin

2023-04-04 12:38:50

GPT機(jī)器人LLM

2021-03-17 07:59:36

邊緣計算遠(yuǎn)程辦公數(shù)字化轉(zhuǎn)型

2022-10-08 15:41:08

分布式存儲

2014-01-15 16:46:07

多元化

2022-09-15 15:18:23

計算實(shí)踐

2015-05-28 17:34:50

順豐田民IT驅(qū)動

2010-05-13 23:34:39

統(tǒng)一通信環(huán)境

2015-12-14 17:36:16

5G無線網(wǎng)絡(luò)

2023-07-19 08:58:00

數(shù)據(jù)管理數(shù)據(jù)分析

2024-04-26 12:13:45

NameNodeHDFS核心

2020-11-04 10:09:06

物聯(lián)網(wǎng)智慧辦公技術(shù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號