徹底搞清 Flink 中的 Window 機(jī)制
一、 為什么需要Window
在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,有時(shí)我們需要做一些聚合類(lèi)的處理,例如:在過(guò)去的1分鐘內(nèi)有多少用戶(hù)點(diǎn)擊了我們的網(wǎng)頁(yè)。
在這種情況下,我們必須定義一個(gè)窗口(window),用來(lái)收集最近1分鐘內(nèi)的數(shù)據(jù),并對(duì)這個(gè)窗口內(nèi)的數(shù)據(jù)進(jìn)行計(jì)算
二、Window的分類(lèi)
2.1 按照time和count分類(lèi)
time-window:時(shí)間窗口:根據(jù)時(shí)間劃分窗口,如:每xx分鐘統(tǒng)計(jì)最近xx分鐘的數(shù)據(jù)
count-window:數(shù)量窗口:根據(jù)數(shù)量劃分窗口,如:每xx個(gè)數(shù)據(jù)統(tǒng)計(jì)最近xx個(gè)數(shù)據(jù)
2.2 按照slide和size分類(lèi)
窗口有兩個(gè)重要的屬性: 窗口大小size和滑動(dòng)間隔slide,根據(jù)它們的大小關(guān)系可分為:
tumbling-window:滾動(dòng)窗口:size=slide,如:每隔10s統(tǒng)計(jì)最近10s的數(shù)據(jù)
sliding-window:滑動(dòng)窗口:size>slide,如:每隔5s統(tǒng)計(jì)最近10s數(shù)據(jù)
注意:當(dāng)size<slide的時(shí)候,如每隔15s統(tǒng)計(jì)最近10s的數(shù)據(jù),那么中間5s
小結(jié)
按照上面窗口的分類(lèi)方式進(jìn)行組合,可以得出如下的窗口:
- 基于時(shí)間的滾動(dòng)窗口tumbling-time-window--用的較多
- 基于時(shí)間的滑動(dòng)窗口sliding-time-window--用的較多
- 基于數(shù)量的滾動(dòng)窗口tumbling-count-window--用的較少
- 基于數(shù)量的滑動(dòng)窗口sliding-count-window--用的較少
注意:Flink還支持一個(gè)特殊的窗口:Session會(huì)話(huà)窗口,需要設(shè)置一個(gè)會(huì)話(huà)超時(shí)時(shí)間,如30s,則表示30s內(nèi)沒(méi)有數(shù)據(jù)到來(lái),則觸發(fā)上個(gè)窗口的計(jì)算
三、WindowAPI
3.1 window和windowAll
使用keyby的流,應(yīng)該使用window方法
未使用keyby的流,應(yīng)該調(diào)用windowAll方法
區(qū)別:
Window算子:是可以設(shè)置并行度的
WindowAll 算子:并行度始終為1
3.2 WindowAssigner
Windows Assigner的作用是指定窗口的類(lèi)型,定義如何將數(shù)據(jù)流分配到一個(gè)或者多個(gè)窗口,API中通過(guò)window (WindowsAssigner assigner)指定。在Flink中支持兩種類(lèi)型的窗口,一種是基于時(shí)間的窗口(TimeWindow),另一種是基于數(shù)量的窗口(countWindow)。窗口所表現(xiàn)出的類(lèi)型特性取決于window assigner的定義。
Flink底層Window模型僅有TimeWindow以及GlobalWindow。
Flink提供了很多各種場(chǎng)景用的WindowAssigner:
如果需要自定制數(shù)據(jù)分發(fā)策略,則可以實(shí)現(xiàn)一個(gè) class,繼承自 WindowAssigner。
3.3 evictor
evictor 主要用于做一些數(shù)據(jù)的自定義操作,可以在執(zhí)行用戶(hù)代碼之前,也可以在執(zhí)行
用戶(hù)代碼之后,更詳細(xì)的描述可以參考o(jì)rg.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個(gè)方法。
Flink 提供了如下三種通用的 evictor:
CountEvictor 保留指定數(shù)量的元素
TimeEvictor 設(shè)定一個(gè)閾值 interval,刪除所有不再 max_ts - interval 范圍內(nèi)的元
素,其中 max_ts 是窗口內(nèi)時(shí)間戳的最大值。
DeltaEvictor 通過(guò)執(zhí)行用戶(hù)給定的 DeltaFunction 以及預(yù)設(shè)的 theshold,判斷是否刪 除一個(gè)元素。
3.4 trigger
trigger 用來(lái)判斷一個(gè)窗口是否需要被觸發(fā),每個(gè) WindowAssigner 都自帶一個(gè)默認(rèn)的trigger,
如果默認(rèn)的 trigger 不能滿(mǎn)足你的需求,則可以自定義一個(gè)類(lèi),繼承自Trigger 即可,我們?cè)敿?xì)描述下 Trigger 的接口以及含義:
onEventTime() 當(dāng) event-time timer 被觸發(fā)的時(shí)候會(huì)調(diào)用
onElement() 每次往 window 增加一個(gè)元素的時(shí)候都會(huì)觸發(fā)
onMerge() 對(duì)兩個(gè) `rigger 的 state 進(jìn)行 merge 操作
clear() window 銷(xiāo)毀的時(shí)候被調(diào)用
上面的接口中前三個(gè)會(huì)返回一個(gè) TriggerResult, TriggerResult 有如下幾種可能的選 擇:
- CONTINUE 不做任何事情
- FIRE 觸發(fā) window
- PURGE 清空整個(gè) window 的元素并銷(xiāo)毀窗口
- PURGE 清空整個(gè) window 的元素并銷(xiāo)毀窗口
四、WindowAPI調(diào)用案例示例
4.1 基于時(shí)間的滾動(dòng)和滑動(dòng)窗口
測(cè)試數(shù)據(jù)
- 信號(hào)燈編號(hào)和通過(guò)該信號(hào)燈的車(chē)的數(shù)量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
需求1:每5秒鐘統(tǒng)計(jì)一次,最近5秒鐘內(nèi),各個(gè)路口通過(guò)紅綠燈汽車(chē)的數(shù)量--基于時(shí)間的滾動(dòng)窗口
需求2:每5秒鐘統(tǒng)計(jì)一次,最近10秒鐘內(nèi),各個(gè)路口通過(guò)紅綠燈汽車(chē)的數(shù)量--基于時(shí)間的滑動(dòng)窗口
- package com.flink.source
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
- import org.apache.flink.streaming.api.windowing.time.Time;
- /**
- * @Package com.flink.source
- * @File :WindowDemo_TimeWindow.java
- * @author 大數(shù)據(jù)老哥
- * @date 2021/10/26 10:50
- * @version V1.0
- */
- object WindowDemo_TimeWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap = socketData.map(new MapFunction[String, CartInfo]() {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- //需求1:每5秒鐘統(tǒng)計(jì)一次,最近5秒鐘內(nèi),各個(gè)路口通過(guò)紅綠燈汽車(chē)的數(shù)量
- val result = socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count")
- //需求2:每5秒鐘統(tǒng)計(jì)一次,最近10秒鐘內(nèi),各個(gè)路口通過(guò)紅綠燈汽車(chē)的數(shù)量
- val result2 = socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10))).sum("count")
- result.print()
- result2.print()
- env.execute("winds")
- }
- }
- case class CartInfo(var sensorId: String, var count: Int)
4.2 基于數(shù)量的滾動(dòng)和滑動(dòng)窗口
測(cè)試數(shù)據(jù)
- 信號(hào)燈編號(hào)和通過(guò)該信號(hào)燈的車(chē)的數(shù)量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
需求1:統(tǒng)計(jì)在最近5條消息中,各自路口通過(guò)的汽車(chē)數(shù)量,相同的key每出現(xiàn)5次進(jìn)行統(tǒng)計(jì)--基于數(shù)量的滾動(dòng)窗口
需求2:統(tǒng)計(jì)在最近5條消息中,各自路口通過(guò)的汽車(chē)數(shù)量,相同的key每出現(xiàn)3次進(jìn)行統(tǒng)計(jì)--基于數(shù)量的滑動(dòng)窗口
- package com.flink.source
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.scala._
- /**
- * @Package com.flink.source
- * @File :WindosDemoo_CountWindos.java
- * @author 大數(shù)據(jù)老哥
- * @date 2021/10/26 14:04
- * @version V1.0
- */
- object WindowDemo_CountWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap = socketData.map(new MapFunction[String, CartInfo] {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- // 需求1:統(tǒng)計(jì)在最近5條消息中,各自路口通過(guò)的汽車(chē)數(shù)量,相同的key每出現(xiàn)5次進(jìn)行統(tǒng)計(jì)
- val result = socketMap.keyBy(_.sensorId).countWindow(5L).sum("count")
- // 需求2:統(tǒng)計(jì)在最近5條消息中,各自路口通過(guò)的汽車(chē)數(shù)量,相同的key每出現(xiàn)3次進(jìn)行統(tǒng)計(jì)
- val result2 = socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count")
- result.print("result")
- result2.print("result2")
- env.execute()
- }
- }
- case class CartInfo(var sensorId: String, var count: Int)
case class CartInfo(var sensorId: String, var count: Int)
4.3 會(huì)話(huà)窗口
測(cè)試數(shù)據(jù)
- 信號(hào)燈編號(hào)和通過(guò)該信號(hào)燈的車(chē)的數(shù)量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
設(shè)置會(huì)話(huà)超時(shí)時(shí)間為10s,10s內(nèi)沒(méi)有數(shù)據(jù)到來(lái),則觸發(fā)上個(gè)窗口的計(jì)算
- package com.flink.source
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
- import org.apache.flink.streaming.api.windowing.time.Time
- /**
- * @Package com.flink.source
- * @File :WindowDemo_SessionWindow.java
- * @author 大數(shù)據(jù)老哥
- * @date 2021/11/1 16:10
- * @version V1.0
- */
- object WindowDemo_SessionWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap: SingleOutputStreamOperator[CartInfo] = socketData.map(new MapFunction[String, CartInfo]() {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- //設(shè)置會(huì)話(huà)超時(shí)時(shí)間為10s,10s內(nèi)沒(méi)有數(shù)據(jù)到來(lái),則觸發(fā)上個(gè)窗口的計(jì)算
- val result = socketMap.keyBy(0)
- .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
- .sum("count")
- result.print()
- env.execute("winds")
- }
- }
- case class CartInfo(var sensorId: String, var count: Int)