Flink CEP在實(shí)時風(fēng)控場景的落地與優(yōu)化
一、Flink CEP 介紹
Flink CEP 是復(fù)雜事件處理(Complex Event Processing)的縮寫。它是基于Flink實(shí)現(xiàn)的復(fù)雜事件處理庫。它的核心功能是識別輸入數(shù)據(jù)流中符合特定模式,即Pattern的事件序列,并允許用戶針對這些序列進(jìn)行針對性處理。
1、什么是Flink CEP
這里是一個簡單的例子,可以讓大家對 Flink CEP 做了什么事情有一個基礎(chǔ)了解。
模式
首先,假設(shè)我們對模式ABBC感興趣。它代表的實(shí)際含義可能是A類事件發(fā)生后,連續(xù)發(fā)生了兩次B類事件,最后發(fā)生了一次C類事件。我們不要求這些事件是嚴(yán)格連續(xù)的,中間可以插入一些無關(guān)事件。
事件流
我們針對這種模式使用Flink CEP的API編寫了相關(guān)代碼。當(dāng)Flink CEP作業(yè)啟動后,遇到實(shí)際輸入事件流,如圖中的事件流,例如d1、a1、b1、b2、d2、c1、e1、a2,針對這樣的事件數(shù)據(jù)流,F(xiàn)link嘗試識別定義的匹配ABBC,最終得到匹配結(jié)果。例如,這里的匹配結(jié)果是a1、b1、b2、c1。
匹配
識別出這樣的匹配后,用戶就可以在作業(yè)中定義如何針對這些匹配進(jìn)行處理。常見的做法是將報警輸入到下游系統(tǒng)或其他地方,然后進(jìn)行進(jìn)一步處理。
2、Flink CEP的應(yīng)用場景
了解了Flink CEP的具體用途后,再來看一下Flink CEP通常用于解決哪些問題。在實(shí)際場景中,F(xiàn)link CEP得益于Flink的分布式特性以及毫秒級處理延遲和豐富的規(guī)則表達(dá)能力,得到了較多應(yīng)用。
這里舉三個典型的例子。
實(shí)時風(fēng)控
Flink CEP 可應(yīng)用于風(fēng)險用戶檢測,例如讀取并分析用戶行為日志,將 5 分鐘內(nèi)轉(zhuǎn)賬次數(shù)超過 10 次且金額大于 10000 的客戶識別為異常用戶,并進(jìn)行針對性的風(fēng)險提示。
實(shí)時營銷
我們可以利用 Flink CEP 來優(yōu)化營銷場景中的策略。比如檢測用戶行為日志,從而在電商大促時,找到“10 分鐘內(nèi),在購物車中添加超過 3 次的商品,但最終沒有付款”的用戶,針對性的調(diào)整營銷策略。類似的邏輯也可以應(yīng)用在實(shí)時營銷的反作弊場景中,這已經(jīng)在錢大媽以及阿里內(nèi)部有一些具體的落地案例。
物聯(lián)網(wǎng)
Flink CEP 可以用于檢測異常狀態(tài)并發(fā)出告警,比如共享單車被騎出指定區(qū)域,且 15 分鐘內(nèi)沒有回到指定區(qū)域時發(fā)出風(fēng)險提示。如果和物聯(lián)網(wǎng)傳感器結(jié)合,還可以用于檢測工業(yè)生產(chǎn)中的流水線異常。比如檢測到三個時間周期內(nèi),溫度傳感器都反饋溫度超過設(shè)置閾值,就發(fā)布報警等等。
二、動態(tài)多規(guī)則支持
接下來我們介紹阿里云實(shí)時計(jì)算團(tuán)隊(duì)基于Flink社區(qū)的FLIP-200所做的動態(tài)多規(guī)則支持。
1、動態(tài)規(guī)則支持的背景
在介紹我們?yōu)槭裁葱枰獎討B(tài)規(guī)則更新前,先看一下右邊這個圖,確定規(guī)則究竟包含哪些要素。我們認(rèn)為 Flink CEP 中的規(guī)則,或者我們剛才提到的 Pattern,由三部分組成,即閾值、條件和事實(shí)。
下面我們以“五分鐘內(nèi)通過廣告鏈接訪問某商品超過五次,但最終沒有購買”為例來介紹這三個要素。
- 閾值:例如5次就是我們定義的閾值,可以根據(jù)需要改成7次或者10次等。
- 事實(shí):是規(guī)則針對的動作,例如通過廣告鏈接訪問某商品以及購買等。
- 條件:用于描述我們?nèi)绾胃鶕?jù)閾值和事實(shí)去過濾我們想要的事件或動作。
在明確了這三個組成要素之后,我們可以理解為什么需要支持動態(tài)規(guī)則更新。頻繁變化的現(xiàn)實(shí)場景要求我們對初始規(guī)則內(nèi)容進(jìn)行調(diào)整或添加新的規(guī)則。
比如有一個 CEP 作業(yè)會在某個用戶在一分鐘內(nèi)連續(xù)進(jìn)行某操作超過 10 次后將其認(rèn)為是風(fēng)險用戶。
在特殊場景,如流量暴增或舉辦某些活動時,預(yù)期訪問次數(shù)會比平常多一些。10次的閾值就不太合適,我們可能想改成20或者30。在當(dāng)前的開源 Flink 實(shí)現(xiàn)下,如果想實(shí)現(xiàn)這一步,只能重新編寫 Java 代碼,然后重啟作業(yè),以使最新的規(guī)則生效。
這種做法的問題很明顯:
- 規(guī)則生效的時間成本較高。因?yàn)橐咄暾拇a開發(fā)和打包上線等一系列流程。而風(fēng)控領(lǐng)域的作業(yè)通常對延遲比較敏感。對于這些對延遲敏感的作業(yè)來說,上述時間成本難以接受。
- 如果規(guī)則的時間窗口較長且狀態(tài)較大,重啟作業(yè)的代價會更高。
因此,我們需要支持動態(tài)規(guī)則更新,也就是所謂的不重啟Flink作業(yè)來更新作業(yè)中實(shí)際應(yīng)用的規(guī)則。
為了實(shí)現(xiàn)這一點(diǎn),我們列出了兩個關(guān)鍵問題。
- 如何讓 Flink 作業(yè)不停機(jī)地加載新規(guī)則。
- 如何解決規(guī)則(Pattern)的序列化和反序列化。這是從第一個問題衍生而來。如果想讓作業(yè)不停機(jī)加載,作業(yè)就必須從某個地方拿到動態(tài)拿到新的 Pattern,并生成對應(yīng)的 Pattern 對象在作業(yè)中使用。
在其他公司的生產(chǎn)實(shí)踐中,我們也看到了針對上述兩個問題,大家提出了一些自己的解決方案。例如修改 Flink CEP 內(nèi)部實(shí)現(xiàn),即在 Flink CEP Operator 上添加注入規(guī)則的接口,使 Flink CEP Operator 在作業(yè)運(yùn)行中可以不停機(jī)地加載新規(guī)則,以及基于 groovy 引擎動態(tài)生成 Pattern 對象,從而解決序列化和反序列化問題。
然而,我們也注意到,這樣的實(shí)現(xiàn)方案存在一定的缺點(diǎn)。
- 數(shù)據(jù)庫壓力增大:通常情況下,規(guī)則都存儲在數(shù)據(jù)庫中。對 Flink CEP Operator進(jìn)行修改時,會讓 Flink CEP Operator 直接與數(shù)據(jù)庫交互,拉取最新規(guī)則。這樣一來,當(dāng) Flink CEP 的作業(yè)并發(fā)較多時,對于大作業(yè)中的每一個 Flink CEP 并發(fā),都需要連接數(shù)據(jù)庫讀取規(guī)則,這會給數(shù)據(jù)庫帶來額外壓力。
- 拉取規(guī)則同一性無法嚴(yán)格保證:可能 Subtask1 取得了規(guī)則一的某一個版本,而Subtask2 由于網(wǎng)絡(luò)問題或其他各種問題,拉取到了規(guī)則一的其他版本。這會導(dǎo)致不同并發(fā)之間使用了不同的規(guī)則,最終導(dǎo)致整個 Flink CEP 作業(yè)在邏輯上的不一致。
- 不利于拓展多規(guī)則支持:在修改 Flink CEP 并添加注入規(guī)則接口時,通常只支持修改單條規(guī)則,這并不利于拓展到對多規(guī)則的支持。
使用 Groovy 引擎動態(tài)生成 Pattern 對象也存在一些有待提高之處。
- 表達(dá)能力有限:通常只能配合 Aviator 表達(dá)式動態(tài)修改閾值,但較難修改規(guī)則整體邏輯。
- Groovy 腳本的編輯需要較多編程知識:對于風(fēng)控策略人員或運(yùn)營人員來說,他們可能對 Groovy 腳本的語法不太了解,這會產(chǎn)生額外的學(xué)習(xí)成本。
- 生成規(guī)則耗時較長:Groovy 是一個較重量級的引擎,生成規(guī)則的耗時相對較長。
2、FLIP-200的提出
在考慮以上背景和問題后,我們在Flink社區(qū)提出了 FLIP-200提案,并在阿里按照FLIP-200提案實(shí)現(xiàn)了一版 Flink CEP 中動態(tài)規(guī)則的支持。接下來將詳細(xì)介紹我們是如何實(shí)現(xiàn)的,以及如何解決之前提到的一些問題。
首先我們新增了 PatternProcessor 接口,用于完整定義Flink CEP中的一條規(guī)則。
PatternProcessor 包含 getId,getVersion 等用于獲取該 Pattern 唯一標(biāo)識符的方法;getTimestamp 等用于獲得時間戳,進(jìn)行調(diào)度的方法;getPattern 對象用于拿到 PatternProcessor 所內(nèi)嵌的規(guī)則;PatternProcessorFunction 用于描述如何處理找到的匹配。除此之外,為了功能的完整性,我們還添加了 PatternProcessorDiscoverer 和 PatternProcessorManager,用于描述如何發(fā)現(xiàn)和管理 Processor。
下面介紹一下在動態(tài)規(guī)則支持中的具體設(shè)計(jì)。
3、動態(tài)規(guī)則支持中的具體設(shè)計(jì)
首先,介紹一下 Flink 的 Operator Coordinator 機(jī)制。Operator Coordinator 顧名思義,它負(fù)責(zé)協(xié)調(diào) Flink 作業(yè)中的各個 operator。Operator Coordinator 本身運(yùn)行在 Job Manager 中,它可以向每個下游 Task Manager 中的每個 Operator 發(fā)送一些事件。
它之前主要在 Flink 的 Source 和 Sink 中應(yīng)用,用于發(fā)現(xiàn)和分配讀寫的 workload,以確保不會出現(xiàn)過于嚴(yán)重的數(shù)據(jù)傾斜等問題。我們也復(fù)用了這一機(jī)制實(shí)現(xiàn)了 Dynamic CEP Operator Coordinator。它是一個在 JobManager 中運(yùn)行的線程,它會調(diào)用我們剛才提到的 Pattern Processor Discover 接口從數(shù)據(jù)庫拿到序列化后的最新Pattern(如圖綠色圓圈里的P)。拿到之后,它會發(fā)送給下游與之關(guān)聯(lián)的 Dynamic CEP Operator。這些 Dynamic CEP Operator 會接受 Operator Coordinator發(fā)送的事件,并解析和反序列化成實(shí)際使用的 Pattern Processor,然后構(gòu)造對應(yīng)的 NFA(非確定有限狀態(tài)機(jī))。之后即可使用新構(gòu)造的NFA來處理上游發(fā)生的事件,并最終輸出到下游。另外,我們允許一個 CEP Operator 包含多個 NFA,這樣可以比較好的支持多規(guī)則。
基于這樣的設(shè)計(jì),我們可以實(shí)現(xiàn)不停機(jī)更新規(guī)則內(nèi)容,且僅有Operator Coordinator 會與外部規(guī)則數(shù)據(jù)庫進(jìn)行交互,減少了對數(shù)據(jù)庫的訪問,并且由于Flink框架保證了 Operator 在處理來自 Operator Coordinator 的事件的一致性,我們也保證了各個 Subtask 所使用的規(guī)則的一致性。
4、動態(tài)規(guī)則支持中 Pattern 的序列化和反序列化
接下來為大家介紹動態(tài)規(guī)則支持:Pattern的序列化和反序列化。
Pattern 本質(zhì)上是描述了規(guī)則匹配時使用的NFA(非確定狀態(tài)自動機(jī))的狀態(tài)轉(zhuǎn)換圖, 即根據(jù)輸入事件如何從一個狀態(tài)轉(zhuǎn)移到另一個狀態(tài)。
有了這樣一個基礎(chǔ)觀察后,我們可以了解到NFA對應(yīng)一個狀態(tài)轉(zhuǎn)換圖。我們可以稍作簡化,例如將一個復(fù)合 Pattern 規(guī)則定義為一個圖。在這個圖中,每個節(jié)點(diǎn)是一個子Pattern,而“邊”則是事件選擇策略,也就是說,我們?nèi)绾螐淖?Pattern1 的匹配中跳轉(zhuǎn)到 Pattern2 的匹配。我們也可以將每個圖看作一個更大的圖的子節(jié)點(diǎn),從而實(shí)現(xiàn)模式的嵌套。也就是說,某個模式的子 Pattern 本身也可以是一個完整的復(fù)合Pattern 。
那么我們該如何描述這個圖呢?在設(shè)計(jì)過程中,我們有一些基礎(chǔ)的想法或設(shè)計(jì)原則。
- 應(yīng)具備完整的表達(dá)能力。
- 序列化和反序列化相對方便。
- 易于拓展,方便集成。
- 格式應(yīng)該能讀取、編輯。
基于這些原則,我們最終選擇基于 JSON 來定義一套描述 Pattern 的規(guī)范?,F(xiàn)在我們給出一個簡單的例子來展示我們所定義的這套 JSON 規(guī)范。
首先,我們可以用 Java API 大致定義這樣一個示例Pattern:當(dāng)滿足 StartCondition 的事件出現(xiàn)大于等于三次之后,如果跟著一個滿足 EndtCondition 的事件,那么我們就認(rèn)為這是一個匹配。我們看到這個 Pattern 有兩個子 Pattern,第一個是 StartPattern,第二個是 EndPattern。他們定義的一個圖可能類似于下面這樣一個狀態(tài)轉(zhuǎn)換圖。
這里P1對應(yīng)著 Start Pattern,而P2對應(yīng)著End Pattern。在這張狀態(tài)轉(zhuǎn)換圖中,有兩條邊,第一條是P1指向自己的邊,代表滿足P1規(guī)則的事件可以重復(fù)出現(xiàn)。另外也有一條從 StartCondition 指向 EndCondition 的邊。
右圖則是這個狀態(tài)轉(zhuǎn)換圖的 JSON 描述。第一個是 node 字段,它是一個數(shù)組,包含每個子 Pattern 的完整描述,比如這里我們用 times 字段表示這個子 Pattern 對應(yīng)的 Condition,要被滿足大于等于三次。第二個是 edges 字段,它用于記錄邊的信息。
關(guān)于各個字段的完整定義、取值以及物理描述,可以參考阿里云官方官網(wǎng)文檔。
用戶可以通過這一套規(guī)范來使用 JSON 語法描述一個完整的 Pattern,從而實(shí)現(xiàn) Pattern 的序列化和反序列化。在數(shù)據(jù)庫中,實(shí)際存儲的可能是一個對應(yīng)的 JSON 字符串,它可以反序列化成一個對應(yīng)的 Pattern Processor 對象。
5、動態(tài)規(guī)則支持:拓展Condition
在描述完剛才提到的序列化和反序列化之后,我們將繼續(xù)介紹如何將這些功能應(yīng)用于動態(tài)修改 Condition 中的閾值。
Aviator Condition
我們基于 Aviator 表達(dá)式引擎定義了 AviatorCondition。它的構(gòu)造函數(shù)會接受一個表達(dá)式字符串,并根據(jù)輸入的表達(dá)式字符串生成 AviatorExpression,然后在 filter 方法中通過反射來解析傳入的事件字段和閾值,執(zhí)行 AviatorExpression,最后返回 true or false 作為 filter 這個方法的返回結(jié)果,用于判斷是否滿足 Aviator Condition。
我們這里舉一個例子,假設(shè)有一個叫 Event 的類,它有 price 和 action兩個字段。那么我們就可以構(gòu)造一個這樣的 AviatorCondition,其參數(shù)是一個表達(dá)式字符串,這個字符串里描述了對 Event 中事件字段的取值要求。比如我們要求 actinotallow==1&&price>20。如果我們想要更新閾值,就直接修改表達(dá)式,變成 actinotallow==0&&price>50。
注意這個字符串是傳入的參數(shù),它也可以在我們剛才介紹的 JSON 格式中定義和描述,所以我們也可以直接編輯數(shù)據(jù)庫中的字段進(jìn)行閾值的動態(tài)更新。
Groovy Condition
除了 Aviator 之外,為了滿足不同用戶的使用習(xí)慣,我們也支持了 Groovy 語法,允許將 Groovy 表達(dá)式作為參數(shù)生成對應(yīng)的 Condition。
Custom Args Condition
除此之外,為了更進(jìn)一步增強(qiáng)表達(dá)能力,支持一些高度定制化的需求,我們還實(shí)現(xiàn)了 Custom Args Condition,它支持用戶傳入自定義參數(shù),并允許用戶自定義參數(shù)的解析邏輯。例如我們可能想自定義解析邏輯支持更復(fù)雜的處理需求。我們之前提到的兩個 Condition(Aviator Condition和Groovy Condition)可以看作是 Custom Args Condition 的一個特殊實(shí)現(xiàn)。
6、動態(tài)規(guī)則支持——多規(guī)則支持
接下來介紹多規(guī)則的支持。
多規(guī)則的關(guān)鍵點(diǎn)在于如何在同一輸入數(shù)據(jù)流上使用多條規(guī)則。按照開源 Flink CEP 的方案,我們要在一個 Flink 作業(yè)中使用多條規(guī)則的話,需要定義多個 Pattern Stream(CEP API 提供的接口),對應(yīng)生成多個 CEP Operator。因此上游 input Source 會向不同的 Flink CEP Operator 發(fā)送數(shù)據(jù),這也代表著上游的數(shù)據(jù)需要多次網(wǎng)絡(luò)傳遞,從而帶來一些額外的開銷。
我們這里進(jìn)行了一個優(yōu)化,允許一個 Dynamic Flink CEP Operator 創(chuàng)建多個 NFA,這樣上游數(shù)據(jù)只需傳遞一次,避免了額外的拷貝和網(wǎng)絡(luò)數(shù)據(jù)傳輸,降低資源消耗。
7、動態(tài)規(guī)則支持:Demo
我們在阿里云官網(wǎng)上也提供了一個針對廣告投放中的實(shí)時反作弊場景實(shí)現(xiàn)的 demo,用于演示使用。相應(yīng)的代碼也在阿里云上開源。如果大家有興趣,可以去這里的網(wǎng)址查看。
https://help.aliyun.com/document_detail/459880.html
https://github.com/RealtimeCompute/ververica-cep-demo
三、CEP SQL語法增強(qiáng)&性能優(yōu)化
接下來介紹一下我們在 CEP SQL 方面所做的一些工作。
1、Flink CEP SQL:介紹
Flink CEP SQL 的核心是 MATCH_RECOGNIZE 語法,這是提到的 SQL2016 中定義的一套規(guī)范。它主要包含以下幾個部分:
PARTITION BY:用于定義邏輯,例如這里是 PARTITION BY user_name,它相當(dāng)于 group by user_name 或者 Java API 中的 key_by。
ORDER_BY:用來定義輸入數(shù)據(jù)的排序方式。因?yàn)?CEP 需要在持續(xù)數(shù)據(jù)流中識別特定模式,所以它必須按時間順序進(jìn)行排列。通常這里的字段是 row_time。
MEASURES:類似于普通SQL中的select操作,用于對識別出的序列進(jìn)行映射聚合等操作,并定義最終輸出結(jié)果。在圖中,我們使用了 FIRST 、LAST 和 COUNT 這些內(nèi)置函數(shù),對循環(huán)模式匹配到的A序列進(jìn)行了聚合計(jì)算。對于B序列,我們只做了一個映射操作。
PATTERN:它是 MATCH_RECOGNIZE 語法的核心,也就是我們所說的Pattern 定義。它類似于正則表達(dá)式的語法。例如 A+B 模式表示序列中允許先出現(xiàn)一個或連續(xù)多個 A 事件,再緊接著出現(xiàn)一個 B 事件。
DEFINE:它用于設(shè)定了A和B兩個模式變量對應(yīng)的事件匹配條件。例如,A 對應(yīng)的匹配條件是“event_type為A”。
2、Flink CEP SQL:示例
這里是一個 CEP SQL 的例子。假如我們有一個 Source 表,里面7條數(shù)據(jù),username 取值中有 Alice、Bob 等。如果對于這個表來應(yīng)用我們剛才上一個 PPT 中的A+B模式去做匹配,那么我們得到圖中這樣的結(jié)果表。
現(xiàn)在針對Alice進(jìn)行具體說明:Alice用戶的事件序列包含4個對應(yīng)事件,由于我們定義了 A+B 的 Pattern 以及 Flink CEP SQL 中使用的默認(rèn) After Match Strategy是 SKIP_TO _NEXT,所以實(shí)際輸出了三行。第一行對應(yīng)序列 AAAB,第二行對應(yīng)序列是 AAB,第三行對應(yīng)序列是 AB。
3、Flink CEP SQL:語法增強(qiáng)
剛才是基礎(chǔ)的 Flink CEP SQL 語法,接下來介紹在阿里內(nèi)部針對 CEP SQL 語法所做的一些增強(qiáng)。
輸出帶時間約束模式的匹配超時序列
在用戶行為模式識別的場景中,如果我們希望找到在10分鐘內(nèi)完成一系列動作的高質(zhì)量用戶,我們可以通過 WITHIN INTERVAL 語法來指定這個時間約束。
對于 Alice 用戶,它在10分鐘內(nèi)完成了指定的三個動作,這滿足了我們所說的10分鐘內(nèi)的時間約束,因此會有一條輸出。對于 Bob 來說,雖然也滿足了 ABC 的模式,但因?yàn)闈M足條件A的事件是在8:02完成的,而滿足條件 C 的事件是在8:15完成的,相隔時間已經(jīng)超過了10分鐘,所以它沒有對應(yīng)輸出。
我們剛才提到的典型場景是不輸出 Bob 的場景。但有時候,我們可能想找到流程中斷的原因,例如為什么 Bob 在10分鐘內(nèi)只做 A、B 兩件事而沒有做 C,即需要輸出超時的事件匹配序列(在時間限制內(nèi)未完全匹配的事件序列)。
為此,我們支持了定義 ONE ROW PER MATCH SHOW TIMEOUT MATCHES 語法,它的含義是允許展示匹配超時的事件,而不僅僅是展示匹配成功的事件。在正常的 Java API 中,這些超時序列是通過 Java API 的側(cè)輸出流,即所謂 SideOutput輸出到另一個 datastream 來實(shí)現(xiàn)的。
在 SQL 中,我們無法使用側(cè)輸出流,因此會輸出在結(jié)果表中。但如果 C 動作沒有發(fā)生,也就是 Bob 在10分鐘內(nèi)沒有完成這件事情,那么 action_c_time 列會出現(xiàn) Null 值。這樣便于用戶在后續(xù)針對性地過濾出 Bob 這類特殊用戶。
定義事件之間的連續(xù)性
Flink CEP 支持不同種類的事件連續(xù)性。例如,A.next(B) 要求事件 A 和事件 B 之間必須連續(xù)出現(xiàn),中間不能有任何其他類事件,如 C 類事件等。這類連續(xù)性我們稱之為嚴(yán)格連續(xù)。A.followedBy(B) 則允許 A 和 B 之間可以出現(xiàn)其他事件。這類連續(xù)性我們稱之為松散連續(xù)。而開源 CEP 只支持嚴(yán)格連續(xù)。我們通過添加{- x*?-}語法來支持使用松散連續(xù)。
其原理是 X 未在 DEFINE 當(dāng)中定義,那么它代表任意匹配。也就是說,我們允許 A和 B 之間出現(xiàn)任意一個無關(guān)事件,對應(yīng)我們所說的松散連續(xù)。
定義循環(huán)模式中的連續(xù)性和貪婪性
除了剛才的連續(xù)性增強(qiáng)外,我們還支持定義循環(huán)模式中的連續(xù)性和貪婪性。具體來說,循環(huán)模式中,例如這里的 A+,是一個單獨(dú)的循環(huán)模式。我們允許 A 類事件出現(xiàn)一次或多次,默認(rèn)要求是嚴(yán)格連續(xù)的 A 類事件,即不能有任何其他類型的事件。同時該模式會貪婪地匹配,盡可能地匹配最長的 A 序列。
我們這里實(shí)現(xiàn)了2個增強(qiáng):
- 支持非貪婪的語義。即嘗試盡可能匹配短序列,而非長序列。
- 允許模式內(nèi)部的松散連續(xù)。
具體語法上,一個問號(例如 A+?)對應(yīng)非貪婪,兩個問號(例如 A+??)對應(yīng)的是松散連續(xù)且貪婪,而三個問號對(例如 A+???)應(yīng)的松散連續(xù)且非貪婪。
循環(huán)模式指定的停止條件(Until)
這對應(yīng) Java 的 until 語法。表示允許在循環(huán)模式中,例如 A+{B}C 模式在匹配 A 類事件時,如果遇到 B 類事件,那么會立刻終止 A 類循環(huán)模式的匹配,進(jìn)入到下一部分 C類事件匹配中。
組合模式(Group Pattern)
最后一部分增強(qiáng)是組合模式。我們可以將多個模式組合成一個整體用在 next()、followedBy()這些函數(shù)中,支持整體的嵌套循環(huán)。它對應(yīng)著括號語法。
AFTER MATCH NO SKIP 策略
最后一個 CEP SQL 的增強(qiáng)則是對 AFTER MATCH NO SKIP 策略的支持。
Flink CEP SQL 中默認(rèn)的策略是 SKIP_TO_NEXT_ROW,它會丟棄以相同事件開始的所有部分匹配。實(shí)際 java API 中默認(rèn)的是 NO_SKIP 策略,它會把每個成功匹配都輸出出來。
這里有一個例子,我們可以比較這兩個模式的不同。例如對a b+模式來說,如果輸入的是a1、b1、b2、b3,如果是NO_SKIP策略,它會輸出三條匹配結(jié)果,即a1b1、a1b2、a1b2b3。而對于SKIP_TO_NEXT來說,它只會出a1b1。原因是我們在找到a1b1匹配之后,所有以a1開頭的匹配都會被丟棄,就是所謂的SKIP_TO_NEXT_ROW 。但 NO_SKIP 是更常見的使用策略,所以我們也拓展了對它的支持。
4、FLink CEP 性能優(yōu)化
這部分簡要介紹一下我們在內(nèi)部對 Flink CEP 進(jìn)行的性能優(yōu)化。
減少State訪問
通過增加 Cache、優(yōu)化 OnEvent/ProcessingTime()實(shí)現(xiàn),減少了大約30%的state 訪問,從而使大規(guī)模 Flink CEP 作業(yè)對 CPU 的消耗更少。
修復(fù) State 泄露
這是針對社區(qū)的一個開源bug的修復(fù)。主要針對生命周期較短的 key,如果這些 key的相關(guān)狀態(tài)沒有及時清理,可能會導(dǎo)致 state 不斷增大。當(dāng) key 包含一些隨機(jī)字段,例如 timestamp 或隨機(jī) ID 時,該問題非常容易出現(xiàn)。
小貼士
如果大家想使用 Flink CEP,盡量使用 ver1.16 以上的版本。在這些較新的版本中,社區(qū)有一個關(guān)鍵優(yōu)化,可以減少 Timer 注冊,從而極大幅度地減少作業(yè)的 CPU 消耗。
四、風(fēng)控場景實(shí)際案例
最后,介紹一下我們在支持客戶中遇到的風(fēng)控場景的典型應(yīng)用。
1、業(yè)務(wù)類場景應(yīng)用
交易風(fēng)控
例如客戶在電商交易或銀行交易中,可能會想檢測一些特殊用戶。如一段時間內(nèi)某個IP退款次數(shù)超過一定金額,就觸發(fā)熔斷,禁止該IP進(jìn)行額外交易。交易風(fēng)控是我們在支持客戶中遇到的最多的應(yīng)用場景。
內(nèi)容風(fēng)控
例如用戶在x分鐘內(nèi)發(fā)布超過y條帖子,對賬號進(jìn)行禁言或其他處理等。
物聯(lián)網(wǎng)風(fēng)控
例如,檢測設(shè)備異常,如果某個設(shè)備連續(xù)發(fā)生超過10次以上的異常,且超過15分鐘內(nèi)沒有恢復(fù),則發(fā)出報警消息等。
網(wǎng)安風(fēng)控
檢測到某臺電腦上的日志,例如用戶行為滿足點(diǎn)擊釣魚郵件、下載異常文件、執(zhí)行隱藏代碼等條件后觸發(fā)報警。
2、新功能應(yīng)用
初次接觸 FlinkCEP 的用戶可能對 Flink CEP 中的部分功能或用法不太熟悉。我們提供了2個比較常用的功能的使用示例,分別是用于獲取子 Pattern 之前匹配的事件 context.getEventsForPattern() 接口,以及 Flink1.16 引入的用于定義相鄰事件之間的時間間隔的新語法:WithinType.Previous_AND_CURRENT 。如果有類似需求的客戶也可以參考。