真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

ApacheFlink官方文檔-FlinkCEP

原文鏈接

成都創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的成華網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

Flink CEP

0. 本文概述簡(jiǎn)介

FlinkCEP是在Flink之上實(shí)現(xiàn)的復(fù)雜事件處理(CEP)庫。 它允許你在×××的事件流中檢測(cè)事件模式,讓你有機(jī)會(huì)掌握數(shù)據(jù)中重要的事項(xiàng)。

本文描述了Flink CEP中可用的API調(diào)用。 首先介紹Pattern API,它允許你指定要在流中檢測(cè)的模式,然后介紹如何檢測(cè)匹配事件序列并對(duì)其進(jìn)行操作。
然后,我們將介紹CEP庫在處理事件時(shí)間延遲時(shí)所做的假設(shè)。

1.入門

首先是要在你的pom.xml文件中,引入CEP庫。


  org.apache.flink
  flink-cep_2.11
  1.5.0

注意要應(yīng)用模式匹配的DataStream中的事件必須實(shí)現(xiàn)正確的equals()和hashCode()方法,因?yàn)镕linkCEP使用它們來比較和匹配事件。

第一個(gè)demo如下:
Java:

DataStream input = ...

Pattern pattern = Pattern.begin("start").where(
        new SimpleCondition() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream patternStream = CEP.pattern(input, pattern);

DataStream result = patternStream.select(
    new PatternSelectFunction {
        @Override
        public Alert select(Map> pattern) throws Exception {
            return createAlertFrom(pattern);
        }
    }
});

Scala:

val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.select(createAlert(_))

2.Pattern API

Pattern API允許你定義要從輸入流中提取的復(fù)雜模式序列。

每個(gè)復(fù)雜模式序列都是由多個(gè)簡(jiǎn)單模式組成,即尋找具有相同屬性的單個(gè)事件的模式。我們可以先定義一些簡(jiǎn)單的模式,然后組合成復(fù)雜的模式序列。
可以將模式序列視為此類模式的結(jié)構(gòu)圖,基于用戶指定的條件從一個(gè)模式轉(zhuǎn)換到下一個(gè)模式,例如, event.getName().equals("start")。
匹配是一系列輸入事件,通過一系列有效的模式轉(zhuǎn)換訪問復(fù)雜模式圖中的所有模式。

注意每個(gè)模式必須具有唯一的名稱,以便后續(xù)可以使用該名稱來標(biāo)識(shí)匹配的事件。

注意模式名稱不能包含字符“:”。

在本節(jié)接下來的部分,我們將首先介紹如何定義單個(gè)模式,然后如何將各個(gè)模式組合到復(fù)雜模式中。

2.1 單個(gè)模式

Pattern可以是單單個(gè),也可以是循環(huán)模式。單個(gè)模式接受單個(gè)事件,而循環(huán)模式可以接受多個(gè)事件。在模式匹配符號(hào)中,模式“a b + c?d”(或“a”,后跟一個(gè)或多個(gè)“b”,可選地后跟“c”,后跟“d”),a,c ?,和d是單例模式,而b +是循環(huán)模式。
默認(rèn)情況下,模式是單個(gè)模式,您可以使用Quantifiers將其轉(zhuǎn)換為循環(huán)模式。每個(gè)模式可以有一個(gè)或多個(gè)條件,基于它接受事件。

2.1.1 Quantifiers

在FlinkCEP中,您可以使用以下方法指定循環(huán)模式:pattern.oneOrMore(),用于期望一個(gè)或多個(gè)事件發(fā)生的模式(例如之前提到的b +);和pattern.times(#ofTimes),
用于期望給定類型事件的特定出現(xiàn)次數(shù)的模式,例如4個(gè);和patterntimes(#fromTimes,#toTimes),用于期望給定類型事件的最小出現(xiàn)次數(shù)和最大出現(xiàn)次數(shù)的模式,例如, 2-4。

您可以使用pattern.greedy()方法使循環(huán)模式變得貪婪,但是還不能使組模式變得貪婪。您可以使用pattern.optional()方法使得所有模式,循環(huán)與否,變?yōu)榭蛇x。

對(duì)于名為start的模式,以下是有效的Quantifiers:

 // expecting 4 occurrences
 start.times(4);

 // expecting 0 or 4 occurrences
 start.times(4).optional();

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4);

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy();

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional();

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy();

 // expecting 1 or more occurrences
 start.oneOrMore();

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy();

 // expecting 0 or more occurrences
 start.oneOrMore().optional();

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy();

 // expecting 2 or more occurrences
 start.timesOrMore(2);

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy();

 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy();
2.1.2 Conditions-條件

在每個(gè)模式中,從一個(gè)模式轉(zhuǎn)到下一個(gè)模式,可以指定其他條件。您可以將使用下面這些條件:

  1. 傳入事件的屬性,例如其值應(yīng)大于5,或大于先前接受的事件的平均值。

  2. 匹配事件的連續(xù)性,例如檢測(cè)模式a,b,c,序列中間不能有任何非匹配事件。
2.1.3 Conditions on Properties-關(guān)于屬性的條件

可以通過pattern.where(),pattern.or()或pattern.until()方法指定事件屬性的條件。 條件可以是IterativeConditions或SimpleConditions。

  1. 迭代條件:

這是最常見的條件類型。 你可以指定一個(gè)條件,該條件基于先前接受的事件的屬性或其子集的統(tǒng)計(jì)信息來接受后續(xù)事件。

下面代碼說的是:如果名稱以“foo”開頭同時(shí)如果該模式的先前接受的事件的價(jià)格總和加上當(dāng)前事件的價(jià)格不超過該值 5.0,則迭代條件接受名為“middle”的模式的下一個(gè)事件,。
迭代條件可以很強(qiáng)大的,尤其是與循環(huán)模式相結(jié)合,例如, oneOrMore()。
Java

middle.oneOrMore().where(new IterativeCondition() {
    @Override
    public boolean filter(SubEvent value, Context ctx) throws Exception {
        if (!value.getName().startsWith("foo")) {
            return false;
        }

        double sum = value.getPrice();
        for (Event event : ctx.getEventsForPattern("middle")) {
            sum += event.getPrice();
        }
        return Double.compare(sum, 5.0) < 0;
    }
});

Scala:

middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
        (value, ctx) => {
            lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
            value.getName.startsWith("foo") && sum + value.getPrice < 5.0
        }
    )

注意對(duì)context.getEventsForPattern(...)的調(diào)用,將為給定潛在匹配項(xiàng)查找所有先前接受的事件。 此操作的代價(jià)可能會(huì)變化巨大,因此在使用條件時(shí),請(qǐng)盡量減少其使用。

  1. 簡(jiǎn)單條件:

這種類型的條件擴(kuò)展了前面提到的IterativeCondition類,并且僅根據(jù)事件本身的屬性決定是否接受事件。

start.where(new SimpleCondition() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});

最后,還可以通過pattern.subtype(subClass)方法將接受事件的類型限制為初始事件類型的子類型。
Java:

start.subtype(SubEvent.class).where(new SimpleCondition() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // some condition
    }
});

Scala:

start.where(event => event.getName.startsWith("foo"))
  1. 組合條件:

如上所示,可以將子類型條件與其他條件組合使用。 這適用于所有條件。 您可以通過順序調(diào)用where()來任意組合條件。
最終結(jié)果將是各個(gè)條件的結(jié)果的邏輯AND。 要使用OR組合條件,可以使用or()方法,如下所示。

pattern.where(new SimpleCondition() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new SimpleCondition() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});

Scala:

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
  1. 停止條件:

在循環(huán)模式(oneOrMore()和oneOrMore().optional())的情況下,還可以指定停止條件,例如: 接受值大于5的事件,直到值的總和小于50。

為了更好的理解,可以看看下面的例子:

給定模式:(a+ until b),b之前,要出現(xiàn)一個(gè)或者多個(gè)a。

給定輸入序列:a1,c,a2,b,a3

輸出結(jié)果: {a1 a2}{a1}{a2}{a3}

可以看到{a1,a2,a3},{a2,a3}這兩個(gè)并沒有輸出,這就是停止條件的作用。

  1. 連續(xù)事件條件

FlinkCEP支持事件之間以下形式進(jìn)行連續(xù):

  • 嚴(yán)格連續(xù)性:希望所有匹配事件一個(gè)接一個(gè)地出現(xiàn),中間沒有任何不匹配的事件。

  • 寬松連續(xù)性:忽略匹配的事件之間出現(xiàn)的不匹配事件。 不能忽略兩個(gè)事件之間的匹配事件。

  • 非確定性輕松連續(xù)性:進(jìn)一步放寬連續(xù)性,允許忽略某些匹配事件的其他匹配。

為了解釋上面的內(nèi)容,我們舉個(gè)例子。假如有個(gè)模式序列"a+ b",輸入序列"a1,c,a2,b",不同連續(xù)條件下有不同的區(qū)別:

  1. 嚴(yán)格連續(xù)性:{a2 b} - 由于c的存在導(dǎo)致a1被廢棄

  2. 寬松連續(xù)性:{a1,b}和{a1 a2 b} - c被忽略

  3. 非確定性寬松連續(xù)性:{a1 b}, {a2 b}, 和 {a1 a2 b}

對(duì)于循環(huán)模式(例如oneOrMore()和times()),默認(rèn)是寬松的連續(xù)性。 如果你想要嚴(yán)格的連續(xù)性,你必須使用consecutive()顯式指定它,
如果你想要非確定性的松弛連續(xù)性,你可以使用allowCombinations()方法。

注意在本節(jié)中,我們討論的是單個(gè)循環(huán)模式中的連續(xù)性,并且需要在該上下文中理解consecutive()和allowCombinations()。
稍后在講解組合模式時(shí),我們將討論其他方法,例如next()和followedBy(),用于指定模式之間的連續(xù)條件。

2.1.4 API簡(jiǎn)介
  1. where(condition)

定義當(dāng)前模式的條件。 為了匹配模式,事件必須滿足條件。 多個(gè)連續(xù)的where(),其條件為AND:

pattern.where(new IterativeCondition() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // some condition
    }
});
  1. or(condition)

添加與現(xiàn)有條件進(jìn)行OR運(yùn)算的新條件。 只有在至少通過其中一個(gè)條件時(shí),事件才能匹配該模式:

pattern.where(new IterativeCondition() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // some condition
    }
}).or(new IterativeCondition() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
  1. until(condition)

指定循環(huán)模式的停止條件。 意味著如果匹配給定條件的事件發(fā)生,則不再接受該模式中的事件。

僅適用于oneOrMore()

注意:它允許在基于事件的條件下清除相應(yīng)模式的狀態(tài)。

pattern.oneOrMore().until(new IterativeCondition() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
  1. subtype(subClass)

定義當(dāng)前模式的子類型條件。 如果事件屬于此子類型,則事件只能匹配該模式:

pattern.subtype(SubEvent.class);
  1. oneOrMore()

指定此模式至少發(fā)生一次匹配事件。

默認(rèn)情況下,使用寬松的內(nèi)部連續(xù)性。

注意:建議使用until()或within()來啟用狀態(tài)清除

pattern.oneOrMore().until(new IterativeCondition() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
  1. timesOrMore(#times)

指定此模式至少需要#times次出現(xiàn)匹配事件。

默認(rèn)情況下,使用寬松的內(nèi)部連續(xù)性(在后續(xù)事件之間)。

pattern.timesOrMore(2);
  1. times(#ofTimes)

指定此模式需要匹配事件的確切出現(xiàn)次數(shù)。

默認(rèn)情況下,使用寬松的內(nèi)部連續(xù)性(在后續(xù)事件之間)。

pattern.times(2);
  1. times(#fromTimes, #toTimes)

指定此模式期望在匹配事件的#fromTimes次和#toTimes次之間出現(xiàn)。

默認(rèn)情況下,使用寬松的內(nèi)部連續(xù)性。

pattern.times(2, 4);
  1. optional()

指定此模式是可選的,即有可能根本不會(huì)發(fā)生。 這適用于所有上述量詞。

pattern.oneOrMore().optional();
  1. greedy()

指定此模式是貪婪的,即它將盡可能多地重復(fù)。 這僅適用于quantifiers,目前不支持組模式。

pattern.oneOrMore().greedy();
  1. consecutive()

與oneOrMore()和times()一起使用并在匹配事件之間強(qiáng)加嚴(yán)格的連續(xù)性,即任何不匹配的元素都會(huì)中斷匹配。

如果不使用,則使用寬松的連續(xù)性(如followBy())。

例如,這樣的模式:

Pattern.begin("start").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

針對(duì)上面的模式,我們假如輸入序列如:C D A1 A2 A3 D A4 B

使用consecutive:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

不使用:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

  1. allowCombinations()

與oneOrMore()和times()一起使用,并在匹配事件之間強(qiáng)加非確定性寬松連續(xù)性(如 followedByAny())。

如果不應(yīng)用,則使用寬松的連續(xù)性(如followBy())。

例如,這樣的模式:

Pattern.begin("start").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

針對(duì)上面的模式,我們假如輸入序列如:C D A1 A2 A3 D A4 B

使用allowCombinations:{C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}

不使用:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

2.2 組合模式

2.2.1 簡(jiǎn)介

已經(jīng)了解了單個(gè)模式的樣子,現(xiàn)在是時(shí)候看看如何將它們組合成一個(gè)完整的模式序列。

模式序列必須以初始模式開始,如下所示:

Pattern start = Pattern.begin("start");

接下來,您可以通過指定它們之間所需的連續(xù)條件,為模式序列添加更多模式。 在上一節(jié)中,我們描述了Flink支持的不同鄰接模式,即嚴(yán)格,寬松和非確定性寬松,以及如何在循環(huán)模式中應(yīng)用它們。
要在連續(xù)模式之間應(yīng)用它們,可以使用:

next() 對(duì)應(yīng) 嚴(yán)格,
followedBy() 對(duì)應(yīng) 寬松連續(xù)性
followedByAny() 對(duì)應(yīng) 非確定性寬松連續(xù)性

亦或

notNext() 如果不希望一個(gè)事件類型緊接著另一個(gè)類型出現(xiàn)。
notFollowedBy() 不希望兩個(gè)事件之間任何地方出現(xiàn)該事件。

注意 模式序列不能以notFollowedBy()結(jié)束。

注意 NOT模式前面不能有可選模式。

// strict contiguity
Pattern strict = start.next("middle").where(...);

// relaxed contiguity
Pattern relaxed = start.followedBy("middle").where(...);

// non-deterministic relaxed contiguity
Pattern nonDetermin = start.followedByAny("middle").where(...);

// NOT pattern with strict contiguity
Pattern strictNot = start.notNext("not").where(...);

// NOT pattern with relaxed contiguity
Pattern relaxedNot = start.notFollowedBy("not").where(...);

寬松連續(xù)性指的是僅第一個(gè)成功匹配的事件會(huì)被匹配到,然而非確定性寬松連續(xù)性,相同的開始會(huì)有多個(gè)匹配結(jié)果發(fā)出。距離,如果一個(gè)模式是"a b",給定輸入序列是"a c b1 b2"。對(duì)于不同連續(xù)性會(huì)有不同輸出。

  1. a和b之間嚴(yán)格連續(xù)性,將會(huì)返回{},也即是沒有匹配。因?yàn)閏的出現(xiàn)導(dǎo)致a,拋棄了。

  2. a和b之間寬松連續(xù)性,返回的是{a,b1},因?yàn)閷捤蛇B續(xù)性將會(huì)拋棄為匹配成功的元素,直至匹配到下一個(gè)要匹配的事件。

  3. a和b之間非確定性寬松連續(xù)性,返回的是{a,b1},{a,b2}。

也可以為模式定義時(shí)間約束。 例如,可以通過pattern.within()方法定義模式應(yīng)在10秒內(nèi)發(fā)生。 時(shí)間模式支持處理時(shí)間和事件時(shí)間。
注意模式序列只能有一個(gè)時(shí)間約束。 如果在不同的單獨(dú)模式上定義了多個(gè)這樣的約束,則應(yīng)用最小的約束。

next.within(Time.seconds(10));

可以為begin,followBy,followByAny和next定義一個(gè)模式序列作為條件。模式序列將被邏輯地視為匹配條件,而且將返回GroupPattern并且
可對(duì)GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。

PatternPatte  start = Pattern.begin(
    Pattern.begin("start").where(...).followedBy("start_middle").where(...)
);

// strict contiguity
Pattern strict = start.next(
    Pattern.begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// relaxed contiguity
Pattern relaxed = start.followedBy(
    Pattern.begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// non-deterministic relaxed contiguity
Pattern nonDetermin = start.followedByAny(
    Pattern.begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
2.2.2 API
  1. begin(#name)

定義一個(gè)開始模式

Pattern start = Pattern.begin("start");
  1. begin(#pattern_sequence)

定義一個(gè)開始模式

Pattern start = Pattern.begin(
    Pattern.begin("start").where(...).followedBy("middle").where(...)
);
  1. next(#name)

追加一個(gè)新的模式。匹配事件必須直接跟著先前的匹配事件(嚴(yán)格連續(xù)性):

Pattern next = start.next("middle");
  1. next(#pattern_sequence)

追加一個(gè)新的模式。匹配事件必須直接接著先前的匹配事件(嚴(yán)格連續(xù)性):

Pattern next = start.next(
    Pattern.begin("start").where(...).followedBy("middle").where(...)
);
  1. followedBy(#name)

追加加新模式。 匹配事件和先前匹配事件(寬松連續(xù))之間可能發(fā)生其他非匹配事件:

Pattern followedBy = start.followedBy("middle");
  1. followedBy(#pattern_sequence)

追加新模式。 匹配事件和先前匹配事件(寬松連續(xù))之間可能發(fā)生其他非匹配事件:

Pattern followedBy = start.followedBy(
    Pattern.begin("start").where(...).followedBy("middle").where(...)
);
  1. followedByAny(#name)

添加新模式。 匹配事件和先前匹配事件之間可能發(fā)生其他事件,并且將針對(duì)每個(gè)備選匹配事件(非確定性放松連續(xù)性)呈現(xiàn)替代匹配:

Pattern followedByAny = start.followedByAny("middle");
  1. followedByAny(#pattern_sequence)

添加新模式。 匹配事件和先前匹配事件之間可能發(fā)生其他事件,并且將針對(duì)每個(gè)備選匹配事件(非確定性放松連續(xù)性)呈現(xiàn)替代匹配:

Pattern followedByAny = start.followedByAny(
    Pattern.begin("start").where(...).followedBy("middle").where(...)
);
  1. notNext()

添加新的否定模式。 匹配(否定)事件必須直接跟著先前的匹配事件(嚴(yán)格連續(xù)性)才能丟棄部分匹配:

Pattern notNext = start.notNext("not");
  1. notFollowedBy()

追加一個(gè)新的否定模式匹配。即使在匹配(否定)事件和先前匹配事件(寬松連續(xù)性)之間發(fā)生其他事件,也將丟棄部分匹配事件序列:

Pattern notFollowedBy = start.notFollowedBy("not");
  1. within(time)

定義事件序列進(jìn)行模式匹配的最大時(shí)間間隔。 如果未完成的事件序列超過此時(shí)間,則將其丟棄:

pattern.within(Time.seconds(10));

2.3 匹配后的跳過策略

對(duì)于給定模式,可以將同一事件分配給多個(gè)成功匹配。 要控制將分配事件的匹配數(shù),需要指定名為AfterMatchSkipStrategy的跳過策略。
跳過策略有四種類型,如下所示:

  • NO_SKIP:將發(fā)出每個(gè)可能的匹配。

  • SKIP_PAST_LAST_EVENT:丟棄包含匹配事件的每個(gè)部分匹配。

  • SKIP_TO_FIRST:丟棄包含PatternName第一個(gè)之前匹配事件的每個(gè)部分匹配。

  • SKIP_TO_LAST:丟棄包含PatternName最后一個(gè)匹配事件之前的每個(gè)部分匹配。

請(qǐng)注意,使用SKIP_TO_FIRST和SKIP_TO_LAST跳過策略時(shí),還應(yīng)指定有效的PatternName。

例如,對(duì)于給定模式a b {2}和數(shù)據(jù)流ab1,ab2,ab3,ab4,ab5,ab6,這四種跳過策略之間的差異如下:

Apache Flink官方文檔-Flink CEP

要指定要使用的跳過策略,只需調(diào)用以下命令創(chuàng)建AfterMatchSkipStrategy:

Apache Flink官方文檔-Flink CEP

使用方法:

AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("patternName", skipStrategy);

2.4 檢測(cè)模式-Detecting Patterns

指定要查找的模式序列后,就可以將其應(yīng)用于輸入流以檢測(cè)潛在匹配。 要針對(duì)模式序列運(yùn)行事件流,必須創(chuàng)建PatternStream。
給定輸入流 input,模式 pattern 和可選的比較器 comparator,用于在EventTime的情況下對(duì)具有相同時(shí)間戳的事件進(jìn)行排序或在同一時(shí)刻到達(dá),通過調(diào)用以下命令創(chuàng)建PatternStream:

DataStream input = ...
Pattern pattern = ...
EventComparator comparator = ... // optional

PatternStream patternStream = CEP.pattern(input, pattern, comparator);

根據(jù)實(shí)際情況,創(chuàng)建的流可以是有key,也可以是無key的。

請(qǐng)注意,在無key的流上使用模式,將導(dǎo)致job的并行度為1。

2.5 Selecting from Patterns

獲得PatternStream后,您可以通過select或flatSelect方法從檢測(cè)到的事件序列中進(jìn)行查詢。

select()方法需要PatternSelectFunction的實(shí)現(xiàn)。 PatternSelectFunction具有為每個(gè)匹配事件序列調(diào)用的select方法。
它以Map >的形式接收匹配,其中key是模式序列中每個(gè)模式的名稱,值是該模式的所有已接受事件的列表(IN是輸入元素的類型)。
給定模式的事件按時(shí)間戳排序。 返回每個(gè)模式的接受事件列表的原因是當(dāng)使用循環(huán)模式(例如oneToMany()和times())時(shí),對(duì)于給定模式可以接受多個(gè)事件。
選擇函數(shù)只返回一個(gè)結(jié)果。

class MyPatternSelectFunction implements PatternSelectFunction {
    @Override
    public OUT select(Map> pattern) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);
        return new OUT(startEvent, endEvent);
    }
}

PatternFlatSelectFunction類似于PatternSelectFunction,唯一的區(qū)別是它可以返回任意數(shù)量的結(jié)果。 為此,select方法有一個(gè)額外的Collector參數(shù),用于將輸出元素向下游轉(zhuǎn)發(fā)。

class MyPatternFlatSelectFunction implements PatternFlatSelectFunction {
    @Override
    public void flatSelect(Map> pattern, Collector collector) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);

        for (int i = 0; i < startEvent.getValue(); i++ ) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}

2.6 處理超時(shí)部分模式

每當(dāng)模式具有通過within關(guān)鍵字附加的時(shí)間窗口長(zhǎng)度時(shí),部分事件序列可能因?yàn)槌鰰r(shí)間窗口長(zhǎng)度而被丟棄。 為了對(duì)這些超時(shí)的部分匹配作出相應(yīng)的處理,select和flatSelect API調(diào)用允許指定超時(shí)處理程序。
為每個(gè)超時(shí)的部分事件序列調(diào)用此超時(shí)處理程序。 超時(shí)處理程序接收到目前為止由模式匹配的所有事件,以及檢測(cè)到超時(shí)時(shí)的時(shí)間戳。

為了處理部分模式,select和flatSelect API提供了一個(gè)帶參數(shù)的重載版本

  • PatternTimeoutFunction/ PatternFlatTimeoutFunction。
  • OutputTag 超時(shí)的匹配將會(huì)在其中返回。
  • PatternSelectFunction / PatternFlatSelectFunction。
PatternStreamPatte  patternStream = CEP.pattern(input, pattern);

OutputTag outputTag = new OutputTag("side-output"){};

SingleOutputStreamOperator result = patternStream.select(
    new PatternTimeoutFunction() {...},
    outputTag,
    new PatternSelectFunction() {...}
);

DataStream timeoutResult = result.getSideOutput(outputTag);

SingleOutputStreamOperator flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction() {...},
    outputTag,
    new PatternFlatSelectFunction() {...}
);

DataStream timeoutFlatResult = flatResult.getSideOutput(outputTag);

2.7 事件事件模式下處理滯后數(shù)據(jù)

在CEP中,元素處理的順序很重要。為了保證在采用事件事件時(shí)以正確的順序處理事件,最初將傳入的事件放入緩沖區(qū),其中事件基于它們的時(shí)間戳以升序排序,
并且當(dāng)watermark到達(dá)時(shí),處理該緩沖區(qū)中時(shí)間戳小于watermark時(shí)間的所有元素。這意味著watermark之間的事件按事件時(shí)間順序處理。

請(qǐng)注意,在采用事件時(shí)間時(shí),CEP library會(huì)假設(shè)watermark是正確的。

為了保證跨watermark的記錄按照事件時(shí)間順序處理,F(xiàn)link的CEP庫假定watermark是正確的,并將時(shí)間戳小于上次可見watermark的時(shí)間視為滯后事件。滯后事件不會(huì)被進(jìn)一步處理。

2.8 例子

以下示例檢測(cè)事件的帶key數(shù)據(jù)流上的模式start,middle(name =“error”) - > end(name =“critical”)。 事件的key是其id,并且有效模式必須在10秒內(nèi)發(fā)生。 整個(gè)處理是用事件時(shí)間完成的。

StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream input = ...

DataStream partitionedInput = input.keyBy(new KeySelector() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});

Pattern pattern = Pattern.begin("start")
    .next("middle").where(new SimpleCondition() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new SimpleCondition() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

PatternStream patternStream = CEP.pattern(partitionedInput, pattern);

DataStream alerts = patternStream.select(new PatternSelectFunction() {
    @Override
    public Alert select(Map> pattern) throws Exception {
        return createAlert(pattern);
    }
});

當(dāng)前題目:ApacheFlink官方文檔-FlinkCEP
分享路徑:http://weahome.cn/article/jgsseg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部