自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

聊聊Flink:這次把Flink的觸發(fā)器(Trigger)、移除器(Evictor)講透

開(kāi)發(fā) 前端
窗口的計(jì)算觸發(fā)依賴于窗口觸發(fā)器,每種類型的窗口都有對(duì)應(yīng)的窗口觸發(fā)機(jī)制,都有一個(gè)默認(rèn)的窗口觸發(fā)器,觸發(fā)器的作用就是去控制什么時(shí)候來(lái)觸發(fā)計(jì)算。flink內(nèi)部定義多種觸發(fā)器,每種觸發(fā)器對(duì)應(yīng)于不同的WindowAssigner。

一、觸發(fā)器(Trigger)

Trigger 決定了一個(gè)窗口(由 window assigner 定義)何時(shí)可以被 window function 處理。每個(gè) WindowAssigner 都有一個(gè)默認(rèn)的 Trigger。如果默認(rèn) trigger 無(wú)法滿足你的需要,你可以在 trigger(…) 調(diào)用中指定自定義的 trigger。

1.1 Flink中預(yù)置的Trigger

窗口的計(jì)算觸發(fā)依賴于窗口觸發(fā)器,每種類型的窗口都有對(duì)應(yīng)的窗口觸發(fā)機(jī)制,都有一個(gè)默認(rèn)的窗口觸發(fā)器,觸發(fā)器的作用就是去控制什么時(shí)候來(lái)觸發(fā)計(jì)算。flink內(nèi)部定義多種觸發(fā)器,每種觸發(fā)器對(duì)應(yīng)于不同的WindowAssigner。常見(jiàn)的觸發(fā)器如下:

  • EventTimeTrigger:通過(guò)對(duì)比EventTime和窗口的Endtime確定是否觸發(fā)窗口計(jì)算,如果EventTime大于Window EndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。
  • ProcessTimeTrigger:通過(guò)對(duì)比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計(jì)算,否則窗口繼續(xù)等待。
  • ProcessingTimeoutTrigger:可以將任何觸發(fā)器轉(zhuǎn)變?yōu)槌瑫r(shí)觸發(fā)器。
  • ContinuousEventTimeTrigger:根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前EndTime觸發(fā)窗口計(jì)算。
  • ContinuousProcessingTimeTrigger:根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前ProcessTime觸發(fā)窗口計(jì)算。
  • CountTrigger:根據(jù)接入數(shù)據(jù)量是否超過(guò)設(shè)定的闕值判斷是否觸發(fā)窗口計(jì)算。
  • DeltaTrigger:根據(jù)接入數(shù)據(jù)計(jì)算出來(lái)的Delta指標(biāo)是否超過(guò)指定的Threshold去判斷是否觸發(fā)窗口計(jì)算。
  • PurgingTrigger:可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型的觸發(fā)器,計(jì)算完成后數(shù)據(jù)將被清理。
  • NeverTrigger:任何時(shí)候都不觸發(fā)窗口計(jì)算

1.2 Trigger的抽象類

Trigger 接口提供了五個(gè)方法來(lái)響應(yīng)不同的事件:

  • onElement() 方法在每個(gè)元素被加入窗口時(shí)調(diào)用。
  • onEventTime() 方法在注冊(cè)的 event-time timer 觸發(fā)時(shí)調(diào)用。
  • onProcessingTime() 方法在注冊(cè)的 processing-time timer 觸發(fā)時(shí)調(diào)用。
  • canMerge() 方法判斷是否可以合并。
  • onMerge() 方法與有狀態(tài)的 trigger 相關(guān)。該方法會(huì)在兩個(gè)窗口合并時(shí), 將窗口對(duì)應(yīng) trigger 的狀態(tài)進(jìn)行合并,比如使用會(huì)話窗口時(shí)。
  • clear() 方法處理在對(duì)應(yīng)窗口被移除時(shí)所需的邏輯。

觸發(fā)器接口的源碼如下:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    /**
     * Called for every element that gets added to a pane. The result of this will determine whether
     * the pane is evaluated to emit results.
     *
     * @param element The element that arrived.
     * @param timestamp The timestamp of the element that arrived.
     * @param window The window to which the element is being added.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when a processing-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when an event-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Returns true if this trigger supports merging of trigger state and can therefore be used with
     * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
     *
     * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
     * OnMergeContext)}
     */
    public boolean canMerge() {
        return false;
    }

    /**
     * Called when several windows have been merged into one window by the {@link
     * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
     *
     * @param window The new window that results from the merge.
     * @param ctx A context object that can be used to register timer callbacks and access state.
     */
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    /**
     * Clears any state that the trigger might still hold for the given window. This is called when
     * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
     * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
     * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {
        // ...
    }

    /**
     * Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,
     * OnMergeContext)}.
     */
    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(
                StateDescriptor<S, ?> stateDescriptor);
    }
}

關(guān)于上述方法,需要注意三件事:

(1)前三個(gè)方法返回TriggerResult枚舉類型,其包含四個(gè)枚舉值:

  • CONTINUE:表示對(duì)窗口不執(zhí)行任何操作。即不觸發(fā)窗口計(jì)算,也不刪除元素。
  • FIRE:觸發(fā)窗口計(jì)算,但是保留窗口元素。
  • PURGE:不觸發(fā)窗口計(jì)算,丟棄窗口,并且刪除窗口的元素。
  • FIRE_AND_PURGE:觸發(fā)窗口計(jì)算,輸出結(jié)果,然后將窗口中的數(shù)據(jù)和窗口進(jìn)行清除。

源碼如下:

public enum TriggerResult {

    // 不觸發(fā),也不刪除元素
    CONTINUE(false, false),

    // 觸發(fā)窗口,窗口出發(fā)后刪除窗口中的元素
    FIRE_AND_PURGE(true, true),

    // 觸發(fā)窗口,但是保留窗口元素
    FIRE(true, false),

    // 不觸發(fā)窗口,丟棄窗口,并且刪除窗口的元素
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}

(2) 每一個(gè)窗口分配器都擁有一個(gè)屬于自己的 Trigger,Trigger上會(huì)有定時(shí)器,用來(lái)決定一個(gè)窗口何時(shí)能夠被計(jì)算或清除,當(dāng)定時(shí)器觸發(fā)后,會(huì)調(diào)用對(duì)應(yīng)的回調(diào)返回,返回TriggerResult。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。一個(gè)Trigger的調(diào)用結(jié)果只是fire的話,那么會(huì)計(jì)算窗口并保留窗口原樣,也就是說(shuō)窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時(shí)候再次執(zhí)行計(jì)算。一個(gè)窗口可以被重復(fù)計(jì)算多次知道它被 purge 了。在purge之前,窗口會(huì)一直占用著內(nèi)存。

1.3 ProcessingTimeTrigger源碼分析

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the time is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the time is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    /** Creates a new trigger that fires once system time passes the end of the window. */
    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會(huì)注冊(cè)一個(gè)ProcessingTime定時(shí)器,時(shí)間參數(shù)是window.maxTimestamp(),也就是窗口的最終時(shí)間,當(dāng)時(shí)間到達(dá)這個(gè)窗口最終時(shí)間,定時(shí)器觸發(fā)并調(diào)用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發(fā)窗口中數(shù)據(jù)的計(jì)算,但是會(huì)保留窗口元素。

需要注意的是ProcessingTimeTrigger類只會(huì)在窗口的最終時(shí)間到達(dá)的時(shí)候觸發(fā)窗口函數(shù)的計(jì)算,計(jì)算完成后并不會(huì)清除窗口中的數(shù)據(jù),這些數(shù)據(jù)存儲(chǔ)在內(nèi)存中,除非調(diào)用PURGE或FIRE_AND_PURGE,否則數(shù)據(jù)將一直存在內(nèi)存中。實(shí)際上,F(xiàn)link中提供的Trigger類,除了PurgingTrigger類,其他的都不會(huì)對(duì)窗口中的數(shù)據(jù)進(jìn)行清除。

EventTimeTriggerr在onElement設(shè)置的定時(shí)器:

圖片圖片

EventTime通過(guò)registerEventTimeTimer注冊(cè)定時(shí)器,在內(nèi)部Watermark達(dá)到或超過(guò)Timer設(shè)定的時(shí)間戳?xí)r觸發(fā)。

二、移除器(Evictor)

2.1 Evictor扮演的角色

圖片圖片

當(dāng)一個(gè)元素進(jìn)入stream中之后,一般要經(jīng)歷Window(開(kāi)窗)、Trigger(觸發(fā)器)、Evitor(移除器)、Windowfunction(窗口計(jì)算操作),具體過(guò)程如下:


  • Window中的WindowAssigner(窗口分配器)定義了數(shù)據(jù)應(yīng)該被分配到哪個(gè)窗口中,每一個(gè) WindowAssigner都會(huì)有一個(gè)默認(rèn)的Trigger,如果用戶在代碼中指定了窗口的trigger,默認(rèn)的 trigger 將會(huì)被覆蓋。
  • Trigger上會(huì)有定時(shí)器,用來(lái)決定一個(gè)窗口何時(shí)能夠被計(jì)算或清除。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。一個(gè)Trigger的調(diào)用結(jié)果只是fire的話,那么會(huì)計(jì)算窗口并保留窗口原樣,也就是說(shuō)窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時(shí)候再次執(zhí)行計(jì)算。一個(gè)窗口可以被重復(fù)計(jì)算多次知道它被 purge 了。在purge之前,窗口會(huì)一直占用著內(nèi)存。
  • 當(dāng)Trigger fire了,窗口中的元素集合就會(huì)交給Evictor(如果指定了的話)。Evictor 主要用來(lái)遍歷窗口中的元素列表,并決定最先進(jìn)入窗口的多少個(gè)元素需要被移除。剩余的元素會(huì)交給用戶指定的函數(shù)進(jìn)行窗口的計(jì)算。如果沒(méi)有 Evictor 的話,窗口中的所有元素會(huì)一起交給WindowFunction進(jìn)行計(jì)算。
  • WindowFunction收到了窗口的元素(可能經(jīng)過(guò)了 Evictor 的過(guò)濾),并計(jì)算出窗口的結(jié)果值,并發(fā)送給下游。窗口計(jì)算操作有很多,比如預(yù)定義的sum(),min(),max(),還有 ReduceFunction,WindowFunction。WindowFunction 是最通用的計(jì)算函數(shù),其他的預(yù)定義的函數(shù)基本都是基于該函數(shù)實(shí)現(xiàn)的。

現(xiàn)在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪個(gè)位置,讓我們繼續(xù)看為何使用Evictor。

Evictor接口定義如下:

圖片圖片

evictBefore()包含要在窗口函數(shù)之前應(yīng)用的清除邏輯,而evictAfter()包含要在窗口函數(shù)之后應(yīng)用的清除邏輯。應(yīng)用窗口函數(shù)之前清除的元素將不會(huì)被窗口函數(shù)處理。

窗格是具有相同Key和相同窗口的元素組成的桶,即同一個(gè)窗口中相同Key的元素一定屬于同一個(gè)窗格。一個(gè)元素可以在多個(gè)窗格中(當(dāng)一個(gè)元素被分配給多個(gè)窗口時(shí)),這些窗格都有自己的清除器實(shí)例。

注:window默認(rèn)沒(méi)有evictor,一旦把window指定Evictor,該window會(huì)由EvictWindowOperator類來(lái)負(fù)責(zé)操作。

2.2 Flink內(nèi)置的Evitor

  • CountEvictor:保留窗口中用戶指定的元素?cái)?shù)量,并丟棄窗口緩沖區(qū)剩余的元素。
  • DeltaEvictor:依次計(jì)算窗口緩沖區(qū)中的最后一個(gè)元素與其余每個(gè)元素之間的delta值,若delta值大于等于指定的閾值,則該元素會(huì)被移除。使用DeltaEvictor清除器需要指定兩個(gè)參數(shù),一個(gè)是double類型的閾值;另一個(gè)是DeltaFunction接口的實(shí)例,DeltaFunction用于指定具體的delta值計(jì)算邏輯。
  • TimeEvictor:傳入一個(gè)以毫秒為單位的時(shí)間間隔參數(shù)(例如以size表示),對(duì)于給定的窗口,取窗口中元素的最大時(shí)間戳(例如以max表示),使用TimeEvictor清除器將刪除所有時(shí)間戳小于或等于max-size的元素(即清除從窗口開(kāi)頭到指定的截止時(shí)間之間的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        // 小于最大數(shù)量,不做處理
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                // 移除前size - maxCount個(gè)元素,只剩下最后maxCount個(gè)元素
                iterator.remove();
            }
        }
    }
}

2.2.2 DeltaEvictor

DeltaEvictor通過(guò)計(jì)算DeltaFunction的值(依次傳入每個(gè)元素和最后一個(gè)元素),并將其與threshold進(jìn)行對(duì)比,如果DeltaFunction計(jì)算結(jié)果大于等于threshold,則該元素會(huì)被移除。DeltaEvictor的實(shí)現(xiàn)如下:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {

    // 獲取最后一個(gè)元素
    TimestampedValue<T> lastElement = Iterables.getLast(elements);

    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
        TimestampedValue<T> element = iterator.next();
        // 依次計(jì)算每個(gè)元素和最后一個(gè)元素的delta值,同時(shí)和threshold的值進(jìn)行比較
        // 若計(jì)算結(jié)果大于threshold值或者是相等,則該元素會(huì)被移除
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
            iterator.remove();
        }
    }
}

2.2.3 TimeEvictor

TimeEvictor以時(shí)間為判斷標(biāo)準(zhǔn),決定元素是否會(huì)被移除。TimeEvictor會(huì)獲取窗口中所有元素的最大時(shí)間戳currentTime,currentTime減去窗口大小(windowSize) 可得到能保留最久的元素的時(shí)間戳evictCutoff,然后再遍歷窗口中的元素,如果元素的時(shí)間戳小于evictCutoff,就執(zhí)行移除操作,否則不移除。具體邏輯如下圖所示:

TimeEvictor的代碼實(shí)現(xiàn)如下:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {

    // 如果element沒(méi)有timestamp,直接返回
    if (!hasTimestamp(elements)) {
        return;
    }

    // 獲取elements中最大的時(shí)間戳(到來(lái)最晚的元素的時(shí)間)
    long currentTime = getMaxTimestamp(elements);
    // 截止時(shí)間為: 到來(lái)最晚的元素的時(shí)間 - 窗口大小(可以理解為保留最近的多久的元素)
    long evictCutoff = currentTime - windowSize;

    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();

        // 清除所有時(shí)間戳小于截止時(shí)間的元素
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}


責(zé)任編輯:武曉燕 來(lái)源: 老周聊架構(gòu)
相關(guān)推薦

2010-05-04 09:44:12

Oracle Trig

2024-02-27 08:05:32

Flink分區(qū)機(jī)制數(shù)據(jù)傳輸

2011-05-20 14:06:25

Oracle觸發(fā)器

2024-01-29 08:07:42

FlinkYARN架構(gòu)

2024-04-09 07:50:59

Flink語(yǔ)義Watermark

2010-10-12 10:04:15

MySQL觸發(fā)器

2010-05-31 18:06:07

MySQL 觸發(fā)器

2011-04-14 13:54:22

Oracle觸發(fā)器

2011-05-19 14:29:49

Oracle觸發(fā)器語(yǔ)法

2009-09-18 14:31:33

CLR觸發(fā)器

2011-03-28 10:05:57

sql觸發(fā)器代碼

2009-04-26 22:27:54

觸發(fā)器密碼修改數(shù)據(jù)庫(kù)

2010-10-11 14:52:43

Mysql觸發(fā)器

2010-05-18 15:36:44

MySQL觸發(fā)器

2009-04-07 13:56:03

SQL Server觸發(fā)器實(shí)例

2009-10-22 17:18:20

CLR觸發(fā)器

2010-05-18 15:58:39

MySQL觸發(fā)器

2010-10-12 10:24:58

mysql觸發(fā)器

2009-11-18 13:15:06

Oracle觸發(fā)器

2021-07-30 10:33:57

MySQL觸發(fā)器數(shù)據(jù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)