自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink SQL 知其所以然:Table 與 DataStream 的轉(zhuǎn)轉(zhuǎn)轉(zhuǎn)

數(shù)據(jù)庫 其他數(shù)據(jù)庫
相信大家看到本文的標(biāo)題時,會比較好奇,要寫 SQL 就純 SQL 唄,要寫 DataStream 就純 DataStream 唄,為啥還要把這兩個接口做集成呢?

1.序篇

廢話不多說,咱們先直接上本文的目錄和結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來什么幫助:

  1. 背景及應(yīng)用場景介紹:博主期望你能了解到,F(xiàn)link 支持了 SQL 和 Table API 中的 Table 與 DataStream 互轉(zhuǎn)的接口。通過這種互轉(zhuǎn)的方式,我們就可以將一些自定義的數(shù)據(jù)源(DataStream)創(chuàng)建為 SQL 表,也可以將 SQL 執(zhí)行結(jié)果轉(zhuǎn)換為 DataStream 然后后續(xù)去完成一些在 SQL 中實(shí)現(xiàn)不了的復(fù)雜操作。肥腸的方便。
  2. 目前只有流任務(wù)支持互轉(zhuǎn),批任務(wù)不支持:在 1.13 版本中,由于流和批的 env 接口不一樣,流任務(wù)為 StreamTableEnvironment,批任務(wù)為 TableEnvironment,目前只有 StreamTableEnvironment 支持了互轉(zhuǎn)的接口,TableEnvironment 沒有這樣的接口,因此目前流任務(wù)支持互轉(zhuǎn),批任務(wù)不支持。但是 1.14 版本中流批任務(wù)的 env 都統(tǒng)一到了 StreamTableEnvironment 中,流批任務(wù)中就都可以進(jìn)行互轉(zhuǎn)了。
  3. Retract 語義 SQL 轉(zhuǎn) DataStream 需要重點(diǎn)注意:Append 語義的 SQL 轉(zhuǎn)為 DataStream 使用的 API 為 StreamTableEnvironment::toDataStream,Retract 語義的 SQL 轉(zhuǎn)為 DataStream 使用的 API 為 StreamTableEnvironment::toRetractStream,兩個接口不一樣,小伙伴萌一定要特別注意。

2.背景及應(yīng)用場景介紹

相信大家看到本文的標(biāo)題時,會比較好奇,要寫 SQL 就純 SQL 唄,要寫 DataStream 就純 DataStream 唄,為啥還要把這兩個接口做集成呢?

博主舉一個案例:在拼多多發(fā)優(yōu)惠券的場景下,為了控制成本,希望能在每日優(yōu)惠券發(fā)放金額加和超過 1w 時,及時報警出來,控制預(yù)算。

優(yōu)惠券表的發(fā)放數(shù)據(jù):

 

最終期望的結(jié)果是:每天的 money 之和超過 1w 的時候,報警報警報警!!!

那么針對上述場景,有兩種對應(yīng)的解決方案:

  1. 方案 1:可想而知,DataStream 是必然能夠解決我們的問題的。
  2. 方案 2:DataStream 開發(fā)效率不高,可以使用 SQL 計算優(yōu)惠券發(fā)放的結(jié)果,但是 SQL 無法做到報警。所以可以將 SQL 的查詢的結(jié)果(即 Table)轉(zhuǎn)為 DataStream,然后在 DataStream 后自定義報警邏輯的算子,超過閾值進(jìn)行報警。

本節(jié)就介紹方案 2 的實(shí)現(xiàn)思路。

注意:

當(dāng)然還有一些其他的比如模式識別監(jiān)控異常然后報警的場景使用 DataStream 去實(shí)現(xiàn)就更加復(fù)雜了,所以我們也可以使用類似的思路,先 SQL 實(shí)現(xiàn)業(yè)務(wù)邏輯,然后接一個 DataStream 算子實(shí)現(xiàn)報警邏輯。

3.Table 與 DataStream API 的轉(zhuǎn)換具體實(shí)現(xiàn)

3.1.先看一個官網(wǎng)的簡單案例

官網(wǎng)的案例主要是讓大家看看要做到 Table 與 DataStream API 的轉(zhuǎn)換會涉及到使用哪些接口。

  1. import org.apache.flink.streaming.api.datastream.DataStream; 
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
  3. import org.apache.flink.table.api.Table
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
  5. import org.apache.flink.types.Row; 
  6.  
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  8. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); 
  9.  
  10. DataStream<String> dataStream = env.fromElements("Alice""Bob""John"); 
  11.  
  12. // 1. 使用 StreamTableEnvironment::fromDataStream API 將 DataStream 轉(zhuǎn)為 Table 
  13. Table inputTable = tableEnv.fromDataStream(dataStream); 
  14.  
  15. // 將 Table 注冊為一個臨時表 
  16. tableEnv.createTemporaryView("InputTable", inputTable); 
  17.  
  18. // 然后就可以在這個臨時表上做一些自定義的查詢了 
  19. Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable"); 
  20.  
  21. // 2. 也可以使用 StreamTableEnvironment::toDataStream 將 Table 轉(zhuǎn)為 DataStream 
  22. // 注意:這里只能轉(zhuǎn)為 DataStream<Row>,其中的數(shù)據(jù)類型只能為 Row 
  23. DataStream<Row> resultStream = tableEnv.toDataStream(resultTable); 
  24.  
  25. // 將 DataStream 結(jié)果打印到控制臺 
  26. resultStream.print(); 
  27. env.execute(); 
  28.  
  29. // prints: 
  30. // +I[Alice] 
  31. // +I[Bob] 
  32. // +I[John] 

可以看到重點(diǎn)的接口就是:

  1. StreamTableEnvironment::toDataStream:將 Table 轉(zhuǎn)為 DataStream
  2. StreamTableEnvironment::fromDataStream:將 DataStream 轉(zhuǎn)為 Table

3.2.實(shí)現(xiàn)第 2 節(jié)中的邏輯

我們使用上面介紹的兩個接口對優(yōu)惠券發(fā)放金額預(yù)警的案例做一個實(shí)現(xiàn)。

  1. @Slf4j 
  2. public class AlertExample { 
  3.  
  4.     public static void main(String[] args) throws Exception { 
  5.  
  6.         FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args); 
  7.  
  8.         String createTableSql = "CREATE TABLE source_table (\n" 
  9.                 + "    id BIGINT,\n" 
  10.                 + "    money BIGINT,\n" 
  11.                 + "    row_time AS cast(CURRENT_TIMESTAMP as timestamp_LTZ(3)),\n" 
  12.                 + "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" 
  13.                 + ") WITH (\n" 
  14.                 + "  'connector' = 'datagen',\n" 
  15.                 + "  'rows-per-second' = '1',\n" 
  16.                 + "  'fields.id.min' = '1',\n" 
  17.                 + "  'fields.id.max' = '100000',\n" 
  18.                 + "  'fields.money.min' = '1',\n" 
  19.                 + "  'fields.money.max' = '100000'\n" 
  20.                 + ")\n"
  21.  
  22.         String querySql = "SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end, \n" 
  23.                 + "      window_start, \n" 
  24.                 + "      sum(money) as sum_money,\n" 
  25.                 + "      count(distinct id) as count_distinct_id\n" 
  26.                 + "FROM TABLE(CUMULATE(\n" 
  27.                 + "         TABLE source_table\n" 
  28.                 + "         , DESCRIPTOR(row_time)\n" 
  29.                 + "         , INTERVAL '5' SECOND\n" 
  30.                 + "         , INTERVAL '1' DAY))\n" 
  31.                 + "GROUP BY window_start, \n" 
  32.                 + "        window_end"
  33.  
  34.         // 1. 創(chuàng)建數(shù)據(jù)源表,即優(yōu)惠券發(fā)放明細(xì)數(shù)據(jù) 
  35.         flinkEnv.streamTEnv().executeSql(createTableSql); 
  36.         // 2. 執(zhí)行 query 查詢,計算每日發(fā)放金額 
  37.         Table resultTable = flinkEnv.streamTEnv().sqlQuery(querySql); 
  38.  
  39.         // 3. 報警邏輯(toDataStream 返回 Row 類型),如果 sum_money 超過 1w,報警 
  40.         flinkEnv.streamTEnv() 
  41.                 .toDataStream(resultTable, Row.class) 
  42.                 .flatMap(new FlatMapFunction<Row, Object>() { 
  43.                     @Override 
  44.                     public void flatMap(Row value, Collector<Object> out) throws Exception { 
  45.                         long l = Long.parseLong(String.valueOf(value.getField("sum_money"))); 
  46.  
  47.                         if (l > 10000L) { 
  48.                             log.info("報警,超過 1w"); 
  49.                         } 
  50.                     } 
  51.                 }); 
  52.  
  53.         flinkEnv.env().execute(); 
  54.     } 
  55.  

 執(zhí)行效果如下:

3.3.Table 和 DataStream 轉(zhuǎn)換注意事項(xiàng)

3.3.1.目前只支持流任務(wù)互轉(zhuǎn)(1.13)

目前在 1.13 版本中,F(xiàn)link 對于 Table 和 DataStream 的轉(zhuǎn)化是有一些限制的:

目前流任務(wù)使用的 env 為 StreamTableEnvironment,批任務(wù)為 TableEnvironment,而 Table 和 DataStream 之間的轉(zhuǎn)換目前只有 StreamTableEnvironment 的接口支持。

所以其實(shí)小伙伴萌可以理解為只有流任務(wù)才支持 Table 和 DataStream 之間的轉(zhuǎn)換,批任務(wù)是不支持的(雖然可以使用流模式處理有界流(批數(shù)據(jù)),但效率較低,這種騷操作不建議大家搞)。

那什么時候才能支持批任務(wù)的 Table 和 DataStream 之間的轉(zhuǎn)換呢?

1.14 版本支持。1.14 版本中,流和批的都統(tǒng)一到了 StreamTableEnvironment 中,因此就可以做 Table 和 DataStream 的互相轉(zhuǎn)換了。

3.3.2.Retract 語義 SQL 轉(zhuǎn) DataStream 注意事項(xiàng)

Retract 語義的 SQL 使用 toDataStream 轉(zhuǎn)換會報錯不支持。具體報錯截圖如下。意思是不支持 update 類型的結(jié)果數(shù)據(jù)。

如果要把 Retract 語義的 SQL 轉(zhuǎn)為 DataStream,我們需要使用 toRetractStream。如下案例:

  1. @Slf4j 
  2. public class AlertExampleRetract { 
  3.  
  4.     public static void main(String[] args) throws Exception { 
  5.  
  6.         FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args); 
  7.  
  8.         String createTableSql = "CREATE TABLE source_table (\n" 
  9.                 + "    id BIGINT,\n" 
  10.                 + "    money BIGINT,\n" 
  11.                 + "    `time` as cast(CURRENT_TIMESTAMP as bigint) * 1000\n" 
  12.                 + ") WITH (\n" 
  13.                 + "  'connector' = 'datagen',\n" 
  14.                 + "  'rows-per-second' = '1',\n" 
  15.                 + "  'fields.id.min' = '1',\n" 
  16.                 + "  'fields.id.max' = '100000',\n" 
  17.                 + "  'fields.money.min' = '1',\n" 
  18.                 + "  'fields.money.max' = '100000'\n" 
  19.                 + ")\n"
  20.  
  21.         String querySql = "SELECT max(`time`), \n" 
  22.                 + "      sum(money) as sum_money\n" 
  23.                 + "FROM source_table\n" 
  24.                 + "GROUP BY (`time` + 8 * 3600 * 1000) / (24 * 3600 * 1000)"
  25.  
  26.         // 1. 創(chuàng)建數(shù)據(jù)源表,即優(yōu)惠券發(fā)放明細(xì)數(shù)據(jù) 
  27.         flinkEnv.streamTEnv().executeSql(createTableSql); 
  28.         // 2. 執(zhí)行 query 查詢,計算每日發(fā)放金額 
  29.         Table resultTable = flinkEnv.streamTEnv().sqlQuery(querySql); 
  30.         // 3. 報警邏輯(toRetractStream 返回 Tuple2<Boolean, Row> 類型),如果 sum_money 超過 1w,報警 
  31.         // Tuple2<Boolean, Row> f0 的 Boolean 標(biāo)識是否是回撤消息 
  32.         flinkEnv.streamTEnv() 
  33.                 .toRetractStream(resultTable, Row.class) 
  34.                 .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() { 
  35.                     @Override 
  36.                     public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception { 
  37.                         long l = Long.parseLong(String.valueOf(value.f1.getField("sum_money"))); 
  38.  
  39.                         if (l > 10000L) { 
  40.                             log.info("報警,超過 1w"); 
  41.                         } 
  42.                     } 
  43.                 }); 
  44.  
  45.         flinkEnv.env().execute(); 
  46.     } 
  47.  

4.總結(jié)與展望

本文主要介紹了 flink 中 Table 和 DataStream 互轉(zhuǎn)使用方式,并介紹了一些使用注意事項(xiàng),總結(jié)如下:

  1. 背景及應(yīng)用場景介紹:博主期望你能了解到,F(xiàn)link 支持了 SQL 和 Table API 中的 Table 與 DataStream 互轉(zhuǎn)的接口。通過這種互轉(zhuǎn)的方式,我們就可以將一些自定義的數(shù)據(jù)源(DataStream)創(chuàng)建為 SQL 表,也可以將 SQL 執(zhí)行結(jié)果轉(zhuǎn)換為 DataStream 然后后續(xù)去完成一些在 SQL 中實(shí)現(xiàn)不了的復(fù)雜操作。肥腸的方便。
  2. 目前只有流任務(wù)支持互轉(zhuǎn),批任務(wù)不支持:在 1.13 版本中,由于流和批的 env 接口不一樣,流任務(wù)為 StreamTableEnvironment,批任務(wù)為 TableEnvironment,目前只有 StreamTableEnvironment 支持了互轉(zhuǎn)的接口,TableEnvironment 沒有這樣的接口,因此目前流任務(wù)支持互轉(zhuǎn),批任務(wù)不支持。但是 1.14 版本中流批任務(wù)的 env 都統(tǒng)一到了 StreamTableEnvironment 中,流批任務(wù)中就都可以進(jìn)行互轉(zhuǎn)了。
  3. Retract 語義 SQL 轉(zhuǎn) DataStream 需要重點(diǎn)注意:Append 語義的 SQL 轉(zhuǎn)為 DataStream 使用的 API 為 StreamTableEnvironment::toDataStream,Retract 語義的 SQL 轉(zhuǎn)為 DataStream 使用的 API 為 StreamTableEnvironment::toRetractStream,兩個接口不一樣,小伙伴萌一定要特別注意。

 

責(zé)任編輯:姜華 來源: 大數(shù)據(jù)羊說
相關(guān)推薦

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時間語義

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-05-27 09:02:58

SQLHive語義

2022-06-29 09:01:38

FlinkSQL時間屬性

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類型

2021-11-28 11:36:08

SQL Flink Join

2021-11-27 09:03:26

flink join數(shù)倉

2022-08-10 10:05:29

FlinkSQL

2021-09-12 07:01:07

Flink SQL ETL datastream

2022-06-18 09:26:00

Flink SQLJoin 操作

2021-12-06 07:15:47

開發(fā)Flink SQL

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-11-24 08:17:21

Flink SQLCumulate WiSQL

2021-12-13 07:57:47

Flink SQL Flink Hive Udf

2018-08-27 06:30:49

InnoDBMySQLMyISAM
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號