原文鏈接
成都創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的成華網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
FlinkCEP是在Flink之上實(shí)現(xiàn)的復(fù)雜事件處理(CEP)庫。 它允許你在×××的事件流中檢測(cè)事件模式,讓你有機(jī)會(huì)掌握數(shù)據(jù)中重要的事項(xiàng)。
本文描述了Flink CEP中可用的API調(diào)用。 首先介紹Pattern API,它允許你指定要在流中檢測(cè)的模式,然后介紹如何檢測(cè)匹配事件序列并對(duì)其進(jìn)行操作。
然后,我們將介紹CEP庫在處理事件時(shí)間延遲時(shí)所做的假設(shè)。
首先是要在你的pom.xml文件中,引入CEP庫。
org.apache.flink
flink-cep_2.11
1.5.0
注意要應(yīng)用模式匹配的DataStream中的事件必須實(shí)現(xiàn)正確的equals()和hashCode()方法,因?yàn)镕linkCEP使用它們來比較和匹配事件。
第一個(gè)demo如下:
Java:
DataStream input = ...
Pattern pattern = Pattern.begin("start").where(
new SimpleCondition() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
PatternStream patternStream = CEP.pattern(input, pattern);
DataStream result = patternStream.select(
new PatternSelectFunction {
@Override
public Alert select(Map> pattern) throws Exception {
return createAlertFrom(pattern);
}
}
});
Scala:
val input: DataStream[Event] = ...
val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
.next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
.followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(input, pattern)
val result: DataStream[Alert] = patternStream.select(createAlert(_))
Pattern API允許你定義要從輸入流中提取的復(fù)雜模式序列。
每個(gè)復(fù)雜模式序列都是由多個(gè)簡(jiǎn)單模式組成,即尋找具有相同屬性的單個(gè)事件的模式。我們可以先定義一些簡(jiǎn)單的模式,然后組合成復(fù)雜的模式序列。
可以將模式序列視為此類模式的結(jié)構(gòu)圖,基于用戶指定的條件從一個(gè)模式轉(zhuǎn)換到下一個(gè)模式,例如, event.getName().equals("start")。
匹配是一系列輸入事件,通過一系列有效的模式轉(zhuǎn)換訪問復(fù)雜模式圖中的所有模式。
注意每個(gè)模式必須具有唯一的名稱,以便后續(xù)可以使用該名稱來標(biāo)識(shí)匹配的事件。
注意模式名稱不能包含字符“:”。
在本節(jié)接下來的部分,我們將首先介紹如何定義單個(gè)模式,然后如何將各個(gè)模式組合到復(fù)雜模式中。
Pattern可以是單單個(gè),也可以是循環(huán)模式。單個(gè)模式接受單個(gè)事件,而循環(huán)模式可以接受多個(gè)事件。在模式匹配符號(hào)中,模式“a b + c?d”(或“a”,后跟一個(gè)或多個(gè)“b”,可選地后跟“c”,后跟“d”),a,c ?,和d是單例模式,而b +是循環(huán)模式。
默認(rèn)情況下,模式是單個(gè)模式,您可以使用Quantifiers將其轉(zhuǎn)換為循環(huán)模式。每個(gè)模式可以有一個(gè)或多個(gè)條件,基于它接受事件。
在FlinkCEP中,您可以使用以下方法指定循環(huán)模式:pattern.oneOrMore(),用于期望一個(gè)或多個(gè)事件發(fā)生的模式(例如之前提到的b +);和pattern.times(#ofTimes),
用于期望給定類型事件的特定出現(xiàn)次數(shù)的模式,例如4個(gè);和patterntimes(#fromTimes,#toTimes),用于期望給定類型事件的最小出現(xiàn)次數(shù)和最大出現(xiàn)次數(shù)的模式,例如, 2-4。
您可以使用pattern.greedy()方法使循環(huán)模式變得貪婪,但是還不能使組模式變得貪婪。您可以使用pattern.optional()方法使得所有模式,循環(huán)與否,變?yōu)榭蛇x。
對(duì)于名為start的模式,以下是有效的Quantifiers:
// expecting 4 occurrences
start.times(4);
// expecting 0 or 4 occurrences
start.times(4).optional();
// expecting 2, 3 or 4 occurrences
start.times(2, 4);
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();
// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional();
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();
// expecting 1 or more occurrences
start.oneOrMore();
// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy();
// expecting 0 or more occurrences
start.oneOrMore().optional();
// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy();
// expecting 2 or more occurrences
start.timesOrMore(2);
// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy();
// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy();
在每個(gè)模式中,從一個(gè)模式轉(zhuǎn)到下一個(gè)模式,可以指定其他條件。您可以將使用下面這些條件:
傳入事件的屬性,例如其值應(yīng)大于5,或大于先前接受的事件的平均值。
可以通過pattern.where(),pattern.or()或pattern.until()方法指定事件屬性的條件。 條件可以是IterativeConditions或SimpleConditions。
這是最常見的條件類型。 你可以指定一個(gè)條件,該條件基于先前接受的事件的屬性或其子集的統(tǒng)計(jì)信息來接受后續(xù)事件。
下面代碼說的是:如果名稱以“foo”開頭同時(shí)如果該模式的先前接受的事件的價(jià)格總和加上當(dāng)前事件的價(jià)格不超過該值 5.0,則迭代條件接受名為“middle”的模式的下一個(gè)事件,。
迭代條件可以很強(qiáng)大的,尤其是與循環(huán)模式相結(jié)合,例如, oneOrMore()。
Java
middle.oneOrMore().where(new IterativeCondition() {
@Override
public boolean filter(SubEvent value, Context ctx) throws Exception {
if (!value.getName().startsWith("foo")) {
return false;
}
double sum = value.getPrice();
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});
Scala:
middle.oneOrMore()
.subtype(classOf[SubEvent])
.where(
(value, ctx) => {
lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
value.getName.startsWith("foo") && sum + value.getPrice < 5.0
}
)
注意對(duì)context.getEventsForPattern(...)的調(diào)用,將為給定潛在匹配項(xiàng)查找所有先前接受的事件。 此操作的代價(jià)可能會(huì)變化巨大,因此在使用條件時(shí),請(qǐng)盡量減少其使用。
這種類型的條件擴(kuò)展了前面提到的IterativeCondition類,并且僅根據(jù)事件本身的屬性決定是否接受事件。
start.where(new SimpleCondition() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});
最后,還可以通過pattern.subtype(subClass)方法將接受事件的類型限制為初始事件類型的子類型。
Java:
start.subtype(SubEvent.class).where(new SimpleCondition() {
@Override
public boolean filter(SubEvent value) {
return ... // some condition
}
});
Scala:
start.where(event => event.getName.startsWith("foo"))
如上所示,可以將子類型條件與其他條件組合使用。 這適用于所有條件。 您可以通過順序調(diào)用where()來任意組合條件。
最終結(jié)果將是各個(gè)條件的結(jié)果的邏輯AND。 要使用OR組合條件,可以使用or()方法,如下所示。
pattern.where(new SimpleCondition() {
@Override
public boolean filter(Event value) {
return ... // some condition
}
}).or(new SimpleCondition() {
@Override
public boolean filter(Event value) {
return ... // or condition
}
});
Scala:
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
在循環(huán)模式(oneOrMore()和oneOrMore().optional())的情況下,還可以指定停止條件,例如: 接受值大于5的事件,直到值的總和小于50。
為了更好的理解,可以看看下面的例子:
給定模式:(a+ until b),b之前,要出現(xiàn)一個(gè)或者多個(gè)a。
給定輸入序列:a1,c,a2,b,a3
輸出結(jié)果: {a1 a2}{a1}{a2}{a3}
可以看到{a1,a2,a3},{a2,a3}這兩個(gè)并沒有輸出,這就是停止條件的作用。
FlinkCEP支持事件之間以下形式進(jìn)行連續(xù):
嚴(yán)格連續(xù)性:希望所有匹配事件一個(gè)接一個(gè)地出現(xiàn),中間沒有任何不匹配的事件。
寬松連續(xù)性:忽略匹配的事件之間出現(xiàn)的不匹配事件。 不能忽略兩個(gè)事件之間的匹配事件。
為了解釋上面的內(nèi)容,我們舉個(gè)例子。假如有個(gè)模式序列"a+ b",輸入序列"a1,c,a2,b",不同連續(xù)條件下有不同的區(qū)別:
嚴(yán)格連續(xù)性:{a2 b} - 由于c的存在導(dǎo)致a1被廢棄
寬松連續(xù)性:{a1,b}和{a1 a2 b} - c被忽略
對(duì)于循環(huán)模式(例如oneOrMore()和times()),默認(rèn)是寬松的連續(xù)性。 如果你想要嚴(yán)格的連續(xù)性,你必須使用consecutive()顯式指定它,
如果你想要非確定性的松弛連續(xù)性,你可以使用allowCombinations()方法。
注意在本節(jié)中,我們討論的是單個(gè)循環(huán)模式中的連續(xù)性,并且需要在該上下文中理解consecutive()和allowCombinations()。
稍后在講解組合模式時(shí),我們將討論其他方法,例如next()和followedBy(),用于指定模式之間的連續(xù)條件。
定義當(dāng)前模式的條件。 為了匹配模式,事件必須滿足條件。 多個(gè)連續(xù)的where(),其條件為AND:
pattern.where(new IterativeCondition() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // some condition
}
});
添加與現(xiàn)有條件進(jìn)行OR運(yùn)算的新條件。 只有在至少通過其中一個(gè)條件時(shí),事件才能匹配該模式:
pattern.where(new IterativeCondition() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // some condition
}
}).or(new IterativeCondition() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // alternative condition
}
});
指定循環(huán)模式的停止條件。 意味著如果匹配給定條件的事件發(fā)生,則不再接受該模式中的事件。
僅適用于oneOrMore()
注意:它允許在基于事件的條件下清除相應(yīng)模式的狀態(tài)。
pattern.oneOrMore().until(new IterativeCondition() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // alternative condition
}
});
定義當(dāng)前模式的子類型條件。 如果事件屬于此子類型,則事件只能匹配該模式:
pattern.subtype(SubEvent.class);
指定此模式至少發(fā)生一次匹配事件。
默認(rèn)情況下,使用寬松的內(nèi)部連續(xù)性。
注意:建議使用until()或within()來啟用狀態(tài)清除
pattern.oneOrMore().until(new IterativeCondition() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // alternative condition
}
});
指定此模式至少需要#times次出現(xiàn)匹配事件。
默認(rèn)情況下,使用寬松的內(nèi)部連續(xù)性(在后續(xù)事件之間)。
pattern.timesOrMore(2);
指定此模式需要匹配事件的確切出現(xiàn)次數(shù)。
默認(rèn)情況下,使用寬松的內(nèi)部連續(xù)性(在后續(xù)事件之間)。
pattern.times(2);
指定此模式期望在匹配事件的#fromTimes次和#toTimes次之間出現(xiàn)。
默認(rèn)情況下,使用寬松的內(nèi)部連續(xù)性。
pattern.times(2, 4);
指定此模式是可選的,即有可能根本不會(huì)發(fā)生。 這適用于所有上述量詞。
pattern.oneOrMore().optional();
指定此模式是貪婪的,即它將盡可能多地重復(fù)。 這僅適用于quantifiers,目前不支持組模式。
pattern.oneOrMore().greedy();
與oneOrMore()和times()一起使用并在匹配事件之間強(qiáng)加嚴(yán)格的連續(xù)性,即任何不匹配的元素都會(huì)中斷匹配。
如果不使用,則使用寬松的連續(xù)性(如followBy())。
例如,這樣的模式:
Pattern.begin("start").where(new SimpleCondition() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
針對(duì)上面的模式,我們假如輸入序列如:C D A1 A2 A3 D A4 B
使用consecutive:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
不使用:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}
與oneOrMore()和times()一起使用,并在匹配事件之間強(qiáng)加非確定性寬松連續(xù)性(如 followedByAny())。
如果不應(yīng)用,則使用寬松的連續(xù)性(如followBy())。
例如,這樣的模式:
Pattern.begin("start").where(new SimpleCondition() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
針對(duì)上面的模式,我們假如輸入序列如:C D A1 A2 A3 D A4 B
使用allowCombinations:{C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}
不使用:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}
已經(jīng)了解了單個(gè)模式的樣子,現(xiàn)在是時(shí)候看看如何將它們組合成一個(gè)完整的模式序列。
模式序列必須以初始模式開始,如下所示:
Pattern start = Pattern.begin("start");
接下來,您可以通過指定它們之間所需的連續(xù)條件,為模式序列添加更多模式。 在上一節(jié)中,我們描述了Flink支持的不同鄰接模式,即嚴(yán)格,寬松和非確定性寬松,以及如何在循環(huán)模式中應(yīng)用它們。
要在連續(xù)模式之間應(yīng)用它們,可以使用:
next() 對(duì)應(yīng) 嚴(yán)格,
followedBy() 對(duì)應(yīng) 寬松連續(xù)性
followedByAny() 對(duì)應(yīng) 非確定性寬松連續(xù)性
亦或
notNext() 如果不希望一個(gè)事件類型緊接著另一個(gè)類型出現(xiàn)。
notFollowedBy() 不希望兩個(gè)事件之間任何地方出現(xiàn)該事件。注意 模式序列不能以notFollowedBy()結(jié)束。
注意 NOT模式前面不能有可選模式。
// strict contiguity
Pattern strict = start.next("middle").where(...);
// relaxed contiguity
Pattern relaxed = start.followedBy("middle").where(...);
// non-deterministic relaxed contiguity
Pattern nonDetermin = start.followedByAny("middle").where(...);
// NOT pattern with strict contiguity
Pattern strictNot = start.notNext("not").where(...);
// NOT pattern with relaxed contiguity
Pattern relaxedNot = start.notFollowedBy("not").where(...);
寬松連續(xù)性指的是僅第一個(gè)成功匹配的事件會(huì)被匹配到,然而非確定性寬松連續(xù)性,相同的開始會(huì)有多個(gè)匹配結(jié)果發(fā)出。距離,如果一個(gè)模式是"a b",給定輸入序列是"a c b1 b2"。對(duì)于不同連續(xù)性會(huì)有不同輸出。
a和b之間嚴(yán)格連續(xù)性,將會(huì)返回{},也即是沒有匹配。因?yàn)閏的出現(xiàn)導(dǎo)致a,拋棄了。
a和b之間寬松連續(xù)性,返回的是{a,b1},因?yàn)閷捤蛇B續(xù)性將會(huì)拋棄為匹配成功的元素,直至匹配到下一個(gè)要匹配的事件。
也可以為模式定義時(shí)間約束。 例如,可以通過pattern.within()方法定義模式應(yīng)在10秒內(nèi)發(fā)生。 時(shí)間模式支持處理時(shí)間和事件時(shí)間。
注意模式序列只能有一個(gè)時(shí)間約束。 如果在不同的單獨(dú)模式上定義了多個(gè)這樣的約束,則應(yīng)用最小的約束。
next.within(Time.seconds(10));
可以為begin,followBy,followByAny和next定義一個(gè)模式序列作為條件。模式序列將被邏輯地視為匹配條件,而且將返回GroupPattern并且
可對(duì)GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。
PatternPatte start = Pattern.begin(
Pattern.begin("start").where(...).followedBy("start_middle").where(...)
);
// strict contiguity
Pattern strict = start.next(
Pattern.begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// relaxed contiguity
Pattern relaxed = start.followedBy(
Pattern.begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// non-deterministic relaxed contiguity
Pattern nonDetermin = start.followedByAny(
Pattern.begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
定義一個(gè)開始模式
Pattern start = Pattern.begin("start");
定義一個(gè)開始模式
Pattern start = Pattern.begin(
Pattern.begin("start").where(...).followedBy("middle").where(...)
);
追加一個(gè)新的模式。匹配事件必須直接跟著先前的匹配事件(嚴(yán)格連續(xù)性):
Pattern next = start.next("middle");
追加一個(gè)新的模式。匹配事件必須直接接著先前的匹配事件(嚴(yán)格連續(xù)性):
Pattern next = start.next(
Pattern.begin("start").where(...).followedBy("middle").where(...)
);
追加加新模式。 匹配事件和先前匹配事件(寬松連續(xù))之間可能發(fā)生其他非匹配事件:
Pattern followedBy = start.followedBy("middle");
追加新模式。 匹配事件和先前匹配事件(寬松連續(xù))之間可能發(fā)生其他非匹配事件:
Pattern followedBy = start.followedBy(
Pattern.begin("start").where(...).followedBy("middle").where(...)
);
添加新模式。 匹配事件和先前匹配事件之間可能發(fā)生其他事件,并且將針對(duì)每個(gè)備選匹配事件(非確定性放松連續(xù)性)呈現(xiàn)替代匹配:
Pattern followedByAny = start.followedByAny("middle");
添加新模式。 匹配事件和先前匹配事件之間可能發(fā)生其他事件,并且將針對(duì)每個(gè)備選匹配事件(非確定性放松連續(xù)性)呈現(xiàn)替代匹配:
Pattern followedByAny = start.followedByAny(
Pattern.begin("start").where(...).followedBy("middle").where(...)
);
添加新的否定模式。 匹配(否定)事件必須直接跟著先前的匹配事件(嚴(yán)格連續(xù)性)才能丟棄部分匹配:
Pattern notNext = start.notNext("not");
追加一個(gè)新的否定模式匹配。即使在匹配(否定)事件和先前匹配事件(寬松連續(xù)性)之間發(fā)生其他事件,也將丟棄部分匹配事件序列:
Pattern notFollowedBy = start.notFollowedBy("not");
定義事件序列進(jìn)行模式匹配的最大時(shí)間間隔。 如果未完成的事件序列超過此時(shí)間,則將其丟棄:
pattern.within(Time.seconds(10));
對(duì)于給定模式,可以將同一事件分配給多個(gè)成功匹配。 要控制將分配事件的匹配數(shù),需要指定名為AfterMatchSkipStrategy的跳過策略。
跳過策略有四種類型,如下所示:
NO_SKIP:將發(fā)出每個(gè)可能的匹配。
SKIP_PAST_LAST_EVENT:丟棄包含匹配事件的每個(gè)部分匹配。
SKIP_TO_FIRST:丟棄包含PatternName第一個(gè)之前匹配事件的每個(gè)部分匹配。
請(qǐng)注意,使用SKIP_TO_FIRST和SKIP_TO_LAST跳過策略時(shí),還應(yīng)指定有效的PatternName。
例如,對(duì)于給定模式a b {2}和數(shù)據(jù)流ab1,ab2,ab3,ab4,ab5,ab6,這四種跳過策略之間的差異如下:
要指定要使用的跳過策略,只需調(diào)用以下命令創(chuàng)建AfterMatchSkipStrategy:
使用方法:
AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("patternName", skipStrategy);
指定要查找的模式序列后,就可以將其應(yīng)用于輸入流以檢測(cè)潛在匹配。 要針對(duì)模式序列運(yùn)行事件流,必須創(chuàng)建PatternStream。
給定輸入流 input,模式 pattern 和可選的比較器 comparator,用于在EventTime的情況下對(duì)具有相同時(shí)間戳的事件進(jìn)行排序或在同一時(shí)刻到達(dá),通過調(diào)用以下命令創(chuàng)建PatternStream:
DataStream input = ...
Pattern pattern = ...
EventComparator comparator = ... // optional
PatternStream patternStream = CEP.pattern(input, pattern, comparator);
根據(jù)實(shí)際情況,創(chuàng)建的流可以是有key,也可以是無key的。
請(qǐng)注意,在無key的流上使用模式,將導(dǎo)致job的并行度為1。
獲得PatternStream后,您可以通過select或flatSelect方法從檢測(cè)到的事件序列中進(jìn)行查詢。
select()方法需要PatternSelectFunction的實(shí)現(xiàn)。 PatternSelectFunction具有為每個(gè)匹配事件序列調(diào)用的select方法。
它以Map
給定模式的事件按時(shí)間戳排序。 返回每個(gè)模式的接受事件列表的原因是當(dāng)使用循環(huán)模式(例如oneToMany()和times())時(shí),對(duì)于給定模式可以接受多個(gè)事件。
選擇函數(shù)只返回一個(gè)結(jié)果。
class MyPatternSelectFunction implements PatternSelectFunction {
@Override
public OUT select(Map> pattern) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
return new OUT(startEvent, endEvent);
}
}
PatternFlatSelectFunction類似于PatternSelectFunction,唯一的區(qū)別是它可以返回任意數(shù)量的結(jié)果。 為此,select方法有一個(gè)額外的Collector參數(shù),用于將輸出元素向下游轉(zhuǎn)發(fā)。
class MyPatternFlatSelectFunction implements PatternFlatSelectFunction {
@Override
public void flatSelect(Map> pattern, Collector collector) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
for (int i = 0; i < startEvent.getValue(); i++ ) {
collector.collect(new OUT(startEvent, endEvent));
}
}
}
每當(dāng)模式具有通過within關(guān)鍵字附加的時(shí)間窗口長(zhǎng)度時(shí),部分事件序列可能因?yàn)槌鰰r(shí)間窗口長(zhǎng)度而被丟棄。 為了對(duì)這些超時(shí)的部分匹配作出相應(yīng)的處理,select和flatSelect API調(diào)用允許指定超時(shí)處理程序。
為每個(gè)超時(shí)的部分事件序列調(diào)用此超時(shí)處理程序。 超時(shí)處理程序接收到目前為止由模式匹配的所有事件,以及檢測(cè)到超時(shí)時(shí)的時(shí)間戳。
為了處理部分模式,select和flatSelect API提供了一個(gè)帶參數(shù)的重載版本
PatternStreamPatte patternStream = CEP.pattern(input, pattern);
OutputTag outputTag = new OutputTag("side-output"){};
SingleOutputStreamOperator result = patternStream.select(
new PatternTimeoutFunction() {...},
outputTag,
new PatternSelectFunction() {...}
);
DataStream timeoutResult = result.getSideOutput(outputTag);
SingleOutputStreamOperator flatResult = patternStream.flatSelect(
new PatternFlatTimeoutFunction() {...},
outputTag,
new PatternFlatSelectFunction() {...}
);
DataStream timeoutFlatResult = flatResult.getSideOutput(outputTag);
在CEP中,元素處理的順序很重要。為了保證在采用事件事件時(shí)以正確的順序處理事件,最初將傳入的事件放入緩沖區(qū),其中事件基于它們的時(shí)間戳以升序排序,
并且當(dāng)watermark到達(dá)時(shí),處理該緩沖區(qū)中時(shí)間戳小于watermark時(shí)間的所有元素。這意味著watermark之間的事件按事件時(shí)間順序處理。
請(qǐng)注意,在采用事件時(shí)間時(shí),CEP library會(huì)假設(shè)watermark是正確的。
為了保證跨watermark的記錄按照事件時(shí)間順序處理,F(xiàn)link的CEP庫假定watermark是正確的,并將時(shí)間戳小于上次可見watermark的時(shí)間視為滯后事件。滯后事件不會(huì)被進(jìn)一步處理。
以下示例檢測(cè)事件的帶key數(shù)據(jù)流上的模式start,middle(name =“error”) - > end(name =“critical”)。 事件的key是其id,并且有效模式必須在10秒內(nèi)發(fā)生。 整個(gè)處理是用事件時(shí)間完成的。
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream input = ...
DataStream partitionedInput = input.keyBy(new KeySelector() {
@Override
public Integer getKey(Event value) throws Exception {
return value.getId();
}
});
Pattern pattern = Pattern.begin("start")
.next("middle").where(new SimpleCondition() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
PatternStream patternStream = CEP.pattern(partitionedInput, pattern);
DataStream alerts = patternStream.select(new PatternSelectFunction() {
@Override
public Alert select(Map> pattern) throws Exception {
return createAlert(pattern);
}
});