每秒千萬級實時數(shù)據(jù)處理系統(tǒng)是如何設(shè)計的?
閑魚目前實際生產(chǎn)部署環(huán)境越來越復雜,橫向依賴各種服務(wù)盤根錯節(jié),縱向依賴的運行環(huán)境也越來越復雜。
圖片來自 Pexels
當服務(wù)出現(xiàn)問題的時候,能否及時在海量的數(shù)據(jù)中定位到問題根因,成為考驗閑魚服務(wù)能力的一個嚴峻挑戰(zhàn)。
線上出現(xiàn)問題時常常需要十多分鐘,甚至更長時間才能找到問題原因,因此一個能夠快速進行自動診斷的系統(tǒng)需求就應運而生,而快速診斷的基礎(chǔ)是一個高性能的實時數(shù)據(jù)處理系統(tǒng)。
這個實時數(shù)據(jù)處理系統(tǒng)需要具備如下的能力:
- 數(shù)據(jù)實時采集、實時分析、復雜計算、分析結(jié)果持久化。
- 可以處理多種多樣的數(shù)據(jù)。包含應用日志、主機性能監(jiān)控指標、調(diào)用鏈路圖。
- 高可靠性。系統(tǒng)不出問題且數(shù)據(jù)不能丟。
- 高性能,低延時。數(shù)據(jù)處理的延時不超過 3 秒,支持每秒千萬級的數(shù)據(jù)處理。
本文不涉及問題自動診斷的具體分析模型,只討論整體實時數(shù)據(jù)處理鏈路的設(shè)計。
輸入輸出定義
為了便于理解系統(tǒng)的運轉(zhuǎn),我們定義該系統(tǒng)整體輸入和輸出。
輸入
服務(wù)請求日志(包含 traceid、時間戳、客戶端 IP、服務(wù)端 IP、耗時、返回碼、服務(wù)名、方法名)。
環(huán)境監(jiān)控數(shù)據(jù)(指標名稱、IP、時間戳、指標值)。比如 CPU、 JVM GC 次數(shù)、JVM GC 耗時、數(shù)據(jù)庫指標。
輸出
一段時間內(nèi)的某個服務(wù)出現(xiàn)錯誤的根因,每個服務(wù)的錯誤分析結(jié)果用一張有向無環(huán)圖表達。(根節(jié)點即是被分析的錯誤節(jié)點,葉子節(jié)點即是錯誤根因節(jié)點。葉子節(jié)點可能是一個外部依賴的服務(wù)錯誤也可能是 JVM 異常等等)。
架構(gòu)設(shè)計
在實際的系統(tǒng)運行過程中,隨著時間的推移,日志數(shù)據(jù)以及監(jiān)控數(shù)據(jù)是源源不斷的在產(chǎn)生的。
每條產(chǎn)生的數(shù)據(jù)都有一個自己的時間戳。而實時傳輸這些帶有時間戳的數(shù)據(jù)就像水在不同的管道中流動一樣。
如果把源源不斷的實時數(shù)據(jù)比作流水,那數(shù)據(jù)處理過程和自來水生產(chǎn)的過程也是類似的:
自然地,我們也將實時數(shù)據(jù)的處理過程分解成采集、傳輸、預處理、計算、存儲、計算與持久化幾個階段。
整體的系統(tǒng)架構(gòu)設(shè)計如下:
采集
采用阿里自研的 SLS 日志服務(wù)產(chǎn)品(包含 Logtail+LogHub 組件),Logtail 是采集客戶端。
之所以選擇 Logtail 是因為其優(yōu)秀的性能、高可靠性以及其靈活插件擴展機制,閑魚可以定制自己的采集插件實現(xiàn)各種各樣數(shù)據(jù)的實時采集。
傳輸
Loghub 可以理解為一個數(shù)據(jù)發(fā)布訂閱組件,和 Kafka 的功能類似,作為一個數(shù)據(jù)傳輸通道其更穩(wěn)定、更安全。
詳細對比文章參考:
- https://yq.aliyun.com/articles/35979?spm=5176.10695662.1996646101.searchclickresult.6f2c7fbe6g3xgP
預處理
實時數(shù)據(jù)預處理部分采用 Blink 流計算處理組件(開源版本叫做 Flink,Blink 是阿里在 Flink 基礎(chǔ)上的內(nèi)部增強版本)。
目前常用的實時流計算開源產(chǎn)品有 Jstorm、Spark Stream、Flink:
- Jstorm 由于沒有中間計算狀態(tài)的,其計算過程中需要的中間結(jié)果必然依賴于外部存儲,這樣會導致頻繁的 IO 影響其性能。
- Spark Stream 本質(zhì)上是用微小的批處理來模擬實時計算,實際上還是有一定延時。
- Flink 由于其出色的狀態(tài)管理機制保證其計算的性能以及實時性,同時提供了完備 SQL 表達,使得流計算更容易。
計算與持久化
數(shù)據(jù)經(jīng)過預處理后最終生成調(diào)用鏈路聚合日志和主機監(jiān)控數(shù)據(jù),其中主機監(jiān)控數(shù)據(jù)會獨立存儲在 TSDB 時序數(shù)據(jù)庫中,供后續(xù)統(tǒng)計分析。
TSDB 由于其針對時間指標數(shù)據(jù)的特別存儲結(jié)構(gòu)設(shè)計,非常適合做時序數(shù)據(jù)的存儲與查詢。
調(diào)用鏈路日志聚合數(shù)據(jù),提供給 Cep/Graph Service 做診斷模型分析。
Cep/Graph Service 是閑魚自研的一個應用,實現(xiàn)模型分析、復雜的數(shù)據(jù)處理以及外部服務(wù)進行交互,同時借助 RDB 實現(xiàn)圖數(shù)據(jù)的實時聚合。
最后 Cep/Graph Service 分析的結(jié)果作為一個圖數(shù)據(jù),實時轉(zhuǎn)儲在 Lindorm 中提供在線查詢。Lindorm 可以看作是增強版的 Hbase,在系統(tǒng)中充當持久化存儲的角色。
詳細設(shè)計與性能優(yōu)化
采集
日志和指標數(shù)據(jù)采集使用 Logtail,整個數(shù)據(jù)采集過程如圖:
其提供了非常靈活的插件機制,共有四種類型的插件:
- Inputs:輸入插件,獲取數(shù)據(jù)。
- Processors:處理插件,對得到的數(shù)據(jù)進行處理。
- Aggregators:聚合插件,對數(shù)據(jù)進行聚合。
- Flushers:輸出插件,將數(shù)據(jù)輸出到指定 Sink。
由于指標數(shù)據(jù)(比如 CPU、內(nèi)存、JVM 指標)的獲取需要調(diào)用本地機器上的服務(wù)接口獲取,因此應盡量減少請求次數(shù),在 Logtail 中,一個 Input 占用一個 Goroutine。
閑魚通過定制 Input 插件和 Processors 插件,將多個指標數(shù)據(jù)(比如 CPU、內(nèi)存、JVM 指標)在一個 Input 插件中通過一次服務(wù)請求獲取(指標獲取接口由基礎(chǔ)監(jiān)控團隊提供)。
并將其格式化成一個 Json 數(shù)組對象,在 Processors 插件中再拆分成多條數(shù)據(jù),以減少系統(tǒng)的 IO 次數(shù)同時提升性能。
傳輸
數(shù)據(jù)傳輸使用 LogHub,Logtail 寫入數(shù)據(jù)后直接由 Blink 消費其中的數(shù)據(jù),只需設(shè)置合理的分區(qū)數(shù)量即可。
分區(qū)數(shù)要大于等于 Blink 讀取任務(wù)的并發(fā)數(shù),避免 Blink 中的任務(wù)空轉(zhuǎn)。
預處理
預處理主要采用 Blink 實現(xiàn),主要的設(shè)計和優(yōu)化點:
①編寫高效的計算流程
Blink 是一個有狀態(tài)的流計算框架,非常適合做實時聚合、Join 等操作。在我們的應用中只需要關(guān)注出現(xiàn)錯誤的的請求上相關(guān)服務(wù)鏈路的調(diào)用情況。
因此整個日志處理流分成兩個流:
- 服務(wù)的請求入口日志作為一個單獨的流來處理,篩選出請求出錯的數(shù)據(jù)。
- 其他中間鏈路的調(diào)用日志作為另一個獨立的流來處理,通過和上面的流 Join On Traceid 實現(xiàn)出錯服務(wù)依賴的請求數(shù)據(jù)篩選。
如上圖所示通過雙流 Join 后,輸出的就是所有發(fā)生請求錯誤相關(guān)鏈路的完整數(shù)據(jù)。
②設(shè)置合理的 State 生命周期
Blink 在做 Join 的時候本質(zhì)上是通過 State 緩存中間數(shù)據(jù)狀態(tài),然后做數(shù)據(jù)的匹配。
而如果 State 的生命周期太長會導致數(shù)據(jù)膨脹影響性能,如果 State 的生命周期太短就會無法正常關(guān)聯(lián)出部分延遲到來的數(shù)據(jù),所以需要合理的配置 State 生存周期,對于該應用允許最大數(shù)據(jù)延遲為 1 分鐘。
- 使用niagara作為statebackend,以及設(shè)定state數(shù)據(jù)生命周期,單位毫秒
- state.backend.type=niagara
- state.backend.niagara.ttl.ms=60000
③開啟 MicroBatch/MiniBatch
MicroBatch 和 MiniBatch 都是微批處理,只是微批的觸發(fā)機制上略有不同。原理上都是緩存一定的數(shù)據(jù)后再觸發(fā)處理,以減少對 State 的訪問從而顯著提升吞吐,以及減少輸出數(shù)據(jù)量。
- 開啟join
- blink.miniBatch.join.enabled=true
- 使用 microbatch 時需要保留以下兩個 minibatch 配置
- blink.miniBatch.allowLatencyMs=5000
- 防止OOM,每個批次最多緩存多少條數(shù)據(jù)
- blink.miniBatch.size=20000
④Dynamic-Rebalance 替代 Rebalance
Blink 任務(wù)在運行時最忌諱的就是存在計算熱點,為保證數(shù)據(jù)均勻使用 Dynamic Rebalance,它可以根據(jù)當前各 Subpartition 中堆積的 Buffer 的數(shù)量,選擇負載較輕的 Subpartition 進行寫入,從而實現(xiàn)動態(tài)的負載均衡。
相比于靜態(tài)的 Rebalance 策略,在下游各任務(wù)計算能力不均衡時,可以使各任務(wù)相對負載更加均衡,從而提高整個作業(yè)的性能。
- 開啟動態(tài)負載
- task.dynamic.rebalance.enabled=true
⑤自定義輸出插件
數(shù)據(jù)關(guān)聯(lián)后需要將統(tǒng)一請求鏈路上的數(shù)據(jù)作為一個數(shù)據(jù)包通知下游圖分析節(jié)點,傳統(tǒng)的方式是通過消息服務(wù)來投遞數(shù)據(jù)。
但是通過消息服務(wù)有兩個缺點:
- 其吞吐量和 RDB 這種內(nèi)存數(shù)據(jù)庫相比還是較大差距(大概差一個數(shù)量級)。
- 在接受端還需要根據(jù) traceid 做數(shù)據(jù)關(guān)聯(lián)。
我們通過自定義插件的方式將數(shù)據(jù)通過異步的方式寫入 RDB,同時設(shè)定數(shù)據(jù)過期時間。
在 RDB 中以
寫入的同時只將 traceid 做為消息內(nèi)容通過 MetaQ 通知下游計算服務(wù),極大的減少了 MetaQ 的數(shù)據(jù)傳輸壓力。
圖聚合計算
Cep/Graph 計算服務(wù)節(jié)點在接收到 MetaQ 的通知后,綜合根據(jù)請求的鏈路數(shù)據(jù)以及依賴的環(huán)境監(jiān)控數(shù)據(jù),會實時生成診斷結(jié)果。
診斷結(jié)果簡化為如下形式:
說明本次請求是由于下游 JVM 的線程池滿導致的,但是一次調(diào)用并不能說明該服務(wù)是不可用的根本原因,需要分析整體的錯誤情況,那就需要對圖數(shù)據(jù)做實時聚合。
聚合設(shè)計如下(為了說明基本思路,做了簡化處理):
- 首先利用 Redis 的 Zrank 能力為根據(jù)服務(wù)名或 IP 信息為每個節(jié)點分配一個全局唯一排序序號。
- 為圖中的每個節(jié)點生成對應圖節(jié)點編碼,編碼格式。
- 對于頭節(jié)點:頭節(jié)點序號|歸整時間戳|節(jié)點編碼。
- 對于普通節(jié)點:|歸整時間戳|節(jié)點編碼。
- 由于每個節(jié)點在一個時間周期內(nèi)都有唯一的 Key,因此可以將節(jié)點編碼作為 Key 利用 Redis 為每個節(jié)點做計數(shù)。同時消除了并發(fā)讀寫的問題。
- 利用 Redis 中的 Set 集合可以很方便的疊加圖的邊。
- 記錄根節(jié)點,即可通過遍歷還原聚合后的圖結(jié)構(gòu)。
聚合后的結(jié)果大致如下:
這樣最終生成了服務(wù)不可用的整體原因,并且通過葉子節(jié)點的計數(shù)可以實現(xiàn)根因的排序。
收益
系統(tǒng)上線后,整個實時處理數(shù)據(jù)鏈路的延遲不超過 3 秒。閑魚服務(wù)端問題的定位時間從十多分鐘甚至更長時間下降到 5 秒內(nèi)。大大的提升了問題定位的效率。
展望
目前的系統(tǒng)可以支持閑魚每秒千萬的數(shù)據(jù)處理能力。后續(xù)自動定位問題的服務(wù)可能會推廣到阿里內(nèi)部更多的業(yè)務(wù)場景,隨之而來的是數(shù)據(jù)量的成倍增加,因此對于效率和成本提出了更好的要求。
未來我們可能做的改進:
- 能夠自動的減少或者壓縮處理的數(shù)據(jù)。
- 復雜的模型分析計算也可以在 Blink 中完成,減少 IO,提升性能。
- 支持多租戶的數(shù)據(jù)隔離。