Citus 分布式 PostgreSQL 集群 - SQL Reference(攝取、修改數(shù)據(jù) DML)
插入數(shù)據(jù)
要將數(shù)據(jù)插入分布式表,您可以使用標(biāo)準(zhǔn) PostgreSQL INSERT 命令。例如,我們從 Github 存檔數(shù)據(jù)集中隨機選擇兩行。
- INSERT
http://www.postgresql.org/docs/current/static/sql-insert.html
/*
CREATE TABLE github_events
(
event_id bigint,
event_type text,
event_public boolean,
repo_id bigint,
payload jsonb,
repo jsonb,
actor jsonb,
org jsonb,
created_at timestamp
);
*/
INSERT INTO github_events VALUES (2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13');
INSERT INTO github_events VALUES (2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24');
向分布式表中插入行時,必須指定插入行的分布列。根據(jù)分布列,Citus 確定插入應(yīng)該路由到的正確分片。然后,查詢被轉(zhuǎn)發(fā)到正確的分片,并在該分片的所有副本上執(zhí)行遠程插入命令。
有時將多個 insert 語句放在一個包含多行的單個 insert 中會很方便。它也比重復(fù)數(shù)據(jù)庫查詢更有效。例如,上一節(jié)中的示例可以像這樣一次性加載:
INSERT INTO github_events VALUES
(
2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13'
), (
2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24'
);
“From Select”子句(分布式匯總)
Citus 還支持 INSERT ... SELECT 語句 —— 根據(jù)選擇查詢的結(jié)果插入行。這是一種方便的填充表的方法,并且還允許使用 ON CONFLICT 子句進行“更新插入(upserts)”,這是進行分布式匯總的最簡單方法。
- 分布式匯總
https://docs.citusdata.com/en/v11.0-beta/develop/reference_dml.html#rollups
在 Citus 中,可以通過三種方式從 select 語句中插入。第一個是如果源表和目標(biāo)表位于同一位置,并且 select/insert 語句都包含分布列。在這種情況下,Citus 可以將 INSERT ... SELECT 語句下推以在所有節(jié)點上并行執(zhí)行。
當(dāng) SELECT 查詢不需要協(xié)調(diào)器上的合并步驟時,可能會發(fā)生重新分區(qū)優(yōu)化。它不適用于以下需要合并步驟的 SQL 功能:
- ORDER BY
- LIMIT
- OFFSET
- GROUP BY 當(dāng)分布列不是 group 鍵的一部分時
- 按源表中的非分布列分區(qū)時的 Window(窗口)函數(shù)
- 非同位表之間的Join(連接)(即重新分區(qū)連接)
當(dāng)源表和目標(biāo)表沒有在同一位置,并且無法應(yīng)用重新分區(qū)優(yōu)化時,Citus 使用第三種方式執(zhí)行 INSERT ... SELECT。它從工作節(jié)點中選擇結(jié)果,并將數(shù)據(jù)拉到協(xié)調(diào)節(jié)點。協(xié)調(diào)器將行重定向回適當(dāng)?shù)姆制?。因為所有?shù)據(jù)都必須通過單個節(jié)點,所以這種方法效率不高。
如果對 Citus 使用哪種方法有疑問,請使用 EXPLAIN 命令,如 PostgreSQL 調(diào)優(yōu)中所述。當(dāng)目標(biāo)表的分片數(shù)量非常大時,禁用重新分區(qū)可能是明智之舉, 請參閱 citus.enable_repartitioned_insert_select (boolean)。
- PostgreSQL 調(diào)優(yōu)
https://docs.citusdata.com/en/v11.0-beta/performance/performance_tuning.html#postgresql-tuning
- citus.enable_repartitioned_insert_select (boolean)
https://docs.citusdata.com/en/v11.0-beta/develop/api_guc.html#enable-repartitioned-insert-select
COPY 命令(批量加載)
要從文件中批量加載數(shù)據(jù),您可以直接使用 PostgreSQL 的 \COPY 命令。
首先通過運行下載我們的示例 github_events 數(shù)據(jù)集:
wget http://examples.citusdata.com/github_archive/github_events-2015-01-01-{0..5}.csv.gz
gzip -d github_events-2015-01-01-*.gz
然后,您可以使用 psql 復(fù)制數(shù)據(jù)(注意,此數(shù)據(jù)需要數(shù)據(jù)庫具有 UTF8 編碼):
\COPY github_events FROM 'github_events-2015-01-01-0.csv' WITH (format
CSV)
注意:
沒有跨分片的快照隔離的概念,這意味著與 COPY 并發(fā)運行的多分片 SELECT 可能會看到它在某些分片上提交,但在其他分片上沒有。如果用戶正在存儲事件數(shù)據(jù),他可能偶爾會觀察到最近數(shù)據(jù)中的小間隙。如果這是一個問題,則由應(yīng)用程序來處理(例如,從查詢中排除最新數(shù)據(jù),或使用一些鎖)。
如果 COPY 未能打開分片放置的連接,那么它的行為方式與 INSERT 相同,即將放置標(biāo)記為非活動,除非沒有更多活動的放置。如果連接后發(fā)生任何其他故障,事務(wù)將回滾,因此不會更改元數(shù)據(jù)。
使用匯總緩存聚合
事件數(shù)據(jù)管道和實時儀表板等應(yīng)用程序需要對大量數(shù)據(jù)進行亞秒級查詢。使這些查詢快速的一種方法是提前計算和保存聚合。這稱為“匯總”數(shù)據(jù),它避免了在運行時處理原始數(shù)據(jù)的成本。作為一個額外的好處,將時間序列數(shù)據(jù)匯總到每小時或每天的統(tǒng)計數(shù)據(jù)中也可以節(jié)省空間。當(dāng)不再需要其全部詳細信息并且聚合足夠時,可能會刪除舊數(shù)據(jù)。
例如,這是一個通過 url 跟蹤頁面瀏覽量的分布式表:
CREATE TABLE page_views (
site_id int,
url text,
host_ip inet,
view_time timestamp default now(),
PRIMARY KEY (site_id, url)
);
SELECT create_distributed_table('page_views', 'site_id');
一旦表中填充了數(shù)據(jù),我們就可以運行聚合查詢來計算每個 URL 每天的頁面瀏覽量,限制到給定的站點和年份。
-- how many views per url per day on site 5?
SELECT view_time::date AS day, site_id, url, count(*) AS view_count
FROM page_views
WHERE site_id = 5 AND
view_time >= date '2016-01-01' AND view_time < date '2017-01-01'
GROUP BY view_time::date, site_id, url;
上述設(shè)置有效,但有兩個缺點。首先,當(dāng)您重復(fù)執(zhí)行聚合查詢時,它必須遍歷每個相關(guān)行并重新計算整個數(shù)據(jù)集的結(jié)果。如果您使用此查詢來呈現(xiàn)儀表板,則將聚合結(jié)果保存在每日頁面瀏覽量表中并查詢該表會更快。其次,存儲成本將隨著數(shù)據(jù)量和可查詢歷史的長度成比例增長。在實踐中,您可能希望在短時間內(nèi)保留原始事件并查看較長時間窗口內(nèi)的歷史圖表。
為了獲得這些好處,我們可以創(chuàng)建一個 daily_page_views 表來存儲每日統(tǒng)計信息。
CREATE TABLE daily_page_views (
site_id int,
day date,
url text,
view_count bigint,
PRIMARY KEY (site_id, day, url)
);
SELECT create_distributed_table('daily_page_views', 'site_id');
在此示例中,我們在 site_id 列上同時分配了 page_views 和 daily_page_views。這確保了與特定站點相對應(yīng)的數(shù)據(jù)將位于同一節(jié)點上。在每個節(jié)點上將兩個表的行保持在一起可以最大限度地減少節(jié)點之間的網(wǎng)絡(luò)流量并實現(xiàn)高度并行執(zhí)行。
一旦我們創(chuàng)建了這個新的分布式表,我們就可以運行 INSERT INTO ... SELECT 將原始頁面視圖匯總到聚合表中。在下文中,我們每天匯總頁面瀏覽量。Citus 用戶通常在一天結(jié)束后等待一段時間來運行這樣的查詢,以容納遲到的數(shù)據(jù)。
-- roll up yesterday's data
INSERT INTO daily_page_views (day, site_id, url, view_count)
SELECT view_time::date AS day, site_id, url, count(*) AS view_count
FROM page_views
WHERE view_time >= date '2017-01-01' AND view_time < date '2017-01-02'
GROUP BY view_time::date, site_id, url;
-- now the results are available right out of the table
SELECT day, site_id, url, view_count
FROM daily_page_views
WHERE site_id = 5 AND
day >= date '2016-01-01' AND day < date '2017-01-01';
上面的匯總查詢匯總了前一天的數(shù)據(jù)并將其插入 daily_page_views。每天運行一次查詢意味著不需要更新匯總表行,因為新一天的數(shù)據(jù)不會影響之前的行。
當(dāng)處理遲到的數(shù)據(jù)或每天多次運行匯總查詢時,情況會發(fā)生變化。如果任何新行與匯總表中已有的天數(shù)匹配,則匹配計數(shù)應(yīng)增加。 PostgreSQL 可以使用 “ON CONFLICT” 來處理這種情況, 這是它進行 upserts 的技術(shù)。這是一個例子。
- upserts
https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT
-- roll up from a given date onward,
-- updating daily page views when necessary
INSERT INTO daily_page_views (day, site_id, url, view_count)
SELECT view_time::date AS day, site_id, url, count(*) AS view_count
FROM page_views
WHERE view_time >= date '2017-01-01'
GROUP BY view_time::date, site_id, url
ON CONFLICT (day, url, site_id) DO UPDATE SET
view_count = daily_page_views.view_count + EXCLUDED.view_count;
更新和刪除
您可以使用標(biāo)準(zhǔn) PostgreSQL UPDATE 和 DELETE 命令更新或刪除分布式表中的行。
DELETE FROM github_events
WHERE repo_id IN (24509048, 24509049);
UPDATE github_events
SET event_public = TRUE
WHERE (org->>'id')::int = 5430905;
- UPDATE
http://www.postgresql.org/docs/current/static/sql-update.html
- DELETE
http://www.postgresql.org/docs/current/static/sql-delete.html
當(dāng)更新/刪除影響如上例中的多個分片時,Citus 默認使用單階段提交協(xié)議。為了提高安全性,您可以通過設(shè)置啟用兩階段提交。
SET citus.multi_shard_commit_protocol = '2pc';
如果更新或刪除僅影響單個分片,則它在單個工作節(jié)點內(nèi)運行。在這種情況下,不需要啟用 2PC。當(dāng)按表的分布列更新或刪除過濾器時,通常會發(fā)生這種情況:
-- since github_events is distributed by repo_id,
-- this will execute in a single worker node
DELETE FROM github_events
WHERE repo_id = 206084;
此外,在處理單個分片時,Citus 支持 SELECT ... FOR UPDATE。這是對象關(guān)系映射器 (ORM) 有時使用的一種技術(shù),用于安全地:
- 加載行
- 在應(yīng)用程序代碼中進行計算
- 根據(jù)計算更新行
選擇要更新的行會對它們設(shè)置寫鎖定,以防止其他進程導(dǎo)致“丟失更新(lost update)”異常。
BEGIN;
-- select events for a repo, but
-- lock them for writing
SELECT *
FROM github_events
WHERE repo_id = 206084
FOR UPDATE;
-- calculate a desired value event_public using
-- application logic that uses those rows...
-- now make the update
UPDATE github_events
SET event_public = :our_new_value
WHERE repo_id = 206084;
COMMIT;
僅哈希分布表和引用表支持此功能,并且僅那些具有 replication_factor 為 1 的表支持。
- replication_factor
https://docs.citusdata.com/en/v11.0-beta/develop/api_guc.html#replication-factor
最大化寫入性能
在大型機器上,INSERT 和 UPDATE/DELETE 語句都可以擴展到每秒約 50,000 個查詢。但是,要達到這個速度,您將需要使用許多并行的、長期存在的連接并考慮如何處理鎖定。有關(guān)更多信息,您可以查閱我們文檔的橫向擴展數(shù)據(jù)攝取部分。
- 橫向擴展數(shù)據(jù)攝取
https://docs.citusdata.com/en/v11.0-beta/performance/performance_tuning.html#scaling-data-ingestion