自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

十分鐘入門Fink SQL

運維 數(shù)據(jù)庫運維
本篇文章主要講解了Flink SQL 入門操作,后面我會分享一些關(guān)于Flink SQL連接Kafka、輸出到kafka、MySQL等

[[358221]]

前言

Flink 本身是批流統(tǒng)一的處理框架,所以 Table API 和 SQL,就是批流統(tǒng)一的上層處理 API。目前功能尚未完善,處于活躍的開發(fā)階段。 Table API 是一套內(nèi)嵌在 Java 和 Scala 語言中的查詢 API,它允許我們以非常直觀的方式,組合來自一些關(guān)系運算符的查詢(比如 select、filter 和 join)。而對于 Flink SQL,就是直接可以在代碼中寫 SQL,來實現(xiàn)一些查詢(Query)操作。Flink 的 SQL 支持,基于實現(xiàn)了 SQL 標(biāo)準(zhǔn)的 Apache Calcite(Apache 開源 SQL 解析工具)。圖片

1、導(dǎo)入所需要的的依賴包

  1. <dependency> 
  2.           <groupId>org.apache.flink</groupId> 
  3.           <artifactId>flink-table-planner_2.12</artifactId> 
  4.           <version>1.10.1</version> 
  5.       </dependency> 
  6.       <dependency> 
  7.           <groupId>org.apache.flink</groupId> 
  8.           <artifactId>flink-table-api-scala-bridge_2.12</artifactId> 
  9.           <version>1.10.1</version> 
  10.       </dependency> 
  11.       <dependency> 
  12.           <groupId>org.apache.flink</groupId> 
  13.           <artifactId>flink-csv</artifactId> 
  14.           <version>1.10.1</version> 
  15.      </dependency> 

flink-table-planner:planner 計劃器,是 table API 最主要的部分,提供了運行時環(huán)境和生成程序執(zhí)行計劃的 planner; flink-table-api-scala-bridge:bridge 橋接器,主要負(fù)責(zé) table API 和 DataStream/DataSet API的連接支持,按照語言分 java 和 scala。

這里的兩個依賴,是 IDE 環(huán)境下運行需要添加的;如果是生產(chǎn)環(huán)境,lib 目錄下默認(rèn)已經(jīng)有了 planner,就只需要有 bridge 就可以了。

當(dāng)然,如果想使用用戶自定義函數(shù),或是跟 kafka 做連接,需要有一個 SQL client,這個包含在 flink-table-common 里。

2、兩種 planner(old& blink)的區(qū)別

  1. 批流統(tǒng)一:Blink 將批處理作業(yè),視為流式處理的特殊情況。所以,blink 不支持表和DataSet 之間的轉(zhuǎn)換,批處理作業(yè)將不轉(zhuǎn)換為 DataSet 應(yīng)用程序,而是跟流處理一樣,轉(zhuǎn)換為 DataStream 程序來處理。
  2. 因 為 批 流 統(tǒng) 一 , Blink planner 也 不 支 持 BatchTableSource , 而 使 用 有 界 的
  3. Blink planner 只支持全新的目錄,不支持已棄用的 ExternalCatalog。
  4. 舊 planner 和 Blink planner 的 FilterableTableSource 實現(xiàn)不兼容。舊的 planner 會把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 則會把 Expressions 下推。
  5. 基于字符串的鍵值配置選項僅適用于 Blink planner。
  6. PlannerConfig 在兩個 planner 中的實現(xiàn)不同。
  7. Blink planner 會將多個 sink 優(yōu)化在一個 DAG 中(僅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而舊 planner 的優(yōu)化總是將每一個 sink 放在一個新的 DAG 中,其中所有 DAG 彼此獨立。
  8. 舊的 planner 不支持目錄統(tǒng)計,而 Blink planner 支持。

3、表(Table)的概念

TableEnvironment 可以注冊目錄 Catalog,并可以基于 Catalog 注冊表。它會維護一個Catalog-Table 表之間的 map。 表(Table)是由一個標(biāo)識符來指定的,由 3 部分組成:Catalog 名、數(shù)據(jù)庫(database)名和對象名(表名)。如果沒有指定目錄或數(shù)據(jù)庫,就使用當(dāng)前的默認(rèn)值。

4、連接到文件系統(tǒng)(Csv 格式)

連接外部系統(tǒng)在 Catalog 中注冊表,直接調(diào)用 tableEnv.connect()就可以,里面參數(shù)要傳入一個 ConnectorDescriptor,也就是 connector 描述器。對于文件系統(tǒng)的 connector 而言,flink內(nèi)部已經(jīng)提供了,就叫做 FileSystem()。

5、測試案例 (新)

需求: 將一個txt文本文件作為輸入流讀取數(shù)據(jù)過濾id不等于sensor_1的數(shù)據(jù)實現(xiàn)思路: 首先我們先構(gòu)建一個table的env環(huán)境通過connect提供的方法來讀取數(shù)據(jù)然后設(shè)置表結(jié)構(gòu)將數(shù)據(jù)注冊為一張表就可進行我們的數(shù)據(jù)過濾了(使用sql或者流處理方式進行解析)

準(zhǔn)備數(shù)據(jù)

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

代碼實現(xiàn)

  1. import org.apache.flink.streaming.api.scala._ 
  2. import org.apache.flink.table.api.{DataTypes} 
  3. import org.apache.flink.table.api.scala._ 
  4. import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema
  5.  
  6. /** 
  7.  * @Package 
  8.  * @author 大數(shù)據(jù)老哥 
  9.  * @date 2020/12/12 21:22 
  10.  * @version V1.0 
  11.  *          第一個Flinksql測試案例 
  12.  */ 
  13.  
  14. object FlinkSqlTable { 
  15.   def main(args: Array[String]): Unit = { 
  16.     // 構(gòu)建運行流處理的運行環(huán)境 
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18.     // 構(gòu)建table環(huán)境 
  19.     val tableEnv = StreamTableEnvironment.create(env) 
  20.      //通過 connect 讀取數(shù)據(jù) 
  21.     tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")) 
  22.       .withFormat(new Csv()) //設(shè)置類型 
  23.       .withSchema(new Schema() // 給數(shù)據(jù)添加元數(shù)信息 
  24.         .field("id", DataTypes.STRING()) 
  25.         .field("time", DataTypes.BIGINT()) 
  26.         .field("temperature", DataTypes.DOUBLE()) 
  27.       ).createTemporaryTable("inputTable")  // 創(chuàng)建一個臨時表 
  28.      
  29.     val resTable = tableEnv.from("inputTable"
  30.       .select("*").filter('id === "sensor_1"
  31.     // 使用sql的方式查詢數(shù)據(jù) 
  32.     var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'"
  33.     // 將數(shù)據(jù)轉(zhuǎn)為流進行輸出 
  34.     resTable.toAppendStream[(String, Long, Double)].print("resTable"
  35.     resSql.toAppendStream[(String, Long, Double)].print("resSql"
  36.  
  37.     env.execute("FlinkSqlWrodCount"
  38.   } 

6、TableEnvironment 的作用

  • 注冊 catalog
  • 在內(nèi)部 catalog 中注冊表
  • 執(zhí)行 SQL 查詢
  • 注冊用戶自定義函數(shù)
  • 注冊用戶自定義函數(shù)
  • 保存對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在創(chuàng)建 TableEnv 的時候,可以多傳入一個 EnvironmentSettings 或者 TableConfig 參數(shù),可以用來配置 TableEnvironment 的一些特性。

7、 老版本創(chuàng)建流處理批處理

7.1老版本流處理

  1. val settings = EnvironmentSettings.newInstance() 
  2. .useOldPlanner() // 使用老版本 planner 
  3. .inStreamingMode() // 流處理模式 
  4. .build() 
  5. val tableEnv = StreamTableEnvironment.create(env, settings) 

7.2 老版本批處理

  1. val batchEnv = ExecutionEnvironment.getExecutionEnvironment  
  2. val batchTableEnv = BatchTableEnvironment.create(batchEnv) 

7.3 blink 版本的流處理環(huán)境

  1. val bsSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inStreamingMode().build() 
  4. val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) 

7.4 blink 版本的批處理環(huán)境

  1. val bbSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inBatchMode().build() 
  4. val bbTableEnv = TableEnvironment.create(bbSettings) 

總結(jié):

本篇文章主要講解了Flink SQL 入門操作,后面我會分享一些關(guān)于Flink SQL連接Kafka、輸出到kafka、MySQL等

本文轉(zhuǎn)載自微信公眾號「 大數(shù)據(jù)老哥」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系 大數(shù)據(jù)老哥公眾號。

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)老哥
相關(guān)推薦

2022-06-16 07:31:41

Web組件封裝HTML 標(biāo)簽

2012-07-10 01:22:32

PythonPython教程

2024-05-13 09:28:43

Flink SQL大數(shù)據(jù)

2019-04-01 14:59:56

負(fù)載均衡服務(wù)器網(wǎng)絡(luò)

2023-06-07 08:27:10

Docker容器

2021-09-07 09:40:20

Spark大數(shù)據(jù)引擎

2024-06-19 09:58:29

2023-04-12 11:18:51

甘特圖前端

2015-09-06 09:22:24

框架搭建快速高效app

2023-11-30 10:21:48

虛擬列表虛擬列表工具庫

2023-10-07 00:06:09

SQL數(shù)據(jù)庫

2023-07-15 18:26:51

LinuxABI

2009-10-09 14:45:29

VB程序

2019-09-16 09:14:51

2024-11-07 16:09:53

2022-08-26 09:01:07

CSSFlex 布局

2017-08-01 15:25:41

LinuxNginxHttps

2015-11-06 11:03:36

2020-12-11 09:40:10

DevOpsCICD

2022-04-13 22:01:44

錯誤監(jiān)控系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號