自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink SQL 知其所以然:Group 聚合操作

數(shù)據(jù)庫 其他數(shù)據(jù)庫
架構(gòu)

Group 聚合

  • Group 聚合定義(支持 Batch\Streaming 任務(wù)):Flink 也支持 Group 聚合。Group 聚合和上面介紹到的窗口聚合的不同之處,就在于 Group 聚合是按照數(shù)據(jù)的類別進(jìn)行分組,比如年齡、性別,是橫向的;而窗口聚合是在時(shí)間粒度上對(duì)數(shù)據(jù)進(jìn)行分組,是縱向的。如下圖所示,就展示出了其區(qū)別。其中按顏色分 key(橫向)? 就是 Group 聚合,按窗口劃分(縱向) 就是窗口聚合。

圖片

tumble window + key

  • 應(yīng)用場景:一般用于對(duì)數(shù)據(jù)進(jìn)行分組,然后后續(xù)使用聚合函數(shù)進(jìn)行 count、sum 等聚合操作。

那么這時(shí)候,小伙伴萌就會(huì)問到,我其實(shí)可以把窗口聚合的寫法也轉(zhuǎn)換為 Group 聚合,只需要把 Group 聚合的 Group By key 換成時(shí)間就行,那這兩個(gè)聚合的區(qū)別到底在哪?

首先來舉一個(gè)例子看看怎么將窗口聚合轉(zhuǎn)換為 Group 聚合。假如一個(gè)窗口聚合是按照 1 分鐘的粒度進(jìn)行聚合,如下 SQL:

-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時(shí)間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設(shè)置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)

-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)

-- 數(shù)據(jù)處理邏輯
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計(jì)算 uv 數(shù)
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口寫法劃分窗口
tumble(row_time, interval '1' minute)

轉(zhuǎn)換為 Group 聚合的寫法如下:

Group 聚合

-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時(shí)間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設(shè)置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);

-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);

-- 數(shù)據(jù)處理邏輯
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計(jì)算 uv 數(shù)
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dim,
-- 將秒級(jí)別時(shí)間戳 / 60 轉(zhuǎn)化為 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

確實(shí)沒錯(cuò),上面這個(gè)轉(zhuǎn)換是一點(diǎn)問題都沒有的。

但是窗口聚合和 Group by 聚合的差異在于:

本質(zhì)區(qū)別:窗口聚合是具有時(shí)間語義的,其本質(zhì)是想實(shí)現(xiàn)窗口結(jié)束輸出結(jié)果之后,后續(xù)有遲到的數(shù)據(jù)也不會(huì)對(duì)原有的結(jié)果發(fā)生更改了,即輸出結(jié)果值是定值(不考慮 allowLateness)。而 Group by 聚合是沒有時(shí)間語義的,不管數(shù)據(jù)遲到多長時(shí)間,只要數(shù)據(jù)來了,就把上一次的輸出的結(jié)果數(shù)據(jù)撤回,然后把計(jì)算好的新的結(jié)果數(shù)據(jù)發(fā)出。

運(yùn)行層面:窗口聚合是和 時(shí)間 綁定的,窗口聚合其中窗口的計(jì)算結(jié)果觸發(fā)都是由時(shí)間(Watermark)推動(dòng)的。Group by 聚合完全由數(shù)據(jù)推動(dòng)觸發(fā)計(jì)算,新來一條數(shù)據(jù)去根據(jù)這條數(shù)據(jù)進(jìn)行計(jì)算出結(jié)果發(fā)出;由此可見兩者的實(shí)現(xiàn)方式也大為不同。

  • SQL 語義

也是拿離線和實(shí)時(shí)做對(duì)比,Orders 為 kafka,target_table 為 Kafka,這個(gè) SQL 生成的實(shí)時(shí)任務(wù),在執(zhí)行時(shí),會(huì)生成三個(gè)算子:

數(shù)據(jù)源算子?(From Order):數(shù)據(jù)源算子一直運(yùn)行,實(shí)時(shí)的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的Group 聚合算子,向下游發(fā)送數(shù)據(jù)的 shuffle 策略是根據(jù) group by 中的 key 進(jìn)行發(fā)送,相同的 key 發(fā)到同一個(gè) SubTask(并發(fā)) 中。

Group 聚合算子?(group by key + sum\count\max\min):接收到上游算子發(fā)的一條一條的數(shù)據(jù),去狀態(tài) state 中找這個(gè) key 之前的 sum\count\max\min 結(jié)果。如果有結(jié)果oldResult?,拿出來和當(dāng)前的數(shù)據(jù)進(jìn)行sum\count\max\min? 計(jì)算出這個(gè) key 的新結(jié)果newResult?,并將新結(jié)果[key, newResult]? 更新到 state 中,在向下游發(fā)送新計(jì)算的結(jié)果之前,先發(fā)一條撤回上次結(jié)果的消息-[key, oldResult]?,然后再將新結(jié)果發(fā)往下游+[key, newResult]?;如果 state 中沒有當(dāng)前 key 的結(jié)果,則直接使用當(dāng)前這條數(shù)據(jù)計(jì)算 sum\max\min 結(jié)果newResult?,并將新結(jié)果[key, newResult]? 更新到 state 中,當(dāng)前是第一次往下游發(fā),則不需要先發(fā)回撤消息,直接發(fā)送+[key, newResult]。

數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫入到 target_table Kafka 中。

這個(gè)實(shí)時(shí)任務(wù)也是 24 小時(shí)一直在運(yùn)行的,所有的算子在同一時(shí)刻都是處于 running 狀態(tài)的。

特別注意:

  • Group by 聚合涉及到了回撤流(也叫 retract 流),會(huì)產(chǎn)生回撤流是因?yàn)閺恼麄€(gè) SQL 的語義來看,上游的 Kafk數(shù)據(jù)是源源不斷的,無窮無盡的,那么每次這個(gè) SQL 任務(wù)產(chǎn)出的結(jié)果都是一個(gè)中間結(jié)果,所以每次結(jié)果發(fā)生更新時(shí),都需要將上一次發(fā)出的中間結(jié)果給撤回,然后將最新的結(jié)果發(fā)下去。
  • Group by 聚合涉及到了狀態(tài):狀態(tài)大小也取決于不同 key 的數(shù)量。為了防止?fàn)顟B(tài)無限變大,我們可以設(shè)置狀態(tài)的 TTL。以上面的 SQL 為例,上面 SQL 是按照分鐘進(jìn)行聚合的,理論上到了今天,通常我們就可以不用關(guān)心昨天的數(shù)據(jù)了,那么我們可以設(shè)置狀態(tài)過期時(shí)間為一天。關(guān)于狀態(tài)過期時(shí)間的設(shè)置參數(shù)可以參考下文運(yùn)行時(shí)參數(shù) 小節(jié)。

如果這個(gè) SQL 放在 Hive 中執(zhí)行時(shí),其中 Orders 為 Hive,target_table 也為 Hive,其也會(huì)生成三個(gè)相同的算子,但是其和實(shí)時(shí)任務(wù)的執(zhí)行方式完全不同:

  • 數(shù)據(jù)源算子?(From Order):數(shù)據(jù)源算子從 Order Hive 中讀取到所有的數(shù)據(jù),然后所有數(shù)據(jù)發(fā)送給下游的Group 聚合算子,向下游發(fā)送數(shù)據(jù)的 shuffle 策略是根據(jù) group by 中的 key 進(jìn)行發(fā)送,相同的 key 發(fā)到同一個(gè)算子中,然后這個(gè)算子就運(yùn)行結(jié)束了,釋放資源了。
  • Group 聚合算子?(group by + sum\count\max\min):接收到上游算子發(fā)的所有數(shù)據(jù),然后遍歷計(jì)算 sum\count\max\min 結(jié)果,批量發(fā)給下游數(shù)據(jù)匯算子,這個(gè)算子也就運(yùn)行結(jié)束了,釋放資源了。
  • 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫入到 target_table Hive 中,整個(gè)任務(wù)也就運(yùn)行結(jié)束了,整個(gè)任務(wù)的資源也就都釋放了。

Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping sets、Rollup、Cube。

舉一個(gè) Grouping sets 的案例:

SELECT 
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)?

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)羊說
相關(guān)推薦

2022-06-10 09:01:04

OverFlinkSQL

2022-07-05 09:03:05

Flink SQLTopN

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-06-29 09:01:38

FlinkSQL時(shí)間屬性

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時(shí)間語義

2022-05-27 09:02:58

SQLHive語義

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類型

2021-11-28 11:36:08

SQL Flink Join

2021-11-27 09:03:26

flink join數(shù)倉

2022-08-10 10:05:29

FlinkSQL

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-07-12 09:02:18

Flink SQL去重

2021-12-06 07:15:47

開發(fā)Flink SQL

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-11-24 08:17:21

Flink SQLCumulate WiSQL

2021-12-13 07:57:47

Flink SQL Flink Hive Udf
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)