基于 Flink 構(gòu)建大規(guī)模實時風(fēng)控系統(tǒng)在阿里巴巴的落地
- 基于 Flink 構(gòu)建風(fēng)控系統(tǒng)
- 阿里風(fēng)控實戰(zhàn)
- 大規(guī)模風(fēng)控技術(shù)難點
目前 Flink 基本服務(wù)于集團的所有 BU ,在雙十一峰值的計算能力達到 40 億條每秒,計算任務(wù)達到了 3 萬多個,總共使用 100 萬+ Core ;幾乎涵蓋了集團內(nèi)的所有具體業(yè)務(wù),比如:數(shù)據(jù)中臺、AI 中臺、風(fēng)控中臺、實時運維、搜索推薦等。
01基于 Flink 構(gòu)建風(fēng)控系統(tǒng)
風(fēng)控是一個很大的話題,涉及到規(guī)則引擎、NoSQL DB、CEP 等等,本章主要講一些風(fēng)控的基本概念。在大數(shù)據(jù)側(cè),我們把風(fēng)控劃分成 3 × 2 的關(guān)系:
- 2 代表風(fēng)控要么是基于規(guī)則的,要么是基于算法或模型的;
- 3 代表包括三種風(fēng)控類型:事先風(fēng)控、事中風(fēng)控和事后風(fēng)控。
1.1 三種風(fēng)控業(yè)務(wù)
對于事中風(fēng)控和事后風(fēng)控來講,端上的感知是異步的,對于事先風(fēng)控來講,端上的感知是同步的。
對于事先風(fēng)控這里稍做一些解釋,事先風(fēng)控是把已經(jīng)訓(xùn)練好的模型或者把已經(jīng)計算好的數(shù)據(jù)存在 Redis 、MongoDB 等數(shù)據(jù)庫中;
- 一種方式是端上有類似 Sidden 、Groovy 、Drools 這樣的規(guī)則引擎直接去 Redis 、MongoDB 取數(shù)據(jù)來返回結(jié)果;
- 另外一種方式是基于 Kubeflow KFserving ,端上請求過來之后基于訓(xùn)練好的算法和模型返回結(jié)果。
整體來講這兩種方式的時延都在 200 毫秒左右,可以作為一個同步的 RPC 或 HTTP 請求。
對于 Flink 相關(guān)的大數(shù)據(jù)場景是一個異步的風(fēng)控請求,它的異步時效性非常低,通常是一秒或者兩秒。如果追求超低時延,則可以認(rèn)為它是一種事中的風(fēng)控,風(fēng)控決策過程可以由機器介入處理。
很常見的一種類型是用 Flink SQL 做指標(biāo)閾值的統(tǒng)計、用 Flink CEP 做行為序列規(guī)則分析,還有一種是用 Tensorflow on Flink ,在 Tensorflow 中進行算法描述,然后用 Flink 來執(zhí)行 Tensorflow 規(guī)則的計算。
1.2 Flink 是規(guī)則風(fēng)控最佳選擇
目前 Flink 是阿里集團內(nèi)的風(fēng)控最佳選擇,主要有三個原因:
- 事件驅(qū)動
- 毫秒級的延遲
- 流批一體
1.3 規(guī)則風(fēng)控三要素
在規(guī)則風(fēng)控里面有三個要素,后面講的所有內(nèi)容都是圍繞這三者展開的:
- 事實 Facts:是指風(fēng)控事件,可能來自業(yè)務(wù)方或者日志埋點,是整個風(fēng)控系統(tǒng)的輸入;
- 規(guī)則 Rules:往往是由業(yè)務(wù)側(cè)來定義,即這個規(guī)則要滿足什么樣的業(yè)務(wù)目標(biāo);
- 閾值 Threshold:規(guī)則所對應(yīng)描述的嚴(yán)重程度。
1.4 Flink 規(guī)則表達增強
對于 Flink 來說,可以分成無狀態(tài)規(guī)則和有狀態(tài)規(guī)則兩類,其中有狀態(tài)規(guī)則是 Flink 風(fēng)控的核心:
- 無狀態(tài)規(guī)則:主要是做數(shù)據(jù)的 ETL,一種場景是當(dāng)某個事件的一個字值段大于 X 就觸發(fā)當(dāng)前的風(fēng)控行為;另一種場景是 Flink 任務(wù)的下游是一個基于模型或算法的風(fēng)控,在 Flink 側(cè)不需要做規(guī)則判斷,只是把數(shù)據(jù)向量化、歸一化,例如多流關(guān)聯(lián)、Case When 判斷等把數(shù)據(jù)變成 0/1 的向量,然后推送到下游的 TensorFlow 做預(yù)測。
- 有狀態(tài)規(guī)則:
- 統(tǒng)計型規(guī)則:基于統(tǒng)計分析的計算規(guī)則,比如 5 分鐘以內(nèi)訪問次數(shù)大于 100 次,則認(rèn)為觸發(fā)了風(fēng)控;
- 序列型規(guī)則:事件序列中,某事件對前序后序事件有影響,比如點擊、加入購物車、刪掉三個事件,這種連續(xù)的行為序列是一個特殊行為,可能認(rèn)為這個行為在惡意降低商家商品的評價分?jǐn)?shù),但這三個事件獨立來看并不是一個風(fēng)控事件;阿里云實時計算 Flink 完善了基于序列的規(guī)則能力,為云上和集團內(nèi)的電商交易場景提供技術(shù)護航;
- 混合型規(guī)則:統(tǒng)計型和序列性兩者組合。
02阿里風(fēng)控實戰(zhàn)
本章主要介紹阿里在工程上是如何滿足上面提到的風(fēng)控三要素。
從整體的技術(shù)來看,目前分成感知、處置和洞察三個模塊:
- 感知:目的是感知所有的異常以及提前發(fā)現(xiàn)問題,比如捕捉一些與常見數(shù)據(jù)分布不同的數(shù)據(jù)類型,并輸出這種異常的列表;又比如說某年因為騎行政策的調(diào)整頭盔銷售量升高,連帶著就會出現(xiàn)相關(guān)產(chǎn)品的點擊率、轉(zhuǎn)化率上升,這種情況需要及時被感知捕捉到,因為它是一個正常的行為而非作弊;
- 處置:即如何做規(guī)則的執(zhí)行,現(xiàn)在有小時、實時、離線三道防線,相比于之前單條策略的匹配,關(guān)聯(lián)和集成之后的準(zhǔn)確性會更高,比如就關(guān)聯(lián)最近一段時間內(nèi)某些用戶的持續(xù)行為來進行綜合研判;
- 洞察:為了發(fā)現(xiàn)一些當(dāng)前沒有感知,同時也沒有辦法直接用規(guī)則描述的風(fēng)控行為,比如風(fēng)控需要對樣本進行高度抽象來進行表示,要先投影到合適的子空間,然后再結(jié)合時間維度在高維里面發(fā)現(xiàn)一些特征來做新異常的識別。
2.1 階段一:SQL 實時關(guān)聯(lián) & 實時統(tǒng)計
在這個階段有一個基于 SQL 評價風(fēng)控系統(tǒng),用簡單的 SQL 做一些實時的關(guān)聯(lián)、統(tǒng)計,比如用 SQL 進行聚合操作 SUM(amount) > 50 ,其中規(guī)則就是 SUM(amount),規(guī)則對應(yīng)的閾值是 50;假設(shè)現(xiàn)在有 10、20、50、100 這 4 種規(guī)則同時在線上運行,因為單Flink SQL作業(yè)只能執(zhí)行一種規(guī)則,那么就需要為這4個閾值分別申請 4 個 Flink Job。優(yōu)點是開發(fā)邏輯簡單,作業(yè)隔離性高,但缺點是極大浪費計算資源。
2.2 階段二:Broadcast Stream
階段一的風(fēng)控規(guī)則主要問題是規(guī)則和閾值不可變,在 Flink 社區(qū)目前會有一些解決方案,比如基于 BroadcastStream 來實現(xiàn),在下面的圖中 Transaction Source 負(fù)責(zé)事件的接入,Rule Source 則是一個BroadcastStream,當(dāng)有新的閾值時可以通過 BroadcastStream 廣播到各個算子。
舉個例子,判斷在一分鐘以內(nèi)連續(xù)訪問超過 10 次的風(fēng)控對象,但是在 618 或雙 11 可能要把它變成 20 或 30 次,才會被風(fēng)控系統(tǒng)下游的在線系統(tǒng)感知到。
如果在第一階段的話,只有兩種選擇:第一種是所有的作業(yè)全量在線上跑;第二種是在某一刻停止掉一個Flink作業(yè),新拉起一個基于新指標(biāo)的作業(yè)。
如果是基于 BroadcastStream 就可以實現(xiàn)規(guī)則指標(biāo)閾值的下發(fā),直接修改線上指標(biāo)閾值而不需要作業(yè)重啟。
2.3 階段三:Dynamic CEP
階段二的主要問題是只能做到指標(biāo)閾值的更新,雖然它極大的方便了個業(yè)務(wù)系統(tǒng),但實際上很難滿足上層業(yè)務(wù)。訴求主要有兩個:結(jié)合 CEP 以實現(xiàn)行為序列的感知;結(jié)合 CEP 后依然能做到動態(tài)修改閾值甚至是規(guī)則本身。
階段三,阿里云 Flink 做了 CEP 相關(guān)的高度抽象,解耦了 CEP 規(guī)則和 CEP 執(zhí)行節(jié)點,也就是說規(guī)則可以存在 RDS、Hologres 等外部第三方存儲里,CEP 作業(yè)發(fā)布上去之后,就可以加載數(shù)據(jù)庫中的 CEP 規(guī)則來做到動態(tài)替換,因此作業(yè)的表達能力會增強。
其次是作業(yè)的靈活性會增強,比如想看到某一個 APP 下面的一些行為并對這個行為的指標(biāo)閾值做更新,可以通過第三方存儲更新 CEP 規(guī)則而非 Flink 本身。
這樣做還有一個優(yōu)勢是可以把規(guī)則給暴露給上層業(yè)務(wù)方,來讓業(yè)務(wù)真真正正的撰寫風(fēng)控規(guī)則,我們成為一個真正的規(guī)則中臺,這就是動態(tài) CEP 能力所帶來的好處。在阿里云的服務(wù)中,動態(tài) CEP 能力已經(jīng)被集成在最新版本中,阿里云全托管 Flink 服務(wù)極大的簡化了風(fēng)控場景的開發(fā)周期。
2.4 階段四:Shared Computing
在階段三的基礎(chǔ)上再往前一步,阿里云實踐出 "共享計算" 的解決方案。這套共享計算的方案中,CEP 規(guī)則完全可以被建模平臺來描述,暴露給上層客戶或業(yè)務(wù)方一個非常友好的規(guī)則描述平臺,可以通過類似拖拉拽或者其他的方式進行耦合,然后在調(diào)度引擎上選擇事件接入源來運行規(guī)則。比如現(xiàn)在兩個建模都是服務(wù)于淘寶 APP,完全可以落到同一個 Fact 的 Flink CEP 作業(yè)上,這樣就可以把業(yè)務(wù)方、執(zhí)行層和引擎層完全解耦。當(dāng)前阿里云共享計算的解決方案已經(jīng)非常成熟,有豐富的客戶落地實踐。
2.5 階段五:業(yè)務(wù)開發(fā)和平臺建設(shè)分離
在引擎?zhèn)?、平臺側(cè)和業(yè)務(wù)側(cè)三方之間,階段四可以做到引擎?zhèn)群推脚_側(cè)之間的解耦,但是對業(yè)務(wù)側(cè)來講依然是高度綁定的。兩者的工作模式依然是甲方和乙方的協(xié)同關(guān)系,即 業(yè)務(wù)側(cè)掌握著業(yè)務(wù)規(guī)則,平臺側(cè)接受業(yè)務(wù)團隊的風(fēng)控需求,從而進行風(fēng)控規(guī)則的開發(fā)。但平臺團隊通常人員優(yōu)先,而業(yè)務(wù)團隊隨著業(yè)務(wù)發(fā)展會越來越壯大。
這個時候業(yè)務(wù)側(cè)本身可以抽象出來一些基本概念,沉淀出一些業(yè)務(wù)共性的規(guī)范,并組裝成一個比較友好的 DSL ,然后通過阿里云完全解耦的 Open API 實現(xiàn)作業(yè)的提交。
由于要同時支持集團內(nèi)接近 100 個 BU,沒有辦法為每一個 BU 都做定制化的支持,只能把引擎的能力盡可能的開放出去,然后業(yè)務(wù)側(cè)通過 DSL 的封裝提交到平臺上,真正做到了只暴露一個中臺給客戶。
03大規(guī)模風(fēng)控技術(shù)難點
本章主要介紹一些大規(guī)模風(fēng)控的技術(shù)難點,以及阿里云在全托管 Flink 商業(yè)化產(chǎn)品中如何突破這些技術(shù)難點。
3.1 細(xì)粒度資源調(diào)整
在流計算系統(tǒng)中,數(shù)據(jù)源往往不是阻塞的節(jié)點。上游的數(shù)據(jù)讀取節(jié)點由于沒有計算邏輯不存在性能問題,下游的數(shù)據(jù)處理節(jié)點才是整個任務(wù)的性能瓶頸。
由于 Flink 的作業(yè)是以 Slot 來做資源劃分的,默認(rèn) Source 節(jié)點和工作節(jié)點具有相同的并發(fā)度。在這種情況下我們希望可以單獨調(diào)整 Source 節(jié)點和 CEP 工作節(jié)點的并發(fā)度,比如在下圖中可以看到某個作業(yè)的 CEP 工作節(jié)點并發(fā)度可以達到 2000,而 Source 節(jié)點則只需要 2 個并行度,這樣可以極大的提升 CEP 節(jié)點的工作性能。
另外是對 CEP 工作節(jié)點所在的 TM 內(nèi)存、CPU 資源的劃分,在開源 Flink 中 TM 整體同構(gòu)的,也就是說 Source 節(jié)點和工作節(jié)點是完全相同的規(guī)格。從節(jié)省資源的角度考慮,真實生產(chǎn)環(huán)境下 Source 節(jié)點并不需要 CEP 節(jié)點一樣多的內(nèi)存、CPU 資源, Source 節(jié)點只需要較小的 CPU 和內(nèi)存就已經(jīng)能夠滿足數(shù)據(jù)抓取。
阿里云全托管 Flink 可以實現(xiàn)讓 Source 節(jié)點和 CEP 節(jié)點運行在異構(gòu)的 TM 上,即 CEP 工作節(jié)點 TM 資源顯著大于 Source 節(jié)點 TM 資源,CEP 工作執(zhí)行效率會變得更高??紤]細(xì)粒度資源調(diào)整帶來的優(yōu)化,云上全托管服務(wù)相比自建 IDC Flink 可節(jié)約 20% 成本。
3.2 流批一體 & 自適應(yīng) Batch Scheduler
流引擎和批引擎如果沒有采用相同一套執(zhí)行模式往往會遇到數(shù)據(jù)口徑不一致的情況,出現(xiàn)這種問題的原因是流規(guī)則在批規(guī)則下很難真正的完全描述出來;比如在 Flink 中有一個特殊的 UDF,但是在 Spark 引擎中卻并沒有對應(yīng)的 UDF。當(dāng)這種數(shù)據(jù)口徑不一致的時候,選擇哪一方面的數(shù)據(jù)口徑就成為了一個非常重要的問題。
在 Flink 流批一體的基礎(chǔ)上,用流模式描述的 CEP 規(guī)則,完全可以在批模式下以相同的口徑再跑一次并得到一樣的結(jié)果,這樣就不需要再去開發(fā)批模式相關(guān)的 CEP 作業(yè)。
在此之上,阿里實現(xiàn)了自適應(yīng)的 Batch Scheduler。其實 CEP 規(guī)則每天的效果產(chǎn)出并不一定是均衡的,比如說今天的行為序列中并沒有任何異常行為,下游只有很少的數(shù)據(jù)輸入,此時會為批分析預(yù)留一個彈性的集群;當(dāng) CEP 的結(jié)果很少時,下游的批分析只需要很小的資源,甚至每個批分析工作節(jié)點的并行度都不需要在一開始的時候就指定,工作節(jié)點可以根據(jù)上游數(shù)據(jù)的輸出以及任務(wù)負(fù)載來自動調(diào)整批模式下的并行度,真正做到了彈性批分析,這是阿里云 Flink 流批一體 Batch Scheduler 的獨特優(yōu)勢。
3.3 合并讀取降低公共層壓力
這是在實踐中遇到的問題,當(dāng)前的開發(fā)模式基本都是基于數(shù)據(jù)中臺的,比如實時數(shù)倉。在實時數(shù)倉的場景下,數(shù)據(jù)源可能不會很多,但是中間層 DWD 會變得很多,中間層可能會被演化成很多 DWS 層,甚至也會演變成很多數(shù)據(jù)集市給到各個部門來使用,這種情況下單表的讀取壓力會很大。
通常多個源表彼此關(guān)聯(lián)(打?qū)挘亩纬梢粋€ DWD 層 ,從單個源表的視角看,它會被多個 DWD 表依賴。DWD 層也會被多個不同業(yè)務(wù)域的作業(yè)消費形成 DWS。基于這種情況阿里實現(xiàn)了基于 Source 的合并,只需要讀一次 DWD 在 Flink 側(cè)會幫你加工成多張業(yè)務(wù)域的 DWS 表,可以非常大的減緩對公共層的執(zhí)行壓力。
3.4 KV 分離設(shè)計的狀態(tài)后端
CEP 節(jié)點在執(zhí)行的時候,會涉及到非常大規(guī)模的本地數(shù)據(jù)讀取,尤其是在行為序列的計算模式下,因為需要緩存前面所有的數(shù)據(jù)或者是一定時間內(nèi)的行為序列。
在這種情況下,比較大的一個問題是對后端狀態(tài)存儲(比如:RocksDB)有非常大的性能開銷,進而會影響 CEP 節(jié)點的性能。目前阿里實現(xiàn)了 KV 分離設(shè)計的狀態(tài)后端,阿里云 Flink 默認(rèn)使用 Gemini 作為狀態(tài)后段,CEP 場景下實測性能至少有 100% 的提升。
3.5 維度數(shù)據(jù)分區(qū)加載
風(fēng)控在很多情況下是要基于歷史行為來做分析的,歷史的行為數(shù)據(jù)一般都會存在 Hive 或 ODPS 表里,這個表的規(guī)??赡苁?TB 級別的。開源的 Flink 默認(rèn)需要在每一個維表節(jié)點上加載這個超級大的維度表,這種方式實際上是不現(xiàn)實的。阿里云實現(xiàn)了基于 Shuffle 來做內(nèi)存數(shù)據(jù)的分割,維表節(jié)點只會加載屬于當(dāng)前這個 Shuffle 分區(qū)的數(shù)據(jù)。