自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink SQL 知其所以然之流 Join 很難嘛???(下)

運(yùn)維 數(shù)據(jù)庫運(yùn)維
本文主要介紹了 flink sql interval 是怎么避免出現(xiàn) flink regular join 存在的 retract 問題的,并通過解析其實現(xiàn)說明了運(yùn)行原理。

[[437064]]

1.序篇

本節(jié)是 flink sql 流 join 系列的下篇,上篇的鏈接如下:

flink sql 知其所以然之:流 join 很難嘛???(上)

廢話不多說,咱們先直接上本文的目錄和結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來什么幫助:

  • 背景及應(yīng)用場景介紹:博主期望你能了解到,flink sql 提供的豐富的 join 方式(總結(jié) 6 種:regular join,維表 join,快照 join,interval join,array 拍平,table function)對我們滿足需求提供了強(qiáng)大的后盾, 這 6 種 join 中涉及到流與流的 join 最常用的是 regular join 以及 interval join,本節(jié)主要介紹 interval join
  • 來一個實戰(zhàn)案例:博主以上節(jié)說到的曝光日志流點擊日志流為案例展開,主要是想告訴小伙伴 flink sql left join 數(shù)據(jù)不會互相等待,存在 retract 問題,會導(dǎo)致寫入 kafka 的數(shù)據(jù)量變大, 然后轉(zhuǎn)變思路為使用 flink sql interval join 的方式可以使得數(shù)據(jù)互相等待一段時間進(jìn)行 join,這種方式不會存在 retract 問題
  • flink sql interval join 的解決方案以及原理的介紹:主要介紹 interval join 的在上述實戰(zhàn)案例的運(yùn)行結(jié)果及分析源碼機(jī)制,博主期望你能了解到,interval join 的執(zhí)行機(jī)制是會在你設(shè)置的 interval 區(qū)間之內(nèi)互相等待一段時間,一旦時間推進(jìn)(事件時間由 watermark 推進(jìn))到區(qū)間之外(即當(dāng)前這條數(shù)據(jù)再也不可能被另一條流的數(shù)據(jù) join 到時),outer join 會輸出沒有 join 到的數(shù)據(jù),inner join 會從 state 中刪除這條數(shù)據(jù)
  • 總結(jié)及展望

2.背景及應(yīng)用場景介紹

書接上文,上文介紹了曝光流在關(guān)聯(lián)點擊流時,使用 flink sql regular join 存在的 retract 問題。

本文介紹怎么使用 flink sql interval join 解決這些問題。

3.來一個實戰(zhàn)案例

flink sql 知其所以然之流 join 很難嘛???(上)

看看上節(jié)的實際案例,來看看在具體輸入值的場景下,輸出值應(yīng)該長啥樣。

場景:即常見的曝光日志流(show_log)通過 log_id 關(guān)聯(lián)點擊日志流(click_log),將數(shù)據(jù)的關(guān)聯(lián)結(jié)果進(jìn)行下發(fā)。

來一波輸入數(shù)據(jù):

曝光數(shù)據(jù):

log_id timestamp show_params
1 2021-11-01 00:01:03 show_params
2 2021-11-01 00:03:00 show_params2
3 2021-11-01 00:05:00 show_params3

點擊數(shù)據(jù):

log_id timestamp click_params
1 2021-11-01 00:01:53 click_params
2 2021-11-01 00:02:01 click_params2

預(yù)期輸出數(shù)據(jù)如下:

log_id timestamp show_params click_params
1 2021-11-01 00:01:00 show_params click_params
2 2021-11-01 00:01:00 show_params2 click_params2
3 2021-11-01 00:02:00 show_params3 null

上節(jié)的 flink sql regular join 解決方案如下:

  1. INSERT INTO sink_table 
  2. SELECT 
  3.     show_log.log_id as log_id, 
  4.     show_log.timestamp as timestamp
  5.     show_log.show_params as show_params, 
  6.     click_log.click_params as click_params 
  7. FROM show_log 
  8. LEFT JOIN click_log ON show_log.log_id = click_log.log_id; 

上節(jié)說道,flink sql left join 在流數(shù)據(jù)到達(dá)時,如果左表流(show_log)join 不到右表流(click_log) ,則不會等待右流直接輸出(show_log,null),在后續(xù)右表流數(shù)據(jù)代打時,會將(show_log,null)撤回,發(fā)送(show_log,click_log)。這就是為什么產(chǎn)生了 retract 流,從而導(dǎo)致重復(fù)寫入 kafka。

對此,我們也是提出了對應(yīng)的解決思路,既然 left join 中左流不會等待右流,那么能不能讓左流強(qiáng)行等待右流一段時間,實在等不到在數(shù)據(jù)關(guān)聯(lián)不到的數(shù)據(jù)即可。

當(dāng)當(dāng)當(dāng)!!!

本文的 flink sql interval join 登場,它就能等。

4.flink sql interval join

4.1.interval join 定義

大家先通過下面這句話和圖簡單了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已經(jīng)使用過了),后續(xù)會詳細(xì)介紹原理。

interval join 就是用一個流的數(shù)據(jù)去關(guān)聯(lián)另一個流的一段時間區(qū)間內(nèi)的數(shù)據(jù)。關(guān)聯(lián)到就下發(fā)關(guān)聯(lián)到的數(shù)據(jù),關(guān)聯(lián)不到且在超時后就根據(jù)是否是 outer join(left join,right join,full join)下發(fā)沒關(guān)聯(lián)到的數(shù)據(jù)。

interval join

4.2.案例解決方案

來看看上述案例的 flink sql interval join sql 怎么寫:

  1. INSERT INTO sink_table 
  2. SELECT 
  3.     show_log.log_id as log_id, 
  4.     show_log.timestamp as timestamp
  5.     show_log.show_params as show_params, 
  6.     click_log.click_params as click_params 
  7. FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id 
  8. AND show_log.row_time  
  9.     BETWEEN click_log.row_time - INTERVAL '10' MINUTE  
  10.     AND click_log.row_time + INTERVAL '10' MINUTE

這里設(shè)置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE代表 show_log 表中的數(shù)據(jù)會和 click_log 表中的 row_time 在前后 10 分鐘之內(nèi)的數(shù)據(jù)進(jìn)行關(guān)聯(lián)。

運(yùn)行結(jié)果如下:

  1. +[1 | 2021-11-01 00:01:03 | show_params | click_params] 
  2.  
  3. +[2 | 2021-11-01 00:03:00 | show_params | click_params] 
  4.  
  5. +[3 | 2021-11-01 00:05:00 | show_params | null

如上就是我們期望的正確結(jié)果了。

flink web ui 算子圖如下:

flink web ui

那么此時你可能有一個問題,結(jié)果中的前兩條數(shù)據(jù) join 到了輸出我是理解的,那當(dāng) show_log join 不到 click_log 時為啥也輸出了?原理是啥?

博主帶你們來定位到具體的實現(xiàn)源碼。先看一下 transformations。

transformations

可以看到事件時間下 interval join 的具體 operator 是 org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay。

其核心邏輯就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 來處理具體 join 邏輯。RowTimeIntervalJoin 重要方法如下圖所示。

TimeIntervalJoin

下面詳細(xì)給大家解釋一下。

4.3.TimeIntervalJoin 簡版說明

join 時,左流和右流會在 interval 時間之內(nèi)相互等待,如果等到了則輸出數(shù)據(jù)[+(show_log,click_log)],如果等不到,并且另一條流的時間已經(jīng)推進(jìn)到當(dāng)前這條數(shù)據(jù)在也不可能 join 到另一條流的數(shù)據(jù)時,則直接輸出[+(show_log,null)],[+(null,click_log)]。

舉個例子,show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE, 當(dāng) click_log 的時間推進(jìn)到 2021-11-01 11:00:00 時,這時 show_log 來一條 2021-11-01 02:00:00 的數(shù)據(jù), 那這條 show_log 必然不可能和 click_log 中的數(shù)據(jù) join 到了,因為 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之間的數(shù)據(jù)以及過期刪除了。則 show_log 直接輸出 [+(show_log,null)]

Notes:

如果你設(shè)置了 allowLateness,join 不到的數(shù)據(jù)的輸出和 state 的清理會多保留 allowLateness 時間

4.4.TimeIntervalJoin 詳細(xì)實現(xiàn)說明

以上面案例的 show_log(左表) interval join click_log(右表) 為例(不管是 inner interval join,left interval join,right interval join 還是 full interval join,都會按照下面的流程執(zhí)行):

第一步,首先如果 join xxx on 中的條件是等式則代表 join 是在相同 key 下進(jìn)行的(上述案例中 join 的 key 即 show_log.log_id,click_log.log_id),相同 key 的數(shù)據(jù)會被發(fā)送到一個并發(fā)中進(jìn)行處理。如果 join xxx on 中的條件是不等式,則兩個流的 source 算子向 join 算子下發(fā)數(shù)據(jù)是按照 global 的 partition 策略進(jìn)行下發(fā)的,并且 join 算子并發(fā)會被設(shè)置為 1,所有的數(shù)據(jù)會被發(fā)送到這一個并發(fā)中處理。

第二步,相同 key 下,一條 show_log 的數(shù)據(jù)先到達(dá),首先會計算出下面要使用的最重要的三類時間戳:

  • 根據(jù) show_log 的時間戳(l_time)計算出能關(guān)聯(lián)到的右流的時間區(qū)間下限(r_lower)、上限(r_upper)
  • 根據(jù) show_log 目前的 watermark 計算出目前右流的數(shù)據(jù)能夠過期做過期處理的時間的最小值(r_expire)
  • 獲取左流的 l_watermark,右流的 r_watermark,這兩個時間戳在事件語義的任務(wù)中都是 watermark

第三步,遍歷所有同 key 下的 click_log 來做 join

  • 對于遍歷的每一條 click_log,走如下步驟
  • 經(jīng)過判斷,如果 on 中的條件為 true,則和 click_log 關(guān)聯(lián),輸出[+(show_log,click_log)]數(shù)據(jù);如果 on 中的條件為 false,則啥也不干
  • 接著判斷當(dāng)前這條 click_log 的數(shù)據(jù)時間(r_time)是否小于右流的數(shù)據(jù)過期時間的最小值(r_expire)(即判斷這條 click_log 是否永遠(yuǎn)不會再被 show_log join 到了)。如果小于,并且當(dāng)前 click_log 這一側(cè)是 outer join,則不用等直接輸出[+(null,click_log)]),從狀態(tài)刪除這條 click_log;如果 click_log 這一側(cè)不是 outer join,則直接從狀態(tài)里刪除這條 click_log。

第四步,判斷右流的時間戳(r_watermark)是否小于能關(guān)聯(lián)到的右流的時間區(qū)間上限(r_upper):

  • 如果是,則說明這條 show_log 還有可能被 click_log join 到,則 show_log 放到 state 中,并注冊后面用于狀態(tài)清除的 timer。
  • 如果否,則說明關(guān)聯(lián)不到了,則輸出[+(show_log,null)]

第五步,timer 觸發(fā)時:

  • timer 觸發(fā)時,根據(jù)當(dāng)前 l_watermark,r_watermark 以及 state 中存儲的 show_log,click_log 的 l_time,r_time 判斷是否再也不會被對方 join 到,如果是,則根據(jù)是否為 outer join 對應(yīng)輸出[+(show_log,null)],[+(null,click_log)],并從狀態(tài)中刪除對應(yīng)的 show_log,click_log。

上面只是左流 show_log 數(shù)據(jù)到達(dá)時的執(zhí)行流程(即 ProcessElement1),當(dāng)右流 click_log 到達(dá)時也是完全類似的執(zhí)行流程(即 ProcessElement2)。

4.5.使用注意事項

小伙伴萌在使用 interval join 需要注意的兩點事項:

interval join 的時間區(qū)間取決于日志的真實情況:設(shè)置大了容易造成任務(wù)的 state 太大,并且時效性也會變差。設(shè)置小了,join 不到,下發(fā)的數(shù)據(jù)在后續(xù)使用時,數(shù)據(jù)質(zhì)量會存在問題。所以小伙伴萌在使用時建議先使用離線數(shù)據(jù)做一遍兩條流的時間戳 diff 比較,來確定真實情況下的時間戳 diff 的分布是怎樣的。舉例:你通過離線數(shù)據(jù) join 并做時間戳 diff 后發(fā)現(xiàn) 99% 的數(shù)據(jù)都能在時間戳相差 5min 以內(nèi) join 到,那么你就有依據(jù)去設(shè)置 interval 時間差為 5min。

interval join 中的時間區(qū)間條件即支持事件時間,也支持處理時間。事件時間由 watermark 推進(jìn)。

5.總結(jié)與展望

源碼公眾號后臺回復(fù)1.13.2 sql interval join獲取。

本文主要介紹了 flink sql interval 是怎么避免出現(xiàn) flink regular join 存在的 retract 問題的,并通過解析其實現(xiàn)說明了運(yùn)行原理,博主期望你讀完本文之后能了解到:

背景及應(yīng)用場景介紹:博主期望你能了解到,flink sql 提供的豐富的 join 方式(總結(jié) 6 種:regular join,維表 join,快照 join,interval join,array 拍平,table function)對我們滿足需求提供了強(qiáng)大的后盾, 這 6 種 join 中涉及到流與流的 join 最常用的是 regular join 以及 interval join,本節(jié)主要介紹 interval join

來一個實戰(zhàn)案例:博主以上節(jié)說到的曝光日志流點擊日志流為案例展開,主要是想告訴小伙伴 flink sql left join 數(shù)據(jù)不會互相等待,存在 retract 問題,會導(dǎo)致寫入 kafka 的數(shù)據(jù)量變大, 然后轉(zhuǎn)變思路為使用 flink sql interval join 的方式可以使得數(shù)據(jù)互相等待一段時間進(jìn)行 join,這種方式不會存在 retract 問題

flink sql interval join 的解決方案以及原理的介紹:主要介紹 interval join 的在上述實戰(zhàn)案例的運(yùn)行結(jié)果及分析源碼機(jī)制,博主期望你能了解到,interval join 的執(zhí)行機(jī)制是會在你設(shè)置的 interval 區(qū)間之內(nèi)互相等待一段時間,一旦時間推進(jìn)(事件時間由 watermark 推進(jìn))到區(qū)間之外(即當(dāng)前這條數(shù)據(jù)再也不可能被另一條流的數(shù)據(jù) join 到時),outer join 會輸出沒有 join 到的數(shù)據(jù),inner join 會從 state 中刪除這條數(shù)據(jù)

 

總結(jié)及展望

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)羊說
相關(guān)推薦

2021-11-27 09:03:26

flink join數(shù)倉

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-06-06 09:27:23

FlinkSQLGroup

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-15 09:57:59

Flink SQL時間語義

2022-05-27 09:02:58

SQLHive語義

2022-06-29 09:01:38

FlinkSQL時間屬性

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類型

2022-08-10 10:05:29

FlinkSQL

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2021-11-30 23:30:45

sql 性能異步

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-12-06 07:15:47

開發(fā)Flink SQL

2021-12-05 08:28:39

Flink SQLbatch lookuSQL

2021-12-13 07:57:47

Flink SQL Flink Hive Udf
點贊
收藏

51CTO技術(shù)棧公眾號