流批一體的實時特征工程平臺建設(shè)實踐
本次分享分為四大部分,第一部分總體介紹 FeatHub 在特征開發(fā)、部署、監(jiān)控、分享過程中面臨的場景、目標、痛點和挑戰(zhàn);第二部分介紹 FeatHub 的架構(gòu)思路實踐,及相關(guān)核心概念;第三部分介紹 FeatHub 在使用過程中的 API 基本使用、基本計算功能,樣例場景的代碼實踐,還有性能優(yōu)化,未來的擴展目標,以及開源社區(qū)的共建,提供項目的學(xué)習(xí)、開發(fā)使用,還將分享 FeatHub 歷史數(shù)據(jù)的回放功能, 支持離線、近線、在線處理和阿里云上下游組件的支持等問題。
一、為什么需要 FeatHub
1、目標場景
(1)需要 Python 環(huán)境的數(shù)據(jù)科學(xué)家
今天大部分流行的機器學(xué)習(xí)的推理和訓(xùn)練程序基本都是由數(shù)據(jù)科學(xué)家用 Python 來編寫的,比如流行的 TensorFlow、PyTorch 以及一些傳統(tǒng)機器學(xué)習(xí)場景中用到的 scikit-learn 等等。我們希望支持數(shù)據(jù)科學(xué)家繼續(xù)使用熟悉的 Python 編寫特征工程代碼來完成端到端機器學(xué)習(xí)鏈路的開發(fā)與部署,并且能夠使用他們所熟悉的 Python 生態(tài)環(huán)境中的庫。
(2)生成實時特征
越來越多的機器學(xué)習(xí)應(yīng)用在往實時方向發(fā)展,通過實時處理可以提高機器學(xué)習(xí)的效率和準確度。為了達到目標,需要生成實時特征。這里不僅僅是去實時獲取查詢特征,而是要實時生成特征。例如需要實時獲取用戶在最近兩分鐘內(nèi)的點擊次數(shù),為此需要使用流式計算引擎完成實時特征計算。
(3)需要開源方案支持多云部署
越來越多的中小型公司希望做到多云部署,以得到生產(chǎn)的安全保證,以及獲得云廠商之間的競價優(yōu)勢。因此我們的方案不要求用戶綁定一個云廠商,而是要讓用戶能夠自由地在不同云廠商之間做選擇,甚至在私有云部署特征工程作業(yè)。
這是 FeatHub 項目設(shè)立之初所希望滿足的一些條件。
2、實時特征工程的痛點
今天已經(jīng)有很多公司在開發(fā)實時特征工程作業(yè)。其中存在一些痛點,涵蓋了特征的整個生命周期,包含開發(fā)、部署、監(jiān)控、以及之后的分享。
(1)開發(fā)難度高
① 特征穿越
開發(fā)階段,用的比較多的是實時特征框架 Apache Flink,因為 Flink 已經(jīng)基本上是實時流計算的事實標準,但是用 Flink 或者類似的框架來開發(fā)實時特征存在著需要解決特征穿越的難點。很多數(shù)據(jù)科學(xué)家并不了解特征穿越的解決經(jīng)驗,并且需要比較多的學(xué)習(xí)時間和成本來解決這類問題,這是開發(fā)階段的主要痛點。
(2)部署難度高
① 需要手動翻譯
很多公司會有一個專門的平臺團隊把數(shù)據(jù)科學(xué)家寫的單進程 Python 作業(yè)翻譯成可分布式執(zhí)行的 Flink 或者 Spark 作業(yè),來實現(xiàn)高性能高可用的部署。其翻譯過程會增加整個開發(fā)生命周期長度。并且因為還需要額外的人力去做翻譯工作,增加了開發(fā)成本,更進一步帶來了引入 Bug 的可能。另一撥人將數(shù)據(jù)科學(xué)家的作業(yè)翻譯之后的邏輯未必和原先的邏輯保持一致,這樣就帶來更多的 Debug 工作量。
(3)監(jiān)控難度高
① 特征分布變化
特征工程作業(yè)的整個質(zhì)量和效率不只是取決于作業(yè)有沒有 Bug,還依賴于上游的輸入數(shù)據(jù)數(shù)值分布能滿足一些特性,例如能接近于訓(xùn)練時的數(shù)據(jù)數(shù)值分布。很多作業(yè)的推理效果下降,經(jīng)常是由于上游作業(yè)生產(chǎn)的數(shù)據(jù)分布發(fā)生了變化。這種情況下,需要開發(fā)者去追蹤整個鏈路,一段段去看在哪個地方的特征數(shù)據(jù)分布發(fā)生了變化,根據(jù)具體情況再去看是否需要重新訓(xùn)練或者解決 Bug。這部分人力工作量過大也是一個痛點。
(4)分享難度高
① 開發(fā)工作重復(fù)
雖然很多特征計算作業(yè)的開發(fā)團隊和場景不同,但其實用了類似甚至相同的特征定義。很多公司中沒有一個很好的渠道,讓公司內(nèi)不同團隊能查詢和復(fù)用已有特征。這就導(dǎo)致不同團隊經(jīng)常需要做重復(fù)開發(fā),甚至對于相同特征需要重復(fù)跑作業(yè)去生成一些特征。這帶來了人力和計算/儲存資源的浪費,因為需要更多的計算、內(nèi)存、存儲空間去生成相同特征。
② point-in-time correct 語義
為了讓大家能夠理解什么叫特征穿越,上圖給出了一個簡單例子,來展現(xiàn)這個問題。圖左上表是用戶的一個行為特征,表達了在不同時間節(jié)點,對于一個給定 ID 的用戶,在最近兩分鐘內(nèi)的點擊數(shù)。這個點擊數(shù)可能幫助我們推理用戶是否會點擊某個廣告。為了用這些特征去做訓(xùn)練,通常需要將特征拼接到用戶帶有 Label 的一些數(shù)據(jù)集上。圖左下表展現(xiàn)的是一個用戶實際有沒有點擊廣告的一些正樣本和負樣本的數(shù)據(jù)集,標注了在不同的時間點,用戶所產(chǎn)生的正樣本或負樣本。為了將這兩個數(shù)據(jù)集中的特征拼接起來,形成訓(xùn)練用的數(shù)據(jù)集,通常需要根據(jù)用戶 ID 作為 key 進行特征拼接。如果只是簡單地進行 Table Join,不考慮時間戳,就可能產(chǎn)生特征穿越問題。 例如在 6:03 分時,用戶最近 2 分鐘點擊數(shù)應(yīng)該是 10,但拼接得到的特征值可能是來自 7:00 分時的 6。這種特征穿越會帶來實際推理效果的下降。一個具有 point-in-time correct 語義的 Join 結(jié)果應(yīng)該如下圖所示:
為了在樣本拼接時避免特征穿越,對于在上圖左表中的每一條數(shù)據(jù),應(yīng)該在維表的多個版本特征當中找到時間戳小于并且最接近于左表中的時間戳的特征數(shù)值,并將其拼接到最終生成的訓(xùn)練數(shù)據(jù)集上。這樣一個具有 point-in-time correct 語義的拼接,將產(chǎn)生上圖右邊所顯示的訓(xùn)練數(shù)據(jù)集。針對不同的時間點,都有所對應(yīng)最近兩分鐘內(nèi)產(chǎn)生的特征值。這樣生成的訓(xùn)練數(shù)據(jù)集可以提高訓(xùn)練和推理的效果。
3、Feature Store 的核心場景
接下來介紹 FeatHub 作為一個 Feature Store,對于整個特征開發(fā)周期的每一階段試圖解決的問題和提供的工具。
(1)特征開發(fā)
在特征開發(fā)階段,F(xiàn)eatHub 會提供一個基于 Python 的具有高易用性的 SDK,讓用戶能簡潔地表達特征的計算邏輯。特征計算本質(zhì)是一個特征的 ETL。開發(fā)階段最重要的是 SDK 的易用性和簡潔性。
(2)特征部署
在特征部署階段,F(xiàn)eatHub 會提供執(zhí)行引擎,實現(xiàn)高性能,低延遲的特征計算邏輯的部署,并且能對接不同的特征存儲。部署階段最重要的是執(zhí)行引擎的性能和對接不同特征存儲的能力。
(3)特征告警
在特征監(jiān)控階段,為了方便開發(fā)者及時發(fā)現(xiàn)特征數(shù)值分布的變化并做出應(yīng)對,F(xiàn)eatHub 將來會產(chǎn)生一些常用指標來覆蓋常見的特征質(zhì)量問題,例如具有非法數(shù)值的特征比例,或者特征平均值,并根據(jù)這些指標進行報警,去及時通知負責(zé)人調(diào)查相關(guān)特征分布變化的原因和做出應(yīng)對,來維護端到端的推薦鏈路的效果。
(4)特征分享
在特征分享階段,F(xiàn)eatHub 將來會提供特征的注冊和搜索能力,支持同一公司內(nèi)不同團隊的開發(fā)人員去查詢自己想要的特征是不是已經(jīng)存在,并復(fù)用這些特征定義和已經(jīng)產(chǎn)生的特征數(shù)據(jù)。
上圖中說明 FeatHub 的核心特點。在開發(fā)階段,F(xiàn)eatHub 能提供簡單易用的 SDK,支持具有 point-in-time correct 語義的特征拼接,特征聚合等邏輯。在部署階段,F(xiàn)eatHub 能支持高吞吐、低延遲的特征生成,支持使用 Flink 作為執(zhí)行引擎來計算特征;并且能支持多種特征存儲系統(tǒng),方便用戶自由選擇所希望使用的存儲類型。在監(jiān)控階段, FeatHub 將能提供實時指標來監(jiān)控特征分布的變化,包含離線和實時監(jiān)控,方便開發(fā)者及時發(fā)現(xiàn)問題。在分享階段,F(xiàn)eatHub 將會提供簡單易用的 Web UI 以及 SDK,支持開發(fā)者注冊,搜索和復(fù)用特征。
在 Feature Store 領(lǐng)域內(nèi)已經(jīng)有一些具有代表性的 Feature Store 項目,例如今年初 LinkedIn 開源的 Feathr,以及開源了多年的 Feast。我們調(diào)研了這些項目,發(fā)現(xiàn)他們并不能很好地達成我們提出的目標場景。
FeatHub 相比現(xiàn)有方案,帶來的額外價值包括:
① 簡單易用的 Python SDK。FeatHub 的 SDK 參考了已有的 Feature Store 項目的 SDK,能支持這些項目的核心功能,并進一步提升了 SDK 的抽象能力和易用性,
② 支持單機上的開發(fā)和實驗。開發(fā)者不需要對接分布式的 Flink 或 Spark 集群來跑實驗,而只需要使用單機上的 CPU 或者內(nèi)存資源就可以進行開發(fā)和實驗,并能使用 scikit-learn 等單機上的機器學(xué)習(xí)算法庫。
③ 無需修改代碼即可切換執(zhí)行引擎。當用戶完成單機上的開發(fā)后,可以將單機執(zhí)行引擎切換到 Flink 或 Spark 等分布式執(zhí)行引擎,而無需修改表達特征計算邏輯的代碼。使用 Flink 作為執(zhí)行引擎可以讓 Feathub 支持高吞吐、低延時的實時特征計算。FeatHub 將來會進一步支持使用 Spark 作為執(zhí)行引擎,讓用戶在離線場景中可以得到潛在的更好的吞吐性能,根據(jù)場景自由選擇最合適的執(zhí)行引擎。
④ 提供執(zhí)行引擎的擴展能力。FeatHub 不僅可以支持以 Flink、Spark 作為執(zhí)行引擎,還支持開發(fā)者自定義執(zhí)行引擎,使用公司內(nèi)部自研的執(zhí)行引擎進行特征 ETL。
⑤ 代碼開源,使得用戶可以自由選擇部署 FeatHub 的云廠商,也可以在私有云中進行部署。
二、FeatHub 架構(gòu)與核心概念
1、架構(gòu)
以上是包含 FeatHub 主要模塊的架構(gòu)圖。最上層提供了一套 Python SDK,支持用戶定義數(shù)據(jù)源、數(shù)據(jù)終點以及特征計算邏輯。由 SDK 所定義的特征可以注冊到特征元數(shù)據(jù)中心,支持其他用戶和作業(yè)來查詢和復(fù)用特征,甚至可以基于特征元數(shù)據(jù)進一步分析特征血緣。特征定義包含了特征的 source、sink,以及常見的計算邏輯,例如 UDF 調(diào)用、特征拼接,基于 over 窗口與滑動窗口的聚合等。當需要取生成用戶所定義的特征時,F(xiàn)eatHub 會提供一些內(nèi)置的 Feature Processor,也就是執(zhí)行引擎,去執(zhí)行已有特征的計算邏輯。當用戶需要在單機上做實驗時,可以使用 Local Processor 使用單機上的資源,無需對接一個遠程的集群。當需要生成實時特征時,可以使用 Flink Processor 完成高吞吐、低延時的流式特征計算。
將來也可以支持類似于 Lambda Function 的 Feature Service 來實現(xiàn)在線的特征計算,以及對接 Spark 來完成高吞吐的離線特征計算。執(zhí)行引擎可以對接不同的離線和在線特征儲存系統(tǒng),例如用 Redis 完成在線特征儲存,用 HDFS 完成離線特征儲存,以及用 Kafka 完成近線特征儲存。
上圖展現(xiàn)了 FeatHub 如何被用戶使用,以及對接下游的機器學(xué)習(xí)訓(xùn)練和推理程序,用戶或開發(fā)者將通過 SDK 來表達所希望計算的特征,然后提交到執(zhí)行引擎上進行部署。特征經(jīng)過計算后,需要輸出到特征儲存,例如 Redis 和 HDFS。一個機器學(xué)習(xí)離線訓(xùn)練程序可以直接讀取 HDFS 中的數(shù)據(jù)去做批量訓(xùn)練。一個在線的機器學(xué)習(xí)推理程序可以直接讀取 Redis 中的數(shù)據(jù)進行在線推理。
2、核心概念
上圖展現(xiàn)了 FeatHub 中的核心概念之間的關(guān)系。一個 TableDescriptor 表達一組特征的集合。TableDescriptor 經(jīng)過邏輯轉(zhuǎn)換可以生產(chǎn)一個新的 TableDescriptor。
TableDescriptor 分為兩類。其中 FeatureTable 表達的是具有特定物理地址的表,例如可以是一個在 Redis 中的表,也可以是一個在 HDFS 中的表。FeatureView 則是一些不一定有物理地址的邏輯表,通常是從一個 FeatureTable 經(jīng)過一連邏輯串轉(zhuǎn)換后得到的。
FeatureView 有如下 3 個子類:
① DerivedFeatureView 輸出的特征表和其輸入的特征表(i.e. source)的行基本是一對一的。它可以支持表達單行轉(zhuǎn)換邏輯(e.g. 加減乘除),over window 聚合邏輯,以及特征拼接邏輯。它可用于生成訓(xùn)練數(shù)據(jù)。例如在之前所介紹的例子中,需要將訓(xùn)練樣本去拼接來自不同維表的特征以得到實際的訓(xùn)練數(shù)據(jù),就可以使用 DerivedFeatureView 來完成。
② SlidingFeatureView 支持表達由滑動窗口計算得到的特征。它輸出的特征表和其輸入的特征表的行不一定是一對一的。這是因為即使沒有新的輸入,滑動窗口計算得到的特征數(shù)值會隨著時間流逝而變化。SlidingFeatureView 可以用于維護實時生成的特征,并輸出到在線特征存儲,例如 Redis,用于在線推理。例如,我們可以用 SlidingFeatureView 去計算每個用戶最近兩分鐘內(nèi)點擊某個網(wǎng)頁的次數(shù),并將特征數(shù)值實時更新到 Redis 中,然后廣告推薦鏈路就可以在線查詢這個特征的值來做在線推理。
③ OnDemandFeatureView 可以與 Feature Service 用在一起,支持在線特征計算。例如在使用高德地圖時,開發(fā)者可能會希望在收到用戶的請求之后,根據(jù)用戶當前的物理位置與上一次發(fā)送請求時的物理位置,計算出用戶移動的速度和方向速度,來協(xié)助推薦路線的決策。這些特征必須在收到用戶請求的時候進行在線計算得到。OnDemandFeatureView 可以用于支持這類場景。
Transform 表達的是特征計算邏輯。FeatHub 當前支持如下 5 種特征計算邏輯:
① Expression 支持用戶基于一個 DSL 語言表達單行的特征計算邏輯。其表達能力接近SQL 語言中的 select 語句,可以支持加減乘除和內(nèi)置函數(shù)調(diào)用,可以讓熟悉 SQL 的開發(fā)者快速上手。
② Join 表達的是特征拼接邏輯。開發(fā)者可以指定維表的名字和需要拼接的特征名字等信息。
③ PythonUDF 支持用戶自定義 Python 函數(shù)來計算特征。
④ OverWindow 表達的是 Over 窗口聚合邏輯。例如在收到一行數(shù)據(jù)時,用戶希望根據(jù)之前的 5 行數(shù)據(jù),進行聚合并計算有多少條數(shù)據(jù)符合某個規(guī)則。
⑤ SlidingWindow 表達的是滑動窗口聚合邏輯。
從上圖中可以看到,通常一個特征 ETL 作業(yè)會從特征源表讀取特征,經(jīng)過多次特征計算邏輯產(chǎn)生新的特征,并將生成的特征輸出到特征結(jié)果表。特征源表可以對接不同的特征存儲,例如有 FileSystem,Kafka,Hive 等。類似的,特征結(jié)果表也可以對接 FileSystem,Kafka,Redis 等特征儲存。
Processor 包括 LocalProcessor、FlinkProcessor、SparkProcessor,分別可以使用單機物理資源,分布式的 Flink 集群,以及分布式 Spark 集群,去執(zhí)行用戶所定義的特征計算邏輯。
三、FeatHub API 展示
1、特征計算功能
在介紹了 FeatHub 的架構(gòu)和核心概念后,我們將通過一些樣例程序來展現(xiàn) FeatHub SDK 的表達能力以及易用性。對于特征開發(fā) SDK 來說,其最核心的能力就是如何表達新的特征計算邏輯。FeatHub SDK 支持特征拼接、窗口聚合、內(nèi)置函數(shù)調(diào)用以及自定義 Python 等能力,將來還可以支持基于 JAVA 或者 C++ 的 UDF 調(diào)用。
上圖展示了一個特征拼接的代碼片段。在這個例子中,假設(shè) HDFS 中有原始的正負樣本數(shù)據(jù),記錄了用戶購買商品的行為。我們想進一步想獲取用戶在購買每個商品時的商品價格。一個 price_updates 表維護了商品價格變化的數(shù)據(jù)。每次商品價格變化時,會在 price_updates 表中產(chǎn)生一行數(shù)據(jù),包含商品 ID 和最新的商品價格。我們可以使用 JoinTransform,設(shè)置 table_name=price_updates,feature_name=price,以及 key=item_id,來表達相應(yīng)的特征拼接邏輯。這樣 FeatHub 就可以根據(jù)在 price_updates 中,找到具有給定 item_id 的行,并根據(jù)時間戳,找到最合適的 price 數(shù)值,來拼接到樣本數(shù)據(jù)表上。
Over 窗口聚合的代碼片段則展示了如何用 OverWindowTransform 來計算特征。用戶可以使用 expr=”item_counts * price”,以及 agg_fun=”SUM”,來根據(jù)購買的商品數(shù)量和價格,計算出最近時間窗口中的總消費量。其中窗口的時間長度為 2 分鐘。group_by_keys=[“user_id”] 則說明了我們會為每個用戶單獨計算出對應(yīng)的總消費量。
?滑動窗口聚合與 Over 窗口聚合比較類似,API 上唯一區(qū)別是可以額外指定 step_size。如果 step_size=1 分鐘,則窗口會在每分鐘進行滑動并產(chǎn)生新的特征值。
內(nèi)置函數(shù)調(diào)用的代碼片段展示了如何使用 D?SL 語言表達加減乘除和 UDF 調(diào)用。假設(shè)輸入的數(shù)據(jù)包含出租車接送乘客的時間戳。我們可以通過調(diào)用 UNIX_TIMESTAMP 內(nèi)置函數(shù)將接送乘客的時間戳轉(zhuǎn)換為整數(shù)類型的 epoch time,然后將得到的 epoch time 相減,得到每次旅程的時間長度,作為一個特征用于之后的訓(xùn)練和推理。
在 PythonUDF 調(diào)用的代碼片段中,用戶可以自定義一個 Python 函數(shù),對輸入的特征進行任意的處理,例如產(chǎn)生小寫的字符串。
通過以上幾個代碼片段,我們可以看出 FeatHub 的 API 是比較簡潔易用的。用戶只需要設(shè)置計算邏輯所必須的參數(shù),而無需了解處理引擎的細節(jié)。
2、樣例場景
在以上樣例場景中,用戶有兩個數(shù)據(jù)源。其 Purchase Events 包含用戶購買商品的樣本數(shù)據(jù),可以來自于 Kafka,也可以來自于 FileSystem;Item Price Events 包含商品價格變動的數(shù)據(jù)。每次商品價格變化時,會在 Item Price Events 中產(chǎn)生一行數(shù)據(jù),包含商品 ID 和最新的商品價格。我們希望對于每條用戶購買商品的樣本數(shù)據(jù),計算用戶在該行為發(fā)生時最近兩分鐘內(nèi)的消費總量,作為特征來協(xié)助推理出用戶會不會購買某樣商品。為了生成這個特征,可以使用上圖中所描述的計算邏輯,先將 Item Price Events 中的 price 特征以 item_id 作為 join_key 拼接到 Purchase Events 上。然后再基于時間窗口和使用 user_id 作為 group_by _keys 進行聚合,來計算得到每個用戶最近兩分鐘內(nèi)的消費總量。
3、樣例代碼
以上代碼片段展示了一個樣例 FeatHub 應(yīng)用所需要完成的步驟。
① 首先用戶需要創(chuàng)建一個 FeatHubClient 并設(shè)置 processor_type。如果是本地實驗,可以設(shè)置成 Local,如果是遠程分布式生產(chǎn)部署,可以設(shè)置成 Flink。
② 用戶需要創(chuàng)建 Source 來讀取數(shù)據(jù),例如可以使用 FileSystemSource 讀取在離線儲存系統(tǒng)中的數(shù)據(jù),或者使用 KafkaSource 讀取近線儲存系統(tǒng)中的實時數(shù)據(jù)。FileSystemSource 中,用戶可以指定例如 data_format,schema、文件的位置等信息。值得注意的是,用戶可以提供 time_stamp_field 和 time_stamp_format,分別表達數(shù)據(jù)源表中代表時間的列以及對應(yīng)的解析格式。FeatHub 將使用這些信息完成做 point-in-time correct 的特征計算,避免特征穿越的問題。
③ 用戶可以創(chuàng)建一個 FeatureView 來表達特征拼接和聚合的邏輯。如果要做拼接,用戶可以 item_price_events.price 來表達希望拼接的特征。FeatHub 會找到名字為 item_price_events 的表并從中拿到名字為 price 的特征。用戶還可以使用 OverWindowTransform 來完成 Over 窗口聚合,定義一個名為total_payment_last_two_minutes 的特征。其中 window_size=2 分鐘表示對于兩分鐘內(nèi)的數(shù)據(jù)應(yīng)用指定的表達式和聚合函數(shù)來計算特征。
④ 對于已經(jīng)定義的 FeatureView,如果用戶想做本地開發(fā)和實驗,并使用 scikit-learn 算法庫進行單機上的訓(xùn)練,可以使用 to_pandas() API 來將數(shù)據(jù)以 Pandas DataFrame 格式獲取到單機的內(nèi)存中。
⑤ 當用戶需要完成特征的生產(chǎn)部署時,可以使用 FileSystemSink 指定用于存放數(shù)據(jù)的離線特征儲存。然后調(diào)用 execute_insert() 將特征輸出到所指定的 Sink 當中。
FeatHub 的基本價值是提供 SDK 來方便用戶開發(fā)特征,并且提供執(zhí)行引擎來計算特征。除此之外,F(xiàn)eatHub 還將提供執(zhí)行引擎的性能優(yōu)化,讓用戶在特征部署階段獲得更多的收益。例如對于基于滑動窗口聚合的特征,目前如果使用原生的 Flink API 來計算,F(xiàn)link 會在每個滑動的 step_size 都輸出對應(yīng)的特征值,無論特征的數(shù)值是否發(fā)生了變化。對于 window_size=1 小時,step_size=1 秒這樣的滑動窗口,大部分情況下 Flink 可能會輸出相同的特征數(shù)值。這樣會浪費網(wǎng)絡(luò)流量、下游存儲等資源。FeatHub 中支持用戶配置滑動窗口的行為,允許滑動窗口只在特征數(shù)值發(fā)生變化的時候輸出特征,來優(yōu)化特征計算作業(yè)的資源使用量。
另外 FeatHub 還將進一步優(yōu)化滑動窗口的內(nèi)存和 CPU 使用量。在某些場景中,用戶會定于許多類似的滑動窗口特征。這些特征只有 window size 不一樣。例如我們可能希望得到每個用戶最近 1 分鐘,5 分鐘,和 10 分鐘內(nèi)的購買商品的花費總數(shù)。如果使用原生的 Flink API 來計算,作業(yè)可能會使用三個聚合算子來分別計算這 3 個特征。每個聚合算子會有單獨的內(nèi)存空間。考慮到這些算子所處理的數(shù)據(jù)和計算邏輯具有較大的重合,F(xiàn)eatHub 可以用一個自定義算子,統(tǒng)一完成這些特征的計算,來達到節(jié)約內(nèi)存和 CPU 資源的目標。
FeatHub 目前已經(jīng)在 GitHub 開源,能夠支持一些基本的 LocalProcessor 和 FlinkProcessor 的功能。我們會進一步完善 FeatHub 的核心功能來方便用戶特征工程的開發(fā)和落地。其中包括支持更多常用的離線儲存、在線存儲,對接 Notebook,提供 Web UI 來可視化特征的元數(shù)據(jù),支持用戶做特征的注冊、搜索、復(fù)用,以及支持使用 Spark 作為 FeatHub 的執(zhí)行引擎。
FeatHub 代碼庫:?https://github.com/alibaba/FeatHub?
FeatHub 代碼樣例:?https://github.com/flink-extended/FeatHub-examples?
FeatHub 代碼庫目前放在 github/alibaba 目錄下。為了方便大家學(xué)習(xí)使用 FeatHub,并快速找到和參照滿足所需場景需求的代碼片段,我們在 flink-extended/feathub-examples 代碼庫中提供額外代碼示例,大家可以自由使用嘗試。歡迎大家提供反饋,以及貢獻 PR。
四、問答環(huán)節(jié)
Q1:在 point_in_time join 時,特征穿越是由數(shù)據(jù)亂序延遲還是由人工寫 Join 時導(dǎo)致的?
A1:原則上都有,即使數(shù)據(jù)沒有亂序,如果在 Join 時沒有考慮到 timestamp 字段,就可能導(dǎo)致亂序。在實際場景中,源數(shù)據(jù)可能也會亂序。這時候可以使用類似于 Flink 中的 watermark 策略來等待晚到的數(shù)據(jù),降低亂序的影響。另外我們可以用定期的離線作業(yè)來 backfill 在線特征數(shù)據(jù),從而進一步降低數(shù)據(jù)亂序的影響。
Q2:FeatHub 上線后,如何去產(chǎn)出過去的訓(xùn)練數(shù)據(jù)及其對應(yīng)的特征?FeatHub 是否支持歷史數(shù)據(jù)的回放?
A2:FeatHub API 是能支持回放的, 但目前這部分功能還沒有經(jīng)過生產(chǎn)驗證。FeatHub 將支持使用 Flink 和 Spark 作為執(zhí)行引擎,因此可以復(fù)用 Flink 和 Spark 的計算能力來完成歷史數(shù)據(jù)的回放。例如, 我們可以啟動一個 Spark 作業(yè),設(shè)置 Source 來處理過去一個月內(nèi)所有的 HDFS 上的數(shù)據(jù),并執(zhí)行所定義的特征拼接和聚合邏輯,然后將計算得到的特征輸出。
Q3. FeatHub 只負責(zé)離線特征計算,如何處理在線部分特征?
A3:特征計算分為離線、近線和在線,F(xiàn)link 是一個近線執(zhí)行引擎,可以實時計算例如最近 5 分鐘內(nèi)的用戶點擊次數(shù)這樣的特征,同時也可以支持離線計算。因此 FeatHub 可以支持離線和近線特征計算。FeatHub 將來有計劃去支持在線特征計算,使用基于 Feature Service 的架構(gòu),來計算 OnDemandFeatureView 所表達的特征。
Q4:FeatHub 在阿里云中提供服務(wù),目前有哪些上下游的生態(tài)支持,比如 ODPS 等?
A4:FeatHub 將會支持所有 Flink 所支持的 Source/Sink,包括 ODPS,Holo 等阿里云提供的服務(wù)。目前 FeatHub 只支持 Kafka 和 FileSystem。我們會逐步添加更多的儲存支持。