自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink SQL 知其所以然:不會(huì)連最適合 Flink SQL的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧?

數(shù)據(jù)庫(kù) 其他數(shù)據(jù)庫(kù)
本文主要介紹了 ETL,group agg 聚合類(lèi)指標(biāo)的一些常見(jiàn)場(chǎng)景案例以及其底層運(yùn)行原理。我們可以發(fā)現(xiàn) flink sql 的語(yǔ)法其實(shí)和 hive sql,mysql 啥的語(yǔ)法都是基本一致的。所以上手 flink sql 時(shí),語(yǔ)法基本不會(huì)成為我們的障礙。

[[421838]]

1.序篇-本文結(jié)構(gòu)

前面的章節(jié)鋪墊了那么多,終于在本節(jié)走入一條 query 了。

針對(duì) datastream api 大家都比較熟悉了,還是那句話,在 datastream 中,你寫(xiě)的代碼邏輯是什么樣的,它最終的執(zhí)行方式就是什么樣的。

但是對(duì)于 flink sql 的執(zhí)行過(guò)程,大家還是不熟悉的。

因此本文通過(guò)以下章節(jié)使用 ETL,group agg(sum,count等)簡(jiǎn)單聚合類(lèi) query 帶大家走進(jìn)一條 flink sql query 邏輯的世界。幫大家至少能夠熟悉在 flink sql 程序運(yùn)行時(shí)知道 flink 程序在干什么。

  • 背景篇-大家不了解 flink sql 什么?
  • 目標(biāo)篇-本文能幫助大家了解 flink sql 什么?
  • 實(shí)戰(zhàn)篇-簡(jiǎn)單的 query 案例和運(yùn)行原理
  • 總結(jié)與展望篇

先說(shuō)說(shuō)結(jié)論:

場(chǎng)景問(wèn)題:flink sql 很適合簡(jiǎn)單 ETL,以及基本全部場(chǎng)景下的聚合類(lèi)指標(biāo)。

語(yǔ)法問(wèn)題:flink sql 語(yǔ)法其實(shí)是和其他 sql 語(yǔ)法基本一致的?;静粫?huì)產(chǎn)生語(yǔ)法問(wèn)題阻礙使用 flink sql。

運(yùn)行問(wèn)題:查看 flink sql 任務(wù)時(shí)的一些技巧:

  • 去 flink webui 看看這個(gè)任務(wù)目前在做什么。包括算子名稱都會(huì)給直接展示給我們目前哪個(gè)算子在干啥事情,在處理啥邏輯。
  • 如果你想知道你的 flink 任務(wù)執(zhí)行了什么代碼,就去看看 sql 最后轉(zhuǎn)換成的 transformation 里面具體要執(zhí)行哪些操作。flink sql 生成的代碼也在里面。
  • 如果你不確定線上任務(wù)執(zhí)行原理,可以直接在本地嘗試運(yùn)行。

2.背景篇-大家不了解 flink sql 什么?

首先從大家用 flink sql 的一個(gè)初衷和狀態(tài)出發(fā),想一下大家在開(kāi)始上手 flink sql 時(shí),是什么樣的一個(gè)想法?

博主大概整理了下,在初步上手 flink sql,一般從入手到踩坑整個(gè)過(guò)程中,一般都會(huì)有以下幾種問(wèn)題或者想法:

  1. 場(chǎng)景問(wèn)題:首先 flink sql 是用來(lái)提效的,那相比 datastream,哪些場(chǎng)景很適合 flink sql 去做?
  2. 語(yǔ)法問(wèn)題:我寫(xiě) sql 時(shí) flink sql 語(yǔ)法會(huì)不會(huì)和其他 sql 語(yǔ)法有不同?
  3. 運(yùn)行問(wèn)題:我寫(xiě)了一條 sql,運(yùn)行起來(lái)了,但是對(duì)我來(lái)說(shuō)是黑盒的,我怎么知道這個(gè)任務(wù)正在執(zhí)行什么操作?有沒(méi)有什么好辦法幫我去理解 flink sql 的運(yùn)行機(jī)制?
  4. 理解誤區(qū):在理解 flink sql 的運(yùn)算機(jī)制上有哪些誤區(qū)?
  5. 坑:flink sql 一般都有啥坑?提前了解幫我們避免踩坑。

就是上面這些想法,會(huì)讓很多想在公司內(nèi)部引入 flink sql 的同學(xué)望而卻步。

3.目標(biāo)篇-本文能幫助大家了解 flink sql 什么?

來(lái)看看本文的目標(biāo):

  1. 場(chǎng)景問(wèn)題:幫大家理解哪些場(chǎng)景是非常適合 flink sql 的
  2. 語(yǔ)法問(wèn)題:幫大家簡(jiǎn)單熟悉 flink sql 的語(yǔ)法
  3. 運(yùn)行問(wèn)題:使用一條簡(jiǎn)單的 query sql 看看其運(yùn)行起來(lái)的過(guò)程,其運(yùn)行的機(jī)制
  4. 理解誤區(qū):運(yùn)算機(jī)制上的常見(jiàn)誤區(qū)
  5. 坑:看看 sql 一般會(huì)有啥坑

由于一篇文章不能覆蓋所有概念,本文主要介紹一些最簡(jiǎn)單的 ETL,聚合場(chǎng)景,主要集中于前三點(diǎn)。

后兩點(diǎn)在后續(xù)系列文章中會(huì)按照?qǐng)鼍霸敿?xì)展開(kāi)。

4.實(shí)戰(zhàn)篇-簡(jiǎn)單的 query 案例和運(yùn)行原理

4.1.場(chǎng)景問(wèn)題:有哪些場(chǎng)景適合 flink sql?

不裝了,我坦白了,flink sql 其實(shí)很適合干的活就是 dwd 清洗,dws 聚合。

此處主要針對(duì)實(shí)時(shí)數(shù)倉(cāng)的場(chǎng)景來(lái)說(shuō)。flink sql 能干 dwd 清洗,dws 聚合,基本上實(shí)時(shí)數(shù)倉(cāng)的大多數(shù)場(chǎng)景都能給覆蓋了。

flink sql 牛逼!!!

但是!!!

經(jīng)過(guò)博主使用 flink sql 經(jīng)驗(yàn)來(lái)看,并不是所有的 dwd,dws 聚合場(chǎng)景都適合 flink sql(截止發(fā)文階段來(lái)說(shuō))!!!

其實(shí)這些目前不適合 flink sql 的場(chǎng)景總結(jié)下來(lái)就是在處理上比 datastream 還是會(huì)有一定的損失。

先總結(jié)下使用場(chǎng)景:

1. dwd:簡(jiǎn)單的清洗、復(fù)雜的清洗、維度的擴(kuò)充、各種 udf 的使用

2. dws:各類(lèi)聚合

然后分適合的場(chǎng)景和不適合的場(chǎng)景來(lái)說(shuō),因?yàn)橹贿@一篇不能覆蓋所有的內(nèi)容,所以本文此處先大致給個(gè)結(jié)論,之后會(huì)結(jié)合具體的場(chǎng)景詳細(xì)描述。

適合的場(chǎng)景:

簡(jiǎn)單的 dwd 清洗場(chǎng)景

全場(chǎng)景的 dws 聚合場(chǎng)景

目前不太適合的場(chǎng)景:

復(fù)雜的 dwd 清洗場(chǎng)景:舉例比如使用了很多 udf 清洗,尤其是使用很多的 json 類(lèi)解析清洗

關(guān)聯(lián)維度場(chǎng)景:舉例比如 datastream 中經(jīng)常會(huì)有攢一批數(shù)據(jù)批量訪問(wèn)外部接口的場(chǎng)景,flink sql 目前對(duì)于這種場(chǎng)景雖然有 localcache、異步訪問(wèn)能力,但是依然還是一條一條訪問(wèn)外部緩存,這樣相比批量訪問(wèn)還是會(huì)有性能差距。

4.2.語(yǔ)法\運(yùn)行問(wèn)題

其實(shí)總結(jié)來(lái)說(shuō),對(duì)于接觸過(guò) sql 的同學(xué)來(lái)說(shuō),除了 flink sql 中窗口聚合類(lèi)的寫(xiě)法來(lái)說(shuō),其他的 sql 語(yǔ)法都是相同的,很容易理解。

本節(jié)會(huì)針對(duì)具體的案例進(jìn)行詳細(xì)介紹。

4.2.1.ETL

最簡(jiǎn)單的 ETL 類(lèi)型任務(wù)。

  1. SELECT select_list FROM table_expression [ WHERE boolean_expression ] 

1.場(chǎng)景:簡(jiǎn)單的 dwd 清洗過(guò)濾場(chǎng)景

源碼公眾號(hào)后臺(tái)回復(fù)不會(huì)連最適合 flink sql 的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧獲取。

數(shù)據(jù)源表:

  1. CREATE TABLE source_table ( 
  2.     order_number BIGINT
  3.     price        DECIMAL(32,2) 
  4. WITH ( 
  5.   'connector' = 'datagen'
  6.   'rows-per-second' = '10'
  7.   'fields.order_number.min' = '10'
  8.   'fields.order_number.max' = '11' 

數(shù)據(jù)匯表:

  1. CREATE TABLE sink_table ( 
  2.     order_number BIGINT
  3.     price        DECIMAL(32,2) 
  4. WITH ( 
  5.   'connector' = 'print' 

ETL 邏輯:

  1. insert into sink_table 
  2. select * from source_table 
  3. where order_number = 10 

2.運(yùn)行:可以看到,其實(shí)在 flink sql 任務(wù)中,其會(huì)把對(duì)應(yīng)的處理邏輯給寫(xiě)到算子名稱上面。

Notes - 觀察 flink sql 技巧 1:這個(gè)其實(shí)就是我們觀察 flink sql 任務(wù)的第一個(gè)技巧。如果你想知道你的 flink 任務(wù)在干啥,第一反應(yīng)是去 flink webui 看看這個(gè)任務(wù)目前在做什么。包括算子名稱都會(huì)給直接展示給我們目前哪個(gè)算子在干啥事情,在處理啥邏輯

3.結(jié)果

  1. +I[10, 337546916355686018150362513408.00] 
  2. +I[10, 734895198061906189720381030400.00] 
  3. +I[10, 496632591763800912960818249728.00] 
  4. +I[10, 495090465926828588045441171456.00] 
  5. +I[10, 167305033642317182838130081792.00] 
  6. +I[10, 409466913112794578407573684224.00] 
  7. +I[10, 894352160414515330502514180096.00] 
  8. +I[10, 680063350384451712068576346112.00] 
  9. +I[10, 50807402446574997641386524672.00] 
  10. +I[10, 646597093362022945955245981696.00] 
  11. +I[10, 233317961584082024331537809408.00] 
  12. ... 

4.原理:

先看一下一個(gè) flink sql 任務(wù)的入口執(zhí)行邏輯。

首先看看建表語(yǔ)句的執(zhí)行和 query 語(yǔ)句執(zhí)行的邏輯有什么不同。

可以發(fā)現(xiàn)執(zhí)行到 executeInternal 時(shí)會(huì)針對(duì)具體的 operation 來(lái)執(zhí)行不同的操作。

執(zhí)行建表操作就是具體的 CreateTableOperation 時(shí),會(huì)將表的信息保存到 catalogManager。

執(zhí)行 query 操作就是具體的 ModifyOperation 時(shí),會(huì)將對(duì)應(yīng)的邏輯轉(zhuǎn)換成對(duì)應(yīng)的 Transformation。

Transformation 中就包含了執(zhí)行的整體邏輯以及對(duì)應(yīng)要執(zhí)行的 sql 代碼內(nèi)容。

接下來(lái)我們?cè)敿?xì)看下對(duì)應(yīng)的 transform 中包含了什么內(nèi)容。

首先是最外層 LegacySinkTransformation,即 sink 算子,如圖就是 print sink function。比較好理解。

然后是中間層 OneInputTransformation,即 sql 中過(guò)濾和轉(zhuǎn)換操作(select * from source_table where order_number = 10),如圖是代碼生成的具體過(guò)濾和轉(zhuǎn)換邏輯。

生成的代碼就在 GeneratedOperator 中的 code 字段。我們將對(duì)應(yīng)的 code 復(fù)制到一個(gè)新的文件夾中。

這個(gè)算子是直接繼承了 OneInputStreamOperator 進(jìn)行直接執(zhí)行邏輯,跳過(guò)了 datastream 那一層。

我們來(lái)看看最重要的 processElement 邏輯,具體字段解釋和執(zhí)行邏輯如圖所示。

Notes - 觀察 flink sql 技巧 2:這個(gè)其實(shí)就是我們觀察 flink sql 任務(wù)的第二個(gè)技巧。如果你想知道你的 flink 任務(wù)執(zhí)行了什么代碼,就去看看 sql 最后轉(zhuǎn)換成的 transformation 里面具體要執(zhí)行哪些操作。

4.2.2.去重場(chǎng)景

1.場(chǎng)景:最簡(jiǎn)單的去重場(chǎng)景

源碼公眾號(hào)后臺(tái)回復(fù)不會(huì)連最適合 flink sql 的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧獲取。

數(shù)據(jù)源:

  1. CREATE TABLE source_table ( 
  2.     string_field STRING 
  3. WITH ( 
  4.   'connector' = 'datagen'
  5.   'rows-per-second' = '10'
  6.   'fields.string_field.length' = '3' 

數(shù)據(jù)匯:

  1. CREATE TABLE sink_table ( 
  2.     string_field STRING 
  3. WITH ( 
  4.   'connector' = 'print' 

數(shù)據(jù)處理:

  1. insert into sink_table 
  2. select distinct string_field 
  3. from source_table 

2.運(yùn)行:可以看到,其實(shí)在 flink sql 任務(wù)中,其會(huì)把對(duì)應(yīng)的處理邏輯給寫(xiě)到算子名稱上面。

3.上面這個(gè)案例的結(jié)果:

  1. +I[cd3] 
  2. +I[8fc] 
  3. +I[b0c] 
  4. +I[1d8] 
  5. +I[e28] 
  6. +I[c5f] 
  7. +I[e7d] 
  8. +I[dfa] 
  9. +I[1fe] 
  10. ... 

4.原理:

此處我們只關(guān)注和上面不同的邏輯。

第一個(gè)就是 PartitionTransform 中的 KeyGroupStreamPartitioner,就是對(duì)應(yīng)的分區(qū)邏輯。來(lái)看看生成代碼的邏輯。

其中做 shuffle 邏輯時(shí),是按照 string_field 作為 key 進(jìn)行 shuffle。

第二個(gè)就是 OneInputTransformation 中的 KeyedProcessOperator,就是對(duì)應(yīng)的去重邏輯。

可以看到生成的 function 中只有這三段代碼是業(yè)務(wù)邏輯代碼,但是其中的 RowData 初始化大小都是 0。那么到底是哪里做的去重邏輯呢?

我們跟一下處理邏輯會(huì)發(fā)現(xiàn)。去重邏輯主要集中在 GroupAggFunction#processElement。

4.2.3.group 聚合場(chǎng)景

4.2.3.1.簡(jiǎn)單聚合場(chǎng)景

1.場(chǎng)景:最簡(jiǎn)單的聚合場(chǎng)景

源碼公眾號(hào)后臺(tái)回復(fù)不會(huì)連最適合 flink sql 的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧獲取。

count,sum,avg,max,min 等:

數(shù)據(jù)源:

  1. CREATE TABLE source_table ( 
  2.     order_id STRING, 
  3.     price BIGINT 
  4. WITH ( 
  5.   'connector' = 'datagen'
  6.   'rows-per-second' = '10'
  7.   'fields.order_id.length' = '1'
  8.   'fields.price.min' = '1'
  9.   'fields.price.max' = '1000000' 

數(shù)據(jù)匯:

  1. CREATE TABLE sink_table ( 
  2.     order_id STRING, 
  3.     count_result BIGINT
  4.     sum_result BIGINT
  5.     avg_result DOUBLE
  6.     min_result BIGINT
  7.     max_result BIGINT 
  8. WITH ( 
  9.   'connector' = 'print' 

數(shù)據(jù)處理邏輯:

  1. insert into sink_table 
  2. select order_id, 
  3.        count(*) as count_result, 
  4.        sum(price) as sum_result, 
  5.        avg(price) as avg_result, 
  6.        min(price) as min_result, 
  7.        max(price) as max_result 
  8. from source_table 
  9. group by order_id 

2.運(yùn)行:

3.上面這個(gè)案例的結(jié)果:

  1. +I[1, 1, 415300, 415300.0, 415300, 415300] 
  2. +I[d, 1, 416878, 416878.0, 416878, 416878] 
  3. +I[0, 1, 120837, 120837.0, 120837, 120837] 
  4. +I[c, 1, 337749, 337749.0, 337749, 337749] 
  5. +I[7, 1, 387053, 387053.0, 387053, 387053] 
  6. +I[8, 1, 387042, 387042.0, 387042, 387042] 
  7. +I[2, 1, 546317, 546317.0, 546317, 546317] 
  8. +I[e, 1, 22131, 22131.0, 22131, 22131] 
  9. +I[9, 1, 651731, 651731.0, 651731, 651731] 
  10. -U[0, 1, 120837, 120837.0, 120837, 120837] 
  11. +U[0, 2, 566664, 283332.0, 120837, 445827] 
  12. +I[b, 1, 748659, 748659.0, 748659, 748659] 
  13. -U[7, 1, 387053, 387053.0, 387053, 387053] 
  14. +U[7, 2, 1058056, 529028.0, 387053, 671003] 

4.原理:

來(lái)瞅一眼 transformation。

還是和之前的邏輯一樣,跟一下 GroupAggFunction 的邏輯。如下圖,有五個(gè)執(zhí)行步驟執(zhí)行計(jì)算。

再看最終生成的 function 代碼邏輯。

首先看看 count 怎么算的。

sum 怎么算的。

4.2.3.2.去重聚合場(chǎng)景

1.場(chǎng)景:去重聚合場(chǎng)景

數(shù)據(jù)源:

  1. CREATE TABLE source_table ( 
  2.     dim STRING, 
  3.     user_id BIGINT 
  4. WITH ( 
  5.   'connector' = 'datagen'
  6.   'rows-per-second' = '10'
  7.   'fields.dim.length' = '1'
  8.   'fields.user_id.min' = '1'
  9.   'fields.user_id.max' = '1000000' 

數(shù)據(jù)匯:

  1. CREATE TABLE sink_table ( 
  2.     dim STRING, 
  3.     uv BIGINT 
  4. WITH ( 
  5.   'connector' = 'print' 

數(shù)據(jù)處理:

  1. insert into sink_table 
  2. select dim, 
  3.        count(distinct user_id) as uv 
  4. from source_table 
  5. group by dim 

2.運(yùn)行:

3.上面這個(gè)案例的結(jié)果:

  1. +U[9, 3097] 
  2. -U[a, 3054] 
  3. +U[a, 3055] 
  4. -U[8, 3030] 
  5. +U[8, 3031] 
  6. -U[4, 3137] 
  7. +U[4, 3138] 
  8. -U[6, 3139] 
  9. +U[6, 3140] 
  10. -U[0, 3082] 
  11. +U[0, 3083] 

4.原理:

此處只看和之前的案例不一樣的地方。

4.2.3.3.語(yǔ)法糖

1.grouping sets

多維計(jì)算。相當(dāng)于語(yǔ)法糖,用戶可以根據(jù)自己的場(chǎng)景去指定自己想要的維度組合。

數(shù)據(jù)匯:

  1. CREATE TABLE sink_table ( 
  2.     supplier_id STRING, 
  3.     product_id STRING, 
  4.     total BIGINT 
  5. WITH ( 
  6.   'connector' = 'print' 

數(shù)據(jù)處理邏輯:

  1. insert into sink_table 
  2. SELECT 
  3.      supplier_id, 
  4.      product_id, 
  5.      COUNT(*) AS total 
  6. FROM (VALUES 
  7.      ('supplier1''product1', 4), 
  8.      ('supplier1''product2', 3), 
  9.      ('supplier2''product3', 3), 
  10.      ('supplier2''product4', 4)) 
  11. AS Products(supplier_id, product_id, rating) 
  12. GROUP BY GROUPING SETS ((supplier_id, product_id), (supplier_id), ()) 

其結(jié)果等同于:

  1. insert into sink_table 
  2. SELECT 
  3.      supplier_id, 
  4.      product_id, 
  5.      COUNT(*) AS total 
  6. FROM (VALUES 
  7.      ('supplier1''product1', 4), 
  8.      ('supplier1''product2', 3), 
  9.      ('supplier2''product3', 3), 
  10.      ('supplier2''product4', 4)) 
  11. AS Products(supplier_id, product_id, rating) 
  12. GROUP BY supplier_id, product_id 
  13. UNION ALL 
  14. SELECT 
  15.      supplier_id, 
  16.      cast(null as string) as product_id, 
  17.      COUNT(*) AS total 
  18. FROM (VALUES 
  19.      ('supplier1''product1', 4), 
  20.      ('supplier1''product2', 3), 
  21.      ('supplier2''product3', 3), 
  22.      ('supplier2''product4', 4)) 
  23. AS Products(supplier_id, product_id, rating) 
  24. GROUP BY supplier_id 
  25. UNION ALL 
  26. SELECT 
  27.      cast(null as string) AS supplier_id, 
  28.      cast(null as string) AS product_id, 
  29.      COUNT(*) AS total 
  30. FROM (VALUES 
  31.      ('supplier1''product1', 4), 
  32.      ('supplier1''product2', 3), 
  33.      ('supplier2''product3', 3), 
  34.      ('supplier2''product4', 4)) 
  35. AS Products(supplier_id, product_id, rating) 

結(jié)果如下:

  1. +I[supplier1, product1, 1] 
  2. +I[supplier1, null, 1] 
  3. +I[nullnull, 1] 
  4. +I[supplier1, product2, 1] 
  5. -U[supplier1, null, 1] 
  6. +U[supplier1, null, 2] 
  7. -U[nullnull, 1] 
  8. +U[nullnull, 2] 
  9. +I[supplier2, product3, 1] 
  10. +I[supplier2, null, 1] 
  11. -U[nullnull, 2] 
  12. +U[nullnull, 3] 
  13. +I[supplier2, product4, 1] 
  14. -U[supplier2, null, 1] 
  15. +U[supplier2, null, 2] 
  16. -U[nullnull, 3] 
  17. +U[nullnull, 4] 

grouping sets 能幫助我們?cè)诙嗑S場(chǎng)景下,減少很多冗余代碼。關(guān)于 grouping sets 原理后面的系列文章會(huì)介紹。

2.rollup

rollup 是上卷計(jì)算的一種簡(jiǎn)化寫(xiě)法。比如可以把 GROUPING SETS ((supplier_id, product_id), (supplier_id), ()) 簡(jiǎn)化為 ROLLUP (supplier_id, product_id)。

數(shù)據(jù)匯:

  1. CREATE TABLE sink_table ( 
  2.     supplier_id STRING, 
  3.     product_id STRING, 
  4.     total BIGINT 
  5. WITH ( 
  6.   'connector' = 'print' 

數(shù)據(jù)處理邏輯:

  1. SELECT supplier_id, rating, COUNT(*) 
  2. FROM (VALUES 
  3.     ('supplier1''product1', 4), 
  4.     ('supplier1''product2', 3), 
  5.     ('supplier2''product3', 3), 
  6.     ('supplier2''product4', 4)) 
  7. AS Products(supplier_id, product_id, rating) 
  8. GROUP BY ROLLUP (supplier_id, product_id) 

其結(jié)果等同于:

  1. SELECT supplier_id, rating, product_id, COUNT(*) 
  2. FROM (VALUES 
  3.     ('supplier1''product1', 4), 
  4.     ('supplier1''product2', 3), 
  5.     ('supplier2''product3', 3), 
  6.     ('supplier2''product4', 4)) 
  7. AS Products(supplier_id, product_id, rating) 
  8. GROUP BY GROUPING SET ( 
  9.     ( supplier_id, product_id ), 
  10.     ( supplier_id             ), 
  11.     (                         ) 

結(jié)果如下:

  1. +I[supplier1, product1, 1] 
  2. +I[supplier1, null, 1] 
  3. +I[nullnull, 1] 
  4. +I[supplier1, product2, 1] 
  5. -U[supplier1, null, 1] 
  6. +U[supplier1, null, 2] 
  7. -U[nullnull, 1] 
  8. +U[nullnull, 2] 
  9. +I[supplier2, product3, 1] 
  10. +I[supplier2, null, 1] 
  11. -U[nullnull, 2] 
  12. +U[nullnull, 3] 
  13. +I[supplier2, product4, 1] 
  14. -U[supplier2, null, 1] 
  15. +U[supplier2, null, 2] 
  16. -U[nullnull, 3] 
  17. +U[nullnull, 4] 

5.CUBE 計(jì)算

源碼公眾號(hào)后臺(tái)回復(fù)不會(huì)連最適合 flink sql 的 ETL 和 group agg 場(chǎng)景都沒(méi)見(jiàn)過(guò)吧獲取。

cube 相當(dāng)于是一種覆蓋了所有維度組合聚合計(jì)算。比如 group by a, b, c。其會(huì)將 a, b, c 三個(gè)維度的所有維度組合進(jìn)行 group by。

數(shù)據(jù)匯:

  1. CREATE TABLE sink_table ( 
  2.     supplier_id STRING, 
  3.     product_id STRING, 
  4.     total BIGINT 
  5. WITH ( 
  6.   'connector' = 'print' 

數(shù)據(jù)處理邏輯:

  1. SELECT supplier_id, rating, product_id, COUNT(*) 
  2. FROM (VALUES 
  3.     ('supplier1''product1', 4), 
  4.     ('supplier1''product2', 3), 
  5.     ('supplier2''product3', 3), 
  6.     ('supplier2''product4', 4)) 
  7. AS Products(supplier_id, product_id, rating) 
  8. GROUP BY CUBE (supplier_id, product_id) 

它等同于

  1. SELECT supplier_id, rating, product_id, COUNT(*) 
  2. FROM (VALUES 
  3.     ('supplier1''product1', 4), 
  4.     ('supplier1''product2', 3), 
  5.     ('supplier2''product3', 3), 
  6.     ('supplier2''product4', 4)) 
  7. AS Products(supplier_id, product_id, rating) 
  8. GROUP BY GROUPING SET ( 
  9.     ( supplier_id, product_id ), 
  10.     ( supplier_id             ), 
  11.     (              product_id ), 
  12.     (                         ) 

結(jié)果如下:

  1. +I[supplier1, product1, 1] 
  2. +I[supplier1, null, 1] 
  3. +I[null, product1, 1] 
  4. +I[nullnull, 1] 
  5. +I[supplier1, product2, 1] 
  6. -U[supplier1, null, 1] 
  7. +U[supplier1, null, 2] 
  8. +I[null, product2, 1] 
  9. -U[nullnull, 1] 
  10. +U[nullnull, 2] 
  11. +I[supplier2, product3, 1] 
  12. +I[supplier2, null, 1] 
  13. +I[null, product3, 1] 
  14. -U[nullnull, 2] 
  15. +U[nullnull, 3] 
  16. +I[supplier2, product4, 1] 
  17. -U[supplier2, null, 1] 
  18. +U[supplier2, null, 2] 
  19. +I[null, product4, 1] 
  20. -U[nullnull, 3] 
  21. +U[nullnull, 4] 

5.總結(jié)與展望篇

本文主要介紹了 ETL,group agg 聚合類(lèi)指標(biāo)的一些常見(jiàn)場(chǎng)景案例以及其底層運(yùn)行原理。我們可以發(fā)現(xiàn) flink sql 的語(yǔ)法其實(shí)和 hive sql,mysql 啥的語(yǔ)法都是基本一致的。所以上手 flink sql 時(shí),語(yǔ)法基本不會(huì)成為我們的障礙。

而且也介紹了在查看 flink sql 任務(wù)時(shí)的一些技巧:

去 flink webui 看看這個(gè)任務(wù)目前在做什么。包括算子名稱都會(huì)給直接展示給我們目前哪個(gè)算子在干啥事情,在處理啥邏輯。

如果你想知道你的 flink 任務(wù)執(zhí)行了什么代碼,就去看看 sql 最后轉(zhuǎn)換成的 transformation 里面具體要執(zhí)行哪些操作。

后續(xù)文章會(huì)繼續(xù)介紹 flink sql 窗口聚合,一些理解誤區(qū),和坑之類(lèi)的案例。

本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)羊說(shuō)」

 

責(zé)任編輯:姜華 來(lái)源: 大數(shù)據(jù)羊說(shuō)
相關(guān)推薦

2022-06-06 09:27:23

FlinkSQLGroup

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時(shí)間語(yǔ)義

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2021-12-09 06:59:24

FlinkSQL 開(kāi)發(fā)

2022-05-27 09:02:58

SQLHive語(yǔ)義

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類(lèi)型

2022-06-29 09:01:38

FlinkSQL時(shí)間屬性

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-11-28 11:36:08

SQL Flink Join

2021-11-27 09:03:26

flink join數(shù)倉(cāng)

2022-08-10 10:05:29

FlinkSQL

2021-12-17 07:54:16

Flink SQLTable DataStream

2021-12-06 07:15:47

開(kāi)發(fā)Flink SQL

2022-06-18 09:26:00

Flink SQLJoin 操作

2021-12-13 07:57:47

Flink SQL Flink Hive Udf

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-11-24 08:17:21

Flink SQLCumulate WiSQL
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)