Apache Flink Talk Series (11) - Temporal Table JOIN
I. What Is a Temporal Table
"Apache Flink Talk Series - JOIN LATERAL" mentioned Temporal Table JOIN; this article explains in detail what Temporal Table JOIN is.
The Temporal concept was introduced in ANSI-SQL 2011, and major database vendors such as Oracle, SQL Server and DB2 have since implemented the standard. A Temporal Table records every data change at any point in history. A Temporal Table has the characteristics of an ordinary table, together with its own dedicated DDL/DML/QUERY syntax, and time is its core attribute: history implies time, and time implies snapshots.
II. An ANSI-SQL 2011 Temporal Table Example
We use one DDL statement and a set of DML statements to illustrate how a Temporal Table works. Declaring a PK in the DDL is optional; the example below does not define one:
1. DDL Example
- CREATE TABLE Emp (
- ENo INTEGER,
- Sys_Start TIMESTAMP(12) GENERATED ALWAYS AS ROW START,
- Sys_End TIMESTAMP(12) GENERATED ALWAYS AS ROW END,
- EName VARCHAR(30),
- PERIOD FOR SYSTEM_TIME (Sys_Start, Sys_End)
- ) WITH SYSTEM VERSIONING
2. DML Examples
(1) INSERT
- INSERT INTO Emp (ENo, EName) VALUES (22217, 'Joe')
Note: Sys_Start and Sys_End are filled in automatically by the database system.
(2) UPDATE
- UPDATE Emp SET EName = 'Tom' WHERE ENo = 22217
Note: Suppose the UPDATE is executed at 2012-02-03 10:00:00. Afterwards the Sys_End of the previous value "Joe" changes from 9999-12-31 23:59:59 to 2012-02-03 10:00:00, which is also the time from which the new value "Tom" takes effect. In other words, the table now holds (22217, 'Joe') valid until 2012-02-03 10:00:00 and (22217, 'Tom') valid from 2012-02-03 10:00:00 to 9999-12-31 23:59:59. Although we executed a single UPDATE, the database keeps two rows with different values and different validity periods, that is, two versions.
(3) DELETE (assume the table still contains the two versions described above)
- DELETE FROM Emp WHERE ENo = 22217
Note: Suppose the DELETE is executed at 2012-06-01 00:00:00. The Sys_End of the matching row changes from 9999-12-31 23:59:59 to 2012-06-01 00:00:00. In other words, the DELETE does not physically remove the matching rows; the system instead sets their Sys_End to the time the DELETE was executed, marking that the data is valid only up to the moment of the DELETE.
(4) SELECT
- SELECT ENo,EName,Sys_Start,Sys_End FROM Emp
- FOR SYSTEM_TIME AS OF TIMESTAMP '2011-01-02 00:00:00'
Note: This query returns all records with Sys_Start <= 2011-01-02 00:00:00 and Sys_End > 2011-01-02 00:00:00.
III. A SQL Server Temporal Table Example
1. DDL
- CREATE TABLE Department
- (
- DeptID int NOT NULL PRIMARY KEY CLUSTERED
- , DeptName varchar(50) NOT NULL
- , ManagerID INT NULL
- , ParentDeptID int NULL
- , SysStartTime datetime2 GENERATED ALWAYS AS ROW START NOT NULL
- , SysEndTime datetime2 GENERATED ALWAYS AS ROW END NOT NULL
- , PERIOD FOR SYSTEM_TIME (SysStartTime,SysEndTime)
- )
- WITH (SYSTEM_VERSIONING = ON);
Executing the statement above creates both the current table and a history table in the database.
Department is shown as system-versioned; the history table gets a default name, but we can also specify one, e.g. SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DepartmentHistory).
2. DML
(1) INSERT - the INSERT statement does not include the SysStartTime and SysEndTime columns
- INSERT INTO [dbo].[Department] ([DeptID] ,[DeptName] ,[ManagerID] ,[ParentDeptID])
- VALUES(10, 'Marketing', 101, 1);
After executing it, we query the current table and the history table. The row written by this INSERT is valid from the moment of the operation, 2018-06-06 05:50:20.7913985, until 9999-12-31 23:59:59.9999999, while the history table contains nothing yet. Next we perform an update.
(2) UPDATE
- UPDATE [dbo].[Department] SET [ManagerID] = 501 WHERE [DeptID] = 10
After executing it, the current table is updated and one record is produced in the history table. Note that the current table's SysStartTime has changed; the history table now holds one record whose SysStartTime is the SysStartTime the row had before the update and whose SysEndTime equals the SysStartTime of the updated current row. Let us update once more:
- UPDATE [dbo].[Department] SET [ManagerID] = 201 WHERE [DeptID] = 10
From this we can see how SQL Server implements Temporal Tables: data is stored in a current table plus a history table, and internally the database manages data versions through the start-time and end-time columns.
(3) SELECT
- SELECT [DeptID], [DeptName], [SysStartTime],[SysEndTime]
- FROM [dbo].[Department]
- FOR SYSTEM_TIME AS OF '2018-06-06 05:50:21.0000000' ;
Although the SELECT statement queries the Department table, the returned data actually comes from the history table; the underlying query logic is SysStartTime <= '2018-06-06 05:50:21.0000000' AND SysEndTime > '2018-06-06 05:50:21.0000000'.
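To make the version lookup concrete, here is a minimal Scala sketch (not a SQL Server or Flink API; the DeptVersion class and its data are assumptions made purely for illustration) that applies the same SysStartTime <= t AND SysEndTime > t predicate to an in-memory list of row versions:
- import java.sql.Timestamp
- // hypothetical in-memory model of one version of a system-versioned row
- case class DeptVersion(deptId: Int, managerId: Int, sysStart: Timestamp, sysEnd: Timestamp)
- object AsOfLookup {
-   // versions valid at time t: sysStart <= t AND sysEnd > t,
-   // mirroring the FOR SYSTEM_TIME AS OF predicate
-   def asOf(versions: Seq[DeptVersion], t: Timestamp): Seq[DeptVersion] =
-     versions.filter(v => !v.sysStart.after(t) && v.sysEnd.after(t))
- }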
IV. Apache Flink Temporal Table
We have said more than once that Apache Flink follows the ANSI-SQL standard, and the Temporal Table concept in Apache Flink also comes from the ANSI-2011 semantics. The current implementation, however, differs slightly from ANSI-SQL at the syntax level: as shown above, ANSI-2011 uses the FOR SYSTEM_TIME AS OF syntax, whereas Apache Flink currently uses the LATERAL TABLE(TemporalTableFunction) syntax. This is something we need to push the community to improve.
1. Why Do We Need Temporal Tables?
We use a concrete query to explain why Temporal Tables are needed. Suppose we have a continuously changing, append-only exchange-rate table (RatesHistory) that quotes rates against the Yen (the Yen rate itself is 1):
rowtime   currency    rate
09:00:00  US Dollar   102
09:00:00  Euro        114
09:00:00  Yen         1
10:45:00  Euro        116
11:15:00  Euro        119
11:49:00  Pounds      108
For example, the Euro-to-Yen rate is 114 from 09:00 to 10:45 and 116 from 10:45 to 11:15.
Suppose we want to output all rates valid as of 10:58. We need the following SQL query to compute the result table:
- SELECT *
- FROM RatesHistory AS r
- WHERE r.rowtime = (
- SELECT MAX(rowtime)
- FROM RatesHistory AS r2
- WHERE r2.currency = r.currency
- AND r2.rowtime <= '10:58');
The corresponding Flink code is as follows:
- Define the data source: genRatesHistorySource
- def genRatesHistorySource: CsvTableSource = {
- val csvRecords = Seq(
- "rowtime ,currency ,rate",
- "09:00:00 ,US Dollar , 102",
- "09:00:00 ,Euro , 114",
- "09:00:00 ,Yen , 1",
- "10:45:00 ,Euro , 116",
- "11:15:00 ,Euro , 119",
- "11:49:00 ,Pounds , 108"
- )
- // write the test data to a temporary file
- val tempFilePath =
- writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")
- // create the source connector
- new CsvTableSource(
- tempFilePath,
- Array("rowtime","currency","rate"),
- Array(
- Types.STRING,Types.STRING,Types.STRING
- ),
- fieldDelim = ",",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
- def writeToTempFile(
- contents: String,
- filePrefix: String,
- fileSuffix: String,
- charset: String = "UTF-8"): String = {
- val tempFile = File.createTempFile(filePrefix, fileSuffix)
- val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
- tmpWriter.write(contents)
- tmpWriter.close()
- tempFile.getAbsolutePath
- }
- def main(args: Array[String]): Unit = {
- // streaming environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- // parallelism 1 makes the output easier to inspect
- env.setParallelism(1)
- val sourceTableName = "RatesHistory"
- // create the CSV source
- val tableSource = CsvTableSourceUtils.genRatesHistorySource
- // register the source
- tEnv.registerTableSource(sourceTableName, tableSource)
- // register a retract sink
- val sinkTableName = "retractSink"
- val fieldNames = Array("rowtime", "currency", "rate")
- val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING)
- tEnv.registerTableSink(
- sinkTableName,
- fieldNames,
- fieldTypes,
- new MemoryRetractSink)
- val SQL =
- """
- |SELECT *
- |FROM RatesHistory AS r
- |WHERE r.rowtime = (
- | SELECT MAX(rowtime)
- | FROM RatesHistory AS r2
- | WHERE r2.currency = r.currency
- | AND r2.rowtime <= '10:58:00' )
- """.stripMargin
- // run the query
- val result = tEnv.sqlQuery(SQL)
- // insert the result into the sink
- result.insertInto(sinkTableName)
- env.execute()
- }
Laid out as a table, the result is:
rowtime   currency    rate
09:00:00  US Dollar   102
10:45:00  Euro        116
09:00:00  Yen         1
The Temporal Table concept aims to simplify such queries and speed up their execution. A Temporal Table is a parameterized view over an append-only table: it interprets the changes of the append-only table as the table's changelog and provides the version of the table at a specific point in time (a time version). Interpreting an append-only table as a changelog requires specifying a primary-key attribute and a timestamp attribute: the primary key determines which rows are overwritten, and the timestamp determines how long a row is valid, i.e. its data version, which corresponds to the validity-period concept in the SQL Server example above.
In the example above, currency is the primary key of the RatesHistory table and rowtime is the timestamp attribute.
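Conceptually, the snapshot a Temporal Table provides at a time t can be derived from the append-only changelog by keeping, per primary key, the version with the largest timestamp not exceeding t. The following short Scala sketch only illustrates that idea with made-up Long timestamps; it is not Flink's implementation:
- // append-only changelog of (currency, rate, rowtime), with illustrative timestamps
- val ratesHistory = Seq(
-   ("US Dollar", 102L, 1L), ("Euro", 114L, 1L), ("Yen", 1L, 1L),
-   ("Euro", 116L, 5L), ("Euro", 119L, 7L))
- // snapshot at time t: the latest version per primary key (currency) with rowtime <= t
- def snapshot(t: Long): Map[String, Long] =
-   ratesHistory
-     .filter(_._3 <= t)
-     .groupBy(_._1)
-     .map { case (currency, versions) => currency -> versions.maxBy(_._3)._2 }
- // snapshot(6L) == Map("US Dollar" -> 102, "Euro" -> 116, "Yen" -> 1)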
2. How to Define a Temporal Table
Apache Flink extends the TableFunction interface, adding a time attribute and a primary-key attribute on top of it.
(1) The internal TemporalTableFunction is defined as follows:
- class TemporalTableFunction private(
- @transient private val underlyingHistoryTable: Table,
- // the time attribute, which acts as the version information
- private val timeAttribute: Expression,
- // the primary-key definition
- private val primaryKey: String,
- private val resultType: RowTypeInfo)
- extends TableFunction[Row] {
- ...}
(2) How users create a TemporalTableFunction
A createTemporalTableFunction method has been added to Table. It takes a time attribute and a primary key; the interface is defined as follows:
- // Creates TemporalTableFunction backed up by this table as a history table.
- def createTemporalTableFunction(
- timeAttribute: Expression,
- primaryKey: Expression): TemporalTableFunction = {
- ...}
Calling it as follows gives the user a TemporalTableFunction instance:
- val tab = ...
- val temporalTableFunction = tab.createTemporalTableFunction('time, 'pk)
- ...
3. Example Code
(1) Requirements
Suppose we have an order table Orders and a rate table Rates. Orders come from different regions and are therefore paid in different currencies; we want to compute, for every order, the amount in Yen at the time the order was placed.
(2) Orders data
amount  currency    rowtime
2       Euro        2
1       US Dollar   3
50      Yen         4
3       Euro        5
(3) Rates data
currency    rate  rowtime
US Dollar   102   1
Euro        114   1
Yen         1     1
Euro        116   5
Euro        119   7
(4) The SQL for this requirement
- SELECT o.currency, o.amount, r.rate,
- o.amount * r.rate AS yen_amount
- FROM
- Orders AS o,
- LATERAL TABLE (Rates(o.rowtime)) AS r
- WHERE r.currency = o.currency
(5) Expected result
currency    amount  rate  yen_amount
Euro        2       114   228
US Dollar   1       102   102
Yen         50      1     50
Euro        3       116   348
4. Implementation Without a Connector
- object TemporalTableJoinTest {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- // use event time
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // build the order data
- val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
- ordersData.+=((2L, "Euro", new Timestamp(2L)))
- ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
- ordersData.+=((50L, "Yen", new Timestamp(4L)))
- ordersData.+=((3L, "Euro", new Timestamp(5L)))
- // build the rate data
- val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
- ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
- ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
- ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
- ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
- ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))
- // extract event time for the order table
- val orders = env
- .fromCollection(ordersData)
- .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]())
- .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
- // extract event time for the rate table
- val ratesHistory = env
- .fromCollection(ratesHistoryData)
- .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]())
- .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
- // register the order table and the rate table
- tEnv.registerTable("Orders", orders)
- tEnv.registerTable("RatesHistory", ratesHistory)
- val tab = tEnv.scan("RatesHistory");
- // create the TemporalTableFunction
- val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency)
- // register the TemporalTableFunction
- tEnv.registerFunction("Rates",temporalTableFunction)
- val SQLQuery =
- """
- |SELECT o.currency, o.amount, r.rate,
- | o.amount * r.rate AS yen_amount
- |FROM
- | Orders AS o,
- | LATERAL TABLE (Rates(o.rowtime)) AS r
- |WHERE r.currency = o.currency
- |""".stripMargin
- tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(SQLQuery))
- val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
- // print the query result
- result.print()
- env.execute()
- }
- }
Before running the code above, pay attention to how the event time is extracted: in Apache Flink's TimeCharacteristic.EventTime mode you must call assignTimestampsAndWatermarks to define how EventTime is generated. This approach is very flexible, as users control both the EventTime value of their business data and the generation of watermarks. For more on watermarks see "Apache Flink Talk Series (03) - Watermark". The complete EventTime extraction code used in this example is as follows:
- import java.sql.Timestamp
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
- import org.apache.flink.streaming.api.windowing.time.Time
- class OrderTimestampExtractor[T1, T2]
- extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
- override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
- element._3.getTime
- }
- }
Running the program prints the joined rows, which should match the expected result shown earlier.
5. Implementation With a CSV Connector
In real production development a concrete connector is always needed, so below we build the Temporal Table JOIN demo with a CSV-format connector.
(1) genEventRatesHistorySource
- def genEventRatesHistorySource: CsvTableSource = {
- val csvRecords = Seq(
- "ts#currency#rate",
- "1#US Dollar#102",
- "1#Euro#114",
- "1#Yen#1",
- "3#Euro#116",
- "5#Euro#119",
- "7#Pounds#108"
- )
- // write the test data to a temporary file
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")
- // create the source connector
- new CsvTableSource(
- tempFilePath,
- Array("ts","currency","rate"),
- Array(
- Types.LONG,Types.STRING,Types.LONG
- ),
- fieldDelim = "#",
- rowDelim = CommonUtils.line,
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
(2) genRatesOrderSource
- def genRatesOrderSource: CsvTableSource = {
- val csvRecords = Seq(
- "ts#currency#amount",
- "2#Euro#10",
- "4#Euro#10"
- )
- // write the test data to a temporary file
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")
- // create the source connector
- new CsvTableSource(
- tempFilePath,
- Array("ts","currency", "amount"),
- Array(
- Types.LONG,Types.STRING,Types.LONG
- ),
- fieldDelim = "#",
- rowDelim = CommonUtils.line,
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
(3) The complete source-utility code (CsvTableSourceUtils)
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.flink.book.connectors
- import java.io.File
- import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
- import org.apache.flink.book.utils.{CommonUtils, FileUtils}
- import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
- import org.apache.flink.table.sources.CsvTableSource
- import org.apache.flink.types.Row
- object CsvTableSourceUtils {
- def genWordCountSource: CsvTableSource = {
- val csvRecords = Seq(
- "words",
- "Hello Flink",
- "Hi, Apache Flink",
- "Apache FlinkBook"
- )
- // write the test data to a temporary file
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")
- // create the source connector
- new CsvTableSource(
- tempFilePath,
- Array("words"),
- Array(
- Types.STRING
- ),
- fieldDelim = "#",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
- def genRatesHistorySource: CsvTableSource = {
- val csvRecords = Seq(
- "rowtime ,currency ,rate",
- "09:00:00 ,US Dollar , 102",
- "09:00:00 ,Euro , 114",
- "09:00:00 ,Yen , 1",
- "10:45:00 ,Euro , 116",
- "11:15:00 ,Euro , 119",
- "11:49:00 ,Pounds , 108"
- )
- // write the test data to a temporary file
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")
- // create the source connector
- new CsvTableSource(
- tempFilePath,
- Array("rowtime","currency","rate"),
- Array(
- Types.STRING,Types.STRING,Types.STRING
- ),
- fieldDelim = ",",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
- def genEventRatesHistorySource: CsvTableSource = {
- val csvRecords = Seq(
- "ts#currency#rate",
- "1#US Dollar#102",
- "1#Euro#114",
- "1#Yen#1",
- "3#Euro#116",
- "5#Euro#119",
- "7#Pounds#108"
- )
- // write the test data to a temporary file
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")
- // create the source connector
- new CsvTableSource(
- tempFilePath,
- Array("ts","currency","rate"),
- Array(
- Types.LONG,Types.STRING,Types.LONG
- ),
- fieldDelim = "#",
- rowDelim = CommonUtils.line,
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
- def genRatesOrderSource: CsvTableSource = {
- val csvRecords = Seq(
- "ts#currency#amount",
- "2#Euro#10",
- "4#Euro#10"
- )
- // write the test data to a temporary file
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")
- // create the source connector
- new CsvTableSource(
- tempFilePath,
- Array("ts","currency", "amount"),
- Array(
- Types.LONG,Types.STRING,Types.LONG
- ),
- fieldDelim = "#",
- rowDelim = CommonUtils.line,
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
- /**
- * Example:
- * genCsvSink(
- * Array[String]("word", "count"),
- * Array[TypeInformation[_] ](Types.STRING, Types.LONG))
- */
- def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
- val tempFile = File.createTempFile("csv_sink_", "tem")
- if (tempFile.exists()) {
- tempFile.delete()
- }
- new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes)
- }
- }
Running the job produces the joined result for the orders defined above.
6. Internal Implementation
Let us again use the orders-and-rates example to explain the principle behind Apache Flink's internal implementation of Temporal Table JOIN: conceptually, the versions of the temporal table (the rates) are kept in state keyed by the primary key, and the join is driven by the probe side (the orders), with each order looking up the rate version valid at its rowtime.
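As a rough, non-Flink illustration of that lookup (the case classes and data below are assumptions made for this example, not Flink's actual operators), the per-order version lookup can be sketched as follows:
- // conceptual sketch of a probe-driven temporal join
- case class Rate(currency: String, rate: Long, rowtime: Long)
- case class Order(amount: Long, currency: String, rowtime: Long)
- val rates = Seq(Rate("US Dollar", 102L, 1L), Rate("Euro", 114L, 1L),
-   Rate("Yen", 1L, 1L), Rate("Euro", 116L, 5L), Rate("Euro", 119L, 7L))
- val orders = Seq(Order(2L, "Euro", 2L), Order(1L, "US Dollar", 3L),
-   Order(50L, "Yen", 4L), Order(3L, "Euro", 5L))
- // each order (probe side) looks up the rate version valid at its rowtime:
- // the latest rate of the same currency with rate.rowtime <= order.rowtime
- val joined = for {
-   o <- orders
-   r <- rates.filter(v => v.currency == o.currency && v.rowtime <= o.rowtime)
-     .sortBy(_.rowtime).lastOption.toSeq
- } yield (o.currency, o.amount, r.rate, o.amount * r.rate)
- // joined == Seq(("Euro",2,114,228), ("US Dollar",1,102,102), ("Yen",50,1,50), ("Euro",3,116,348))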
V. Temporal Table JOIN vs Dual-Stream JOIN vs Lateral JOIN
"Apache Flink Talk Series (09) - JOIN Operators" introduced the dual-stream JOIN, and "Apache Flink Talk Series (10) - JOIN LATERAL" introduced JOIN LATERAL (TableFunction). So what is the essential difference between the Temporal Table JOIN introduced here and the dual-stream JOIN / JOIN LATERAL (TableFunction)?
- Dual-stream JOIN - essentially Stream JOIN Stream; both streams drive the computation.
- LATERAL JOIN - essentially Stream JOIN TableFunction; a single stream drives the computation.
- Temporal Table JOIN - essentially Stream JOIN Temporal Table, i.e. Stream JOIN Table with snapshot; a single stream drives the computation and the Temporal Table is queried passively.
1. Temporal Table JOIN vs LATERAL JOIN
Functionally, both Temporal Table JOIN and LATERAL JOIN obtain multiple rows from a single row of the left stream, i.e. both are single-stream driven and both query the right side passively. So what is the most essential difference between them? The key point is state management: LATERAL JOIN uses a TableFunction, which has no state-management capability and therefore no notion of data versions, whereas Temporal Table JOIN joins against a table that carries version information.
2. Temporal Table JOIN vs Dual-Stream JOIN
Both Temporal Table JOIN and dual-stream JOIN can manage state, so what is their essential difference? It lies in what drives the computation: Temporal Table JOIN is driven by one side only, with the Temporal Table queried passively, whereas dual-stream JOIN is driven by both sides, with both streams actively performing the JOIN computation.
3. Improving Temporal Table JOIN
In my view, Apache Flink's Temporal Table JOIN should follow the ANSI-SQL standard in both syntax and semantics, so we will push the community to support the standard FOR SYSTEM_TIME AS OF syntax for Temporal Tables. The improved processing logic also introduces a cache as a performance optimization; the details will be described once the community work is complete.
VI. Summary
This article opened with the ANSI-SQL standard and SQL Server's support for Temporal Tables, then introduced the current state of Apache Flink's support for Temporal Tables, using code examples and diagrams of the internal processing logic to give an intuitive feel for the syntax and semantics of Temporal Table JOIN.
About Likes and Comments
This series inevitably has many flaws and shortcomings. I sincerely hope readers will like the articles they find helpful and give feedback and suggestions on those that fall short. Thank you in advance!
Author: Sun Jincheng (alias Jinzhu), currently at Alibaba, where he has worked since 2015 on the design and development of Blink, Alibaba's computing platform based on Apache Flink.
[This article is an original piece by 51CTO columnist "Jinzhu"; please contact the original author for reprints.]