自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Apache Flink 漫談系列(10) - JOIN LATERAL

開發(fā) 開發(fā)工具
上一篇《Apache Flink 漫談系列 - JOIN算子》我們對(duì)最常見的JOIN做了詳盡的分析,本篇介紹一個(gè)特殊的JOIN,那就是JOIN LATERAL。

一、聊什么

上一篇《Apache Flink 漫談系列 - JOIN算子》我們對(duì)最常見的JOIN做了詳盡的分析,本篇介紹一個(gè)特殊的JOIN,那就是JOIN LATERAL。JOIN LATERAL為什么特殊呢,直觀說因?yàn)镴OIN的右邊不是一個(gè)實(shí)際的物理表,而是一個(gè)VIEW或者Table-valued Funciton。本篇會(huì)先介紹傳統(tǒng)數(shù)據(jù)庫對(duì)LATERAL JOIN的支持,然后介紹Apache Flink目前對(duì)LATERAL JOIN的支持情況。

二、實(shí)際問題

假設(shè)我們有兩張表,一張是Customers表(消費(fèi)者id, 所在城市), 一張是Orders表(訂單id,消費(fèi)者id),兩張表的DDL(SQL Server)如下:

  • Customers
  1. CREATE TABLE Customers ( 
  2. customerid char(5) NOT NULL, 
  3. city varchar (10) NOT NULL 
  4.  
  5. insert into Customers values('C001','Beijing'); 
  6. insert into Customers values('C002','Beijing'); 
  7. insert into Customers values('C003','Beijing'); 
  8. insert into Customers values('C004','HangZhou'); 

查看數(shù)據(jù):

  • Orders
  1. CREATE TABLE Orders( 
  2. orderid char(5) NOT NULL, 
  3. customerid char(5) NULL 
  4.  
  5. insert into Orders values('O001','C001'); 
  6. insert into Orders values('O002','C001'); 
  7. insert into Orders values('O003','C003'); 
  8. insert into Orders values('O004','C001'); 

查看數(shù)據(jù):

1. 問題示例

假設(shè)我們想查詢所有Customers的客戶ID,地點(diǎn)和訂單信息,我們想得到的信息是:

(1) 用INNER JOIN解決

如果大家查閱了《Apache Flink 漫談系列 - JOIN算子》,我想看到這樣的查詢需求會(huì)想到INNER JOIN來解決,SQL如下:

  1. SELECT 
  2. c.customerid, c.city, o.orderid 
  3. FROM Customers c JOIN Orders o 
  4.     ON o.customerid = c.customerid 

查詢結(jié)果如下:

但如果我們真的用上面的方式來解決,就不會(huì)有本篇要介紹的內(nèi)容了,所以我們換一種寫法。

2. 用 Correlated subquery解決

Correlated subquery 是在subquery中使用關(guān)聯(lián)表的字段,subquery可以在FROM Clause中也可以在WHERE Clause中。

  • WHERE Clause

用WHERE Clause實(shí)現(xiàn)上面的查詢需求,SQL如下:

  1. SELECT 
  2. c.customerid, c.city 
  3. FROM Customers c WHERE c.customerid IN ( 
  4. SELECT 
  5. o.customerid, o.orderid 
  6. FROM Orders o 
  7. WHERE o.customerid = c.customerid 

執(zhí)行情況:

上面的問題是用在WHERE Clause里面subquery的查詢列必須和需要比較的列對(duì)應(yīng),否則我們無法對(duì)o.orderid進(jìn)行投影, 上面查詢我為什么要加一個(gè)o.orderid呢,因?yàn)椴樵冃枨笫切枰猳.orderid的,去掉o.orderid查詢能成功,但是拿到的結(jié)果并不是我們想要的,如下:

  1. SELECT 
  2. c.customerid, c.city 
  3. FROM Customers c WHERE c.customerid IN ( 
  4. SELECT 
  5. o.customerid 
  6. FROM Orders o 
  7. WHERE o.customerid = c.customerid 

查詢結(jié)果:

可見上面查詢結(jié)果缺少了o.orderid,不能滿足我們的查詢需求。

  • FROM Clause

用WHERE Clause實(shí)現(xiàn)上面的查詢需求,SQL如下:

  1. SELECT 
  2. c.customerid, c.city, o.orderid 
  3. FROM Customers c, ( 
  4. SELECT 
  5. o.orderid, o.customerid 
  6. FROM Orders o 
  7. WHERE o.customerid = c.customerid 
  8. ) as o 

我們會(huì)得到如下錯(cuò)誤:

錯(cuò)誤信息提示我們無法識(shí)別c.customerid。在ANSI-SQL里面FROM Clause里面的subquery是無法引用左邊表信息的,所以簡單的用FROM Clause里面的subquery,也無法解決上面的問題,

那么上面的查詢需求除了INNER JOIN 我們還可以如何解決呢?

三、JOIN LATERAL

我們分析上面的需求,本質(zhì)上是根據(jù)左表Customers的customerid,去查詢右表的Orders信息,就像一個(gè)For循環(huán)一樣,外層是遍歷左表Customers所有數(shù)據(jù),內(nèi)層是根據(jù)左表Customers的每一個(gè)Customerid去右表Orders中進(jìn)行遍歷查詢,然后再將符合條件的左右表數(shù)據(jù)進(jìn)行JOIN,這種根據(jù)左表逐條數(shù)據(jù)動(dòng)態(tài)生成右表進(jìn)行JOIN的語義,SQL標(biāo)準(zhǔn)里面提出了LATERAL關(guān)鍵字,也叫做 lateral drive table。

1. CROSS APPLY和LATERAL

上面的示例我們用的是SQL Server進(jìn)行測(cè)試的,這里在多提一下在SQL Server里面是如何支持 LATERAL 的呢?SQL Server是用自己的方言 CROSS APPLY 來支持的。那么為啥不用ANSI-SQL的LATERAL而用CROSS APPLY呢? 可能的原因是當(dāng)時(shí)SQL Server為了解決TVF問題而引入的,同時(shí)LATERAL是SQL2003引入的,而CROSS APPLY是SQL Server 2005就支持了,SQL Server 2005的開發(fā)是在2000年就進(jìn)行了,這個(gè)可能也有個(gè)時(shí)間差,等LATERAL出來的時(shí)候,CROSS APPLY在SQL Server里面已經(jīng)開發(fā)完成了。所以種種原因SQL Server里面就采用了CROSS APPLY,但CROSS APPLY的語義與LATERAL卻完全一致,同時(shí)后續(xù)支持LATERAL的Oracle12和PostgreSQL94同時(shí)支持了LATERAL和CROSS APPLY。

2. 問題解決

那么我們回到上面的問題,我們用SQL Server的CROSS APPLY來解決上面問題,SQL如下:

上面得到的結(jié)果完全滿足查詢需求。

四、JOIN LATERAL 與 INNER JOIN 關(guān)系

上面的查詢需求并沒有體現(xiàn)JOIN LATERAL和INNER JOIN的區(qū)別,我們還是以SQL Server中兩個(gè)查詢執(zhí)行Plan來觀察一下:

上面我們發(fā)現(xiàn)經(jīng)過SQL Server優(yōu)化器優(yōu)化之后的兩個(gè)執(zhí)行plan完全一致,那么為啥還要再造一個(gè)LATERAL 出來呢?

1. 性能方面

我們將上面的查詢需求稍微改變一下,我們查詢所有Customer和Customers的***份訂單信息。

  • LATERAL 的寫法
  1. SELECT 
  2. c.customerid, c.city, o.orderid 
  3. FROM Customers c CROSS APPLY ( 
  4. SELECT 
  5. ***) o.orderid, o.customerid 
  6. FROM Orders o 
  7. WHERE o.customerid = c.customerid 
  8.     ORDER BY o.customerid, o.orderid 
  9. ) as o 

查詢結(jié)果:

我們發(fā)現(xiàn)雖然C001的Customer有三筆訂單,但是我們查詢的***信息。

  • JOIN 寫法
  1. SELECT c.customerid, c.city, o.orderid 
  2. FROM Customers c 
  3. JOIN ( 
  4. SELECT 
  5. o2.*, 
  6.      ROW_NUMBER() OVER ( 
  7.         PARTITION BY customerid 
  8.         ORDER BY orderid 
  9.      ) AS rn 
  10. FROM Orders o2 
  11. ) o 
  12. ON c.customerid = o.customerid AND o.rn = 1 

查詢結(jié)果:

如上我們都完成了查詢需求,我們?cè)趤砜匆幌聢?zhí)行Plan,如下:

我們直觀發(fā)現(xiàn)完成相同功能,使用CROSS APPLY進(jìn)行查詢,執(zhí)行Plan簡單許多。

2. 功能方面

在功能方面INNER JOIN本身在ANSI-SQL中是不允許 JOIN 一個(gè)Function的,這也是SQL Server當(dāng)時(shí)引入CROSS APPLY的根本原因。我們以一個(gè)SQL Server中DMV(相當(dāng)于TVF)查詢?yōu)槔?/p>

  1. SELECT 
  2. name, log_backup_time 
  3. FROM sys.databases AS s 
  4. CROSS APPLY sys.dm_db_log_stats(s.database_id); 

查詢結(jié)果:

五、Apache Flink對(duì) LATERAL的支持

前面我花費(fèi)了大量的章節(jié)來向大家介紹ANSI-SQL和傳統(tǒng)數(shù)據(jù)庫以SQL Server為例如何支持LATERAL的,接下來我們看看Apache Flink對(duì)LATERAL的支持情況。

1. Calcite

Apache Flink 利用 Calcite進(jìn)行SQL的解析和優(yōu)化,目前Calcite完全支持LATERAL語法,示例如下:

  1. SELECT 
  2. e.NAME, e.DEPTNO, d.NAME 
  3. FROM EMPS e, LATERAL ( 
  4. SELECT 
  5. FORM DEPTS d 
  6. WHERE e.DEPTNO=d.DEPTNO 
  7. ) as d; 

查詢結(jié)果:

我使用的是Calcite官方自帶測(cè)試數(shù)據(jù)。

2. Flink

截止到Flink-1.6.2,Apache Flink 中有兩種場(chǎng)景使用LATERAL,如下:

  • UDTF(TVF) - User-defined Table Funciton
  • Temporal Table - 涉及內(nèi)容會(huì)在后續(xù)篇章單獨(dú)介紹。

本篇我們以在TVF(UDTF)為例說明 Apache Fink中如何支持LATERAL。

(1) UDTF

UDTF- User-defined Table Function是Apache Flink中三大用戶自定義函數(shù)(UDF,UDTF,UDAGG)之一。 自定義接口如下:

  • 基類
  1. /** 
  2. * Base class for all user-defined functions such as scalar functions, table functions, 
  3. * or aggregation functions. 
  4. */ 
  5. abstract class UserDefinedFunction extends Serializable { 
  6. // 關(guān)鍵是FunctionContext中提供了若干高級(jí)屬性(在UDX篇會(huì)詳細(xì)介紹) 
  7. def open(context: FunctionContext): Unit = {} 
  8. def close(): Unit = {} 
  • TableFunction
  1. /** 
  2. * Base class for a user-defined table function (UDTF). A user-defined table functions works on 
  3. * zero, one, or multiple scalar values as input and returns multiple rows as output. 
  4. * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation 
  5. * method. An evaluation method must be declared publicly, not static and named "eval". 
  6. * Evaluation methods can also be overloaded by implementing multiple methods named "eval". 
  7. * User-defined functions must have a default constructor and must be instantiable during runtime. 
  8. * By default the result type of an evaluation method is determined by Flink's type extraction 
  9. * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more 
  10. * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type 
  11. * can be manually defined by overriding [[getResultType()]]. 
  12. */ 
  13. abstract class TableFunction[T] extends UserDefinedFunction { 
  14.  
  15. // 對(duì)于泛型T,如果是基礎(chǔ)類型那么Flink框架可以自動(dòng)識(shí)別, 
  16. // 對(duì)于用戶自定義的復(fù)雜對(duì)象,需要用戶overwrite這個(gè)實(shí)現(xiàn)。 
  17. def getResultType: TypeInformation[T] = null 

上面定義的核心是要求用戶實(shí)現(xiàn)eval方法,我們寫一個(gè)具體示例。

  • 示例
  1. // 定義一個(gè)簡單的UDTF返回類型,對(duì)應(yīng)接口上的 T 
  2. case class SimpleUser(name: String, age: Int) 
  3. // 繼承TableFunction,并實(shí)現(xiàn)evale方法 
  4. // 核心功能是解析以#分割的字符串 
  5. class SplitTVF extends TableFunction[SimpleUser] { 
  6. // make sure input element's format is "<string>#<int>
  7. def eval(user: String): Unit = { 
  8. if (user.contains("#")) { 
  9. val splits = user.split("#") 
  10. collect(SimpleUser(splits(0), splits(1).toInt)) 
  11. }} 

(2) 示例(完整的ITCase):

  • 測(cè)試數(shù)據(jù)

我們構(gòu)造一個(gè)只包含一個(gè)data字段的用戶表,用戶表數(shù)據(jù)如下:

  • 查詢需求

查詢的需求是將data字段flatten成為name和age兩個(gè)字段的表,期望得到:

  • 查詢示例

我們以ITCase方式完成如上查詢需求,完整代碼如下:

  1. @Test 
  2. def testLateralTVF(): Unit = { 
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  4. val tEnv = TableEnvironment.getTableEnvironment(env) 
  5. env.setStateBackend(getStateBackend) 
  6. StreamITCase.clear 
  7.  
  8. val userData = new mutable.MutableList[(String)] 
  9. userData.+=(("Sunny#8")) 
  10. userData.+=(("Kevin#36")) 
  11. userData.+=(("Panpan#36")) 
  12.  
  13. val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)" 
  14.  
  15. val users = env.fromCollection(userData).toTable(tEnv, 'data) 
  16.  
  17. val tvf = new SplitTVF() 
  18. tEnv.registerTable("userTab", users) 
  19. tEnv.registerFunction("splitTVF", tvf) 
  20.  
  21. val result = tEnv.SQLQuery(SQLQuery).toAppendStream[Row] 
  22. result.addSink(new StreamITCase.StringSink[Row]) 
  23. env.execute() 
  24. StreamITCase.testResults.foreach(println(_)) 

運(yùn)行結(jié)果:

上面的核心語句是:

  1. val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)" 

如果大家想運(yùn)行上面的示例,請(qǐng)查閱《Apache Flink 漫談系列 - SQL概覽》中 源碼方式 搭建測(cè)試環(huán)境。

六、小結(jié)

本篇重點(diǎn)向大家介紹了一種新的JOIN類型 - JOIN LATERAL。并向大家介紹了SQL Server中對(duì)LATERAL的支持方式,詳細(xì)分析了JOIN LATERAL和INNER JOIN的區(qū)別與聯(lián)系,***切入到Apache Flink中,以UDTF示例說明了Apache Flink中對(duì)JOIN LATERAL的支持,后續(xù)篇章會(huì)介紹Apache Flink中另一種使用LATERAL的場(chǎng)景,就是Temporal JION,Temporal JION也是一種新的JOIN類型,我們下一篇再見!

關(guān)于點(diǎn)贊和評(píng)論

本系列文章難免有很多缺陷和不足,真誠希望讀者對(duì)有收獲的篇章給予點(diǎn)贊鼓勵(lì),對(duì)有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺(tái)Blink的設(shè)計(jì)研發(fā)工作。

【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)聯(lián)系原作者】

戳這里,看該作者更多好文

責(zé)任編輯:趙寧寧 來源: 51CTO專欄
相關(guān)推薦

2022-07-13 12:53:59

數(shù)據(jù)存儲(chǔ)

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-06-10 17:26:07

數(shù)據(jù)集計(jì)算

2018-12-29 08:16:32

Apache FlinJOIN代碼

2018-10-09 10:55:52

Apache FlinWatermark流計(jì)算

2018-09-26 08:44:22

Apache Flin流計(jì)算計(jì)算模式

2018-09-26 07:50:52

Apache Flin流計(jì)算計(jì)算模式

2018-10-16 08:54:35

Apache Flin流計(jì)算State

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計(jì)算Fault Toler

2019-01-03 10:17:53

Apache FlinTable API代碼

2022-07-13 13:03:29

流計(jì)算亂序

2018-11-07 08:48:31

Apache Flin持續(xù)查詢流計(jì)算

2022-07-12 10:38:25

分布式框架

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對(duì)偶duality

2020-04-09 11:08:30

PyFlinkJAR依賴

2022-06-20 05:52:27

FlinkTTL流查詢

2018-10-30 11:10:05

Flink數(shù)據(jù)集計(jì)算
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)