自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

火山引擎在行為分析場景下的 ClickHouse JOIN 優(yōu)化

原創(chuàng) 精選
大數(shù)據(jù)
火山引擎增長分析 DataFinder 基于 ClickHouse 來進(jìn)行行為日志的分析,ClickHouse 的主要版本是基于社區(qū)版改進(jìn)開發(fā)的字節(jié)內(nèi)部版本。

1. 背景

火山引擎增長分析 DataFinder 基于 ClickHouse 來進(jìn)行行為日志的分析,ClickHouse 的主要版本是基于社區(qū)版改進(jìn)開發(fā)的字節(jié)內(nèi)部版本。主要的表結(jié)構(gòu):

圖片

事件表:存儲用戶行為數(shù)據(jù),以用戶 ID 分 shard 存儲。

--列出了主要的字段信息
CREATE TABLE tob_apps_all
(
`tea_app_id` UInt32, --應(yīng)用ID
`device_id` String DEFAULT '', --設(shè)備ID
`time` UInt64,--事件日志接受時間
`event` String,--事件名稱
`user_unique_id` String,--用戶ID
`event_date` Date , --事件日志日期,由time轉(zhuǎn)換而來
`hash_uid` UInt64 --用戶ID hash過后的id,用來join降低內(nèi)存消耗
)│
```

用戶表:存儲用戶的屬性數(shù)據(jù),以用戶 ID 分 shard 存儲。

--列出了主要的字段信息
CREATE TABLE users_unique_all
(
`tea_app_id` UInt32, --應(yīng)用ID
`user_unique_id` String DEFAULT '', -- 用戶ID
`device_id` String DEFAULT '', -- 用戶最近的設(shè)備ID
`hash_uid` UInt64,--用戶ID hash過后的id,用來join降低內(nèi)存消耗
`update_time` UInt64,--最近一次更新時間
`last_active_date` Date --用戶最后活躍日期
)

設(shè)備表:存儲設(shè)備相關(guān)的數(shù)據(jù),以設(shè)備 ID 分 shard 存儲。

--列出了主要的字段信息
CREATE TABLE devices_all
(
`tea_app_id` UInt32, --應(yīng)用ID
`device_id` String DEFAULT '', --設(shè)備ID
`update_time` UInt64, --最近一次更新時間
`last_active_date` Date --用戶最后活躍日期
)

業(yè)務(wù)對象表:存儲業(yè)務(wù)對象相關(guān)的數(shù)據(jù),每個 shard 存儲全量的數(shù)據(jù)。

--列出了主要的字段信息
CREATE TABLE rangers.items_all
(
`tea_app_id` UInt32,
`hash_item_id` Int64,
`item_name` String, --業(yè)務(wù)對象名稱。比如商品
`item_id` String, --業(yè)務(wù)對象ID。比如商品id 1000001
`last_active_date` Date
)

1.1 業(yè)務(wù)挑戰(zhàn)

圖片

隨著接入應(yīng)用以及應(yīng)用的 DAU 日益增加,ClickHouse 表的事件量增長迅速;并且基于行為數(shù)據(jù)需要分析的業(yè)務(wù)指標(biāo)越來越復(fù)雜,需要 JOIN 的表增多;我們遇到有一些涉及到 JOIN 的復(fù)雜 SQL 執(zhí)行效率低,內(nèi)存和 CPU 資源占用高,導(dǎo)致分析接口響應(yīng)時延和錯誤率增加。

2. 關(guān)于 Clickhouse 的 JOIN

在介紹優(yōu)化之前,先介紹一下基本的 ClickHouse JOIN 的類型和實現(xiàn)方式。

2.1 分布式 JOIN

SELECT 
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')

基本執(zhí)行過程:

一個 Clickhouse 節(jié)點作為 Coordinator 節(jié)點,給每個節(jié)點分發(fā)子查詢,子查詢 sql(tob_apps_all 替換成本地表,users_unique_all 保持不變依然是分布式表)。

每個節(jié)點執(zhí)行 Coordinator 分發(fā)的 sql 時,發(fā)現(xiàn) users_unique_all 是分布式表,就會去所有節(jié)點上去查詢以下 SQL(一共有 N*N。N 為 shard 數(shù)量)。

  1. SELECT device_id, hash_uid FROMusers_uniqueWHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
  1. 每個節(jié)點從其他 N-1個節(jié)點拉取2中子查詢的全部數(shù)據(jù),全量存儲(內(nèi)存 or 文件),進(jìn)行本地 JOIN。
  2. Coordinator 節(jié)點從每個節(jié)點拉取3中的結(jié)果集,然后做處理返回給 client。

存在的問題:

  1. 子查詢數(shù)量放大。
  2. 每個節(jié)點都全量存儲全量的數(shù)據(jù)。

2.2 分布式 Global JOIN

SELECT 
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')

基本執(zhí)行過程:

  1. 一個 Clickhouse 節(jié)點作為 Coordinator 節(jié)點,分發(fā)查詢。在每個節(jié)點上執(zhí)行sql(tob_apps_all 替換成本地表,右表子查詢替換成別名 ut)。
  2. Coordinator 節(jié)點去其他節(jié)點拉取 users_unique_all 的全部數(shù)據(jù),然后分發(fā)到全部節(jié)點(作為1中別名表 ut 的數(shù)據(jù))。
  3. 每個節(jié)點都會存儲全量的2中分發(fā)的數(shù)據(jù)(內(nèi)存or文件),進(jìn)行本地 local join。
  4. Coordinator 節(jié)點從每個節(jié)點拉取3中的結(jié)果集,然后做處理返回給 client。

存在的問題:

  1. 每個節(jié)點都全量存儲數(shù)據(jù)。
  2. 如果右表較大,分發(fā)的數(shù)據(jù)較大,會占用網(wǎng)絡(luò)帶寬資源。

2.3 本地 JOIN

SQL 里面只有本地表的 JOIN,只會在當(dāng)前節(jié)點執(zhí)行。

SELECT et.os_name,ut.device_id AS user_device_id
FROM tob_apps et any LEFT JOIN
(SELECT device_id,
hash_uid
FROM rangers.users_unique
WHERE tea_app_id = 268411
AND last_active_date>='2022-08-06') ut
ON et.hash_uid=ut.hash_uid
WHERE tea_app_id = 268411
AND event='app_launch'
AND event_date='2022-08-06'

2.3.1 Hash join

  • 右表全部數(shù)據(jù)加載到內(nèi)存,再在內(nèi)存構(gòu)建 hash table。key 為 joinkey。
  • 從左表分批讀取數(shù)據(jù),從右表 hash table匹配數(shù)據(jù)。
  • 優(yōu)點是:速度快 缺點是:右表數(shù)據(jù)量大的情況下占用內(nèi)存。

2.3.2 Merge join?

  • 對右表排序,內(nèi)部 block 切分,超出內(nèi)存部分 flush 到磁盤上,內(nèi)存大小通過參數(shù)設(shè)定。
  • 左表基于 block 排序,按照每個 block 依次與右表 merge。
  • 優(yōu)點是:能有效控制內(nèi)存 缺點是:大數(shù)據(jù)情況下速度會慢。

優(yōu)先使用 hash join 當(dāng)內(nèi)存達(dá)到一定閾值后再使用 merge join,優(yōu)先滿足性能要求。

3. 解決方案

圖片

3.1 避免JOIN

3.1.1 數(shù)據(jù)預(yù)生成

數(shù)據(jù)預(yù)生成(由 Spark/Flink 或者 Clickhouse 物化視圖產(chǎn)出數(shù)據(jù)),形成大寬表,基于單表的查詢是 ClickHouse 最為擅長的場景。

我們有個指標(biāo),實現(xiàn)的 SQL 比較復(fù)雜(如下),每次實時查詢很耗時,我們單獨建了一個表 table,由 Spark 每日構(gòu)建出這個指標(biāo),查詢時直接基于 table 查詢。

SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......
FROM
(SELECT event_date,hash_uid AS uc1,sum(et.float_params{ 'amount' }) AS value, count(1) AS cnt, value*cnt AS multiple
FROM tob_apps_all et GLOBAL ANY LEFT JOIN
(SELECT hash_uid AS join_key,int_profiles{ '$ab_time_34' }*1000 AS first_time
FROM users_unique_all
WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt
ON et.hash_uid=upt.join_key
WHERE (查詢條件)
GROUP BY uc1,event_date)
GROUP BY event_date;

數(shù)據(jù)量2300W,查詢時間由7秒->0.008秒。當(dāng)然這種方式,需要維護(hù)額外的數(shù)據(jù)構(gòu)建任務(wù)。總的思路就是不要讓 ClickHouse 實時去 JOIN。

圖片

3.1.2 使用 IN 代替 JOIN

JOIN 需要基于內(nèi)存構(gòu)建 hash table 且需要存儲右表全部的數(shù)據(jù),然后再去匹配左表的數(shù)據(jù)。而 IN 查詢會對右表的全部數(shù)據(jù)構(gòu)建 hash set,但是不需要匹配左表的數(shù)據(jù),且不需要回寫數(shù)據(jù)到 block。

比如:

SELECT event_date, count()
FROM tob_apps_all et global any INNER JOIN
(SELECT hash_uid AS join_key
FROM users_unique_all
WHERE app_id = 10000000
AND last_active_date >= '2022-01-01') upt
ON et.hash_uid = upt.join_key
WHERE app_id = 10000000
AND event_date >= '2022-01-01'
AND event_date <= '2022-08-02'
GROUP BY event_date

可以改成如下形式:

SELECT event_date,
count()
FROM tob_apps_all
WHERE app_id = 10000000
AND event_date >= '2022-01-01'
AND event_date <= '2022-08-02'
AND hash_uid global IN
(SELECT hash_uid
FROM users_unique_all
WHERE (tea_app_id = 10000000)
AND (last_active_date >= '2022-01-01') )
GROUP BY event_date

如果需要從右表提取出屬性到外層進(jìn)行計算,則不能使用 IN 來代替 JOIN。

相同的條件下,上面的測試 SQL,由 JOIN 時的16秒優(yōu)化到了 IN 查詢時的11秒。

圖片

?

3.2 更快的 JOIN

3.2.1 優(yōu)先本地 JOIN

數(shù)據(jù)預(yù)先相同規(guī)則分區(qū)

也就是 Colocate JOIN。優(yōu)先將需要關(guān)聯(lián)的表按照相同的規(guī)則進(jìn)行分布,查詢時就不需要分布式的 JOIN。

SELECT 
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

比如事件表 tob_apps_all 和用戶表 users_unique_all 都是按照用戶 ID 來分 shard 存儲的,相同的用戶的兩個表的數(shù)據(jù)都在同一個 shard 上,因此這兩個表的 JOIN 就不需要分布式 JOIN 了。

distributed_perfect_shard 這個 settings key 是字節(jié)內(nèi)部 ClickHouse 支持的,設(shè)置過這個參數(shù),指定執(zhí)行計劃時就不會再執(zhí)行分布式 JOIN 了。

基本執(zhí)行過程:

  1. 一個 ClickHouse 節(jié)點作為 Coordinator 節(jié)點,分發(fā)查詢。在每個節(jié)點上執(zhí)行 sql(tob_apps_all、users_unique_all替換成本地表)。
  2. 每個節(jié)點都執(zhí)行1中分發(fā)的本地表 join 的 SQL(這一步不再分發(fā)右表全量的數(shù)據(jù))。
  3. 數(shù)據(jù)再回傳到 coordinator 節(jié)點,然后返回給 client。
數(shù)據(jù)冗余存儲

如果一個表的數(shù)據(jù)量比較小,可以不分 shard 存儲,每個 shard 都存儲全量的數(shù)據(jù),例如我們的業(yè)務(wù)對象表。查詢時,不需要分布式 JOIN,直接在本地進(jìn)行 JOIN 即可。

SELECT count()
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT item_id
FROM items_all
WHERE (tea_app_id = 268411)
) AS it ON et.item_id = it.item_id
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

例如這個 SQL,items_all 表每個 shard 都存儲同樣的數(shù)據(jù),這樣也可以避免分布式 JOIN 帶來的查詢放大和全表數(shù)據(jù)分發(fā)問題。

3.2.2 更少的數(shù)據(jù)

不論是分布式 JOIN 還是本地 JOIN,都需要盡量讓少的數(shù)據(jù)參與 JOIN,既能提升查詢速度也能減少資源消耗。

SQL 下推

ClickHouse 對 SQL 的下推做的不太好,有些復(fù)雜的 SQL 下推會失效。因此,我們手動對 SQL 做了下推,目前正在測試基于查詢優(yōu)化器來幫助實現(xiàn)下推優(yōu)化,以便讓 SQL 更加簡潔。

下推的 SQL:

SELECT 
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411)
AND (last_active_date >= '2022-08-06'
AND 用戶屬性條件 1 OR 用戶屬性條件 2)
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

對應(yīng)的不下推的 SQL:

SELECT 
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM rangers.users_unique_all
WHERE (tea_app_id = 268411)
AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
AND (ut.用戶屬性條件 1 OR ut.用戶屬性條件 2)
settings distributed_perfect_shard=1

可以看到,不下推的 SQL 更加簡潔,直接基于 JOIN 過后的寬表進(jìn)行過濾。但是 ClickHouse 可能會將不滿足條件的 users_unique_all 數(shù)據(jù)也進(jìn)行 JOIN。

我們使用中有一個復(fù)雜的 case,用戶表過濾條件不下推有1千萬+,SQL 執(zhí)行了3000秒依然執(zhí)行超時,而做了下推之后60秒內(nèi)就執(zhí)行成功了。

圖片

3.2.3 Clickhouse 引擎層優(yōu)化

一個 SQL 實際在 Clickhouse 如何執(zhí)行,對 SQL 的執(zhí)行時間和資源消耗至關(guān)重要。社區(qū)版的 Clickhouse 在執(zhí)行模型和 SQL 優(yōu)化器上還要改進(jìn)的空間,尤其是復(fù)雜 SQL 以及多 JOIN 的場景下。

執(zhí)行模型優(yōu)化

社區(qū)版的 Clickhouse 目前還是一個兩階段執(zhí)行的執(zhí)行模型。第一階段,Coordinator 在收到查詢后,將請求發(fā)送給對應(yīng)的 Worker 節(jié)點。第二階段,Worker 節(jié)點完成計算,Coordinator 在收到各 Worker 節(jié)點的數(shù)據(jù)后進(jìn)行匯聚和處理,并將處理后的結(jié)果返回。

圖片

有以下幾個問題:

  1. 第二階段的計算比較復(fù)雜時,Coordinator 的節(jié)點計算壓力大,容易成為瓶頸。
  2. 不支持 shuffle join,hash join 時右表為大表時構(gòu)建慢,容易 OOM。
  3. 對復(fù)雜查詢的支持不友好。

字節(jié)跳動 ClickHouse 團(tuán)隊為了解決上述問題,改進(jìn)了執(zhí)行模型,參考其他的分布式數(shù)據(jù)庫引擎(例如 Presto 等),將一個復(fù)雜的 Query 按數(shù)據(jù)交換情況切分成多個 Stage,各 Stage 之間則通過 Exchange 完成數(shù)據(jù)交換。根據(jù) Stage 依賴關(guān)系定義拓?fù)浣Y(jié)構(gòu),產(chǎn)生 DAG 圖,并根據(jù) DAG 圖調(diào)度 Stage。例如兩表 JOIN,會先調(diào)度左右表讀取 Stage,之后再調(diào)度 JOIN 這個 Stage, JOIN 的 Stage 依賴于左右表的 Stage。

圖片

舉個例子

SELECT 
et.os_name,
ut.device_id AS user_device_id,
dt.hash_did AS device_hashid
FROM tob_apps_all AS et
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_did
FROM devices_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS dt ON et.device_id = dt.device_id
WHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
LIMIT 10

Stage執(zhí)行模型基本過程(可能的):

  1. 讀取 tob_apps_all 數(shù)據(jù),按照 join key(hash_uid)進(jìn)行 shuffle,數(shù)據(jù)分發(fā)到每個節(jié)點。這是一個Stage。
  2. 讀取 users_unique_all 數(shù)據(jù),按照 join key(hash_uid)進(jìn)行 shuffle,數(shù)據(jù)分發(fā)到每個節(jié)點。這是一個 Stage。
  3. 上述兩個表的數(shù)據(jù),在每個節(jié)點上的數(shù)據(jù)進(jìn)行本地JOIN,然后再按照 join key(device_id) 進(jìn)行 shuffle。這是一個 Stage。
  4. 讀取 devices_all 數(shù)據(jù),按照 join key(device_id)進(jìn)行 shuffle,這是一個Stage。
  5. 第3步、第4步的數(shù)據(jù),相同 join key(device_id) 的數(shù)據(jù)都在同一個節(jié)點上,然后進(jìn)行本地JOIN,這是一個 Stage。
  6. 匯總數(shù)據(jù),返回 limit 10 的數(shù)據(jù)。這是一個 Stage。

統(tǒng)計效果如下:

圖片

查詢優(yōu)化器

有了上面的 stage 的執(zhí)行模型,可以靈活調(diào)整 SQL 的執(zhí)行順序,字節(jié)跳動 Clickhouse 團(tuán)隊自研了查詢優(yōu)化器,根據(jù)優(yōu)化規(guī)則(基于規(guī)則和代價預(yù)估)對 SQL 的執(zhí)行計劃進(jìn)行轉(zhuǎn)換,一個執(zhí)行計劃經(jīng)過優(yōu)化規(guī)則后會變成另外一個執(zhí)行計劃,能夠準(zhǔn)確的選擇出一條效率最高的執(zhí)行路徑,然后構(gòu)建 Stage 的 DAG 圖,大幅度降低查詢時間。

下圖描述了整個查詢的執(zhí)行流程,從 SQL parse 到執(zhí)行期間所有內(nèi)容全部進(jìn)行了重新實現(xiàn)(其中紫色模塊),構(gòu)建了一套完整的且規(guī)范的查詢優(yōu)化器。

圖片

還是上面的三表 JOIN 的例子,可能的一個執(zhí)行過程是:

  1. 查詢優(yōu)化器發(fā)現(xiàn) users_unique_all 表與 tob_apps_all 表的分 shard 規(guī)則一樣(基于用戶 ID ),所以就不會先對表按 join key 進(jìn)行 shuffle,users_unique 與 tob_apps 直接基于本地表 JOIN,然后再按照 join key(device_id)進(jìn)行 shuffle。這是一個 Stage。
  2. 查詢優(yōu)化器根據(jù)規(guī)則或者代價預(yù)估決定設(shè)備表 devices_all 是需要 broadcast join 還是 shuffle join。
  1. 如果 broadcast join:在一個節(jié)點查到全部的 device 數(shù)據(jù),然后分發(fā)到其他節(jié)點。這是一個 Stage。
  2. 如果 shuffle join:在每個節(jié)點對 device 數(shù)據(jù)按照 join key(device_id) 進(jìn)行 shuffle。這是一個 Stage。
  1. 匯總數(shù)據(jù),返回 limit 10 的數(shù)據(jù)。這是一個 Stage。

效果:

可以看到,查詢優(yōu)化器能優(yōu)化典型的復(fù)雜的 SQL 的執(zhí)行效率,縮短執(zhí)行時間。

圖片

4. 總結(jié)

ClickHouse 最為擅長的領(lǐng)域是一個大寬表來進(jìn)行查詢,多表 JOIN 時Clickhouse 性能表現(xiàn)不佳。作為業(yè)內(nèi)領(lǐng)先的用戶分析與運營平臺,火山引擎增長分析 DataFinder 基于海量數(shù)據(jù)做到了復(fù)雜指標(biāo)能夠秒級查詢。本文介紹了我們是如何優(yōu)化 Clickhouse JOIN 查詢的。

主要有以下幾個方面:

  1. 減少參與 JOIN 的表以及數(shù)據(jù)量。
  2. 優(yōu)先使用本地 JOIN,避免分布式 JOIN 帶來的性能損耗。
  3. 優(yōu)化本地 JOIN,優(yōu)先使用內(nèi)存進(jìn)行 JOIN。
  4. 優(yōu)化分布式 JOIN 的執(zhí)行邏輯,依托于字節(jié)跳動對 ClickHouse 的深度定制化。
責(zé)任編輯:未麗燕 來源: 字節(jié)跳動技術(shù)團(tuán)隊
相關(guān)推薦

2022-12-07 08:31:45

ClickHouse并行計算數(shù)據(jù)

2021-04-14 14:31:37

私有云veStack系統(tǒng)

2021-09-16 10:47:09

數(shù)字化

2022-05-13 07:26:28

策略模式設(shè)計模式

2022-06-30 09:50:15

火山引擎ByteHouse

2021-08-31 16:17:50

數(shù)字化

2023-10-19 14:55:22

火山引擎擁塞控制算法

2023-09-15 14:24:54

ByteHouseClickHouse開源

2021-09-16 14:30:02

數(shù)字化

2021-11-11 21:35:48

數(shù)字化

2022-08-04 18:23:28

屏幕共享卡頓流暢度

2023-08-30 15:33:02

火山引擎
點贊
收藏

51CTO技術(shù)棧公眾號