面向B端算法實(shí)時(shí)業(yè)務(wù)支撐的工程實(shí)踐
一、 背景
在營(yíng)銷場(chǎng)景下,算法同學(xué)會(huì)對(duì)廣告主提供個(gè)性化的營(yíng)銷工具,幫助廣告主更好的精細(xì)化營(yíng)銷,在可控成本內(nèi)實(shí)現(xiàn)更好的ROI提升。我們?cè)谶@一段時(shí)間支持了多個(gè)實(shí)時(shí)業(yè)務(wù)場(chǎng)景,比如出價(jià)策略的實(shí)時(shí)化預(yù)估、關(guān)鍵詞批量服務(wù)同步、實(shí)時(shí)特征等場(chǎng)景,了解到業(yè)務(wù)側(cè)同學(xué)來(lái)說(shuō),針對(duì)ODPS場(chǎng)景來(lái)說(shuō)大部分可以靈活使用,但對(duì)于Blink使用還有不足,我們這里針對(duì)場(chǎng)景積累了一些經(jīng)驗(yàn),希望對(duì)大家有一些幫助。
二、 技術(shù)選型
為什么要選擇Blink?大部分離線場(chǎng)景如果對(duì)于時(shí)效性沒(méi)有要求,或者數(shù)據(jù)源是Batch模式的,非Streaming的(比如TT、SLS、SWIFT、順序)等,這個(gè)場(chǎng)景的話選擇ODPS就比較不錯(cuò);總體來(lái)說(shuō),數(shù)據(jù)源是實(shí)時(shí)的(如TT/SLS/SWIFT)、需要順序讀取ODPS、對(duì)時(shí)效性要求高的場(chǎng)景,選擇Blink是比較好的。
Blink目前也是支持Batch模式和Steaming模式。Batch模式是指有固定的起始時(shí)間和結(jié)束時(shí)間, 相比ODPS而來(lái),他最大的優(yōu)勢(shì)是提前申請(qǐng)資源,可是獨(dú)占的,這樣可以保障時(shí)效性;Streaming模式就是傳統(tǒng)意義上的實(shí)時(shí)消費(fèi),可實(shí)現(xiàn)毫秒級(jí)的處理。
從開(kāi)發(fā)模式上看,主要分為Data Stream模式,類似于ODPS MR;第二種是SQL模式;從易用性角度看,SQL無(wú)疑是使用成本最低的;但對(duì)于復(fù)雜場(chǎng)景,Data Stream的掌控能力也是最好的,可靈活定義各類cache和數(shù)據(jù)結(jié)構(gòu),以及同時(shí)支持多類場(chǎng)景。
特點(diǎn) | 優(yōu)勢(shì) | |
Datastream Batch | 有固定起始時(shí)間和結(jié)束時(shí)間 | 提前申請(qǐng)資源 |
Datastream Streaming | 更細(xì)粒度地控制實(shí)時(shí)計(jì)算作業(yè) | 實(shí)時(shí)性較強(qiáng),自定義source和sink |
BlinkSQL Streaming | 簡(jiǎn)單場(chǎng)景快速上手 | 使用成本低 |
ODPS | 批量作業(yè)開(kāi)發(fā) | 吞吐量較大 |
三 、主要場(chǎng)景
1. 實(shí)時(shí)replay出價(jià)策略評(píng)估
業(yè)務(wù)背景
Replay系統(tǒng)是一套集線上競(jìng)價(jià)日志搜集、結(jié)構(gòu)化、后續(xù)處理的模擬系統(tǒng)。該系統(tǒng)記錄了直通車線上引擎在召回之后的競(jìng)價(jià)信息,主要涵蓋了線上的召回、出價(jià)、打分等隊(duì)列信息。結(jié)合排序以及扣費(fèi)公式,可以利用該日志實(shí)現(xiàn)對(duì)線上競(jìng)價(jià)環(huán)境的模擬。簡(jiǎn)單來(lái)說(shuō),就是可以評(píng)估bidword上如果當(dāng)時(shí)采用其他的出價(jià),會(huì)帶來(lái)什么樣的結(jié)果。通過(guò)replay系統(tǒng),算法團(tuán)隊(duì)和廣告主可以在線上AB測(cè)試之前,利用離線流量預(yù)估用戶策略修改之后帶來(lái)的效果,這樣可以盡可能地減少策略的修改帶給線上的影響,讓結(jié)果變得更加可控。同時(shí)在進(jìn)行負(fù)向策略測(cè)試的過(guò)程中,可以盡可能地減少對(duì)大盤的收益影響。
算法團(tuán)隊(duì)希望基于在線精排召回日志實(shí)現(xiàn)業(yè)務(wù)側(cè)多種出價(jià)策略評(píng)估,回放1天內(nèi)采樣日志(10億數(shù)據(jù)),在出價(jià)策略上評(píng)估,并支持ad的實(shí)時(shí)下線,避免下線ad對(duì)出價(jià)策略有影響,并且預(yù)期希望10億數(shù)據(jù)量在1-2個(gè)小時(shí)內(nèi)跑完。
主要挑戰(zhàn)
- 1千萬(wàn)物料數(shù)據(jù)如何加載;
- 高qps(100萬(wàn))下線ad的實(shí)時(shí)同步;
- 業(yè)務(wù)側(cè)解耦,整個(gè)實(shí)時(shí)job鏈路如何實(shí)現(xiàn)和業(yè)務(wù)解耦
解決方案
物料數(shù)據(jù)加載:直接在blink啟動(dòng)時(shí)加載所有數(shù)據(jù),避免高qps情況下,對(duì)igraph訪問(wèn)造成壓力;另外采用廣播模式,僅一次加載,每個(gè)節(jié)點(diǎn)都可以使用,避免多次加載odps數(shù)據(jù);
下線的ad信息采用分桶的方式存入到IGraph中,并周期性cache方式全量讀取全量下線ad,將查詢的200W+qps控制在1w左右,并使用RateLimit限流組件控制訪問(wèn)并發(fā),把IGraph并發(fā)控制限制在40萬(wàn)左右,實(shí)現(xiàn)整體流量平滑;
整體實(shí)時(shí)工程框架,預(yù)留UDF接口,讓業(yè)務(wù)側(cè)僅實(shí)現(xiàn)SDK即可,其他工程性能、并發(fā)、限流、埋點(diǎn)等邏輯內(nèi)部實(shí)現(xiàn)即可,支持工程框架和算法策略Replay解耦。
總結(jié)
基于此業(yè)務(wù)需求,我們基于blink streaming Batch模式的靈活能力,實(shí)現(xiàn)了對(duì)tt數(shù)據(jù)固定開(kāi)始和結(jié)束時(shí)間的數(shù)據(jù)處理。沉淀了讀寫tt組件 ,ODPS組件,iGraph組件和埋點(diǎn)組件 ,這些沉淀的組件很好地支持了后續(xù)相似業(yè)務(wù)的作業(yè)開(kāi)發(fā),同時(shí)組件作為之后作業(yè)產(chǎn)品化提供了基礎(chǔ)能力。
2. 實(shí)時(shí)特征
業(yè)務(wù)背景
隨著B(niǎo)端算法發(fā)展,模型升級(jí)帶來(lái)的增量紅利越來(lái)越少,需要考慮從客戶實(shí)時(shí)信息方面進(jìn)一步捕捉用戶意圖,更全面、更實(shí)時(shí)的挖掘潛在需求,從B端視角進(jìn)一步提升增長(zhǎng)空間,基于線上用戶行為日志產(chǎn)出用戶行為實(shí)時(shí)特征,算法團(tuán)隊(duì)使用實(shí)時(shí)數(shù)據(jù)改進(jìn)線上模型。
基于此需求我們產(chǎn)出一條用戶實(shí)時(shí)特征產(chǎn)出鏈路,通過(guò)解析上游A+數(shù)據(jù)源獲取用戶實(shí)時(shí)特征,實(shí)時(shí)特征主要包含以下幾種:
- 獲取用戶近50條特征數(shù)據(jù)值,并產(chǎn)出到igraph中。
- 輸出具有某種特征的用戶id,并按照分鐘時(shí)間聚合
- 輸出某種特征近1小時(shí)的和、均值或者數(shù)目
主要挑戰(zhàn)
- 實(shí)時(shí)特征數(shù)據(jù)開(kāi)發(fā)數(shù)量非常多,對(duì)于每個(gè)特征數(shù)據(jù)都需要開(kāi)發(fā)實(shí)時(shí)數(shù)據(jù)鏈路、維護(hù),開(kāi)發(fā)成本、運(yùn)維成本較高,重復(fù)造輪子;
- 特征數(shù)據(jù)開(kāi)發(fā)要求開(kāi)發(fā)者了解:
- 數(shù)據(jù)源頭,會(huì)基于事實(shí)數(shù)據(jù)源進(jìn)行ETL處理;
- 計(jì)算引擎,flink sql維護(hù)了一套自己的計(jì)算語(yǔ)義,需要學(xué)習(xí)了解并根據(jù)場(chǎng)景熟練使用;
- 存儲(chǔ)引擎,實(shí)時(shí)數(shù)據(jù)開(kāi)發(fā)好需要落地才能服務(wù),故需要關(guān)系存儲(chǔ)引擎選型,例如igraph、hbase、hologres等;
- 查詢優(yōu)化方法,不同存儲(chǔ)引擎都有自己的查詢客戶端、使用及優(yōu)化方法,故要學(xué)習(xí)不同引擎使用方法。
解決方案
從產(chǎn)品設(shè)計(jì)角度,設(shè)計(jì)一套實(shí)時(shí)平臺(tái)能力,讓開(kāi)發(fā)實(shí)時(shí)特征跟在odps開(kāi)發(fā)離線表一樣簡(jiǎn)單。產(chǎn)品優(yōu)勢(shì)是讓用戶只需要懂SQL就可以開(kāi)發(fā)實(shí)時(shí)特征:
- 不需要了解實(shí)時(shí)數(shù)據(jù)源
- 不需要了解底層存儲(chǔ)引擎
- 只用sql就可以查詢實(shí)時(shí)特征數(shù)據(jù),不需要學(xué)習(xí)不同引擎查詢方法
整個(gè)實(shí)時(shí)開(kāi)發(fā)產(chǎn)品聯(lián)動(dòng)極光平臺(tái)、dolphin引擎、blink引擎和存儲(chǔ)引擎,把整個(gè)流程串聯(lián)打通,給用戶提供端到端的開(kāi)發(fā)體驗(yàn),無(wú)需感知跟自己工作無(wú)關(guān)的技術(shù)細(xì)節(jié)。
相關(guān)平臺(tái)介紹:
Dolphin智能加速分析引擎:Dolphin智能加速分析引擎源自阿里媽媽數(shù)據(jù)營(yíng)銷平臺(tái)達(dá)摩盤(DMP)場(chǎng)景,在通用OLAP MPP計(jì)算框架的基礎(chǔ)上,針對(duì)營(yíng)銷場(chǎng)景的典型計(jì)算(標(biāo)簽圈人,洞察分析)等,進(jìn)行了大量存儲(chǔ)、索引和計(jì)算算子級(jí)別的性能優(yōu)化,實(shí)現(xiàn)了在計(jì)算性能,存儲(chǔ)成本,穩(wěn)定性等各個(gè)方面的大幅度的提升。Dolphin本身定位是加速引擎,數(shù)據(jù)存儲(chǔ)和計(jì)算算子依賴于底層的odps, hologres等引擎。通過(guò)插件形式,在hologres中,完成了算子集成和底層數(shù)據(jù)存儲(chǔ)和索引的優(yōu)化,實(shí)現(xiàn)了特定計(jì)算場(chǎng)景計(jì)算性能和支撐業(yè)務(wù)規(guī)模的數(shù)量級(jí)的提升。目前Dolphin的核心計(jì)算能力主要包括:基數(shù)計(jì)算內(nèi)核,近似計(jì)算內(nèi)核,向量計(jì)算內(nèi)核,SQL結(jié)果物化及跨DB訪問(wèn)等。Dolphin同時(shí)實(shí)現(xiàn)了一套SQL轉(zhuǎn)譯和優(yōu)化能力,自動(dòng)將原始用戶輸入SQL,轉(zhuǎn)化成底層優(yōu)化的存儲(chǔ)格式和計(jì)算算子。用戶使用,不需要關(guān)心底層數(shù)據(jù)存儲(chǔ)和計(jì)算模式,只需要按照原始數(shù)據(jù)表拼寫SQL,極大的提升了用戶使用的便利性。
極光消費(fèi)者運(yùn)營(yíng)平臺(tái):極光是面向營(yíng)銷加速場(chǎng)景的一站式研發(fā)平臺(tái),通過(guò)平臺(tái)產(chǎn)品化的方式,可以讓特色引擎能力更好賦能用戶。極光支持的特色場(chǎng)景包含超大規(guī)模標(biāo)簽交并差(百億級(jí)標(biāo)簽圈選毫秒級(jí)產(chǎn)出)、人群洞察(上千億規(guī)模秒級(jí)查詢)、秒級(jí)效果歸因(事件分析、歸因分析)、實(shí)時(shí)和百萬(wàn)級(jí)人群定向等能力。極光在營(yíng)銷數(shù)據(jù)引擎的基礎(chǔ)上提供了一站式的運(yùn)維管控、數(shù)據(jù)治理以及自助接入等能力,讓用戶使用更加便捷;極光沉淀了搜推廣常用的數(shù)據(jù)引擎模板,包含基數(shù)計(jì)算模板、報(bào)表模板、歸因模板、人群洞察模板、向量計(jì)算模板、近似計(jì)算模板、實(shí)時(shí)投放模板等,基于成熟的業(yè)務(wù)模板,讓用戶可以零成本、無(wú)代碼的使用。
根據(jù)目前的業(yè)務(wù)需求,封裝了實(shí)時(shí)數(shù)據(jù)源和存儲(chǔ)數(shù)據(jù)源使用舉例:
--- 注冊(cè)輸入表
create table if not exists source_table_name(
user_id String comment '',
click String comment '',
item_id String comment '',
behavior_time String comment ''
) with (
bizType='tt',
topic='topic',
pk='user_id',
timeColumn='behavior_time'
);
---- 創(chuàng)建輸出表
create table if not exists output_table_name (
user_id STRING
click STRING
) with (
bizType='feature',
pk='user_id'
);
實(shí)現(xiàn)實(shí)時(shí)特征算子:
concat_id:
- 含義:從輸入表輸入的記錄中,選取1個(gè)字段,按照timestamps倒序排成序列,可以配置參數(shù)按照id和timestamp去重,支持用戶取top k個(gè)數(shù)據(jù)
使用舉例:
-- 用戶最近點(diǎn)擊的50個(gè)商品id
insert into table ${output_table_name}
select nickname,
concat_id(true, item_id, behavior_time, 50) as rt_click_item_seq
from ${source_table}
group by user_id;
-- 1分鐘內(nèi)最近有特征行為用戶id列表
insert into table ${output_table_name}
select window_start(behavior_time) as time_id,
concat_id(true, user_id) as user_id_list
from ${source_table}
group by window_time(behavior_time, '1 MINUTE');
sum、avg、count:
- 含義:從輸入表輸入的記錄中,選取1個(gè)字段,對(duì)指定的時(shí)間范圍進(jìn)行求和、求平均值或計(jì)數(shù)
使用舉例
-- 每小時(shí)的點(diǎn)擊數(shù)和曝光數(shù)
insert into table ${output_table_name}
select
user_id,
window_start(behavior_time) as time_id,
sum(pv) as pv,
sum(click) as click
from ${source_table}
group by user_id,window_time(behavior_time, '1 HOUR');
總結(jié)
基于B端算法的實(shí)時(shí)特征需求,沉淀了一套基于blink sql + udf實(shí)現(xiàn)的實(shí)時(shí)特征產(chǎn)出系統(tǒng),對(duì)用戶輸入的sql進(jìn)行轉(zhuǎn)義,在Bayes平臺(tái)生成bink SQL Streaming任務(wù),產(chǎn)出實(shí)時(shí)特征數(shù)據(jù)存入iGraph當(dāng)中,沉淀了blink 寫入igraph組件,concat_id算子、聚合算子等基礎(chǔ)能力,為后續(xù)Dolphin streaming 實(shí)時(shí)特征產(chǎn)出系統(tǒng)打下了基礎(chǔ),支持后續(xù)多種特征算子擴(kuò)展方式,快速支持此類用戶需求。
3. 關(guān)鍵詞批量同步
業(yè)務(wù)背景
每天有很多商家通過(guò)不同渠道加入直通車;而在對(duì)新客承接方面存在比較大的空間。另一方面,對(duì)于系統(tǒng)的存量客戶的低活部分也有較大的優(yōu)化空間。系統(tǒng)買詞作為新客承接、低活促活的一個(gè)重要抓手,希望通過(guò)對(duì)直通車新客和低活客戶進(jìn)行更高頻率的關(guān)鍵詞更新(天級(jí)->小時(shí)級(jí)),幫助目標(biāo)客戶的廣告嘗試更多關(guān)鍵詞,存優(yōu)汰劣,達(dá)到促活的目標(biāo)。
基于此需求,我們?cè)诂F(xiàn)有天級(jí)別離線鏈路的基礎(chǔ)上補(bǔ)充小時(shí)級(jí)的消息更新鏈路,用來(lái)支持標(biāo)準(zhǔn)計(jì)劃下各詞包、以及智能計(jì)劃的系統(tǒng)詞更新,每小時(shí)消息更新量在千萬(wàn)量級(jí),使用Blink將全量ODPS請(qǐng)求參數(shù)調(diào)用faas的函數(shù)服務(wù),將每條請(qǐng)求的結(jié)果寫入到ODPS的輸出表中。更新頻率在兩個(gè)小時(shí),更新時(shí)間:早8點(diǎn)到晚22點(diǎn),單次增刪規(guī)模:增500W/刪500W。
主要挑戰(zhàn)
- blink批處理作業(yè)需要進(jìn)行小時(shí)級(jí)調(diào)度
- faas函數(shù)調(diào)用需要限流
解決方案
- 使用Blink UDF實(shí)現(xiàn)對(duì)request請(qǐng)求調(diào)用HSF的函數(shù)服務(wù)功能
- blink UDF使用RateLimiter進(jìn)行限流,訪問(wèn)函數(shù)服務(wù)的QPS可以嚴(yán)格被節(jié)點(diǎn)并行度進(jìn)行控制
- 在Dataworks平臺(tái)配置shell腳本,進(jìn)行Bayes平臺(tái)批計(jì)算任務(wù)調(diào)度
總結(jié)
基于此需求,使用blink sql batch模式實(shí)現(xiàn)了近實(shí)時(shí)的此類更新鏈路,打通了此類批處理作業(yè)的調(diào)度模式,為后續(xù)批作業(yè)產(chǎn)品化打下了基礎(chǔ)。
四、 未來(lái)展望
基于B端算法的業(yè)務(wù),Dolphin引擎目前已經(jīng)設(shè)計(jì)開(kāi)發(fā)了Dolphin streaming鏈路,用戶在極光平臺(tái)開(kāi)發(fā)實(shí)時(shí)特征變得跟在odps開(kāi)發(fā)離線表一樣簡(jiǎn)單,用戶無(wú)需了解實(shí)時(shí)數(shù)據(jù)源、底層存儲(chǔ)引擎,只需要用sql就可以查詢實(shí)時(shí)特征數(shù)據(jù)。但是B端算法業(yè)務(wù)中還有類似于本文中提到的批處理業(yè)務(wù),這些業(yè)務(wù)需要開(kāi)發(fā)blink batch sql、blink streaming batch模式、ODPS UDF和java code任務(wù),并且提供調(diào)度腳本,最后將項(xiàng)目進(jìn)行封裝提交給算法團(tuán)隊(duì)進(jìn)行使用。未來(lái)我們希望用戶能夠在極光平臺(tái)自助開(kāi)發(fā)批量計(jì)算業(yè)務(wù),降低算法同學(xué)開(kāi)發(fā)成本,提供一個(gè)可擴(kuò)展、低成本的批計(jì)算引擎能力,支持業(yè)務(wù)快速迭代,賦能業(yè)務(wù)落地快速拿到結(jié)果。
學(xué)習(xí)參考
- 對(duì)flink比較感興趣或者是初步接觸flink的同學(xué)可以參考以下內(nèi)容進(jìn)行一個(gè)初步學(xué)習(xí):
- Flink官方博客:https://flink.apache.org/blog/
- Flink Architecture:https://flink.apache.org/flink-architecture.html
- Flink技術(shù)專欄:https://blog.csdn.net/yanghua_kobe/category_6170573.html
- Flink源碼分析:https://medium.com/@wangwei09310931/flink-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-streamexecutionenvironment-4c1cd9695680
- Flink基本組件和邏輯計(jì)劃:http://chenyuzhao.me/2016/12/03/Flink%E5%9F%BA%E6%9C%AC%E7%BB%84%E4%BB%B6%E5%92%8C%E9%80%BB%E8%BE%91%E8%AE%A1%E5%88%92/