Flink SQL知其所以然:Flink SQLTumble Window 的奇妙解析之路
1.序篇-本文結(jié)構(gòu)
針對 datastream api 大家都比較熟悉了,還是那句話,在 datastream 中,你寫的代碼邏輯是什么樣的,它最終的執(zhí)行方式就是什么樣的。
但是對于 flink sql 的執(zhí)行過程,大家還是不熟悉的。上節(jié)使用 ETL,group agg(sum,count等)簡單聚合類 query 帶大家走進(jìn)一條 flink sql query 邏輯的世界。幫大家至少能夠熟悉在 flink sql 程序運行時知道 flink 程序在干什么。
此節(jié)就是窗口聚合章節(jié)的第一篇,以一個最簡單、最常用的分鐘 tumble window 聚合案例給大家介紹其使用方式和原理。
由于 flink 1.13 引入了 window tvf,所以 1.13 和 1.12 及之前版本的實現(xiàn)不同。本節(jié)先介紹 flink 1.12 及之前的 tumble window 實現(xiàn)。這也是大家在引入 flink sql 能力時最常使用的。
本節(jié)依然從以下幾個章節(jié)給大家詳細(xì)介紹 flink sql 的能力。
1.目標(biāo)篇-本文能幫助大家了解 flink sql 什么?
- 回顧上節(jié)的 flink sql 適用場景的結(jié)論
2.概念篇-先聊聊常見的窗口聚合
- 窗口竟然拖慢數(shù)據(jù)產(chǎn)出?
- 常用的窗口
3.實戰(zhàn)篇-簡單的 tumble window 案例和運行原理
- 先看一個 datastream 窗口案例
- flink sql tumble window 的語義
- tumble window 實際案例
- GeneratedWatermarkGenerator - flink 1.12.1
- BinaryRowDataKeySelector - flink 1.12.1
- AggregateWindowOperator - flink 1.12.1
4.總結(jié)與展望篇
先說說結(jié)論,以下這些結(jié)論已經(jīng)在上節(jié)說過了,此處附上上節(jié)文章:
場景問題:flink sql 很適合簡單 ETL,以及基本全部場景下的聚合類指標(biāo)(本節(jié)要介紹的 tumble window 就在聚合類指標(biāo)的范疇之內(nèi))。
語法問題:flink sql 語法其實是和其他 sql 語法基本一致的?;静粫a(chǎn)生語法問題阻礙使用 flink sql。但是本節(jié)要介紹的 tumble window 的語法就是略有不同的那部分。下面詳細(xì)介紹。
運行問題:查看 flink sql 任務(wù)時的一些技巧,以及其中一些可能會碰到的坑:
- 去 flink webui 就能看到這個任務(wù)目前在做什么。包括算子名稱都會給直接展示給我們目前哪個算子在干啥事情,在處理啥邏輯。
- sql 的 watermark 類型要設(shè)置為 TIMESTAMP(3)。
- 事件時間邏輯中,sql api 和 datastream api 對于數(shù)據(jù)記錄時間戳存儲邏輯是不一樣的。datastream api:每條記錄的 rowtime 是放在 StreamRecord 中的時間戳字段中的。sql api:時間戳是每次都從數(shù)據(jù)中進(jìn)行獲取的。算子中會維護(hù)一個下標(biāo)??梢园凑障聵?biāo)從數(shù)據(jù)中獲取時間戳。
2.目標(biāo)篇-本文能幫助大家了解 flink sql tumble window 什么?
關(guān)于 flink sql tumble window 一般都會有以下問題。本文的目標(biāo)也是為大家解答這些問題:
場景問題:場景問題就不必多說,datastream 在 tumble window 場景下的應(yīng)用很多了,分鐘級別聚合等常用場景
語法問題:flink sql 寫 tumble window 任務(wù)時是一種與 hive sql 中沒有的語法。下文詳細(xì)介紹。
運行問題:使用一條簡單的 tumble window sql 幫大家從 transformation、runtime 幫大家理解 tumble window 的整體運行機(jī)制。
理解誤區(qū):既然是 sql 必然要遵循 sql 語義,sql tumble window 聚合是輸入多條,產(chǎn)出一條數(shù)據(jù)。并不像 datastream 那樣可以在窗口 udf 中做到多對多。
在正式開始聊 tumble window 之前,先看看上節(jié) flink sql 適用場景的結(jié)論。讓大家先有 flink sql 的一個整體印象以及結(jié)論。
2.1.回顧上節(jié)的 flink sql 適用場景的結(jié)論
不裝了,我坦白了,flink sql 其實很適合干的活就是 dwd 清洗,dws 聚合。
此處主要針對實時數(shù)倉的場景來說。flink sql 能干 dwd 清洗,dws 聚合,基本上實時數(shù)倉的大多數(shù)場景都能給覆蓋了。
flink sql 牛逼!!!
但是!!!
經(jīng)過博主使用 flink sql 經(jīng)驗來看,并不是所有的 dwd,dws 聚合場景都適合 flink sql(截止發(fā)文階段來說)!!!
其實這些目前不適合 flink sql 的場景總結(jié)下來就是在處理上比 datastream 還是會有一定的損失。
先總結(jié)下使用場景:
1. dwd:簡單的清洗、復(fù)雜的清洗、維度的擴(kuò)充、各種 udf 的使用
2. dws:各類聚合
然后分適合的場景和不適合的場景來說,因為只這一篇不能覆蓋所有的內(nèi)容,所以本文此處先大致給個結(jié)論,之后會結(jié)合具體的場景詳細(xì)描述。
適合的場景:
簡單的 dwd 清洗場景
全場景的 dws 聚合場景
目前不太適合的場景:
復(fù)雜的 dwd 清洗場景:舉例比如使用了很多 udf 清洗,尤其是使用很多的 json 類解析清洗
關(guān)聯(lián)維度場景:舉例比如 datastream 中經(jīng)常會有攢一批數(shù)據(jù)批量訪問外部接口的場景,flink sql 目前對于這種場景雖然有 localcache、異步訪問能力,但是依然還是一條一條訪問外部緩存,這樣相比批量訪問還是會有性能差距。
3.概念篇-先聊聊常見的窗口聚合
窗口聚合大家都在 datastream api 中很熟悉了,目前在實時數(shù)據(jù)處理的過程中,窗口計算可以說是最重要、最常用的一種計算方式了。
但是在拋出窗口概念之前,博主有幾個關(guān)于窗口的小想法說一下。
3.1.窗口竟然拖慢數(shù)據(jù)產(chǎn)出?
一個小想法。
先拋結(jié)論:窗口會拖慢實時數(shù)據(jù)的產(chǎn)出,是在目前下游分析引擎能力有限的情況下的一種妥協(xié)方案。
站在數(shù)據(jù)開發(fā)以及需求方的世界中,當(dāng)然希望所有的數(shù)據(jù)都是實時來的,實時處理的,實時產(chǎn)出的,實時展現(xiàn)的。
舉個例子:如果你要滿足一個一分鐘窗口聚合的 pv,uv,或者其他聚合需求。
olap 數(shù)據(jù)服務(wù)引擎 就可以滿足上述的實時來的,實時處理的,實時產(chǎn)出的,實時展現(xiàn)的的場景。flink 消費處理明細(xì)數(shù)據(jù),產(chǎn)出到 kafka,然后直接導(dǎo)入到 olap 引擎中。查詢時直接用 olap 做聚合。這其中是沒有任何窗口的概念的。但是整個鏈路中,要保障端對端精確一次,要保障大數(shù)據(jù)量情況下 olap 引擎能夠秒級查詢返回,更何況有一些去重類指標(biāo)的計算,等等場景。把這些壓力都放在 olap 引擎的壓力是很大的。
因此在 flink 數(shù)據(jù)計算引擎中就誕生了窗口的概念。我們可以直接在計算引擎中進(jìn)行窗口聚合計算,然后等到窗口結(jié)束之后直接把結(jié)果數(shù)據(jù)產(chǎn)出。這就出現(xiàn)了博主所說的窗口拖慢了實時數(shù)據(jù)產(chǎn)出的情況。而且窗口在處理不好的情況下可能會導(dǎo)致數(shù)據(jù)丟失。
關(guān)于上述兩種情況的具體優(yōu)劣選擇,都由大家自行選擇。上述只是引出博主一些想法。
3.2.常用的窗口
目前已知的窗口分為以下四種。
1. Tumble Windows2. Hop Windows3. Cumulate Windows4. Session Windows
這些窗口的具體描述直接見官網(wǎng),有詳細(xì)的說明。此處不贅述。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/
此處介紹下 flink 中常常會涉及到的兩個容易混淆的概念就是:窗口 + key。這里來形象的說明下。
窗口:時間周期上面的劃分。將無限流進(jìn)行縱向切分,將無限流切分為一個一個的窗口,窗口相當(dāng)于是無限流中的一段時間內(nèi)的數(shù)據(jù)。
key:數(shù)據(jù)類別上面的劃分。將無限流進(jìn)行橫向劃分,相同 key 的數(shù)據(jù)會被劃分到一組中,這個 key 的數(shù)據(jù)也是一條無限流。
如下圖所示。
4.實戰(zhàn)篇-簡單的 tumble window 案例和運行原理
源碼公眾號后臺回復(fù)flink sql tumble window 的奇妙解析之路獲取。
4.1.先看一個 datastream 窗口案例
在介紹 sql tumble window 窗口算子執(zhí)行案例之前,先看一個 datastream 中的窗口算子案例。其邏輯都是相通的。會對我們了解 sql tumble window 算子有幫助。
我們先看看 datastream 處理邏輯。
以下面這個為例。
- public class _04_TumbleWindowTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
- env.setParallelism(1);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.addSource(new UserDefinedSource())
- .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, Integer, Long>>(Time.seconds(0)) {
- @Override
- public long extractTimestamp(Tuple4<String, String, Integer, Long> element) {
- return element.f3;
- }
- })
- .keyBy(new KeySelector<Tuple4<String, String, Integer, Long>, String>() {
- @Override
- public String getKey(Tuple4<String, String, Integer, Long> row) throws Exception {
- return row.f0;
- }
- })
- .window(TumblingEventTimeWindows.of(Time.seconds(10)))
- .sum(2)
- .print();
- env.execute("1.12.1 DataStream TUMBLE WINDOW 案例");
- }
- private static class UserDefinedSource implements SourceFunction<Tuple4<String, String, Integer, Long>> {
- private volatile boolean isCancel;
- @Override
- public void run(SourceContext<Tuple4<String, String, Integer, Long>> sourceContext) throws Exception {
- while (!this.isCancel) {
- sourceContext.collect(Tuple4.of("a", "b", 1, System.currentTimeMillis()));
- Thread.sleep(10L);
- }
- }
- @Override
- public void cancel() {
- this.isCancel = true;
- }
- }
- }
datastream 生產(chǎn)的具體的 transformation 如下圖:
其中我們只關(guān)注最重要的 WindowOperator 算子。
其中 WindowOperator 算子包含的重要屬性如下圖。
來看看 WindowOperator 的執(zhí)行邏輯。窗口執(zhí)行的整體詳細(xì)流程可以參考:http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/
4.2.flink sql tumble window 的語義
介紹到 tumble window 的語義,總要有對比的去介紹。這里的參照物就是 datastream api。
在 datastream api 中。tumble window 一般用作以下兩種場景。
業(yè)務(wù)場景:使用 tumble window 很輕松的計算出窗口內(nèi)的聚合數(shù)據(jù)。一般是多條輸入數(shù)據(jù),窗口結(jié)束時一條輸出數(shù)據(jù)。
優(yōu)化場景:窗口聚合一批數(shù)據(jù)然后批量訪問外部存儲擴(kuò)充維度、或者有一些自定義的處理邏輯。一般是多條輸入數(shù)據(jù),窗口結(jié)束時多條輸出數(shù)據(jù)。
但是在 sql api 中。tumble window 是聚合(group by)語義,聚合在 sql 標(biāo)準(zhǔn)中的數(shù)據(jù)處理邏輯是多條輸入,在窗口觸發(fā)時就輸出一條數(shù)據(jù)的語義。而上面的常常用在 datastream 中的優(yōu)化場景是多對多的場景。因此和 sql 語義不符合。所以 flink sql tumble window 一般都是用于計算聚合運算值來使用。
4.3.tumble window 實際案例
滾動窗口的特性就是會將無限流進(jìn)行縱向劃分成一個一個的窗口,每個窗口都是相同的大小,并且不重疊。
本文主要介紹 flink 1.12 及之前版本的實現(xiàn)。下一篇文章介紹 flink 1.13 的實現(xiàn)。
來,在介紹原理之前,總要先用起來,我們就以下面這個例子展開。
1.(flink 1.12.1)場景:簡單且常見的分維度分鐘級別同時在線用戶數(shù)、總銷售額
數(shù)據(jù)源表:
- CREATE TABLE source_table (
- -- 維度數(shù)據(jù)
- dim STRING,
- -- 用戶 id
- user_id BIGINT,
- -- 用戶
- price BIGINT,
- -- 事件時間戳
- row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
- -- watermark 設(shè)置
- WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '10',
- 'fields.dim.length' = '1',
- 'fields.user_id.min' = '1',
- 'fields.user_id.max' = '100000',
- 'fields.price.min' = '1',
- 'fields.price.max' = '100000'
- )
- Notes - 關(guān)于 watermark 容易踩得坑:sql 的 watermark 類型必須要設(shè)置為 TIMESTAMP(3)。
數(shù)據(jù)匯表:
- CREATE TABLE sink_table (
- dim STRING,
- pv BIGINT,
- sum_price BIGINT,
- max_price BIGINT,
- min_price BIGINT,
- uv BIGINT,
- window_start bigint
- ) WITH (
- 'connector' = 'print'
- )
數(shù)據(jù)處理邏輯:
可以看下下面語法,窗口聚合的寫法有專門的 tumble(row_time, interval '1' minute) 寫法,這就是與平常我們寫的 hive sql,mysql 等不一樣的地方。
- insert into sink_table
- select dim,
- sum(bucket_pv) as pv,
- sum(bucket_sum_price) as sum_price,
- max(bucket_max_price) as max_price,
- min(bucket_min_price) as min_price,
- sum(bucket_uv) as uv,
- max(window_start) as window_start
- from (
- select dim,
- count(*) as bucket_pv,
- sum(price) as bucket_sum_price,
- max(price) as bucket_max_price,
- min(price) as bucket_min_price,
- -- 計算 uv 數(shù)
- count(distinct user_id) as bucket_uv,
- cast(tumble_start(row_time, interval '1' minute) as bigint) * 1000 as window_start
- from source_table
- group by
- -- 按照用戶 id 進(jìn)行分桶,防止數(shù)據(jù)傾斜
- mod(user_id, 1024),
- dim,
- tumble(row_time, interval '1' minute)
- )
- group by dim,
- window_start
2.運行:可以看到,其實在 flink sql 任務(wù)中,其會把對應(yīng)的處理邏輯給寫到算子名稱上面。
- Notes - 觀察 flink sql 技巧 1:這個其實就是我們觀察 flink sql 任務(wù)的第一個技巧。如果你想知道你的 flink 任務(wù)在干啥,第一反應(yīng)是去 flink webui 看看這個任務(wù)目前在做什么。包括算子名稱都會給直接展示給我們目前哪個算子在干啥事情,在處理啥邏輯
先看一下整個算子圖,如下圖。從左到右總共分為三個算子。
第一個算子就是數(shù)據(jù)源算子
第二個算子就是分了桶的窗口聚合算子,第一個算子和第二個算子之間 hash 傳輸就是按照 group key 進(jìn)行 hash 傳輸
第三個算子就是外層進(jìn)行合桶計算的算子,同樣也是 hash 傳輸,將分桶的數(shù)據(jù)在一個算子中進(jìn)行合并計算
來看看每一個算子具體做了什么事情。
第一個算子:
table scan 讀取數(shù)據(jù)源
從數(shù)據(jù)源中獲取對應(yīng)的字段(包括源表定義的 rowtime)
分配 watermark(按照源表定義的 watermark 分配對應(yīng)的 watermark)
將一些必要的字段抽取。比如 group by 中的字段。在 hash 時需要使用。
第二個算子:
窗口聚合,計算窗口聚合數(shù)據(jù)
將數(shù)據(jù)按照第一層 select 中的數(shù)據(jù)進(jìn)行計算以及格式化
第三個算子:
group 聚合合桶計算
將數(shù)據(jù)按照第二層 select 中的數(shù)據(jù)進(jìn)行計算以及格式化
將數(shù)據(jù) sink 寫出
3.(flink 1.12.1)結(jié)果:
- +I(9,1,32682,32682,32682,1,1631026440000)
- -U(9,1,32682,32682,32682,1,1631026440000)
- +U(9,2,115351,82669,32682,2,1631026440000)
- +I(2,1,76148,76148,76148,1,1631026440000)
- +I(8,1,79321,79321,79321,1,1631026440000)
- +I(a,1,85792,85792,85792,1,1631026440000)
- +I(0,1,12858,12858,12858,1,1631026440000)
- +I(5,1,36753,36753,36753,1,1631026440000)
- +I(3,1,19218,19218,19218,1,1631026440000)
4.(flink 1.12.1)原理:
關(guān)于 sql 開始運行的機(jī)制見上一節(jié)詳述。
此處只介紹相比前一節(jié)新增內(nèi)容??梢钥吹缴鲜龃a的具體 transformation 如下圖。
4.4.GeneratedWatermarkGenerator - flink 1.12.1
按照順序,首先看看 watermark 算子。同 datastream 的自定義 watermark 分配策略。
watermark 生成的具體代碼 WatermarkGenerator$6,主要獲取 watermark 的邏輯在 currentWatermark 方法中。如下圖。
4.5.BinaryRowDataKeySelector - flink 1.12.1
接著就是 group by(同 datastream 中的 keyby)。
group by key 生成的具體代碼 KeyProjection$19,主要邏輯在 apply 方法中。
下一個就是窗口聚合算子。
4.6.AggregateWindowOperator - flink 1.12.1
兄弟們!!!兄弟們!!!兄弟們!!!
本節(jié)的重頭戲來了。sql 窗口聚合算子解析搞起來了。
關(guān)于 WatermarkGenerator 和 KeyProjection 沒有什么可以詳細(xì)介紹的,都是輸入一條數(shù)據(jù),輸出一條數(shù)據(jù),邏輯很簡單。
但是窗口聚合算子的計算邏輯相比上面兩個算子復(fù)雜很多。窗口算子又承載了窗口聚合的主要邏輯,所以本文重點介紹窗口算子計算的邏輯。
先來看看 sql 窗口整體處理流程。其實與 datastream 處理流程基本一致,但只是少了 Evictor。如下圖所示。
接著來看看上述 sql 生成的窗口聚合算子 AggregateWindowOperator,截圖中屬性也很清晰。
具體生成的窗口聚合代碼 GroupingWindowAggsHandler$59。
計算邏輯 GroupingWindowAggsHandler$59#accumulate。
上面那段都是在 flink 客戶端初始化處理的。包括窗口算子的初始化等。
下面這段處理邏輯是在 flink TM 運行時開始執(zhí)行的,包括窗口算子資源的初始化以及運行邏輯。就到了正式的數(shù)據(jù)處理環(huán)節(jié)了。
窗口算子 Task 運行。
窗口算子 Task 初始化。
StreamTask 整體的處理流程。
窗口算子 open 初始化。
窗口算子 open 初始化后的結(jié)果。如下圖,對應(yīng)的具體組件。
初始化完成之后,開始處理具體數(shù)據(jù)。
循環(huán) loop,一直 run 啊 run。
判斷記錄的具體類型,然后執(zhí)行不同的邏輯。
來看看處理一條數(shù)據(jù)的 processElement 方法邏輯,進(jìn)行 acc 處理。代碼中的的 windowAggregator 就是之前代碼生成的 GroupingWindowAggsHandler$59。
- Notes:事件時間邏輯中,sql api 和 datastream api 對于數(shù)據(jù)記錄時間戳存儲邏輯是不一樣的。datastream api:每條記錄的 rowtime 是放在 StreamRecord 中的時間戳字段中的。sql api:時間戳是每次都從數(shù)據(jù)中進(jìn)行獲取的。算子中會維護(hù)一個下標(biāo)。可以按照下標(biāo)從數(shù)據(jù)中獲取時間戳。
來看看 watermark 到達(dá)并且觸發(fā)窗口計算時,執(zhí)行 onEventTime 邏輯。
觸發(fā)窗口計算時,onEventTime -> emitWindowResult,產(chǎn)出具體數(shù)據(jù)。
至此整個 sql tumble window 的處理邏輯也就很清楚了。和 datastream 基本上都是一致的。是不是整個邏輯就理清楚了。
5.總結(jié)與展望篇
本文主要介紹了 tumble window 聚合類指標(biāo)的常見場景案例以及其底層運行原理。
而且也介紹了在查看 flink sql 任務(wù)時的一些技巧:
去 flink webui 就能看到這個任務(wù)目前在做什么。包括算子名稱都會給直接展示給我們目前哪個算子在干啥事情,在處理啥邏輯。
sql 的 watermark 類型要設(shè)置為 TIMESTAMP(3)。
事件時間邏輯中,sql api 和 datastream api 對于數(shù)據(jù)記錄時間戳存儲邏輯是不一樣的。datastream api:每條記錄的 rowtime 是放在 StreamRecord 中的時間戳字段中的。sql api:時間戳是每次都從數(shù)據(jù)中進(jìn)行獲取的。算子中會維護(hù)一個下標(biāo)。可以按照下標(biāo)從數(shù)據(jù)中獲取時間戳。
本文轉(zhuǎn)載自微信公眾號「大數(shù)據(jù)羊說」