10億+/秒!看阿里如何搞定實(shí)時(shí)數(shù)倉高吞吐實(shí)時(shí)寫入與更新
原創(chuàng)作者 | 胡一博(上唐)
數(shù)據(jù)實(shí)時(shí)入倉所面臨的挑戰(zhàn):高性能、可更新、大規(guī)模
大數(shù)據(jù)場景下,實(shí)時(shí)數(shù)據(jù)如何寫入實(shí)時(shí)數(shù)倉永遠(yuǎn)是一個(gè)比較大的話題,根據(jù)業(yè)務(wù)場景需求,常見的寫入類型有:
- Append only:傳統(tǒng)日志類數(shù)據(jù)(日志、埋點(diǎn)等)中,記錄(Record)和記錄之間沒有關(guān)聯(lián)性,因此新來的記錄只需要append到系統(tǒng)中就好了。這是傳統(tǒng)大數(shù)據(jù)系統(tǒng)最擅長的一種類型。
- Insert or Replace:根據(jù)設(shè)置的主鍵(Primary Key, PK)進(jìn)行檢查,如果系統(tǒng)中不存在此PK,就把這行記錄append進(jìn)系統(tǒng); 如果存在,就把系統(tǒng)中舊的記錄用新的記錄整行覆蓋。典型的使用場景有:
上游數(shù)據(jù)庫通過Binlog實(shí)時(shí)同步,這種寫入就是Insert or Replace。
Flink的結(jié)果實(shí)時(shí)寫出。Flink持續(xù)刷新結(jié)果,需要Insert or Replace的寫目標(biāo)表。
Lambda架構(gòu)下的離線回刷。Lambda架構(gòu)下離線鏈路T+1回刷實(shí)時(shí)結(jié)果表中昨天的記錄。
- Insert or Update:通常使用在多個(gè)應(yīng)用更新同一行數(shù)據(jù)的不同字段,實(shí)現(xiàn)多個(gè)數(shù)據(jù)源的JOIN。如果這行記錄存在,各個(gè)應(yīng)用直接根據(jù)PK去update各自的字段;但如果這行記錄不存在,那么第一個(gè)要寫入這行記錄的應(yīng)用就需要INSERT這行記錄。典型的使用場景:
畫像類應(yīng)用。這類應(yīng)用在實(shí)時(shí)風(fēng)控、實(shí)時(shí)廣告投放等非常常見。上游多個(gè)Flink Job實(shí)時(shí)計(jì)算畫像的不同維度,并實(shí)時(shí)寫入到同一行記錄的不同字段中。
實(shí)時(shí)離線數(shù)據(jù)整合。在需要同時(shí)用到實(shí)時(shí)和離線計(jì)算的場合,把同一個(gè)PK的實(shí)時(shí)和離線結(jié)果放在同一行記錄的不同字段中,就可以方便的同時(shí)取到實(shí)時(shí)和離線的計(jì)算結(jié)果。
下文中,我們把Insert or Replace和Insert or Update統(tǒng)稱為Upsert。
而要保持非常高效的寫入性能,實(shí)時(shí)數(shù)倉技術(shù)都面臨著非常大的挑戰(zhàn),典型的挑戰(zhàn)有以下幾個(gè)方面:
挑戰(zhàn)一:Merge on Read還是Merge on Write?
Upsert模式下,新舊數(shù)據(jù)的合并發(fā)生在什么時(shí)候,如果希望查詢性能好,那么肯定希望合并發(fā)生在寫入時(shí)(Merge on Write)。這樣,在系統(tǒng)中任何時(shí)刻任一主鍵都只有一條記錄;而如果希望寫入性能好,那么就是寫入不做合并,查詢時(shí)再做合并(Merge on Read)。這對于查詢是非常不友好的,極大限制查詢性能。
Merge on Read原理示例:
Merge on Write原理示例:
挑戰(zhàn)二:是否支持主鍵(Primary Key)模型?
實(shí)時(shí)數(shù)倉在數(shù)據(jù)模型上是不是支持主鍵對于Upsert的實(shí)時(shí)寫入是至關(guān)重要的。如果沒有主鍵,在寫入側(cè)數(shù)據(jù)的更新就很容易退化成全表更新,性能非常差,在查詢側(cè),Merge On Read也無從做起。
挑戰(zhàn)三:是否保證寫入的Exactly Once?
如果上游因?yàn)閒ailover等因素導(dǎo)致寫入重復(fù)執(zhí)行,能不能保證系統(tǒng)中只有一條記錄(Merge on Write)或者查詢時(shí)等效只有一條數(shù)據(jù)(Merge on Read)且是最新的數(shù)據(jù)?大數(shù)據(jù)系統(tǒng)復(fù)雜,上游系統(tǒng)failover是常態(tài),不能因?yàn)樯嫌蝔ailover,就導(dǎo)致實(shí)時(shí)數(shù)倉數(shù)據(jù)重復(fù)。
問題四:數(shù)據(jù)是否寫入即可見?
數(shù)據(jù)寫入的時(shí)效性也是實(shí)時(shí)數(shù)倉的重要能力之一。對于BI類等延遲不敏感的業(yè)務(wù)查詢,如果寫入時(shí)延幾秒甚至幾分鐘可能是可以接受的。而對于很多生產(chǎn)系統(tǒng),如實(shí)時(shí)風(fēng)控、實(shí)時(shí)大屏等場景,要求數(shù)據(jù)寫入即可見。如果寫入出現(xiàn)延遲,就會查詢不到最新的數(shù)據(jù),嚴(yán)重影響線上業(yè)務(wù)決策。
挑戰(zhàn)五:如何支持超大的數(shù)據(jù)量和超高的RPS實(shí)時(shí)寫入(每秒記錄數(shù),Record Per Second)?
如果數(shù)據(jù)量小,寫入RPS要求低,一個(gè)傳統(tǒng)的數(shù)據(jù)庫就能很好的解決這個(gè)問題。但是在大數(shù)據(jù)場景下,當(dāng)RPS達(dá)到幾十萬幾百萬時(shí),如何更好支持?jǐn)?shù)據(jù)的實(shí)時(shí)寫入?同時(shí),如果目標(biāo)表中已經(jīng)有海量數(shù)量(十億、百億甚至更多)時(shí),Upsert要求訪問和訂正已有數(shù)據(jù),這時(shí)是否還能支持高性能的Upsert?
Hologres的實(shí)時(shí)寫入模型與性能
Hologres是阿里自研的一站式大數(shù)據(jù)實(shí)時(shí)數(shù)倉,在設(shè)計(jì)之初就對實(shí)時(shí)寫入場景進(jìn)行了充分的考慮,主要有以下幾個(gè)方面:
- 支持主鍵,可以高效利用主鍵更新、刪除數(shù)據(jù)。
- 支持Upsert:完整支持高性能的Append Only、Insert or Replace、Insert or Update 3種能力,可根據(jù)業(yè)務(wù)場景選擇寫入模式。
- 對于列存表,自動使用Merge on Write方案。對于行存表,自動使用Merge on Read方案,原因如下:
對于列存表,主要是做復(fù)雜的OLAP分析,因此查詢性能最重要。
對于行存表來說,查詢主要是點(diǎn)查,此時(shí)Merge on Read單行的開銷足夠小,因此重點(diǎn)考慮寫入性能。在阿里很多點(diǎn)查場景,寫入要求非常高的RPS。
- 支持Exactly Once。通過單行SQL事務(wù)和主鍵PK自動去重來實(shí)現(xiàn)。無論是批量數(shù)據(jù)寫入(一次更新幾億條記錄),還是逐條記錄實(shí)時(shí)寫入,Hologres都是保證單條SQL的原子性(ACID)。而對于上游Flink等failover造成的SQL重發(fā),Hologres通過目標(biāo)表的主鍵,實(shí)現(xiàn)自動覆蓋或者忽略(對于Upsert是自動覆蓋;對于append,是自動忽略Insert or Ignore)。因此,目標(biāo)表是冪等的。
- 寫入即可見。Hologres沒有類似ElasticSearch的build過程,也沒有類似ClickHouse或者Greenplum的攢批過程,數(shù)據(jù)通過SQL寫入時(shí),SQL返回即表示寫入完成,數(shù)據(jù)即可查詢。因此通過Flink等實(shí)時(shí)寫入(背后也是SQL寫入)能滿足寫入即可見,無延遲。
這5個(gè)設(shè)計(jì)選取也是傳統(tǒng)數(shù)據(jù)庫的選擇。經(jīng)驗(yàn)證明,這對于用戶來說是最自然、最友好的使用方式。Hologres的創(chuàng)新在于把這個(gè)方案成功的應(yīng)用于大數(shù)據(jù)領(lǐng)域(超高RPS寫入和超大存儲量)。
下圖為Hologres 128C實(shí)例下,10個(gè)并發(fā)實(shí)時(shí)寫入20列的列存表的測試結(jié)果。其中豎軸表示每秒寫入記錄數(shù),4個(gè)場景分別為:
- case1:寫入無主鍵表;
- case2:寫入有主鍵表(Insert or Replace),并且每次INSERT的主鍵和表已有數(shù)據(jù)都不沖突;
- case3:寫入有主鍵表(Insert or Replace),并且每次INSERT的主鍵和表已有數(shù)據(jù)均沖突,表中數(shù)據(jù)量為2億。
- case4:寫入有主鍵表(Insert or Replace),并且每次INSERT的主鍵和表已有數(shù)據(jù)均沖突,表中數(shù)據(jù)量為20億。
結(jié)果解讀:
- 對比case1和case2,可以看到Hologres判斷主鍵是否存在性能損失較?。?/li>
- 對比case2,case3,case4,可以看到主鍵沖突時(shí),hologres定位數(shù)據(jù)所在文件并標(biāo)記DELETE基本不隨數(shù)據(jù)規(guī)模上漲而上漲,可以應(yīng)對海量數(shù)據(jù)下的高速Upsert。
與常見產(chǎn)品對比
寫入方式 | 更新/刪除方式 | 更新刪除對查詢的影響 | |
ClickHouse | 攢批寫入,每個(gè)批次完成才能查詢到數(shù)據(jù) | merge on read | 查詢明細(xì)時(shí)相同pk可能多次出現(xiàn),取決于compaction時(shí)機(jī) |
Doris | 攢批寫入,每個(gè)批次完成才能查詢到數(shù)據(jù) | merge on read | 查詢時(shí)要進(jìn)行合并,性能有損失 |
Hudi/iceberg/delta lake等數(shù)據(jù)湖產(chǎn)品 | 攢批寫入,每個(gè)批次完成才能查詢到數(shù)據(jù) | merge on read或copy on write,大多會造成全量數(shù)據(jù)重寫,導(dǎo)致IO放大 | merge on read,查詢時(shí)要進(jìn)行合并,性能有損失;copy on write,查詢性能沒有影響 |
Hologres | 流式寫入,寫入即可查詢,低延遲 | Merge on write強(qiáng)主鍵模型,更新/刪除成本非常低。 | 通過delete bitmap技術(shù)實(shí)現(xiàn)Merge on Write,更新/刪除對查詢沒有影響 |
Merge on Write模式下 實(shí)時(shí)寫入與更新的常見原理
一個(gè)典型的Upsert(Insert or Replace)場景如下,一張用戶表,通過INSERT INTO ON CONFLICT執(zhí)行插入新用戶/更新老用戶操作:
CREATE TABLE users (
id int not null,
name text not null,
age int,
primary key(id)
);
INSERT INTO users VALUES (?,?,?)
ON CONFLICT(id) DO UPDATE
SET name = EXCLUDED.name, sex = EXCLUDED.sex, age = EXCLUDED.age;
性能最高的實(shí)現(xiàn)方式是寫入時(shí)APPEND ONLY不斷寫入新文件,在查詢時(shí)進(jìn)行數(shù)據(jù)邏輯合并(Merge on Read)。但這種對查詢的性能打擊是致命的,每次查詢要多個(gè)版本的數(shù)據(jù)join過才能獲取到一行最新的值。實(shí)時(shí)數(shù)倉在寫時(shí)合并(Merge on Write)方案下,Upsert的實(shí)現(xiàn)一般分為三步:
- 定位舊數(shù)據(jù)所在文件。
- 處理舊數(shù)據(jù)
- 寫入新數(shù)據(jù)
要實(shí)現(xiàn)高RPS的實(shí)時(shí)Upsert,本質(zhì)就是要把這3個(gè)步驟都做快。
1.定位舊數(shù)據(jù)所在文件
快速定位舊數(shù)據(jù)文件,有如下幾種做法:
1)bloom過濾器
bloom過濾器原理上是為每個(gè)key生成若干個(gè)hash值,通過hash碰撞來判斷是否存在相同的key。為每個(gè)文件生成一個(gè)bloom過濾器,可以明確排除不存在該key的文件。Bloom過濾器可以以很高的精度(99%甚至更高)確定一個(gè)Key不在一個(gè)文件中。
2)范圍過濾器
范圍過濾器就是記錄文件內(nèi)列的最大最小值,是一個(gè)代價(jià)非常小的過濾方式,當(dāng)key基本處于一個(gè)遞增態(tài)勢是可以得到一個(gè)非常好的過濾效果。
3)外部索引
Hudi支持HBase索引,在HBase中保存PK->file_id的映射。HBase LSM-tree的存儲結(jié)構(gòu)對于key-value的查詢非常高效,Hudi通過這種方式也不再需要去猜測哪些文件可能包含了這個(gè)PK。但是這里有兩個(gè)問題:
- HBase狀態(tài)和Hudi表狀態(tài)的一致性,因?yàn)镠Base和Hudi是獨(dú)立的兩套系統(tǒng),一方如果發(fā)生故障可能導(dǎo)致索引失效。
- 性能上限是HBase的PK點(diǎn)查性能。要取得更好的寫入性能是困難的。
2.處理舊數(shù)據(jù)+寫入新數(shù)據(jù)
常見的是兩種處理方法:
1)刷新數(shù)據(jù)文件
定位到數(shù)據(jù)所在文件后,將文件和新數(shù)據(jù)合并后生成一個(gè)新的數(shù)據(jù)文件覆蓋舊文件。(Copy on Write)。Iceberg支持這種模式。這會導(dǎo)致非常嚴(yán)重的寫放大。
2)引入delta文件
定位到數(shù)據(jù)所在文件后:
- 在數(shù)據(jù)文件對應(yīng)的delta文件中標(biāo)記該行舊數(shù)據(jù)為刪除狀態(tài)。
- 在delta中追加新數(shù)據(jù)的信息。
這種方式?jīng)]有寫放大,但是在查詢時(shí)需要將數(shù)據(jù)文件和對應(yīng)的delta文件做join操作。
Hologres 基于Memtable的寫入原理
Hologres的實(shí)時(shí)寫入與更新基本遵循Merge on Write的原理。對于實(shí)時(shí)數(shù)倉場景下的record級別的更新/插入,Hologres采用強(qiáng)主鍵的方式來讓單行更新/插入足夠輕量化,采用memtable + wal log的方式,支持高頻次的寫入操作。
1.文件模型
Hologres每張列存表底層會保存三種文件:
第一種是主鍵索引文件,采用行存結(jié)構(gòu)存儲,提供高速的key-value服務(wù),索引文件的key為表的主鍵,value為unique_id和聚簇索引。unique_id每次Upsert自動生成,單調(diào)遞增。主鍵索引文件實(shí)現(xiàn)高效的主鍵沖突判定并輔助數(shù)據(jù)文件定位;
第二種是數(shù)據(jù)文件,采用列存結(jié)構(gòu)存儲,文件內(nèi)按照聚簇索引+unique_id生成稀疏索引,并對unique_id生成范圍過濾器;
第三種是delete bitmap文件,每個(gè)file id對應(yīng)一個(gè)bitmap,bitmap中第N位為1表示file id中的第N行標(biāo)記為刪除。delete bitmap在列存模型下,相當(dāng)于是表的一列數(shù)據(jù)。Update時(shí)只刷新bitmap信息既保留了Merge on Write對查詢性能幾乎零破壞的優(yōu)點(diǎn),又極大降低了IO的開銷。
三類文件都是先寫入memtable,memtable達(dá)到特定大小后轉(zhuǎn)為不可變的memtable對象,并生成新的memtable供后續(xù)寫入使用。不可變的memtable對象由異步的flush線程將其持久化為磁盤上的文件。
2.Upsert流程
通過這個(gè)流程圖可以看到:
- 如果主鍵沒有發(fā)生沖突,那么一次Upsert的的開銷= 一次索引查詢 + 兩次內(nèi)存寫入操作;
- 如果主鍵發(fā)生了沖突,那么一次Upsert的開銷=一次索引查詢 + 一次文件及行號定位 +三次內(nèi)存寫入操作。
3.Upsert示例
下面通過示例來展示一次Upsert的過程。假設(shè)pk為id,cluserting key為name,數(shù)據(jù)列為age。(deleted信息物理上存儲于delete bitmap中,但邏輯上等同與表的一列,下文將合并在數(shù)據(jù)文件中一同描述)
CREATE TABLE users (
id text not null,
name text not null,
age int,
primary key(id)
);
表初始數(shù)據(jù)如下:
id | name | age |
u0 | 張三 | 10 |
u1 | 李四 | 11 |
u2 | 王五 | 12 |
此時(shí)執(zhí)行如下SQL:
INSERT INTO users VALUES ('u1','新李四',12)
ON CONFLICT(id) DO UPDATE
SET name = EXCLUDED.name
, age = EXCLUDED.age;
更新過程如下:
更新完成后表數(shù)據(jù)如下:
id | name | age |
u0 | 張三 | 10 |
u1 | 新李四 | 12 |
u2 | 王五 | 12 |
Hologres寫入全鏈路優(yōu)化,雕琢細(xì)節(jié)
Hologres在接口上完全兼容PostgreSQL(包括語法、語義、協(xié)議等),所以可以直接使用PostgreSQL的JDBC Driver連接Hologres進(jìn)行數(shù)據(jù)讀寫。除了寫入原理上的創(chuàng)新性外,Hologres也針對寫入進(jìn)行了全鏈路的優(yōu)化,以達(dá)到更高性能的吞吐。
1.Fixed Plan:降低、避免SQL解析與優(yōu)化器的開銷
- Query Optimizer進(jìn)行shortcut
對于符合pattern的Upsert sql,Hologres的Query Optimizer進(jìn)行了相應(yīng)的short cut,Upsert Query并不會進(jìn)入Opimizer的完整流程。Query進(jìn)入FrontEnd后它會交由Fixed Planner進(jìn)行處理,并由其生成對于的Fixed Plan(Upsert的物理Plan),F(xiàn)ixed Planner非常輕,無需經(jīng)過任何的等價(jià)變換、邏輯優(yōu)化、物理優(yōu)化等步驟,僅僅是基于AST樹進(jìn)行了一些簡單的分析并構(gòu)建出對應(yīng)的Fixed Plan,從而盡量規(guī)避掉優(yōu)化器的開銷。
- Prepared Statement
盡管Query Optimizer對Upsert Query進(jìn)行了short cut,但是Query進(jìn)入到FrontEnd后的解析開銷依然存在、Query Optimizer的開銷也沒有完全避免。
Hologres兼容Postgres,Postgres的前、后端通信協(xié)議有extended協(xié)議與simple協(xié)議兩種:
1) simple協(xié)議:是一次性交互的協(xié)議,Client每次會直接發(fā)送待執(zhí)行的SQL給Server,Server收到SQL后直接進(jìn)行解析、執(zhí)行,并將結(jié)果返回給Client。simple協(xié)議里Server無可避免的至少需要對收到的SQL進(jìn)行解析才能理解其語義。
2)extended協(xié)議:Client與Server的交互分多階段完成,整體大致可以分成兩大階段。
- 第一階段:Client在Server端定義了一個(gè)帶名字的Statement,并且生成了該Statement所對應(yīng)的generic plan(不與特定的參數(shù)綁定的通用plan)。
- 第二階段:用戶通過發(fā)送具體的參數(shù)來執(zhí)行第一階段中定義的Statement。第二階段可以重復(fù)執(zhí)行多次,每次通過帶上第一階段中所定義的Statement名字,以及執(zhí)行所需要的參數(shù),使用第一階段生成的generic plan進(jìn)行執(zhí)行。由于第二階段可以通過Statement名字和附帶的參數(shù)來反復(fù)執(zhí)行第一個(gè)階段所準(zhǔn)備好的generic plan,因此第二個(gè)段在Frontend的開銷幾乎等同于0。
為此Hologres基于Postgres的extended協(xié)議,支持了Prepared Statement,做到了Upsert Query在Frontend上的開銷接近于0。
2.高性能的內(nèi)部通信
- Reactor模型、全程無鎖的異步操作
內(nèi)部通信原理類似reactor模型,每個(gè)目標(biāo)shard對應(yīng)一個(gè)eventloop,以“死循環(huán)”的方式處理該shard上的請求。由于HOS(Hologres Operation System)對調(diào)度執(zhí)行單元的抽象,即使是shard很多的情況下,這種工作方式的基礎(chǔ)消耗也足夠低。
- 高效的數(shù)據(jù)交換協(xié)議binary row
通過自定義一套內(nèi)部的數(shù)據(jù)通信協(xié)議binary row來減少整個(gè)交互鏈路上的內(nèi)存的分配與拷貝。
- 反壓與湊批
BHClient可以感知后端的壓力,進(jìn)行自適應(yīng)的反壓與湊批,在不影響原有Latency的情況下提升系統(tǒng)吞吐。
3.穩(wěn)定可靠的后端實(shí)現(xiàn)
- 基于C++純異步的開發(fā)
Hologres采用C++進(jìn)行開發(fā),相較于Java,native語言使得我們能夠追求到更極致的性能。同時(shí)基于HOS提供的異步接口進(jìn)行純異步開發(fā),HOS通過抽象ExecutionContext來自我管理CPU的調(diào)度執(zhí)行,能夠最大化的利用硬件資源、達(dá)到吞吐最大化。
- IO優(yōu)化與豐富的Cache機(jī)制
Hologres實(shí)現(xiàn)了非常豐富的Cache機(jī)制row cache、block cache、iterator cache、meta cache等,來加速熱數(shù)據(jù)的查找、減少IO訪問、避免新內(nèi)存分配。當(dāng)無可避免的需要發(fā)生IO時(shí),Hologres會對并發(fā)IO進(jìn)行合并、通過wait/notice機(jī)制確保只訪問一次IO,減少IO處理量。通過生成文件級別的詞典及壓縮,減少文件物理存儲成本及IO訪問。
總結(jié)
Hologres是阿里巴巴自主研發(fā)的一站式實(shí)時(shí)數(shù)倉引擎,支持海量數(shù)據(jù)實(shí)時(shí)寫入、實(shí)時(shí)更新、實(shí)時(shí)分析,支持標(biāo)準(zhǔn)SQL(兼容PostgreSQL協(xié)議),支持PB級數(shù)據(jù)多維分析(OLAP)與即席分析(Ad Hoc),支持高并發(fā)低延遲的在線數(shù)據(jù)服務(wù)(Serving),并在阿里巴巴雙11等大促核心場景上,Hologres寫入峰值達(dá)11億條+/秒,經(jīng)過大規(guī)模數(shù)據(jù)生產(chǎn)驗(yàn)證。
常見的數(shù)據(jù)倉庫產(chǎn)品,大多都會犧牲讀性能或者犧牲寫性能,并且它們往往文件作為訪問介質(zhì),這天然約束了數(shù)據(jù)更新的頻率。Hologres 通過memtable使數(shù)據(jù)可以高頻更新,通過delete map讓讀操作避免了join操作保持了良好的讀性能,通過主鍵模型解決了寫操作時(shí)的效率問題,做到了讀寫性能的兼顧。同時(shí)Hologres同F(xiàn)link、Spark等計(jì)算框架原生集成,通過內(nèi)置Connector,支持高通量數(shù)據(jù)實(shí)時(shí)寫入與更新,支持源表、結(jié)果表、維度表多種場景,支持多流合并等復(fù)雜操作。?
從阿里集團(tuán)誕生到云上商業(yè)化,隨著業(yè)務(wù)的發(fā)展和技術(shù)的演進(jìn),Hologres也在持續(xù)不斷優(yōu)化核心技術(shù)競爭力,為了讓大家更加了解Hologres,我們計(jì)劃持續(xù)推出Hologres底層技術(shù)原理揭秘系列,從高性能存儲引擎到高效率查詢引擎,高吞吐寫入到高QPS查詢等,全方位解讀Hologres,請大家持續(xù)關(guān)注!