Flink SQL 知其所以然:基礎(chǔ) DML SQL 執(zhí)行語義!
1.DML:With 子句?
- 應(yīng)用場景(支持 Batch\Streaming):With 語句和離線 Hive SQL With 語句一樣的,xdm,語法糖 +1,使用它可以讓你的代碼邏輯更加清晰。
- 直接上案例:
-- 語法糖+1
WITH orders_with_total AS (
SELECT
order_id
, price + tax AS total
FROM Orders
)
SELECT
order_id
, SUM(total)
FROM orders_with_total
GROUP BY
order_id;
2.DML:SELECT & WHERE 子句?
INSERT INTO target_table
SELECT * FROM Orders
INSERT INTO target_table
SELECT order_id, price + tax FROM Orders
INSERT INTO target_table
-- 自定義 Source 的數(shù)據(jù)
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10
-- 使用 UDF 做字段標準化處理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 過濾條件
Where id > 3
- SQL 語義:
其實理解一個 SQL 最后生成的任務(wù)是怎樣執(zhí)行的,最好的方式就是理解其語義。
以下面的 SQL 為例,我們來介紹下其在離線中和在實時中執(zhí)行的區(qū)別,對比學(xué)習(xí)一下,大家就比較清楚了。
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3
這個 SQL 對應(yīng)的實時任務(wù),假設(shè) Orders 為 kafka,target_table 也為 Kafka,在執(zhí)行時,會生成三個算子:
- 數(shù)據(jù)源算子(From Order):連接到 Kafka topic,數(shù)據(jù)源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的 過濾和字段標準化算子。
- 過濾和字段標準化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子發(fā)的一條一條的數(shù)據(jù),然后判斷 id > 3?將判斷結(jié)果為 true 的數(shù)據(jù)執(zhí)行 PRETTY_PRINT UDF 后,一條一條將計算結(jié)果數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子。
- 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫入到 target_table Kafka 中。
可以看到這個實時任務(wù)的所有算子是以一種 pipeline 模式運行的,所有的算子在同一時刻都是處于 running 狀態(tài)的,24 小時一直在運行,實時任務(wù)中也沒有離線中常見的分區(qū)概念。
select & where
關(guān)于看如何看一段 Flink SQL 最終的執(zhí)行計劃:
最好的方法就如上圖,看 Flink web ui 的算子圖,算子圖上詳細的標記清楚了每一個算子做的事情。以上圖來說,我們可以看到主要有三個算子:
- Source 算子:Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) ,其中 Source 表名稱為 table=[[default_catalog, default_database, Orders],字段為 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time],Watermark 策略為 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]。
- 過濾算子:Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id]),其中過濾條件為 where=[(order_id > 3)],結(jié)果字段為 select=[order_id, name, row_time]
- Sink 算子:Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time]),其中最終產(chǎn)出的表名稱為 table=[default_catalog.default_database.target_table],表字段為 fields=[order_id, name, row_time]。
可以看到 Flink SQL 具體執(zhí)行了哪些操作是非常詳細的標記在算子圖上。所以小伙伴萌一定要學(xué)會看算子圖,這是掌握 debug、調(diào)優(yōu)前最基礎(chǔ)的一個技巧。
那么如果這個 SQL 放在 Hive 中執(zhí)行時,假設(shè)其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會生成三個類似的算子(雖然實際可能會被優(yōu)化為一個算子,這里為了方便對比,劃分為三個進行介紹),離線和實時任務(wù)的執(zhí)行方式完全不同:
- 數(shù)據(jù)源算子(From Order):數(shù)據(jù)源從 Order Hive 表(通常都是讀一天、一小時的分區(qū)數(shù)據(jù))中一次性讀取所有的數(shù)據(jù),然后將讀到的數(shù)據(jù)全部發(fā)給下游 過濾字段標準化算子,然后 數(shù)據(jù)源算子就運行結(jié)束了,釋放資源了。
- 過濾和字段標準化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子的所有數(shù)據(jù),然后遍歷所有數(shù)據(jù)判斷 id > 3?將判斷結(jié)果為 true 的數(shù)據(jù)執(zhí)行 PRETTY_PRINT UDF 后,將所有數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子,然后 過濾和字段標準化算子 就運行結(jié)束了,釋放資源了。
- 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游的所有數(shù)據(jù),將所有數(shù)據(jù)都寫到 target_table Hive 表中,然后整個任務(wù)就運行結(jié)束了,整個任務(wù)的資源也就都釋放了。
可以看到離線任務(wù)的算子是分階段(stage)進行運行的,每一個 stage 運行結(jié)束之后,然后下一個 stage 開始運行,全部的 stage 運行完成之后,這個離線任務(wù)就跑結(jié)束了。
注意:
很多小伙伴都是之前做過離線數(shù)倉的,熟悉了離線的分區(qū)、計算任務(wù)定時調(diào)度運行這兩個概念,所以在最初接觸 Flink SQL 時,會以為 Flink SQL 實時任務(wù)也會存在這兩個概念,這里博主做一下解釋。
- 分區(qū)概念:離線由于能力限制問題,通常都是進行一批一批的數(shù)據(jù)計算,每一批數(shù)據(jù)的數(shù)據(jù)量都是有限的集合,這一批一批的數(shù)據(jù)自然的劃分方式就是時間,比如按小時、天進行劃分分區(qū)。但是 在實時任務(wù)中,是沒有分區(qū)的概念的,實時任務(wù)的上游、下游都是無限的數(shù)據(jù)流。
- 計算任務(wù)定時調(diào)度概念:同上,離線就是由于計算能力限制,數(shù)據(jù)要一批一批算,一批一批輸入、產(chǎn)出,所以要按照小時、天定時的調(diào)度和計算。但是在實時任務(wù)中,是沒有定時調(diào)度的概念的,實時任務(wù)一旦運行起來就是 24 小時不間斷,不間斷的處理上游無限的數(shù)據(jù),不簡單的產(chǎn)出數(shù)據(jù)給到下游。
3.DML:SELECT DISTINCT 子句
- 應(yīng)用場景(支持 Batch\Streaming):語句和離線 Hive SQL SELECT DISTINCT 語句一樣的,xdm,用作根據(jù) key 進行數(shù)據(jù)去重。
- 直接上案例:
INSERT into target_table
SELECT
DISTINCT id
FROM Orders
- SQL 語義:
也是拿離線和實時做對比。
這個 SQL 對應(yīng)的實時任務(wù),假設(shè) Orders 為 kafka,target_table 也為 Kafka,在執(zhí)行時,會生成三個算子:
- 數(shù)據(jù)源算子(From Order):連接到 Kafka topic,數(shù)據(jù)源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的 去重算子。
- 去重算子(DISTINCT id):接收到上游算子發(fā)的一條一條的數(shù)據(jù),然后判斷這個 id 之前是否已經(jīng)來過了,判斷方式就是使用 Flink 中的 state 狀態(tài),如果狀態(tài)中已經(jīng)有這個 id 了,則說明已經(jīng)來過了,不往下游算子發(fā),如果狀態(tài)中沒有這個 id,則說明沒來過,則往下游算子發(fā),也是一條一條發(fā)給下游 。 數(shù)據(jù)匯算子數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫入到target_table Kafka 中。
select distinct
注意:
對于實時任務(wù),計算時的狀態(tài)可能會無限增長。
狀態(tài)大小取決于不同 key(上述案例為 id 字段)的數(shù)量。為了防止狀態(tài)無限變大,我們可以設(shè)置狀態(tài)的 TTL。但是這可能會影響查詢結(jié)果的正確性,比如某個 key 的數(shù)據(jù)過期從狀態(tài)中刪除了,那么下次再來這么一個 key,由于在狀態(tài)中找不到,就又會輸出一遍。
那么如果這個 SQL 放在 Hive 中執(zhí)行時,假設(shè)其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會生成三個相同的算子(雖然可能會被優(yōu)化為一個算子,這里為了方便對比,劃分為三個進行介紹),但是其和實時任務(wù)的執(zhí)行方式完全不同:
- 數(shù)據(jù)源算子(From Order):數(shù)據(jù)源從 Order Hive 表(通常都有天、小時分區(qū)限制)中一次性讀取所有的數(shù)據(jù),然后將讀到的數(shù)據(jù)全部發(fā)給下游去重算子,然后 數(shù)據(jù)源算子 就運行結(jié)束了,釋放資源了。
- 去重算子(DISTINCT id):接收到上游算子的所有數(shù)據(jù),然后遍歷所有數(shù)據(jù)進行去重,將去重完的所有結(jié)果數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子,然后 去重算子就運行結(jié)束了,釋放資源了。
- 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游的所有數(shù)據(jù),將所有數(shù)據(jù)都寫到 target_table Hive 中,然后整個任務(wù)就運行結(jié)束了,整個任務(wù)的資源也就都釋放了。