如何打造一款極速數據湖分析引擎
前言
隨著數字產業(yè)化和產業(yè)數字化成為經濟驅動的重要動力,企業(yè)的數據分析場景越來越豐富,對數據分析架構的要求也越來越高。新的數據分析場景催生了新的需求,主要包括三個方面:
- 用戶希望用更加低廉的成本,更加實時的方式導入并存儲任何數量的關系數據數據(例如,來自業(yè)務線應用程序的運營數據庫和數據)和非關系數據(例如,來自移動應用程序、IoT 設備和社交媒體的運營數據庫和數據)
- 用戶希望自己的數據資產受到嚴密的保護
- 用戶希望數據分析的速度變得更快、更靈活、更實時
數據湖的出現很好的滿足了用戶的前兩個需求,它允許用戶導入任何數量的實時獲得的數據。用戶可以從多個來源收集數據,并以其原始形式存儲到數據湖中。數據湖擁有極高的水平擴展能力,使得用戶能夠存儲任何規(guī)模的數據。同時其底層通常使用廉價的存儲方案,使得用戶存儲數據的成本大大降低。數據湖通過敏感數據識別、分級分類、隱私保護、資源權限控制、數據加密傳輸、加密存儲、數據風險識別以及合規(guī)審計等措施,幫助用戶建立安全預警機制,增強整體安全防護能力,讓數據可用不可得和安全合規(guī)。
為了進一步滿足用戶對于數據湖分析的要求,我們需要一套適用于數據湖的分析引擎,能夠在更短的時間內從更多來源利用更多數據,并使用戶能夠以不同方式協同處理和分析數據,從而做出更好、更快的決策。本篇文章將向讀者詳細揭秘這樣一套數據湖分析引擎的關鍵技術,并通過StarRocks 來幫助用戶進一步理解系統的架構。
之后我們會繼續(xù)發(fā)表兩篇文章,來更詳細地介紹極速數據湖分析引擎的內核和使用案例:
- 代碼走讀篇:通過走讀 StarRocks 這個開源分析型數據庫內核的關鍵數據結構和算法,幫助讀者進一步理解極速數據湖分析引擎的原理和具體實現。
- Case Study 篇:介紹大型企業(yè)如何使用 StarRocks 在數據湖上實時且靈活的洞察數據的價值,從而幫助業(yè)務進行更好的決策,幫助讀者進一步理解理論是如何在實際場景落地的。
什么是數據湖
什么是數據湖,根據 Wikipedia 的定義,“A data lake is a system or repository of data stored in its natural/raw format, usually object blobs or files”。通俗來說可以將數據湖理解為在廉價的對象存儲或分布式文件系統之上包了一層,使這些存儲系統中離散的 object 或者 file 結合在一起對外展現出一個統一的語義,例如關系型數據庫常見的“表”語義等。
了解完數據湖的定義之后,我們自然而然地想知道數據湖能為我們提供什么獨特的能力,我們?yōu)槭裁匆褂脭祿?
在數據湖這個概念出來之前,已經有很多企業(yè)或組織大量使用 HDFS 或者 S3 來存放業(yè)務日常運作中產生的各式各樣的數據(例如一個制作 APP 的公司可能會希望將用戶所產生的點擊事件事無巨細的記錄)。因為這些數據的價值不一定能夠在短時間內被發(fā)現,所以找一個廉價的存儲系統將它們暫存,期待在將來的一天這些數據能派上用場的時候再從中將有價值的信息提取出來。然而 HDFS 和 S3 對外提供的語義畢竟比較單一(HDFS 對外提供文件的語義,S3對外提供對象的語義),隨著時間的推移工程師們可能都無法回答他們到底在這里面存儲了些什么數據。為了防止后續(xù)使用數據的時候必須將數據一一解析才能理解數據的含義,聰明的工程師想到將定義一致的數據組織在一起,然后再用額外的數據來描述這些數據,這些額外的數據被稱之為“元”數據,因為他們是描述數據的數據。這樣后續(xù)通過解析元數據就能夠回答這些數據的具體含義。這就是數據湖最原始的作用。
隨著用戶對于數據質量的要求越來越高,數據湖開始豐富其他能力。例如為用戶提供類似數據庫的 ACID 語義,幫助用戶在持續(xù)寫入數據的過程中能夠拿到 point-in-time 的視圖,防止讀取數據過程中出現各種錯誤。或者是提供用戶更高性能的數據導入能力等,發(fā)展到現在,數據湖已經從單純的元數據管理變成現在擁有更加豐富,更加類似數據庫的語義了。
用一句不太準確的話描述數據湖,就是一個存儲成本更廉價的“AP 數據庫”。但是數據湖僅僅提供數據存儲和組織的能力,一個完整的數據庫不僅要有數據存儲的能力,還需要有數據分析能力。因此怎么為數據湖打造一款高效的分析引擎,為用戶提供洞察數據的能力,將是本文所要重點闡述的部分。下面通過如下幾個章節(jié)一起逐步拆解一款現代的 OLAP 分析引擎的內部構造和實現:
- 怎么在數據湖上進行極速分析
- 現代數據湖分析引擎的架構
怎么在數據湖上進行極速分析?
從這一節(jié)開始,讓我們開始回到數據庫課程,一個用于數據湖的分析引擎和一個用于數據庫的分析引擎在架構上別無二致,通常我們認為都會分為下面幾個部分:
- Parser:將用戶輸入的查詢語句解析成一棵抽象語法樹
- Analyzer:分析查詢語句的語法和語義是否正確,符合定義
- Optimizer:為查詢生成性能更高、代價更低的物理查詢計劃
- Execution Engine:執(zhí)行物理查詢計劃,收集并返回查詢結果
對于一個數據湖分析引擎而言,Optimizer 和 Execution Engine 是影響其性能兩個核心模塊,下面我們將從三個維度入手,逐一拆解這兩個模塊的核心技術原理,并通過不同技術方案的對比,幫助讀者理解一個現代的數據湖分析引擎的始末。
RBO vs CBO
基本上來講,優(yōu)化器的工作就是對給定的一個查詢,生成查詢代價最低(或者相對較低)的執(zhí)行計劃。不同的執(zhí)行計劃性能會有成千上萬倍的差距,查詢越復雜,數據量越大,查詢優(yōu)化越重要。
Rule Based Optimization (RBO) 是傳統分析引擎常用的優(yōu)化策略。RBO 的本質是核心是基于關系代數的等價變換,通過一套預先制定好的規(guī)則來變換查詢,從而獲得代價更低的執(zhí)行計劃。常見的 RBO 規(guī)則謂詞下推、Limit 下推、常量折疊等。在 RBO 中,有著一套嚴格的使用規(guī)則,只要你按照規(guī)則去寫查詢語句,無論數據表中的內容怎樣,生成的執(zhí)行計劃都是固定的。但是在實際的業(yè)務環(huán)境中,數據的量級會嚴重影響查詢的性能,而 RBO 是沒法通過這些信息來獲取更優(yōu)的執(zhí)行計劃。
為了解決 RBO 的局限性,Cost Based Optimization (CBO) 的優(yōu)化策略應運而生。CBO 通過收集數據的統計信息來估算執(zhí)行計劃的代價,這些統計信息包括數據集的大小,列的數量和列的基數等信息。舉個例子,假設我們現在有三張表 A,B 和 C,在進行 A join B join C 的查詢時如果沒有對應的統計信息我們是無法判斷不同 join 的執(zhí)行順序代價上的差異。如果我們收集到這三張表的統計信息,發(fā)現 A 表和 B 表的數據量都是 1M 行,但是 C 表的 數據量僅為 10 行,那么通過先執(zhí)行 B join C 可以大大減少中間結果的數據量,這在沒有統計信息的情況下基本不可能判斷。
隨著查詢復雜度的增加,執(zhí)行計劃的狀態(tài)空間會變的非常巨大。刷過算法題的小伙伴都知道,一旦狀態(tài)空間非常大,通過暴力搜索的方式是不可能 AC 的,這時候一個好的搜索算法格外重要。通常 CBO 使用動態(tài)規(guī)劃算法來得到最優(yōu)解,并且減少重復計算子空間的代價。當狀態(tài)空間達到一定程度之后,我們只能選擇貪心算法或者其他一些啟發(fā)式算法來得到局部最優(yōu)。本質上搜索算法是一種在搜索時間和結果質量做 trade-off 的方法。
(常見 CBO 實現架構)
Record Oriented vs Block Oriented
執(zhí)行計劃可以認為是一串 operator(關系代數的運算符)首尾相連串起來的執(zhí)行流,前一個 operator 的 output 是下一個 operator 的 input。傳統的分析引擎是 Row Oriented 的,也就是說 operator 的 output 和 input 是一行一行的數據。
舉一個簡單的例子,假設我們有下面一個表和查詢:
CREATE TABLE t (n int, m int, o int, p int);
SELECT o FROM t WHERE m < n + 1;
例子來源:GitHub - jordanlewis/exectoy
上述查詢語句展開為執(zhí)行計劃的時候大致如下圖所示:
通常情況下,在 Row Oriented 的模型中,執(zhí)行計劃的執(zhí)行過程可以用如下偽碼表示:
next:
for:
row = source.next()
if filterExpr.Eval(row):
// return a new row containing just column o
returnedRow row
for col in selectedCols:
returnedRow.append(row[col])
return returnedRow
根據 DBMSs On A Modern Processor: Where Does Time Go? 的評估,這種執(zhí)行方式存在大量的 L2 data stalls 和 L1 I-cache stalls、分支預測的效率低等問題。
隨著磁盤等硬件技術的蓬勃發(fā)展,各種通過 CPU 換 IO 的壓縮算法、Encoding 算法和存儲技術的廣泛使用,CPU 的性能逐漸成為成為分析引擎的瓶頸。為了解決 Row Oriented 執(zhí)行所存在的問題,學術界開始思考解決方案,Block oriented processing of Relational Database operations in modern Computer Architectures 這篇論文提出使用按 block 的方式在 operator 之間傳遞數據,能夠平攤條件檢查和分支預測的工作的耗時,MonetDB/X100: Hyper-Pipelining Query Execution 在此基礎上更進一步,提出將通過將數據從原來的 Row Oriented,改變成 Column Oriented,進一步提升 CPU Cache 的效率,也更有利于編譯器進行優(yōu)化。在 Column Oriented 的模型中,執(zhí)行計劃的執(zhí)行過程可以用如下偽碼表示:
// first create an n + 1 result, for all values in the n column
projPlusIntIntConst.Next():
batch = source.Next()
for i < batch.n:
outCol[i] = intCol[i] + constArg
return batch
// then, compare the new column to the m column, putting the result into
// a selection vector: a list of the selected indexes in the column batch
selectLTIntInt.Next():
batch = source.Next()
for i < batch.n:
if int1Col < int2Col:
selectionVector.append(i)
return batch with selectionVector
// finally, we materialize the batch, returning actual rows to the user,
// containing just the columns requested:
materialize.Next():
batch = source.Next()
for s < batch.n:
i = selectionVector[i]
returnedRow row
for col in selectedCols:
returnedRow.append(cols[col][i])
yield returnedRow
可以看到,Column Oriented 擁有更好的數據局部性和指令局部性,有利于提高 CPU Cache 的命中率,并且編譯器更容易執(zhí)行 SIMD 優(yōu)化等。
Pull Based vs Push Based
數據庫系統中,通常是將輸入的 SQL 語句轉化為一系列的算子,然后生成物理執(zhí)行計劃用于實際的計算并返回結果。在生成的物理執(zhí)行計劃中,通常會對算子進行 pipeline。常見的 pipeline 方式通常有兩種:
- 基于數據驅動的 Push Based 模式,上游算子推送數據到下游算子
- 基于需求的 Pull Based 模式,下游算子主動從上游算子拉取數據。經典的火山模型就是 Pull Based 模式。
Push Based 的執(zhí)行模式提高了緩存效率,能夠更好地提升查詢性能。
參考:Push vs. Pull-Based Loop Fusion in Query Engines
現代數據湖分析引擎的架構
通過上一節(jié)的介紹,相信讀者已經對數據湖分析引擎的前沿理論有了相應了解。在本節(jié)中,我們以 StarRocks 為例,進一步介紹數據湖分析引擎是怎么有機的結合上述先進理論,并且通過優(yōu)雅的系統架構將其呈現給用戶。
如上圖所示,StarRocks 的架構非常簡潔,整個系統的核心只有 Frontend (FE)、Backend (BE) 兩類進程,不依賴任何外部組件,方便部署與維護。其中 FE 主要負責解析查詢語句(SQL),優(yōu)化查詢以及查詢的調度,而 BE 則主要負責從數據湖中讀取數據,并完成一系列的 Filter 和 Aggregate 等操作。
Frontend
FE 的主要作用將 SQL 語句通過一系列轉化和優(yōu)化,最終轉換成 BE 能夠認識的一個個 Fragment。一個不那么準確但易于理解的比喻,如果把 BE 集群當成一個分布式的線程池的話,那么 Fragment 就是線程池中的 Task。從 SQL 文本到 Fragment,FE 的主要工作包含以下幾個步驟:
- SQL Parse:將 SQL 文本轉換成一個 AST(抽象語法樹)
- Analyze:基于 AST 進行語法和語義分析
- Logical Plan:將 AST 轉換成邏輯計劃
- Optimize:基于關系代數,統計信息,Cost 模型對邏輯計劃進行重寫,轉換,選擇出 Cost “最低” 的物理執(zhí)行計劃
- 生成 Fragment:將 Optimizer 選擇的物理執(zhí)行計劃轉換為 BE 可以直接執(zhí)行的 Fragment
- Coordinate:將 Fragment 調度到合適的 BE 上執(zhí)行
Backend
BE 是 StarRocks 的后端節(jié)點,負責接收 FE 傳下來的 Fragment 執(zhí)行并返回結果給 FE。StarRocks 的 BE 節(jié)點都是完全對等的,FE 按照一定策略將數據分配到對應的 BE 節(jié)點。常見的 Fragment 工作流程是讀取數據湖中的部分文件,并調用對應的 Reader (例如,適配 Parquet 文件的 Parquet Reader 和適配 ORC 文件的 ORC Reader等)解析這些文件中的數據,使用向量化執(zhí)行引擎進一步過濾和聚合解析后的數據后,返回給其他 BE 或 FE。
總結
本篇文章主要介紹了極速數據湖分析引擎的核心技術原理,從多個維度對比了不同技術實現方案。