自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink SQL 知其所以然:改了改源碼,實現(xiàn)了個 Batch lookup join(附源碼)

運維 數(shù)據庫運維
本文主要介紹了 flink sql batch lookup join 的使用方式,主要介紹 batch lookup join 的功能是從 flink transformation 出發(fā),確定要 batch lookup join 涉及改動的地方以及其實現(xiàn)思路、原理。

[[438566]]

1.序篇

書接上回,上節(jié)說到了博主發(fā)現(xiàn)由于在 flink sql 中 lookup join 訪問外部維表存在的性能問題。

由此誕生了一個想法,以 Redis 維表為例,Redis 支持 pipeline 批量訪問模式,因此 flink sql lookup join 能不能按照 DataStream 方式一樣,先攢一批數(shù)據 ,然后使用 Redis pipeline 批量訪問外部存儲。博主親切的將這個功能稱為 flink sql batch lookup join,本節(jié)就是講述博主基于 flink 源碼對此功能的實現(xiàn)。

廢話不多說,咱們先直接上本文的目錄和結論,小伙伴可以先看結論快速了解博主期望本文能給小伙伴們帶來什么幫助:

  1. 直接來一個實戰(zhàn)案例:博主以曝光用戶日志流關聯(lián)用戶畫像(年齡、性別)維表為例介紹 batch lookup join 具有的基本能力(怎么配置參數(shù),怎么寫 sql,最終效果咋樣)。
  2. batch lookup join:主要介紹 batch lookup join 的功能是從 flink transformation 出發(fā),確定要 batch lookup join 涉及改動的地方以及其實現(xiàn)思路、原理。也會教給大家一些改動源碼來實現(xiàn)自己想要的一些功能的思路。
  3. 總結及展望:目前的 batch lookup join 實現(xiàn)其實不符合 sql 的原始語義,后續(xù)大家可以按照 sql 標準自己做一些實現(xiàn)

2.來一個實戰(zhàn)案例

2.1.預期的輸入、輸出數(shù)據

來看看在具體場景下,對應輸入值的輸出值應該長啥樣。

需求指標:使用曝光用戶日志流(show_log)關聯(lián)用戶畫像維表(user_profile)關聯(lián)到用戶的畫像(性別,年齡段)數(shù)據。

來一波輸入數(shù)據:

曝光用戶日志流(show_log)數(shù)據(數(shù)據存儲在 kafka 中):

用戶畫像維表(user_profile)數(shù)據(數(shù)據存儲在 redis 中):

 

注意:redis 中的數(shù)據結構存儲是按照 key,value 去存儲的。其中 key 為 user_id,value 為 age,sex 的 json。如下圖所示:

user_profile redis

預期輸出數(shù)據如下:

2.2.batch lookup join sql 代碼

batch lookup join sql 代碼和原來的 lookup join sql 代碼一模一樣。如下 sql。

  1. CREATE TABLE show_log ( 
  2.     log_id BIGINT
  3.     `timestampas cast(CURRENT_TIMESTAMP as timestamp(3)), 
  4.     user_id STRING, 
  5.     proctime AS PROCTIME() 
  6. WITH ( 
  7.   'connector' = 'datagen'
  8.   'rows-per-second' = '10'
  9.   'fields.user_id.length' = '1'
  10.   'fields.log_id.min' = '1'
  11.   'fields.log_id.max' = '10' 
  12. ); 
  13.  
  14. CREATE TABLE user_profile ( 
  15.     user_id STRING, 
  16.     age STRING, 
  17.     sex STRING 
  18.     ) WITH ( 
  19.   'connector' = 'redis'
  20.   'hostname' = '127.0.0.1'
  21.   'port' = '6379'
  22.   'format' = 'json'
  23.   'lookup.cache.max-rows' = '500'
  24.   'lookup.cache.ttl' = '3600'
  25.   'lookup.max-retries' = '1' 
  26. ); 
  27.  
  28. CREATE TABLE sink_table ( 
  29.     log_id BIGINT
  30.     `timestampTIMESTAMP(3), 
  31.     user_id STRING, 
  32.     proctime TIMESTAMP(3), 
  33.     age STRING, 
  34.     sex STRING 
  35. WITH ( 
  36.   'connector' = 'print' 
  37. ); 
  38.  
  39. -- lookup join 的 query 邏輯 
  40. INSERT INTO sink_table 
  41. SELECT  
  42.     s.log_id as log_id 
  43.     , s.`timestampas `timestamp
  44.     , s.user_id as user_id 
  45.     , s.proctime as proctime 
  46.     , u.sex as sex 
  47.     , u.age as age 
  48. FROM show_log AS s 
  49. LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u 
  50. ON s.user_id = u.user_id 

可以看到 lookup join 和 batch lookup join 的代碼是完全相同的,唯一的不同之處在于,batch lookup join 需要設置 table config 參數(shù),如下圖所示:

table config

2.2.batch lookup join 效果

將原生 lookup join 和 batch lookup join 的效果做個對比:

原生的 lookup join:每輸入一條數(shù)據,訪問外部維表獲取到結果輸出一條數(shù)據,如下圖所示。

lookup join

博主實現(xiàn)的 batch lookup join:是每攢夠 30 條數(shù)據或者每 5s(防止數(shù)據量少的情況下,長時間不輸出數(shù)據) 就利用 redis pipeline 能力訪問外部存儲一次。然后批量輸出結果,如下圖所示。大大提高了吞吐。

batch lookup join

3.batch lookup join 實現(xiàn)

3.1.怎么知道應該改哪部分源碼?

博主將通過下面幾個問題去交給大家怎么改源碼去實現(xiàn)自己的功能。

改源碼的有哪些比較好的思路?

結論:首先就是參考類似模塊的實現(xiàn)(不會寫,但是我會抄啊!),比如本文要實現(xiàn) batch lookup join,必然要參考原生的 lookup join 去實現(xiàn)。

大家在改 flink 源碼時,因為 flink 源碼的模塊太多了,項目非常龐大,往往第一步碰到的問題不是怎么去實現(xiàn)這個功能,而是應該在什么地方去改才能實現(xiàn)!

結論:一個 flink 的任務(DataStream\Table\SQL)所有的精華精華精華都集中在 transformation 中!!!只要是涉及到算子實現(xiàn)的東西,小伙伴萌就可以到 transformation 中去尋找。可以將斷點打在每一個 operator 的構造器或者 open 方法中就可以看到其實在哪一步構造和初始化的。這樣就能順著調用棧往前回溯而確定要改哪部分代碼了。

3.2.lookup join 原理

3.2.1.transformation

在實現(xiàn)batch lookup join 之前,當然要從原生的 lookup join 的實現(xiàn)開始入手,看看 flink 官方大大是怎么實現(xiàn)的,具體 transformation 如下圖所示:

transformation

具體的實現(xiàn)邏輯承載在 org.apache.flink.streaming.api.operators.ProcessOperator,org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner 中。

3.2.2.LookupJoinRunner

LookupJoinRunner 中的數(shù)據處理邏輯集中在 processElement 中。

LookupJoinRunner

可以看到上圖,LookupJoinRunner 又內嵌了一層 fetcher 來實現(xiàn)具體的 lookup 邏輯。

  1. 其中 fetcher:就是根據 flink sql lookup join 邏輯生成的 lookup join 的代碼實例;
  2. 其中 collector:collector 的主要功能就是將原始數(shù)據 RowData 和 lookup 到的 RowData 的數(shù)據合并為 JoinedRowData 結果,然后輸出。

接下來詳細看看 fetcher 和 collector。

3.2.3.fetcher

transformation fetcher

把這個 fetcher 的代碼 copy 出來瞅瞅。

fetcher

fetcher 內嵌了 RedisRowDataLookupFunction 來作為最終訪問外部維表的函數(shù)。

3.2.4.RedisRowDataLookupFunction

訪問 redis 獲取到數(shù)據。

RedisRowDataLookupFunction

3.2.5.collector

transformation collector

把這個 collector 的代碼 copy 出來瞅瞅。

collector

3.3.lookup join 算子實現(xiàn)調用鏈

是不是感覺一個 lookup join 的調用鏈賊復雜。

因為 batch lookup join 是完全參考 lookup join 去實現(xiàn)的,所以接下來博主介紹一下整體的調用鏈關系,這就會方便后續(xù)設計 batch lookup join 實現(xiàn)方案的時候去確定具體修改哪一部分代碼。

調用鏈

整體的調用邏輯如下:

  1. ProcessOpeartor 把 原始 RowData 傳給 LookupJoinRunner
  2. LookupJoinRunner 把 原始 RowData 傳給根據 sql 代碼生成的 fetcher
  3. fetcher 中把 原始 RowData 傳給 RedisRowDataLookupFunction 然后去 lookup 維表,lookup 到的結果數(shù)據為 lookup RowData
  4. collector 把 原始 RowData 和 lookup RowData 數(shù)據合并為 JoinedRowData 然后輸出。

3.4.batch lookup join 設計思路

還是一樣,先看看設計思路最終的結論,batch lookup join 算子調用鏈設計如下:

batch lookup 調用鏈

詳細說明一下設計思路:

  1. 如果想做到批量訪問外部存儲(Redis)的數(shù)據??梢酝茢喑?RedisRowDataLookupFunction 的輸入需要是 List<原始 RowData> ,輸出需要是 List。其中輸入數(shù)據輸入到 RedisRowDataLookupFunction 中后,使用 Redis pipeline 去批量訪問外部存儲,然后把結果 List 輸出。
  2. 由 RedisRowDataLookupFunction 的輸出數(shù)據為 List 推斷出 collector 輸入數(shù)據格式必然是 List<原始 RowData>。由于在 lookup join 中 collector 的邏輯就是將 原始 RowData 和 lookup RowData 合并為 JoinedRowData,將結果輸出。因此 collector 這里就是將 List<原始 RowData> 和 List 進行遍歷合并,一條一條的輸出 JoinedRowData。
  3. 同樣 RedisRowDataLookupFunction 的輸入數(shù)據是 fetcher 傳入的,則推斷出 fetcher 輸入數(shù)據格式必然是 List<原始 RowData>。
  4. 由于 fetcher 輸入是 List<原始 RowData>,則 LookupJoinRunner 輸出到 fetcher 的數(shù)據也需要是 List<原始 RowData>。但是 ProcessOpeartor 只能傳給 LookupJoinRunner 原始 RowData,因此可以得出我們的每攢 30 條數(shù)據或者每隔 5s 的邏輯就能確定需要在 LookupJoinRunner 中做了。

思路有了,那么 batch lookup join 涉及到的改動項也就能確認了。

  1. 新建一個 BatchLookupJoinRunner:實現(xiàn)攢批邏輯(每攢 30 條數(shù)據或者每隔 5s),其中攢批的數(shù)據放在 ListState 中,以防止丟失,在 table config 中的 is.dim.batch.mode 設置為 true 時使用此 BatchLookupJoinRunner。
  2. 代碼生成的 fetcher:將原來輸入的 原始 RowData 改為 List<原始 RowData>。
  3. 新建一個 RedisRowDataBatchLookupFunction:實現(xiàn)將輸入的批量數(shù)據 List<原始 RowData> 拿到之后使用 redis pipeline 批量訪問外部存儲,獲取到 List 結果數(shù)據給 collector。
  4. 代碼生成的 collector:將原來 lookup join 中的輸入 原始 RowData,lookup RowData 改為 List<原始 RowData>,List,添加遍歷循環(huán) List<原始 RowData>,List,按順序合并 List 中的每一項 原始 RowData,lookup RowData 輸出 JoinedRowData 的邏輯。

3.5.batch lookup join 的最終效果

3.5.1.transformation

可以看到 is.dim.batch.mode 設置為 true 時,transformation 如下。transformation 中的重點處理邏輯就是 BatchLookupJoinRunner

batch transformation

3.5.2.BatchLookupJoinRunner

BatchLookupJoinRunner

3.5.3.fetchersql 生成的 fetcher 代碼如下:

fetcher

3.5.4.RedisRowDataBatchLookupFunction

RedisRowDataBatchLookupFunction 拿到輸入的 List 數(shù)據,調用 Redis pipeline 批量訪問外部存儲。

RedisRowDataBatchLookupFunction

3.5.5.collectorsql 生成的 collector 代碼如下:

collector

3.6.待改進項

目前上述方案實現(xiàn)的不足之處如下:

  1. batch 的執(zhí)行邏輯與 sql 原始的語義不一致。因為從 sql 上看是完全沒有這種 batch lookup join 的語義的。
  2. 其中每 5s博主簡單實現(xiàn)了下,完全基于數(shù)據驅動的每 5s 攢一批,不是基于 onTimer 驅動的??赡軙霈F(xiàn)來了一條數(shù)據之后,5 min 內都沒有來數(shù)據,則數(shù)據就不輸出了。
  3. 沒有考慮實現(xiàn)代碼的抽象,以實現(xiàn)功能為主,所以很多基于源碼的改動都是直接 copy 出來了另一個方法實現(xiàn)。

4.xdm 怎么使用這個功能?

git clone https://github.com/yangyichao-mango/flink/tree/release-1.13.2

在 clone 下來的項目的中,重新把下面兩個模塊 install (mvn clean install) 到本地倉庫中。

然后在你的項目中引用兩個 blink 包即可使用。使用方法就是只需要把 table config 的 is.dim.batch.mode 設置為 true,代碼還按照 lookup join 的方式寫即可。

5.總結與展望

本文主要介紹了 flink sql batch lookup join 的使用方式,并介紹了其實現(xiàn)思路以及效果,主要內容如下:

直接來一個實戰(zhàn)案例:博主以曝光用戶日志流關聯(lián)用戶畫像(年齡、性別)維表為例介紹 batch lookup join 具有的基本能力(怎么配置參數(shù),怎么寫 sql,最終效果咋樣)。

batch lookup join:主要介紹 batch lookup join 的功能是從 flink transformation 出發(fā),確定要 batch lookup join 涉及改動的地方以及其實現(xiàn)思路、原理。也會教給大家一些改動源碼來實現(xiàn)自己想要的一些功能的思路。

總結及展望:目前的 batch lookup join 實現(xiàn)其實不符合 sql 的原始語義,后續(xù)大家可以按照 sql 標準自己做一些實現(xiàn)

 

責任編輯:姜華 來源: 大數(shù)據羊說
相關推薦

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2021-11-30 23:30:45

sql 性能異步

2021-11-28 11:36:08

SQL Flink Join

2021-12-13 07:57:47

Flink SQL Flink Hive Udf

2021-11-27 09:03:26

flink join數(shù)倉

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-15 09:57:59

Flink SQL時間語義

2022-05-27 09:02:58

SQLHive語義

2022-06-29 09:01:38

FlinkSQL時間屬性

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2022-05-12 09:02:47

Flink SQL數(shù)據類型

2022-08-10 10:05:29

FlinkSQL

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2021-12-06 07:15:47

開發(fā)Flink SQL

2022-05-09 09:03:04

SQL數(shù)據流數(shù)據
點贊
收藏

51CTO技術棧公眾號