自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

區(qū)分理解Flink水印延遲與窗口允許延遲的概念

大數(shù)據(jù)
本文接下將展開討論分析“水印延遲”與“窗口允許延遲”概念及區(qū)別。

link 在開窗處理事件時間(Event Time) 數(shù)據(jù)時,可設置水印延遲以及設置窗口允許延遲(allowedLateness)以保證數(shù)據(jù)的完整性。這兩者因都是設置延遲時間所以剛接觸時容易混淆。本文接下將展開討論分析“水印延遲”與“窗口允許延遲”概念及區(qū)別。

[[350692]]

水印延遲(WaterMark)

(1) 水印

由于采用了事件時間,脫離了物理掛鐘。窗口不知道什么時候需要關閉并進行計算,這個時候需要借助水印來解決該問題。當窗口遇到水位標識時就默認是窗口時間段內(nèi)的數(shù)據(jù)都到齊了,可以觸發(fā)窗口計算。

(2) 水印延遲

設置水印延遲時間的目的是讓水印延遲到達,從而可以解決亂序問題。通過水印延遲到達讓在延遲時間范圍內(nèi)到達的遲到數(shù)據(jù)可以加入到窗口計算中,保證了數(shù)據(jù)的完整性。當水印到達后就會觸發(fā)窗口計算,在水印之后到達的遲到數(shù)據(jù)則會被丟棄。

區(qū)分理解Flink水印延遲與窗口允許延遲的概念

窗口允許延遲(allowedLateness)

區(qū)分理解Flink水印延遲與窗口允許延遲的概念

使用 StreamAPI 時,在進行開窗后可設置 allowedLateness 窗口延遲。官網(wǎng)中對其解釋如下:

默認情況下,當水印到達窗口末端時,遲到元素將會被刪除。但Flink允許為window operators指定允許的最大延遲。允許延遲指定元素在被刪除之前延遲的時間,默認值為0。當元素在水印經(jīng)過窗口末端后到達,且它的到達時間在窗口末端加上運行延遲的時間之內(nèi),其仍會被添加到窗口中。根據(jù)所使用的觸發(fā)器,延遲但未被丟棄的元素可能會再次觸發(fā)窗口計算。EventTimeTrigger就是這種情況。為了做到這一點,F(xiàn)link保持窗口的狀態(tài),直到它們允許的延遲到期。一旦發(fā)生這種情況,F(xiàn)link將刪除窗口并刪除其狀態(tài),正如窗口生命周期部分中所描述的那樣。

簡單理解:通常在水印到達之后遲到數(shù)據(jù)將會被刪除,而窗口的延遲則是指數(shù)據(jù)在被刪除之前的允許保留時間。也就是說,在水印達到之后遲到數(shù)據(jù)本該被刪除,但是如果設置了窗口延遲,那么在水印之后到窗口延遲時間段內(nèi)到達的遲到數(shù)據(jù)還是會被加入到窗口計算中,并再次觸發(fā)窗口計算。

一個Demo 兩個猜想

下面我用一個 Demo 和兩個猜想來幫助大家加深理解這兩個概念。

例子:接收 Kafka 數(shù)據(jù),數(shù)據(jù)為 JSON 格式如:{"word":"a","count":1,"time":1604286564}。我們開一個 5 秒的 tumbling windows 滾動窗口,以 word 作為 key 在窗口內(nèi)對 count 值進行累加。同時設置水印延遲 2 秒,窗口延遲 2 秒。代碼如下:

  1. public class MyExample { 
  2.  
  3.     public static void main(String[] args) throws Exception { 
  4.         // 創(chuàng)建環(huán)境 
  5.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  6.         env.setParallelism(1); 
  7.  
  8.         // 設置時間特性為 
  9.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
  10.         // 水印策略,其需要注入Timestamp Assigner(描述了如何訪問事件時間戳)和 Watermark Generator (事件流顯示的超出正常范圍的程度) 
  11.         WatermarkStrategy<WC> watermarkStrategy = WatermarkStrategy 
  12.                 // forBoundedOutOfOrderness 屬于(periodic周期性),周期生成器通常通過onEvent()觀察傳入的事件,然后在框架調(diào)用onPeriodicEmit()時發(fā)出水印。 
  13.                 .<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2)) 
  14.                 .withTimestampAssigner(new SerializableTimestampAssigner<WC>() { 
  15.                     @Override 
  16.                     public long extractTimestamp(WC wc, long l) { 
  17.                         return wc.getEventTime() * 1000; 
  18.                     } 
  19.                 }); 
  20.  
  21.         // Kafka 配置 
  22.         Properties properties = new Properties(); 
  23.         properties.setProperty("bootstrap.servers", "Kafka地址:9092"); 
  24.         properties.setProperty("group.id", "test"); 
  25.  
  26.         // Flink 需要知道如何轉(zhuǎn)換Kafka消息為Java對象(反序列化),默認提供了 KafkaDeserializationSchema(序列化需要自己編寫)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchema 
  27.         env.addSource(new FlinkKafkaConsumer<>("flinktest1", new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest()) 
  28.                 // map 構建 WC 對象 
  29.                 .map(new MapFunction<ObjectNode, WC>() { 
  30.                     @Override 
  31.                     public WC map(ObjectNode jsonNode) throws Exception { 
  32.                         JsonNode valueNode = jsonNode.get("value"); 
  33.                         WC wc = new WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong()); 
  34.                         return wc; 
  35.                     } 
  36.                 }) 
  37.                 // 設定水印策略 
  38.                 .assignTimestampsAndWatermarks(watermarkStrategy) 
  39.                 .keyBy(WC::getWord) 
  40.                 // 窗口設置,這里設置為滾動窗口 
  41.                 .window(TumblingEventTimeWindows.of(Time.seconds(5))) 
  42.                                 // 設置窗口延遲 
  43.                 .allowedLateness(Time.seconds(2)) 
  44.                 .reduce(new ReduceFunction<WC>() { 
  45.                     @Override 
  46.                     public WC reduce(WC wc, WC t1) throws Exception { 
  47.                         return new WC(wc.getWord(), wc.getCount() + t1.getCount()); 
  48.                     } 
  49.                 }) 
  50.                 .print(); 
  51.  
  52.         env.execute(); 
  53.     } 
  54.  
  55.  
  56.     static class WC { 
  57.         public String word; 
  58.         public int count; 
  59.         public long eventTime; 
  60.  
  61.         public long getEventTime() { 
  62.             return eventTime; 
  63.         } 
  64.  
  65.         public void setEventTime(long eventTime) { 
  66.             this.eventTime = eventTime; 
  67.         } 
  68.  
  69.         public String getWord() { 
  70.             return word; 
  71.         } 
  72.  
  73.         public void setWord(String word) { 
  74.             this.word = word; 
  75.         } 
  76.  
  77.         public int getCount() { 
  78.             return count; 
  79.         } 
  80.  
  81.         public void setCount(int count) { 
  82.             this.count = count; 
  83.         } 
  84.  
  85.         public WC(String word, int count) { 
  86.             this.word = word; 
  87.             this.count = count; 
  88.         } 
  89.          
  90.         public WC(String word, int count,long eventTime) { 
  91.             this.word = word; 
  92.             this.count = count; 
  93.             this.eventTime = eventTime; 
  94.         } 
  95.        
  96.         @Override 
  97.         public String toString() { 
  98.             return "WC{" + 
  99.                     "word='" + word + '\'' + 
  100.                     ", count=" + count + 
  101.                     '}'; 
  102.         } 
  103.     } 

猜想1:

水印延遲 2s 達到,所以會在第 5 + 2 = 7s 時認為 [ 0 ,5 ) 窗口的數(shù)據(jù)全部到齊,并觸發(fā)窗口計算。

  1. // 往 Kafka 中寫入數(shù)據(jù) 
  2. {"word":"a","count":1,"time":1604286560}   //2020-11-02 11:09:20 
  3. {"word":"a","count":1,"time":1604286561}   //2020-11-02 11:09:21 
  4. {"word":"a","count":1,"time":1604286562}   //2020-11-02 11:09:22 
  5. {"word":"a","count":1,"time":1604286566}   //2020-11-02 11:09:26 
  6. {"word":"a","count":1,"time":1604286567}   //2020-11-02 11:09:27 (觸發(fā)了窗口計算) 

區(qū)分理解Flink水印延遲與窗口允許延遲的概念

控制臺輸出

分析:通過測試發(fā)現(xiàn)最后在第 7s 也就是 11:09:27 時觸發(fā)了窗口計算,這符合了我們的猜想一。水印延遲 2s 達到,所以會在第 5 + 2 = 7s 時認為 [ 0 ,5 ) 窗口的數(shù)據(jù)全部到齊,并觸發(fā)窗口計算。計算結果為3,這是因為只有最前面的3條數(shù)據(jù)屬于 [0,5) 窗口計算范圍之內(nèi)。

猜想2:

設置了窗口延遲2秒,那么只要在水印之后到窗口允許延遲的時間范圍內(nèi)達到且屬于 [ 0,5) 窗口的遲到數(shù)據(jù)會被加入到窗口中,且再次觸發(fā)窗口運算:

  1. // 繼續(xù)往 Kafka 中寫入數(shù)據(jù) 
  2. {"word":"a","count":1,"time":1604286568}   //2020-11-02 11:09:28 時間到達了第 8 秒 
  3. {"word":"a","count":1,"time":1604286563}   //2020-11-02 11:09:23 模擬一個在水印之后、在窗口允許延遲范圍內(nèi)、且屬于[0,5) 窗口的遲到數(shù)據(jù),該數(shù)據(jù)還是會觸發(fā)并參與到[0,5) 窗口的計算 

區(qū)分理解Flink水印延遲與窗口允許延遲的概念

控制臺輸出新增了一行

  1. // 我們再繼續(xù)往 Kafka 中寫入數(shù)據(jù) 
  2. {"word":"a","count":1,"time":1604286569}  //2020-11-02 11:09:29  時間到達第9秒 
  3. {"word":"a","count":1,"time":1604286563}  //2020-11-02 11:09:23 模擬一個在水印之后且超出窗口允許延遲范圍、且屬于[0,5) 窗口的遲到數(shù)據(jù),該數(shù)據(jù)不會參與和觸發(fā)[0,5)窗口計算 

查看控制臺并沒有發(fā)現(xiàn)新的輸出打印。

區(qū)分理解Flink水印延遲與窗口允許延遲的概念

解析:水印因延遲在第 7s 到達之后會觸發(fā)[0,5) 窗口計算,如果沒有設置窗口延遲的情況下,水印之后遲到且屬于 [0,5) 窗口的數(shù)據(jù)會被丟棄。上面我們實驗設置窗口延遲 2s,實現(xiàn)的效果就是在水印之后,窗口允許延遲時間之內(nèi)(7 + 2 = 9s 之間),遲到且屬于 [0,5) 窗口的數(shù)據(jù)還是會觸發(fā)一次窗口計算,并參與到窗口計算中。而在 9s 之后,也就是超過窗口允許延時時間,那么遲到且屬于[0,5)的數(shù)據(jù)就會被丟棄。

總結

  • WaterMark 到達之前,窗口在攢數(shù)據(jù),不會觸發(fā)計算。
  • WaterMark 等于 windowEndTime 時,第一次觸發(fā)窗口計算。
  • WaterMark 到達之后,allowlateness之前,如果來了數(shù)據(jù),每條數(shù)據(jù)都會觸發(fā)窗口計算。
  • 超過了allowlateness之后到達的遲到數(shù)據(jù)會丟棄。

水印用于解決亂序問題保證數(shù)據(jù)的完整性。而之所以有allowlateness的出現(xiàn)是因為如果WaterMark 加大會導致窗口計算延遲。WaterMark 設定的時間,是第一次觸發(fā)窗口計算的時間。allowlateness 表示,WaterMark 觸發(fā)窗口計算以后,還可以再等多久的遲到數(shù)據(jù),每次符合條件的數(shù)據(jù)到達都會再次觸發(fā)一次窗口計算。allowlateness 是在 Watermark 基礎上再做了一層遲到數(shù)據(jù)的保證。

 

責任編輯:趙寧寧 來源: 今日頭條
相關推薦

2020-11-13 16:40:05

RocketMQ延遲消息架構

2011-06-08 14:22:51

延遲加載

2025-03-03 09:10:00

C++開發(fā)

2023-06-29 07:48:35

異步加載JavaScript

2009-07-02 09:39:37

Hibernate延遲

2024-04-30 11:44:24

2023-10-23 10:02:58

RabbitMQ延遲隊列

2012-08-15 11:36:13

Hibernate

2023-12-28 11:18:01

MySQL數(shù)據(jù)庫級聯(lián)從庫

2024-07-05 10:17:08

數(shù)據(jù)流系統(tǒng)CPU

2011-11-24 21:03:10

ibmdw

2018-03-26 14:21:51

音視頻延遲架構網(wǎng)絡

2009-07-15 17:11:31

JDBC的概念

2024-11-11 10:34:55

2009-09-25 10:47:25

Hibernate延遲

2009-09-24 11:41:46

Hibernate延遲

2009-06-17 11:18:02

Hibernate延遲

2019-08-29 07:04:29

網(wǎng)絡延遲IP網(wǎng)絡

2010-01-26 13:28:42

24口全千兆交換機

2015-04-09 10:40:29

HTTP協(xié)議TCPHTTP事務延遲
點贊
收藏

51CTO技術棧公眾號