這篇“Java Flink窗口觸發(fā)器Trigger如何使用”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Java Flink窗口觸發(fā)器Trigger如何使用”文章吧。
創(chuàng)新互聯(lián)自成立以來,一直致力于為企業(yè)提供從網(wǎng)站策劃、網(wǎng)站設計、成都做網(wǎng)站、網(wǎng)站設計、電子商務、網(wǎng)站推廣、網(wǎng)站優(yōu)化到為企業(yè)提供個性化軟件開發(fā)等基于互聯(lián)網(wǎng)的全面整合營銷服務。公司擁有豐富的網(wǎng)站建設和互聯(lián)網(wǎng)應用系統(tǒng)開發(fā)管理經(jīng)驗、成熟的應用系統(tǒng)解決方案、優(yōu)秀的網(wǎng)站開發(fā)工程師團隊及專業(yè)的網(wǎng)站設計師團隊。
Trigger確定窗口(由窗口分配器形成)何時準備好由窗口函數(shù)處理。每個WindowAssigner都帶有一個默認值Trigger。如果默認觸發(fā)器不符合您的需求,您可以使用trigger。
public abstract class Triggerimplements Serializable { /** 只要有元素落?到當前窗?, 就會調(diào)?該?法 * @param element 收到的元素 * @param timestamp 元素抵達時間. * @param window 元素所屬的window窗口. * @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調(diào). */ public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception; /** * processing-time 定時器回調(diào)函數(shù) * * @param time 定時器觸發(fā)的時間. * @param window 定時器觸發(fā)的窗口對象. * @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調(diào). */ public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * event-time 定時器回調(diào)函數(shù) * * @param time 定時器觸發(fā)的時間. * @param window 定時器觸發(fā)的窗口對象. * @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調(diào). */ public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * 當 多個窗口合并到?個窗?的時候,調(diào)用該方法法,例如系統(tǒng)SessionWindow * * @param window 合并后的新窗口對象 * @param ctx ?個上下?對象,通常用該對象注冊 timer(ProcessingTime/EventTime)回調(diào)以及訪問狀態(tài) */ public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } /** * 當窗口被刪除后執(zhí)?所需的任何操作。例如:可以清除定時器或者刪除狀態(tài)數(shù)據(jù) */ public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception; }
public enum TriggerResult { // 表示對窗口不執(zhí)行任何操作。即不觸發(fā)窗口計算,也不刪除元素。 CONTINUE(false, false), // 觸發(fā)窗口計算,輸出結(jié)果,然后將窗口中的數(shù)據(jù)和窗口進行清除。 FIRE_AND_PURGE(true, true), // 觸發(fā)窗口計算,但是保留窗口元素 FIRE(true, false), // 不觸發(fā)窗口計算,丟棄窗口,并且刪除窗口的元素。 PURGE(false, true); private final boolean fire; private final boolean purge; private TriggerResult(boolean fire, boolean purge) { this.purge = purge; this.fire = fire; } public boolean isFire() { return this.fire; } public boolean isPurge() { return this.purge; } }
一旦觸發(fā)器確定窗口已準備好進行處理,就會觸發(fā),返回狀態(tài)可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發(fā)窗口計算并保留窗口內(nèi)容,而FIRE_AND_PURGE是觸發(fā)窗口計算并刪除窗口內(nèi)容。默認情況下,預實現(xiàn)的觸發(fā)器只是簡單地FIRE不清除窗口狀態(tài)。
EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發(fā)窗口計算,如果EventTime大于Window EndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。
ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計算,否則窗口繼續(xù)等待。
ContinuousEventTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結(jié)束時間小于當前EndTime觸發(fā)窗口計算。
ContinuousProcessingTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結(jié)束時間小于當前ProcessTime觸發(fā)窗口計算。
CountTrigger:根據(jù)接入數(shù)據(jù)量是否超過設定的闕值判斷是否觸發(fā)窗口計算。
DeltaTrigger:根據(jù)接入數(shù)據(jù)計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發(fā)窗口計算。
PurgingTrigger:可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型的觸發(fā)器,計算完成后數(shù)據(jù)將被清理。
NeverTrigger:任何時候都不觸發(fā)窗口計算
主要看看EventTimeTrigger和ProcessingTimeTrigger的源碼。
public class EventTimeTrigger extends Trigger
public class ProcessingTimeTrigger extends Trigger
在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會注冊一個ProcessingTime定時器,時間參數(shù)是window.maxTimestamp(),也就是窗口的最終時間,當時間到達這個窗口最終時間,定時器觸發(fā)并調(diào)用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發(fā)窗口中數(shù)據(jù)的計算,但是會保留窗口元素。
需要注意的是ProcessingTimeTrigger類只會在窗口的最終時間到達的時候觸發(fā)窗口函數(shù)的計算,計算完成后并不會清除窗口中的數(shù)據(jù),這些數(shù)據(jù)存儲在內(nèi)存中,除非調(diào)用PURGE或FIRE_AND_PURGE,否則數(shù)據(jù)將一直存在內(nèi)存中。實際上,F(xiàn)link中提供的Trigger類,除了PurgingTrigger類,其他的都不會對窗口中的數(shù)據(jù)進行清除。
TumblingEventTimeWindows :EventTimeTrigger public class TumblingEventTimeWindows extends WindowAssigner
TumblingProcessingTimeWindows :ProcessingTimeTrigger
public class TumblingProcessingTimeWindows extends WindowAssigner{ public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
SlidingEventTimeWindows:EventTimeTrigger public class SlidingEventTimeWindows extends WindowAssigner{ public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
SlidingProcessingTimeWindows :ProcessingTimeTrigger
public class SlidingProcessingTimeWindows extends WindowAssigner{ public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
EventTimeSessionWindows:EventTimeTrigger public class EventTimeSessionWindows extends MergingWindowAssigner{ public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
ProcessingTimeSessionWindows:ProcessingTimeTrigger
public class ProcessingTimeSessionWindows extends MergingWindowAssigner{ public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
GlobalWindows :NeverTrigger public class GlobalWindows extends WindowAssigner{ public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return new GlobalWindows.NeverTrigger(); } }
以上就是關于“Java Flink窗口觸發(fā)器Trigger如何使用”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關的知識內(nèi)容,請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。