自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink SQL 知其所以然:Deduplication去重以及如何獲取最新狀態(tài)操作

數(shù)據(jù)庫 其他數(shù)據(jù)庫
今天我們來學(xué)習(xí) Flink SQL 中的 Deduplication 去重以及如何通過 Deduplication 操作獲取最新的狀態(tài)。

大家好,我是老羊,今天我們來學(xué)習(xí) Flink SQL 中的 Deduplication 去重以及如何通過 Deduplication 操作獲取最新的狀態(tài)。

  1. Deduplication 定義(支持 Batch\Streaming):Deduplication 其實(shí)就是去重,也即上文介紹到的 TopN 中 row_number = 1 的場(chǎng)景,但是這里有一點(diǎn)不一樣在于其排序字段一定是時(shí)間屬性列,不能是其他非時(shí)間屬性的普通列。在 row_number = 1 時(shí),如果排序字段是普通列 planner 會(huì)翻譯成 TopN 算子,如果是時(shí)間屬性列 planner 會(huì)翻譯成 Deduplication,這兩者最終的執(zhí)行算子是不一樣的,Deduplication 相比 TopN 算子專門做了對(duì)應(yīng)的優(yōu)化,性能會(huì)有很大提升。
  2. 應(yīng)用場(chǎng)景:比如上游數(shù)據(jù)發(fā)重了,或者計(jì)算 DAU 明細(xì)數(shù)據(jù)等場(chǎng)景,都可以使用 Deduplication 語法去做去重。
  3. SQL 語法標(biāo)準(zhǔn):
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1

其中:

  • ROW_NUMBER():標(biāo)識(shí)當(dāng)前數(shù)據(jù)的排序值。
  • PARTITION BY col1[, col2...]:標(biāo)識(shí)分區(qū)字段,代表按照這個(gè) col 字段作為分區(qū)粒度對(duì)數(shù)據(jù)進(jìn)行排序。
  • ORDER BY time_attr [asc|desc]:標(biāo)識(shí)排序規(guī)則,必須為時(shí)間戳列,當(dāng)前 Flink SQL 支持處理時(shí)間、事件時(shí)間,ASC 代表保留第一行,DESC 代表保留最后一行。
  • WHERE rownum = 1:這個(gè)子句是一定需要的,而且必須為 rownum = 1。
  1. 實(shí)際案例:

博主這里舉兩個(gè)案例:

  • 案例 1(事件時(shí)間):是騰訊 QQ 用戶等級(jí)的場(chǎng)景,每一個(gè) QQ 用戶都有一個(gè) QQ 用戶等級(jí),需要求出當(dāng)前用戶等級(jí)在星星,月亮,太陽 的用戶數(shù)分別有多少。
-- 數(shù)據(jù)源:當(dāng)每一個(gè)用戶的等級(jí)初始化及后續(xù)變化的時(shí)候的數(shù)據(jù),即用戶等級(jí)變化明細(xì)數(shù)據(jù)。
CREATE TABLE source_table (
user_id BIGINT COMMENT '用戶 id',
level STRING COMMENT '用戶等級(jí)',
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件時(shí)間戳',
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.level.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000000'
);
-- 數(shù)據(jù)匯:輸出即每一個(gè)等級(jí)的用戶數(shù)
CREATE TABLE sink_table (
level STRING COMMENT '等級(jí)',
uv BIGINT COMMENT '當(dāng)前等級(jí)用戶數(shù)',
row_time timestamp(3) COMMENT '時(shí)間戳'
) WITH (
'connector' = 'print'
);
-- 處理邏輯:
INSERT INTO sink_table
select
level
, count(1) as uv
, max(row_time) as row_time
from (
SELECT
user_id,
level,
row_time,
row_number() over(partition by user_id order by row_time) as rn
FROM source_table
)
where rn = 1
group by
level

輸出結(jié)果:

+I[等級(jí) 1, 6928, 2021-1-28T22:34]
-I[等級(jí) 1, 6928, 2021-1-28T22:34]
+I[等級(jí) 1, 8670, 2021-1-28T22:34]
-I[等級(jí) 1, 8670, 2021-1-28T22:34]
+I[等級(jí) 1, 77287, 2021-1-28T22:34]
...

可以看到其有回撤數(shù)據(jù)。

其對(duì)應(yīng)的 SQL 語義如下:

  • 數(shù)據(jù)源:消費(fèi)到 Kafka 中數(shù)據(jù)后,將數(shù)據(jù)按照 partition by 的 key 通過 hash 分發(fā)策略發(fā)送到下游去重算子。
  • Deduplication 去重算子:接受到上游數(shù)據(jù)之后,根據(jù) order by 中的條件判斷當(dāng)前的這條數(shù)據(jù)和之前數(shù)據(jù)時(shí)間戳大小,以上面案例來說,如果當(dāng)前數(shù)據(jù)時(shí)間戳大于之前數(shù)據(jù)時(shí)間戳,則撤回之前向下游發(fā)的中間結(jié)果,然后將最新的結(jié)果發(fā)向下游(發(fā)送策略也為 hash,具體的 hash 策略為按照 group by 中 key 進(jìn)行發(fā)送),如果當(dāng)前數(shù)據(jù)時(shí)間戳小于之前數(shù)據(jù)時(shí)間戳,則不做操作。次算子產(chǎn)出的結(jié)果就是每一個(gè)用戶的對(duì)應(yīng)的最新等級(jí)信息。
  • Group by 聚合算子:接受到上游數(shù)據(jù)之后,根據(jù) Group by 聚合粒度對(duì)數(shù)據(jù)進(jìn)行聚合計(jì)算結(jié)果(每一個(gè)等級(jí)的用戶數(shù)),發(fā)往下游數(shù)據(jù)匯算子。
  • 數(shù)據(jù)匯:接收到上游的數(shù)據(jù)之后,然后輸出到外部存儲(chǔ)引擎中。
  • 案例 2(處理時(shí)間):最原始的日志是明細(xì)數(shù)據(jù),需要我們根據(jù)用戶 id 篩選出這個(gè)用戶當(dāng)天的第一條數(shù)據(jù),發(fā)往下游,下游可以據(jù)此計(jì)算分各種維度的 DAU。
-- 數(shù)據(jù)源:原始日志明細(xì)數(shù)據(jù)
CREATE TABLE source_table (
user_id BIGINT COMMENT '用戶 id',
name STRING COMMENT '用戶姓名',
server_timestamp BIGINT COMMENT '用戶訪問時(shí)間戳',
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.server_timestamp.min' = '1',
'fields.server_timestamp.max' = '100000'
);

-- 數(shù)據(jù)匯:根據(jù) user_id 去重的第一條數(shù)據(jù)
CREATE TABLE sink_table (
user_id BIGINT,
name STRING,
server_timestamp BIGINT
) WITH (
'connector' = 'print'
);
-- 處理邏輯:
INSERT INTO sink_table
select user_id,
name,
server_timestamp
from (
SELECT
user_id,
name,
server_timestamp,
row_number() over(partition by user_id order by proctime) as rn
FROM source_table
)
where rn = 1

輸出結(jié)果:

+I[1, 用戶 1, 2021-1-28T22:34]
+I[2, 用戶 2, 2021-1-28T22:34]
+I[3, 用戶 3, 2021-1-28T22:34]
...

可以看到這個(gè)處理邏輯是沒有回撤數(shù)據(jù)的。其對(duì)應(yīng)的 SQL 語義如下:

  • 數(shù)據(jù)源:消費(fèi)到 Kafka 中數(shù)據(jù)后,將數(shù)據(jù)按照 partition by 的 key 通過 hash 分發(fā)策略發(fā)送到下游去重算子。
  • Deduplication 去重算子:處理時(shí)間語義下,如果是當(dāng)前 key 的第一條數(shù)據(jù),則直接發(fā)往下游,如果判斷(根據(jù) state 中是否存儲(chǔ)過改 key)不是第一條,則直接丟棄。
  • 數(shù)據(jù)匯:接收到上游的數(shù)據(jù)之后,然后輸出到外部存儲(chǔ)引擎中。

注意:

在 Deduplication 關(guān)于是否會(huì)出現(xiàn)回撤流,博主總結(jié)如下:

  1. ? Order by 事件時(shí)間 DESC:會(huì)出現(xiàn)回撤流,因?yàn)楫?dāng)前 key 下可能會(huì)有 比當(dāng)前事件時(shí)間還大的數(shù)據(jù)。
  2. ? Order by 事件時(shí)間 ASC:會(huì)出現(xiàn)回撤流,因?yàn)楫?dāng)前 key 下可能會(huì)有 比當(dāng)前事件時(shí)間還小的數(shù)據(jù)。
  3. ? Order by 處理時(shí)間 DESC:會(huì)出現(xiàn)回撤流,因?yàn)楫?dāng)前 key 下可能會(huì)有 比當(dāng)前處理時(shí)間還大的數(shù)據(jù)。
  4. ? Order by 處理時(shí)間 ASC:不會(huì)出現(xiàn)回撤流,因?yàn)楫?dāng)前 key 下不可能會(huì)有 比當(dāng)前處理時(shí)間還小的數(shù)據(jù)。
責(zé)任編輯:姜華 來源: 大數(shù)據(jù)羊說
相關(guān)推薦

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-06-29 09:01:38

FlinkSQL時(shí)間屬性

2022-05-18 09:02:28

Flink SQLSQL字符串

2021-11-25 07:01:57

SQL應(yīng)用場(chǎng)景

2022-05-15 09:57:59

Flink SQL時(shí)間語義

2022-05-27 09:02:58

SQLHive語義

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類型

2021-11-28 11:36:08

SQL Flink Join

2021-11-27 09:03:26

flink join數(shù)倉

2022-08-10 10:05:29

FlinkSQL

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-12-06 07:15:47

開發(fā)Flink SQL

2021-12-13 07:57:47

Flink SQL Flink Hive Udf
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)