快速上手Flink SQL——Table與DataStream之間的互轉(zhuǎn)
本篇文章主要會跟大家分享如何連接kafka,MySQL,作為輸入流和數(shù)出的操作,以及Table與DataStream進行互轉(zhuǎn)。
一、將kafka作為輸入流
kafka 的連接器 flink-kafka-connector 中,1.10 版本的已經(jīng)提供了 Table API 的支持。我們可以在 connect方法中直接傳入一個叫做 Kafka 的類,這就是 kafka 連接器的描述器ConnectorDescriptor。
準備數(shù)據(jù):
- 1,語數(shù)
- 2,英物
- 3,化生
- 4,文學(xué)
- 5,語理
- 6,學(xué)物
創(chuàng)建kafka主題
- ./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic FlinkSqlTest
通過命令行的方式啟動一個生產(chǎn)者
- [root@node01 bin]# ./kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic FlinkSqlTest
- >1,語數(shù)
- >2,英物
- >3,化生
- >4,文學(xué)
- >5,語理\
- >6,學(xué)物
編寫Flink代碼連接到kafka
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.table.api.DataTypes
- import org.apache.flink.table.api.scala._
- import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}
- /**
- * @Package
- * @author 大數(shù)據(jù)老哥
- * @date 2020/12/17 0:35
- * @version V1.0
- */
- object FlinkSQLSourceKafka {
- def main(args: Array[String]): Unit = {
- // 獲取流處理的運行環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // 獲取table的運行環(huán)境
- val tableEnv = StreamTableEnvironment.create(env)
- tableEnv.connect(
- new Kafka()
- .version("0.11") // 設(shè)置kafka的版本
- .topic("FlinkSqlTest") // 設(shè)置要連接的主題
- .property("zookeeper.connect","node01:2181,node02:2181,node03:2181") //設(shè)置zookeeper的連接地址跟端口號
- .property("bootstrap.servers","node01:9092,node02:9092,node03:9092") //設(shè)置kafka的連接地址跟端口號
- ).withFormat(new Csv()) // 設(shè)置格式
- .withSchema(new Schema() // 設(shè)置元數(shù)據(jù)信息
- .field("id",DataTypes.STRING())
- .field("name",DataTypes.STRING())
- ).createTemporaryTable("kafkaInputTable") // 創(chuàng)建臨時表
- //定義要查詢的sql語句
- val result = tableEnv.sqlQuery("select * from kafkaInputTable ")
- //打印數(shù)據(jù)
- result.toAppendStream[(String,String)].print()
- // 開啟執(zhí)行
- env.execute("source kafkaInputTable")
- }
- }
運行結(jié)果圖
當然也可以連接到 ElasticSearch、MySql、HBase、Hive 等外部系統(tǒng),實現(xiàn)方式基本上是類似的。
二、表的查詢
利用外部系統(tǒng)的連接器 connector,我們可以讀寫數(shù)據(jù),并在環(huán)境的 Catalog 中注冊表。接下來就可以對表做查詢轉(zhuǎn)換了。Flink 給我們提供了兩種查詢方式:Table API 和 SQL。
三、Table API 的調(diào)用
Table API 是集成在 Scala 和 Java 語言內(nèi)的查詢 API。與 SQL 不同,Table API 的查詢不會用字符串表示,而是在宿主語言中一步一步調(diào)用完成的。 Table API 基于代表一張表的 Table 類,并提供一整套操作處理的方法 API。這些方法會返回一個新的 Table 對象,這個對象就表示對輸入表應(yīng)用轉(zhuǎn)換操作的結(jié)果。有些關(guān)系型轉(zhuǎn)換操作,可以由多個方法調(diào)用組成,構(gòu)成鏈式調(diào)用結(jié)構(gòu)。例如 table.select(…).filter(…) ,其中 select(…) 表示選擇表中指定的字段,filter(…)表示篩選條件。代碼中的實現(xiàn)如下:
- val kafkaInputTable = tableEnv.from("kafkaInputTable")
- kafkaInputTable.select("*")
- .filter('id !=="1")
四、SQL查詢
Flink 的 SQL 集成,基于的是 ApacheCalcite,它實現(xiàn)了 SQL 標準。在 Flink 中,用常規(guī)字符串來定義 SQL 查詢語句。SQL 查詢的結(jié)果,是一個新的 Table。
代碼實現(xiàn)如下:
- val result = tableEnv.sqlQuery("select * from kafkaInputTable ")
當然,也可以加上聚合操作,比如我們統(tǒng)計每個用戶的個數(shù)
調(diào)用 table API
- val result: Table = tableEnv.from("kafkaInputTable")
- result.groupBy("user")
- .select('name,'name.count as 'count)
調(diào)用SQL
- val result = tableEnv.sqlQuery("select name ,count(1) as count from kafkaInputTable group by name ")
這里 Table API 里指定的字段,前面加了一個單引號’,這是 Table API 中定義的 Expression類型的寫法,可以很方便地表示一個表中的字段。 字段可以直接全部用雙引號引起來,也可以用半邊單引號+字段名的方式。以后的代碼中,一般都用后一種形式。
五、將DataStream 轉(zhuǎn)成Table
Flink 允許我們把 Table 和DataStream 做轉(zhuǎn)換:我們可以基于一個 DataStream,先流式地讀取數(shù)據(jù)源,然后 map 成樣例類,再把它轉(zhuǎn)成 Table。Table 的列字段(column fields),就是樣例類里的字段,這樣就不用再麻煩地定義 schema 了。
5.1、代碼實現(xiàn)
代碼中實現(xiàn)非常簡單,直接用 tableEnv.fromDataStream() 就可以了。默認轉(zhuǎn)換后的 Table schema 和 DataStream 中的字段定義一一對應(yīng),也可以單獨指定出來。
這就允許我們更換字段的順序、重命名,或者只選取某些字段出來,相當于做了一次 map 操作(或者 Table API 的 select 操作)。
代碼具體如下:
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.table.api.scala._
- /**
- * @Package
- * @author 大數(shù)據(jù)老哥
- * @date 2020/12/17 21:21
- * @version V1.0
- */
- object FlinkSqlReadFileTable {
- def main(args: Array[String]): Unit = {
- // 構(gòu)建流處理運行環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // 構(gòu)建table運行環(huán)境
- val tableEnv = StreamTableEnvironment.create(env)
- // 使用流處理來讀取數(shù)據(jù)
- val readData = env.readTextFile("./data/word.txt")
- // 使用flatMap進行切分
- val word: DataStream[String] = readData.flatMap(_.split(" "))
- // 將word 轉(zhuǎn)為 table
- val table = tableEnv.fromDataStream(word)
- // 計算wordcount
- val wordCount = table.groupBy("f0").select('f0, 'f0.count as 'count)
- wordCount.printSchema()
- //轉(zhuǎn)換成流處理打印輸出
- tableEnv.toRetractStream[(String,Long)](wordCount).print()
- env.execute("FlinkSqlReadFileTable")
- }
- }
5.2 數(shù)據(jù)類型與 Table schema 的對應(yīng)
DataStream 中的數(shù)據(jù)類型,與表的 Schema之間的對應(yīng)關(guān)系,是按照樣例類中的字段名來對應(yīng)的(name-based mapping),所以還可以用 as 做重命名。
另外一種對應(yīng)方式是,直接按照字段的位置來對應(yīng)(position-based mapping),對應(yīng)的過程中,就可以直接指定新的字段名了。
基于名稱的對應(yīng):
- val userTable = tableEnv.fromDataStream(dataStream,'username as 'name,'id as 'myid)
基于位置的對應(yīng):
- val userTable = tableEnv.fromDataStream(dataStream, 'name, 'id)
Flink 的 DataStream 和 DataSet API 支持多種類型。組合類型,比如元組(內(nèi)置 Scala 和 Java 元組)、POJO、Scala case 類和 Flink 的 Row 類型等,允許具有多個字段的嵌套數(shù)據(jù)結(jié)構(gòu),這些字段可以在 Table 的表達式中訪問。其他類型,則被視為原子類型。
元組類型和原子類型,一般用位置對應(yīng)會好一些;如果非要用名稱對應(yīng),也是可以的:元組類型,默認的名稱是_1, _2;而原子類型,默認名稱是 f0。
六、創(chuàng)建臨時視圖(Temporary View)
創(chuàng)建臨時視圖的第一種方式,就是直接從 DataStream 轉(zhuǎn)換而來。同樣,可以直接對應(yīng)字段轉(zhuǎn)換;也可以在轉(zhuǎn)換的時候,指定相應(yīng)的字段。代碼如下:
- tableEnv.createTemporaryView("sensorView", dataStream)
- tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature,'timestamp as 'ts)
另外,當然還可以基于 Table 創(chuàng)建視圖:
- tableEnv.createTemporaryView("sensorView", sensorTable)
View 和 Table 的 Schema 完全相同。事實上,在 Table API 中,可以認為 View 和 Table是等價的。
總結(jié)
上述文章了主要講解了以kafka方式作為輸入流進行流失處理,其實我也可以設(shè)置MySQL、ES、MySQL 等,都是類似的,以及table API 與sql之間的區(qū)別,還講解了DataStream轉(zhuǎn)換位Table 或者Table 轉(zhuǎn)換為DataStream這樣的或我們后面在做數(shù)據(jù)分析的時候就非常簡單了。
本文轉(zhuǎn)載自微信公眾號「大數(shù)據(jù)老哥」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系大數(shù)據(jù)老哥公眾號。