自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Apache Flink 漫談系列(03) - Watermark

開發(fā) 開發(fā)工具
本節(jié)以一個流計算常見的亂序問題介紹了Apache Flink如何利用Watermark機制來處理亂序問題.。本篇內(nèi)容在一定程度上也體現(xiàn)了EventTime Window中的Trigger機制依賴了Watermark。

實際問題(亂序)

在介紹Watermark相關內(nèi)容之前我們先拋出一個具體的問題,在實際的流式計算中數(shù)據(jù)到來的順序?qū)τ嬎憬Y(jié)果的正確性有至關重要的影響,比如:某數(shù)據(jù)源中的某些數(shù)據(jù)由于某種原因(如:網(wǎng)絡原因,外部存儲自身原因)會有5秒的延時,也就是在實際時間的第1秒產(chǎn)生的數(shù)據(jù)有可能在第5秒中產(chǎn)生的數(shù)據(jù)之后到來(比如到Window處理節(jié)點)。選具體某個delay的元素來說,假設在一個5秒的Tumble窗口(詳見Window介紹章節(jié)),有一個EventTime是 11秒的數(shù)據(jù),在第16秒時候到來了。圖示第11秒的數(shù)據(jù),在16秒到來了,如下圖:

那么對于一個Count聚合的Tumble(5s)的window,上面的情況如何處理才能window2=4,window3=2 呢?

Apache Flink的時間類型

開篇我們描述的問題是一個很常見的TimeWindow中數(shù)據(jù)亂序的問題,亂序是相對于事件產(chǎn)生時間和到達Apache Flink 實際處理算子的順序而言的,關于時間在Apache Flink中有如下三種時間類型,如下圖:

Apache Flink

(1)ProcessingTime

ProcessingTime是數(shù)據(jù)流入到具體某個算子時候相應的系統(tǒng)時間。ProcessingTime 有***的性能和***的延遲。但在分布式計算環(huán)境中ProcessingTime具有不確定性,相同數(shù)據(jù)流多次運行有可能產(chǎn)生不同的計算結(jié)果。

(2)IngestionTime

IngestionTime是數(shù)據(jù)進入Apache Flink框架的時間,是在Source Operator中設置的。與ProcessingTime相比可以提供更可預測的結(jié)果,因為IngestionTime的時間戳比較穩(wěn)定(在源處只記錄一次),同一數(shù)據(jù)在流經(jīng)不同窗口操作時將使用相同的時間戳,而對于ProcessingTime同一數(shù)據(jù)在流經(jīng)不同窗口算子會有不同的處理時間戳。

(3)EventTime

EventTime是事件在設備上產(chǎn)生時候攜帶的。在進入Apache Flink框架之前EventTime通常要嵌入到記錄中,并且EventTime也可以從記錄中提取出來。在實際的網(wǎng)上購物訂單等業(yè)務場景中,大多會使用EventTime來進行數(shù)據(jù)計算。

開篇描述的問題和本篇要介紹的Watermark所涉及的時間類型均是指EventTime類型。

什么是Watermark

Watermark是Apache Flink為了處理EventTime 窗口計算提出的一種機制,本質(zhì)上也是一種時間戳,由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統(tǒng)Event,與普通數(shù)據(jù)流Event一樣流轉(zhuǎn)到對應的下游算子,接收到Watermark Event的算子以此不斷調(diào)整自己管理的EventTime clock。 Apache Flink 框架保證Watermark單調(diào)遞增,算子接收到一個Watermark時候,框架知道不會再有任何小于該Watermark的時間戳的數(shù)據(jù)元素到來了,所以Watermark可以看做是告訴Apache Flink框架數(shù)據(jù)流已經(jīng)處理到什么位置(時間維度)的方式。 Watermark的產(chǎn)生和Apache Flink內(nèi)部處理邏輯如下圖所示:

ProcessingTime

Watermark的產(chǎn)生方式

目前Apache Flink 有兩種生產(chǎn)Watermark的方式,如下:

  • Punctuated - 數(shù)據(jù)流中每一個遞增的EventTime都會產(chǎn)生一個Watermark。在實際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
  • Periodic - 周期性的(一定時間間隔或者達到一定的記錄條數(shù))產(chǎn)生一個Watermark。在實際的生產(chǎn)中Periodic的方式必須結(jié)合時間和積累條數(shù)兩個維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會有很大的延時。

所以Watermark的生成方式需要根據(jù)業(yè)務場景的不同進行不同的選擇。

Watermark的接口定義

對應Apache Flink Watermark兩種不同的生成方式,我們了解一下對應的接口定義,如下:

  • Periodic Watermarks - AssignerWithPeriodicWatermarks
    1. /** 
    2. * Returns the current watermark. This method is periodically called by the 
    3. * system to retrieve the current watermark. The method may return {@code null} to 
    4. * indicate that no new Watermark is available. 
    5. <p>The returned watermark will be emitted only if it is non-null and itsTimestamp 
    6. * is larger than that of the previously emitted watermark (to preserve the contract of 
    7. * ascending watermarks). If the current watermark is still 
    8. * identical to the previous one, no progress in EventTime has happened since 
    9. * the previous call to this method. If a null value is returned, or theTimestamp 
    10. * of the returned watermark is smaller than that of the last emitted one, then no 
    11. * new watermark will be generated. 
    12. <p>The interval in which this method is called and Watermarks are generated 
    13. * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. 
    14. * @see org.Apache.flink.streaming.api.watermark.Watermark 
    15. * @see ExecutionConfig#getAutoWatermarkInterval() 
    16. * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit. 
    17. */ 
    18. @Nullable 
    19. Watermark getCurrentWatermark(); 
  • Punctuated Watermarks - AssignerWithPunctuatedWatermarks
    1. public interface AssignerWithPunctuatedWatermarks<T> extendsTimestampAssigner<T> { 
    2.  
    3. /** 
    4. * Asks this implementation if it wants to emit a watermark. This method is called right after 
    5. * the {@link #extractTimestamp(Object, long)} method. 
    6. <p>The returned watermark will be emitted only if it is non-null and itsTimestamp 
    7. * is larger than that of the previously emitted watermark (to preserve the contract of 
    8. * ascending watermarks). If a null value is returned, or theTimestamp of the returned 
    9. * watermark is smaller than that of the last emitted one, then no new watermark will 
    10. * be generated. 
    11. <p>For an example how to use this method, see the documentation of 
    12. * {@link AssignerWithPunctuatedWatermarks this class}. 
    13. * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit. 
    14. */ 
    15. @Nullable 
    16. Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);} 
  • AssignerWithPunctuatedWatermarks 繼承了TimestampAssigner接口 -TimestampAssigner
    1. public interfaceTimestampAssigner<T> extends Function { 
    2.  
    3. /** 
    4. * Assigns aTimestamp to an element, in milliseconds since the Epoch. 
    5. <p>The method is passed the previously assignedTimestamp of the element. 
    6. * That previousTimestamp may have been assigned from a previous assigner, 
    7. * by ingestionTime. If the element did not carry aTimestamp before, this value is 
    8. * {@code Long.MIN_VALUE}. 
    9. * @param element The element that theTimestamp is wil be assigned to. 
    10. * @param previousElementTimestamp The previous internalTimestamp of the element, 
    11. * or a negative value, if noTimestamp has been assigned, yet. 
    12. * @return The newTimestamp. 
    13. */ 
    14. long extractTimestamp(T element, long previousElementTimestamp); 

從接口定義可以看出,Watermark可以在Event(Element)中提取EventTime,進而定義一定的計算邏輯產(chǎn)生Watermark的時間戳。

Watermark解決如上問題

從上面的Watermark生成接口和Apache Flink內(nèi)部對Periodic Watermark的實現(xiàn)來看,Watermark的時間戳可以和Event中的EventTime 一致,也可以自己定義任何合理的邏輯使得Watermark的時間戳不等于Event中的EventTime,Event中的EventTime自產(chǎn)生那一刻起就不可以改變了,不受Apache Flink框架控制,而Watermark的產(chǎn)生是在Apache Flink的Source節(jié)點或?qū)崿F(xiàn)的Watermark生成器計算產(chǎn)生(如上Apache Flink內(nèi)置的 Periodic Watermark實現(xiàn)),Apache Flink內(nèi)部對單流或多流的場景有統(tǒng)一的Watermark處理。

回過頭來我們在看看Watermark機制如何解決上面的問題,上面的問題在于如何將遲來的EventTime 位11的元素正確處理。要解決這個問題我們還需要先了解一下EventTime window是如何觸發(fā)的? EventTime window 計算條件是當Window計算的Timer時間戳 小于等于 當前系統(tǒng)的Watermak的時間戳時候進行計算。

  • 當Watermark的時間戳等于Event中攜帶的EventTime時候,上面場景(Watermark=EventTime)的計算結(jié)果如下:

上面對應的DDL(Alibaba 對 Apache Flink 的增強分支)定義如下:

  1. CREATE TABLE source( 
  2. ..., 
  3. Event_timeTimeStamp, 
  4. WATERMARK wk1 FOR Event_time as withOffset(Event_time, 0) 
  5. ) with ( 
  6. ... 
  7. ); 
  • 如果想正確處理遲來的數(shù)據(jù)可以定義Watermark生成策略為 Watermark = EventTime -5s, 如下:

上面對應的DDL(Alibaba 對 Apache Flink 的增強分支)定義如下:

  1. CREATE TABLE source( 
  2. ..., 
  3. Event_timeTimeStamp, 
  4. WATERMARK wk1 FOR Event_time as withOffset(Event_time, 5000) 
  5. ) with ( 
  6. ... 
  7. ); 

上面正確處理的根源是我們采取了 延遲觸發(fā) window 計算 的方式正確處理了 Late Event. 與此同時,我們發(fā)現(xiàn)window的延時觸發(fā)計算,也導致了下游的LATENCY變大,本例子中下游得到window的結(jié)果就延遲了5s。

多流的Watermark處理

在實際的流計算中往往一個job中會處理多個Source的數(shù)據(jù),對Source的數(shù)據(jù)進行GroupBy分組,那么來自不同Source的相同key值會shuffle到同一個處理節(jié)點,并攜帶各自的Watermark,Apache Flink內(nèi)部要保證Watermark要保持單調(diào)遞增,多個Source的Watermark匯聚到一起時候可能不是單調(diào)自增的,這樣的情況Apache Flink內(nèi)部是如何處理的呢?如下圖所示:

Apache Flink內(nèi)部實現(xiàn)每一個邊上只能有一個遞增的Watermark, 當出現(xiàn)多流攜帶Eventtime匯聚到一起(GroupBy or Union)時候,Apache Flink會選擇所有流入的Eventtime中最小的一個向下游流出。從而保證watermark的單調(diào)遞增和保證數(shù)據(jù)的完整性。如下圖:

小結(jié)

本節(jié)以一個流計算常見的亂序問題介紹了Apache Flink如何利用Watermark機制來處理亂序問題。本篇內(nèi)容在一定程度上也體現(xiàn)了EventTime Window中的Trigger機制依賴了Watermark(后續(xù)Window篇章會介紹)。Watermark機制是流計算中處理亂序,正確處理Late Event的核心手段。

# 關于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設計研發(fā)工作。

【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請聯(lián)系原作者】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2022-07-13 13:03:29

流計算亂序

2022-06-10 17:26:07

數(shù)據(jù)集計算

2022-07-13 12:53:59

數(shù)據(jù)存儲

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-10-16 08:54:35

Apache Flin流計算State

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2019-01-03 10:17:53

Apache FlinTable API代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2018-11-07 08:48:31

Apache Flin持續(xù)查詢流計算

2022-07-12 10:38:25

分布式框架

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2018-12-29 08:16:32

Apache FlinJOIN代碼

2024-04-09 07:50:59

Flink語義Watermark

2020-04-09 11:08:30

PyFlinkJAR依賴

2022-05-19 08:47:30

Flinkwatermark窗口計算
點贊
收藏

51CTO技術棧公眾號