自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

徹底搞清 Flink 中的 Window 機(jī)制

系統(tǒng) Windows
在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,有時(shí)我們需要做一些聚合類(lèi)的處理,例如:在過(guò)去的1分鐘內(nèi)有多少用戶(hù)點(diǎn)擊了我們的網(wǎng)頁(yè)。

[[432700]]

一、 為什么需要Window

在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,有時(shí)我們需要做一些聚合類(lèi)的處理,例如:在過(guò)去的1分鐘內(nèi)有多少用戶(hù)點(diǎn)擊了我們的網(wǎng)頁(yè)。

在這種情況下,我們必須定義一個(gè)窗口(window),用來(lái)收集最近1分鐘內(nèi)的數(shù)據(jù),并對(duì)這個(gè)窗口內(nèi)的數(shù)據(jù)進(jìn)行計(jì)算

二、Window的分類(lèi)

2.1 按照time和count分類(lèi)

time-window:時(shí)間窗口:根據(jù)時(shí)間劃分窗口,如:每xx分鐘統(tǒng)計(jì)最近xx分鐘的數(shù)據(jù)

count-window:數(shù)量窗口:根據(jù)數(shù)量劃分窗口,如:每xx個(gè)數(shù)據(jù)統(tǒng)計(jì)最近xx個(gè)數(shù)據(jù)

2.2 按照slide和size分類(lèi)

窗口有兩個(gè)重要的屬性: 窗口大小size和滑動(dòng)間隔slide,根據(jù)它們的大小關(guān)系可分為:

tumbling-window:滾動(dòng)窗口:size=slide,如:每隔10s統(tǒng)計(jì)最近10s的數(shù)據(jù)

sliding-window:滑動(dòng)窗口:size>slide,如:每隔5s統(tǒng)計(jì)最近10s數(shù)據(jù)

注意:當(dāng)size<slide的時(shí)候,如每隔15s統(tǒng)計(jì)最近10s的數(shù)據(jù),那么中間5s

小結(jié)

按照上面窗口的分類(lèi)方式進(jìn)行組合,可以得出如下的窗口:

  • 基于時(shí)間的滾動(dòng)窗口tumbling-time-window--用的較多
  • 基于時(shí)間的滑動(dòng)窗口sliding-time-window--用的較多
  • 基于數(shù)量的滾動(dòng)窗口tumbling-count-window--用的較少
  • 基于數(shù)量的滑動(dòng)窗口sliding-count-window--用的較少

注意:Flink還支持一個(gè)特殊的窗口:Session會(huì)話(huà)窗口,需要設(shè)置一個(gè)會(huì)話(huà)超時(shí)時(shí)間,如30s,則表示30s內(nèi)沒(méi)有數(shù)據(jù)到來(lái),則觸發(fā)上個(gè)窗口的計(jì)算

三、WindowAPI

3.1 window和windowAll

使用keyby的流,應(yīng)該使用window方法

未使用keyby的流,應(yīng)該調(diào)用windowAll方法

區(qū)別:

Window算子:是可以設(shè)置并行度的

WindowAll 算子:并行度始終為1

3.2 WindowAssigner

Windows Assigner的作用是指定窗口的類(lèi)型,定義如何將數(shù)據(jù)流分配到一個(gè)或者多個(gè)窗口,API中通過(guò)window (WindowsAssigner assigner)指定。在Flink中支持兩種類(lèi)型的窗口,一種是基于時(shí)間的窗口(TimeWindow),另一種是基于數(shù)量的窗口(countWindow)。窗口所表現(xiàn)出的類(lèi)型特性取決于window assigner的定義。

Flink底層Window模型僅有TimeWindow以及GlobalWindow。

Flink提供了很多各種場(chǎng)景用的WindowAssigner:

如果需要自定制數(shù)據(jù)分發(fā)策略,則可以實(shí)現(xiàn)一個(gè) class,繼承自 WindowAssigner。

3.3 evictor

evictor 主要用于做一些數(shù)據(jù)的自定義操作,可以在執(zhí)行用戶(hù)代碼之前,也可以在執(zhí)行

用戶(hù)代碼之后,更詳細(xì)的描述可以參考o(jì)rg.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個(gè)方法。

Flink 提供了如下三種通用的 evictor:

CountEvictor 保留指定數(shù)量的元素

TimeEvictor 設(shè)定一個(gè)閾值 interval,刪除所有不再 max_ts - interval 范圍內(nèi)的元

素,其中 max_ts 是窗口內(nèi)時(shí)間戳的最大值。

DeltaEvictor 通過(guò)執(zhí)行用戶(hù)給定的 DeltaFunction 以及預(yù)設(shè)的 theshold,判斷是否刪 除一個(gè)元素。

3.4 trigger

trigger 用來(lái)判斷一個(gè)窗口是否需要被觸發(fā),每個(gè) WindowAssigner 都自帶一個(gè)默認(rèn)的trigger,

如果默認(rèn)的 trigger 不能滿(mǎn)足你的需求,則可以自定義一個(gè)類(lèi),繼承自Trigger 即可,我們?cè)敿?xì)描述下 Trigger 的接口以及含義:

onEventTime() 當(dāng) event-time timer 被觸發(fā)的時(shí)候會(huì)調(diào)用

onElement() 每次往 window 增加一個(gè)元素的時(shí)候都會(huì)觸發(fā)

onMerge() 對(duì)兩個(gè) `rigger 的 state 進(jìn)行 merge 操作

clear() window 銷(xiāo)毀的時(shí)候被調(diào)用

上面的接口中前三個(gè)會(huì)返回一個(gè) TriggerResult, TriggerResult 有如下幾種可能的選 擇:

  • CONTINUE 不做任何事情
  • FIRE 觸發(fā) window
  • PURGE 清空整個(gè) window 的元素并銷(xiāo)毀窗口
  • PURGE 清空整個(gè) window 的元素并銷(xiāo)毀窗口

四、WindowAPI調(diào)用案例示例

4.1 基于時(shí)間的滾動(dòng)和滑動(dòng)窗口

測(cè)試數(shù)據(jù)

  1. 信號(hào)燈編號(hào)和通過(guò)該信號(hào)燈的車(chē)的數(shù)量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

需求1:每5秒鐘統(tǒng)計(jì)一次,最近5秒鐘內(nèi),各個(gè)路口通過(guò)紅綠燈汽車(chē)的數(shù)量--基于時(shí)間的滾動(dòng)窗口

需求2:每5秒鐘統(tǒng)計(jì)一次,最近10秒鐘內(nèi),各個(gè)路口通過(guò)紅綠燈汽車(chē)的數(shù)量--基于時(shí)間的滑動(dòng)窗口

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.scala._ 
  5. import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows} 
  6. import org.apache.flink.streaming.api.windowing.time.Time
  7.  
  8. /** 
  9.  * @Package com.flink.source 
  10.  * @File :WindowDemo_TimeWindow.java 
  11.  * @author 大數(shù)據(jù)老哥 
  12.  * @date 2021/10/26 10:50 
  13.  * @version V1.0 
  14.  */ 
  15. object WindowDemo_TimeWindow { 
  16.   def main(args: Array[String]): Unit = { 
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18.  
  19.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  20.     val socketMap = socketData.map(new MapFunction[String, CartInfo]() { 
  21.       override def map(t: String): CartInfo = { 
  22.         val arr = t.split(","
  23.         CartInfo(arr(0), arr(1).toInt) 
  24.       } 
  25.     }) 
  26.     //需求1:每5秒鐘統(tǒng)計(jì)一次,最近5秒鐘內(nèi),各個(gè)路口通過(guò)紅綠燈汽車(chē)的數(shù)量 
  27.     val result = socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count"
  28.     //需求2:每5秒鐘統(tǒng)計(jì)一次,最近10秒鐘內(nèi),各個(gè)路口通過(guò)紅綠燈汽車(chē)的數(shù)量 
  29.     val result2 = socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10))).sum("count"
  30.     result.print() 
  31.     result2.print() 
  32.     env.execute("winds"
  33.   } 
  34.  
  35.  
  36. case class CartInfo(var sensorId: String, var countInt

4.2 基于數(shù)量的滾動(dòng)和滑動(dòng)窗口

測(cè)試數(shù)據(jù)

  1. 信號(hào)燈編號(hào)和通過(guò)該信號(hào)燈的車(chē)的數(shù)量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

需求1:統(tǒng)計(jì)在最近5條消息中,各自路口通過(guò)的汽車(chē)數(shù)量,相同的key每出現(xiàn)5次進(jìn)行統(tǒng)計(jì)--基于數(shù)量的滾動(dòng)窗口

需求2:統(tǒng)計(jì)在最近5條消息中,各自路口通過(guò)的汽車(chē)數(shù)量,相同的key每出現(xiàn)3次進(jìn)行統(tǒng)計(jì)--基于數(shù)量的滑動(dòng)窗口

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.scala._ 
  5.  
  6. /** 
  7.  * @Package com.flink.source 
  8.  * @File :WindosDemoo_CountWindos.java 
  9.  * @author 大數(shù)據(jù)老哥 
  10.  * @date 2021/10/26 14:04 
  11.  * @version V1.0 
  12.  */ 
  13. object WindowDemo_CountWindow { 
  14.   def main(args: Array[String]): Unit = { 
  15.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  16.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  17.     val socketMap = socketData.map(new MapFunction[String, CartInfo] { 
  18.       override def map(t: String): CartInfo = { 
  19.         val arr = t.split(","
  20.         CartInfo(arr(0), arr(1).toInt) 
  21.       } 
  22.     }) 
  23.      // 需求1:統(tǒng)計(jì)在最近5條消息中,各自路口通過(guò)的汽車(chē)數(shù)量,相同的key每出現(xiàn)5次進(jìn)行統(tǒng)計(jì) 
  24.     val result = socketMap.keyBy(_.sensorId).countWindow(5L).sum("count"
  25.      // 需求2:統(tǒng)計(jì)在最近5條消息中,各自路口通過(guò)的汽車(chē)數(shù)量,相同的key每出現(xiàn)3次進(jìn)行統(tǒng)計(jì) 
  26.     val result2 = socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count"
  27.     result.print("result"
  28.     result2.print("result2"
  29.     env.execute() 
  30.  
  31.   } 
  32. case class CartInfo(var sensorId: String, var countInt

case class CartInfo(var sensorId: String, var count: Int)

4.3 會(huì)話(huà)窗口

測(cè)試數(shù)據(jù)

  1. 信號(hào)燈編號(hào)和通過(guò)該信號(hào)燈的車(chē)的數(shù)量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

設(shè)置會(huì)話(huà)超時(shí)時(shí)間為10s,10s內(nèi)沒(méi)有數(shù)據(jù)到來(lái),則觸發(fā)上個(gè)窗口的計(jì)算

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator 
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 
  6. import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8.  
  9.   
  10. /** 
  11.  * @Package com.flink.source 
  12.  * @File :WindowDemo_SessionWindow.java 
  13.  * @author 大數(shù)據(jù)老哥 
  14.  * @date 2021/11/1 16:10 
  15.  * @version V1.0 
  16.  */ 
  17. object WindowDemo_SessionWindow { 
  18.   def main(args: Array[String]): Unit = { 
  19.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  20.  
  21.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  22.     val socketMap: SingleOutputStreamOperator[CartInfo] = socketData.map(new MapFunction[String, CartInfo]() { 
  23.       override def map(t: String): CartInfo = { 
  24.         val arr = t.split(","
  25.         CartInfo(arr(0), arr(1).toInt) 
  26.       } 
  27.     }) 
  28.     //設(shè)置會(huì)話(huà)超時(shí)時(shí)間為10s,10s內(nèi)沒(méi)有數(shù)據(jù)到來(lái),則觸發(fā)上個(gè)窗口的計(jì)算 
  29.     val result = socketMap.keyBy(0) 
  30.       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) 
  31.       .sum("count"
  32.     result.print() 
  33.     env.execute("winds"
  34.   } 
  35.  
  36.  
  37. case class CartInfo(var sensorId: String, var countInt

 

責(zé)任編輯:武曉燕 來(lái)源: 大數(shù)據(jù)老哥
相關(guān)推薦

2024-02-27 08:05:32

Flink分區(qū)機(jī)制數(shù)據(jù)傳輸

2020-11-02 11:40:24

Node.jsRequire前端

2022-01-14 07:56:38

Checkpoint機(jī)制Flink

2024-06-21 08:32:24

2020-06-03 08:19:00

Kubernetes

2023-03-22 18:34:30

Flink調(diào)度部署

2022-04-25 09:03:16

JavaScript代碼

2021-09-12 07:01:07

Flink SQL ETL datastream

2024-05-11 08:31:20

中斷機(jī)制插隊(duì)機(jī)制React

2022-05-19 08:47:30

Flinkwatermark窗口計(jì)算

2020-10-14 09:11:44

IO 多路復(fù)用實(shí)現(xiàn)機(jī)

2024-05-28 08:02:08

Vue3父組件子組件

2021-12-29 17:29:07

KubernetesEvents集群

2023-04-12 08:38:44

函數(shù)參數(shù)Context

2024-04-09 07:50:59

Flink語(yǔ)義Watermark

2018-07-19 10:16:25

華光昱能

2022-07-05 09:03:05

Flink SQLTopN

2021-09-04 07:29:57

Android

2018-11-30 09:03:55

HTTP緩存Web

2024-05-17 10:05:06

Java機(jī)制應(yīng)用
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)