Citus 分布式 PostgreSQL 集群-SQL Reference(查詢處理)
一個 Citus 集群由一個 coordinator 實例和多個 worker 實例組成。數(shù)據(jù)在 worker 上進(jìn)行分片和復(fù)制,而 coordinator 存儲有關(guān)這些分片的元數(shù)據(jù)。向集群發(fā)出的所有查詢都通過 coordinator 執(zhí)行。 coordinator 將查詢劃分為更小的查詢片段,其中每個查詢片段可以在分片上獨立運行。然后協(xié)調(diào)器將查詢片段分配給 worker,監(jiān)督他們的執(zhí)行,合并他們的結(jié)果,并將最終結(jié)果返回給用戶。查詢處理架構(gòu)可以通過下圖進(jìn)行簡要描述。
Citus 的查詢處理管道涉及兩個組件:
- 分布式查詢計劃器和執(zhí)行器
- PostgreSQL 計劃器和執(zhí)行器
我們將在后續(xù)部分中更詳細(xì)地討論它們。
分布式查詢計劃器
Citus 的分布式查詢計劃器接收 SQL 查詢并規(guī)劃它以進(jìn)行分布式執(zhí)行。
對于 SELECT 查詢,計劃器首先創(chuàng)建輸入查詢的計劃樹,并將其轉(zhuǎn)換為可交換和關(guān)聯(lián)形式,以便可以并行化。它還應(yīng)用了一些優(yōu)化以確保以可擴(kuò)展的方式執(zhí)行查詢,并最大限度地減少網(wǎng)絡(luò) I/O。
接下來,計劃器將查詢分為兩部分 - 在 coordinator 上運行的 coordinator 查詢和在 worker 上的各個分片上運行的 worker 查詢片段。然后,計劃器將這些查詢片段分配給 worker,以便有效地使用他們的所有資源。在這一步之后,分布式查詢計劃被傳遞給分布式執(zhí)行器執(zhí)行。
分布列上的鍵值查找或修改查詢的規(guī)劃過程略有不同,因為它們恰好命中一個分片。一旦計劃器收到傳入的查詢,它需要決定查詢應(yīng)該路由到的正確分片。為此,它提取傳入行中的分布列并查找元數(shù)據(jù)以確定查詢的正確分片。然后,計劃器重寫該命令的 SQL 以引用分片表而不是原始表。然后將該重寫的計劃傳遞給分布式執(zhí)行器。
分布式查詢執(zhí)行器
Citus 的分布式執(zhí)行器運行分布式查詢計劃并處理故障。執(zhí)行器非常適合快速響應(yīng)涉及過濾器、聚合和共置連接的查詢,以及運行具有完整 SQL 覆蓋的單租戶查詢。它根據(jù)需要為每個分片打開一個與 woker 的連接,并將所有片段查詢發(fā)送給他們。然后它從每個片段查詢中獲取結(jié)果,合并它們,并將最終結(jié)果返回給用戶。
子查詢/CTE Push-Pull 執(zhí)行
如有必要,Citus 可以將來自子查詢和 CTE 的結(jié)果收集到 coordinator 節(jié)點中,然后將它們推送回 worker 以供外部查詢使用。這允許 Citus 支持更多種類的 SQL 構(gòu)造。
例如,在 WHERE 子句中包含子查詢有時不能與主查詢同時執(zhí)行內(nèi)聯(lián),而必須單獨執(zhí)行。假設(shè) Web 分析應(yīng)用程序維護(hù)一個按 page_id 分區(qū)的 page_views 表。要查詢前 20 個訪問量最大的頁面上的訪問者主機(jī)數(shù),我們可以使用子查詢來查找頁面列表,然后使用外部查詢來計算主機(jī)數(shù)。
SELECT page_id, count(distinct host_ip)
FROM page_views
WHERE page_id IN (
SELECT page_id
FROM page_views
GROUP BY page_id
ORDER BY count(*) DESC
LIMIT 20
)
GROUP BY page_id;
執(zhí)行器希望通過 page_id 對每個分片運行此查詢的片段,計算不同的 host_ips,并在 coordinator 上組合結(jié)果。但是,子查詢中的 LIMIT 意味著子查詢不能作為片段的一部分執(zhí)行。通過遞歸規(guī)劃查詢,Citus 可以單獨運行子查詢,將結(jié)果推送給所有 worker,運行主片段查詢,并將結(jié)果拉回 coordinator。 push-pull(推拉) 設(shè)計支持上述子查詢。
讓我們通過查看此查詢的 EXPLAIN 輸出來了解這一點。它相當(dāng)參與:
GroupAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan 6_1
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=54.70..56.70 rows=200 width=12)
Group Key: page_id
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=4)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=84.50..86.75 rows=225 width=36)
Group Key: page_views.page_id, page_views.host_ip
-> Hash Join (cost=17.00..78.88 rows=1124 width=36)
Hash Cond: (page_views.page_id = intermediate_result.page_id)
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=36)
-> Hash (cost=14.50..14.50 rows=200 width=4)
-> HashAggregate (cost=12.50..14.50 rows=200 width=4)
Group Key: intermediate_result.page_id
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=4)
讓我們把它拆開并檢查每一塊。
GroupAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: remote_scan.page_id
樹的 root 是 coordinator 節(jié)點對 worker 的結(jié)果所做的事情。在這種情況下,它正在對它們進(jìn)行分組,并且 GroupAggregate 要求首先對它們進(jìn)行排序。
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan 6_1
.
自定義掃描有兩個大子樹,從“分布式子計劃”開始。
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=54.70..56.70 rows=200 width=12)
Group Key: page_id
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=4)
.
工作節(jié)點為 32 個分片中的每一個運行上述內(nèi)容(Citus 正在選擇一個代表進(jìn)行顯示)。我們可以識別 IN (...) 子查詢的所有部分:排序、分組和限制。當(dāng)所有 worker 完成此查詢后,他們會將其輸出發(fā)送回 coordinator,coordinator 將其組合為“中間結(jié)果”。
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=84.50..86.75 rows=225 width=36)
Group Key: page_views.page_id, page_views.host_ip
-> Hash Join (cost=17.00..78.88 rows=1124 width=36)
Hash Cond: (page_views.page_id = intermediate_result.page_id)
.
Citus 在第二個子樹中啟動另一個執(zhí)行器作業(yè)。它將在 page_views 中計算不同的主機(jī)。它使用 JOIN 連接中間結(jié)果。中間結(jié)果將幫助它限制在前二十頁。
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=36)
-> Hash (cost=14.50..14.50 rows=200 width=4)
-> HashAggregate (cost=12.50..14.50 rows=200 width=4)
Group Key: intermediate_result.page_id
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=4)
.
工作人員使用 read_intermediate_result 函數(shù)在內(nèi)部檢索中間結(jié)果,該函數(shù)從 coordinator 節(jié)點復(fù)制的文件中加載數(shù)據(jù)。
這個例子展示了 Citus 如何使用分布式子計劃在多個步驟中執(zhí)行查詢,以及如何使用 EXPLAIN 來了解分布式查詢執(zhí)行。
PostgreSQL 計劃器和執(zhí)行器
一旦分布式執(zhí)行器將查詢片段發(fā)送給 worker,它們就會像常規(guī) PostgreSQL 查詢一樣被處理。該 worker 上的 PostgreSQL 計劃程序選擇在相應(yīng)分片表上本地執(zhí)行該查詢的最佳計劃。 PostgreSQL 執(zhí)行器然后運行該查詢并將查詢結(jié)果返回給分布式執(zhí)行器。您可以從 PostgreSQL 手冊中了解有關(guān) PostgreSQL 計劃器和執(zhí)行器的更多信息。最后,分布式執(zhí)行器將結(jié)果傳遞給 coordinator 進(jìn)行最終聚合。
- 計劃器
http://www.postgresql.org/docs/current/static/planner-optimizer.html
- 執(zhí)行器
http://www.postgresql.org/docs/current/static/executor.html