攜程基于Storm的實時大數(shù)據(jù)平臺實踐
本文講解了攜程在實時數(shù)據(jù)平臺的一些實踐,按照時間順序來說明我們是怎么一步一步構(gòu)建起這個實時數(shù)據(jù)平臺的,目前有一些什么新的嘗試,未來的方向是怎么樣的,希望對需要構(gòu)建實時數(shù)據(jù)平臺的公司和同學(xué)有所借鑒。
為什么要做實時數(shù)據(jù)平臺
首先先介紹一下背景,為什么我們要做這個數(shù)據(jù)平臺?其實了解攜程的業(yè)務(wù)的話,就會知道攜程的業(yè)務(wù)部門是非常多的,除了酒店和機票兩大業(yè)務(wù)之外,有近20個SBU和公共部門,他們的業(yè)務(wù)形態(tài)差異較大,變化也快,原來那種Batch形式的數(shù)據(jù)處理方式已經(jīng)很難滿足各個業(yè)務(wù)數(shù)據(jù)獲取和分析的需要,他們需要更為實時地分析和處理數(shù)據(jù)。
其實在這個統(tǒng)一的實時平臺之前,各個部門自己也做一些實時數(shù)據(jù)分析的應(yīng)用,但是其中存在很多的問題:
首先是技術(shù)選型五花八門,消息隊列有用ActiveMQ的,有用RabbitMQ的,也有用Kafka的,分析平臺有用Storm的,有用Spark-streaming的,也有自己寫程序處理的;由于業(yè)務(wù)部門技術(shù)力量參差不齊,并且他們的主要精力還是放在業(yè)務(wù)需求的實現(xiàn)上,所以這些實時數(shù)據(jù)應(yīng)用的穩(wěn)定性往往難以保證。
其次就是缺少周邊設(shè)施,比如說像報警、監(jiān)控這些東西。
***就是數(shù)據(jù)和信息的共享不順暢,如果度假要使用酒店的實時數(shù)據(jù),兩者分析處理的系統(tǒng)不同就會很難弄。所以在這樣前提下,就需要打造一個統(tǒng)一的實時數(shù)據(jù)平臺。
需要怎樣的實時數(shù)據(jù)平臺
這個統(tǒng)一的數(shù)據(jù)平臺需要滿足4個需求:
- 首先是穩(wěn)定性,穩(wěn)定性是任何平臺和系統(tǒng)的生命線;
- 其次是完整的配套設(shè)施,包括測試環(huán)境,上線、監(jiān)控和報警;
- 再次是方便信息共享,信息共享有兩個層面的含義,1、是數(shù)據(jù)的共享;2、是應(yīng)用場景也可以共享,比如說一個部門會受到另一個部門的一個實時分析場景的啟發(fā),在自己的業(yè)務(wù)領(lǐng)域內(nèi)也可以做一些類似的應(yīng)用;
- ***服務(wù)響應(yīng)的及時性,用戶在開發(fā)、測試、上線及維護整個過程都會遇到各種各樣的問題,都需要得到及時的幫助和支持。
如何實現(xiàn)
在明確了這些需求之后我們就開始構(gòu)建這個平臺,當(dāng)然***步面臨的肯定是一個技術(shù)選型的問題。消息隊列這邊Kafka已經(jīng)成為了一個既定的事實標(biāo)準(zhǔn);但是在實時處理平臺的選擇上還是有蠻多候選的系統(tǒng),如Linkedin的Samza, apache的S4,最主流的當(dāng)然是Storm和Spark-streaming啦。
出于穩(wěn)定和成熟度的考量,當(dāng)時我們***是選擇了Storm作為實時平臺。如果現(xiàn)在讓我重新再來看的話,我覺得Spark-streaming和Storm都是可以的,因為這兩個平臺現(xiàn)在都已經(jīng)比較成熟了。
架構(gòu)圖的話就比較簡單,就是從一些業(yè)務(wù)的服務(wù)器上去收集這個日志,或者是一些業(yè)務(wù)數(shù)據(jù),然后實時地寫入Kafka里面,Storm作業(yè)從Kafka讀取數(shù)據(jù),進行計算,把計算結(jié)果吐到各個業(yè)務(wù)線依賴的外部存儲中。
那我們僅僅構(gòu)建這些就夠了嗎?當(dāng)然是遠遠不夠的,因為這樣僅僅是一些運維的東西,你只是把一個系統(tǒng)的各個模塊搭建起來。
前面提到的平臺的兩個最關(guān)鍵的需求:數(shù)據(jù)共享和平臺整體的穩(wěn)定性很難得到保證,我們需要做系統(tǒng)治理來滿足這兩個平臺的關(guān)鍵需求。
首先說說數(shù)據(jù)共享的問題,我們通常認(rèn)為就是數(shù)據(jù)共享的前提是指用戶要清晰的知道使用數(shù)據(jù)源的那個業(yè)務(wù)含義和其中數(shù)據(jù)的Schema,用戶在一個集中的地方能夠非常簡單地看到這些信息;我們解決的方式是使用Avro的方式定義數(shù)據(jù)的Schema,并將這些信息放在一個統(tǒng)一的Portal站點上;數(shù)據(jù)的生產(chǎn)者創(chuàng)建Topic,然后上傳Avro格式的Schema,系統(tǒng)會根據(jù)Avro的Schema生成Java類,并生成相應(yīng)的JAR,把JAR加入Maven倉庫;對于數(shù)據(jù)的使用者來說,他只需要在項目中直接加入依賴即可。
此外,我們封裝了Storm的API,幫用戶實現(xiàn)了反序列化的過程,示例代碼如下,用戶只要繼承一個類,然后制定消息對應(yīng)的類,系統(tǒng)能夠自動完成消息的反序列化,你在process方法中拿到的就是已經(jīng)反序列化好的對象,對用戶非常方便。
其次我們來說說資源控制,這個是保證平臺穩(wěn)定性的基礎(chǔ),我們知道Storm其實在資源隔離方面做得并不是太好,所以我們需要對用戶的Storm作業(yè)的并發(fā)做一些控制。我們的做法還是封裝Storm的接口,將原來設(shè)定topology和executor并發(fā)的方法去掉,而把這些設(shè)置挪到Portal中。下面是示例的代碼:
另外,我們前面已經(jīng)提到過了,我們做了一個統(tǒng)一的Portal方便用戶管理,用戶可以查看Topic相關(guān)信息,也可以用來管理自己的Storm作業(yè),配置,啟動,Rebalance,監(jiān)控等一系列功能都能夠在上面完成。
在完成了這些功能之后,我們就開始初期業(yè)務(wù)的接入了,初期業(yè)務(wù)我們只接了兩個數(shù)據(jù)源,這兩個數(shù)據(jù)源的流量都比較大,就是一個是UBT(攜程的用戶行為數(shù)據(jù)),另一個是Pprobe的數(shù)據(jù)(應(yīng)用流量日志),那基本上是攜程用行為的訪問日志。主要應(yīng)用集中在實時的數(shù)據(jù)分析和數(shù)據(jù)報表上。
在平臺搭建的初期階段,我們有一些經(jīng)驗和大家分享一下:
- 最重要的設(shè)計和規(guī)劃都需要提前做好,因為如果越晚調(diào)整的話其實付出的成本會越大的;
- 集中力量實現(xiàn)了核心功能;
- 盡早的接入業(yè)務(wù),在核心功能完成并且穩(wěn)定下來的前提下,越早接入業(yè)務(wù)越好,一個系統(tǒng)只有真正被使用起來,才能不斷進化;
- 接入的業(yè)務(wù)一定要有一定的量,因為我們最開始接入就是整個攜程的整個UBT,就是用戶行為的這個數(shù)據(jù),這樣才能比較快的幫助整個平臺穩(wěn)定下來。因為你平臺剛剛建設(shè)起來肯定是有各種各樣的問題的,就是通過大流量的驗證之后,一個是幫平臺穩(wěn)定下來,修復(fù)各種各樣的bug,第二個是說會幫我們積累技術(shù)上和運維上的經(jīng)驗。
在這個之后我們就做了一系列工作來完善這個平臺的“外圍設(shè)施”:
首先就是把Storm的日志導(dǎo)入到ES里面,通過Kanban展示出來;原生的Storm日志查看起來不方便,也沒有搜索的功能,數(shù)據(jù)導(dǎo)入ES后可以通過圖標(biāo)的形式展現(xiàn)出來,也有全文搜索的功能,排錯時非常方便。
其次就是metrics相關(guān)的一些完善;除了Storm本身Build in的metrics之外我們還增加了一些通用的埋點,如從消息到達Kafka到它開始被消費所花的時間等;另外我們還是實現(xiàn)了自定義的MetricsConsumer,它會把所有的metrics信息實時地寫到攜程自己研發(fā)的看板系統(tǒng)Dashboard和Graphite中,在Graphite中的信息會被用作告警。
第三就是我們建立了完善的告警系統(tǒng),告警基于輸出到Graphite的metrics數(shù)據(jù),用戶可以配置自己的告警規(guī)則并設(shè)置告警的優(yōu)先級,對于高優(yōu)先級的告警,系統(tǒng)會使用TTS的功能自動撥打聯(lián)系人的電話,低優(yōu)先級的告警則是發(fā)送郵件;默認(rèn)情況下,我們會幫用戶添加Failed數(shù)量和消費堵塞的默認(rèn)的告警。
第四,我們提供了適配攜程Message Queue的通用的Spout和寫入Redis,HBbase,DB的通用的Bolt,簡化用戶的開發(fā)工作。
***我們在依賴管理上也想了一些方法,方便API的升級;在muise-core(我們封裝的Storm API項目)的2.0版本,我們重新整理了相關(guān)的API接口,之后的版本盡量保證接口向下兼容,然后推動所有業(yè)務(wù)都升級一遍,之后我們把muise-core的jar包作為標(biāo)準(zhǔn)的Jar包之一放到每臺supervisor的storm安裝目錄的lib文件夾下,在之后的升級中,如果是強制升級,就聯(lián)系用戶,逐個重啟Topology,如果這次升級不需要強制推廣,等到用戶下次重啟Topology時,這個升級就會生效。
在做完這些工作之后,我們就開始大規(guī)模的業(yè)務(wù)接入了,其實目前基本上覆蓋了攜程的所有的技術(shù)團隊,應(yīng)用的類型也比初期要豐富很多。
下面給大家簡單介紹一下,在攜程的一些實時應(yīng)用;
主要分為下面四類:
- 實時數(shù)據(jù)報表;
- 實時的業(yè)務(wù)監(jiān)控;
- 基于用戶實時行為的營銷;
- 風(fēng)控和安全的應(yīng)用。
***個展示的是攜程這邊的網(wǎng)站數(shù)據(jù)監(jiān)控平臺cDataPortal,攜程會對每個網(wǎng)頁訪問的性能做一些很詳細的監(jiān)控,然后會通過各種圖表展示出來。
第二個應(yīng)用是攜程在AB Testing的應(yīng)用,其實大家知道AB Testing只有在經(jīng)過比較長的一段時間,才能得到結(jié)果,需要達到一定的量之后才會在統(tǒng)計上有顯著性;那它哪里需要實時計算呢?實時計算主要在這邊起到一個監(jiān)控和告警的作用:當(dāng)AB Testing上線之后,用戶需要一系列的實時指標(biāo)來觀察分流的效果,來確定它配置是否正確;另外需要查看對于訂單的影響,如果對訂單產(chǎn)生了較大的影響,需要能夠及時發(fā)現(xiàn)和停止。
第三個應(yīng)用是和個性化推薦相關(guān),推薦其實更多的是結(jié)合用戶的歷史偏好和實時偏好來給大家推薦一些場景。這邊實時偏好的收集其實就是通過這個實時平臺來做的。比較相似的應(yīng)用有根據(jù)用戶實時的訪問行為推送一些比較感興趣的攻略,團隊游會根據(jù)用戶的實時訪問,然后給用戶推送一些優(yōu)惠券之類的。
那些曾經(jīng)踩過的坑
在說完了實時數(shù)據(jù)平臺在攜程的應(yīng)用,讓我們簡單來聊聊這個過程中我們的一些經(jīng)驗。
首先是技術(shù)上的,先講一下我們遇到的坑吧。
我們使用的Storm版本是0.9.4,我們遇到了兩個Storm本身的BUG,當(dāng)然這兩個bug是比較偶發(fā)性的,大家可以看一下,如果遇到相應(yīng)的問題的話,可以參考一下:
storm-763:Nimbus已經(jīng)將worker分配到其他的節(jié)點,但是其他worker的netty客戶端不連接新的worker;
應(yīng)急處理:Kill掉這個worker的進程或是重啟相關(guān)的作業(yè)。
storm-643:當(dāng)failed list不為空時,并且一些offset已經(jīng)超出了Range范圍,KafkaUtils會不斷重復(fù)地去取相關(guān)的message;
另外就是在用戶使用過程中的一些問題,比如說如果可能,我們一般會推薦用戶使用localOrShuffleGrouping,在使用它時,上下游的Bolt數(shù)要匹配,否則會出現(xiàn)下游的大多數(shù)Bolt沒有收到數(shù)據(jù)的情況,另外就是用戶要保證Bolt中的成員變量都要是可序列化的,否則在集群上運行時就會報錯。
然后就是關(guān)于支持和團隊的經(jīng)驗,首先在大量接入前其告警和監(jiān)控設(shè)施是必須的,這兩個系統(tǒng)是大量接入的前提,否則難以在遇到非常問題時及時發(fā)現(xiàn)或是快速定位解決。
第二就是說清晰的說明、指南和Q&A能夠節(jié)約很多支持的時間。用戶在開發(fā)之前,你只要提供這個文檔給他看,然后有問題再來咨詢。
第三就是要把握一個接入節(jié)奏,因為我們整個平臺的開發(fā)人員比較少,也就三個到四個同學(xué),雖然已經(jīng)全員客服了去應(yīng)對各個BU的各種各樣的問題,但是如果同時接入太多項目的話還會忙不過來;另外支持還有重要的一點就是“授人以漁”,在支持的時候給他們講得很細吧,讓他們了解Kafka和Storm的基本知識,這樣的話有一些簡單問題他們可以內(nèi)部消化,不用所有的問題都來找你的團隊支持。
新的探索
前面講的是我們基本上去年的工作,今年我們在兩個方向上做了一些新的嘗試:Streaming CQL和JStorm,和大家分享下這兩個方面的進展:
Streaming CQL是華為開元的一個實時流處理的SQL引擎,它的原理就是把SQL直接轉(zhuǎn)化成為Storm的Topology,然后提交到Storm集群中。它的語法和標(biāo)準(zhǔn)的SQL很接近,只是增加了一些窗口函數(shù)來應(yīng)對實時處理的場景。
下面我通過一個簡單的例子給大家展示一個簡單的例子,給大家有個直觀的感受。我的例子是
從kafka中讀取數(shù)據(jù),類型為ubt_action;
取出其中的page,type,action,category等字段然后每五秒鐘按照page, type字段做一次聚合;
***把結(jié)果寫到console中。
如果需要用Storm實現(xiàn)的話,一般你需要實現(xiàn)4個類和一個main方法;使用Streaming CQL的話你只需要定義輸入的Stream和輸出的Stream,使用一句SQL就能實現(xiàn)業(yè)務(wù)邏輯,非常簡單和清晰。
那我們在華為開源的基礎(chǔ)上也做了一些工作:
- 增加Redis,Hbase,Hive(小表,加載內(nèi)存)作為Data Source;
- 增加Hbase,MySQL / SQL Server,Redis作為數(shù)據(jù)輸出的Sink;
- 修正MultiInsert語句解析錯誤,并反饋到社區(qū);
- 為where語句增加了In的功能;
- 支持從攜程的消息隊列Hermes中讀取數(shù)據(jù)。
Streaming CQL***的優(yōu)勢就是能夠使不會寫Java的BI的同事,非常方便地實現(xiàn)一些邏輯簡單的實時報表和應(yīng)用,比如下面說到的一個度假的例子基本上70行左右就完成了,原來開發(fā)和測試的時間要一周左右,現(xiàn)在一天就可以完整,提高了他們的開發(fā)效率。
【案例】
度假BU需要實時地統(tǒng)計每個用戶訪問“自由行”、“跟團游”、“半自助游”產(chǎn)品的占比,進一步豐富用戶畫像的數(shù)據(jù):
- 數(shù)據(jù)流:UBT的數(shù)據(jù);
- Data Source:使用Hive中的product的維度表;
- 輸出:Hbase。
今年我們嘗試的第二個方向就是Jstorm,Storm的內(nèi)核使用Clojure編寫,這給后續(xù)深入的研究和維護帶來了一定的困難,而Jstorm是阿里開源的項目,它完全兼容storm的編程模型,內(nèi)核全部使用Java來編寫,這就方便了后續(xù)的研究和深入地調(diào)研;阿里的Jstorm團隊非常Open,也非常專業(yè)化,我們一起合作解決了一些在使用上遇到的問題;除了內(nèi)核使用Java編寫這個優(yōu)勢之外,Jstorm對比storm在性能上也有一定的優(yōu)勢,此外它還提供了資源隔離和類似于Heron之類的反壓力機制,所以能夠更好的處理消息擁塞的這種情況。
我們現(xiàn)在基本上已經(jīng)把三分之一的storm應(yīng)用已經(jīng)遷到Jstorm上了,我們使用的版本是2.1;在使用過程中有一些經(jīng)驗跟大家分享一下:
***點是我們在與kafka集成中遇到的一些問題,這些在新版本中已經(jīng)修復(fù)了:
在Jstorm中,Spout的實現(xiàn)有兩種不同的方式:Multi Thread(nextTuple,ack & fail方法在不同的進程中調(diào)用)和Single Thread,原生的Storm的Kafka Spout需要使用Single Thread的方式運行;
修復(fù)了Single Thread模式的1個問題(新版本已經(jīng)修復(fù))。
第二點是Jstorm的metrics機制和storm的機制完全不兼容,所以相關(guān)的代碼都需要重寫,主要包括適配了Kafka Spout和我們Storm的API中的Metrics和使用MetricsUploader的功能實現(xiàn)了數(shù)據(jù)寫入Dashboard和Graphite的功能這兩點,此外我們結(jié)合了兩者的API提供了一個統(tǒng)一的接口,能兼容兩個環(huán)境,方便用戶記錄自定義的metrics。
以上就是我要分享的內(nèi)容,在結(jié)尾處,我簡單總結(jié)一下我們的整體架構(gòu):
底層是消息隊列和實時處理系統(tǒng)的開源框架,也包括攜程的一些監(jiān)控和運維的工具,第二層就是API和服務(wù),而最上面通過Portal的形式講所有的功能提供給用戶。
未來方向
在分享的***,我來和大家聊聊實時數(shù)據(jù)平臺未來的發(fā)展方向,主要有兩個:
繼續(xù)推動平臺整體向Jstorm遷移,當(dāng)然我們也會調(diào)研下剛剛開源的Twitter的Heron,與Jstorm做一個對比;
對于dataflow模型的調(diào)研和落地,去年google發(fā)表了dataflow相關(guān)的論文(強烈建議大家讀讀論文或是相應(yīng)的介紹文章),它是新一代實時處理的模型,能在保證實時性的同時又能保證數(shù)據(jù)的正確性,目前開源的實現(xiàn)有兩個:Spark 2.0中Structured Streaming和Apache的另一個開源項目BEAM,BEAM實現(xiàn)了Google Dataflow的API,并且在Spark和Flink上實現(xiàn)了相應(yīng)的Executor。
下半年我們還會做一些調(diào)研和探索性的嘗試,并尋找合適的落地場景。
作者介紹:張翼,攜程大數(shù)據(jù)平臺負(fù)責(zé)人,浙江大學(xué)碩士畢業(yè),2015年初加入攜程,主導(dǎo)了攜程實時數(shù)據(jù)計算平臺的建設(shè),以及攜程大數(shù)據(jù)平臺整合和平臺技術(shù)的演進。進入互聯(lián)網(wǎng)行業(yè)近10年,從事大數(shù)據(jù)平臺和架構(gòu)的工作超過6年。