自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink SQL 知其所以然之去重不僅僅有 Count Distinct 還有強(qiáng)大的 Deduplication

運(yùn)維 數(shù)據(jù)庫運(yùn)維
熟悉離線計(jì)算的小伙伴可能很快就能給出答案。沒錯(cuò),hive sql 中的 row_number = 1。flink sql 中也是提供了一模一樣的功能,xdm,完美的解決這個(gè)問題。

[[436600]]

1.序篇

源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 deduplication 的奇妙解析之路獲取。

下面即是文章目錄,也對(duì)應(yīng)到了本文的結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來什么幫助:

  • 背景及應(yīng)用場(chǎng)景介紹:博主期望你了解到,flink sql 的 deduplication 其實(shí)就是 row_number = 1,所以它可以在去重的同時(shí),還能保留原始字段數(shù)據(jù)
  • 來一個(gè)實(shí)戰(zhàn)案例:博主以一個(gè)日志上報(bào)重復(fù)的場(chǎng)景,來引出下文要介紹的 flink sql deduplication 解決方案
  • 基于 Deduplication 的解決方案及原理解析:博主期望你了解到,deduplication 中,當(dāng) row_number order by proctime(處理時(shí)間)去重的原理就是給每一個(gè) partition key 維護(hù)一個(gè) value state。如果當(dāng)前 value state 不為空,則說明 id 已經(jīng)來過了,當(dāng)前這條數(shù)據(jù)就不用下發(fā)了。如果 value state 為空,則 id 還沒還沒來過,把 value state 標(biāo)記之后,把當(dāng)前數(shù)據(jù)下發(fā)。
  • 總結(jié)及展望篇

2.背景及應(yīng)用場(chǎng)景介紹

你是否遇到過一下的場(chǎng)景:

由于上游發(fā)過來的數(shù)據(jù)有重復(fù)或者日志源頭數(shù)據(jù)有重復(fù)上報(bào),導(dǎo)致下游計(jì)算 count,sum 時(shí)算多

想做到去重計(jì)算的同時(shí),原始表的所有字段還能正常保留且下發(fā)

那么你能想到哪些解決方案呢?

熟悉離線計(jì)算的小伙伴可能很快就能給出答案。沒錯(cuò),hive sql 中的 row_number = 1。flink sql 中也是提供了一模一樣的功能,xdm,完美的解決這個(gè)問題。

下面開始正式篇章。

3.來一個(gè)實(shí)戰(zhàn)案例

先來一個(gè)實(shí)際案例來看看在具體輸入值的場(chǎng)景下,輸出值應(yīng)該長啥樣。

場(chǎng)景:埋點(diǎn)數(shù)據(jù)上報(bào)的的字段有 id(標(biāo)識(shí)唯一一條日志),timestamp(事件時(shí)間戳),page(時(shí)間發(fā)生的當(dāng)前頁面),param1,param2,paramN...。但是日志上報(bào)時(shí)由于一些機(jī)制導(dǎo)致日志上報(bào)重復(fù),下游算多了,因此需要做一次去重,下游再去消費(fèi)去過重的數(shù)據(jù)。

來一波輸入數(shù)據(jù):

id timestamp page param1 param2 paramN
1 2021-11-01 00:01:00 A xxx1 xxx2 xxxN
1 2021-11-01 00:01:00 A xxx1 xxx2 xxxN
2 2021-11-01 00:01:00 A xxx3 xxx2 xxxN
2 2021-11-01 00:01:00 A xxx3 xxx2 xxxN
3 2021-11-01 00:03:00 C xxx5 xxx2 xxxN

其中第二條和第四條是重復(fù)上報(bào)的數(shù)據(jù),則預(yù)期輸出數(shù)據(jù)如下:

id timestamp page param1 param2 paramN
1 2021-11-01 00:01:00 A xxx1 xxx2 xxxN
2 2021-11-01 00:01:00 A xxx3 xxx2 xxxN
3 2021-11-01 00:03:00 C xxx5 xxx2 xxxN

4.基于 Deduplication 的解決方案及原理解析

4.1.sql 寫法

還是上面的案例,我們來看看最終的 sql 應(yīng)該怎么寫:

  1. select id, 
  2.        timestamp
  3.        page, 
  4.        param1, 
  5.        param2, 
  6.        paramN 
  7. from ( 
  8.       SELECT 
  9.           id, 
  10.           timestamp
  11.           page, 
  12.           param1, 
  13.           param2, 
  14.           paramN 
  15.           -- proctime 代表處理時(shí)間即 source 表中的 PROCTIME() 
  16.           row_number() over(partition by id order by proctime) as rn 
  17.       FROM source_table 
  18. where rn = 1 

上面的 sql 應(yīng)該很好理解。其中由于我們并不關(guān)心重復(fù)數(shù)據(jù)上報(bào)的時(shí)間前后,所以此處就直接使用 order by proctime 進(jìn)行處理,按照數(shù)據(jù)來的前后時(shí)間去第一條。

4.2.proctime 下 flink 生成的算子圖及 sql 算子語義

算子圖如下所示:

deduplication

  • source 算子:source 通過 keyby 的方式向 deduplication 算子發(fā)數(shù)據(jù)時(shí),其中 keyby 的 key 就是 sql 中的 id
  • deduplication 算子:deduplication 算子為每一個(gè) partition key 都維護(hù)了一個(gè) value state 用于去重。每來一條數(shù)據(jù)時(shí)都從當(dāng)前 partition key 的 value state 去獲取 value, 如果不為空,則說明已經(jīng)有數(shù)據(jù)來過了,當(dāng)前這一條數(shù)據(jù)就是重復(fù)數(shù)據(jù),就不往下游算子下發(fā)了, 如果為空,則說明之前沒有數(shù)據(jù)來過,當(dāng)前這一條數(shù)據(jù)就是第一條數(shù)據(jù),則把當(dāng)前的 value state 值設(shè)置為 true,往下游算子下發(fā)數(shù)據(jù)

4.3.proctime 下 deduplication 原理解析

具體的去重算子為 deduplication。我們通過 transformation 可以看到去重算子為下圖所示:

transformation

上述的去重邏輯集中在 org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction 的 processFirstRowOnProcTime,如下圖所示:

ProcTimeDeduplicateKeepFirstRowFunction

5.總結(jié)與展望

源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 deduplication 的奇妙解析之路獲取。

本文主要介紹了 deduplication 的應(yīng)用場(chǎng)景案例以及其運(yùn)行原理,主要包含下面兩部分:

背景及應(yīng)用場(chǎng)景介紹:博主期望你了解到,flink sql 的 deduplication 其實(shí)就是 row_number = 1,所以它可以在去重的同時(shí),還能保留原始字段數(shù)據(jù)

來一個(gè)實(shí)戰(zhàn)案例:博主以一個(gè)日志上報(bào)重復(fù)的場(chǎng)景,來引出下文要介紹的 flink sql deduplication 解決方案

基于 Deduplication 的解決方案及原理解析:博主期望你了解到,deduplication 中,當(dāng) row_number order by proctime(處理時(shí)間)去重的原理就是給每一個(gè) partition key 維護(hù)一個(gè) value state。如果當(dāng)前 value state 不為空,則說明 id 已經(jīng)來過了,當(dāng)前這條數(shù)據(jù)就不用下發(fā)了。如果 value state 為空,則 id 還沒還沒來過,把 value state 標(biāo)記之后,把當(dāng)前數(shù)據(jù)下發(fā)。 

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)羊說
相關(guān)推薦

2022-07-12 09:02:18

Flink SQL去重

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2022-05-15 09:57:59

Flink SQL時(shí)間語義

2022-06-06 09:27:23

FlinkSQLGroup

2022-05-27 09:02:58

SQLHive語義

2022-06-29 09:01:38

FlinkSQL時(shí)間屬性

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類型

2021-11-28 11:36:08

SQL Flink Join

2021-12-06 07:15:47

開發(fā)Flink SQL

2021-11-27 09:03:26

flink join數(shù)倉

2022-08-10 10:05:29

FlinkSQL

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2022-06-18 09:26:00

Flink SQLJoin 操作

2011-12-06 08:44:01

程序員
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)