一文揭秘阿里實(shí)時(shí)計(jì)算Blink核心技術(shù):如何做到唯快不破?
實(shí)時(shí)計(jì)算in阿里巴巴
1999年起,阿里從電商平臺(tái)開始不斷拓展業(yè)務(wù),在金融、支付、物流、文娛各個(gè)領(lǐng)域衍生出眾多產(chǎn)品,例如依托于淘寶、天貓為主的電商平臺(tái)、阿里媽媽廣告平臺(tái)、螞蟻金服支付寶、阿里云、大文娛等。今天的阿里它已經(jīng)不僅僅是一個(gè)電商平臺(tái),而是一個(gè)龐大的應(yīng)用生態(tài)。阿里巴巴目前是全球***的電商平臺(tái),2016財(cái)年收入達(dá)到5500億美金。在阿里平臺(tái)上有5億的用戶,相當(dāng)于中國(guó)人口的1/3,每天有近1000萬(wàn)用戶通過(guò)阿里平臺(tái)交易。
阿里儼然成為巨大的商業(yè)航母,在這艘航母之上,海量的用戶和應(yīng)用必然會(huì)產(chǎn)生大量的數(shù)據(jù)。目前,阿里巴巴的數(shù)據(jù)量級(jí)已經(jīng)達(dá)到EB級(jí)別,每天的增長(zhǎng)量達(dá)到PB級(jí)別,實(shí)時(shí)計(jì)算日常峰值處理的數(shù)據(jù)量可達(dá)到1億每秒,今年雙11更是達(dá)到了驚人的4.7億每秒。
實(shí)時(shí)計(jì)算在阿里巴巴內(nèi)部應(yīng)用廣泛。隨著新經(jīng)濟(jì)體的出現(xiàn)與發(fā)展,技術(shù)的革新和用戶需求的提升,人們?cè)絹?lái)越需要實(shí)時(shí)計(jì)算的能力,它的***好處就是能夠基于實(shí)時(shí)變化數(shù)據(jù)更新大數(shù)據(jù)處理的狀態(tài)和結(jié)果。接下來(lái),舉兩個(gè)例子來(lái)闡釋實(shí)時(shí)計(jì)算在阿里內(nèi)部應(yīng)用的場(chǎng)景:
1.雙11大屏
每年雙11阿里都會(huì)聚合有價(jià)值的數(shù)據(jù)展現(xiàn)給媒體,GMV大屏是其中之一。整個(gè)GMV大屏是非常典型的實(shí)時(shí)計(jì)算,每條交易數(shù)據(jù)經(jīng)過(guò)聚合展現(xiàn)在大屏之上。從DataBase寫入一條數(shù)據(jù)開始,到數(shù)據(jù)實(shí)時(shí)處理寫入HBase,***展現(xiàn)在大屏之上,整個(gè)過(guò)程的鏈路十分長(zhǎng)。整個(gè)應(yīng)用存在著許多挑戰(zhàn):
1)大屏展現(xiàn)需要秒級(jí)延遲,這需要實(shí)時(shí)計(jì)算延遲在亞秒級(jí)別
2)雙11大量數(shù)據(jù)需要在一個(gè)Job中聚合完成
3)Exactly-Once 保持?jǐn)?shù)據(jù)計(jì)算的精確性
4)系統(tǒng)高可用,不存在卡頓和不可用的情況
這個(gè)應(yīng)用場(chǎng)景的SLA非常高,要求秒級(jí)延遲和數(shù)據(jù)的精確性,但它的計(jì)算并不復(fù)雜,接下來(lái)介紹更為復(fù)雜的應(yīng)用。
2.實(shí)時(shí)機(jī)器學(xué)習(xí)
機(jī)器學(xué)習(xí)一般有兩個(gè)重要的組件:Feature 和Model。傳統(tǒng)的機(jī)器學(xué)習(xí)使用批計(jì)算對(duì)Feature的采集和Model的訓(xùn)練,這樣更新頻率太低,無(wú)法適應(yīng)數(shù)據(jù)在不斷變化的應(yīng)用的需求。例如在雙11時(shí),商品的價(jià)格、活動(dòng)的規(guī)則與平時(shí)完全不同,依據(jù)之前的數(shù)據(jù)進(jìn)行訓(xùn)練得不到***的效果。因此,只有實(shí)時(shí)收集Feature并訓(xùn)練Model,才能擬合出較為滿意的結(jié)果。為此,我們開發(fā)了實(shí)時(shí)機(jī)器學(xué)習(xí)平臺(tái)。
此實(shí)時(shí)機(jī)器學(xué)習(xí)平臺(tái)主要包括兩個(gè)部分:實(shí)時(shí)Feature計(jì)算和實(shí)時(shí)Model計(jì)算。這套系統(tǒng)同樣擁有很多挑戰(zhàn),具體如下:
1)機(jī)器學(xué)習(xí)需要采集各種各樣Metrics,存在許多DataSource
2)維度多,如用戶維度、商品維度。維度的疊加甚至是笛卡兒積導(dǎo)致***的Metrics是海量的,State非常巨大
3)機(jī)器學(xué)習(xí)計(jì)算復(fù)雜,耗用大量CPU
4)某些數(shù)據(jù)不能存在State中,需要外部存儲(chǔ),存在大量外部IO
3.實(shí)時(shí)A/B Testing
用戶的Query也有可能不停變化,典型的例子有實(shí)時(shí)的A/B Testing。
算法工程師在調(diào)優(yōu)Model時(shí)會(huì)涉及多種Model,不同的Model有不同的計(jì)算模式和方法,產(chǎn)生不同的計(jì)算結(jié)果。因此,往往會(huì)有不同的Query訂閱實(shí)時(shí)數(shù)據(jù),產(chǎn)生結(jié)果后根據(jù)用戶回饋迭代Model,最終得到***模型。A/B Tesing的挑戰(zhàn)在于算法工程師往往計(jì)算很多Metrics,所有的Metrics都通過(guò)實(shí)時(shí)計(jì)算進(jìn)行統(tǒng)計(jì)會(huì)浪費(fèi)大量資源。
針對(duì)這個(gè)挑戰(zhàn),我們?cè)O(shè)計(jì)了A/B Testing的框架開發(fā)平臺(tái)。它用來(lái)同步算法工程師感興趣的Metrics進(jìn)行聚合,收集起來(lái)并發(fā)送到Druid引擎。這樣,算法工程師根據(jù)不同Job的要求清洗數(shù)據(jù)到Druid,***在Druid之上對(duì)不同的Metrics進(jìn)行統(tǒng)計(jì)分析,從而找到***的算法Model。
綜上,實(shí)時(shí)計(jì)算在阿里巴巴內(nèi)部存在如下挑戰(zhàn):
1)業(yè)務(wù)龐大,場(chǎng)景多,大量的機(jī)器學(xué)習(xí)需求,這些因素一起導(dǎo)致了計(jì)算邏輯十分復(fù)雜
2)數(shù)據(jù)量大,作業(yè)多,因此整個(gè)實(shí)時(shí)計(jì)算的機(jī)器規(guī)模十分巨大
3)要保障低延遲和數(shù)據(jù)精確性,同時(shí)要滿足高吞吐量的需求
Flink的選定及優(yōu)化
為了應(yīng)對(duì)上述挑戰(zhàn),我們調(diào)研了許多計(jì)算框架,最終選定Flink,原因如下:
1.Flink很好地引入和設(shè)計(jì)了State,基于State復(fù)雜的邏輯計(jì)算如join能得到很好的描述
2.Flink引入了Chandy-Lamport 算法,在此算法的支撐下可以***實(shí)現(xiàn)Exactly-Once,并能在低延遲下實(shí)現(xiàn)高吞吐量。
然而,F(xiàn)link在State、Chandy-Lamport 算法等方面還有很多缺陷,為此阿里開辟了名為Blink的項(xiàng)目。
Blink是開源Flink與阿里巴巴Improvement的結(jié)合,主要分兩大塊:
1.BlinkRuntime
包括存儲(chǔ)、調(diào)度和計(jì)算,不同公司在使用Flink時(shí),存儲(chǔ)、調(diào)度以及底層優(yōu)化等方面會(huì)有諸多不同,阿里巴巴的blink內(nèi)部也對(duì)Runtime做了諸多個(gè)性化的優(yōu)化,這一層不好與Apache Flink社區(qū)統(tǒng)一,我們稱之為Blink Runtime。
2.Flink SQL
原生的Flink只有比較底層的DataStream API,用戶在使用時(shí)需要設(shè)計(jì)實(shí)現(xiàn)大量的代碼,此外DataStream本身也有設(shè)計(jì)上的缺陷。為了方便用戶使用,阿里巴巴團(tuán)隊(duì)設(shè)計(jì)了流計(jì)算的Flink SQL并推回了社區(qū)。取名Flink SQL而不是Blink SQL,主要原因Blink和Flink在SQL這個(gè)用戶API上面是完全和社區(qū)統(tǒng)一的,另外Apache Flink的大部分功能都是阿里巴巴貢獻(xiàn)的,所以說(shuō)Flink SQL就是Blink SQL,沒(méi)有特別大的區(qū)別。
BlinkRuntime核心優(yōu)化解密
1.部署和模型的優(yōu)化
優(yōu)化包含以下幾點(diǎn):
1)解決大規(guī)模部署問(wèn)題。Flink中一個(gè)Cluster只有一個(gè)JobMaster來(lái)管理所有的Job。隨著Job的不斷增加,單一的Master無(wú)法承接更多的Job,產(chǎn)生了瓶頸。因此,我們重構(gòu)了架構(gòu),使每一個(gè)Job擁有自己的Master。
2)早期的Flink中TaskManager管理很多Task,某一個(gè)Task的問(wèn)題會(huì)導(dǎo)致TaskManager崩潰,進(jìn)而影響其他Job。我們使每一個(gè)Job擁有自己的TaskManager,增強(qiáng)了Job的隔離。
3)引入ResourceManager。ResourceManager可以和JobMaster通訊,實(shí)時(shí)動(dòng)態(tài)地調(diào)整資源,達(dá)到***的集群部署。
4)我們不僅將這些優(yōu)化應(yīng)用在YarnCluster上,還應(yīng)用到Mesos和Standalone的部署上。
有了這些工作,F(xiàn)link就可以應(yīng)用到大規(guī)模的集群部署。
2.Incremental Checkpoint
實(shí)時(shí)計(jì)算需要不停的在checkpoint的時(shí)候來(lái)保留計(jì)算狀態(tài)。早期的Flink的checkpoint的設(shè)計(jì)存在缺陷,在每個(gè)checkpoint發(fā)生的時(shí)候,它會(huì)讀取所有舊的狀態(tài)數(shù)據(jù),和新的數(shù)據(jù)合并后按照全量的方式寫入磁盤。隨著State的不斷增大,在每次做checkpoint的時(shí)候所需要的數(shù)據(jù)讀取和寫入的量級(jí)是十分巨大。 這就導(dǎo)致Job的checkpoint的間隔需要設(shè)置的很大,不能小于1分鐘。越大的checkpoint的間隔, failover的時(shí)候回退的計(jì)算就越大,造成的數(shù)據(jù)延遲也就越嚴(yán)重。
為了減少checkpoint間隔,我們提出了Incremental Checkpoint的設(shè)計(jì)。概括的說(shuō)就是在checkpoint的時(shí)候只存儲(chǔ)增量的state變化的數(shù)據(jù)。由于歷史上每個(gè)checkpoint的數(shù)據(jù)都已經(jīng)保存,后面的checkpoint只需要將不同的數(shù)據(jù)放入存儲(chǔ),這樣每次做checkpoint需要更新的數(shù)據(jù)量就非常小,使得checkpoint可以在若干秒級(jí)內(nèi)完成,這就大大減小了failover時(shí)可能引起的延遲。
3.異步IO
很多時(shí)候我們不得不將數(shù)據(jù)放在外部存儲(chǔ)中,這樣在計(jì)算過(guò)程中就需要通過(guò)網(wǎng)絡(luò)IO讀取數(shù)據(jù)。傳統(tǒng)的方式使用 Sync-IO的讀取方式,在發(fā)出數(shù)據(jù)請(qǐng)求之后,只有等待到結(jié)果返回之后才能開始下一個(gè)數(shù)據(jù)請(qǐng)求,這種做法造成了CPU資源的浪費(fèi),因?yàn)镃PU在大多數(shù)情況下都在等待網(wǎng)絡(luò)IO的請(qǐng)求返回。Sync-IO使得CPU的資源利用率無(wú)法提高到***,也就大大影響了單位CPU下的計(jì)算吞吐。為此提升計(jì)算吞吐,我們?cè)O(shè)計(jì)了Async-IO的數(shù)據(jù)讀取框架,它允許異步地多線程地讀取數(shù)據(jù)。
每次數(shù)據(jù)請(qǐng)求發(fā)出后不需要等待數(shù)據(jù)返回就繼續(xù)發(fā)送下一個(gè)數(shù)據(jù)請(qǐng)求。當(dāng)數(shù)據(jù)請(qǐng)求從外部存儲(chǔ)返回后,計(jì)算系統(tǒng)會(huì)調(diào)用callback方法處理數(shù)據(jù)。如果數(shù)據(jù)計(jì)算不需要保序,數(shù)據(jù)返回之后就會(huì)快速經(jīng)過(guò)計(jì)算發(fā)出。如果用戶需要數(shù)據(jù)的計(jì)算保序時(shí),我們使用buffer暫時(shí)保存先到的數(shù)據(jù),等前部數(shù)據(jù)全部到達(dá)后再批量地發(fā)送。在使用了Async-IO之后,根據(jù)設(shè)置的buffer大小不同計(jì)算吞吐可以提升幾十倍甚至幾百倍,這就極大地提升了單位CPU利用率和整體的計(jì)算性能。
值得一提的是,以上所述的所有Blink Runtime優(yōu)化已經(jīng)全部貢獻(xiàn)給了Apache Flink社區(qū)。
Flink SQL核心功能解密
1.阿里完成Apache Flink SQL 80%研發(fā)工作
目前,Apache Flink SQL 80%的功能是阿里巴巴實(shí)時(shí)計(jì)算團(tuán)隊(duì)貢獻(xiàn)的,包括兩百個(gè)提交和近十萬(wàn)行代碼。使用Flink SQL的原因是因?yàn)槲覀儼l(fā)現(xiàn)了底層API給用戶的遷移、上線帶來(lái)的極大不便。那么,我們又為什么選擇SQL?主要原因如下:
1)SQL是十分通用的描述性語(yǔ)言,SQL適合用來(lái)讓用戶十分方便的描述Job的需求。
2)SQL擁有比較好的優(yōu)化框架,使得用戶只需要專注于業(yè)務(wù)邏輯得設(shè)計(jì)而不用關(guān)心狀態(tài)管理,性能優(yōu)化等等復(fù)雜得設(shè)計(jì),這樣就大大降低了使用門檻。
3)SQL易懂,適合不同領(lǐng)域的人使用。使用SQL的用戶往往都不需要特別多的計(jì)算機(jī)編程基礎(chǔ),從產(chǎn)品設(shè)計(jì)到產(chǎn)品開發(fā)各種人員都可以快速掌握SQL的使用方法。
4)SQL的API十分穩(wěn)定,在做機(jī)構(gòu)升級(jí),甚至更換計(jì)算引擎時(shí)都不用修改用戶的Job而繼續(xù)使用。
5)有些應(yīng)用場(chǎng)景需要流式更新,批式驗(yàn)證。使用SQL可以統(tǒng)一批計(jì)算和流計(jì)算的查詢query。真正實(shí)現(xiàn)一個(gè)Query,同樣的結(jié)果。
2.流處理 VS 批處理
要想設(shè)計(jì)和批處理統(tǒng)一的流計(jì)算SQL,就要了解流處理和批處理的區(qū)別。兩者的核心區(qū)別在于流處理的數(shù)據(jù)是無(wú)窮的而批處理的數(shù)據(jù)是有限的。這個(gè)本質(zhì)區(qū)別又引入另外三個(gè)更具體的區(qū)別:
1)流處理會(huì)不斷產(chǎn)生結(jié)果而不會(huì)結(jié)束,批處理往往只返回一個(gè)最終結(jié)果并且結(jié)束。比方說(shuō),如果要統(tǒng)計(jì)雙11的交易金額,使用批處理計(jì)算就要在雙11當(dāng)天的所有交易結(jié)束后,再開始計(jì)算所有買家花費(fèi)的總金額并得到一個(gè)最終數(shù)值。而流處理需要追蹤實(shí)時(shí)的交易金額,實(shí)時(shí)的計(jì)算并更新結(jié)果。
2)流計(jì)算需要做checkpoint并保留狀態(tài),這樣在failover的時(shí)候能夠快速續(xù)跑。而批計(jì)算由于它的輸入數(shù)據(jù)往往是被持久化存儲(chǔ)過(guò)的,因此往往不需要保留狀態(tài)。
3)流數(shù)據(jù)會(huì)不斷更新,例如某一買家的花費(fèi)總金額在不斷變化,而批處理的數(shù)據(jù)是一天花費(fèi)的總金額,是固定的,不會(huì)變化的。流數(shù)據(jù)處理是對(duì)最終結(jié)果的一個(gè)提前觀測(cè),往往需要把提前計(jì)算的結(jié)果撤回(Retraction)做更改而批計(jì)算則不會(huì)。
3.Query Configuration
上面提到的這些區(qū)別都不涉及用戶的業(yè)務(wù)邏輯,也就是說(shuō)這些區(qū)別不會(huì)反應(yīng)在SQL的不同。我們認(rèn)為這些區(qū)別只是一個(gè)job的屬性不同。為了描述流計(jì)算所特有的一些屬性,例如什么時(shí)候產(chǎn)生流計(jì)算結(jié)果和怎么保留狀態(tài),我們?cè)O(shè)計(jì)容許用戶配置的Query Configuration,它主要包括兩個(gè)部分:
Latency SLA
定義了從數(shù)據(jù)產(chǎn)生到展現(xiàn)的延遲,如雙11大屏是秒級(jí)別。用戶根據(jù)自己的需要配置不同SLA,我們的SQL系統(tǒng)會(huì)根據(jù)SLA的要求做***的優(yōu)化,使得在滿足用戶需求的同時(shí)達(dá)到系統(tǒng)性能的***。
State Retention/TTL
流計(jì)算是永不停止的,但是流數(shù)據(jù)中的State往往不需要保留很久,保留過(guò)久勢(shì)必對(duì)存儲(chǔ)是個(gè)浪費(fèi),也極大的影響了性能。所以我們?nèi)菰S用戶設(shè)置合理的TTL(過(guò)期時(shí)間)來(lái)獲得更好的計(jì)算性能。
我們通過(guò)Query Configuration描述了流和批所不同的一些屬性。接下來(lái)我們需要繼續(xù)考慮如何設(shè)計(jì)流式的SQL?
4.動(dòng)態(tài)表(Dynamic-Table)
問(wèn)題關(guān)鍵在于SQL在批處理中對(duì)表操作而流數(shù)據(jù)中并沒(méi)有表。因此,我們創(chuàng)建了數(shù)據(jù)會(huì)隨著時(shí)間變化的動(dòng)態(tài)表。動(dòng)態(tài)表是流的另一種表現(xiàn)形式,它們之間具有對(duì)偶性,即它們可以互相轉(zhuǎn)換而不破壞數(shù)據(jù)的一致性。以下是一個(gè)例子:
如圖,左邊是輸入流,我們?yōu)槊恳粭l數(shù)據(jù)產(chǎn)生Dynamic-Table,再將Table的變化用Changelog發(fā)送出去。這樣兩次變化后,輸入流和輸出流中的數(shù)據(jù)始終保持一致,這就證明了引入Dynamic-Table并沒(méi)有丟失語(yǔ)義和數(shù)據(jù)。
有了動(dòng)態(tài)表的概念,我們就可以應(yīng)用傳統(tǒng)SQL作用于流上。值得一提的是,Dynamic-Table是虛擬的存在著,它并不需要實(shí)際的存儲(chǔ)來(lái)落地。我們?cè)賮?lái)看一個(gè)例子:
如圖,當(dāng)有輸入流的時(shí)候我們進(jìn)行連續(xù)查詢。我們將Stream理解為一個(gè)Dynamic-Table,動(dòng)態(tài)查詢是基于Dynamic-Table產(chǎn)生一個(gè)新的Dynamic-Table,如果需要新產(chǎn)生的Dynamic-Table還可以繼續(xù)產(chǎn)生流。這里,因?yàn)榧尤肓诉B續(xù)查詢的聚合計(jì)算,左右兩邊的流已經(jīng)發(fā)生了變換??傊畡?dòng)態(tài)表的引入提供了我們?cè)诹魃献鲞B續(xù)SQL查詢的能力。
5.Stream SQL是沒(méi)有必要存在的
通過(guò)上面的討論,我們發(fā)現(xiàn)有了Dynamic-Table之后我們不需要再創(chuàng)造任何新的流式SQL的語(yǔ)義。因此我們得出這樣的結(jié)論:流式SQL是沒(méi)必要存在的。ANSI SQL完全可以描述Stream SQL的語(yǔ)義,保持ANSI SQL的標(biāo)準(zhǔn)語(yǔ)義是我們構(gòu)建Flink SQL的一個(gè)基本原則。
6.ANSI SQL功能實(shí)現(xiàn)
基于上面的理論基礎(chǔ),我們繼而實(shí)現(xiàn)了流計(jì)算所需要的若干ANSI SQL功能,包括:DML、DDL、UDF/UDTF/UDAF、連接Join、撤回(Retraction)、Window聚合等等, 除了這些功能之外,我們還做了大量的查詢優(yōu)化,從而保障了Flink SQL即能滿足用戶的各種查詢的需求,同時(shí)兼具優(yōu)異的查詢性能。接下來(lái),簡(jiǎn)要介紹其中幾項(xiàng):
1)JOIN
流和動(dòng)態(tài)表具有對(duì)偶性,一條SQL看似是Table的join,事實(shí)上是流的join。
例如Inner Join的實(shí)現(xiàn)原理如下:數(shù)據(jù)會(huì)從輸入的兩邊任意一條流而來(lái),一邊數(shù)據(jù)先來(lái)會(huì)被存在State中并按照J(rèn)oining key查詢另外一邊的State,如果存在就會(huì)輸出結(jié)果,不存在則不輸出,直到對(duì)面數(shù)據(jù)來(lái)了之后才產(chǎn)生結(jié)果。
總之,兩個(gè)流具有兩個(gè)state,一邊的數(shù)據(jù)到達(dá)后存下來(lái)等待另外一邊數(shù)據(jù),全部到達(dá)后inner join產(chǎn)生結(jié)果。 除了兩條流的join之外,我們還引入了流和外部表的join。我們的機(jī)器學(xué)習(xí)平臺(tái)會(huì)把大量的數(shù)據(jù)存儲(chǔ)在HBase中,查詢HBase中的數(shù)據(jù)的操作實(shí)際上是在連接一個(gè)外部表。連接外部表往往存在兩個(gè)模式:
a)Look up方式。流數(shù)據(jù)到達(dá)時(shí)即時(shí)地查詢外部表,從而得到結(jié)果。
b)Snapshot方式。流數(shù)據(jù)到達(dá)時(shí)即時(shí)地發(fā)送snapshot的版本信息給外部存儲(chǔ)service從而查詢數(shù)據(jù),外部表存儲(chǔ)根據(jù)版本信息返回結(jié)果。
值得一提的是,我們?cè)O(shè)計(jì)的這個(gè)流和外部表關(guān)聯(lián)的這個(gè)功能沒(méi)有引入任何新的語(yǔ)法,是完全按照SQL-2011的標(biāo)準(zhǔn)實(shí)現(xiàn)的。同樣的查詢?cè)谂?jì)算上也適用。
2)Retraction
撤回是流計(jì)算的重要概念,舉一個(gè)例子作解釋:計(jì)算詞頻
詞頻的計(jì)算是指對(duì)所有英文單詞統(tǒng)計(jì)頻率,并最終按照頻率統(tǒng)計(jì)不同頻率下不同單詞的個(gè)數(shù)。例如,如果一個(gè)統(tǒng)計(jì)的初始狀態(tài)只有Hello World Bark三個(gè)單詞,且每個(gè)單詞只出現(xiàn)一次,那么詞頻的最終結(jié)果就是出現(xiàn)頻率為1的單詞有3個(gè)(出現(xiàn)頻率為其他次數(shù)的完全沒(méi)有),因此結(jié)果表只有一行“1——3”。當(dāng)單詞不斷更新,再增加一個(gè)Hello時(shí),因?yàn)镠ello的出現(xiàn)頻率變?yōu)?次,我們?cè)谠~頻的結(jié)果表中插入“2——1”這么一行新的數(shù)據(jù)。
顯然,出現(xiàn)兩次的單詞是一個(gè),那么“2——1”這個(gè)結(jié)果是對(duì)的,但是出現(xiàn)頻率為1次的單詞數(shù)已經(jīng)錯(cuò)了,應(yīng)該是2個(gè),而不是3個(gè)。出現(xiàn)這種問(wèn)題的本質(zhì)原因是因?yàn)榱饔?jì)算輸出的結(jié)果是對(duì)計(jì)算的一個(gè)提前觀測(cè),隨著數(shù)據(jù)的不斷更新,計(jì)算結(jié)果必然會(huì)發(fā)生改變,這就要求我們對(duì)之前發(fā)生的結(jié)果做撤回(retraction)再把更新的結(jié)果發(fā)出去,不然數(shù)據(jù)結(jié)果就不錯(cuò)誤。對(duì)于上面的例子,當(dāng)Hello的頻率從1變到2的時(shí)候,我們不僅需要在結(jié)果表中插入“2——1”這么一行,還需要對(duì)“1——3”這一行做撤回更新操作。
值得一提的是什么時(shí)候需要撤回,什么時(shí)候不需要,完全由SQL的Query Optimizer來(lái)判斷,這個(gè)用戶是完全不需要感知的,用戶只需要通過(guò)SQL描述他的業(yè)務(wù)計(jì)算邏輯就好了。如圖所示,***個(gè)場(chǎng)景不需要撤回而第二個(gè)需要,這完全是由優(yōu)化框架決定而非用戶 。這一點(diǎn),大大體現(xiàn)了使用SQL,并利用SQL中所擁有的天然優(yōu)化框架的好處。
3)Window聚合
Window聚合是Flink SQL的一個(gè)重要能力。圖中的這個(gè)例子我們對(duì)每一個(gè)小時(shí)的數(shù)據(jù)做聚合統(tǒng)計(jì)。除了這種Tumble window我們還支持了Sliding Window和Session Window。將來(lái)還會(huì)支持用戶自定義的window。
4)查詢優(yōu)化Query Optimization
除了添加新的功能,我們還做了大量的查詢優(yōu)化。例如micro-batching。如果沒(méi)有micro-batching,處理每一條數(shù)據(jù)就會(huì)伴隨著幾次IO讀寫。有了micro-batching之后我們可以用幾次IO處理來(lái)處理上千條數(shù)據(jù)。除此之外,我們還做了大量的的filter/join/aggregate pushdown以及TopN的優(yōu)化,下面再舉例解釋TopN的優(yōu)化:
如上圖,我們想取銷售量前三的city,對(duì)用戶的Query有兩種底層的實(shí)現(xiàn):
a)一種方式是當(dāng)沒(méi)一條數(shù)據(jù)來(lái)的時(shí)候,對(duì)保存的所有city進(jìn)行排序,再截取前三個(gè)city。這種設(shè)計(jì)每條數(shù)據(jù)跟新都會(huì)重新排列所有city,勢(shì)必會(huì)造成大量計(jì)算資源浪費(fèi)。
b)我們的Query Optimizer會(huì)自動(dòng)識(shí)別到查詢語(yǔ)句,對(duì)這種計(jì)算做優(yōu)化,真正執(zhí)行過(guò)程中只需要不停的更新排前三的city就可以了,這樣大大優(yōu)化了計(jì)算的復(fù)雜度,提升了性能
阿里巴巴實(shí)時(shí)計(jì)算應(yīng)用
基于流計(jì)算SQL之上我們開發(fā)了兩個(gè)計(jì)算平臺(tái)。
1.阿里云流計(jì)算開發(fā)平臺(tái)
一個(gè)是阿里云流計(jì)算平臺(tái)(streamCompute),該平臺(tái)允許用戶編寫SQL,并在平臺(tái)內(nèi)部調(diào)試debug。調(diào)試正確后,用戶可以通過(guò)這個(gè)平臺(tái)直接將作業(yè)發(fā)布在阿里云集群上部署,部署完成后后檢測(cè)運(yùn)維上線的。因此這個(gè)平臺(tái)整合了所有實(shí)時(shí)計(jì)算的需求,集開發(fā)、Debug、上線部署、運(yùn)維于一體,大大加速了用戶開發(fā)和上線的效率。值得一提的是,2017年雙11期間阿里集團(tuán)絕大多數(shù)的實(shí)時(shí)計(jì)算Job均通過(guò)這個(gè)平臺(tái)發(fā)布。我們今年9月開始,通過(guò)阿里云,包括公共云、專有云也將這個(gè)平臺(tái)開放給外部企業(yè),讓他們能夠使用到阿里巴巴實(shí)時(shí)計(jì)算的能力。
2.阿里實(shí)時(shí)機(jī)器學(xué)習(xí)平臺(tái)Porsche
為了方便算法同學(xué)開發(fā)機(jī)器學(xué)習(xí)任務(wù),我們基于Flink SQL以及Hbase,設(shè)計(jì)實(shí)現(xiàn)了一個(gè)面向算法人員、支持可視化自助開發(fā)運(yùn)維的在線機(jī)器學(xué)習(xí)平臺(tái)——Porsche。如上圖所示,用戶在Porsche平臺(tái)的IDE,通過(guò)可視化的方式將組件拖入畫布中,配置好組件屬性,定義好完整的計(jì)算DAG。這個(gè)DAG會(huì)被翻譯成SQL,最終提交給Blink執(zhí)行。另外,值得一提的是,Porsche平臺(tái)還支持Tensorflow,今年雙11也是大放異彩,本平臺(tái)免去了算法同學(xué)學(xué)習(xí)使用SQL的成本,暫時(shí)只對(duì)內(nèi)開放。
雙11實(shí)時(shí)計(jì)算總結(jié)
上圖是阿里巴巴實(shí)時(shí)計(jì)算架構(gòu),底層是數(shù)千規(guī)模的物理機(jī),之上是統(tǒng)一部署的Resource Management和Storage,然后是Blink Runtime和Flink SQL,用戶通過(guò)StreamCompute和Porsche平臺(tái)提交Job,現(xiàn)在已經(jīng)在阿里內(nèi)部支持了數(shù)百個(gè)工程師近千個(gè)Flink SQL Job。上述就是阿里巴巴實(shí)時(shí)計(jì)算的現(xiàn)狀。
在實(shí)時(shí)計(jì)算的助力下,阿里雙11拿到1682億的輝煌戰(zhàn)果,實(shí)時(shí)計(jì)算的貢獻(xiàn)主要體現(xiàn)在以下幾點(diǎn):
1.本次雙11是互聯(lián)網(wǎng)歷史***規(guī)模的并發(fā),每秒幾十萬(wàn)的交易和支付的實(shí)時(shí)聚合統(tǒng)計(jì)操作全部是是由Blink計(jì)算帶來(lái)的
2.3分01秒100億數(shù)據(jù)的展現(xiàn)不僅需要較高的Data Base的高吞吐能力,還考驗(yàn)著實(shí)時(shí)計(jì)算的速度
3.算法平臺(tái)幫助算法同學(xué)取得了很好的搜索和推薦效果,獲得了整體GMV的增長(zhǎng)
總之,實(shí)時(shí)計(jì)算不僅滿足了阿里巴巴內(nèi)部多種多樣的需求,還提升了GMV。我們希望通過(guò)阿里云實(shí)時(shí)計(jì)算平臺(tái)(StreamCompute)把Blink實(shí)時(shí)計(jì)算能力輸出給阿里之外的所有企業(yè),讓他們能從中獲益。
大沙,阿里巴巴高級(jí)技術(shù)專家,負(fù)責(zé)實(shí)時(shí)計(jì)算Flink SQL,之前在美國(guó)臉書任職,Apache Flink committer。
【本文為51CTO專欄作者“阿里巴巴官方技術(shù)”原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)聯(lián)系原作者】