58同城實時計算平臺架構(gòu)實踐
導(dǎo)語
本文主要介紹58同城實時計算平臺技術(shù)演進,以及基于Flink打造的一站式實時計算平臺Wstream,涵蓋很多實踐經(jīng)驗、干貨和方法論,希望對您有所幫助。
背景
58同城作為覆蓋生活全領(lǐng)域的服務(wù)平臺,業(yè)務(wù)覆蓋招聘、房產(chǎn)、汽車、金融、二手及本地服務(wù)等各個方面。 豐富的業(yè)務(wù)線和龐大的用戶數(shù)每天產(chǎn)生海量用戶數(shù)據(jù)需要實時化的計算分析,實時計算平臺定位于為集團海量數(shù)據(jù)提供高效、穩(wěn)定、分布式實時計算的基礎(chǔ)服務(wù)。 本文主要介紹58同城基于Flink打造的一站式實時計算平臺Wstream。
實時計算場景
和很多互聯(lián)網(wǎng)公司一樣,實時計算在58擁有豐富的場景需求,主要包括以下幾類:
1.實時數(shù)據(jù)ETL
實時消費Kafka數(shù)據(jù)進行清洗、轉(zhuǎn)換、結(jié)構(gòu)化處理用于下游計算處理。
2.實時數(shù)倉
實時化數(shù)據(jù)計算,倉庫模型加工和存儲。 實時分析業(yè)務(wù)及用戶各類指標,讓運營更加實時化。
3.實時監(jiān)控
對系統(tǒng)和用戶行為進行實時檢測和分析,如業(yè)務(wù)指標實時監(jiān)控,運維線上穩(wěn)定性監(jiān)控,金融 風控等。
4.實時分析
特征平臺,用戶畫像,實時個性化推薦等。
平臺演進
在實時計算平臺建設(shè)過程中,主要是跟進開源社區(qū)發(fā)展以及實際業(yè)務(wù)需求,計算框架經(jīng)歷了Storm到 Spark Streaming到 Flink的發(fā)展,同時建設(shè)一站式實時計算平臺,旨在提升用戶實時計算需求開發(fā)上線管理監(jiān)控效率,優(yōu)化平臺管理。
實時計算引擎前期基于Storm和Spark Streaming構(gòu)建,很多情況下并不能很好的滿足業(yè)務(wù)需求,如商業(yè)部門基于Spark Streaming構(gòu)建的特征平臺希望將計算延遲由分鐘級降低到秒級,提升用戶體驗,運維監(jiān)控平臺基于Storm分析公司全量nginx日志對線上業(yè)務(wù)進行監(jiān)控,需要秒級甚至毫秒級別的延遲,Storm的吞吐能力成為瓶頸。 同時隨著實時需求不斷增加,場景更加豐富,在追求任務(wù)高吞吐低延遲的基礎(chǔ)上,對計算過程中間狀態(tài)管理,靈活窗口支持,以及exactly once語義保障的訴求越來越多。 Apache Flink開源之后,支持高吞吐低延遲的架構(gòu)設(shè)計以及高可用的穩(wěn)定性,同時擁有實時計算場景一系列特性以及支持實時Sql模型,使我們決定采用 Flink作為新一代實時計算平臺的計算引擎。
平臺規(guī)模
實時計算平臺當前主要基于Storm/Spark Streaming/Flink,集群共計500多臺機器,每天處理數(shù)據(jù)量6000億+,其中Flink經(jīng)過近一年的建設(shè),任務(wù)占比已經(jīng)達到50% 。
Flink穩(wěn)定性
Flink作為實時計算集群,可用性要求遠高于離線計算集群。 為保障集群可用性,平臺主要采用任務(wù)隔離以及高可用集群架構(gòu)保障穩(wěn)定性。
任務(wù)隔離
在應(yīng)用層面主要基于業(yè)務(wù)線以及場景進行機器隔離,隊列資源分配管理,避免集群抖動造成全局影響。
集群架構(gòu)
Flink集群采用了ON YARN模式獨立部署,為減少集群維護工作量,底層HDFS利用公司統(tǒng)一HDFS Federation架構(gòu)下建立獨立的namespace,減少Flink任務(wù)在checkpoint采用hdfs/rocksdb作為狀態(tài)存儲后端場景下由于hdfs抖動出現(xiàn)頻繁異常失敗。 在資源隔離層面,引入Node Label機制實現(xiàn)重要任務(wù)運行在獨立機器,不同計算性質(zhì)任務(wù)運行在合適的機器下,最大化機器資源的利用率。 同時在YARN資源隔離基礎(chǔ)上增加Cgroup進行物理cpu隔離,減少任務(wù)間搶占影響,保障任務(wù)運行穩(wěn)定性。
平臺化管理
Wstream是一套基于Apache Flink構(gòu)建的一站式、高性能實時大數(shù)據(jù)處理平臺。 提供SQL化流式數(shù)據(jù)分析能力,大幅降低數(shù)據(jù)實時分析門檻,支持通過DDL實現(xiàn)source/sink以及維表,支持UDF/UDAF/UDTF,為用戶提供更強大的數(shù)據(jù)實時處理能力。 支持多樣式應(yīng)用構(gòu)建方式FlinkJar/Stream SQL/Flink-Storm,以滿足不同用戶的開發(fā)需求,同時通過調(diào)試,監(jiān)控,診斷,探查結(jié)果等輔助手段完善任務(wù)生命周期管理。
流式sql能力建設(shè)
Stream SQL是平臺為了打造sql化實時計算能力,減小實時計算開發(fā)門檻,基于開源的 Flink,對底層sql模塊進行擴展實現(xiàn)以 下功能
1.支持自定義DDL語法(包括源表,輸出表,維表)
2.支持自定義UDF/UDTF/UDAF語法
3.實現(xiàn)了流與維表的join,雙流join
在支持大數(shù)據(jù)開源組件的同時,也打通了公司主流的實時存儲平臺。 同時為用戶提供基于Sql client的cli方式以及在Wstream集成了對實時sql能力的支持,為用戶提供在線開發(fā)調(diào)試sql任務(wù)的編輯器,同時支持代碼高亮,智能提示,語法校驗及運行時校驗,盡可能避免用戶提交到集群的任務(wù)出現(xiàn)異常。 另外也為用戶提供了向?qū)Щ渲梅绞剑鉀Q用戶定義table需要了解復(fù)雜的參數(shù)設(shè)置,用戶只需關(guān)心業(yè)務(wù)邏輯處理,像開發(fā)離線Hive一樣使用sql開發(fā)實時任務(wù)。
Storm任務(wù)遷移Flink
在完善Flink平臺建設(shè)的同時,我們也啟動Storm任務(wù)遷移Flink計劃,旨在提升實時計算平臺整體效率,減少機器成本和運維成本。 Flink-Storm作為官方提供Flink兼容Storm程序為我們實現(xiàn)無縫遷移提供了可行性,但是作為beta版本,在實際使用過程中存在很多無法滿足現(xiàn)實場景的情況,因此我們進行了大量改進,主要包括實現(xiàn)Storm任務(wù)on yarn ,遷移之后任務(wù)at least once語義保障,兼容Storm的 tick tuple機制等等。
通過對Fink-Storm的優(yōu)化,在無需用戶修改代碼的基礎(chǔ)上,我們已經(jīng)順利完成多個Storm版本集群任務(wù)遷移和集群下線,在保障實時性及吞吐量的基礎(chǔ)上可以節(jié)約計算資源40%以上,同時借助yarn統(tǒng)一管理實時計算平臺無需維護多套Storm集群,整體提升了平臺資源利用率,減輕平臺運維工作量。
任務(wù)診斷
指標監(jiān)控
Flink webUI 提供了大量的運行時信息供用戶了解任務(wù)當前運行狀況,但是存在無法獲取歷史metrics的問題導(dǎo)致用戶無法了解任務(wù)歷史運行狀態(tài),因此我們采用了Flink原生支持的Prometheus進行實時指標采集和存儲,Prometheus是一個開源的監(jiān)控和報警系統(tǒng),通過pushgateway的方式實時上報metrics,Prometheus集群采用Fedration部署模式,meta節(jié)點定時抓取所有子節(jié)點指標進行匯總,方便統(tǒng)一數(shù)據(jù)源提供給Grafana進行可視化以及告警配置。
任務(wù)延遲
吞吐能力和延遲作為衡量實時任務(wù)性能最重要的指標,我們經(jīng)常需要通過這兩個指標來調(diào)整任務(wù)并發(fā)度和資源配置。 Flink Metrics提供latencyTrackingInterval參數(shù)啟用任務(wù)延遲跟蹤,打開會顯著影響集群和任務(wù)性能,官方高度建議只在debug下使用。 在實踐場景下,F(xiàn)link任務(wù)數(shù)據(jù)源基本都是Kafka,因此我們采用topic消費堆積作為衡量任務(wù)延遲的指標,監(jiān)控模塊實時通過Flink rest獲取任務(wù)正在消費topic的offset,同時通過Kafka JMX獲取對應(yīng)topic的logsize,采用logsize– offset作為topic的堆積。
日志檢索
Flink 作為分布式計算引擎,所有任務(wù)會由YARN統(tǒng)一調(diào)度到任意的計算節(jié)點,因此任務(wù)的運行日志會分布在不同的機器,用戶定位日志困難,我們通過調(diào)整log4j日志框架默認機制,按天切分任務(wù)日志,定期清理過期日志,避免異常任務(wù)頻繁寫滿磁盤導(dǎo)致計算節(jié)點不可用的情況,同時在所有計算節(jié)點部署agent 實時采集日志,匯聚寫入Kafka,通過日志分發(fā)平臺實時將數(shù)據(jù)分發(fā)到ES,方便用戶進行日志檢索和定位問題。
Flink優(yōu)化
在實際使用過程中, 我們也針對業(yè)務(wù)場景進行了一些優(yōu)化和擴展,主要包括:
1.Storm任務(wù)需要Storm引擎提供ack機制保障消息傳遞at least once語義,遷移到Flink無法使用ack機制,我們通過定制KafakSpout實現(xiàn)checkpoint相關(guān)接口,通過Flink checkpoint機制實現(xiàn)消息傳遞不丟失。 另外Flink-Storm默認只能支持standalone的提交方式,我們通過實現(xiàn)yarn client相關(guān)接口增加了storm on yarn的支持。
2.Flink 1.6推薦的是一個TaskManager對應(yīng)一個slot的使用方式,在申請資源的時候根據(jù)最大并發(fā)度申請對應(yīng)數(shù)量的TaskManger,這樣導(dǎo)致的問題就是在任務(wù)設(shè)置task slots之后需要申請的資源大于實際資源。 我們通過在ResoureManager請求資源管理器SlotManager的時候增加TaskManagerSlot相關(guān)信息 ,用于維護申請到的待分配TaskManager和slot,之后對于SlotRequests請求不是直接申請TaskManager,而是先從SlotManager申請是否有足夠slot,沒有才會啟動新的TaskManger,這樣就實現(xiàn)了申請資源等于實際消耗資源,避免任務(wù)在資源足夠的情況下無法啟動。
3.Kafak Connector改造,增加自動換行支持,另外針對08source無法設(shè)置client.id,通過將client.id生成機制優(yōu)化成更有標識意義的id,便于Kafka層面管控
4.Flink提交任務(wù)無法支持第三方依賴jar包和配置文件供TaskManager使用,我們通過修改flink啟動腳本,增加相關(guān)參數(shù)支持外部傳輸文件,之后在任務(wù)啟動過程中通過將對應(yīng)的jar包和文件加入classpath,借助yarn的文件管理機制實現(xiàn)類似spark對應(yīng)的使用方式,方便用戶使用
5.業(yè)務(wù)場景存在大量實時寫入hdfs需求,F(xiàn)link 自帶BucketingSink默認只支持string和avro格式,我們在此基礎(chǔ)上同時支持了LZO及Parquet格式寫入,極大提升數(shù)據(jù)寫入性能。
后續(xù)規(guī)劃
實時計算平臺當前正在進行Storm任務(wù)遷移Flink集群,目前已經(jīng)基本完成,大幅提升了平臺資源利用率和計算效率。 后續(xù)將繼續(xù)調(diào)研完善Flink相關(guān)能力,推動Flink在更多的實時場景下的應(yīng)用,包括實時規(guī)則引擎,實時機器學習等。