? streaming流式計(jì)算是一種被設(shè)計(jì)用于處理無限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無限數(shù)據(jù)集是指一種不斷增長的本質(zhì)上無限的數(shù)據(jù)集,而window是一種切割無限數(shù)據(jù)為有限塊進(jìn)行處理的手段。
? Window是無限數(shù)據(jù)流處理的核心,Window將一個(gè)無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計(jì)算操作。
創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供白塔網(wǎng)站建設(shè)、白塔做網(wǎng)站、白塔網(wǎng)站設(shè)計(jì)、白塔網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)與制作、白塔企業(yè)網(wǎng)站模板建站服務(wù),十多年白塔做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。
window可以分為兩大類:
CountWindow:按照指定的數(shù)據(jù)條數(shù)生成一個(gè)Window,與時(shí)間無關(guān)。比較少用
TimeWindow:按照時(shí)間生成Window。非常常用,下面主要將時(shí)間窗口有哪些類型。主要有四類:滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)、會(huì)話窗口(Session Window)和全局窗口(global window比較少用 )。
概述:將數(shù)據(jù)依據(jù)固定的窗口長度對(duì)數(shù)據(jù)進(jìn)行切片。只有一個(gè)工作參數(shù),就是窗口大小
特點(diǎn):時(shí)間對(duì)齊,窗口長度固定,沒有重疊。
? 滾動(dòng)窗口分配器將每個(gè)元素分配到一個(gè)指定窗口大小的窗口中,滾動(dòng)窗口有一個(gè)固定的大小,并且不會(huì)出現(xiàn)重疊(前后時(shí)間點(diǎn)都是緊接著的)。例如:如果你指定了一個(gè)5分鐘大小的滾動(dòng)窗口,窗口的創(chuàng)建如下圖所示:
? 圖 1.2.1 滾動(dòng)窗口
適用場(chǎng)景:適合做BI統(tǒng)計(jì)等(做每個(gè)時(shí)間段的聚合計(jì)算)。
概述:滑動(dòng)窗口是固定窗口的更廣義的一種形式,滑動(dòng)窗口工作參數(shù)由固定的窗口長度和滑動(dòng)間隔組成。
特點(diǎn):時(shí)間對(duì)齊,窗口長度固定,有重疊。
? 滑動(dòng)窗口分配器將元素分配到固定長度的窗口中,與滾動(dòng)窗口類似,窗口的大小由窗口大小參數(shù)來配置,另一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口開始的頻率。因此,滑動(dòng)窗口如果滑動(dòng)參數(shù)小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會(huì)被分配到多個(gè)窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動(dòng),那么每個(gè)窗口中5分鐘的窗口里包含著上個(gè)10分鐘產(chǎn)生的數(shù)據(jù),如下圖所示:
? 圖 1.2.2 滑動(dòng)窗口
適用場(chǎng)景:對(duì)最近一個(gè)時(shí)間段內(nèi)的統(tǒng)計(jì)(求某接口最近5min的失敗率來決定是否要報(bào)警)。
概述:由一系列事件組合一個(gè)指定時(shí)間長度的timeout間隙組成,類似于web應(yīng)用的session,也就是一段時(shí)間沒有接收到新數(shù)據(jù)就會(huì)生成新的窗口。
特點(diǎn):時(shí)間無對(duì)齊。窗口無固定長度
? session窗口分配器通過session活動(dòng)來對(duì)元素進(jìn)行分組,session窗口跟滾動(dòng)窗口和滑動(dòng)窗口相比,不會(huì)有重疊和固定的開始時(shí)間和結(jié)束時(shí)間的情況,相反,當(dāng)它在一個(gè)固定的時(shí)間周期內(nèi)不再收到元素,即非活動(dòng)間隔產(chǎn)生,那個(gè)這個(gè)窗口就會(huì)關(guān)閉。一個(gè)session窗口通過一個(gè)session間隔來配置,這個(gè)session間隔定義了非活躍周期的長度,當(dāng)這個(gè)非活躍周期產(chǎn)生,那么當(dāng)前的session將關(guān)閉并且后續(xù)的元素將被分配到新的session窗口中去。
? 圖1.2.3 會(huì)話窗口
window數(shù)據(jù)源分為兩種,一種是典型的KV類型(keyedStream),另一種是非KV類型(Non-keyedStream)。
區(qū)別:
keyedStream:
需要在使用窗口操作前,調(diào)用 keyBy對(duì)KV按照key進(jìn)行分區(qū),然后才可以調(diào)用window操作的api,比如 countWindow,timeWindow等
Non-keyedstream:
如果使用窗口操作前,沒有使用keyBy算子,那么就認(rèn)為是Non-keyedstream,調(diào)用的window api就是 xxxWindowAll,比如countWindowAll,timeWindowAll,而且因?yàn)槭欠荎V,所以無法分區(qū),也就是只有一個(gè)分區(qū),那么這個(gè)窗口并行度只能是1。這個(gè)是要注意的。
CountWindow根據(jù)窗口中相同key元素的數(shù)量來觸發(fā)執(zhí)行,執(zhí)行時(shí)只計(jì)算元素?cái)?shù)量達(dá)到窗口大小的key對(duì)應(yīng)的結(jié)果。
有兩個(gè)用法:
countWindow(window_size):只指定窗口大小,此時(shí)窗口是滾動(dòng)窗口
countWindow(window_size, slide):指定窗口大小以及滑動(dòng)間隔,此時(shí)窗口是滑動(dòng)窗口
注意:CountWindow的window_size指的是相同Key的元素的個(gè)數(shù),不是輸入的所有元素的總數(shù)。
1、滾動(dòng)窗口
默認(rèn)的CountWindow是一個(gè)滾動(dòng)窗口,只需要指定窗口大小即可,當(dāng)元素?cái)?shù)量達(dá)到窗口大小時(shí),就會(huì)觸發(fā)窗口的執(zhí)行。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WindowTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource source = env.readTextFile("/test.txt");
source.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String s, Collector> collector) throws Exception {
for (String s1 : s.split(" ")) {
collector.collect(new Tuple2<>(s1, 1));
}
}
}).keyBy(0).countWindow(5).reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 t1, Tuple2 t2) throws Exception {
return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
}
}).print();
env.execute("滾動(dòng)窗口");
}
}
2、滑動(dòng)窗口
動(dòng)窗口和滾動(dòng)窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時(shí)需要傳入兩個(gè)參數(shù),一個(gè)是window_size,一個(gè)是sliding_size。
下面代碼中的sliding_size設(shè)置為了2,也就是說,每收到兩個(gè)相同key的數(shù)據(jù)就計(jì)算一次,每一次計(jì)算的window范圍是5個(gè)元素。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WindowTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource source = env.readTextFile("/test.txt");
source.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String s, Collector> collector) throws Exception {
for (String s1 : s.split(" ")) {
collector.collect(new Tuple2<>(s1, 1));
}
}
}).keyBy(0).countWindow(5,2).reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 t1, Tuple2 t2) throws Exception {
return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
}
}).print();
env.execute("滑動(dòng)窗口");
}
}
? TimeWindow是將指定時(shí)間范圍內(nèi)的所有數(shù)據(jù)組成一個(gè)window,一次對(duì)一個(gè)window里面的所有數(shù)據(jù)進(jìn)行計(jì)算。同樣支持類似上面的滾動(dòng)窗口和滑動(dòng)窗口模式。有兩個(gè)工作參數(shù):window_size和slide。只指定window_size時(shí)是滾動(dòng)窗口。
1、滾動(dòng)窗口
? Flink默認(rèn)的時(shí)間窗口根據(jù)Processing Time 進(jìn)行窗口的劃分,將Flink獲取到的數(shù)據(jù)根據(jù)進(jìn)入Flink的時(shí)間劃分到不同的窗口中。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource source = env.readTextFile("/test.txt");
source.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String s, Collector> collector) throws Exception {
for (String s1 : s.split(" ")) {
collector.collect(new Tuple2<>(s1, 1));
}
}
}).keyBy(0).timeWindow(Time.seconds(2)).reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 t1, Tuple2 t2) throws Exception {
return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
}
}).print();
env.execute("滾動(dòng)窗口");
}
}
2、滑動(dòng)窗口
和上面類似,就是參數(shù)里面增加了slide參數(shù),也就是滑動(dòng)時(shí)間間隔。時(shí)間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個(gè)來指定。
也就是在窗口算子之后執(zhí)行reduce算子,用法和普通的reduce一樣,只不過reduce的單位是一個(gè)窗口。即每一個(gè)窗口返回一次reduce結(jié)果。程序在上面,不重復(fù)了。
也就是在窗口算子之后執(zhí)行fold算子,用法和普通的fold一樣,只不過fold的單位是一個(gè)窗口。即每一個(gè)窗口返回一次reduce結(jié)果。程序在上面,不重復(fù)了。
指的是max、min等這些聚合算子,只不過是在window算子之后使用,以窗口為單位,每一個(gè)窗口返回一次聚合結(jié)果,而不是像普通那樣,每一次聚合結(jié)果都返回。
在flink中,time有不同分類,如下:
Event Time:
是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會(huì)記錄自己的生成時(shí)間,F(xiàn)link通過時(shí)間戳分配器訪問事件時(shí)間戳。
Ingestion Time:
是數(shù)據(jù)進(jìn)入Flink的時(shí)間。
Processing Time:
是每一個(gè)執(zhí)行基于時(shí)間操作的算子的本地系統(tǒng)時(shí)間,與機(jī)器相關(guān),默認(rèn)的時(shí)間屬性就是Processing Time。也就是數(shù)據(jù)被處理時(shí)的當(dāng)前時(shí)間。
這些時(shí)間有什么不同呢?因網(wǎng)絡(luò)傳輸需要時(shí)間,所以Ingestion Time不一定和Event Time相等,很多情況下是不等的。同樣Processing Time表示數(shù)據(jù)處理時(shí)的時(shí)間,如果數(shù)據(jù)是很久之前采集的,現(xiàn)在才處理,那么很明顯,三個(gè)時(shí)間time都不會(huì)相等的。
? 圖 2.1 flink--時(shí)間的概念
例子:
一條日志進(jìn)入Flink的時(shí)間為2017-11-12 10:00:00.123,到達(dá)Window的系統(tǒng)時(shí)間為2017-11-12 10:00:01.234,日志的內(nèi)容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
可以看到,三個(gè)time都不相等。而對(duì)于業(yè)務(wù)來說,要統(tǒng)計(jì)1min內(nèi)的故障日志個(gè)數(shù),哪個(gè)時(shí)間是最有意義的?—— eventTime,因?yàn)槲覀円鶕?jù)日志的生成時(shí)間進(jìn)行統(tǒng)計(jì)。但是flink默認(rèn)的窗口的時(shí)間是Processing Time,那么如何引入eventTime呢?
? 在Flink的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用eventTime,一般只在eventTime無法使用時(shí),才會(huì)被迫使用ProcessingTime或者IngestionTime。默認(rèn)使用的是ProcessingTime。那么如何指定flink使用指定的time呢?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(時(shí)間類型);
//三種類型的time對(duì)應(yīng)如下:
TimeCharacteristic.EventTime; eventtime
TimeCharacteristic.IngestionTime; 到達(dá)flink的時(shí)間
TimeCharacteristic.ProcessingTime; 處理數(shù)據(jù)的時(shí)間
這種方式是整個(gè)env全局生效的,是直接將env默認(rèn)的時(shí)間設(shè)置為eventtime。后面的窗口操作默認(rèn)就會(huì)使用eventtime作為時(shí)間依據(jù)。如果想不同的窗口設(shè)置不同的時(shí)間類型,這種方式就行不通了。
stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.window這個(gè)api就是所有窗口總的api,其他窗口api都是通過這個(gè)api封裝出來的??梢酝ㄟ^這個(gè)總api,參數(shù)直接窗口的類型,比如上面的就是指定eventtime 的timewindow,這樣并不會(huì)影響整個(gè)env的時(shí)間類型。
同樣的,其他時(shí)間類型窗口,比如:
SlidingEventTimeWindows 滑動(dòng)eventtime窗口
基本上看名字就知道是什么時(shí)間類型(三大時(shí)間類型)、以及什么類型(滑動(dòng)、滾動(dòng)、會(huì)話窗口)的窗口了。注意:eventtime沒有session窗口,processingTime和
? 我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。
? 圖 2.3 數(shù)據(jù)的亂序
? 那么此時(shí)出現(xiàn)一個(gè)問題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時(shí)必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了,這個(gè)特別的機(jī)制,就是Watermark。
解釋:
如果只按照到達(dá)的event的eventtime來觸發(fā)窗口操作,假設(shè)有event1~5。如果到達(dá)順序是亂的,比如event5最先達(dá)到,然后event1也達(dá)到了,那么flink這邊怎么知道這中間還有沒有數(shù)據(jù)呢?沒辦法的,不能確定數(shù)據(jù)是否完整到達(dá),也不能無限制等待下去。所以需要一種機(jī)制來處理這種情況。
? Watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性,數(shù)據(jù)本身攜帶著對(duì)應(yīng)的Watermark。Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。
? 數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
? Watermark可以理解成一個(gè)延遲觸發(fā)機(jī)制,我們可以設(shè)置Watermark的延時(shí)時(shí)長t,每次系統(tǒng)會(huì)校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于maxEventTime – t,那么這個(gè)窗口被watermark觸發(fā)執(zhí)行。
解釋:
? watermark是一種概率性的機(jī)制。假設(shè)event1~5,如果event5已經(jīng)到達(dá)了,那么其實(shí)按照event產(chǎn)生的先后順序,正常情況下,前面的event1~4應(yīng)該也到達(dá)了。而為了保證前面的event1~4的到達(dá)(其實(shí)是更多的到達(dá),但是不一定全部都到達(dá)),在event5到達(dá)了之后,提供一定的延遲時(shí)間t。當(dāng)event5到達(dá),且經(jīng)過 t 時(shí)間之后,正常情況下,前面的event1~4 大概率會(huì)到達(dá)了,如果沒有到達(dá),屬于少數(shù)情況,那么就認(rèn)為event5之前的event都到達(dá)了,無論是否真的全部到達(dá)了。如果在延遲時(shí)間之后到達(dá)了,這個(gè)舊數(shù)據(jù)直接會(huì)被丟棄。所以其實(shí)watermark就是一種保障更多event亂序到達(dá)的機(jī)制,提供了一定的延時(shí)機(jī)制,而因?yàn)橹粫?huì)延遲一定的時(shí)間,所以也不會(huì)導(dǎo)致flink無限期地等待下去。
有序數(shù)據(jù)流的watermark如下:(watermark設(shè)置為0)
? 圖 2.4 有序數(shù)據(jù)流的watermark
亂序數(shù)據(jù)流的watermark如下:(watermark設(shè)置為2)
? 圖 2.5 亂序數(shù)據(jù)流的watermark
? 當(dāng)Flink接收到每一條數(shù)據(jù)時(shí),都會(huì)產(chǎn)生一條Watermark,這條Watermark就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的maxEventTime - 延遲時(shí)長t,也就是說,Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當(dāng)前未觸發(fā)的窗口的停止時(shí)間要晚,那么就會(huì)觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。
? 上圖中,我們?cè)O(shè)置的允許最大延遲到達(dá)時(shí)間為2s,所以時(shí)間戳為7s的事件對(duì)應(yīng)的Watermark是5s,時(shí)間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那么時(shí)間戳為7s的事件到達(dá)時(shí)的Watermarker恰好觸發(fā)窗口1,時(shí)間戳為12s的事件到達(dá)時(shí)的Watermark恰好觸發(fā)窗口2。
? Window會(huì)不斷產(chǎn)生,屬于這個(gè)Window范圍的數(shù)據(jù)會(huì)被不斷加入到Window中,所有未被觸發(fā)的Window都會(huì)等待觸發(fā),只要Window還沒觸發(fā),屬于這個(gè)Window范圍的數(shù)據(jù)就會(huì)一直被加入到Window中,直到Window被觸發(fā)才會(huì)停止數(shù)據(jù)的追加,而當(dāng)Window觸發(fā)之后才接受到的屬于被觸發(fā)Window的數(shù)據(jù)會(huì)被丟棄。如果產(chǎn)生的窗口中沒有新到的數(shù)據(jù),也就不會(huì)有watermark,那么窗口就不會(huì)被觸發(fā)計(jì)算。
watermark時(shí)間(max_eventTime-t) >= window_end_time;
在[window_start_time,window_end_time)中有數(shù)據(jù)存在。
Punctuated:不間斷產(chǎn)生
數(shù)據(jù)流中每一個(gè)遞增的EventTime都會(huì)產(chǎn)生一個(gè)Watermark。
在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成。
Periodic:周期性產(chǎn)生
周期性的(一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個(gè)Watermark。
在實(shí)際的生產(chǎn)中Periodic的方式必須結(jié)合時(shí)間和積累條數(shù)兩個(gè)維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會(huì)有很大的延時(shí)。
這兩種有不同的api實(shí)現(xiàn),下面會(huì)講
需要先引入eventime,然后引入watermark
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource source = env.readTextFile("/test.txt");
//引入的watermark的實(shí)現(xiàn)類
source.assignTimestampsAndWatermarks(xx)
watermark的實(shí)現(xiàn)有兩大類,對(duì)應(yīng)上面的兩種watermark的產(chǎn)生方式,有兩個(gè)接口:
AssignerWithPeriodicWatermarks; 周期性產(chǎn)生watermark,即Period
AssignerWithPunctuatedWatermarks; Punctuated:不間斷產(chǎn)生
看看AssignerWithPeriodicWatermarks這個(gè)接口的源碼,主要用于周期性產(chǎn)生watermark
public interface AssignerWithPeriodicWatermarks extends TimestampAssigner {
//獲取當(dāng)前的watermark
@Nullable
Watermark getCurrentWatermark();
}
//父接口===================
public interface TimestampAssigner extends Function {
//獲取當(dāng)前的時(shí)間戳
long extractTimestamp(T var1, long var2);
}
主要就是有兩個(gè)方法需要覆蓋,getCurrentWatermark()用于生成watermark,extractTimestamp用于獲取每個(gè)event的timestamp。
由于這是一個(gè)周期性產(chǎn)生watermark的接口,所以需要指定這個(gè)生成周期有多長,需要env的配置中指定,如:
env.getConfig().setAutoWatermarkInterval(n ms);
記住間隔時(shí)間單位是毫秒
例子:
/*根據(jù)eventTime 創(chuàng)建處理watermark
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks {
//watermark延遲時(shí)間 t,單位是毫秒
private final long maxOutOfOrderness = 3500; // 3.5 seconds
//保存當(dāng)前最大的時(shí)間戳
private long currentMaxTimestamp;
//根據(jù)傳遞進(jìn)來的event,獲取time,然后如果比當(dāng)前最大的time還大,就替換,否則保持。因?yàn)閿?shù)據(jù)亂序到達(dá)是無法保證時(shí)間是遞增的
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
//返回watermark
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
再加上設(shè)置的setAutoWatermarkInterval(n ms),就可以周期性生成watermark。
看看AssignerWithPunctuatedWatermarks這個(gè)接口的源碼,主要用于實(shí)時(shí)產(chǎn)生watermark
public interface AssignerWithPunctuatedWatermarks extends TimestampAssigner {
//獲取最新的watermark
@Nullable
Watermark checkAndGetNextWatermark(T var1, long var2);
}
//父接口
public interface TimestampAssigner extends Function {
//從event中獲取timestamp
long extractTimestamp(T var1, long var2);
}
寫法其實(shí)和上面的類似,只是這里不會(huì)設(shè)置生成watermark的時(shí)間間隔
1、BoundedOutOfOrdernessTimestampExtractor
繼承了AssignerWithPeriodicWatermarks接口的一個(gè)類,看看它的源碼
package org.apache.flink.streaming.api.functions.timestamps;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
public abstract class BoundedOutOfOrdernessTimestampExtractor implements AssignerWithPeriodicWatermarks {
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private long lastEmittedWatermark = -9223372036854775808L;
private final long maxOutOfOrderness;
//構(gòu)造方法中接收一個(gè)參數(shù),就是延遲時(shí)間 t
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0L) {
throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
} else {
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = -9223372036854775808L + this.maxOutOfOrderness;
}
}
public long getMaxOutOfOrdernessInMillis() {
return this.maxOutOfOrderness;
}
//需要重寫的方法,用于獲取timestamp
public abstract long extractTimestamp(T var1);
//獲取watermark的方法已經(jīng)寫好了,用傳遞進(jìn)來的延遲時(shí)間t來計(jì)算得出watermark
public final Watermark getCurrentWatermark() {
long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
if (potentialWM >= this.lastEmittedWatermark) {
this.lastEmittedWatermark = potentialWM;
}
return new Watermark(this.lastEmittedWatermark);
}
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = this.extractTimestamp(element);
if (timestamp > this.currentMaxTimestamp) {
this.currentMaxTimestamp = timestamp;
}
return timestamp;
}
}
這個(gè)類就是實(shí)現(xiàn)了用戶可以自定義設(shè)定延遲時(shí)間t 的一個(gè)watermark。
2、AscendingTimestampExtractor
也是繼承了AssignerWithPeriodicWatermarks接口的一個(gè)類。具有穩(wěn)定的遞增時(shí)間戳的數(shù)據(jù)源,比如kafka的分區(qū)數(shù)據(jù),每一條信息都是遞增+1的,適用于這個(gè)類。只需要重寫
extractAscendingTimestamp方法。
package flinktest;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class EventTimeTest {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
DataStreamSource source = env.readTextFile("/tmp/test.txt");
source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(3000)) {
@Override
public long extractTimestamp(String s) {
return Integer.valueOf(s.split(" ")[0]);
}
}).flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String s, Collector> collector) throws Exception {
Tuple2 tmpTuple = new Tuple2<>();
for (String s1 : s.split(" ")) {
tmpTuple.setFields(s1, 1);
collector.collect(tmpTuple);
}
}
}).keyBy(0)
.timeWindow(Time.seconds(10))
.reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 t1, Tuple2 t2) throws Exception {
return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
}
})
.print();
try {
env.execute("eventtime test");
} catch (Exception e) {
e.printStackTrace();
}
}
}
window api的類繼承結(jié)構(gòu)