技術(shù)干貨|阿里云基于Hudi構(gòu)建Lakehouse實踐探索
一、數(shù)據(jù)湖與Lakehouse
2021年開發(fā)者大會上,我們的一位研究員分享的一個議題,提到了很多數(shù)據(jù),主要想闡述的是行業(yè)發(fā)展到現(xiàn)在這個階段,數(shù)據(jù)的膨脹非常厲害,數(shù)據(jù)增速非??膳隆o論是數(shù)據(jù)規(guī)模還是生產(chǎn)處理的實時化,到生產(chǎn)處理的智能化,以及數(shù)據(jù)加速上云的云化過程。
這些數(shù)據(jù)來自Gartner、IDC的分析,都是行業(yè)最權(quán)威的分析報告里沉淀總結(jié)出來的。這就意味著我們在數(shù)據(jù)領(lǐng)域尤其是分析領(lǐng)域的機遇和挑戰(zhàn)都很大。
在海量的數(shù)據(jù)上,我們要真正做好數(shù)據(jù)價值的挖掘和使用會面臨很多挑戰(zhàn),第一是現(xiàn)有的架構(gòu)慢慢都要往云上架構(gòu)遷移;第二個是數(shù)據(jù)量;第三個是Serverless按量付費,慢慢從嘗試性的選擇成為默認選擇;第四是還有多樣化的應(yīng)用、異構(gòu)數(shù)據(jù)源。相信大家接觸過云都知道,無論是哪個云廠商都有很多種云服務(wù)可供,尤其是數(shù)據(jù)類服務(wù)數(shù)量繁多。這時候,大量數(shù)據(jù)源一定會帶來一個問題:分析難度大,尤其是想做關(guān)聯(lián)分析的時候,異構(gòu)數(shù)據(jù)源怎么連接起來是很大的問題。其次是差異化的數(shù)據(jù)格式,通常我們做數(shù)據(jù)寫入會時選擇方便、簡單的格式,比如CSV、Json格式,但對分析來講,這些格式往往是非常低效的,尤其是數(shù)據(jù)到了TB、PB級的時候,根本沒法分析。這時候Parquet、ORC等面向分析的列存格式就衍生出來了。當(dāng)然還包括鏈路安全以及差異化群體等等,數(shù)據(jù)量膨脹的過程中又增加了很多的分析難度。
在真實的客戶場景里,很多數(shù)據(jù)已經(jīng)上云了和“入湖”了。湖是什么?我們對湖的定義和理解更像AWS的S3或者阿里云OSS這種對象存儲,是簡單易用的API形式,可以存各種各樣差異化的數(shù)據(jù)格式,有無限的容量、按量付費等非常多的好處。之前想基于湖做分析非常麻煩,很多時候要做T+1的建倉和各種云服務(wù)投遞。有時候數(shù)據(jù)格式不對就要人肉做ETL,如果數(shù)據(jù)已經(jīng)在湖里要做元信息發(fā)現(xiàn)分析等等,整個運維鏈路很復(fù)雜,問題也很多。這里都是線上客戶實際面臨的離線數(shù)據(jù)湖問題,有些優(yōu)先級偏高,有些低些,總而言之問題非常多。
其實Databricks大概19年就開始將研究重點從Spark方向,慢慢往Lakehouse方向調(diào)整了。他們發(fā)表了兩篇論文,這兩篇論文為數(shù)據(jù)湖怎么被統(tǒng)一訪問、怎么被更好地訪問提供了理論層面的定義。
基于Lakehouse的新概念,想做到的是屏蔽格式上的各種差異,為不同的應(yīng)用提供統(tǒng)一的接口以及更加簡化的數(shù)據(jù)訪問、數(shù)據(jù)分析能力。架構(gòu)說實現(xiàn)數(shù)據(jù)倉、數(shù)據(jù)湖、Lakehouse一步步演進。
他的兩篇論文闡述了很多新概念:第一,怎么設(shè)計和實現(xiàn)MVCC,能讓離線數(shù)倉也有像數(shù)據(jù)庫一樣的MVCC能力,從而滿足大部分對批事務(wù)的需求;第二,提供不同的存儲模式,能夠適應(yīng)不同的讀和寫Workload;第三,提供一些近實時的寫入和合并能力,為數(shù)據(jù)量提供鏈路能力??傊?,他的思路能夠較好解決離線數(shù)據(jù)分析的難題。
目前業(yè)界有三款產(chǎn)品相對比較流行,第一個是Delta Lake,它是Databricks自己發(fā)布的數(shù)據(jù)湖管理協(xié)議;第二個是Iceberg,Iceberg也是Apache的一個開源項目;第三個是Hudi,Hudi最早由Uber內(nèi)部研發(fā),后來開源的項目(早期用得比較多的是Hive的ACID)。目前這三個產(chǎn)品因為可以對接HDFS的API,可以適配底層的湖存儲,而OSS又可以適配到HDFS存儲接口。由于核心原理相似,三個產(chǎn)品各方面的能力都在逐漸靠近,同時有了論文做理論支撐,我們才會有方向去實踐。
對我們來說,當(dāng)時選擇Hudi也是因為其產(chǎn)品成熟度方面的原因,還有它面向數(shù)據(jù)庫方面的數(shù)據(jù)入湖能力,形態(tài)上比較滿足我們在數(shù)據(jù)庫團隊做CDC方面的業(yè)務(wù)需求。
Hudi早期的定義是Hadoop Updates anD Incrementals的縮寫,后面是面向Hadoop的Update、Delete、Insert的概念,核心邏輯是事務(wù)版本化、狀態(tài)機控制和異步化執(zhí)行,模擬整個MVCC的邏輯,提供對于內(nèi)部列存文件比如Parquet、ORC等對象列表的增量式管理,實現(xiàn)高效的存儲讀寫。它和Databricks定義的Lakehouse概念很相似,不謀而合,Iceberg也是一樣,它的能力也在逐步往這個方向提升。
Hudi官方網(wǎng)站對外提供的架構(gòu)是這樣的形態(tài)。之前我們做技術(shù)選型、調(diào)研的時候發(fā)現(xiàn)很多同行也已經(jīng)充分使用Hudi做數(shù)據(jù)入湖和離線數(shù)據(jù)管理的方案選型。第一,因為產(chǎn)品比較成熟;第二,它符合我們CDC的需求;第三,Delta Lake有一套開源版本,一套內(nèi)部優(yōu)化版本,對外只提供開源版本,我們認為它不一定把最好的東西呈現(xiàn)。Iceberg起步比較晚,早期相比其他兩個產(chǎn)品能力不太完全,所以沒有考慮它。因為我們都是Java團隊,也有自己的Spark產(chǎn)品,Hudi正好比較契合我們用自己的runtime支持數(shù)據(jù)入湖的能力,因此也就選擇了Hudi。
當(dāng)然我們也一直在關(guān)注這三個產(chǎn)品的發(fā)展,后來國內(nèi)的一個開源項目StarLake,也是做類似的事情。每種產(chǎn)品都在進步,長期來看能力基本對齊,我覺得會和論文里定義的能力慢慢吻合。
“以開源Hudi為列式、多版本格式為基礎(chǔ),將異構(gòu)數(shù)據(jù)源增量、低延遲入湖,存儲在開放、低成本的對象存儲上,并且在這個過程中要實現(xiàn)數(shù)據(jù)布局優(yōu)化、元信息進化的能力,最終實現(xiàn)離線數(shù)據(jù)統(tǒng)一管理,無差別支持上面的計算和分析能力,這是整體的方案。”這是我們對Lakehouse的理解,以及我們的技術(shù)探索方向。
二、阿里云Lakehouse實踐
下面介紹一下阿里云Lakehouse的技術(shù)探索和具體的實踐。首先,大概介紹一下阿里云數(shù)據(jù)庫團隊近年來一直提的概念“數(shù)據(jù)庫、倉、湖一體化”戰(zhàn)略。
大家都知道數(shù)據(jù)庫產(chǎn)品分為四個層次:一是DB;二是NewSQL/NoSQL產(chǎn)品;三是數(shù)倉產(chǎn)品;四是湖數(shù)據(jù)產(chǎn)品。越往上數(shù)據(jù)的價值密度越大,會以元表元倉形式的數(shù)據(jù)關(guān)聯(lián)到分析中去,比如DB數(shù)據(jù)格式非常簡單、清晰; 越往下數(shù)據(jù)量越來越龐大,數(shù)據(jù)形式越來越復(fù)雜,有各種各樣的存儲格式,數(shù)據(jù)湖形式有結(jié)構(gòu)化、半結(jié)構(gòu)化、非結(jié)構(gòu)化,要分析就必須要做一定的提煉、挖掘,才能真正把數(shù)據(jù)價值用起來。
四個存儲方向有各自的領(lǐng)域,同時又有關(guān)聯(lián)分析訴求,主要就是要打破數(shù)據(jù)孤島,讓數(shù)據(jù)一體化,才能讓價值更立體化。如果只是做一些日志分析,例如關(guān)聯(lián)的地域、客戶來源的話,也只是使用了GroupBy或者是Count等相對簡單的分析能力。對于底層數(shù)據(jù),可能要做多次清洗、回流,才能往向在線化、高并發(fā)的場景一層層分析。這里不僅僅直接將數(shù)據(jù)從湖到庫寫入,也可以到倉,到NoSQL/NewSQL的產(chǎn)品里,到KV系統(tǒng)里去,利用好在線化的查詢能力,等等。
反過來也是一樣,這些數(shù)據(jù)庫/NewSQL產(chǎn)品甚至數(shù)倉中的數(shù)據(jù)也會向下流動,構(gòu)建低成本、大容量的存儲備份、歸檔,降低上面的存儲壓力、分析吞吐壓力,且可以形成強大的聯(lián)合分析能力。這也是我自己對數(shù)據(jù)庫、倉、湖一體化的理解。
剛才講了數(shù)據(jù)庫的發(fā)展方向和定位,再看看數(shù)據(jù)庫下面OLAP本身的分層數(shù)倉體系中Lakehouse是怎樣的定位。做過數(shù)倉產(chǎn)品的同學(xué)都比我熟悉,(PPT圖示)基本上是這樣的分層體系,最開始各種各樣的形態(tài)非數(shù)倉或者非數(shù)據(jù)湖系統(tǒng)外面有各種各樣的形式存儲數(shù)據(jù),我們理解通過Lakehouse的能力,做入湖、建倉,通過清洗、沉淀和聚合,形成ODS或者是CDM層,這里做了初步的數(shù)據(jù)聚合和匯總能力,形成數(shù)據(jù)集市的概念。
這些數(shù)據(jù)在阿里云上我們會基于Hudi的協(xié)議,基于Parquet文件格式存到整個OSS上面,內(nèi)部通過ETL把初始數(shù)據(jù)集進一步聚合為更清晰、更面向業(yè)務(wù)的數(shù)據(jù)集上,然后再構(gòu)建ETL,往實時數(shù)倉里導(dǎo)入,等等。或者這些數(shù)據(jù)集直接面向低頻的交互式分析、BI分析,或面向Spark等引擎做機器學(xué)習(xí),最終輸出到整個數(shù)據(jù)應(yīng)用上,這是整體的分層體系。
整個過程中,我們會接入統(tǒng)一的元信息體系。因為如果系統(tǒng)的每個部分都有自己的術(shù)語,都要保留一份自己的元信息,對OLAP體系來講是分裂的,因此元信息一定要統(tǒng)一,調(diào)度也是一樣。不同數(shù)據(jù)倉層次的表在不同的地方要串聯(lián)起來,一定要有完整、統(tǒng)一的調(diào)度能力。以上是我理解Lakehouse在OLAP體系中的的定位,主要是貼源層,匯聚離線數(shù)據(jù)的能力。
前面介紹了Lakehouse在數(shù)據(jù)庫和OLAP團隊里的定位,后面重點介紹一下Lakehouse在我們的領(lǐng)域設(shè)計是怎樣的形態(tài)。因為之前我自己用過K8s做分析系統(tǒng)上云,所以對K8s的很多理念還是比較清楚。
在我們自己設(shè)計的時候也試圖參考、學(xué)習(xí)一下K8s的體系。K8s有我們經(jīng)常提到的DevOps概念,這是一種實踐范式。在這個范式下會創(chuàng)建很多實例,在實例里會管理很多應(yīng)用,這些應(yīng)用最終通過Pod方式被原子性調(diào)度執(zhí)行,Pod里再跑一些業(yè)務(wù)邏輯,各種各樣的Container。
我們認為Lakehouse也是一種范式,一種處理離線數(shù)據(jù)的范式。在這里,數(shù)據(jù)集是我們的核心概念,比如要構(gòu)建一套面向某種場景、某個方向的數(shù)據(jù)集。我們能要定義A、B、C不同數(shù)據(jù)集,在我們看來這是一個實例。圍繞這個數(shù)據(jù)集編排各種各樣的Workload工作負載,比如做DB入湖。還有分析優(yōu)化類的Workload,比如索引構(gòu)建,比如像z-ordering、Clustering、Compaction等技術(shù),查詢優(yōu)化能力提升得更好。還有就是Management類型的Workload,比如定期把歷史數(shù)據(jù)清理了,做冷熱存儲分層,因為OSS提供了很多這樣的能力,把這些能力用好。最下面一層是各種Job,我們內(nèi)部是基于Spark建設(shè)離線計算能力,我們把Workload前后編排成小的job,原子的job全部彈性到Spark上執(zhí)行,以上是我們對于Lakehouse在技術(shù)實踐中的領(lǐng)域設(shè)計。
這是整體的技術(shù)架構(gòu)。首先,在云上有各種各樣的數(shù)據(jù)源,通過編排定義各種各樣的Workload,跑在我們自己的Spark彈性計算上。核心的存儲是基于Hudi+OSS,我們也支持別的HDFS系統(tǒng),比如阿里云的LindormDFS,內(nèi)部元信息系統(tǒng)管理庫、表、列等元信息。后面基于K8s調(diào)度所有的管控服務(wù)。上層通過原生的Hudi接口,對接計算、分析能力。這是整個彈性架構(gòu)。
其實Serverless Spark是我們的計算基礎(chǔ),提供作業(yè)級彈性,因為Spark本身也支持Spark Streaming,通過短時間彈出一個Spark作業(yè)實現(xiàn)流計算。選擇OSS和LindormDFS做存儲基礎(chǔ),主要利用低成本、無限容量的好處。
在這個架構(gòu)上,怎么連通用戶的數(shù)據(jù)實現(xiàn)數(shù)據(jù)入湖到存儲、到分析的能力呢?以上是我們基于VPC構(gòu)建的安全方案。首先我們是共享集群模式,用戶側(cè)可以通過SDK和VPDN網(wǎng)絡(luò)連接過來,再由阿里云內(nèi)部網(wǎng)關(guān)打通計算集群,實現(xiàn)管理和調(diào)度;再通過阿里云彈性網(wǎng)卡技術(shù),聯(lián)通用戶的VPC實現(xiàn)數(shù)據(jù)通路,同時實現(xiàn)路由能力和網(wǎng)絡(luò)隔離能力,不同用戶還可能有子網(wǎng)網(wǎng)段沖突問題,通過彈性網(wǎng)卡技術(shù)可以實現(xiàn)相同網(wǎng)段同時連接同一個計算集群的能力。
用過阿里云OSS的同學(xué)都知道,OSS本身是阿里云VPC網(wǎng)絡(luò)里的公網(wǎng),它是共享區(qū),不需要復(fù)雜的網(wǎng)絡(luò)。而RDS和Kafka是部署在用戶的VPC里,通過一套網(wǎng)絡(luò)架構(gòu)就可以實現(xiàn)多種網(wǎng)絡(luò)打通。對比VPC網(wǎng)段沖突,共享區(qū)域沒有這樣的問題。其次,數(shù)據(jù)之間隔離,ENI有端到端的限制,比如VPC有ID標(biāo)志、有不同的授權(quán)要求,非法用戶嘗試連接VPC,如果不是這個網(wǎng)卡則網(wǎng)絡(luò)包無法聯(lián)通,就可以實現(xiàn)安全的隔離和數(shù)據(jù)的通路。
網(wǎng)絡(luò)架構(gòu)已經(jīng)確定了,怎么運行執(zhí)行呢?在整個設(shè)計里,我們會以K8s的DSL設(shè)計為榜樣,前面提到會定義很多入湖任務(wù),一個Workload可能有很多小任務(wù),這時候需要類似定義DSL的編排能力,job1、job2、再到j(luò)ob3,定義一套編排腳本;這些編排腳本,通過SDK、控制臺等入口提交過來,再通過API Server接收并由Scheduler調(diào)度起來。這個Scheduler會和Spark的網(wǎng)關(guān)之間打通,實現(xiàn)任務(wù)管理、狀態(tài)管理、任務(wù)分發(fā)等,最終調(diào)度內(nèi)部的K8s拉起作業(yè)來執(zhí)行。有些全量作業(yè)跑一次,比如DB拉一次就行了,還有常駐的流式作業(yè)、有觸發(fā)式的異步作業(yè)、定時異步作業(yè)等等,不同的形態(tài)相同的調(diào)度能力,從而可以擴展。過程中有作業(yè)狀態(tài)持續(xù)反饋狀態(tài)、間隙性統(tǒng)計等等。在K8s里,K8s Master承擔(dān)了這樣的角色,同樣有API Server和Scheduler的角色。在我們這里也是類似,也是通過一主多從架構(gòu)實現(xiàn)調(diào)度能力HA機制等等。
在這里,為什么我們要把一個Workload面向用戶側(cè)的任務(wù)拆成N個不同的job?因為這些任務(wù)完全放在一個進程里跑,整個Workload的水位變化非常大,做彈性調(diào)度非常難。全量任務(wù)跑一次就可以了,但是配多少資源合適呢?很多時候Spark沒有那么靈活,尤其是異步任務(wù)和定時任務(wù)拉起來消耗很大,但是用完之后又不知道下一次什么時候來,很難預(yù)測。就像很多信號系統(tǒng)處理里,需要做傅里葉變換一樣,把復(fù)雜的波型拆成多個簡單的波型,信號處理就簡單起來。我們也是有這樣感性的理解。用不同的Job來執(zhí)行Workload中不同角色的任務(wù),就很容易實現(xiàn)彈性能力。像定時或臨時性的觸發(fā)Job,臨時拉一個job,資源消耗與常駐的流式任務(wù)完全無關(guān),就可以完全不影響流式任務(wù)的穩(wěn)定性、入湖延遲等等。這是設(shè)計背后的思考,就是讓復(fù)雜的問題簡單化。因為基于彈性的角度來講,拆得波形越簡單,彈性就會更好做,預(yù)測也會簡單一點。
入湖里會涉及很多用戶的賬密信息,因為不是所有云產(chǎn)品都以AWS的IAM或阿里云的RAM等系統(tǒng)來構(gòu)建完全云化的資源權(quán)限控制。很多產(chǎn)品還是以賬密方式做認證和授權(quán)管理,包括用戶自建的系統(tǒng),數(shù)據(jù)庫系統(tǒng)等等。這樣,用戶要把所有的連接賬密都交給我們,怎么更安全的管理它們?我們是基于阿里云的兩套體系:一套是KMS,以硬件級數(shù)據(jù)加密體系來加密用戶數(shù)據(jù);第二套是STS,完全云化的三方鑒權(quán)能力,實現(xiàn)用戶數(shù)據(jù)的安全訪問,尤其是敏感數(shù)據(jù)的隔離或者保護的機制,這就是我們現(xiàn)在的整個體系。
還有一個問題,不同用戶之間通過各種機制完全隔離開了,但是同一個用戶有很多的任務(wù)。在Lakehouse概念中有四層結(jié)構(gòu),一個數(shù)據(jù)集下面有多個庫,庫下面有多個表,表下面有不同的分區(qū),分區(qū)下面是不同的數(shù)據(jù)文件。用戶有子賬號體系、有各種不同的作業(yè),因此操作數(shù)據(jù)時可能會出現(xiàn)相互影響。
比如不同的入湖任務(wù)都想要寫同一張表,線上A任務(wù)已經(jīng)正常運行了,結(jié)果另外的用戶配置了B任務(wù),也要寫入同一個空間,這就有可能把已經(jīng)上線的A任務(wù)數(shù)據(jù)全部沖掉,這是很危險的事情。還有其他用戶刪除作業(yè)的行為,可能會刪掉線上正在運行任務(wù)的數(shù)據(jù),有可能其他任務(wù)還在訪問,但又不能感知它;還比如通過別的云服務(wù)、或是VPC內(nèi)別的程序、自己部署的服務(wù)等等,都可能操作這個表,導(dǎo)致數(shù)據(jù)出問題。因此我們設(shè)計了一整套機制,一方面是在表級別實現(xiàn)鎖的機制,如果有任務(wù)最早就占有一張數(shù)據(jù)寫入權(quán)限時,后面的任務(wù)在這個任務(wù)生命周期結(jié)束之前,都不允許再寫入,不可以寫臟了。
另一方面基于OSS的Bucket Policy能力,構(gòu)建不同程序的權(quán)限校驗?zāi)芰?。只允許Lakehouse的的任務(wù)有權(quán)限寫數(shù)據(jù),而其他程序不允許寫,但其他程序可以讀。同一個賬號的這些數(shù)據(jù)本來就是為了共享、為了分析,為了各種應(yīng)用場景的接入,就是可以讀,但絕對不可以污染它。我們在這些方面做了可靠性工作。
我們更多講的架構(gòu)體系,回到整體看一下怎么理解數(shù)據(jù)模型,我們認為整個過程是以行為中心(因為數(shù)倉還是一行行的數(shù)據(jù),存儲在表的范圍內(nèi)),以行數(shù)據(jù)構(gòu)建統(tǒng)一入湖、存儲、分析,元信息模型等。首先有各種各樣的數(shù)據(jù)源(有文本或二進制,binlog就是二進制的數(shù)據(jù);或者類似Kafka中可以存儲各種二進制),這些數(shù)據(jù)最終通過各種各樣Connector、Reader(不同的系統(tǒng)有不同的叫法),把數(shù)據(jù)讀過來,映射成行數(shù)據(jù)。在這些行數(shù)據(jù)中,有關(guān)鍵的描述信息,比如來源信息、變更類型等等,還有可變的列集合。再通過一系列的規(guī)則轉(zhuǎn)化,比如濾掉某些數(shù)據(jù),要為數(shù)據(jù)生成主鍵,要段定義版本、類型轉(zhuǎn)換等等;最后再通過Hudi Payload封裝、轉(zhuǎn)換、元信息信息維護、文件生成等等方式,最終寫到湖存儲里。
在存儲里通過元信息、分區(qū)等數(shù)據(jù)維護,并對接后續(xù)計算和分析,就無縫看到湖、倉里所有存的數(shù)據(jù)的元信息,無縫對接不同形態(tài)的應(yīng)用場景。
下面介紹一下我們對常見數(shù)據(jù)源接入形式的支持。DB入湖是最常見的場景,在阿里云上,有RDS和PolarDB等產(chǎn)品。以MySQL引擎舉例,一般都是有主庫、從庫、離線庫等架構(gòu),可能還有主從接入點,但是萬變不離其宗。DB入湖要先做一次全量同步,再做增量同步。對用戶來講,DB入湖是明確的Workload,但對系統(tǒng)來講要先做好全量同步這件事情,再自動對接增量同步這件事情,數(shù)據(jù)還要通過一定的機制把位點銜接住,確保數(shù)據(jù)的正確性。整個調(diào)度過程通過統(tǒng)一的管控服務(wù)獲取DB信息,自動選擇從庫或線上壓力最小的實例,進行全量同步寫到庫里,并維護好相應(yīng)的Watermark,記錄全量從什么時間點開始的、從庫和主庫之間有多少延遲等。全量做完之后,開始做增量任務(wù),利用DTS等同步binlog服務(wù),基于前面的Watermark做數(shù)據(jù)回溯,開始做增量。利用Hudi里的Upsert能力,以用戶定義的PK和版本按照一定邏輯把數(shù)據(jù)合并,確保數(shù)據(jù)最終一致,分析側(cè)的正確性。
在整個Watremark維護上需要考慮很多,如果全量掛了,再重試一下,位點應(yīng)該從哪里開始,如果增量掛了,不僅要考慮增量之前已經(jīng)進行到哪里,還要漸進式的維護增量位點,不能每次增量一掛就回退到最開始全量前的位點,那后面數(shù)據(jù)延遲太嚴重了。在Lakehouse表級別維護這些信息,在Workload運行時、重啟、重試等過程可以自動銜接,對用戶透明。
第二個是像類消息產(chǎn)品的入湖,我們也做了一些技術(shù)探索和業(yè)務(wù)嘗試,它的數(shù)據(jù)不像DB一樣Schema都很明確。像阿里云現(xiàn)有的Kafka服務(wù)里,它的Schema只有兩個字段,Key和Value,Key描述消息Id,value自定義,大部分時候是一個Json,或者是二進制串。首先要解決怎么映射成行,會有很多邏輯處理,比如先做一些Schema推斷,得到原始的結(jié)構(gòu)。Json原來的嵌套格式比較容易存儲,但是分析起來比較費勁,只有打平成一個寬表分析才方便,所以還要做一些嵌套打平、格式展開等等邏輯,再配合前面提到的核心邏輯,最終實現(xiàn)文件寫入、元信息合并等等。這個元信息合并就是指,源頭的列的個數(shù)不確定,對于不同的行有時候有這個列,有時候沒有。而對于Hudi來講,需要在應(yīng)用層把元信息維護好。Lakehouse里的Schema Evolution,就是Schema的合并、列的兼容處理、新增列的自動維護等等。
我們內(nèi)部有基于Lindorm的方案。Lindorm是我們自研兼容HBase、Cassandra等大寬表接口的KV行存。它有很多的歷史文件和很多Log數(shù)據(jù),通過內(nèi)部的LTS服務(wù)調(diào),把全量和增量數(shù)據(jù)通過Lakehouse方式存在轉(zhuǎn)換成列存文件,支持分析。
對Kafka、SLS系統(tǒng)中都有分片(Partition、Shard)概念,流量變化很大時需要自動擴縮容,因此消費側(cè)要主動感知變化,不影響數(shù)據(jù)正確性的持續(xù)消費。并且這種數(shù)據(jù)都是偏Append-Only,正好可以利用好Hudi小文件合并能力,讓下游分析更簡單、更快、更高效。
三、客戶最佳實踐
以上是技術(shù)探索的分享,接下來會介紹一下在客戶的應(yīng)用。之前一個跨境電商的客戶,他們問題就是DB數(shù)據(jù)不容易分析,目前有PolarDB和MongoDB系統(tǒng),希望把所有數(shù)據(jù)近實時入湖到OSS上做分析。現(xiàn)在業(yè)界聯(lián)邦分析FederatedAnalytics,問題在于直連查詢數(shù)據(jù)時原庫的壓力很大,最好的方式就是入湖到離線湖中里做分析。通過Lakehouse方式構(gòu)建離線湖倉,再對接計算和分析,或者對接ETL清晰,規(guī)避對線上數(shù)據(jù)的影響,同一架構(gòu)把整體數(shù)據(jù)平臺構(gòu)建起來,應(yīng)用、分析百花齊放,不影響任何東西。
這個客戶的難點是他們有很多庫、表以及各種各樣的應(yīng)用case,我們在Hudi上做了很多優(yōu)化,也完成了20多個patch貢獻到社區(qū)里完善Hudi,包括元信息打通、部分Schema Evolution能力,在客戶側(cè)也應(yīng)用起來。
另一個客戶數(shù)是Kafka日志近實時分析。原來他們的方案需要人肉做很多步驟,包括入湖、數(shù)據(jù)管理、小文件合并等。通過Lakehouse方案,對接客戶數(shù)據(jù),自動合并入湖,維護元信息,客戶直接應(yīng)用就可以了,內(nèi)部直接打通了。
還有一個問題小文件,在他們的場景里與Hudi社區(qū)一起參與Clustering技術(shù)的建設(shè)。Clustering就是自動將小文件合并成大文件,因為大文件利于分析。其次,在合并過程中,可以按照某些特定列把數(shù)據(jù)排序,后續(xù)訪問這些數(shù)據(jù)列時,性能會好很多。
四、未來展望
最后,我再分享一下我們團隊對未來的思考,Lakehouse可以怎么應(yīng)用起來。
第一,更豐富的入湖數(shù)據(jù)源。Lakehous重要的價值在于屏蔽各種數(shù)據(jù)差異,打破數(shù)據(jù)孤島。在云上很多系統(tǒng)中有各種各樣的數(shù)據(jù),有很大的分析價值,未來要統(tǒng)一更多的數(shù)據(jù)源,只支持一個DB或Kafka,客戶價值不是最大化的。只有把足量的數(shù)據(jù)匯總到一起,形成大的離線湖倉,并且屏蔽復(fù)雜度,對用戶的價值才愈發(fā)明顯。除了云產(chǎn)品,還有其他形式的入湖,像專有云、自建系統(tǒng)、自主上傳場景等。主要還是強化貼源層的能力。
第二,更低成本、更可靠的存儲能力,圍繞數(shù)據(jù)生命周期管理。因為阿里云OSS有非常豐富的計費方式,支持多種存儲(標(biāo)準存儲、低頻存儲、冷存儲以及更冷的存儲)等等,計費邏輯里幾十項,一般人不完全清楚。但對用戶來講,成本永遠是設(shè)計中心心,尤其是構(gòu)建海量的離線湖倉,因為數(shù)據(jù)量越來越大、成本就越來越多。
之前接觸過一個客戶,他需要存儲三十年的數(shù)據(jù),他們的業(yè)務(wù)是股票分析,要把交易所、券商的所有數(shù)據(jù)全部爬下來,傳到大的湖倉里。因為要做三十年的分析,成本優(yōu)化是非常關(guān)鍵的。原來選擇在線系統(tǒng),存幾個月就扛不住了,因為數(shù)據(jù)量太大了。分析數(shù)據(jù)是有從冷到熱、從相對低頻到高頻訪問的特點,Lakehouse利用這些特點,通過定義規(guī)則和邏輯,自動屏蔽用戶對哪些目錄需要冷存儲、哪些目錄需要熱存儲的復(fù)雜維護,幫用戶走得更進一步。
第三,更強的分析能力。在Hudi加速分析的能力里,除了前面提到的Clustering,還有Compaction。Clustering就是小文件合并,比如日志場景,每寫入一批就產(chǎn)生一個文件,這些文件一般都不是很大,但文件越小越碎分析時的訪問代價很大。訪問一個文件就要做鑒權(quán)、建連接、元信息訪問。訪問一個大文件這些過程只做一次,而訪問小文件則成倍放大,開銷非常大。在Append場景,通過Clustering快速合并小文件成大文件,規(guī)避因為寫入而導(dǎo)致的分析性能線性退化問題,確保分析高效。
在Hudi中如果是Merge On Read類型的表,比如Delete、Update都會快速寫到log文件,在后續(xù)讀的時候Merge數(shù)據(jù),形成完整的邏輯的數(shù)據(jù)視圖。這里問題也很明顯,如果有1000個log文件,每次讀需要合并1000次,分析能力退化肯定非常嚴重。這時Hudi的Compaction能力就會定期把log文件合并起來。前面提到,如果完全要在同一個入湖作業(yè)里實現(xiàn),尤其是文件合并,計算開銷很大,在做這些重負載的時候,對入湖鏈路的延遲影響很大,一定要通過異步化調(diào)度的方式,實現(xiàn)寫延遲保障。并且這些過程都是可彈性的,不論是100個文件要合還是1萬個文件要合,都是可以快速彈性而不影響延遲,非常有優(yōu)勢。
第四,更豐富的場景化應(yīng)用。個人覺得Lakehouse還是面向貼源層的能力,配合做一定程度的聚合。因為更高層次的聚合性和實時性,有更多實時數(shù)倉選擇,現(xiàn)在業(yè)界比較火的DorisDB、ClickHouse對實時的高頻分析有很大優(yōu)勢。基于Hudi、Lakehouse、OSS做實時分析沒有太多優(yōu)勢,所以還是以構(gòu)建貼源層的能力為主。
原來都是近實時入湖場景,但是可能有些用戶沒有這么多實時性要求,周期性的T+1邏輯建倉可以滿足,可以利用Hudi+Lakehouse能力,每天查詢一部分邏輯增量數(shù)據(jù)并寫入Hudi,并維護分區(qū),和實現(xiàn)Schema Evolution能力。
早期數(shù)據(jù)量越來越大,客戶通過分庫分表實現(xiàn)邏輯拆分。分析的時候發(fā)現(xiàn)庫、表太多了,分析、關(guān)聯(lián)難度大,這時候可以通過構(gòu)建多庫多表合并建倉能力,匯總到一張表后做分析。
然后是跨區(qū)域融合分析,有很多客戶提這樣的需求,尤其是海外。有些客戶要服務(wù)海外用戶,必須有部分業(yè)務(wù)在海外,特別在跨境電商的場景,而它的采購體系、倉儲體系、物流體系、分銷體系等又都在國內(nèi)建設(shè),很多數(shù)據(jù)想要融合分析怎么辦?首先OSS提供了跨域復(fù)制,但也只是到數(shù)據(jù)層面,沒有任何邏輯,在這里可以通過Lakehouse做邏輯層建設(shè),把不同region數(shù)據(jù)混合在一起,匯總到同一個區(qū)域之后,提供統(tǒng)一的SQL join、union等能力。
最后Hudi有TimeTravel、Incremental query的能力,這時候構(gòu)建incremental ETL清洗不同的表,在一定程度上通用化,讓用戶用得更簡單。未來內(nèi)置更多場景化能力,讓用戶構(gòu)建和應(yīng)用湖倉更加簡單!