Understanding Windows in Flink SQL in One Article
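Preface
Time semantics only pay off when combined with window operations. Their main use, of course, is opening windows and computing over time ranges. Below we look at how to use time attributes for windowing in the Table API and SQL. The Table API and SQL offer two main kinds of windows: Group Windows and Over Windows.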
1. Group Windows
Group windows aggregate rows into finite groups based on time or row-count intervals, and evaluate the aggregate function once per group. In the Table API, group windows are defined with the .window(w: GroupWindow) clause and must be given an alias with the as clause. To group a table by window, the window alias must be referenced in the groupBy clause, just like a regular grouping field. Example:
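```scala
val table = input
  .window([w: GroupWindow] as 'w) // define the group window with alias 'w
  .groupBy('w, 'a)                // group by the window alias and the key 'a
  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // window properties plus an aggregate
```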
The Table API provides a set of predefined Window classes with specific semantics, which are translated into window operations on the underlying DataStream or DataSet.
The Table API supports the three window types we already know: Tumbling, Sliding, and Session.
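1.1 Tumbling Windows
Tumbling windows are defined with the Tumble class, using three methods:
- over: defines the window length, as a time interval or a row count
- on: the time attribute used to group (time intervals) or sort (row counts)
- as: the alias, which must be referenced in the subsequent groupBy
Implementation example
1. Requirement
Use a 10-second tumbling window to count how many times each id appears.
2. Data preparation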
- sensor_1,1547718199,35.8
- sensor_6,1547718201,15.4
- sensor_7,1547718202,6.7
- sensor_10,1547718205,38.1
- sensor_1,1547718206,32
- sensor_1,1547718208,36.2
- sensor_1,1547718210,29.7
- sensor_1,1547718213,30.9
3. Code
```scala
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.types.Row

/**
 * @Package windows
 * @File :FlinkSQLTumBlingTie.scala
 * @author 大數據老哥
 * @date 2020/12/25 21:58
 * @version V1.0
 * Tumbling window
 */
object FlinkSQLTumBlingTie {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Read the data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)
    // Parse each line into the SensorReading case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      // Event time in milliseconds; watermarks tolerate 1 s of out-of-orderness
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // Register the table
    tableEnv.createTemporaryView("sensor", sensorTable)
    // Table API version
    val resultTable = sensorTable
      .window(Tumble over 10.seconds on 'ts as 'tw) // 10-second event-time tumbling window
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'tw.end)
    // SQL version
    val sqlTable = tableEnv.sqlQuery(
      """
        |select
        |id,
        |count(id),
        |tumble_end(ts, interval '10' second)
        |from sensor
        |group by
        |id,
        |tumble(ts, interval '10' second)
        |""".stripMargin)
    /**
     * .window(Tumble over 10.minutes on 'rowtime as 'w)  (event-time attribute rowtime)
     * .window(Tumble over 10.minutes on 'proctime as 'w) (processing-time attribute proctime)
     * .window(Tumble over 10.rows on 'proctime as 'w)    (row-count window: ordered by processing time, 10 rows per group)
     */
    resultTable.toAppendStream[Row].print("table")
    sqlTable.toRetractStream[Row].print("sqlTable")
    env.execute("FlinkSQLTumBlingTie")
  }
  case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
```
Run results: [screenshot of the console output]
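As a cross-check of the tumbling-window arithmetic, here is a minimal plain-Scala sketch with no Flink dependency (the object and function names are hypothetical). It assigns each sample reading to its epoch-aligned 10-second window and counts records per (id, window end), the same grouping as GROUP BY id, TUMBLE(ts, INTERVAL '10' SECOND). It works in seconds rather than Flink's milliseconds, which does not move the window boundaries here, and it assumes every window eventually fires; it does not model watermarks, late data, or Flink's output order.

```scala
// Plain-Scala sketch (no Flink dependency) of event-time tumbling window assignment.
object TumblingSketch {
  final case class Reading(id: String, tsSec: Long, temperature: Double)

  // The article's sample data, timestamps in seconds.
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L, 35.8),
    Reading("sensor_6", 1547718201L, 15.4),
    Reading("sensor_7", 1547718202L, 6.7),
    Reading("sensor_10", 1547718205L, 38.1),
    Reading("sensor_1", 1547718206L, 32.0),
    Reading("sensor_1", 1547718208L, 36.2),
    Reading("sensor_1", 1547718210L, 29.7),
    Reading("sensor_1", 1547718213L, 30.9)
  )

  // Start of the epoch-aligned window of length `size` that contains `ts`.
  def windowStart(ts: Long, size: Long): Long = ts - java.lang.Math.floorMod(ts, size)

  // count(id) per (id, window end): each record falls into exactly one window.
  def countPerWindow(rs: Seq[Reading], size: Long): Map[(String, Long), Int] =
    rs.groupBy(r => (r.id, windowStart(r.tsSec, size) + size))
      .map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit =
    countPerWindow(sample, 10L).toSeq
      .sortBy { case ((id, end), _) => (end, id) }
      .foreach { case ((id, end), n) => println(s"$id,$n,windowEnd=$end") }
}
```

With this data, sensor_1 shows up in three windows (ending at 1547718200, 1547718210 and 1547718220), while sensor_6, sensor_7 and sensor_10 each fall into the window ending at 1547718210.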
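1.2 Sliding Windows
Sliding windows are defined with the Slide class, using four methods:
- over: defines the window length, as a time interval or a row count
- every: defines the slide interval, which must be of the same type as over (time or row count)
- on: the time attribute used to group (time intervals) or sort (row counts)
- as: the alias, which must be referenced in the subsequent groupBy
Implementation example
1. Requirement
Use a sliding window with a size of 10 seconds and a slide of 5 seconds to count how many times each id appears.
2. Data preparation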
- sensor_1,1547718199,35.8
- sensor_6,1547718201,15.4
- sensor_7,1547718202,6.7
- sensor_10,1547718205,38.1
- sensor_1,1547718206,32
- sensor_1,1547718208,36.2
- sensor_1,1547718210,29.7
- sensor_1,1547718213,30.9
3. Code
```scala
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import windows.FlinkSQLTumBlingTie.SensorReading

/**
 * @Package windows
 * @File :FlinkSQLSlideTime.scala
 * @author 大數據老哥
 * @date 2020/12/27 22:19
 * @version V1.0
 * Sliding window
 */
object FlinkSQLSlideTime {
  def main(args: Array[String]): Unit = {
    // Build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 keeps the printed output easy to follow
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // Create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Read the data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)
    // Parse each line into the SensorReading case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // Register the table
    tableEnv.createTemporaryView("sensor", sensorTable)
    // Table API version: 10-second windows that slide every 5 seconds
    val tableApi = sensorTable
      .window(Slide over 10.seconds every 5.seconds on 'ts as 'w)
      .groupBy('w, 'id)
      .select('id, 'id.count, 'w.end)
    // SQL version: HOP(time_attr, slide, size) takes the slide first and the window size second
    val tableSql = tableEnv.sqlQuery(
      """
        |select
        |id,
        |count(id),
        |HOP_END(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) as w
        |from sensor
        |group by
        |HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), id
        |""".stripMargin)
    tableApi.toAppendStream[Row].print("tableApi")
    tableSql.toAppendStream[Row].print("tableSql")
    /**
     * .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)  (event-time attribute rowtime)
     * .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) (processing-time attribute proctime)
     * .window(Slide over 10.rows every 5.rows on 'proctime as 'w)       (row-count window: ordered by processing time, 10 rows per window, sliding by 5 rows)
     */
    env.execute("FlinkSQLSlideTime")
  }
}
```
4. Run results: [screenshot of the console output]
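The key difference from a tumbling window is that one record can belong to several windows. A minimal plain-Scala sketch (no Flink dependency; names are hypothetical) of sliding window assignment: a timestamp ts belongs to every window [s, s + size) whose start s is a multiple of the slide and satisfies s <= ts < s + size, so with a 10-second size and a 5-second slide each record is counted in two windows. This corresponds to Slide over 10.seconds every 5.seconds in the Table API, or HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) in Flink SQL, where the slide is the first interval and the size the second. Timestamps are in seconds, and watermarks and emission order are not modeled.

```scala
// Plain-Scala sketch (no Flink dependency) of sliding (hopping) window assignment.
object SlidingSketch {
  final case class Reading(id: String, tsSec: Long)

  // The article's sample data (temperatures omitted), timestamps in seconds.
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L), Reading("sensor_6", 1547718201L),
    Reading("sensor_7", 1547718202L), Reading("sensor_10", 1547718205L),
    Reading("sensor_1", 1547718206L), Reading("sensor_1", 1547718208L),
    Reading("sensor_1", 1547718210L), Reading("sensor_1", 1547718213L)
  )

  // Starts of all windows [s, s + size) containing ts, where s is a multiple of `slide`.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - java.lang.Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // count(id) per (id, window end); a record is counted once in every window it falls into.
  def countPerWindow(rs: Seq[Reading], size: Long, slide: Long): Map[(String, Long), Int] =
    rs.flatMap(r => windowStarts(r.tsSec, size, slide).map(s => (r.id, s + size)))
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }
}
```

For sensor_1 the window [1547718205, 1547718215) collects four readings (206, 208, 210 and 213), more than any tumbling window in the previous example, because windows overlap.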
1.3 Session Windows
Session windows are defined with the Session class, using three methods:
- withGap: the session gap, i.e. how long a key may stay inactive before its session closes
- on: the time attribute used for grouping
- as: the alias, which must be referenced in the subsequent groupBy
Implementation example
1. Requirement
Use a session window with a 10-second gap to count the records of each id.
2. Data preparation
- sensor_1,1547718199,35.8
- sensor_6,1547718201,15.4
- sensor_7,1547718202,6.7
- sensor_10,1547718205,38.1
- sensor_1,1547718206,32
- sensor_1,1547718208,36.2
- sensor_1,1547718210,29.7
- sensor_1,1547718213,30.9
3. Code
```scala
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Session, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import windows.FlinkSQLTumBlingTie.SensorReading

/**
 * @Package windows
 * @File :FlinkSqlSessionTime.scala
 * @author 大數據老哥
 * @date 2020/12/27 22:52
 * @version V1.0
 */
object FlinkSqlSessionTime {
  def main(args: Array[String]): Unit = {
    // Build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 keeps the printed output easy to follow
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // Create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Read the data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)
    // Parse each line into the SensorReading case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // Register the table
    tableEnv.createTemporaryView("sensor", sensorTable)
    // Table API version: a session closes after 10 s without records for that id
    val tableApi = sensorTable
      .window(Session withGap 10.seconds on 'ts as 'w)
      .groupBy('id, 'w)
      .select('id, 'id.count, 'w.end)
    // SQL version
    val tableSQL = tableEnv.sqlQuery(
      """
        |SELECT
        |id,
        |COUNT(id),
        |SESSION_END(ts, INTERVAL '10' SECOND) AS w
        |FROM sensor
        |GROUP BY
        |id,
        |SESSION(ts, INTERVAL '10' SECOND)
        |""".stripMargin)
    tableApi.toAppendStream[Row].print("tableApi")
    tableSQL.toAppendStream[Row].print("tableSQL")
    /**
     * .window(Session withGap 10.minutes on 'rowtime as 'w)  (event-time attribute rowtime)
     * .window(Session withGap 10.minutes on 'proctime as 'w) (processing-time attribute proctime)
     */
    env.execute("FlinkSqlSessionTime")
  }
}
```
4. Run results: [screenshot of the console output]
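Session windows have no fixed boundaries, so a small model helps. Below is a minimal plain-Scala sketch (no Flink dependency; names are hypothetical) of gap-based session merging under this assumption: each record opens its own window [ts, ts + gap), and, per id, a window that overlaps or touches the current session is merged into it. A session therefore ends one gap after its last record. Timestamps are in seconds, and watermarks and late data are not modeled.

```scala
// Plain-Scala sketch (no Flink dependency) of per-key session window merging.
object SessionSketch {
  final case class Reading(id: String, tsSec: Long)
  final case class Session(id: String, start: Long, end: Long, count: Int)

  // The article's sample data (temperatures omitted), timestamps in seconds.
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L), Reading("sensor_6", 1547718201L),
    Reading("sensor_7", 1547718202L), Reading("sensor_10", 1547718205L),
    Reading("sensor_1", 1547718206L), Reading("sensor_1", 1547718208L),
    Reading("sensor_1", 1547718210L), Reading("sensor_1", 1547718213L)
  )

  // Each record opens [ts, ts + gap); within one id, a window that overlaps or touches
  // the current session extends it, otherwise the record starts a new session.
  def sessions(rs: Seq[Reading], gap: Long): Seq[Session] =
    rs.groupBy(_.id).toSeq.flatMap { case (id, group) =>
      group.map(_.tsSec).sorted
        .foldLeft(List.empty[Session]) {
          case (current :: closed, ts) if ts <= current.end =>
            current.copy(end = math.max(current.end, ts + gap), count = current.count + 1) :: closed
          case (acc, ts) =>
            Session(id, ts, ts + gap, 1) :: acc
        }
        .reverse
    }
}
```

In the sample, sensor_1's readings are never 10 seconds apart, so all five end up in one session from 1547718199 to 1547718223; each of the other ids forms a single-record session.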
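2. Over Windows
Over window aggregation comes from standard SQL (the OVER clause) and can be defined in the SELECT clause of a query. For each input row, an over window aggregation computes an aggregate over a range of neighboring rows. In the Table API, over windows are defined with the .window(w: OverWindow*) clause and referenced by their alias in the select() method. Example:
```scala
val table = input
  .window([w: OverWindow] as 'w)              // define the over window with alias 'w
  .select('a, 'b.sum over 'w, 'c.min over 'w) // each aggregate references the alias via over
```
The Table API provides the Over class to configure the properties of an over window. Over windows can be defined on event time or processing time, and over a range given either as a time interval or as a row count.
Unbounded over windows are specified with constants: UNBOUNDED_RANGE for a time interval, or UNBOUNDED_ROW for a row count. Bounded over windows are specified by the size of the interval.
2.1 Unbounded over windows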
```scala
// Unbounded event-time over window (time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
// Unbounded processing-time over window (time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
// Unbounded event-time row-count over window (time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
// Unbounded processing-time row-count over window (time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
```
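Unlike a group window, an over window emits one output row for every input row. A minimal plain-Scala sketch (no Flink dependency; names are hypothetical) of the unbounded row-count case: for each row, aggregate over all earlier rows of the same partition plus the row itself, which amounts to a running aggregate. With distinct timestamps per partition, as in the sample data, UNBOUNDED_RANGE would produce the same frames; the two differ only when rows share a timestamp, because a range frame also includes those peers.

```scala
// Plain-Scala sketch (no Flink dependency) of an unbounded over window
// (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW): a running aggregate per key.
object UnboundedOverSketch {
  final case class Reading(id: String, tsSec: Long, temperature: Double)

  // The article's sample data, timestamps in seconds.
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L, 35.8), Reading("sensor_6", 1547718201L, 15.4),
    Reading("sensor_7", 1547718202L, 6.7), Reading("sensor_10", 1547718205L, 38.1),
    Reading("sensor_1", 1547718206L, 32.0), Reading("sensor_1", 1547718208L, 36.2),
    Reading("sensor_1", 1547718210L, 29.7), Reading("sensor_1", 1547718213L, 30.9)
  )

  // One output row per input row: (row, running count, running temperature sum),
  // partitioned by id and ordered by timestamp.
  def runningCountAndSum(rs: Seq[Reading]): Seq[(Reading, Int, Double)] =
    rs.groupBy(_.id).toSeq.flatMap { case (_, group) =>
      val ordered = group.sortBy(_.tsSec)
      // scanLeft yields the aggregate state after each row; tail drops the initial (0, 0.0).
      val states = ordered.scanLeft((0, 0.0)) { case ((n, sum), r) => (n + 1, sum + r.temperature) }.tail
      ordered.zip(states).map { case (r, (n, sum)) => (r, n, sum) }
    }
}
```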
2.2 Bounded over windows
```scala
// Bounded event-time over window (time attribute "rowtime", preceding 1 minute)
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
// Bounded processing-time over window (time attribute "proctime", preceding 1 minute)
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
// Bounded event-time row-count over window (time attribute "rowtime", preceding 10 rows)
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
// Bounded processing-time row-count over window (time attribute "proctime", preceding 10 rows)
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
```
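The two bounded forms above frame rows differently: preceding 10.rows counts positions, while preceding 1.minutes looks at timestamps. A minimal plain-Scala sketch (no Flink dependency; names are hypothetical) of the two frame definitions under the usual SQL semantics: a row-count frame is the current row plus up to n earlier rows, and a time-range frame is every row whose timestamp lies in [current - interval, current], which also pulls in peers sharing the current timestamp.

```scala
// Plain-Scala sketch (no Flink dependency) of ROWS vs RANGE frames for bounded over windows.
object BoundedOverSketch {
  // preceding n.rows: the current row plus up to n earlier rows, by position.
  def rowsFrame(i: Int, n: Int): List[Int] = (math.max(0, i - n) to i).toList

  // preceding <interval> on a time attribute: every row whose timestamp lies in
  // [ts(i) - interval, ts(i)], so rows sharing the current timestamp are included too.
  def rangeFrame(ts: IndexedSeq[Long], i: Int, interval: Long): List[Int] =
    ts.indices.filter(j => ts(j) >= ts(i) - interval && ts(j) <= ts(i)).toList

  // sensor_1's timestamps (seconds) from the sample data, in time order.
  val sensor1Ts: IndexedSeq[Long] =
    Vector(1547718199L, 1547718206L, 1547718208L, 1547718210L, 1547718213L)
}
```

For example, for sensor_1's reading at 1547718210 (index 3), a 2-row frame covers indices 1 to 3, while a 3-second range frame (an arbitrary illustration value) covers only indices 2 and 3, because the reading at 1547718206 is more than 3 seconds older.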
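2.3 Code exercise
Putting together what we have covered, we can implement a concrete requirement in one complete program. For example: for every reading of each sensor, compute the average temperature over that reading and the two readings before it.
Data preparation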
- sensor_1,1547718199,35.8
- sensor_6,1547718201,15.4
- sensor_7,1547718202,6.7
- sensor_10,1547718205,38.1
- sensor_1,1547718206,32
- sensor_1,1547718208,36.2
- sensor_1,1547718210,29.7
- sensor_1,1547718213,30.9
Code:
```scala
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Over}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @Package windows
 * @File :FlinkSqlTumBlingOverTime.scala
 * @author 大數據老哥
 * @date 2020/12/28 21:45
 * @version V1.0
 */
object FlinkSqlTumBlingOverTime {
  def main(args: Array[String]): Unit = {
    // Build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 keeps the printed output easy to follow
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // Build the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Read the data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)
    // Parse each line into the SensorReading case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    // Convert the stream to a table and register it as a temporary view
    val dataTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    tableEnv.createTemporaryView("sensor", dataTable)
    // Table API version: per id, ordered by ts, the current row plus the 2 preceding rows
    val tableRes = dataTable.window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'ow)
      .select('id, 'ts, 'id.count over 'ow, 'temperature.avg over 'ow)
    // SQL version with an equivalent named window
    val tableSql = tableEnv.sqlQuery(
      """
        |select
        |id,
        |ts,
        |count(id) over ow,
        |avg(temperature) over ow
        |from sensor
        |window ow as (
        |  partition by id
        |  order by ts
        |  rows between 2 preceding and current row
        |)
        |""".stripMargin)
    tableRes.toAppendStream[Row].print("tableRes")
    tableSql.toAppendStream[Row].print("tableSql")
    env.execute("FlinkSqlTumBlingOverTime")
  }
  case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
```
Run results: [screenshot of the console output]
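To check the exercise by hand, here is a minimal plain-Scala sketch (no Flink dependency; names are hypothetical) of ROWS BETWEEN 2 PRECEDING AND CURRENT ROW, partitioned by id and ordered by timestamp: for each row it returns the count and the average temperature of the row and up to two rows before it. It models only the frame arithmetic, not watermarks or Flink's output order.

```scala
// Plain-Scala sketch (no Flink dependency) of a bounded row-count over window average.
object OverAvgSketch {
  final case class Reading(id: String, tsSec: Long, temperature: Double)

  // The article's sample data, timestamps in seconds.
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L, 35.8), Reading("sensor_6", 1547718201L, 15.4),
    Reading("sensor_7", 1547718202L, 6.7), Reading("sensor_10", 1547718205L, 38.1),
    Reading("sensor_1", 1547718206L, 32.0), Reading("sensor_1", 1547718208L, 36.2),
    Reading("sensor_1", 1547718210L, 29.7), Reading("sensor_1", 1547718213L, 30.9)
  )

  // For every row: (id, ts, count, avg temperature) over
  // ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW, partitioned by id, ordered by ts.
  def countAndAvg(rs: Seq[Reading], preceding: Int): Seq[(String, Long, Int, Double)] =
    rs.groupBy(_.id).toSeq.flatMap { case (id, group) =>
      val ordered = group.sortBy(_.tsSec).toIndexedSeq
      ordered.indices.map { i =>
        val frame = ordered.slice(math.max(0, i - preceding), i + 1)
        (id, ordered(i).tsSec, frame.size, frame.map(_.temperature).sum / frame.size)
      }
    }

  def main(args: Array[String]): Unit =
    countAndAvg(sample, 2).sortBy(_._2).foreach(println)
}
```

For sensor_1 the counts are 1, 2, 3, 3, 3 and the averages work out to 35.8, 33.9, 104.0 / 3 (about 34.67), 97.9 / 3 (about 32.63) and 96.8 / 3 (about 32.27); the three other sensors each have a single reading, so their average is just that reading.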
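Summary
That wraps up windows in Flink SQL. If you found this useful, a like, share, and bookmark would be appreciated. Flink SQL offers quite a few ways to use windows, so practice is essential. As the old saying goes: the master leads you through the door, but the practice is up to you.
This article is reprinted from the WeChat official account 「大數據老哥」. To repost it, please contact the 大數據老哥 account.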