自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Apache Flink 漫談系列(07) - 持續(xù)查詢(Continuous Queries)

開發(fā) 開發(fā)工具
我們知道在流計(jì)算場(chǎng)景中,數(shù)據(jù)是源源不斷的流入的,數(shù)據(jù)流永遠(yuǎn)不會(huì)結(jié)束,那么計(jì)算就永遠(yuǎn)不會(huì)結(jié)束,如果計(jì)算永遠(yuǎn)不會(huì)結(jié)束的話,那么計(jì)算結(jié)果何時(shí)輸出呢?

一、實(shí)際問題

我們知道在流計(jì)算場(chǎng)景中,數(shù)據(jù)是源源不斷的流入的,數(shù)據(jù)流永遠(yuǎn)不會(huì)結(jié)束,那么計(jì)算就永遠(yuǎn)不會(huì)結(jié)束,如果計(jì)算永遠(yuǎn)不會(huì)結(jié)束的話,那么計(jì)算結(jié)果何時(shí)輸出呢?本篇將介紹Apache Flink利用持續(xù)查詢來對(duì)流計(jì)算結(jié)果進(jìn)行持續(xù)輸出的實(shí)現(xiàn)原理。

二、數(shù)據(jù)管理

在介紹持續(xù)查詢之前,我們先看看Apache Flink對(duì)數(shù)據(jù)的管理和傳統(tǒng)數(shù)據(jù)庫對(duì)數(shù)據(jù)管理的區(qū)別,以MySQL為例,如下圖:

MySQL

如上圖所示傳統(tǒng)數(shù)據(jù)庫是數(shù)據(jù)存儲(chǔ)和查詢計(jì)算于一體的架構(gòu)管理方式,這個(gè)很明顯,oracle數(shù)據(jù)庫不可能管理MySQL數(shù)據(jù)庫數(shù)據(jù),反之亦然,每種數(shù)據(jù)庫廠商都有自己的數(shù)據(jù)庫管理和存儲(chǔ)的方式,各自有特有的實(shí)現(xiàn)。在這點(diǎn)上Apache Flink海納百川(也有corner case),將data store 進(jìn)行抽象,分為source(讀) 和 sink(寫)兩種類型接口,然后結(jié)合不同存儲(chǔ)的特點(diǎn)提供常用數(shù)據(jù)存儲(chǔ)的內(nèi)置實(shí)現(xiàn),當(dāng)然也支持用戶自定義的實(shí)現(xiàn)。

那么在宏觀設(shè)計(jì)上Apache Flink與傳統(tǒng)數(shù)據(jù)庫一樣都可以對(duì)數(shù)據(jù)表進(jìn)行SQL查詢,并將產(chǎn)出的結(jié)果寫入到數(shù)據(jù)存儲(chǔ)里面,那么Apache Flink上面的SQL查詢和傳統(tǒng)數(shù)據(jù)庫查詢的區(qū)別是什么呢?Apache Flink又是如何做到求同(語義相同)存異(實(shí)現(xiàn)機(jī)制不同),***支持ANSI-SQL的呢?

三、靜態(tài)查詢

傳統(tǒng)數(shù)據(jù)庫中對(duì)表(比如 flink_tab,有user和clicks兩列,user主鍵)的一個(gè)查詢SQL(select * from flink_tab)在數(shù)據(jù)量允許的情況下,會(huì)立刻返回表中的所有數(shù)據(jù),在查詢結(jié)果顯示之后,對(duì)數(shù)據(jù)庫表flink_tab的DML操作將與執(zhí)行的SQL無關(guān)了。也就是說傳統(tǒng)數(shù)據(jù)庫下面對(duì)表的查詢是靜態(tài)查詢,將計(jì)算的最終查詢的結(jié)果立即輸出,如下:

  1. select * from flink_tab; 
  2. +----+------+--------+ 
  3. | id | user | clicks | 
  4. +----+------+--------+ 
  5. | 1 | Mary | 1 | 
  6. +----+------+--------+ 
  7. 1 row in set (0.00 sec) 

當(dāng)我執(zhí)行完上面的查詢,查詢結(jié)果立即返回,上面情況告訴我們表 flink_tab里面只有一條記錄,id=1,user=Mary,clicks=1; 這樣傳統(tǒng)數(shù)據(jù)庫表的一條查詢語句就完全結(jié)束了。傳統(tǒng)數(shù)據(jù)庫表在查詢那一刻我們這里叫Static table,是指在查詢的那一刻數(shù)據(jù)庫表的內(nèi)容不再變化了,查詢進(jìn)行一次計(jì)算完成之后表的變化也與本次查詢無關(guān)了,我們將在Static Table 上面的查詢叫做靜態(tài)查詢。

四、持續(xù)查詢

什么是連續(xù)查詢呢?連續(xù)查詢發(fā)生在流計(jì)算上面,在 《Apache Flink 漫談系列 - 流表對(duì)偶(duality)性》 中我們提到過Dynamic Table,連續(xù)查詢是作用在Dynamic table上面的,永遠(yuǎn)不會(huì)結(jié)束的,隨著表內(nèi)容的變化計(jì)算在不斷的進(jìn)行著...

五、靜態(tài)/持續(xù)查詢特點(diǎn)

靜態(tài)查詢和持續(xù)查詢的特點(diǎn)就是《Apache Flink 漫談系列 - 流表對(duì)偶(duality)性》中所提到的批與流的計(jì)算特點(diǎn),批一次查詢返回一個(gè)計(jì)算結(jié)果就結(jié)束查詢,流一次查詢不斷修正計(jì)算結(jié)果,查詢永遠(yuǎn)不結(jié)束,表格示意如下:

六、靜態(tài)/持續(xù)查詢關(guān)系

接下來我們以flink_tab表實(shí)際操作為例,體驗(yàn)一下靜態(tài)查詢與持續(xù)查詢的關(guān)系。假如我們對(duì)flink_tab表再進(jìn)行一條增加和一次更新操作,如下:

  1. MySQL> insert into flink_tab(user, clicks) values ('Bob', 1); 
  2. Query OK, 1 row affected (0.08 sec) 
  3.  
  4. MySQL> update flink_tab set clicks=2 where user='Mary'
  5. Query OK, 1 row affected (0.06 sec) 

這時(shí)候我們?cè)龠M(jìn)行查詢 select * from flink_tab ,結(jié)果如下:

  1. MySQL> select * from flink_tab; 
  2. +----+------+--------+ 
  3. | id | user | clicks | 
  4. +----+------+--------+ 
  5. | 1 | Mary | 2 | 
  6. | 2 | Bob | 1 | 
  7. +----+------+--------+ 
  8. 2 rows in set (0.00 sec) 

那么我們看見,相同的查詢SQL(select * from flink_tab),計(jì)算結(jié)果完全 不 一樣了。這說明相同的sql語句,在不同的時(shí)刻執(zhí)行計(jì)算,得到的結(jié)果可能不一樣(有點(diǎn)像廢話),就如下圖一樣:

假設(shè)不斷的有人在對(duì)表flink_tab做操作,同時(shí)有一個(gè)人間歇性的發(fā)起對(duì)表數(shù)據(jù)的查詢,上圖我們只是在三個(gè)時(shí)間點(diǎn)進(jìn)行了3次查詢。并且在這段時(shí)間內(nèi)數(shù)據(jù)表的內(nèi)容也在變化。引起上面變化的DML如下:

  1. MySQL> insert into flink_tab(user, clicks) values ('Llz', 1); 
  2. Query OK, 1 row affected (0.08 sec) 
  3.  
  4. MySQL> update flink_tab set clicks=2 where user='Bob'
  5. Query OK, 1 row affected (0.01 sec) 
  6. Rows matched: 1 Changed: 1 Warnings: 0 
  7.  
  8. MySQL> update flink_tab set clicks=3 where user='Mary'
  9. Query OK, 1 row affected (0.05 sec) 
  10. Rows matched: 1 Changed: 1 Warnings: 0 

到現(xiàn)在我們不難想象,上面圖內(nèi)容的核心要點(diǎn)如下:

  • 時(shí)間
  • 表數(shù)據(jù)變化
  • 觸發(fā)計(jì)算
  • 計(jì)算結(jié)果更新

接下來我們利用傳統(tǒng)數(shù)據(jù)庫現(xiàn)有的機(jī)制模擬一下持續(xù)查詢...

1. 無PK的 Append only 場(chǎng)景

接下來我們把上面隱式存在的時(shí)間屬性timestamp作為表flink_tab_ts(timestamp,user,clicks三列,無主鍵)的一列,再寫一個(gè) 觸發(fā)器(Trigger) 示例觀察一下:

  1. // INSERT 的時(shí)候查詢一下數(shù)據(jù)flink_tab_ts,將結(jié)果寫到trigger.sql中 
  2. DELIMITER ;; 
  3. create trigger flink_tab_ts_trigger_insert after insert 
  4. on flink_tab_ts for each row 
  5. begin 
  6. select ts, user, clicks from flink_tab_ts into OUTFILE '/Users/jincheng.sunjc/testdir/atas/trigger.sql'; 
  7. end ;; 
  8. DELIMITER ; 

上面的trigger要將查詢結(jié)果寫入本地文件,默認(rèn)MySQL是不允許寫入的,我們查看一下:

  1. MySQL> show variables like '%secure%'; 
  2. +--------------------------+-------+ 
  3. | Variable_name | Value | 
  4. +--------------------------+-------+ 
  5. | require_secure_transport | OFF | 
  6. | secure_file_priv | NULL | 
  7. +--------------------------+-------+ 
  8. 2 rows in set (0.00 sec) 

上面secure_file_priv屬性為NULL,說明MySQL不允許寫入file,我需要修改my.cnf在添加secure_file_priv=''打開寫文件限制;

  1. MySQL> show variables like '%secure%'; 
  2. +--------------------------+-------+ 
  3. | Variable_name | Value | 
  4. +--------------------------+-------+ 
  5. | require_secure_transport | OFF | 
  6. | secure_file_priv | | 
  7. +--------------------------+-------+ 
  8. 2 rows in set (0.00 sec) 

下面我們對(duì)flink_tab_ts進(jìn)行INSERT操作:

對(duì)flink_tab_ts進(jìn)行INSERT操作

我們?cè)賮砜纯?次trigger 查詢計(jì)算的結(jié)果:

大家到這里發(fā)現(xiàn)我寫了Trigger的存儲(chǔ)過程之后,每次在數(shù)據(jù)表flink_tab_ts進(jìn)行DML操作的時(shí)候,Trigger就會(huì)觸發(fā)一次查詢計(jì)算,產(chǎn)出一份新的計(jì)算結(jié)果,觀察上面的查詢結(jié)果發(fā)現(xiàn),結(jié)果表不停的增加(Append only)。

2. 有PK的Update場(chǎng)景

我們利用flink_tab_ts的6次DML操作和自定義的觸發(fā)器TriggerL來介紹了什么是持續(xù)查詢,做處理靜態(tài)查詢與持續(xù)查詢的關(guān)系。那么上面的演示目的是為了說明持續(xù)查詢,所有操作都是insert,沒有基于主鍵的更新,也就是說Trigger產(chǎn)生的結(jié)果都是append only的,那么大家想一想,如果我們操作flink_tab這張表,按主鍵user進(jìn)行插入和更新操作,同樣利用Trigger機(jī)制來進(jìn)行持續(xù)查詢,結(jié)果是怎樣的的呢? 初始化表,trigger:

  1. drop table flink_tab; 
  2. create table flink_tab( 
  3. user VARCHAR(100) NOT NULL, 
  4. clicks INT NOT NULL, 
  5. PRIMARY KEY (user) 
  6. ); 
  7.  
  8. DELIMITER ;; 
  9. create trigger flink_tab_trigger_insert after insert 
  10. on flink_tab for each row 
  11. begin 
  12. select user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql'; 
  13. end ;; 
  14. DELIMITER ; 
  15.  
  16. DELIMITER ;; 
  17. create trigger flink_tab_trigger_ after update 
  18. on flink_tab for each row 
  19. begin 
  20. select ts, user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql'; 
  21. end ;; 
  22. DELIMITER ; 

同樣我做如下6次DML操作,Trigger 6次查詢計(jì)算:

在來看看這次的結(jié)果與append only 有什么不同?

我想大家早就知道這結(jié)果了,數(shù)據(jù)庫里面定義的PK所有變化會(huì)按PK更新,那么觸發(fā)的6次計(jì)算中也會(huì)得到更新后的結(jié)果,這應(yīng)該不難理解,查詢結(jié)果也是不斷更新的(Update)!

3. 關(guān)系定義

上面Append Only 和 Update兩種場(chǎng)景在MySQL上面都可以利用Trigger機(jī)制模擬 持續(xù)查詢的概念,也就是說數(shù)據(jù)表中每次數(shù)據(jù)變化,我們都觸發(fā)一次相同的查詢計(jì)算(只是計(jì)算時(shí)候數(shù)據(jù)的集合發(fā)生了變化),因?yàn)閿?shù)據(jù)表不斷的變化,這個(gè)表就可以看做是一個(gè)動(dòng)態(tài)表Dynamic Table,而查詢SQL(select * from flink_tab_ts) 被觸發(fā)器Trigger在滿足某種條件后不停的觸發(fā)計(jì)算,進(jìn)而也不斷地產(chǎn)生新的結(jié)果。這種作用在Dynamic Table,并且有某種機(jī)制(Trigger)不斷的觸發(fā)計(jì)算的查詢我們就稱之為 持續(xù)查詢。

那么到底靜態(tài)查詢和動(dòng)態(tài)查詢的關(guān)系是什么呢?在語義上 持續(xù)查詢 中的每一次查詢計(jì)算的觸發(fā)都是一次靜態(tài)查詢(相對(duì)于當(dāng)時(shí)查詢的時(shí)間點(diǎn)), 在實(shí)現(xiàn)上 Apache Flink會(huì)利用上一次查詢結(jié)果+當(dāng)前記錄 以增量的方式完成查詢計(jì)算。

特別說明: 上面我們利用 數(shù)據(jù)變化+Trigger方式描述了持續(xù)查詢的概念,這里有必要特別強(qiáng)調(diào)一下的是數(shù)據(jù)庫中trigger機(jī)制觸發(fā)的查詢,每次都是一個(gè)全量查詢,這與Apache Flink上面流計(jì)算的持續(xù)查詢概念相同,但實(shí)現(xiàn)機(jī)制完全不同,Apache Flink上面的持續(xù)查詢內(nèi)部實(shí)現(xiàn)是增量處理的,隨著時(shí)間的推移,每條數(shù)據(jù)的到來實(shí)時(shí)處理當(dāng)前的那一條記錄,不會(huì)處理曾經(jīng)來過的歷史記錄!

七、Apache Flink 如何做到持續(xù)查詢

1. 動(dòng)態(tài)表上面持續(xù)查詢

在 《Apache Flink 漫談系列 - 流表對(duì)偶(duality)性》 中我們了解到流和表可以相互轉(zhuǎn)換,在Apache Flink流計(jì)算中攜帶流事件的Schema,經(jīng)過算子計(jì)算之后再產(chǎn)生具有新的Schema的事件,流入下游節(jié)點(diǎn),在產(chǎn)生新的Schema的Event和不斷流轉(zhuǎn)的過程就是持續(xù)查詢作用的結(jié)果,如下圖:

2. 增量計(jì)算

我們進(jìn)行查詢大多數(shù)場(chǎng)景是進(jìn)行數(shù)據(jù)聚合,比如查詢SQL中利用count,sum等aggregate function進(jìn)行聚合統(tǒng)計(jì),那么流上的數(shù)據(jù)源源不斷的流入,我們既不能等所有事件流入結(jié)束(永遠(yuǎn)不會(huì)結(jié)束)再計(jì)算,也不會(huì)每次來一條事件就像傳統(tǒng)數(shù)據(jù)庫一樣將全部事件集合重新整體計(jì)算一次,在持續(xù)查詢的計(jì)算過程中,Apache Flink采用增量計(jì)算的方式,也就是每次計(jì)算都會(huì)將計(jì)算結(jié)果存儲(chǔ)到state中,下一條事件到來的時(shí)候利用上次計(jì)算的結(jié)果和當(dāng)前的事件進(jìn)行聚合計(jì)算,比如 有一個(gè)訂單表,如下:

一個(gè)簡(jiǎn)單的計(jì)數(shù)和求和查詢SQL:

  1. // 求訂單總數(shù)和所有訂單的總金額 
  2. select count(id) as cnt,sum(amount)as sumAmount from order_tab; 

這樣一個(gè)簡(jiǎn)單的持續(xù)查詢計(jì)算,Apache Flink內(nèi)部是如何處理的呢?如下圖:

如上圖,Apache Flink中每來一條事件,就進(jìn)行一次計(jì)算,并且每次計(jì)算后結(jié)果會(huì)存儲(chǔ)到state中,供下一條事件到來時(shí)候進(jìn)行計(jì)算,即:

  1. result(n) = calculation(result(n-1), n)。 

3. 無PK的Append Only 場(chǎng)景

在實(shí)際的業(yè)務(wù)場(chǎng)景中,我們只需要進(jìn)行簡(jiǎn)單的數(shù)據(jù)統(tǒng)計(jì),然后就將統(tǒng)計(jì)結(jié)果寫入到業(yè)務(wù)的數(shù)據(jù)存儲(chǔ)系統(tǒng)里面,比如上面統(tǒng)計(jì)訂單數(shù)量和總金額的場(chǎng)景,訂單表本身是一個(gè)append only的數(shù)據(jù)源(假設(shè)沒有更新,截止到2018.5.14日,Apache Flink內(nèi)部支持的數(shù)據(jù)源都是append only的),在持續(xù)查詢過程中經(jīng)過count(id),sum(amount)統(tǒng)計(jì)計(jì)算之后產(chǎn)生的動(dòng)態(tài)表也是append only的,種場(chǎng)景Apache Flink內(nèi)部只需要進(jìn)行aggregate function的聚合統(tǒng)計(jì)計(jì)算就可以,如下:

4. 有PK的Update 場(chǎng)景

現(xiàn)在我們將上面的訂單場(chǎng)景稍微變化一下,在數(shù)據(jù)表上面我們將金額字段amount,變?yōu)榈貐^(qū)字段region,數(shù)據(jù)如下:

查詢統(tǒng)計(jì)的變?yōu)?,在?jì)算具有相同訂單數(shù)量的地區(qū)數(shù)量;查詢SQL如下:

  1. CREATE TABLE order_tab( 
  2. id BIGINT, 
  3. region VARCHAR 
  4.  
  5. CREATE TABLE region_count_sink( 
  6. order_cnt BIGINT, 
  7. region_cnt BIGINT, 
  8. PRIMARY KEY(order_cnt) -- 主鍵 
  9.  
  10. -- 按地區(qū)分組計(jì)算每個(gè)地區(qū)的訂單數(shù)量 
  11. CREATE VIEW order_count_view AS 
  12. SELECT 
  13. region, count(id) AS order_cnt 
  14. FROM order_tab 
  15. GROUP BY region; 
  16.  
  17. -- 按訂單數(shù)量分組統(tǒng)計(jì)具有相同訂單數(shù)量的地區(qū)數(shù)量 
  18. INSERT INTO region_count_sink 
  19. SELECT 
  20. order_cnt, 
  21. count(region) as region_cnt 
  22. FROM order_count_view 
  23. GROUP BY order_cnt; 

上面查詢SQL的代碼結(jié)構(gòu)如下(這個(gè)圖示在Alibaba 對(duì) Apache Flink 的增強(qiáng)的集成IDE環(huán)境生成的,了解更多):

上面SQL中我們發(fā)現(xiàn)有兩層查詢計(jì)算邏輯,***個(gè)查詢計(jì)算邏輯是與SOURCE相連的按地區(qū)統(tǒng)計(jì)訂單數(shù)量的分組統(tǒng)計(jì),第二個(gè)查詢計(jì)算邏輯是在***個(gè)查詢產(chǎn)出的動(dòng)態(tài)表上面進(jìn)行按訂單數(shù)量統(tǒng)計(jì)地區(qū)數(shù)量的分組統(tǒng)計(jì),我們一層一層分析。

5. 錯(cuò)誤處理

  • ***層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
  • 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;

按照***層分析的結(jié)果,再分析第二層產(chǎn)出的結(jié)果,我們分析的過程是對(duì)的,但是最終寫到sink表的計(jì)算結(jié)果是錯(cuò)誤的,那我們錯(cuò)在哪里了呢?

其實(shí)當(dāng) (SH,2)這條記錄來的時(shí)候,以前來過的(SH, 1)已經(jīng)是臟數(shù)據(jù)了,當(dāng)(BJ, 2)來的時(shí)候,已經(jīng)參與過計(jì)算的(BJ, 1)也變成臟數(shù)據(jù)了,同樣當(dāng)(BJ, 3)來的時(shí)候,(BJ, 2)也是臟數(shù)據(jù)了,上面的分析,沒有處理臟數(shù)據(jù)進(jìn)而導(dǎo)致最終結(jié)果的錯(cuò)誤。那么Apache Flink內(nèi)部是如何正確處理的呢?

6. 正確處理

  • ***層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
  • 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;

上面我們將有更新的事件進(jìn)行打標(biāo)的方式來處理臟數(shù)據(jù),這樣在Apache Flink內(nèi)部計(jì)算的時(shí)候 算子會(huì)根據(jù)事件的打標(biāo)來處理事件,在aggregate function中有兩個(gè)對(duì)應(yīng)的方法(retract和accumulate)來處理不同標(biāo)識(shí)的事件,如上面用到的count AGG,內(nèi)部實(shí)現(xiàn)如下:

  1. def accumulate(acc: CountAccumulator): Unit = { 
  2. acc.f0 += 1L // acc.f0 存儲(chǔ)記數(shù) 
  3.  
  4. def retract(acc: CountAccumulator, value: Any): Unit = { 
  5. if (value != null) { 
  6. acc.f0 -1L //acc.f0 存儲(chǔ)記數(shù) 
  7. }} 

Apache Flink內(nèi)部這種為事件進(jìn)行打標(biāo)的機(jī)制叫做 retraction。retraction機(jī)制保障了在流上已經(jīng)流轉(zhuǎn)到下游的臟數(shù)據(jù)需要被撤回問題,進(jìn)而保障了持續(xù)查詢的正確語義。

八、Apache Flink Connector 類型

本篇一開始就對(duì)比了MySQL的數(shù)據(jù)存儲(chǔ)和Apache Flink數(shù)據(jù)存儲(chǔ)的區(qū)別,Apache Flink目前是一個(gè)計(jì)算平臺(tái),將數(shù)據(jù)的存儲(chǔ)以高度抽象的插件機(jī)制與各種已有的數(shù)據(jù)存儲(chǔ)無縫對(duì)接。目前Apache Flink中將數(shù)據(jù)插件稱之為鏈接器Connector,Connnector又按數(shù)據(jù)的讀和寫分成Soruce(讀)和Sink(寫)兩種類型。對(duì)于傳統(tǒng)數(shù)據(jù)庫表,PK是一個(gè)很重要的屬性,在頻繁的按某些字段(PK)進(jìn)行更新的場(chǎng)景,在表上定義PK非常重要。那么作為完全支持ANSI-SQL的Apache Flink平臺(tái)在Connector上面是否也支持PK的定義呢?

1. Apache Flink Source

現(xiàn)在(2018.11.***pache Flink中用于數(shù)據(jù)流驅(qū)動(dòng)的Source Connector上面無法定義PK,這樣在某些業(yè)務(wù)場(chǎng)景下會(huì)造成數(shù)據(jù)量較大,造成計(jì)算資源不必要的浪費(fèi),甚至有聚合結(jié)果不是用戶“期望”的情況。我們以雙流JOIN為例來說明:

  1. SQL: 
  2.  
  3. CREATE TABLE inventory_tab( 
  4. product_id VARCHAR, 
  5. product_count BIGINT 
  6. ); 
  7.  
  8. CREATE TABLE sales_tab( 
  9. product_id VARCHAR, 
  10. sales_count BIGINT 
  11. ) ; 
  12.  
  13. CREATE TABLE join_sink( 
  14. product_id VARCHAR, 
  15. product_count BIGINT, 
  16. sales_count BIGINT, 
  17. PRIMARY KEY(product_id) 
  18. ); 
  19.  
  20. CREATE VIEW join_view AS 
  21. SELECT 
  22. l.product_id, 
  23. l.product_count, 
  24. r.sales_count 
  25. FROM inventory_tab l 
  26. JOIN sales_tab r 
  27. ON l.product_id = r.product_id; 
  28.  
  29. INSERT INTO join_sink 
  30. SELECT 
  31. product_id, 
  32. product_count, 
  33. sales_count 
  34. FROM join_view ; 

代碼結(jié)構(gòu)圖:

實(shí)現(xiàn)示意圖:

上圖描述了一個(gè)雙流JOIN的場(chǎng)景,雙流JOIN的底層實(shí)現(xiàn)會(huì)將左(L)右(R)兩面的數(shù)據(jù)都持久化到Apache Flink的State中,當(dāng)L流入一條事件,首先會(huì)持久化到LState,然后在和RState中存儲(chǔ)的R中所有事件進(jìn)行條件匹配,這樣的邏輯如果R流product_id為P001的產(chǎn)品銷售記錄已經(jīng)流入4條,L流的(P001, 48) 流入的時(shí)候會(huì)匹配4條事件流入下游(join_sink)。

2. 問題

上面雙流JOIN的場(chǎng)景,我們發(fā)現(xiàn)其實(shí)inventory和sales表是有業(yè)務(wù)的PK的,也就是兩張表上面的product_id是唯一的,但是由于我們?cè)赟orure上面無法定義PK字段,表上面所有的數(shù)據(jù)都會(huì)以append only的方式從source流入到下游計(jì)算節(jié)點(diǎn)JOIN,這樣就導(dǎo)致了JOIN內(nèi)部所有product_id相同的記錄都會(huì)被匹配流入下游,上面的例子是 (P001, 48) 來到的時(shí)候,就向下游流入了4條記錄,不難想象每個(gè)product_id相同的記錄都會(huì)與歷史上所有事件進(jìn)行匹配,進(jìn)而操作下游數(shù)據(jù)壓力。

那么這樣的壓力是必要的嗎?從業(yè)務(wù)的角度看,不是必要的,因?yàn)閷?duì)于product_id相同的記錄,我們只需要對(duì)左右兩邊***的記錄進(jìn)行JOIN匹配就可以了。比如(P001, 48)到來了,業(yè)務(wù)上面只需要右流的(P001, 22)匹配就好,流入下游一條事件(P001, 48, 22)。 那么目前在Apache Flink上面如何做到這樣的優(yōu)化呢?

3. 解決方案

上面的問題根本上我們要構(gòu)建一張有PK的動(dòng)態(tài)表,這樣按照業(yè)務(wù)PK進(jìn)行更新處理,我們可以在Source后面添加group by 操作生產(chǎn)一張有PK的動(dòng)態(tài)表。如下:

  1. SQL: 
  2.  
  3. CREATE TABLE inventory_tab( 
  4. product_id VARCHAR, 
  5. product_count BIGINT 
  6.  
  7. CREATE TABLE sales_tab( 
  8. product_id VARCHAR, 
  9. sales_count BIGINT 
  10. CREATE VIEW inventory_view AS 
  11. SELECT 
  12. product_id, 
  13. LAST_VALUE(product_count) AS product_count 
  14. FROM inventory_tab 
  15. GROUP BY product_id; 
  16.  
  17. CREATE VIEW sales_view AS 
  18. SELECT 
  19. product_id, 
  20. LAST_VALUE(sales_count) AS sales_count 
  21. FROM sales_tab 
  22. GROUP BY product_id; 
  23.  
  24. CREATE TABLE join_sink( 
  25. product_id VARCHAR, 
  26. product_count BIGINT, 
  27. sales_count BIGINT, 
  28. PRIMARY KEY(product_id) 
  29. )WITH ( 
  30. type = 'print' 
  31. ) ; 
  32.  
  33. CREATE VIEW join_view AS 
  34. SELECT 
  35. l.product_id, 
  36. l.product_count, 
  37. r.sales_count 
  38. FROM inventory_view l 
  39. JOIN sales_view r 
  40. ON l.product_id = r.product_id; 
  41.  
  42. INSERT INTO join_sink 
  43. SELECT 
  44. product_id, 
  45. product_count, 
  46. sales_count 
  47. FROM join_view ; 

代碼結(jié)構(gòu):

示意圖:

如上方式可以將無PK的source經(jīng)過一次節(jié)點(diǎn)變成有PK的動(dòng)態(tài)表,以Apache Flink的retract機(jī)制和業(yè)務(wù)要素解決數(shù)據(jù)瓶頸,減少計(jì)算資源的消耗。

說明1: 上面方案LAST_VALUE是Alibaba對(duì) Apache Flink 的增強(qiáng)的功能,社區(qū)還沒有支持。

4. Apache Flink Sink

在Apache Flink上面可以根據(jù)實(shí)際外部存儲(chǔ)的特點(diǎn)(是否支持PK),以及整體job的執(zhí)行plan來動(dòng)態(tài)推導(dǎo)Sink的執(zhí)行模式,具體有如下三種類型:

Append 模式 - 該模式用戶在定義Sink的DDL時(shí)候不定義PK,在Apache Flink內(nèi)部生成的所有只有INSERT語句;

Upsert 模式 - 該模式用戶在定義Sink的DDL時(shí)候可以定義PK,在Apache Flink內(nèi)部會(huì)根據(jù)事件打標(biāo)(retract機(jī)制)生成INSERT/UPDATE和DELETE 語句,其中如果定義了PK, UPDATE語句按PK進(jìn)行更新,如果沒有定義PK UPDATE會(huì)按整行更新;

Retract 模式 - 該模式下會(huì)產(chǎn)生INSERT和DELETE兩種信息,Sink Connector 根據(jù)這兩種信息構(gòu)造對(duì)應(yīng)的數(shù)據(jù)操作指令;

九、小結(jié)

本篇以MySQL為例介紹了傳統(tǒng)數(shù)據(jù)庫的靜態(tài)查詢和利用MySQL的Trigger+DML操作來模擬持續(xù)查詢,并介紹了Apache Flink上面利用增量模式完成持續(xù)查詢,并以雙流JOIN為例說明了持續(xù)查詢可能會(huì)遇到的問題,并且介紹Apache Flink以為事件打標(biāo)產(chǎn)生delete事件的方式解決持續(xù)查詢的問題,進(jìn)而保證語義的正確性,***的在流計(jì)算上支持續(xù)查詢。

# 關(guān)于點(diǎn)贊和評(píng)論

本系列文章難免有很多缺陷和不足,真誠希望讀者對(duì)有收獲的篇章給予點(diǎn)贊鼓勵(lì),對(duì)有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺(tái)Blink的設(shè)計(jì)研發(fā)工作。

【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)聯(lián)系原作者】

戳這里,看該作者更多好文

責(zé)任編輯:趙寧寧 來源: 51CTO專欄
相關(guān)推薦

2022-06-10 17:26:07

數(shù)據(jù)集計(jì)算

2018-10-09 10:55:52

Apache FlinWatermark流計(jì)算

2022-07-13 12:53:59

數(shù)據(jù)存儲(chǔ)

2018-09-26 08:44:22

Apache Flin流計(jì)算計(jì)算模式

2018-10-16 08:54:35

Apache Flin流計(jì)算State

2018-09-26 07:50:52

Apache Flin流計(jì)算計(jì)算模式

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計(jì)算Fault Toler

2019-01-03 10:17:53

Apache FlinTable API代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-07-13 13:03:29

流計(jì)算亂序

2022-07-12 10:38:25

分布式框架

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對(duì)偶duality

2018-12-29 08:16:32

Apache FlinJOIN代碼

2020-04-09 11:08:30

PyFlinkJAR依賴

2018-10-30 11:10:05

Flink數(shù)據(jù)集計(jì)算

2019-03-29 10:05:44

Apache開源軟件
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)