自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN

開發(fā) 開發(fā)工具
本篇將介紹在UnBounded數(shù)據(jù)流上按時間維度進行數(shù)據(jù)劃分進行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我們叫做Interval JOIN。

一、說什么

JOIN 算子是數(shù)據(jù)處理的核心算子,前面我們在《Apache Flink 漫談系列(09) - JOIN 算子》介紹了UnBounded的雙流JOIN,在《Apache Flink 漫談系列(10) - JOIN LATERAL》介紹了單流與UDTF的JOIN操作,在《Apache Flink 漫談系列(11) - Temporal Table JOIN》又介紹了單流與版本表的JOIN,本篇將介紹在UnBounded數(shù)據(jù)流上按時間維度進行數(shù)據(jù)劃分進行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我們叫做Interval JOIN。

二、實際問題

前面章節(jié)我們介紹了Flink中對各種JOIN的支持,那么想想下面的查詢需求之前介紹的JOIN能否滿足?需求描述如下:

比如有一個訂單表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假設(shè)我們要統(tǒng)計下單一小時內(nèi)付款的訂單信息。

1. 傳統(tǒng)數(shù)據(jù)庫解決方式

在傳統(tǒng)劉數(shù)據(jù)庫中完成上面的需求非常簡單,查詢sql如下::

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime < orderTime + 3600 // 秒 

上面查詢可以***的完成查詢需求,那么在Apache Flink里面應(yīng)該如何完成上面的需求呢?

2. Apache Flink解決方式

(1) UnBounded 雙流 JOIN

上面查詢需求我們很容易想到利用《Apache Flink 漫談系列(09) - JOIN 算子》介紹了UnBounded的雙流JOIN,SQL語句如下:

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime as timestamp < TIMESTAMPADD(SECOND, 3600, orderTime) 

UnBounded雙流JOIN可以解決上面問題,這個示例和本篇要介紹的Interval JOIN有什么關(guān)系呢?

(2) 性能問題

雖然我們利用UnBounded的JOIN能解決上面的問題,但是仔細分析用戶需求,會發(fā)現(xiàn)這個需求場景訂單信息和付款信息并不需要長期存儲,比如2018-12-27 14:22:22的訂單只需要保持1小時,因為超過1個小時的訂單如果沒有被付款就是無效訂單了。同樣付款信息也不需要長期保持,2018-12-27 14:22:22的訂單付款信息如果是2018-12-27 15:22:22以后到達的那么我們也沒有必要保存到State中。 而對于UnBounded的雙流JOIN我們會一直將數(shù)據(jù)保存到State中,如下示意圖:

這樣的底層實現(xiàn),對于當(dāng)前需求有不必要的性能損失。所以我們有必要開發(fā)一種新的可以清除State的JOIN方式(Interval JOIN)來高性能的完成上面的查詢需求。

(3) 功能擴展

目前的UnBounded的雙流JOIN是后面是沒有辦法再進行Event-Time的Window Aggregate的。也就是下面的語句在Apache Flink上面是無法支持的:

  1. SELECT COUNT(*) FROM ( 
  2. SELECT 
  3. ..., 
  4. payTime 
  5. FROM Orders AS o JOIN Payment AS p ON 
  6. o.orderId = p.orderId 
  7. ) GROUP BY TUMBLE(payTime, INTERVAL '15' MINUTE) 

因為在UnBounded的雙流JOIN中無法保證payTime的值一定大于WaterMark(WaterMark相關(guān)可以查閱<>). Apache Flink的Interval JOIN之后可以進行Event-Time的Window Aggregate。

3. Interval JOIN

為了完成上面需求,并且解決性能和功能擴展的問題,Apache Flink在1.4開始開發(fā)了Time-windowed Join,也就是本文所說的Interval JOIN。接下來我們詳細介紹Interval JOIN的語法,語義和實現(xiàn)原理。

三、什么是Interval JOIN

Interval JOIN 相對于UnBounded的雙流JOIN來說是Bounded JOIN。就是每條流的每一條數(shù)據(jù)會與另一條流上的不同時間區(qū)域的數(shù)據(jù)進行JOIN。對應(yīng)Apache Flink官方文檔的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。

1. Interval JOIN 語法

  1. SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION 

TIMEBOUND_EXPRESSION 有兩種寫法,如下:

  • L.time between LowerBound(R.time) and UpperBound(R.time)
  • R.time between LowerBound(L.time) and UpperBound(L.time)
  • 帶有時間屬性(L.time/R.time)的比較表達式。

2. Interval JOIN 語義

Interval JOIN 的語義就是每條數(shù)據(jù)對應(yīng)一個 Interval 的數(shù)據(jù)區(qū)間,比如有一個訂單表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假設(shè)我們要統(tǒng)計在下單一小時內(nèi)付款的訂單信息。SQL查詢?nèi)缦?

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. cast(payTime as timestamp) as payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND 
  10. p.payTime BETWEEN orderTime AND 
  11. orderTime + INTERVAL '1' HOUR 
  • Orders訂單數(shù)據(jù)

  • Payment付款數(shù)據(jù)

符合語義的預(yù)期結(jié)果是 訂單id為003的信息不出現(xiàn)在結(jié)果表中,因為下單時間2018-12-26 04:53:24.0, 付款時間是 2018-12-26 05:53:30.0超過了1小時付款。

那么預(yù)期的結(jié)果信息如下:

這樣Id為003的訂單是無效訂單,可以更新庫存繼續(xù)售賣。

接下來我們以圖示的方式直觀說明Interval JOIN的語義,我們對上面的示例需求稍微變化一下: 訂單可以預(yù)付款(不管是否合理,我們只是為了說明語義)也就是訂單 前后 1小時的付款都是有效的。SQL語句如下:

  1. SELECT 
  2. ... 
  3. FROM 
  4. Orders AS o JOIN Payment AS p ON 
  5. o.orderId = p.orderId AND 
  6. p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND 
  7. orderTime + INTERVAL '1' HOUR 

這樣的查詢語義示意圖如下:

上圖有幾個關(guān)鍵點,如下:

  • 數(shù)據(jù)JOIN的區(qū)間 - 比如Order時間為3的訂單會在付款時間為[2, 4]區(qū)間進行JOIN。
  • WaterMark - 比如圖示Order***一條數(shù)據(jù)時間是3,Payment***一條數(shù)據(jù)時間是5,那么WaterMark是根據(jù)實際最小值減去UpperBound生成,即:Min(3,5)-1 = 2
  • 過期數(shù)據(jù) - 出于性能和存儲的考慮,要將過期數(shù)據(jù)清除,如圖當(dāng)WaterMark是2的時候時間為2以前的數(shù)據(jù)過期了,可以被清除。

3. Interval JOIN 實現(xiàn)原理

由于Interval JOIN和雙流JOIN類似都要存儲左右兩邊的數(shù)據(jù),所以底層實現(xiàn)中仍然是利用State進行數(shù)據(jù)的存儲。流計算的特點是數(shù)據(jù)不停的流入,我們可以不停的進行增量計算,也就是我們每條數(shù)據(jù)流入都可以進行JOIN計算。我們還是以具體示例和圖示來說明內(nèi)部計算邏輯,如下圖:

簡單解釋一下每條記錄的處理邏輯如下:

實際的內(nèi)部邏輯會比描述的復(fù)雜的多,大家可以根據(jù)如上簡述理解內(nèi)部原理即可。

四、示例代碼

我們還是以訂單和付款示例,將完整代碼分享給大家,具體如下(代碼基于flink-1.7.0):

  1. import java.sql.Timestamp 
  2.  
  3. import org.apache.flink.api.scala._ 
  4. import org.apache.flink.streaming.api.TimeCharacteristic 
  5. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  6. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8. import org.apache.flink.table.api.TableEnvironment 
  9. import org.apache.flink.table.api.scala._ 
  10. import org.apache.flink.types.Row 
  11.  
  12. import scala.collection.mutable 
  13.  
  14. object SimpleTimeIntervalJoin { 
  15. def main(args: Array[String]): Unit = { 
  16. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  17. val tEnv = TableEnvironment.getTableEnvironment(env) 
  18. env.setParallelism(1) 
  19. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  20. // 構(gòu)造訂單數(shù)據(jù) 
  21. val ordersData = new mutable.MutableList[(String, String, Timestamp)] 
  22. ordersData.+=(("001", "iphone", new Timestamp(1545800002000L))) 
  23. ordersData.+=(("002", "mac", new Timestamp(1545800003000L))) 
  24. ordersData.+=(("003", "book", new Timestamp(1545800004000L))) 
  25. ordersData.+=(("004", "cup", new Timestamp(1545800018000L))) 
  26.  
  27. // 構(gòu)造付款表 
  28. val paymentData = new mutable.MutableList[(String, String, Timestamp)] 
  29. paymentData.+=(("001", "alipay", new Timestamp(1545803501000L))) 
  30. paymentData.+=(("002", "card", new Timestamp(1545803602000L))) 
  31. paymentData.+=(("003", "card", new Timestamp(1545803610000L))) 
  32. paymentData.+=(("004", "alipay", new Timestamp(1545803611000L))) 
  33. val orders = env 
  34. .fromCollection(ordersData) 
  35. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  36. .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime) 
  37. val ratesHistory = env 
  38. .fromCollection(paymentData) 
  39. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  40. .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime) 
  41.  
  42. tEnv.registerTable("Orders", orders) 
  43. tEnv.registerTable("Payment", ratesHistory) 
  44.  
  45. var sqlQuery = 
  46. ""
  47. |SELECT 
  48. | o.orderId, 
  49. | o.productName, 
  50. | p.payType, 
  51. | o.orderTime, 
  52. | cast(payTime as timestamp) as payTime 
  53. |FROM 
  54. | Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND 
  55. | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR 
  56. |""".stripMargin 
  57. tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) 
  58.  
  59. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
  60. result.print() 
  61. env.execute() 
  62.  
  63.  
  64. class TimestampExtractor[T1, T2] 
  65. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
  66. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
  67. element._3.getTime 

運行結(jié)果如下:

五、小節(jié)

本篇由實際業(yè)務(wù)需求場景切入,介紹了相同業(yè)務(wù)需求既可以利用Unbounded 雙流JOIN實現(xiàn),也可以利用Time Interval JOIN來實現(xiàn),Time Interval JOIN 性能優(yōu)于UnBounded的雙流JOIN,并且Interval JOIN之后可以進行Window Aggregate算子計算。然后介紹了Interval JOIN的語法,語義和實現(xiàn)原理,***將訂單和付款的完整示例代碼分享給大家。期望本篇能夠讓大家對Apache Flink Time Interval JOIN有一個具體的了解!

關(guān)于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設(shè)計研發(fā)工作。

【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請聯(lián)系原作者】

戳這里,看該作者更多好文

責(zé)任編輯:趙寧寧 來源: 51CTO專欄
相關(guān)推薦

2022-07-13 12:53:59

數(shù)據(jù)存儲

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-06-10 17:26:07

數(shù)據(jù)集計算

2018-10-09 10:55:52

Apache FlinWatermark流計算

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-10-16 08:54:35

Apache Flin流計算State

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2019-01-03 10:17:53

Apache FlinTable API代碼

2022-07-13 13:03:29

流計算亂序

2018-11-07 08:48:31

Apache Flin持續(xù)查詢流計算

2022-07-12 10:38:25

分布式框架

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2022-07-12 11:01:03

數(shù)據(jù)庫

2022-08-31 14:49:05

IoTDBIoTDatabase

2025-04-25 10:28:40

點贊
收藏

51CTO技術(shù)棧公眾號