Flink SQL 知其所以然:不會(huì)連最適合 Flink SQL的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧?
1.序篇-本文結(jié)構(gòu)
前面的章節(jié)鋪墊了那么多,終于在本節(jié)走入一條 query 了。
針對(duì) datastream api 大家都比較熟悉了,還是那句話,在 datastream 中,你寫(xiě)的代碼邏輯是什么樣的,它最終的執(zhí)行方式就是什么樣的。
但是對(duì)于 flink sql 的執(zhí)行過(guò)程,大家還是不熟悉的。
因此本文通過(guò)以下章節(jié)使用 ETL,group agg(sum,count等)簡(jiǎn)單聚合類(lèi) query 帶大家走進(jìn)一條 flink sql query 邏輯的世界。幫大家至少能夠熟悉在 flink sql 程序運(yùn)行時(shí)知道 flink 程序在干什么。
- 背景篇-大家不了解 flink sql 什么?
- 目標(biāo)篇-本文能幫助大家了解 flink sql 什么?
- 實(shí)戰(zhàn)篇-簡(jiǎn)單的 query 案例和運(yùn)行原理
- 總結(jié)與展望篇
先說(shuō)說(shuō)結(jié)論:
場(chǎng)景問(wèn)題:flink sql 很適合簡(jiǎn)單 ETL,以及基本全部場(chǎng)景下的聚合類(lèi)指標(biāo)。
語(yǔ)法問(wèn)題:flink sql 語(yǔ)法其實(shí)是和其他 sql 語(yǔ)法基本一致的?;静粫?huì)產(chǎn)生語(yǔ)法問(wèn)題阻礙使用 flink sql。
運(yùn)行問(wèn)題:查看 flink sql 任務(wù)時(shí)的一些技巧:
- 去 flink webui 看看這個(gè)任務(wù)目前在做什么。包括算子名稱都會(huì)給直接展示給我們目前哪個(gè)算子在干啥事情,在處理啥邏輯。
- 如果你想知道你的 flink 任務(wù)執(zhí)行了什么代碼,就去看看 sql 最后轉(zhuǎn)換成的 transformation 里面具體要執(zhí)行哪些操作。flink sql 生成的代碼也在里面。
- 如果你不確定線上任務(wù)執(zhí)行原理,可以直接在本地嘗試運(yùn)行。
2.背景篇-大家不了解 flink sql 什么?
首先從大家用 flink sql 的一個(gè)初衷和狀態(tài)出發(fā),想一下大家在開(kāi)始上手 flink sql 時(shí),是什么樣的一個(gè)想法?
博主大概整理了下,在初步上手 flink sql,一般從入手到踩坑整個(gè)過(guò)程中,一般都會(huì)有以下幾種問(wèn)題或者想法:
- 場(chǎng)景問(wèn)題:首先 flink sql 是用來(lái)提效的,那相比 datastream,哪些場(chǎng)景很適合 flink sql 去做?
- 語(yǔ)法問(wèn)題:我寫(xiě) sql 時(shí) flink sql 語(yǔ)法會(huì)不會(huì)和其他 sql 語(yǔ)法有不同?
- 運(yùn)行問(wèn)題:我寫(xiě)了一條 sql,運(yùn)行起來(lái)了,但是對(duì)我來(lái)說(shuō)是黑盒的,我怎么知道這個(gè)任務(wù)正在執(zhí)行什么操作?有沒(méi)有什么好辦法幫我去理解 flink sql 的運(yùn)行機(jī)制?
- 理解誤區(qū):在理解 flink sql 的運(yùn)算機(jī)制上有哪些誤區(qū)?
- 坑:flink sql 一般都有啥坑?提前了解幫我們避免踩坑。
就是上面這些想法,會(huì)讓很多想在公司內(nèi)部引入 flink sql 的同學(xué)望而卻步。
3.目標(biāo)篇-本文能幫助大家了解 flink sql 什么?
來(lái)看看本文的目標(biāo):
- 場(chǎng)景問(wèn)題:幫大家理解哪些場(chǎng)景是非常適合 flink sql 的
- 語(yǔ)法問(wèn)題:幫大家簡(jiǎn)單熟悉 flink sql 的語(yǔ)法
- 運(yùn)行問(wèn)題:使用一條簡(jiǎn)單的 query sql 看看其運(yùn)行起來(lái)的過(guò)程,其運(yùn)行的機(jī)制
- 理解誤區(qū):運(yùn)算機(jī)制上的常見(jiàn)誤區(qū)
- 坑:看看 sql 一般會(huì)有啥坑
由于一篇文章不能覆蓋所有概念,本文主要介紹一些最簡(jiǎn)單的 ETL,聚合場(chǎng)景,主要集中于前三點(diǎn)。
后兩點(diǎn)在后續(xù)系列文章中會(huì)按照?qǐng)鼍霸敿?xì)展開(kāi)。
4.實(shí)戰(zhàn)篇-簡(jiǎn)單的 query 案例和運(yùn)行原理
4.1.場(chǎng)景問(wèn)題:有哪些場(chǎng)景適合 flink sql?
不裝了,我坦白了,flink sql 其實(shí)很適合干的活就是 dwd 清洗,dws 聚合。
此處主要針對(duì)實(shí)時(shí)數(shù)倉(cāng)的場(chǎng)景來(lái)說(shuō)。flink sql 能干 dwd 清洗,dws 聚合,基本上實(shí)時(shí)數(shù)倉(cāng)的大多數(shù)場(chǎng)景都能給覆蓋了。
flink sql 牛逼!!!
但是!!!
經(jīng)過(guò)博主使用 flink sql 經(jīng)驗(yàn)來(lái)看,并不是所有的 dwd,dws 聚合場(chǎng)景都適合 flink sql(截止發(fā)文階段來(lái)說(shuō))!!!
其實(shí)這些目前不適合 flink sql 的場(chǎng)景總結(jié)下來(lái)就是在處理上比 datastream 還是會(huì)有一定的損失。
先總結(jié)下使用場(chǎng)景:
1. dwd:簡(jiǎn)單的清洗、復(fù)雜的清洗、維度的擴(kuò)充、各種 udf 的使用
2. dws:各類(lèi)聚合
然后分適合的場(chǎng)景和不適合的場(chǎng)景來(lái)說(shuō),因?yàn)橹贿@一篇不能覆蓋所有的內(nèi)容,所以本文此處先大致給個(gè)結(jié)論,之后會(huì)結(jié)合具體的場(chǎng)景詳細(xì)描述。
適合的場(chǎng)景:
簡(jiǎn)單的 dwd 清洗場(chǎng)景
全場(chǎng)景的 dws 聚合場(chǎng)景
目前不太適合的場(chǎng)景:
復(fù)雜的 dwd 清洗場(chǎng)景:舉例比如使用了很多 udf 清洗,尤其是使用很多的 json 類(lèi)解析清洗
關(guān)聯(lián)維度場(chǎng)景:舉例比如 datastream 中經(jīng)常會(huì)有攢一批數(shù)據(jù)批量訪問(wèn)外部接口的場(chǎng)景,flink sql 目前對(duì)于這種場(chǎng)景雖然有 localcache、異步訪問(wèn)能力,但是依然還是一條一條訪問(wèn)外部緩存,這樣相比批量訪問(wèn)還是會(huì)有性能差距。
4.2.語(yǔ)法\運(yùn)行問(wèn)題
其實(shí)總結(jié)來(lái)說(shuō),對(duì)于接觸過(guò) sql 的同學(xué)來(lái)說(shuō),除了 flink sql 中窗口聚合類(lèi)的寫(xiě)法來(lái)說(shuō),其他的 sql 語(yǔ)法都是相同的,很容易理解。
本節(jié)會(huì)針對(duì)具體的案例進(jìn)行詳細(xì)介紹。
4.2.1.ETL
最簡(jiǎn)單的 ETL 類(lèi)型任務(wù)。
- SELECT select_list FROM table_expression [ WHERE boolean_expression ]
1.場(chǎng)景:簡(jiǎn)單的 dwd 清洗過(guò)濾場(chǎng)景
源碼公眾號(hào)后臺(tái)回復(fù)不會(huì)連最適合 flink sql 的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧獲取。
數(shù)據(jù)源表:
- CREATE TABLE source_table (
- order_number BIGINT,
- price DECIMAL(32,2)
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '10',
- 'fields.order_number.min' = '10',
- 'fields.order_number.max' = '11'
- )
數(shù)據(jù)匯表:
- CREATE TABLE sink_table (
- order_number BIGINT,
- price DECIMAL(32,2)
- ) WITH (
- 'connector' = 'print'
- )
ETL 邏輯:
- insert into sink_table
- select * from source_table
- where order_number = 10
2.運(yùn)行:可以看到,其實(shí)在 flink sql 任務(wù)中,其會(huì)把對(duì)應(yīng)的處理邏輯給寫(xiě)到算子名稱上面。
Notes - 觀察 flink sql 技巧 1:這個(gè)其實(shí)就是我們觀察 flink sql 任務(wù)的第一個(gè)技巧。如果你想知道你的 flink 任務(wù)在干啥,第一反應(yīng)是去 flink webui 看看這個(gè)任務(wù)目前在做什么。包括算子名稱都會(huì)給直接展示給我們目前哪個(gè)算子在干啥事情,在處理啥邏輯
3.結(jié)果
- +I[10, 337546916355686018150362513408.00]
- +I[10, 734895198061906189720381030400.00]
- +I[10, 496632591763800912960818249728.00]
- +I[10, 495090465926828588045441171456.00]
- +I[10, 167305033642317182838130081792.00]
- +I[10, 409466913112794578407573684224.00]
- +I[10, 894352160414515330502514180096.00]
- +I[10, 680063350384451712068576346112.00]
- +I[10, 50807402446574997641386524672.00]
- +I[10, 646597093362022945955245981696.00]
- +I[10, 233317961584082024331537809408.00]
- ...
4.原理:
先看一下一個(gè) flink sql 任務(wù)的入口執(zhí)行邏輯。
首先看看建表語(yǔ)句的執(zhí)行和 query 語(yǔ)句執(zhí)行的邏輯有什么不同。
可以發(fā)現(xiàn)執(zhí)行到 executeInternal 時(shí)會(huì)針對(duì)具體的 operation 來(lái)執(zhí)行不同的操作。
執(zhí)行建表操作就是具體的 CreateTableOperation 時(shí),會(huì)將表的信息保存到 catalogManager。
執(zhí)行 query 操作就是具體的 ModifyOperation 時(shí),會(huì)將對(duì)應(yīng)的邏輯轉(zhuǎn)換成對(duì)應(yīng)的 Transformation。
Transformation 中就包含了執(zhí)行的整體邏輯以及對(duì)應(yīng)要執(zhí)行的 sql 代碼內(nèi)容。
接下來(lái)我們?cè)敿?xì)看下對(duì)應(yīng)的 transform 中包含了什么內(nèi)容。
首先是最外層 LegacySinkTransformation,即 sink 算子,如圖就是 print sink function。比較好理解。
然后是中間層 OneInputTransformation,即 sql 中過(guò)濾和轉(zhuǎn)換操作(select * from source_table where order_number = 10),如圖是代碼生成的具體過(guò)濾和轉(zhuǎn)換邏輯。
生成的代碼就在 GeneratedOperator 中的 code 字段。我們將對(duì)應(yīng)的 code 復(fù)制到一個(gè)新的文件夾中。
這個(gè)算子是直接繼承了 OneInputStreamOperator 進(jìn)行直接執(zhí)行邏輯,跳過(guò)了 datastream 那一層。
我們來(lái)看看最重要的 processElement 邏輯,具體字段解釋和執(zhí)行邏輯如圖所示。
Notes - 觀察 flink sql 技巧 2:這個(gè)其實(shí)就是我們觀察 flink sql 任務(wù)的第二個(gè)技巧。如果你想知道你的 flink 任務(wù)執(zhí)行了什么代碼,就去看看 sql 最后轉(zhuǎn)換成的 transformation 里面具體要執(zhí)行哪些操作。
4.2.2.去重場(chǎng)景
1.場(chǎng)景:最簡(jiǎn)單的去重場(chǎng)景
源碼公眾號(hào)后臺(tái)回復(fù)不會(huì)連最適合 flink sql 的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧獲取。
數(shù)據(jù)源:
- CREATE TABLE source_table (
- string_field STRING
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '10',
- 'fields.string_field.length' = '3'
- )
數(shù)據(jù)匯:
- CREATE TABLE sink_table (
- string_field STRING
- ) WITH (
- 'connector' = 'print'
- )
數(shù)據(jù)處理:
- insert into sink_table
- select distinct string_field
- from source_table
2.運(yùn)行:可以看到,其實(shí)在 flink sql 任務(wù)中,其會(huì)把對(duì)應(yīng)的處理邏輯給寫(xiě)到算子名稱上面。
3.上面這個(gè)案例的結(jié)果:
- +I[cd3]
- +I[8fc]
- +I[b0c]
- +I[1d8]
- +I[e28]
- +I[c5f]
- +I[e7d]
- +I[dfa]
- +I[1fe]
- ...
4.原理:
此處我們只關(guān)注和上面不同的邏輯。
第一個(gè)就是 PartitionTransform 中的 KeyGroupStreamPartitioner,就是對(duì)應(yīng)的分區(qū)邏輯。來(lái)看看生成代碼的邏輯。
其中做 shuffle 邏輯時(shí),是按照 string_field 作為 key 進(jìn)行 shuffle。
第二個(gè)就是 OneInputTransformation 中的 KeyedProcessOperator,就是對(duì)應(yīng)的去重邏輯。
可以看到生成的 function 中只有這三段代碼是業(yè)務(wù)邏輯代碼,但是其中的 RowData 初始化大小都是 0。那么到底是哪里做的去重邏輯呢?
我們跟一下處理邏輯會(huì)發(fā)現(xiàn)。去重邏輯主要集中在 GroupAggFunction#processElement。
4.2.3.group 聚合場(chǎng)景
4.2.3.1.簡(jiǎn)單聚合場(chǎng)景
1.場(chǎng)景:最簡(jiǎn)單的聚合場(chǎng)景
源碼公眾號(hào)后臺(tái)回復(fù)不會(huì)連最適合 flink sql 的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧獲取。
count,sum,avg,max,min 等:
數(shù)據(jù)源:
- CREATE TABLE source_table (
- order_id STRING,
- price BIGINT
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '10',
- 'fields.order_id.length' = '1',
- 'fields.price.min' = '1',
- 'fields.price.max' = '1000000'
- )
數(shù)據(jù)匯:
- CREATE TABLE sink_table (
- order_id STRING,
- count_result BIGINT,
- sum_result BIGINT,
- avg_result DOUBLE,
- min_result BIGINT,
- max_result BIGINT
- ) WITH (
- 'connector' = 'print'
- )
數(shù)據(jù)處理邏輯:
- insert into sink_table
- select order_id,
- count(*) as count_result,
- sum(price) as sum_result,
- avg(price) as avg_result,
- min(price) as min_result,
- max(price) as max_result
- from source_table
- group by order_id
2.運(yùn)行:
3.上面這個(gè)案例的結(jié)果:
- +I[1, 1, 415300, 415300.0, 415300, 415300]
- +I[d, 1, 416878, 416878.0, 416878, 416878]
- +I[0, 1, 120837, 120837.0, 120837, 120837]
- +I[c, 1, 337749, 337749.0, 337749, 337749]
- +I[7, 1, 387053, 387053.0, 387053, 387053]
- +I[8, 1, 387042, 387042.0, 387042, 387042]
- +I[2, 1, 546317, 546317.0, 546317, 546317]
- +I[e, 1, 22131, 22131.0, 22131, 22131]
- +I[9, 1, 651731, 651731.0, 651731, 651731]
- -U[0, 1, 120837, 120837.0, 120837, 120837]
- +U[0, 2, 566664, 283332.0, 120837, 445827]
- +I[b, 1, 748659, 748659.0, 748659, 748659]
- -U[7, 1, 387053, 387053.0, 387053, 387053]
- +U[7, 2, 1058056, 529028.0, 387053, 671003]
4.原理:
來(lái)瞅一眼 transformation。
還是和之前的邏輯一樣,跟一下 GroupAggFunction 的邏輯。如下圖,有五個(gè)執(zhí)行步驟執(zhí)行計(jì)算。
再看最終生成的 function 代碼邏輯。
首先看看 count 怎么算的。
sum 怎么算的。
4.2.3.2.去重聚合場(chǎng)景
1.場(chǎng)景:去重聚合場(chǎng)景
數(shù)據(jù)源:
- CREATE TABLE source_table (
- dim STRING,
- user_id BIGINT
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '10',
- 'fields.dim.length' = '1',
- 'fields.user_id.min' = '1',
- 'fields.user_id.max' = '1000000'
- )
數(shù)據(jù)匯:
- CREATE TABLE sink_table (
- dim STRING,
- uv BIGINT
- ) WITH (
- 'connector' = 'print'
- )
數(shù)據(jù)處理:
- insert into sink_table
- select dim,
- count(distinct user_id) as uv
- from source_table
- group by dim
2.運(yùn)行:
3.上面這個(gè)案例的結(jié)果:
- +U[9, 3097]
- -U[a, 3054]
- +U[a, 3055]
- -U[8, 3030]
- +U[8, 3031]
- -U[4, 3137]
- +U[4, 3138]
- -U[6, 3139]
- +U[6, 3140]
- -U[0, 3082]
- +U[0, 3083]
4.原理:
此處只看和之前的案例不一樣的地方。
4.2.3.3.語(yǔ)法糖
1.grouping sets
多維計(jì)算。相當(dāng)于語(yǔ)法糖,用戶可以根據(jù)自己的場(chǎng)景去指定自己想要的維度組合。
數(shù)據(jù)匯:
- CREATE TABLE sink_table (
- supplier_id STRING,
- product_id STRING,
- total BIGINT
- ) WITH (
- 'connector' = 'print'
- )
數(shù)據(jù)處理邏輯:
- insert into sink_table
- SELECT
- supplier_id,
- product_id,
- COUNT(*) AS total
- FROM (VALUES
- ('supplier1', 'product1', 4),
- ('supplier1', 'product2', 3),
- ('supplier2', 'product3', 3),
- ('supplier2', 'product4', 4))
- AS Products(supplier_id, product_id, rating)
- GROUP BY GROUPING SETS ((supplier_id, product_id), (supplier_id), ())
其結(jié)果等同于:
- insert into sink_table
- SELECT
- supplier_id,
- product_id,
- COUNT(*) AS total
- FROM (VALUES
- ('supplier1', 'product1', 4),
- ('supplier1', 'product2', 3),
- ('supplier2', 'product3', 3),
- ('supplier2', 'product4', 4))
- AS Products(supplier_id, product_id, rating)
- GROUP BY supplier_id, product_id
- UNION ALL
- SELECT
- supplier_id,
- cast(null as string) as product_id,
- COUNT(*) AS total
- FROM (VALUES
- ('supplier1', 'product1', 4),
- ('supplier1', 'product2', 3),
- ('supplier2', 'product3', 3),
- ('supplier2', 'product4', 4))
- AS Products(supplier_id, product_id, rating)
- GROUP BY supplier_id
- UNION ALL
- SELECT
- cast(null as string) AS supplier_id,
- cast(null as string) AS product_id,
- COUNT(*) AS total
- FROM (VALUES
- ('supplier1', 'product1', 4),
- ('supplier1', 'product2', 3),
- ('supplier2', 'product3', 3),
- ('supplier2', 'product4', 4))
- AS Products(supplier_id, product_id, rating)
結(jié)果如下:
- +I[supplier1, product1, 1]
- +I[supplier1, null, 1]
- +I[null, null, 1]
- +I[supplier1, product2, 1]
- -U[supplier1, null, 1]
- +U[supplier1, null, 2]
- -U[null, null, 1]
- +U[null, null, 2]
- +I[supplier2, product3, 1]
- +I[supplier2, null, 1]
- -U[null, null, 2]
- +U[null, null, 3]
- +I[supplier2, product4, 1]
- -U[supplier2, null, 1]
- +U[supplier2, null, 2]
- -U[null, null, 3]
- +U[null, null, 4]
grouping sets 能幫助我們?cè)诙嗑S場(chǎng)景下,減少很多冗余代碼。關(guān)于 grouping sets 原理后面的系列文章會(huì)介紹。
2.rollup
rollup 是上卷計(jì)算的一種簡(jiǎn)化寫(xiě)法。比如可以把 GROUPING SETS ((supplier_id, product_id), (supplier_id), ()) 簡(jiǎn)化為 ROLLUP (supplier_id, product_id)。
數(shù)據(jù)匯:
- CREATE TABLE sink_table (
- supplier_id STRING,
- product_id STRING,
- total BIGINT
- ) WITH (
- 'connector' = 'print'
- )
數(shù)據(jù)處理邏輯:
- SELECT supplier_id, rating, COUNT(*)
- FROM (VALUES
- ('supplier1', 'product1', 4),
- ('supplier1', 'product2', 3),
- ('supplier2', 'product3', 3),
- ('supplier2', 'product4', 4))
- AS Products(supplier_id, product_id, rating)
- GROUP BY ROLLUP (supplier_id, product_id)
其結(jié)果等同于:
- SELECT supplier_id, rating, product_id, COUNT(*)
- FROM (VALUES
- ('supplier1', 'product1', 4),
- ('supplier1', 'product2', 3),
- ('supplier2', 'product3', 3),
- ('supplier2', 'product4', 4))
- AS Products(supplier_id, product_id, rating)
- GROUP BY GROUPING SET (
- ( supplier_id, product_id ),
- ( supplier_id ),
- ( )
- )
結(jié)果如下:
- +I[supplier1, product1, 1]
- +I[supplier1, null, 1]
- +I[null, null, 1]
- +I[supplier1, product2, 1]
- -U[supplier1, null, 1]
- +U[supplier1, null, 2]
- -U[null, null, 1]
- +U[null, null, 2]
- +I[supplier2, product3, 1]
- +I[supplier2, null, 1]
- -U[null, null, 2]
- +U[null, null, 3]
- +I[supplier2, product4, 1]
- -U[supplier2, null, 1]
- +U[supplier2, null, 2]
- -U[null, null, 3]
- +U[null, null, 4]
5.CUBE 計(jì)算
源碼公眾號(hào)后臺(tái)回復(fù)不會(huì)連最適合 flink sql 的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧獲取。
cube 相當(dāng)于是一種覆蓋了所有維度組合聚合計(jì)算。比如 group by a, b, c。其會(huì)將 a, b, c 三個(gè)維度的所有維度組合進(jìn)行 group by。
數(shù)據(jù)匯:
- CREATE TABLE sink_table (
- supplier_id STRING,
- product_id STRING,
- total BIGINT
- ) WITH (
- 'connector' = 'print'
- )
數(shù)據(jù)處理邏輯:
- SELECT supplier_id, rating, product_id, COUNT(*)
- FROM (VALUES
- ('supplier1', 'product1', 4),
- ('supplier1', 'product2', 3),
- ('supplier2', 'product3', 3),
- ('supplier2', 'product4', 4))
- AS Products(supplier_id, product_id, rating)
- GROUP BY CUBE (supplier_id, product_id)
它等同于
- SELECT supplier_id, rating, product_id, COUNT(*)
- FROM (VALUES
- ('supplier1', 'product1', 4),
- ('supplier1', 'product2', 3),
- ('supplier2', 'product3', 3),
- ('supplier2', 'product4', 4))
- AS Products(supplier_id, product_id, rating)
- GROUP BY GROUPING SET (
- ( supplier_id, product_id ),
- ( supplier_id ),
- ( product_id ),
- ( )
- )
結(jié)果如下:
- +I[supplier1, product1, 1]
- +I[supplier1, null, 1]
- +I[null, product1, 1]
- +I[null, null, 1]
- +I[supplier1, product2, 1]
- -U[supplier1, null, 1]
- +U[supplier1, null, 2]
- +I[null, product2, 1]
- -U[null, null, 1]
- +U[null, null, 2]
- +I[supplier2, product3, 1]
- +I[supplier2, null, 1]
- +I[null, product3, 1]
- -U[null, null, 2]
- +U[null, null, 3]
- +I[supplier2, product4, 1]
- -U[supplier2, null, 1]
- +U[supplier2, null, 2]
- +I[null, product4, 1]
- -U[null, null, 3]
- +U[null, null, 4]
5.總結(jié)與展望篇
本文主要介紹了 ETL,group agg 聚合類(lèi)指標(biāo)的一些常見(jiàn)場(chǎng)景案例以及其底層運(yùn)行原理。我們可以發(fā)現(xiàn) flink sql 的語(yǔ)法其實(shí)和 hive sql,mysql 啥的語(yǔ)法都是基本一致的。所以上手 flink sql 時(shí),語(yǔ)法基本不會(huì)成為我們的障礙。
而且也介紹了在查看 flink sql 任務(wù)時(shí)的一些技巧:
去 flink webui 看看這個(gè)任務(wù)目前在做什么。包括算子名稱都會(huì)給直接展示給我們目前哪個(gè)算子在干啥事情,在處理啥邏輯。
如果你想知道你的 flink 任務(wù)執(zhí)行了什么代碼,就去看看 sql 最后轉(zhuǎn)換成的 transformation 里面具體要執(zhí)行哪些操作。
后續(xù)文章會(huì)繼續(xù)介紹 flink sql 窗口聚合,一些理解誤區(qū),和坑之類(lèi)的案例。
本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)羊說(shuō)」