Flink SQL 知其所以然之維表 Join 的性能優(yōu)化之路(上)附源碼
本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)羊說(shuō)」,作者antigeneral了呀 。轉(zhuǎn)載本文請(qǐng)聯(lián)系大數(shù)據(jù)羊說(shuō)公眾號(hào)。
1.序篇
源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 sql lookup join獲取。
廢話不多說(shuō),咱們先直接上本文的目錄和結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來(lái)什么幫助:
- 背景及應(yīng)用場(chǎng)景介紹:博主期望你能了解到,flink sql 提供了輕松訪問(wèn)外部存儲(chǔ)的 lookup join(與上節(jié)不同,上節(jié)說(shuō)的是流與流的 join)。lookup join 可以簡(jiǎn)單理解為使用 flatmap 訪問(wèn)外部存儲(chǔ)數(shù)據(jù)然后將維度字段拼接到當(dāng)前這條數(shù)據(jù)上面
- 來(lái)一個(gè)實(shí)戰(zhàn)案例:博主以曝光用戶日志流關(guān)聯(lián)用戶畫像(年齡、性別)維表為例介紹 lookup join 應(yīng)該達(dá)到的關(guān)聯(lián)的預(yù)期效果。
- flink sql lookup join 的解決方案以及原理的介紹:主要介紹 lookup join 的在上述實(shí)戰(zhàn)案例的 sql 寫法,博主期望你能了解到,lookup join 是基于處理時(shí)間的,并且 lookup join 經(jīng)常會(huì)由于訪問(wèn)外部存儲(chǔ)的 qps 過(guò)高而導(dǎo)致背壓,產(chǎn)出延遲等性能問(wèn)題。我們可以借鑒在 DataStream api 中的維表 join 優(yōu)化思路在 flink sql 使用 local cache,異步訪問(wèn)維表,批量訪問(wèn)維表三種方式去解決性能問(wèn)題。
- 總結(jié)及展望:官方并沒(méi)有提供 批量訪問(wèn)維表 的能力,因此博主自己實(shí)現(xiàn)了一套,具體使用方式和原理實(shí)現(xiàn)敬請(qǐng)期待下篇文章。
2.背景及應(yīng)用場(chǎng)景介紹
維表作為 sql 任務(wù)中一種常見表的類型,其本質(zhì)就是關(guān)聯(lián)表數(shù)據(jù)的額外數(shù)據(jù)屬性,通常在 join 語(yǔ)句中進(jìn)行使用。比如源數(shù)據(jù)有人的 id,你現(xiàn)在想要得到人的性別、年齡,那么可以通過(guò)用戶 id 去關(guān)聯(lián)人的性別、年齡,就可以得到更全的數(shù)據(jù)。
維表 join 在離線數(shù)倉(cāng)中是最常見的一種數(shù)據(jù)處理方式了,在實(shí)時(shí)數(shù)倉(cāng)的場(chǎng)景中,flink sql 目前也支持了維表的 join,即 lookup join,生產(chǎn)環(huán)境可以用 mysql,redis,hbase 來(lái)作為高速維表存儲(chǔ)引擎。
Notes:
在實(shí)時(shí)數(shù)倉(cāng)中,常用實(shí)時(shí)維表有兩種更新頻率
實(shí)時(shí)的更新:維度信息是實(shí)時(shí)新建的,實(shí)時(shí)寫入到高速存儲(chǔ)引擎中。然后其他實(shí)時(shí)任務(wù)在做處理時(shí)實(shí)時(shí)的關(guān)聯(lián)這些維度信息。
周期性的更新:對(duì)于一些緩慢變化維度,比如年齡、性別的用戶畫像等,幾萬(wàn)年都不變化一次的東西??,實(shí)時(shí)維表的更新可以是小時(shí)級(jí)別,天級(jí)別的。
3.來(lái)一個(gè)實(shí)戰(zhàn)案例
來(lái)看看在具體場(chǎng)景下,對(duì)應(yīng)輸入值的輸出值應(yīng)該長(zhǎng)啥樣。
需求指標(biāo):使用曝光用戶日志流(show_log)關(guān)聯(lián)用戶畫像維表(user_profile)關(guān)聯(lián)到用戶的維度之后,提供給下游計(jì)算分性別,年齡段的曝光用戶數(shù)使用。此處我們只關(guān)心關(guān)聯(lián)維表這一部分的輸入輸出數(shù)據(jù)。
來(lái)一波輸入數(shù)據(jù):
曝光用戶日志流(show_log)數(shù)據(jù)(數(shù)據(jù)存儲(chǔ)在 kafka 中):
log_id | timestamp | user_id |
---|---|---|
1 | 2021-11-01 00:01:03 | a |
2 | 2021-11-01 00:03:00 | b |
3 | 2021-11-01 00:05:00 | c |
4 | 2021-11-01 00:06:00 | b |
5 | 2021-11-01 00:07:00 | c |
用戶畫像維表(user_profile)數(shù)據(jù)(數(shù)據(jù)存儲(chǔ)在 redis 中):
user_id(主鍵) | age | sex |
---|---|---|
a | 12-18 | 男 |
b | 18-24 | 女 |
c | 18-24 | 男 |
注意:redis 中的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)是按照 key,value 去存儲(chǔ)的。其中 key 為 user_id,value 為 age,sex 的 json。如下圖所示:
user_profile redis
預(yù)期輸出數(shù)據(jù)如下:
log_id | timestamp | user_id | age | sex |
---|---|---|---|---|
1 | 2021-11-01 00:01:03 | a | 12-18 | 男 |
2 | 2021-11-01 00:03:00 | b | 18-24 | 女 |
3 | 2021-11-01 00:05:00 | c | 18-24 | 男 |
4 | 2021-11-01 00:06:00 | b | 18-24 | 女 |
5 | 2021-11-01 00:07:00 | c | 18-24 |
flink sql lookup join 登場(chǎng)。下面是官網(wǎng)的鏈接。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join
4.flink sql lookup join
4.1.lookup join 定義
以上述案例來(lái)說(shuō),lookup join 其實(shí)簡(jiǎn)單理解來(lái),就是每來(lái)一條數(shù)據(jù)去 redis 里面摟一次數(shù)據(jù)。然后把關(guān)聯(lián)到的維度數(shù)據(jù)給拼接到當(dāng)前數(shù)據(jù)中。
熟悉 DataStream api 的小伙伴萌,簡(jiǎn)單來(lái)理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中處理每一條來(lái)的數(shù)據(jù),針對(duì)每一條數(shù)據(jù)去訪問(wèn)用戶畫像的 redis。(實(shí)際上,flink sql api 中也確實(shí)是這樣實(shí)現(xiàn)的!sql 生成的 lookup join 代碼就是繼承了 flatmap)
4.2.上述案例解決方案
來(lái)看看上述案例的 flink sql lookup join sql 怎么寫:
- CREATE TABLE show_log (
- log_id BIGINT,
- `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
- user_id STRING,
- proctime AS PROCTIME()
- )
- WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '10',
- 'fields.user_id.length' = '1',
- 'fields.log_id.min' = '1',
- 'fields.log_id.max' = '10'
- );
- CREATE TABLE user_profile (
- user_id STRING,
- age STRING,
- sex STRING
- ) WITH (
- 'connector' = 'redis',
- 'hostname' = '127.0.0.1',
- 'port' = '6379',
- 'format' = 'json',
- 'lookup.cache.max-rows' = '500',
- 'lookup.cache.ttl' = '3600',
- 'lookup.max-retries' = '1'
- );
- CREATE TABLE sink_table (
- log_id BIGINT,
- `timestamp` TIMESTAMP(3),
- user_id STRING,
- proctime TIMESTAMP(3),
- age STRING,
- sex STRING
- ) WITH (
- 'connector' = 'print'
- );
- -- lookup join 的 query 邏輯
- INSERT INTO sink_table
- SELECT
- s.log_id as log_id
- , s.`timestamp` as `timestamp`
- , s.user_id as user_id
- , s.proctime as proctime
- , u.sex as sex
- , u.age as age
- FROM show_log AS s
- LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
- ON s.user_id = u.user_id
這里使用了 for SYSTEM_TIME as of 時(shí)態(tài)表的語(yǔ)法來(lái)作為維表關(guān)聯(lián)的標(biāo)識(shí)語(yǔ)法。
Notes:
實(shí)時(shí)的 lookup 維表關(guān)聯(lián)能使用處理時(shí)間去做關(guān)聯(lián)。
運(yùn)行結(jié)果如下:
log_id | timestamp | user_id | age | sex |
---|---|---|---|---|
1 | 2021-11-01 00:01:03 | a | 12-18 | 男 |
2 | 2021-11-01 00:03:00 | b | 18-24 | 女 |
3 | 2021-11-01 00:05:00 | c | 18-24 | 男 |
4 | 2021-11-01 00:06:00 | b | 18-24 | 女 |
5 | 2021-11-01 00:07:00 | c | 18-24 | 男 |
flink web ui 算子圖如下:
flink web ui
但是!!!但是!!!但是!!!
flink 官方并沒(méi)有提供 redis 的維表 connector 實(shí)現(xiàn)。
沒(méi)錯(cuò),博主自己實(shí)現(xiàn)了一套。關(guān)于 redis 維表的 connector 實(shí)現(xiàn),直接參考下面的文章。都是可以從 github 上找到源碼拿來(lái)用的!
flink sql 知其所以然(二)| 自定義 redis 數(shù)據(jù)維表(附源碼)
4.3.關(guān)于維表使用的一些注意事項(xiàng)
同一條數(shù)據(jù)關(guān)聯(lián)到的維度數(shù)據(jù)可能不同:實(shí)時(shí)數(shù)倉(cāng)中常用的實(shí)時(shí)維表都是在不斷的變化中的,當(dāng)前流表數(shù)據(jù)關(guān)聯(lián)完維表數(shù)據(jù)后,如果同一個(gè) key 的維表的數(shù)據(jù)發(fā)生了變化,已關(guān)聯(lián)到的維表的結(jié)果數(shù)據(jù)不會(huì)再同步更新。舉個(gè)例子,維表中 user_id 為 1 的數(shù)據(jù)在 08:00 時(shí) age 由 12-18 變?yōu)榱?18-24,那么當(dāng)我們的任務(wù)在 08:01 failover 之后從 07:59 開始回溯數(shù)據(jù)時(shí),原本應(yīng)該關(guān)聯(lián)到 12-18 的數(shù)據(jù)會(huì)關(guān)聯(lián)到 18-24 的 age 數(shù)據(jù)。這是有可能會(huì)影響數(shù)據(jù)質(zhì)量的。所以小伙伴萌在評(píng)估你們的實(shí)時(shí)任務(wù)時(shí)要考慮到這一點(diǎn)。
會(huì)發(fā)生實(shí)時(shí)的新建及更新的維表博主建議小伙伴萌應(yīng)該建立起數(shù)據(jù)延遲的監(jiān)控機(jī)制,防止出現(xiàn)流表數(shù)據(jù)先于維表數(shù)據(jù)到達(dá),導(dǎo)致關(guān)聯(lián)不到維表數(shù)據(jù)
4.4.再說(shuō)說(shuō)維表常見的性能問(wèn)題及優(yōu)化思路
所有的維表性能問(wèn)題都可以總結(jié)為:高 qps 下訪問(wèn)維表存儲(chǔ)引擎產(chǎn)生的任務(wù)背壓,數(shù)據(jù)產(chǎn)出延遲問(wèn)題。
舉個(gè)例子:
- 在沒(méi)有使用維表的情況下:一條數(shù)據(jù)從輸入 flink 任務(wù)到輸出 flink 任務(wù)的時(shí)延假如為 0.1 ms,那么并行度為 1 的任務(wù)的吞吐可以達(dá)到 1 query / 0.1 ms = 1w qps。
- 在使用維表之后:每條數(shù)據(jù)訪問(wèn)維表的外部存儲(chǔ)的時(shí)長(zhǎng)為 2 ms,那么一條數(shù)據(jù)從輸入 flink 任務(wù)到輸出 flink 任務(wù)的時(shí)延就會(huì)變成 2.1 ms,那么同樣并行度為 1 的任務(wù)的吞吐只能達(dá)到 1 query / 2.1 ms = 476 qps。兩者的吞吐量相差 21 倍。
這就是為什么維表 join 的算子會(huì)產(chǎn)生背壓,任務(wù)產(chǎn)出會(huì)延遲。
那么當(dāng)然,解決方案也是有很多的。拋開 flink sql 想一下,如果我們使用 DataStream api,甚至是在做一個(gè)后端應(yīng)用,需要訪問(wèn)外部存儲(chǔ)時(shí),常用的優(yōu)化方案有哪些?這里列舉一下:
按照 redis 維表的 key 分桶 + local cache:通過(guò)按照 key 分桶的方式,讓大多數(shù)據(jù)的維表關(guān)聯(lián)的數(shù)據(jù)訪問(wèn)走之前訪問(wèn)過(guò)得 local cache 即可。這樣就可以把訪問(wèn)外部存儲(chǔ) 2.1 ms 處理一個(gè) query 變?yōu)樵L問(wèn)內(nèi)存的 0.1 ms 處理一個(gè) query 的時(shí)長(zhǎng)。
異步訪問(wèn)外存:DataStream api 有異步算子,可以利用線程池去同時(shí)多次請(qǐng)求維表外部存儲(chǔ)。這樣就可以把 2.1 ms 處理 1 個(gè) query 變?yōu)?2.1 ms 處理 10 個(gè) query。吞吐可變優(yōu)化到 10 / 2.1 ms = 4761 qps。
批量訪問(wèn)外存:除了異步訪問(wèn)之外,我們還可以批量訪問(wèn)外部存儲(chǔ)。舉一個(gè)例子:在訪問(wèn) redis 維表的 1 query 占用 2.1 ms 時(shí)長(zhǎng)中,其中可能有 2 ms 都是在網(wǎng)絡(luò)請(qǐng)求上面的耗時(shí) ,其中只有 0.1 ms 是 redis server 處理請(qǐng)求的時(shí)長(zhǎng)。那么我們就可以使用 redis 提供的 pipeline 能力,在客戶端(也就是 flink 任務(wù) lookup join 算子中),攢一批數(shù)據(jù),使用 pipeline 去同時(shí)訪問(wèn) redis sever。這樣就可以把 2.1 ms 處理 1 個(gè) query 變?yōu)?7ms(2ms + 50 * 0.1ms) 處理 50 個(gè) query。吞吐可變?yōu)?50 query / 7 ms = 7143 qps。博主這里測(cè)試了下使用 redis pipeline 和未使用的時(shí)長(zhǎng)消耗對(duì)比。如下圖所示。
redis pipeline
博主認(rèn)為上述優(yōu)化效果中,最好用的是 1 + 3,2 相比 3 還是一條一條發(fā)請(qǐng)求,性能會(huì)差一些。
既然 DataStream 可以這樣做,flink sql 必須必的也可以借鑒上面的這些優(yōu)化方案。具體怎么操作呢?看下文騷操作
4.5.lookup join 的具體性能優(yōu)化方案
按照 redis 維表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做訪問(wèn) redis 處理,并且 udaf 產(chǎn)出的結(jié)果只能是一條,所以這種實(shí)現(xiàn)起來(lái)非常復(fù)雜。我們選擇不做 keyby 分桶。但是我們可以直接使用 local cache 去做本地緩存,雖然【直接緩存】的效果比【先按照 key 分桶再做緩存】的效果差,但是也能一定程度上減少訪問(wèn) redis 壓力。在博主實(shí)現(xiàn)的 redis connector 中,內(nèi)置了 local cache 的實(shí)現(xiàn),小伙伴萌可以參考下面這部篇文章進(jìn)行配置。
異步訪問(wèn)外存:目前博主實(shí)現(xiàn)的 redis connector 不支持異步訪問(wèn),但是官方實(shí)現(xiàn)的 hbase connector 支持這個(gè)功能,參考下面鏈接文章的,點(diǎn)開之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
批量訪問(wèn)外存:這玩意官方必然沒(méi)有實(shí)現(xiàn)啊,但是,但是,但是,經(jīng)過(guò)博主周末兩天的瘋狂 debug,改了改源碼,搞定了基于 redis 的批量訪問(wèn)外存優(yōu)化的功能。
4.6.基于 redis connector 的批量訪問(wèn)機(jī)制優(yōu)化
先描述一下大概是個(gè)什么東西,具體怎么用。
你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.mode 為 true,sql 不用做任何改動(dòng)的情況下,flink lookup join 算子會(huì)自動(dòng)優(yōu)化,優(yōu)化效果如下:
lookup join 算子的每個(gè) task 上,每攢夠 30 條數(shù)據(jù) or 每隔五秒(處理時(shí)間) 去觸發(fā)一次批量訪問(wèn) redis 的請(qǐng)求,使用的是 jedis client 的 pipeline 功能訪問(wèn) redis server。實(shí)測(cè)性能有很大提升。
關(guān)于這個(gè)批量訪問(wèn)機(jī)制的優(yōu)化介紹和使用方式介紹,小伙伴們先別急,下篇文章會(huì)詳細(xì)介紹到。
5.總結(jié)與展望
源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 sql lookup join獲取。
本文主要介紹了 flink sql lookup join 的使用方式,并介紹了一些經(jīng)常出現(xiàn)的性能問(wèn)題以及優(yōu)化思路,總結(jié)如下:
背景及應(yīng)用場(chǎng)景介紹:博主期望你能了解到,flink sql 提供了輕松訪問(wèn)外部存儲(chǔ)的 lookup join(與上節(jié)不同,上節(jié)說(shuō)的是流與流的 join)。lookup join 可以簡(jiǎn)單理解為使用 flatmap 訪問(wèn)外部存儲(chǔ)數(shù)據(jù)然后將維度字段拼接到當(dāng)前這條數(shù)據(jù)上面
來(lái)一個(gè)實(shí)戰(zhàn)案例:博主以曝光用戶日志流關(guān)聯(lián)用戶畫像(年齡、性別)維表為例介紹 lookup join 應(yīng)該達(dá)到的關(guān)聯(lián)的預(yù)期效果。
flink sql lookup join 的解決方案以及原理的介紹:主要介紹 lookup join 的在上述實(shí)戰(zhàn)案例的 sql 寫法,博主期望你能了解到,lookup join 是基于處理時(shí)間的,并且 lookup join 經(jīng)常會(huì)由于訪問(wèn)外部存儲(chǔ)的 qps 過(guò)高而導(dǎo)致背壓,產(chǎn)出延遲等性能問(wèn)題。我們可以借鑒在 DataStream api 中的維表 join 優(yōu)化思路在 flink sql 使用 local cache,異步訪問(wèn)維表,批量訪問(wèn)維表三種方式去解決性能問(wèn)題。
總結(jié)及展望:官方并沒(méi)有提供 批量訪問(wèn)維表 的能力,因此博主自己實(shí)現(xiàn)了一套,具體使用方式和原理實(shí)現(xiàn)敬請(qǐng)期待下篇文章。