聊聊Flink:這次把Flink的觸發(fā)器(Trigger)、移除器(Evictor)講透
一、觸發(fā)器(Trigger)
Trigger 決定了一個(gè)窗口(由 window assigner 定義)何時(shí)可以被 window function 處理。每個(gè) WindowAssigner 都有一個(gè)默認(rèn)的 Trigger。如果默認(rèn) trigger 無(wú)法滿足你的需要,你可以在 trigger(…) 調(diào)用中指定自定義的 trigger。
1.1 Flink中預(yù)置的Trigger
窗口的計(jì)算觸發(fā)依賴于窗口觸發(fā)器,每種類型的窗口都有對(duì)應(yīng)的窗口觸發(fā)機(jī)制,都有一個(gè)默認(rèn)的窗口觸發(fā)器,觸發(fā)器的作用就是去控制什么時(shí)候來(lái)觸發(fā)計(jì)算。flink內(nèi)部定義多種觸發(fā)器,每種觸發(fā)器對(duì)應(yīng)于不同的WindowAssigner。常見(jiàn)的觸發(fā)器如下:
- EventTimeTrigger:通過(guò)對(duì)比EventTime和窗口的Endtime確定是否觸發(fā)窗口計(jì)算,如果EventTime大于Window EndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。
- ProcessTimeTrigger:通過(guò)對(duì)比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計(jì)算,否則窗口繼續(xù)等待。
- ProcessingTimeoutTrigger:可以將任何觸發(fā)器轉(zhuǎn)變?yōu)槌瑫r(shí)觸發(fā)器。
- ContinuousEventTimeTrigger:根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前EndTime觸發(fā)窗口計(jì)算。
- ContinuousProcessingTimeTrigger:根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前ProcessTime觸發(fā)窗口計(jì)算。
- CountTrigger:根據(jù)接入數(shù)據(jù)量是否超過(guò)設(shè)定的闕值判斷是否觸發(fā)窗口計(jì)算。
- DeltaTrigger:根據(jù)接入數(shù)據(jù)計(jì)算出來(lái)的Delta指標(biāo)是否超過(guò)指定的Threshold去判斷是否觸發(fā)窗口計(jì)算。
- PurgingTrigger:可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型的觸發(fā)器,計(jì)算完成后數(shù)據(jù)將被清理。
- NeverTrigger:任何時(shí)候都不觸發(fā)窗口計(jì)算
1.2 Trigger的抽象類
Trigger 接口提供了五個(gè)方法來(lái)響應(yīng)不同的事件:
- onElement() 方法在每個(gè)元素被加入窗口時(shí)調(diào)用。
- onEventTime() 方法在注冊(cè)的 event-time timer 觸發(fā)時(shí)調(diào)用。
- onProcessingTime() 方法在注冊(cè)的 processing-time timer 觸發(fā)時(shí)調(diào)用。
- canMerge() 方法判斷是否可以合并。
- onMerge() 方法與有狀態(tài)的 trigger 相關(guān)。該方法會(huì)在兩個(gè)窗口合并時(shí), 將窗口對(duì)應(yīng) trigger 的狀態(tài)進(jìn)行合并,比如使用會(huì)話窗口時(shí)。
- clear() 方法處理在對(duì)應(yīng)窗口被移除時(shí)所需的邏輯。
觸發(fā)器接口的源碼如下:
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
/**
* Called for every element that gets added to a pane. The result of this will determine whether
* the pane is evaluated to emit results.
*
* @param element The element that arrived.
* @param timestamp The timestamp of the element that arrived.
* @param window The window to which the element is being added.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
throws Exception;
/**
* Called when a processing-time timer that was set using the trigger context fires.
*
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception;
/**
* Called when an event-time timer that was set using the trigger context fires.
*
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
throws Exception;
/**
* Returns true if this trigger supports merging of trigger state and can therefore be used with
* a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
*
* <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
* OnMergeContext)}
*/
public boolean canMerge() {
return false;
}
/**
* Called when several windows have been merged into one window by the {@link
* org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
*
* @param window The new window that results from the merge.
* @param ctx A context object that can be used to register timer callbacks and access state.
*/
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
/**
* Clears any state that the trigger might still hold for the given window. This is called when
* a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
* {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
* state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
*/
public abstract void clear(W window, TriggerContext ctx) throws Exception;
// ------------------------------------------------------------------------
/**
* A context object that is given to {@link Trigger} methods to allow them to register timer
* callbacks and deal with state.
*/
public interface TriggerContext {
// ...
}
/**
* Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,
* OnMergeContext)}.
*/
public interface OnMergeContext extends TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(
StateDescriptor<S, ?> stateDescriptor);
}
}
關(guān)于上述方法,需要注意三件事:
(1)前三個(gè)方法返回TriggerResult枚舉類型,其包含四個(gè)枚舉值:
- CONTINUE:表示對(duì)窗口不執(zhí)行任何操作。即不觸發(fā)窗口計(jì)算,也不刪除元素。
- FIRE:觸發(fā)窗口計(jì)算,但是保留窗口元素。
- PURGE:不觸發(fā)窗口計(jì)算,丟棄窗口,并且刪除窗口的元素。
- FIRE_AND_PURGE:觸發(fā)窗口計(jì)算,輸出結(jié)果,然后將窗口中的數(shù)據(jù)和窗口進(jìn)行清除。
源碼如下:
public enum TriggerResult {
// 不觸發(fā),也不刪除元素
CONTINUE(false, false),
// 觸發(fā)窗口,窗口出發(fā)后刪除窗口中的元素
FIRE_AND_PURGE(true, true),
// 觸發(fā)窗口,但是保留窗口元素
FIRE(true, false),
// 不觸發(fā)窗口,丟棄窗口,并且刪除窗口的元素
PURGE(false, true);
// ------------------------------------------------------------------------
private final boolean fire;
private final boolean purge;
TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}
public boolean isFire() {
return fire;
}
public boolean isPurge() {
return purge;
}
}
(2) 每一個(gè)窗口分配器都擁有一個(gè)屬于自己的 Trigger,Trigger上會(huì)有定時(shí)器,用來(lái)決定一個(gè)窗口何時(shí)能夠被計(jì)算或清除,當(dāng)定時(shí)器觸發(fā)后,會(huì)調(diào)用對(duì)應(yīng)的回調(diào)返回,返回TriggerResult。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。一個(gè)Trigger的調(diào)用結(jié)果只是fire的話,那么會(huì)計(jì)算窗口并保留窗口原樣,也就是說(shuō)窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時(shí)候再次執(zhí)行計(jì)算。一個(gè)窗口可以被重復(fù)計(jì)算多次知道它被 purge 了。在purge之前,窗口會(huì)一直占用著內(nèi)存。
1.3 ProcessingTimeTrigger源碼分析
@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private ProcessingTimeTrigger() {}
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
// only register a timer if the time is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the time is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "ProcessingTimeTrigger()";
}
/** Creates a new trigger that fires once system time passes the end of the window. */
public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}
}
在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會(huì)注冊(cè)一個(gè)ProcessingTime定時(shí)器,時(shí)間參數(shù)是window.maxTimestamp(),也就是窗口的最終時(shí)間,當(dāng)時(shí)間到達(dá)這個(gè)窗口最終時(shí)間,定時(shí)器觸發(fā)并調(diào)用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發(fā)窗口中數(shù)據(jù)的計(jì)算,但是會(huì)保留窗口元素。
需要注意的是ProcessingTimeTrigger類只會(huì)在窗口的最終時(shí)間到達(dá)的時(shí)候觸發(fā)窗口函數(shù)的計(jì)算,計(jì)算完成后并不會(huì)清除窗口中的數(shù)據(jù),這些數(shù)據(jù)存儲(chǔ)在內(nèi)存中,除非調(diào)用PURGE或FIRE_AND_PURGE,否則數(shù)據(jù)將一直存在內(nèi)存中。實(shí)際上,F(xiàn)link中提供的Trigger類,除了PurgingTrigger類,其他的都不會(huì)對(duì)窗口中的數(shù)據(jù)進(jìn)行清除。
EventTimeTriggerr在onElement設(shè)置的定時(shí)器:
圖片
EventTime通過(guò)registerEventTimeTimer注冊(cè)定時(shí)器,在內(nèi)部Watermark達(dá)到或超過(guò)Timer設(shè)定的時(shí)間戳?xí)r觸發(fā)。
二、移除器(Evictor)
2.1 Evictor扮演的角色
圖片
當(dāng)一個(gè)元素進(jìn)入stream中之后,一般要經(jīng)歷Window(開(kāi)窗)、Trigger(觸發(fā)器)、Evitor(移除器)、Windowfunction(窗口計(jì)算操作),具體過(guò)程如下:
- Window中的WindowAssigner(窗口分配器)定義了數(shù)據(jù)應(yīng)該被分配到哪個(gè)窗口中,每一個(gè) WindowAssigner都會(huì)有一個(gè)默認(rèn)的Trigger,如果用戶在代碼中指定了窗口的trigger,默認(rèn)的 trigger 將會(huì)被覆蓋。
- Trigger上會(huì)有定時(shí)器,用來(lái)決定一個(gè)窗口何時(shí)能夠被計(jì)算或清除。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。一個(gè)Trigger的調(diào)用結(jié)果只是fire的話,那么會(huì)計(jì)算窗口并保留窗口原樣,也就是說(shuō)窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時(shí)候再次執(zhí)行計(jì)算。一個(gè)窗口可以被重復(fù)計(jì)算多次知道它被 purge 了。在purge之前,窗口會(huì)一直占用著內(nèi)存。
- 當(dāng)Trigger fire了,窗口中的元素集合就會(huì)交給Evictor(如果指定了的話)。Evictor 主要用來(lái)遍歷窗口中的元素列表,并決定最先進(jìn)入窗口的多少個(gè)元素需要被移除。剩余的元素會(huì)交給用戶指定的函數(shù)進(jìn)行窗口的計(jì)算。如果沒(méi)有 Evictor 的話,窗口中的所有元素會(huì)一起交給WindowFunction進(jìn)行計(jì)算。
- WindowFunction收到了窗口的元素(可能經(jīng)過(guò)了 Evictor 的過(guò)濾),并計(jì)算出窗口的結(jié)果值,并發(fā)送給下游。窗口計(jì)算操作有很多,比如預(yù)定義的sum(),min(),max(),還有 ReduceFunction,WindowFunction。WindowFunction 是最通用的計(jì)算函數(shù),其他的預(yù)定義的函數(shù)基本都是基于該函數(shù)實(shí)現(xiàn)的。
現(xiàn)在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪個(gè)位置,讓我們繼續(xù)看為何使用Evictor。
Evictor接口定義如下:
圖片
evictBefore()包含要在窗口函數(shù)之前應(yīng)用的清除邏輯,而evictAfter()包含要在窗口函數(shù)之后應(yīng)用的清除邏輯。應(yīng)用窗口函數(shù)之前清除的元素將不會(huì)被窗口函數(shù)處理。
窗格是具有相同Key和相同窗口的元素組成的桶,即同一個(gè)窗口中相同Key的元素一定屬于同一個(gè)窗格。一個(gè)元素可以在多個(gè)窗格中(當(dāng)一個(gè)元素被分配給多個(gè)窗口時(shí)),這些窗格都有自己的清除器實(shí)例。
注:window默認(rèn)沒(méi)有evictor,一旦把window指定Evictor,該window會(huì)由EvictWindowOperator類來(lái)負(fù)責(zé)操作。
2.2 Flink內(nèi)置的Evitor
- CountEvictor:保留窗口中用戶指定的元素?cái)?shù)量,并丟棄窗口緩沖區(qū)剩余的元素。
- DeltaEvictor:依次計(jì)算窗口緩沖區(qū)中的最后一個(gè)元素與其余每個(gè)元素之間的delta值,若delta值大于等于指定的閾值,則該元素會(huì)被移除。使用DeltaEvictor清除器需要指定兩個(gè)參數(shù),一個(gè)是double類型的閾值;另一個(gè)是DeltaFunction接口的實(shí)例,DeltaFunction用于指定具體的delta值計(jì)算邏輯。
- TimeEvictor:傳入一個(gè)以毫秒為單位的時(shí)間間隔參數(shù)(例如以size表示),對(duì)于給定的窗口,取窗口中元素的最大時(shí)間戳(例如以max表示),使用TimeEvictor清除器將刪除所有時(shí)間戳小于或等于max-size的元素(即清除從窗口開(kāi)頭到指定的截止時(shí)間之間的元素)。
2.2.1 CountEvictor
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
if (size <= maxCount) {
// 小于最大數(shù)量,不做處理
return;
} else {
int evictedCount = 0;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
iterator.next();
evictedCount++;
if (evictedCount > size - maxCount) {
break;
} else {
// 移除前size - maxCount個(gè)元素,只剩下最后maxCount個(gè)元素
iterator.remove();
}
}
}
}
2.2.2 DeltaEvictor
DeltaEvictor通過(guò)計(jì)算DeltaFunction的值(依次傳入每個(gè)元素和最后一個(gè)元素),并將其與threshold進(jìn)行對(duì)比,如果DeltaFunction計(jì)算結(jié)果大于等于threshold,則該元素會(huì)被移除。DeltaEvictor的實(shí)現(xiàn)如下:
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
// 獲取最后一個(gè)元素
TimestampedValue<T> lastElement = Iterables.getLast(elements);
for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
TimestampedValue<T> element = iterator.next();
// 依次計(jì)算每個(gè)元素和最后一個(gè)元素的delta值,同時(shí)和threshold的值進(jìn)行比較
// 若計(jì)算結(jié)果大于threshold值或者是相等,則該元素會(huì)被移除
if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
iterator.remove();
}
}
}
2.2.3 TimeEvictor
TimeEvictor以時(shí)間為判斷標(biāo)準(zhǔn),決定元素是否會(huì)被移除。TimeEvictor會(huì)獲取窗口中所有元素的最大時(shí)間戳currentTime,currentTime減去窗口大小(windowSize) 可得到能保留最久的元素的時(shí)間戳evictCutoff,然后再遍歷窗口中的元素,如果元素的時(shí)間戳小于evictCutoff,就執(zhí)行移除操作,否則不移除。具體邏輯如下圖所示:
TimeEvictor的代碼實(shí)現(xiàn)如下:
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
// 如果element沒(méi)有timestamp,直接返回
if (!hasTimestamp(elements)) {
return;
}
// 獲取elements中最大的時(shí)間戳(到來(lái)最晚的元素的時(shí)間)
long currentTime = getMaxTimestamp(elements);
// 截止時(shí)間為: 到來(lái)最晚的元素的時(shí)間 - 窗口大小(可以理解為保留最近的多久的元素)
long evictCutoff = currentTime - windowSize;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
TimestampedValue<Object> record = iterator.next();
// 清除所有時(shí)間戳小于截止時(shí)間的元素
if (record.getTimestamp() <= evictCutoff) {
iterator.remove();
}
}
}