Apache Flink 漫談系列(07) - 持續(xù)查詢(Continuous Queries)
一、實(shí)際問題
我們知道在流計(jì)算場(chǎng)景中,數(shù)據(jù)是源源不斷的流入的,數(shù)據(jù)流永遠(yuǎn)不會(huì)結(jié)束,那么計(jì)算就永遠(yuǎn)不會(huì)結(jié)束,如果計(jì)算永遠(yuǎn)不會(huì)結(jié)束的話,那么計(jì)算結(jié)果何時(shí)輸出呢?本篇將介紹Apache Flink利用持續(xù)查詢來對(duì)流計(jì)算結(jié)果進(jìn)行持續(xù)輸出的實(shí)現(xiàn)原理。
二、數(shù)據(jù)管理
在介紹持續(xù)查詢之前,我們先看看Apache Flink對(duì)數(shù)據(jù)的管理和傳統(tǒng)數(shù)據(jù)庫對(duì)數(shù)據(jù)管理的區(qū)別,以MySQL為例,如下圖:
如上圖所示傳統(tǒng)數(shù)據(jù)庫是數(shù)據(jù)存儲(chǔ)和查詢計(jì)算于一體的架構(gòu)管理方式,這個(gè)很明顯,oracle數(shù)據(jù)庫不可能管理MySQL數(shù)據(jù)庫數(shù)據(jù),反之亦然,每種數(shù)據(jù)庫廠商都有自己的數(shù)據(jù)庫管理和存儲(chǔ)的方式,各自有特有的實(shí)現(xiàn)。在這點(diǎn)上Apache Flink海納百川(也有corner case),將data store 進(jìn)行抽象,分為source(讀) 和 sink(寫)兩種類型接口,然后結(jié)合不同存儲(chǔ)的特點(diǎn)提供常用數(shù)據(jù)存儲(chǔ)的內(nèi)置實(shí)現(xiàn),當(dāng)然也支持用戶自定義的實(shí)現(xiàn)。
那么在宏觀設(shè)計(jì)上Apache Flink與傳統(tǒng)數(shù)據(jù)庫一樣都可以對(duì)數(shù)據(jù)表進(jìn)行SQL查詢,并將產(chǎn)出的結(jié)果寫入到數(shù)據(jù)存儲(chǔ)里面,那么Apache Flink上面的SQL查詢和傳統(tǒng)數(shù)據(jù)庫查詢的區(qū)別是什么呢?Apache Flink又是如何做到求同(語義相同)存異(實(shí)現(xiàn)機(jī)制不同),***支持ANSI-SQL的呢?
三、靜態(tài)查詢
傳統(tǒng)數(shù)據(jù)庫中對(duì)表(比如 flink_tab,有user和clicks兩列,user主鍵)的一個(gè)查詢SQL(select * from flink_tab)在數(shù)據(jù)量允許的情況下,會(huì)立刻返回表中的所有數(shù)據(jù),在查詢結(jié)果顯示之后,對(duì)數(shù)據(jù)庫表flink_tab的DML操作將與執(zhí)行的SQL無關(guān)了。也就是說傳統(tǒng)數(shù)據(jù)庫下面對(duì)表的查詢是靜態(tài)查詢,將計(jì)算的最終查詢的結(jié)果立即輸出,如下:
- select * from flink_tab;
- +----+------+--------+
- | id | user | clicks |
- +----+------+--------+
- | 1 | Mary | 1 |
- +----+------+--------+
- 1 row in set (0.00 sec)
當(dāng)我執(zhí)行完上面的查詢,查詢結(jié)果立即返回,上面情況告訴我們表 flink_tab里面只有一條記錄,id=1,user=Mary,clicks=1; 這樣傳統(tǒng)數(shù)據(jù)庫表的一條查詢語句就完全結(jié)束了。傳統(tǒng)數(shù)據(jù)庫表在查詢那一刻我們這里叫Static table,是指在查詢的那一刻數(shù)據(jù)庫表的內(nèi)容不再變化了,查詢進(jìn)行一次計(jì)算完成之后表的變化也與本次查詢無關(guān)了,我們將在Static Table 上面的查詢叫做靜態(tài)查詢。
四、持續(xù)查詢
什么是連續(xù)查詢呢?連續(xù)查詢發(fā)生在流計(jì)算上面,在 《Apache Flink 漫談系列 - 流表對(duì)偶(duality)性》 中我們提到過Dynamic Table,連續(xù)查詢是作用在Dynamic table上面的,永遠(yuǎn)不會(huì)結(jié)束的,隨著表內(nèi)容的變化計(jì)算在不斷的進(jìn)行著...
五、靜態(tài)/持續(xù)查詢特點(diǎn)
靜態(tài)查詢和持續(xù)查詢的特點(diǎn)就是《Apache Flink 漫談系列 - 流表對(duì)偶(duality)性》中所提到的批與流的計(jì)算特點(diǎn),批一次查詢返回一個(gè)計(jì)算結(jié)果就結(jié)束查詢,流一次查詢不斷修正計(jì)算結(jié)果,查詢永遠(yuǎn)不結(jié)束,表格示意如下:
六、靜態(tài)/持續(xù)查詢關(guān)系
接下來我們以flink_tab表實(shí)際操作為例,體驗(yàn)一下靜態(tài)查詢與持續(xù)查詢的關(guān)系。假如我們對(duì)flink_tab表再進(jìn)行一條增加和一次更新操作,如下:
- MySQL> insert into flink_tab(user, clicks) values ('Bob', 1);
- Query OK, 1 row affected (0.08 sec)
- MySQL> update flink_tab set clicks=2 where user='Mary';
- Query OK, 1 row affected (0.06 sec)
這時(shí)候我們?cè)龠M(jìn)行查詢 select * from flink_tab ,結(jié)果如下:
- MySQL> select * from flink_tab;
- +----+------+--------+
- | id | user | clicks |
- +----+------+--------+
- | 1 | Mary | 2 |
- | 2 | Bob | 1 |
- +----+------+--------+
- 2 rows in set (0.00 sec)
那么我們看見,相同的查詢SQL(select * from flink_tab),計(jì)算結(jié)果完全 不 一樣了。這說明相同的sql語句,在不同的時(shí)刻執(zhí)行計(jì)算,得到的結(jié)果可能不一樣(有點(diǎn)像廢話),就如下圖一樣:
假設(shè)不斷的有人在對(duì)表flink_tab做操作,同時(shí)有一個(gè)人間歇性的發(fā)起對(duì)表數(shù)據(jù)的查詢,上圖我們只是在三個(gè)時(shí)間點(diǎn)進(jìn)行了3次查詢。并且在這段時(shí)間內(nèi)數(shù)據(jù)表的內(nèi)容也在變化。引起上面變化的DML如下:
- MySQL> insert into flink_tab(user, clicks) values ('Llz', 1);
- Query OK, 1 row affected (0.08 sec)
- MySQL> update flink_tab set clicks=2 where user='Bob';
- Query OK, 1 row affected (0.01 sec)
- Rows matched: 1 Changed: 1 Warnings: 0
- MySQL> update flink_tab set clicks=3 where user='Mary';
- Query OK, 1 row affected (0.05 sec)
- Rows matched: 1 Changed: 1 Warnings: 0
到現(xiàn)在我們不難想象,上面圖內(nèi)容的核心要點(diǎn)如下:
- 時(shí)間
- 表數(shù)據(jù)變化
- 觸發(fā)計(jì)算
- 計(jì)算結(jié)果更新
接下來我們利用傳統(tǒng)數(shù)據(jù)庫現(xiàn)有的機(jī)制模擬一下持續(xù)查詢...
1. 無PK的 Append only 場(chǎng)景
接下來我們把上面隱式存在的時(shí)間屬性timestamp作為表flink_tab_ts(timestamp,user,clicks三列,無主鍵)的一列,再寫一個(gè) 觸發(fā)器(Trigger) 示例觀察一下:
- // INSERT 的時(shí)候查詢一下數(shù)據(jù)flink_tab_ts,將結(jié)果寫到trigger.sql中
- DELIMITER ;;
- create trigger flink_tab_ts_trigger_insert after insert
- on flink_tab_ts for each row
- begin
- select ts, user, clicks from flink_tab_ts into OUTFILE '/Users/jincheng.sunjc/testdir/atas/trigger.sql';
- end ;;
- DELIMITER ;
上面的trigger要將查詢結(jié)果寫入本地文件,默認(rèn)MySQL是不允許寫入的,我們查看一下:
- MySQL> show variables like '%secure%';
- +--------------------------+-------+
- | Variable_name | Value |
- +--------------------------+-------+
- | require_secure_transport | OFF |
- | secure_file_priv | NULL |
- +--------------------------+-------+
- 2 rows in set (0.00 sec)
上面secure_file_priv屬性為NULL,說明MySQL不允許寫入file,我需要修改my.cnf在添加secure_file_priv=''打開寫文件限制;
- MySQL> show variables like '%secure%';
- +--------------------------+-------+
- | Variable_name | Value |
- +--------------------------+-------+
- | require_secure_transport | OFF |
- | secure_file_priv | |
- +--------------------------+-------+
- 2 rows in set (0.00 sec)
下面我們對(duì)flink_tab_ts進(jìn)行INSERT操作:
我們?cè)賮砜纯?次trigger 查詢計(jì)算的結(jié)果:
大家到這里發(fā)現(xiàn)我寫了Trigger的存儲(chǔ)過程之后,每次在數(shù)據(jù)表flink_tab_ts進(jìn)行DML操作的時(shí)候,Trigger就會(huì)觸發(fā)一次查詢計(jì)算,產(chǎn)出一份新的計(jì)算結(jié)果,觀察上面的查詢結(jié)果發(fā)現(xiàn),結(jié)果表不停的增加(Append only)。
2. 有PK的Update場(chǎng)景
我們利用flink_tab_ts的6次DML操作和自定義的觸發(fā)器TriggerL來介紹了什么是持續(xù)查詢,做處理靜態(tài)查詢與持續(xù)查詢的關(guān)系。那么上面的演示目的是為了說明持續(xù)查詢,所有操作都是insert,沒有基于主鍵的更新,也就是說Trigger產(chǎn)生的結(jié)果都是append only的,那么大家想一想,如果我們操作flink_tab這張表,按主鍵user進(jìn)行插入和更新操作,同樣利用Trigger機(jī)制來進(jìn)行持續(xù)查詢,結(jié)果是怎樣的的呢? 初始化表,trigger:
- drop table flink_tab;
- create table flink_tab(
- user VARCHAR(100) NOT NULL,
- clicks INT NOT NULL,
- PRIMARY KEY (user)
- );
- DELIMITER ;;
- create trigger flink_tab_trigger_insert after insert
- on flink_tab for each row
- begin
- select user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql';
- end ;;
- DELIMITER ;
- DELIMITER ;;
- create trigger flink_tab_trigger_ after update
- on flink_tab for each row
- begin
- select ts, user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql';
- end ;;
- DELIMITER ;
同樣我做如下6次DML操作,Trigger 6次查詢計(jì)算:
在來看看這次的結(jié)果與append only 有什么不同?
我想大家早就知道這結(jié)果了,數(shù)據(jù)庫里面定義的PK所有變化會(huì)按PK更新,那么觸發(fā)的6次計(jì)算中也會(huì)得到更新后的結(jié)果,這應(yīng)該不難理解,查詢結(jié)果也是不斷更新的(Update)!
3. 關(guān)系定義
上面Append Only 和 Update兩種場(chǎng)景在MySQL上面都可以利用Trigger機(jī)制模擬 持續(xù)查詢的概念,也就是說數(shù)據(jù)表中每次數(shù)據(jù)變化,我們都觸發(fā)一次相同的查詢計(jì)算(只是計(jì)算時(shí)候數(shù)據(jù)的集合發(fā)生了變化),因?yàn)閿?shù)據(jù)表不斷的變化,這個(gè)表就可以看做是一個(gè)動(dòng)態(tài)表Dynamic Table,而查詢SQL(select * from flink_tab_ts) 被觸發(fā)器Trigger在滿足某種條件后不停的觸發(fā)計(jì)算,進(jìn)而也不斷地產(chǎn)生新的結(jié)果。這種作用在Dynamic Table,并且有某種機(jī)制(Trigger)不斷的觸發(fā)計(jì)算的查詢我們就稱之為 持續(xù)查詢。
那么到底靜態(tài)查詢和動(dòng)態(tài)查詢的關(guān)系是什么呢?在語義上 持續(xù)查詢 中的每一次查詢計(jì)算的觸發(fā)都是一次靜態(tài)查詢(相對(duì)于當(dāng)時(shí)查詢的時(shí)間點(diǎn)), 在實(shí)現(xiàn)上 Apache Flink會(huì)利用上一次查詢結(jié)果+當(dāng)前記錄 以增量的方式完成查詢計(jì)算。
特別說明: 上面我們利用 數(shù)據(jù)變化+Trigger方式描述了持續(xù)查詢的概念,這里有必要特別強(qiáng)調(diào)一下的是數(shù)據(jù)庫中trigger機(jī)制觸發(fā)的查詢,每次都是一個(gè)全量查詢,這與Apache Flink上面流計(jì)算的持續(xù)查詢概念相同,但實(shí)現(xiàn)機(jī)制完全不同,Apache Flink上面的持續(xù)查詢內(nèi)部實(shí)現(xiàn)是增量處理的,隨著時(shí)間的推移,每條數(shù)據(jù)的到來實(shí)時(shí)處理當(dāng)前的那一條記錄,不會(huì)處理曾經(jīng)來過的歷史記錄!
七、Apache Flink 如何做到持續(xù)查詢
1. 動(dòng)態(tài)表上面持續(xù)查詢
在 《Apache Flink 漫談系列 - 流表對(duì)偶(duality)性》 中我們了解到流和表可以相互轉(zhuǎn)換,在Apache Flink流計(jì)算中攜帶流事件的Schema,經(jīng)過算子計(jì)算之后再產(chǎn)生具有新的Schema的事件,流入下游節(jié)點(diǎn),在產(chǎn)生新的Schema的Event和不斷流轉(zhuǎn)的過程就是持續(xù)查詢作用的結(jié)果,如下圖:
2. 增量計(jì)算
我們進(jìn)行查詢大多數(shù)場(chǎng)景是進(jìn)行數(shù)據(jù)聚合,比如查詢SQL中利用count,sum等aggregate function進(jìn)行聚合統(tǒng)計(jì),那么流上的數(shù)據(jù)源源不斷的流入,我們既不能等所有事件流入結(jié)束(永遠(yuǎn)不會(huì)結(jié)束)再計(jì)算,也不會(huì)每次來一條事件就像傳統(tǒng)數(shù)據(jù)庫一樣將全部事件集合重新整體計(jì)算一次,在持續(xù)查詢的計(jì)算過程中,Apache Flink采用增量計(jì)算的方式,也就是每次計(jì)算都會(huì)將計(jì)算結(jié)果存儲(chǔ)到state中,下一條事件到來的時(shí)候利用上次計(jì)算的結(jié)果和當(dāng)前的事件進(jìn)行聚合計(jì)算,比如 有一個(gè)訂單表,如下:
一個(gè)簡(jiǎn)單的計(jì)數(shù)和求和查詢SQL:
- // 求訂單總數(shù)和所有訂單的總金額
- select count(id) as cnt,sum(amount)as sumAmount from order_tab;
這樣一個(gè)簡(jiǎn)單的持續(xù)查詢計(jì)算,Apache Flink內(nèi)部是如何處理的呢?如下圖:
如上圖,Apache Flink中每來一條事件,就進(jìn)行一次計(jì)算,并且每次計(jì)算后結(jié)果會(huì)存儲(chǔ)到state中,供下一條事件到來時(shí)候進(jìn)行計(jì)算,即:
- result(n) = calculation(result(n-1), n)。
3. 無PK的Append Only 場(chǎng)景
在實(shí)際的業(yè)務(wù)場(chǎng)景中,我們只需要進(jìn)行簡(jiǎn)單的數(shù)據(jù)統(tǒng)計(jì),然后就將統(tǒng)計(jì)結(jié)果寫入到業(yè)務(wù)的數(shù)據(jù)存儲(chǔ)系統(tǒng)里面,比如上面統(tǒng)計(jì)訂單數(shù)量和總金額的場(chǎng)景,訂單表本身是一個(gè)append only的數(shù)據(jù)源(假設(shè)沒有更新,截止到2018.5.14日,Apache Flink內(nèi)部支持的數(shù)據(jù)源都是append only的),在持續(xù)查詢過程中經(jīng)過count(id),sum(amount)統(tǒng)計(jì)計(jì)算之后產(chǎn)生的動(dòng)態(tài)表也是append only的,種場(chǎng)景Apache Flink內(nèi)部只需要進(jìn)行aggregate function的聚合統(tǒng)計(jì)計(jì)算就可以,如下:
4. 有PK的Update 場(chǎng)景
現(xiàn)在我們將上面的訂單場(chǎng)景稍微變化一下,在數(shù)據(jù)表上面我們將金額字段amount,變?yōu)榈貐^(qū)字段region,數(shù)據(jù)如下:
查詢統(tǒng)計(jì)的變?yōu)?,在?jì)算具有相同訂單數(shù)量的地區(qū)數(shù)量;查詢SQL如下:
- CREATE TABLE order_tab(
- id BIGINT,
- region VARCHAR
- )
- CREATE TABLE region_count_sink(
- order_cnt BIGINT,
- region_cnt BIGINT,
- PRIMARY KEY(order_cnt) -- 主鍵
- )
- -- 按地區(qū)分組計(jì)算每個(gè)地區(qū)的訂單數(shù)量
- CREATE VIEW order_count_view AS
- SELECT
- region, count(id) AS order_cnt
- FROM order_tab
- GROUP BY region;
- -- 按訂單數(shù)量分組統(tǒng)計(jì)具有相同訂單數(shù)量的地區(qū)數(shù)量
- INSERT INTO region_count_sink
- SELECT
- order_cnt,
- count(region) as region_cnt
- FROM order_count_view
- GROUP BY order_cnt;
上面查詢SQL的代碼結(jié)構(gòu)如下(這個(gè)圖示在Alibaba 對(duì) Apache Flink 的增強(qiáng)的集成IDE環(huán)境生成的,了解更多):
上面SQL中我們發(fā)現(xiàn)有兩層查詢計(jì)算邏輯,***個(gè)查詢計(jì)算邏輯是與SOURCE相連的按地區(qū)統(tǒng)計(jì)訂單數(shù)量的分組統(tǒng)計(jì),第二個(gè)查詢計(jì)算邏輯是在***個(gè)查詢產(chǎn)出的動(dòng)態(tài)表上面進(jìn)行按訂單數(shù)量統(tǒng)計(jì)地區(qū)數(shù)量的分組統(tǒng)計(jì),我們一層一層分析。
5. 錯(cuò)誤處理
- ***層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
- 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;
按照***層分析的結(jié)果,再分析第二層產(chǎn)出的結(jié)果,我們分析的過程是對(duì)的,但是最終寫到sink表的計(jì)算結(jié)果是錯(cuò)誤的,那我們錯(cuò)在哪里了呢?
其實(shí)當(dāng) (SH,2)這條記錄來的時(shí)候,以前來過的(SH, 1)已經(jīng)是臟數(shù)據(jù)了,當(dāng)(BJ, 2)來的時(shí)候,已經(jīng)參與過計(jì)算的(BJ, 1)也變成臟數(shù)據(jù)了,同樣當(dāng)(BJ, 3)來的時(shí)候,(BJ, 2)也是臟數(shù)據(jù)了,上面的分析,沒有處理臟數(shù)據(jù)進(jìn)而導(dǎo)致最終結(jié)果的錯(cuò)誤。那么Apache Flink內(nèi)部是如何正確處理的呢?
6. 正確處理
- ***層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
- 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;
上面我們將有更新的事件進(jìn)行打標(biāo)的方式來處理臟數(shù)據(jù),這樣在Apache Flink內(nèi)部計(jì)算的時(shí)候 算子會(huì)根據(jù)事件的打標(biāo)來處理事件,在aggregate function中有兩個(gè)對(duì)應(yīng)的方法(retract和accumulate)來處理不同標(biāo)識(shí)的事件,如上面用到的count AGG,內(nèi)部實(shí)現(xiàn)如下:
- def accumulate(acc: CountAccumulator): Unit = {
- acc.f0 += 1L // acc.f0 存儲(chǔ)記數(shù)
- }
- def retract(acc: CountAccumulator, value: Any): Unit = {
- if (value != null) {
- acc.f0 -= 1L //acc.f0 存儲(chǔ)記數(shù)
- }}
Apache Flink內(nèi)部這種為事件進(jìn)行打標(biāo)的機(jī)制叫做 retraction。retraction機(jī)制保障了在流上已經(jīng)流轉(zhuǎn)到下游的臟數(shù)據(jù)需要被撤回問題,進(jìn)而保障了持續(xù)查詢的正確語義。
八、Apache Flink Connector 類型
本篇一開始就對(duì)比了MySQL的數(shù)據(jù)存儲(chǔ)和Apache Flink數(shù)據(jù)存儲(chǔ)的區(qū)別,Apache Flink目前是一個(gè)計(jì)算平臺(tái),將數(shù)據(jù)的存儲(chǔ)以高度抽象的插件機(jī)制與各種已有的數(shù)據(jù)存儲(chǔ)無縫對(duì)接。目前Apache Flink中將數(shù)據(jù)插件稱之為鏈接器Connector,Connnector又按數(shù)據(jù)的讀和寫分成Soruce(讀)和Sink(寫)兩種類型。對(duì)于傳統(tǒng)數(shù)據(jù)庫表,PK是一個(gè)很重要的屬性,在頻繁的按某些字段(PK)進(jìn)行更新的場(chǎng)景,在表上定義PK非常重要。那么作為完全支持ANSI-SQL的Apache Flink平臺(tái)在Connector上面是否也支持PK的定義呢?
1. Apache Flink Source
現(xiàn)在(2018.11.***pache Flink中用于數(shù)據(jù)流驅(qū)動(dòng)的Source Connector上面無法定義PK,這樣在某些業(yè)務(wù)場(chǎng)景下會(huì)造成數(shù)據(jù)量較大,造成計(jì)算資源不必要的浪費(fèi),甚至有聚合結(jié)果不是用戶“期望”的情況。我們以雙流JOIN為例來說明:
- SQL:
- CREATE TABLE inventory_tab(
- product_id VARCHAR,
- product_count BIGINT
- );
- CREATE TABLE sales_tab(
- product_id VARCHAR,
- sales_count BIGINT
- ) ;
- CREATE TABLE join_sink(
- product_id VARCHAR,
- product_count BIGINT,
- sales_count BIGINT,
- PRIMARY KEY(product_id)
- );
- CREATE VIEW join_view AS
- SELECT
- l.product_id,
- l.product_count,
- r.sales_count
- FROM inventory_tab l
- JOIN sales_tab r
- ON l.product_id = r.product_id;
- INSERT INTO join_sink
- SELECT
- product_id,
- product_count,
- sales_count
- FROM join_view ;
代碼結(jié)構(gòu)圖:
實(shí)現(xiàn)示意圖:
上圖描述了一個(gè)雙流JOIN的場(chǎng)景,雙流JOIN的底層實(shí)現(xiàn)會(huì)將左(L)右(R)兩面的數(shù)據(jù)都持久化到Apache Flink的State中,當(dāng)L流入一條事件,首先會(huì)持久化到LState,然后在和RState中存儲(chǔ)的R中所有事件進(jìn)行條件匹配,這樣的邏輯如果R流product_id為P001的產(chǎn)品銷售記錄已經(jīng)流入4條,L流的(P001, 48) 流入的時(shí)候會(huì)匹配4條事件流入下游(join_sink)。
2. 問題
上面雙流JOIN的場(chǎng)景,我們發(fā)現(xiàn)其實(shí)inventory和sales表是有業(yè)務(wù)的PK的,也就是兩張表上面的product_id是唯一的,但是由于我們?cè)赟orure上面無法定義PK字段,表上面所有的數(shù)據(jù)都會(huì)以append only的方式從source流入到下游計(jì)算節(jié)點(diǎn)JOIN,這樣就導(dǎo)致了JOIN內(nèi)部所有product_id相同的記錄都會(huì)被匹配流入下游,上面的例子是 (P001, 48) 來到的時(shí)候,就向下游流入了4條記錄,不難想象每個(gè)product_id相同的記錄都會(huì)與歷史上所有事件進(jìn)行匹配,進(jìn)而操作下游數(shù)據(jù)壓力。
那么這樣的壓力是必要的嗎?從業(yè)務(wù)的角度看,不是必要的,因?yàn)閷?duì)于product_id相同的記錄,我們只需要對(duì)左右兩邊***的記錄進(jìn)行JOIN匹配就可以了。比如(P001, 48)到來了,業(yè)務(wù)上面只需要右流的(P001, 22)匹配就好,流入下游一條事件(P001, 48, 22)。 那么目前在Apache Flink上面如何做到這樣的優(yōu)化呢?
3. 解決方案
上面的問題根本上我們要構(gòu)建一張有PK的動(dòng)態(tài)表,這樣按照業(yè)務(wù)PK進(jìn)行更新處理,我們可以在Source后面添加group by 操作生產(chǎn)一張有PK的動(dòng)態(tài)表。如下:
- SQL:
- CREATE TABLE inventory_tab(
- product_id VARCHAR,
- product_count BIGINT
- )
- CREATE TABLE sales_tab(
- product_id VARCHAR,
- sales_count BIGINT
- )
- CREATE VIEW inventory_view AS
- SELECT
- product_id,
- LAST_VALUE(product_count) AS product_count
- FROM inventory_tab
- GROUP BY product_id;
- CREATE VIEW sales_view AS
- SELECT
- product_id,
- LAST_VALUE(sales_count) AS sales_count
- FROM sales_tab
- GROUP BY product_id;
- CREATE TABLE join_sink(
- product_id VARCHAR,
- product_count BIGINT,
- sales_count BIGINT,
- PRIMARY KEY(product_id)
- )WITH (
- type = 'print'
- ) ;
- CREATE VIEW join_view AS
- SELECT
- l.product_id,
- l.product_count,
- r.sales_count
- FROM inventory_view l
- JOIN sales_view r
- ON l.product_id = r.product_id;
- INSERT INTO join_sink
- SELECT
- product_id,
- product_count,
- sales_count
- FROM join_view ;
代碼結(jié)構(gòu):
示意圖:
如上方式可以將無PK的source經(jīng)過一次節(jié)點(diǎn)變成有PK的動(dòng)態(tài)表,以Apache Flink的retract機(jī)制和業(yè)務(wù)要素解決數(shù)據(jù)瓶頸,減少計(jì)算資源的消耗。
說明1: 上面方案LAST_VALUE是Alibaba對(duì) Apache Flink 的增強(qiáng)的功能,社區(qū)還沒有支持。
4. Apache Flink Sink
在Apache Flink上面可以根據(jù)實(shí)際外部存儲(chǔ)的特點(diǎn)(是否支持PK),以及整體job的執(zhí)行plan來動(dòng)態(tài)推導(dǎo)Sink的執(zhí)行模式,具體有如下三種類型:
Append 模式 - 該模式用戶在定義Sink的DDL時(shí)候不定義PK,在Apache Flink內(nèi)部生成的所有只有INSERT語句;
Upsert 模式 - 該模式用戶在定義Sink的DDL時(shí)候可以定義PK,在Apache Flink內(nèi)部會(huì)根據(jù)事件打標(biāo)(retract機(jī)制)生成INSERT/UPDATE和DELETE 語句,其中如果定義了PK, UPDATE語句按PK進(jìn)行更新,如果沒有定義PK UPDATE會(huì)按整行更新;
Retract 模式 - 該模式下會(huì)產(chǎn)生INSERT和DELETE兩種信息,Sink Connector 根據(jù)這兩種信息構(gòu)造對(duì)應(yīng)的數(shù)據(jù)操作指令;
九、小結(jié)
本篇以MySQL為例介紹了傳統(tǒng)數(shù)據(jù)庫的靜態(tài)查詢和利用MySQL的Trigger+DML操作來模擬持續(xù)查詢,并介紹了Apache Flink上面利用增量模式完成持續(xù)查詢,并以雙流JOIN為例說明了持續(xù)查詢可能會(huì)遇到的問題,并且介紹Apache Flink以為事件打標(biāo)產(chǎn)生delete事件的方式解決持續(xù)查詢的問題,進(jìn)而保證語義的正確性,***的在流計(jì)算上支持續(xù)查詢。
# 關(guān)于點(diǎn)贊和評(píng)論
本系列文章難免有很多缺陷和不足,真誠希望讀者對(duì)有收獲的篇章給予點(diǎn)贊鼓勵(lì),對(duì)有不足的篇章給予反饋和建議,先行感謝大家!
作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺(tái)Blink的設(shè)計(jì)研發(fā)工作。
【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)聯(lián)系原作者】