Flink SQL 知其所以然之去重不僅僅有 Count Distinct 還有強(qiáng)大的 Deduplication
1.序篇
源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 deduplication 的奇妙解析之路獲取。
下面即是文章目錄,也對(duì)應(yīng)到了本文的結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來什么幫助:
- 背景及應(yīng)用場(chǎng)景介紹:博主期望你了解到,flink sql 的 deduplication 其實(shí)就是 row_number = 1,所以它可以在去重的同時(shí),還能保留原始字段數(shù)據(jù)
- 來一個(gè)實(shí)戰(zhàn)案例:博主以一個(gè)日志上報(bào)重復(fù)的場(chǎng)景,來引出下文要介紹的 flink sql deduplication 解決方案
- 基于 Deduplication 的解決方案及原理解析:博主期望你了解到,deduplication 中,當(dāng) row_number order by proctime(處理時(shí)間)去重的原理就是給每一個(gè) partition key 維護(hù)一個(gè) value state。如果當(dāng)前 value state 不為空,則說明 id 已經(jīng)來過了,當(dāng)前這條數(shù)據(jù)就不用下發(fā)了。如果 value state 為空,則 id 還沒還沒來過,把 value state 標(biāo)記之后,把當(dāng)前數(shù)據(jù)下發(fā)。
- 總結(jié)及展望篇
2.背景及應(yīng)用場(chǎng)景介紹
你是否遇到過一下的場(chǎng)景:
由于上游發(fā)過來的數(shù)據(jù)有重復(fù)或者日志源頭數(shù)據(jù)有重復(fù)上報(bào),導(dǎo)致下游計(jì)算 count,sum 時(shí)算多
想做到去重計(jì)算的同時(shí),原始表的所有字段還能正常保留且下發(fā)
那么你能想到哪些解決方案呢?
熟悉離線計(jì)算的小伙伴可能很快就能給出答案。沒錯(cuò),hive sql 中的 row_number = 1。flink sql 中也是提供了一模一樣的功能,xdm,完美的解決這個(gè)問題。
下面開始正式篇章。
3.來一個(gè)實(shí)戰(zhàn)案例
先來一個(gè)實(shí)際案例來看看在具體輸入值的場(chǎng)景下,輸出值應(yīng)該長啥樣。
場(chǎng)景:埋點(diǎn)數(shù)據(jù)上報(bào)的的字段有 id(標(biāo)識(shí)唯一一條日志),timestamp(事件時(shí)間戳),page(時(shí)間發(fā)生的當(dāng)前頁面),param1,param2,paramN...。但是日志上報(bào)時(shí)由于一些機(jī)制導(dǎo)致日志上報(bào)重復(fù),下游算多了,因此需要做一次去重,下游再去消費(fèi)去過重的數(shù)據(jù)。
來一波輸入數(shù)據(jù):
id | timestamp | page | param1 | param2 | paramN |
---|---|---|---|---|---|
1 | 2021-11-01 00:01:00 | A | xxx1 | xxx2 | xxxN |
1 | 2021-11-01 00:01:00 | A | xxx1 | xxx2 | xxxN |
2 | 2021-11-01 00:01:00 | A | xxx3 | xxx2 | xxxN |
2 | 2021-11-01 00:01:00 | A | xxx3 | xxx2 | xxxN |
3 | 2021-11-01 00:03:00 | C | xxx5 | xxx2 | xxxN |
其中第二條和第四條是重復(fù)上報(bào)的數(shù)據(jù),則預(yù)期輸出數(shù)據(jù)如下:
id | timestamp | page | param1 | param2 | paramN |
---|---|---|---|---|---|
1 | 2021-11-01 00:01:00 | A | xxx1 | xxx2 | xxxN |
2 | 2021-11-01 00:01:00 | A | xxx3 | xxx2 | xxxN |
3 | 2021-11-01 00:03:00 | C | xxx5 | xxx2 | xxxN |
4.基于 Deduplication 的解決方案及原理解析
4.1.sql 寫法
還是上面的案例,我們來看看最終的 sql 應(yīng)該怎么寫:
- select id,
- timestamp,
- page,
- param1,
- param2,
- paramN
- from (
- SELECT
- id,
- timestamp,
- page,
- param1,
- param2,
- paramN
- -- proctime 代表處理時(shí)間即 source 表中的 PROCTIME()
- row_number() over(partition by id order by proctime) as rn
- FROM source_table
- )
- where rn = 1
上面的 sql 應(yīng)該很好理解。其中由于我們并不關(guān)心重復(fù)數(shù)據(jù)上報(bào)的時(shí)間前后,所以此處就直接使用 order by proctime 進(jìn)行處理,按照數(shù)據(jù)來的前后時(shí)間去第一條。
4.2.proctime 下 flink 生成的算子圖及 sql 算子語義
算子圖如下所示:
deduplication
- source 算子:source 通過 keyby 的方式向 deduplication 算子發(fā)數(shù)據(jù)時(shí),其中 keyby 的 key 就是 sql 中的 id
- deduplication 算子:deduplication 算子為每一個(gè) partition key 都維護(hù)了一個(gè) value state 用于去重。每來一條數(shù)據(jù)時(shí)都從當(dāng)前 partition key 的 value state 去獲取 value, 如果不為空,則說明已經(jīng)有數(shù)據(jù)來過了,當(dāng)前這一條數(shù)據(jù)就是重復(fù)數(shù)據(jù),就不往下游算子下發(fā)了, 如果為空,則說明之前沒有數(shù)據(jù)來過,當(dāng)前這一條數(shù)據(jù)就是第一條數(shù)據(jù),則把當(dāng)前的 value state 值設(shè)置為 true,往下游算子下發(fā)數(shù)據(jù)
4.3.proctime 下 deduplication 原理解析
具體的去重算子為 deduplication。我們通過 transformation 可以看到去重算子為下圖所示:
transformation
上述的去重邏輯集中在 org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction 的 processFirstRowOnProcTime,如下圖所示:
ProcTimeDeduplicateKeepFirstRowFunction
5.總結(jié)與展望
源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 deduplication 的奇妙解析之路獲取。
本文主要介紹了 deduplication 的應(yīng)用場(chǎng)景案例以及其運(yùn)行原理,主要包含下面兩部分:
背景及應(yīng)用場(chǎng)景介紹:博主期望你了解到,flink sql 的 deduplication 其實(shí)就是 row_number = 1,所以它可以在去重的同時(shí),還能保留原始字段數(shù)據(jù)
來一個(gè)實(shí)戰(zhàn)案例:博主以一個(gè)日志上報(bào)重復(fù)的場(chǎng)景,來引出下文要介紹的 flink sql deduplication 解決方案
基于 Deduplication 的解決方案及原理解析:博主期望你了解到,deduplication 中,當(dāng) row_number order by proctime(處理時(shí)間)去重的原理就是給每一個(gè) partition key 維護(hù)一個(gè) value state。如果當(dāng)前 value state 不為空,則說明 id 已經(jīng)來過了,當(dāng)前這條數(shù)據(jù)就不用下發(fā)了。如果 value state 為空,則 id 還沒還沒來過,把 value state 標(biāo)記之后,把當(dāng)前數(shù)據(jù)下發(fā)。