Flink SQL 知其所以然:Table 與 DataStream 的轉(zhuǎn)轉(zhuǎn)轉(zhuǎn)
1.序篇
廢話不多說,咱們先直接上本文的目錄和結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來什么幫助:
- 背景及應(yīng)用場景介紹:博主期望你能了解到,F(xiàn)link 支持了 SQL 和 Table API 中的 Table 與 DataStream 互轉(zhuǎn)的接口。通過這種互轉(zhuǎn)的方式,我們就可以將一些自定義的數(shù)據(jù)源(DataStream)創(chuàng)建為 SQL 表,也可以將 SQL 執(zhí)行結(jié)果轉(zhuǎn)換為 DataStream 然后后續(xù)去完成一些在 SQL 中實(shí)現(xiàn)不了的復(fù)雜操作。肥腸的方便。
- 目前只有流任務(wù)支持互轉(zhuǎn),批任務(wù)不支持:在 1.13 版本中,由于流和批的 env 接口不一樣,流任務(wù)為 StreamTableEnvironment,批任務(wù)為 TableEnvironment,目前只有 StreamTableEnvironment 支持了互轉(zhuǎn)的接口,TableEnvironment 沒有這樣的接口,因此目前流任務(wù)支持互轉(zhuǎn),批任務(wù)不支持。但是 1.14 版本中流批任務(wù)的 env 都統(tǒng)一到了 StreamTableEnvironment 中,流批任務(wù)中就都可以進(jìn)行互轉(zhuǎn)了。
- Retract 語義 SQL 轉(zhuǎn) DataStream 需要重點(diǎn)注意:Append 語義的 SQL 轉(zhuǎn)為 DataStream 使用的 API 為 StreamTableEnvironment::toDataStream,Retract 語義的 SQL 轉(zhuǎn)為 DataStream 使用的 API 為 StreamTableEnvironment::toRetractStream,兩個接口不一樣,小伙伴萌一定要特別注意。
2.背景及應(yīng)用場景介紹
相信大家看到本文的標(biāo)題時,會比較好奇,要寫 SQL 就純 SQL 唄,要寫 DataStream 就純 DataStream 唄,為啥還要把這兩個接口做集成呢?
博主舉一個案例:在拼多多發(fā)優(yōu)惠券的場景下,為了控制成本,希望能在每日優(yōu)惠券發(fā)放金額加和超過 1w 時,及時報警出來,控制預(yù)算。
優(yōu)惠券表的發(fā)放數(shù)據(jù):
最終期望的結(jié)果是:每天的 money 之和超過 1w 的時候,報警報警報警!!!
那么針對上述場景,有兩種對應(yīng)的解決方案:
- 方案 1:可想而知,DataStream 是必然能夠解決我們的問題的。
- 方案 2:DataStream 開發(fā)效率不高,可以使用 SQL 計算優(yōu)惠券發(fā)放的結(jié)果,但是 SQL 無法做到報警。所以可以將 SQL 的查詢的結(jié)果(即 Table)轉(zhuǎn)為 DataStream,然后在 DataStream 后自定義報警邏輯的算子,超過閾值進(jìn)行報警。
本節(jié)就介紹方案 2 的實(shí)現(xiàn)思路。
注意:
當(dāng)然還有一些其他的比如模式識別監(jiān)控異常然后報警的場景使用 DataStream 去實(shí)現(xiàn)就更加復(fù)雜了,所以我們也可以使用類似的思路,先 SQL 實(shí)現(xiàn)業(yè)務(wù)邏輯,然后接一個 DataStream 算子實(shí)現(xiàn)報警邏輯。
3.Table 與 DataStream API 的轉(zhuǎn)換具體實(shí)現(xiàn)
3.1.先看一個官網(wǎng)的簡單案例
官網(wǎng)的案例主要是讓大家看看要做到 Table 與 DataStream API 的轉(zhuǎn)換會涉及到使用哪些接口。
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- import org.apache.flink.types.Row;
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
- DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
- // 1. 使用 StreamTableEnvironment::fromDataStream API 將 DataStream 轉(zhuǎn)為 Table
- Table inputTable = tableEnv.fromDataStream(dataStream);
- // 將 Table 注冊為一個臨時表
- tableEnv.createTemporaryView("InputTable", inputTable);
- // 然后就可以在這個臨時表上做一些自定義的查詢了
- Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
- // 2. 也可以使用 StreamTableEnvironment::toDataStream 將 Table 轉(zhuǎn)為 DataStream
- // 注意:這里只能轉(zhuǎn)為 DataStream<Row>,其中的數(shù)據(jù)類型只能為 Row
- DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
- // 將 DataStream 結(jié)果打印到控制臺
- resultStream.print();
- env.execute();
- // prints:
- // +I[Alice]
- // +I[Bob]
- // +I[John]
可以看到重點(diǎn)的接口就是:
- StreamTableEnvironment::toDataStream:將 Table 轉(zhuǎn)為 DataStream
- StreamTableEnvironment::fromDataStream:將 DataStream 轉(zhuǎn)為 Table
3.2.實(shí)現(xiàn)第 2 節(jié)中的邏輯
我們使用上面介紹的兩個接口對優(yōu)惠券發(fā)放金額預(yù)警的案例做一個實(shí)現(xiàn)。
- @Slf4j
- public class AlertExample {
- public static void main(String[] args) throws Exception {
- FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
- String createTableSql = "CREATE TABLE source_table (\n"
- + " id BIGINT,\n"
- + " money BIGINT,\n"
- + " row_time AS cast(CURRENT_TIMESTAMP as timestamp_LTZ(3)),\n"
- + " WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"
- + ") WITH (\n"
- + " 'connector' = 'datagen',\n"
- + " 'rows-per-second' = '1',\n"
- + " 'fields.id.min' = '1',\n"
- + " 'fields.id.max' = '100000',\n"
- + " 'fields.money.min' = '1',\n"
- + " 'fields.money.max' = '100000'\n"
- + ")\n";
- String querySql = "SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end, \n"
- + " window_start, \n"
- + " sum(money) as sum_money,\n"
- + " count(distinct id) as count_distinct_id\n"
- + "FROM TABLE(CUMULATE(\n"
- + " TABLE source_table\n"
- + " , DESCRIPTOR(row_time)\n"
- + " , INTERVAL '5' SECOND\n"
- + " , INTERVAL '1' DAY))\n"
- + "GROUP BY window_start, \n"
- + " window_end";
- // 1. 創(chuàng)建數(shù)據(jù)源表,即優(yōu)惠券發(fā)放明細(xì)數(shù)據(jù)
- flinkEnv.streamTEnv().executeSql(createTableSql);
- // 2. 執(zhí)行 query 查詢,計算每日發(fā)放金額
- Table resultTable = flinkEnv.streamTEnv().sqlQuery(querySql);
- // 3. 報警邏輯(toDataStream 返回 Row 類型),如果 sum_money 超過 1w,報警
- flinkEnv.streamTEnv()
- .toDataStream(resultTable, Row.class)
- .flatMap(new FlatMapFunction<Row, Object>() {
- @Override
- public void flatMap(Row value, Collector<Object> out) throws Exception {
- long l = Long.parseLong(String.valueOf(value.getField("sum_money")));
- if (l > 10000L) {
- log.info("報警,超過 1w");
- }
- }
- });
- flinkEnv.env().execute();
- }
- }
執(zhí)行效果如下:
3.3.Table 和 DataStream 轉(zhuǎn)換注意事項(xiàng)
3.3.1.目前只支持流任務(wù)互轉(zhuǎn)(1.13)
目前在 1.13 版本中,F(xiàn)link 對于 Table 和 DataStream 的轉(zhuǎn)化是有一些限制的:
目前流任務(wù)使用的 env 為 StreamTableEnvironment,批任務(wù)為 TableEnvironment,而 Table 和 DataStream 之間的轉(zhuǎn)換目前只有 StreamTableEnvironment 的接口支持。
所以其實(shí)小伙伴萌可以理解為只有流任務(wù)才支持 Table 和 DataStream 之間的轉(zhuǎn)換,批任務(wù)是不支持的(雖然可以使用流模式處理有界流(批數(shù)據(jù)),但效率較低,這種騷操作不建議大家搞)。
那什么時候才能支持批任務(wù)的 Table 和 DataStream 之間的轉(zhuǎn)換呢?
1.14 版本支持。1.14 版本中,流和批的都統(tǒng)一到了 StreamTableEnvironment 中,因此就可以做 Table 和 DataStream 的互相轉(zhuǎn)換了。
3.3.2.Retract 語義 SQL 轉(zhuǎn) DataStream 注意事項(xiàng)
Retract 語義的 SQL 使用 toDataStream 轉(zhuǎn)換會報錯不支持。具體報錯截圖如下。意思是不支持 update 類型的結(jié)果數(shù)據(jù)。
如果要把 Retract 語義的 SQL 轉(zhuǎn)為 DataStream,我們需要使用 toRetractStream。如下案例:
- @Slf4j
- public class AlertExampleRetract {
- public static void main(String[] args) throws Exception {
- FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
- String createTableSql = "CREATE TABLE source_table (\n"
- + " id BIGINT,\n"
- + " money BIGINT,\n"
- + " `time` as cast(CURRENT_TIMESTAMP as bigint) * 1000\n"
- + ") WITH (\n"
- + " 'connector' = 'datagen',\n"
- + " 'rows-per-second' = '1',\n"
- + " 'fields.id.min' = '1',\n"
- + " 'fields.id.max' = '100000',\n"
- + " 'fields.money.min' = '1',\n"
- + " 'fields.money.max' = '100000'\n"
- + ")\n";
- String querySql = "SELECT max(`time`), \n"
- + " sum(money) as sum_money\n"
- + "FROM source_table\n"
- + "GROUP BY (`time` + 8 * 3600 * 1000) / (24 * 3600 * 1000)";
- // 1. 創(chuàng)建數(shù)據(jù)源表,即優(yōu)惠券發(fā)放明細(xì)數(shù)據(jù)
- flinkEnv.streamTEnv().executeSql(createTableSql);
- // 2. 執(zhí)行 query 查詢,計算每日發(fā)放金額
- Table resultTable = flinkEnv.streamTEnv().sqlQuery(querySql);
- // 3. 報警邏輯(toRetractStream 返回 Tuple2<Boolean, Row> 類型),如果 sum_money 超過 1w,報警
- // Tuple2<Boolean, Row> f0 的 Boolean 標(biāo)識是否是回撤消息
- flinkEnv.streamTEnv()
- .toRetractStream(resultTable, Row.class)
- .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() {
- @Override
- public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception {
- long l = Long.parseLong(String.valueOf(value.f1.getField("sum_money")));
- if (l > 10000L) {
- log.info("報警,超過 1w");
- }
- }
- });
- flinkEnv.env().execute();
- }
- }
4.總結(jié)與展望
本文主要介紹了 flink 中 Table 和 DataStream 互轉(zhuǎn)使用方式,并介紹了一些使用注意事項(xiàng),總結(jié)如下:
- 背景及應(yīng)用場景介紹:博主期望你能了解到,F(xiàn)link 支持了 SQL 和 Table API 中的 Table 與 DataStream 互轉(zhuǎn)的接口。通過這種互轉(zhuǎn)的方式,我們就可以將一些自定義的數(shù)據(jù)源(DataStream)創(chuàng)建為 SQL 表,也可以將 SQL 執(zhí)行結(jié)果轉(zhuǎn)換為 DataStream 然后后續(xù)去完成一些在 SQL 中實(shí)現(xiàn)不了的復(fù)雜操作。肥腸的方便。
- 目前只有流任務(wù)支持互轉(zhuǎn),批任務(wù)不支持:在 1.13 版本中,由于流和批的 env 接口不一樣,流任務(wù)為 StreamTableEnvironment,批任務(wù)為 TableEnvironment,目前只有 StreamTableEnvironment 支持了互轉(zhuǎn)的接口,TableEnvironment 沒有這樣的接口,因此目前流任務(wù)支持互轉(zhuǎn),批任務(wù)不支持。但是 1.14 版本中流批任務(wù)的 env 都統(tǒng)一到了 StreamTableEnvironment 中,流批任務(wù)中就都可以進(jìn)行互轉(zhuǎn)了。
- Retract 語義 SQL 轉(zhuǎn) DataStream 需要重點(diǎn)注意:Append 語義的 SQL 轉(zhuǎn)為 DataStream 使用的 API 為 StreamTableEnvironment::toDataStream,Retract 語義的 SQL 轉(zhuǎn)為 DataStream 使用的 API 為 StreamTableEnvironment::toRetractStream,兩個接口不一樣,小伙伴萌一定要特別注意。