Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN
一、說什么
JOIN 算子是數(shù)據(jù)處理的核心算子,前面我們在《Apache Flink 漫談系列(09) - JOIN 算子》介紹了UnBounded的雙流JOIN,在《Apache Flink 漫談系列(10) - JOIN LATERAL》介紹了單流與UDTF的JOIN操作,在《Apache Flink 漫談系列(11) - Temporal Table JOIN》又介紹了單流與版本表的JOIN,本篇將介紹在UnBounded數(shù)據(jù)流上按時間維度進行數(shù)據(jù)劃分進行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我們叫做Interval JOIN。
二、實際問題
前面章節(jié)我們介紹了Flink中對各種JOIN的支持,那么想想下面的查詢需求之前介紹的JOIN能否滿足?需求描述如下:
比如有一個訂單表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假設(shè)我們要統(tǒng)計下單一小時內(nèi)付款的訂單信息。
1. 傳統(tǒng)數(shù)據(jù)庫解決方式
在傳統(tǒng)劉數(shù)據(jù)庫中完成上面的需求非常簡單,查詢sql如下::
- SELECT
- o.orderId,
- o.productName,
- p.payType,
- o.orderTime,
- payTime
- FROM
- Orders AS o JOIN Payment AS p ON
- o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime < orderTime + 3600 // 秒
上面查詢可以***的完成查詢需求,那么在Apache Flink里面應(yīng)該如何完成上面的需求呢?
2. Apache Flink解決方式
(1) UnBounded 雙流 JOIN
上面查詢需求我們很容易想到利用《Apache Flink 漫談系列(09) - JOIN 算子》介紹了UnBounded的雙流JOIN,SQL語句如下:
- SELECT
- o.orderId,
- o.productName,
- p.payType,
- o.orderTime,
- payTime
- FROM
- Orders AS o JOIN Payment AS p ON
- o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime as timestamp < TIMESTAMPADD(SECOND, 3600, orderTime)
UnBounded雙流JOIN可以解決上面問題,這個示例和本篇要介紹的Interval JOIN有什么關(guān)系呢?
(2) 性能問題
雖然我們利用UnBounded的JOIN能解決上面的問題,但是仔細分析用戶需求,會發(fā)現(xiàn)這個需求場景訂單信息和付款信息并不需要長期存儲,比如2018-12-27 14:22:22的訂單只需要保持1小時,因為超過1個小時的訂單如果沒有被付款就是無效訂單了。同樣付款信息也不需要長期保持,2018-12-27 14:22:22的訂單付款信息如果是2018-12-27 15:22:22以后到達的那么我們也沒有必要保存到State中。 而對于UnBounded的雙流JOIN我們會一直將數(shù)據(jù)保存到State中,如下示意圖:
這樣的底層實現(xiàn),對于當(dāng)前需求有不必要的性能損失。所以我們有必要開發(fā)一種新的可以清除State的JOIN方式(Interval JOIN)來高性能的完成上面的查詢需求。
(3) 功能擴展
目前的UnBounded的雙流JOIN是后面是沒有辦法再進行Event-Time的Window Aggregate的。也就是下面的語句在Apache Flink上面是無法支持的:
- SELECT COUNT(*) FROM (
- SELECT
- ...,
- payTime
- FROM Orders AS o JOIN Payment AS p ON
- o.orderId = p.orderId
- ) GROUP BY TUMBLE(payTime, INTERVAL '15' MINUTE)
因為在UnBounded的雙流JOIN中無法保證payTime的值一定大于WaterMark(WaterMark相關(guān)可以查閱<
3. Interval JOIN
為了完成上面需求,并且解決性能和功能擴展的問題,Apache Flink在1.4開始開發(fā)了Time-windowed Join,也就是本文所說的Interval JOIN。接下來我們詳細介紹Interval JOIN的語法,語義和實現(xiàn)原理。
三、什么是Interval JOIN
Interval JOIN 相對于UnBounded的雙流JOIN來說是Bounded JOIN。就是每條流的每一條數(shù)據(jù)會與另一條流上的不同時間區(qū)域的數(shù)據(jù)進行JOIN。對應(yīng)Apache Flink官方文檔的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。
1. Interval JOIN 語法
- SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION
TIMEBOUND_EXPRESSION 有兩種寫法,如下:
- L.time between LowerBound(R.time) and UpperBound(R.time)
- R.time between LowerBound(L.time) and UpperBound(L.time)
- 帶有時間屬性(L.time/R.time)的比較表達式。
2. Interval JOIN 語義
Interval JOIN 的語義就是每條數(shù)據(jù)對應(yīng)一個 Interval 的數(shù)據(jù)區(qū)間,比如有一個訂單表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假設(shè)我們要統(tǒng)計在下單一小時內(nèi)付款的訂單信息。SQL查詢?nèi)缦?
- SELECT
- o.orderId,
- o.productName,
- p.payType,
- o.orderTime,
- cast(payTime as timestamp) as payTime
- FROM
- Orders AS o JOIN Payment AS p ON
- o.orderId = p.orderId AND
- p.payTime BETWEEN orderTime AND
- orderTime + INTERVAL '1' HOUR
- Orders訂單數(shù)據(jù)
- Payment付款數(shù)據(jù)
符合語義的預(yù)期結(jié)果是 訂單id為003的信息不出現(xiàn)在結(jié)果表中,因為下單時間2018-12-26 04:53:24.0, 付款時間是 2018-12-26 05:53:30.0超過了1小時付款。
那么預(yù)期的結(jié)果信息如下:
這樣Id為003的訂單是無效訂單,可以更新庫存繼續(xù)售賣。
接下來我們以圖示的方式直觀說明Interval JOIN的語義,我們對上面的示例需求稍微變化一下: 訂單可以預(yù)付款(不管是否合理,我們只是為了說明語義)也就是訂單 前后 1小時的付款都是有效的。SQL語句如下:
- SELECT
- ...
- FROM
- Orders AS o JOIN Payment AS p ON
- o.orderId = p.orderId AND
- p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
- orderTime + INTERVAL '1' HOUR
這樣的查詢語義示意圖如下:
上圖有幾個關(guān)鍵點,如下:
- 數(shù)據(jù)JOIN的區(qū)間 - 比如Order時間為3的訂單會在付款時間為[2, 4]區(qū)間進行JOIN。
- WaterMark - 比如圖示Order***一條數(shù)據(jù)時間是3,Payment***一條數(shù)據(jù)時間是5,那么WaterMark是根據(jù)實際最小值減去UpperBound生成,即:Min(3,5)-1 = 2
- 過期數(shù)據(jù) - 出于性能和存儲的考慮,要將過期數(shù)據(jù)清除,如圖當(dāng)WaterMark是2的時候時間為2以前的數(shù)據(jù)過期了,可以被清除。
3. Interval JOIN 實現(xiàn)原理
由于Interval JOIN和雙流JOIN類似都要存儲左右兩邊的數(shù)據(jù),所以底層實現(xiàn)中仍然是利用State進行數(shù)據(jù)的存儲。流計算的特點是數(shù)據(jù)不停的流入,我們可以不停的進行增量計算,也就是我們每條數(shù)據(jù)流入都可以進行JOIN計算。我們還是以具體示例和圖示來說明內(nèi)部計算邏輯,如下圖:
簡單解釋一下每條記錄的處理邏輯如下:
實際的內(nèi)部邏輯會比描述的復(fù)雜的多,大家可以根據(jù)如上簡述理解內(nèi)部原理即可。
四、示例代碼
我們還是以訂單和付款示例,將完整代碼分享給大家,具體如下(代碼基于flink-1.7.0):
- import java.sql.Timestamp
- import org.apache.flink.api.scala._
- import org.apache.flink.streaming.api.TimeCharacteristic
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.windowing.time.Time
- import org.apache.flink.table.api.TableEnvironment
- import org.apache.flink.table.api.scala._
- import org.apache.flink.types.Row
- import scala.collection.mutable
- object SimpleTimeIntervalJoin {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 構(gòu)造訂單數(shù)據(jù)
- val ordersData = new mutable.MutableList[(String, String, Timestamp)]
- ordersData.+=(("001", "iphone", new Timestamp(1545800002000L)))
- ordersData.+=(("002", "mac", new Timestamp(1545800003000L)))
- ordersData.+=(("003", "book", new Timestamp(1545800004000L)))
- ordersData.+=(("004", "cup", new Timestamp(1545800018000L)))
- // 構(gòu)造付款表
- val paymentData = new mutable.MutableList[(String, String, Timestamp)]
- paymentData.+=(("001", "alipay", new Timestamp(1545803501000L)))
- paymentData.+=(("002", "card", new Timestamp(1545803602000L)))
- paymentData.+=(("003", "card", new Timestamp(1545803610000L)))
- paymentData.+=(("004", "alipay", new Timestamp(1545803611000L)))
- val orders = env
- .fromCollection(ordersData)
- .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
- .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
- val ratesHistory = env
- .fromCollection(paymentData)
- .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
- .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)
- tEnv.registerTable("Orders", orders)
- tEnv.registerTable("Payment", ratesHistory)
- var sqlQuery =
- """
- |SELECT
- | o.orderId,
- | o.productName,
- | p.payType,
- | o.orderTime,
- | cast(payTime as timestamp) as payTime
- |FROM
- | Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND
- | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
- |""".stripMargin
- tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
- val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
- result.print()
- env.execute()
- }
- }
- class TimestampExtractor[T1, T2]
- extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
- override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
- element._3.getTime
- }
- }
運行結(jié)果如下:
五、小節(jié)
本篇由實際業(yè)務(wù)需求場景切入,介紹了相同業(yè)務(wù)需求既可以利用Unbounded 雙流JOIN實現(xiàn),也可以利用Time Interval JOIN來實現(xiàn),Time Interval JOIN 性能優(yōu)于UnBounded的雙流JOIN,并且Interval JOIN之后可以進行Window Aggregate算子計算。然后介紹了Interval JOIN的語法,語義和實現(xiàn)原理,***將訂單和付款的完整示例代碼分享給大家。期望本篇能夠讓大家對Apache Flink Time Interval JOIN有一個具體的了解!
關(guān)于點贊和評論
本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!
作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設(shè)計研發(fā)工作。
【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請聯(lián)系原作者】