真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink水印延遲與窗口允許延遲的概念是什么

這篇文章主要講解了“Flink水印延遲與窗口允許延遲的概念是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Flink水印延遲與窗口允許延遲的概念是什么”吧!

網(wǎng)站建設(shè)哪家好,找成都創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、成都小程序開發(fā)、集團企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了常山免費建站歡迎大家使用!

link 在開窗處理事件時間(Event Time)  數(shù)據(jù)時,可設(shè)置水印延遲以及設(shè)置窗口允許延遲(allowedLateness)以保證數(shù)據(jù)的完整性。這兩者因都是設(shè)置延遲時間所以剛接觸時容易混淆。本文接下將展開討論分析“水印延遲”與“窗口允許延遲”概念及區(qū)別。

水印延遲(WaterMark)

(1) 水印

由于采用了事件時間,脫離了物理掛鐘。窗口不知道什么時候需要關(guān)閉并進行計算,這個時候需要借助水印來解決該問題。當(dāng)窗口遇到水位標(biāo)識時就默認(rèn)是窗口時間段內(nèi)的數(shù)據(jù)都到齊了,可以觸發(fā)窗口計算。

(2) 水印延遲

設(shè)置水印延遲時間的目的是讓水印延遲到達,從而可以解決亂序問題。通過水印延遲到達讓在延遲時間范圍內(nèi)到達的遲到數(shù)據(jù)可以加入到窗口計算中,保證了數(shù)據(jù)的完整性。當(dāng)水印到達后就會觸發(fā)窗口計算,在水印之后到達的遲到數(shù)據(jù)則會被丟棄。

Flink水印延遲與窗口允許延遲的概念是什么

窗口允許延遲(allowedLateness)

Flink水印延遲與窗口允許延遲的概念是什么

使用 StreamAPI 時,在進行開窗后可設(shè)置 allowedLateness 窗口延遲。官網(wǎng)中對其解釋如下:

默認(rèn)情況下,當(dāng)水印到達窗口末端時,遲到元素將會被刪除。但Flink允許為window  operators指定允許的最大延遲。允許延遲指定元素在被刪除之前延遲的時間,默認(rèn)值為0。當(dāng)元素在水印經(jīng)過窗口末端后到達,且它的到達時間在窗口末端加上運行延遲的時間之內(nèi),其仍會被添加到窗口中。根據(jù)所使用的觸發(fā)器,延遲但未被丟棄的元素可能會再次觸發(fā)窗口計算。EventTimeTrigger就是這種情況。為了做到這一點,F(xiàn)link保持窗口的狀態(tài),直到它們允許的延遲到期。一旦發(fā)生這種情況,F(xiàn)link將刪除窗口并刪除其狀態(tài),正如窗口生命周期部分中所描述的那樣。

簡單理解:通常在水印到達之后遲到數(shù)據(jù)將會被刪除,而窗口的延遲則是指數(shù)據(jù)在被刪除之前的允許保留時間。也就是說,在水印達到之后遲到數(shù)據(jù)本該被刪除,但是如果設(shè)置了窗口延遲,那么在水印之后到窗口延遲時間段內(nèi)到達的遲到數(shù)據(jù)還是會被加入到窗口計算中,并再次觸發(fā)窗口計算。

一個Demo 兩個猜想

下面我用一個 Demo 和兩個猜想來幫助大家加深理解這兩個概念。

例子:接收 Kafka 數(shù)據(jù),數(shù)據(jù)為 JSON 格式如:{"word":"a","count":1,"time":1604286564}。我們開一個 5  秒的 tumbling windows 滾動窗口,以 word 作為 key 在窗口內(nèi)對 count 值進行累加。同時設(shè)置水印延遲 2 秒,窗口延遲 2  秒。代碼如下:

public class MyExample {      public static void main(String[] args) throws Exception {         // 創(chuàng)建環(huán)境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);          // 設(shè)置時間特性為         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);         // 水印策略,其需要注入Timestamp Assigner(描述了如何訪問事件時間戳)和 Watermark Generator (事件流顯示的超出正常范圍的程度)         WatermarkStrategy watermarkStrategy = WatermarkStrategy                 // forBoundedOutOfOrderness 屬于(periodic周期性),周期生成器通常通過onEvent()觀察傳入的事件,然后在框架調(diào)用onPeriodicEmit()時發(fā)出水印。                 .forBoundedOutOfOrderness(Duration.ofSeconds(2))                 .withTimestampAssigner(new SerializableTimestampAssigner() {                     @Override                     public long extractTimestamp(WC wc, long l) {                         return wc.getEventTime() * 1000;                     }                 });          // Kafka 配置         Properties properties = new Properties();         properties.setProperty("bootstrap.servers", "Kafka地址:9092");         properties.setProperty("group.id", "test");          // Flink 需要知道如何轉(zhuǎn)換Kafka消息為Java對象(反序列化),默認(rèn)提供了 KafkaDeserializationSchema(序列化需要自己編寫)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchema         env.addSource(new FlinkKafkaConsumer<>("flinktest1", new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())                 // map 構(gòu)建 WC 對象                 .map(new MapFunction() {                     @Override                     public WC map(ObjectNode jsonNode) throws Exception {                         JsonNode valueNode = jsonNode.get("value");                         WC wc = new WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());                         return wc;                     }                 })                 // 設(shè)定水印策略                 .assignTimestampsAndWatermarks(watermarkStrategy)                 .keyBy(WC::getWord)                 // 窗口設(shè)置,這里設(shè)置為滾動窗口                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))                                 // 設(shè)置窗口延遲                 .allowedLateness(Time.seconds(2))                 .reduce(new ReduceFunction() {                     @Override                     public WC reduce(WC wc, WC t1) throws Exception {                         return new WC(wc.getWord(), wc.getCount() + t1.getCount());                     }                 })                 .print();          env.execute();     }       static class WC {         public String word;         public int count;         public long eventTime;          public long getEventTime() {             return eventTime;         }          public void setEventTime(long eventTime) {             this.eventTime = eventTime;         }          public String getWord() {             return word;         }          public void setWord(String word) {             this.word = word;         }          public int getCount() {             return count;         }          public void setCount(int count) {             this.count = count;         }          public WC(String word, int count) {             this.word = word;             this.count = count;         }                  public WC(String word, int count,long eventTime) {             this.word = word;             this.count = count;             this.eventTime = eventTime;         }                @Override         public String toString() {             return "WC{" +                     "word='" + word + '\'' +                     ", count=" + count +                     '}';         }     } }

猜想1:

水印延遲 2s 達到,所以會在第 5 + 2 = 7s 時認(rèn)為 [ 0 ,5 ) 窗口的數(shù)據(jù)全部到齊,并觸發(fā)窗口計算。

// 往 Kafka 中寫入數(shù)據(jù) {"word":"a","count":1,"time":1604286560}   //2020-11-02 11:09:20 {"word":"a","count":1,"time":1604286561}   //2020-11-02 11:09:21 {"word":"a","count":1,"time":1604286562}   //2020-11-02 11:09:22 {"word":"a","count":1,"time":1604286566}   //2020-11-02 11:09:26 {"word":"a","count":1,"time":1604286567}   //2020-11-02 11:09:27 (觸發(fā)了窗口計算)

Flink水印延遲與窗口允許延遲的概念是什么

控制臺輸出

分析:通過測試發(fā)現(xiàn)最后在第 7s 也就是 11:09:27 時觸發(fā)了窗口計算,這符合了我們的猜想一。水印延遲 2s 達到,所以會在第 5 + 2 = 7s  時認(rèn)為 [ 0 ,5 ) 窗口的數(shù)據(jù)全部到齊,并觸發(fā)窗口計算。計算結(jié)果為3,這是因為只有最前面的3條數(shù)據(jù)屬于 [0,5) 窗口計算范圍之內(nèi)。

猜想2:

設(shè)置了窗口延遲2秒,那么只要在水印之后到窗口允許延遲的時間范圍內(nèi)達到且屬于 [ 0,5) 窗口的遲到數(shù)據(jù)會被加入到窗口中,且再次觸發(fā)窗口運算:

// 繼續(xù)往 Kafka 中寫入數(shù)據(jù) {"word":"a","count":1,"time":1604286568}   //2020-11-02 11:09:28 時間到達了第 8 秒 {"word":"a","count":1,"time":1604286563}   //2020-11-02 11:09:23 模擬一個在水印之后、在窗口允許延遲范圍內(nèi)、且屬于[0,5) 窗口的遲到數(shù)據(jù),該數(shù)據(jù)還是會觸發(fā)并參與到[0,5) 窗口的計算

Flink水印延遲與窗口允許延遲的概念是什么

控制臺輸出新增了一行

// 我們再繼續(xù)往 Kafka 中寫入數(shù)據(jù) {"word":"a","count":1,"time":1604286569}  //2020-11-02 11:09:29  時間到達第9秒 {"word":"a","count":1,"time":1604286563}  //2020-11-02 11:09:23 模擬一個在水印之后且超出窗口允許延遲范圍、且屬于[0,5) 窗口的遲到數(shù)據(jù),該數(shù)據(jù)不會參與和觸發(fā)[0,5)窗口計算

查看控制臺并沒有發(fā)現(xiàn)新的輸出打印。

Flink水印延遲與窗口允許延遲的概念是什么

解析:水印因延遲在第 7s 到達之后會觸發(fā)[0,5) 窗口計算,如果沒有設(shè)置窗口延遲的情況下,水印之后遲到且屬于 [0,5)  窗口的數(shù)據(jù)會被丟棄。上面我們實驗設(shè)置窗口延遲 2s,實現(xiàn)的效果就是在水印之后,窗口允許延遲時間之內(nèi)(7 + 2 = 9s 之間),遲到且屬于 [0,5)  窗口的數(shù)據(jù)還是會觸發(fā)一次窗口計算,并參與到窗口計算中。而在 9s 之后,也就是超過窗口允許延時時間,那么遲到且屬于[0,5)的數(shù)據(jù)就會被丟棄。

感謝各位的閱讀,以上就是“Flink水印延遲與窗口允許延遲的概念是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Flink水印延遲與窗口允許延遲的概念是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!


分享標(biāo)題:Flink水印延遲與窗口允許延遲的概念是什么
文章地址:http://weahome.cn/article/gshscd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部