简介
Flink CEP是在flink之上实现的复杂事件处理(CEP)库,它允许我们在事件流中检测事件的模式,让我们有机会掌握数据中重要的事项。
本文章主要是介绍了flink cep中可用的api调用,首先介绍Pattern API,它允许你指定要在事件流中检测的模式,并介绍匹配事件并对其进行操作。最后分析下CEP库在处理事件时间延迟问题。
使用步骤
(1)首先我们需要引入cep的依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.11</artifactId> <version>1.5.0</version> </dependency> |
(2)确定equals()和hashcode()方法
如果使用CEP,需要我们在datastream中的事件实现正确的equals()和hashcode()方法,因为Flink CEP使用他们来比较和匹配事件。
简单demo代码:
val input: DataStream[Event] = ... val pattern = Pattern.begin[Event]("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end") val patternStream = CEP.pattern(input, pattern) val result: DataStream[Alert] = patternStream.select(createAlert(_)) |
(3)Pattern API
Pattern API允许你定义要从输入流中提取的复杂模式序列。
每个复杂模式序列都是由多个简单模式组成,简单模式就是寻找具有相同属性的单个事件的模式,我们可以先定义一些简单的模式,然后组合成复杂的序列模式。
可以将模式序列视为此类模式的结构图,基于用户指定的条件从一个模式转换到下一个模式,例如:event.getName().equals(“start”).
匹配的是一系列输入事件,通过一系列有效的模式转换访问复杂模式图中的所有模式。注意每个模式必须具有唯一的名称,以便后续可以使用该名称来标识匹配的事件。模式名称中不能包含字符”:”。
下面我们首先介绍如何定义单个模式,然后再将各个模式组合到复杂模式中。
单个模式
Pattern可以是单个,也可以是循环模式,单个模式接收单个事件,而循环模式可以接收多个事件,在模式匹配符号中,模式“a b + c?d”(或“a”,后跟一个或多个“b”,可选地后跟“c”,后跟“d”),a,c ?,和d是单例模式,而b +是循环模式。
默认情况下,模式是单个模式,可以使用Quantifiers将其转换为循环模式。每个模式可以有一个或多个条件,基于它接收的事件。
Quantifiers
在FlinkCEP中,可以使用以下方法指定循环模式:pattern.oneOrMore(),用于期望一个或多个事件发生的模式(例如之前提到的b+);用于期望给定类型事件的特定出现次数的模式,
对于名为start的模式,以下是有效的Quantifiers:
// expecting 4 occurrences start.times(4); // expecting 0 or 4 occurrences start.times(4).optional(); // expecting 2, 3 or 4 occurrences start.times(2, 4); // expecting 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).greedy(); // expecting 0, 2, 3 or 4 occurrences start.times(2, 4).optional(); // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).optional().greedy(); // expecting 1 or more occurrences start.oneOrMore(); // expecting 1 or more occurrences and repeating as many as possible start.oneOrMore().greedy(); // expecting 0 or more occurrences start.oneOrMore().optional(); // expecting 0 or more occurrences and repeating as many as possible start.oneOrMore().optional().greedy(); // expecting 2 or more occurrences start.timesOrMore(2); // expecting 2 or more occurrences and repeating as many as possible start.timesOrMore(2).greedy(); // expecting 0, 2 or more occurrences and repeating as many as possible start.timesOrMore(2).optional().greedy(); |
Conditions条件
每个模式中,从一个模式转到下一个模式,可以指定其他条件,我们可以使用下面这些条件:
1:传入事件的属性,例如其值应大于5,或者大于先前接收的事件的平均值;
2:匹配事件的连续性,例如检测模式a,b,c序列中不能有任何非匹配事件。
Conditions on Properties关于属性的条件
可以通过pattern.where(),pattern.or()或pattern.until()方法指定事件属性的条件,条件可以是iterativeConditions或SimpleConditions.
1:迭代条件
这是最常见的条件类型,你可以指定一个条件,该条件基于先前接收的事件的属性或器子集的统计信息来接收后续事件。
下面代码说的是:如果名称以”foo”开头同时如果该模式的先前接收的事件的价格总和加上当前事件的价格不超过该值5.0,则迭代条件接收名为”middle”的模式的下一个事件:迭代条件可以很强大,尤其是与循环模式相结合,例如:oneOrMore();
middle.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx) => { lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum value.getName.startsWith("foo") && sum + value.getPrice < 5.0 } ) |
注意对context.getEventsForPattern()的调用将为给定潜在匹配项查找所有先前接收的事件,此操作代价可能会变化巨大,因此应尽量减少其使用。
2:简单条件
这种类型的条件时扩展了前面提到的IterativeCondition类,并且仅根据事件本身的属性决定是否接收事件:
start.where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) { return value.getName().startsWith("foo"); }}); |
此外还可以通过pattern.subtype(subclass)方法将接收事件的类型限定为初始事件类型的子类型:
start.where(event => event.getName.startsWith("foo")) |
组合条件:
如上所示,可以将子类型条件与其他条件组合使用,这适用于所有条件。我们可以通过顺序调用where()来任意组合条件。最终结果将是各个条件的结果的逻辑and,要使用or组合条件,可以使用or()方法,如下所示:
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */) |
停止条件
在循环模式(oneOrMore()和oneOrMore().optional())的情况下,还可以指定停止条件,例如:接收值大于5的事件,直到其值的总和小于50.
我们看个例子来更好的理解:
给定模式:(a+ until b),b之前,要出现一个或者多个a,
给定输入的序列:a1,c,a2,b,a3
输出结果:{a1 a2}{a1}{a2}{a3}
我们可以看到{a1,a2,a3},{a2,a3}两个并没有输出,这就是停止条件的作用。
连续事件的条件
Flink CEP支持事件之间以一下形式连续:
严格连续性:希望所有匹配事件一个接一个的出现,中间没有任何不匹配的事件;
宽松连续性:忽略匹配的事件之间出现不匹配事件,不能忽略两个事件之间的匹配事件。
非确定性轻松连续性:进一步放宽连续性,允许忽略某些匹配事件的其它匹配。
为了解释上面的内容,我们举个例子。假如有个模式序列"a+ b",输入序列"a1,c,a2,b",不同连续条件下有不同的区别:
严格连续性:{a2 b} - 由于c的存在导致a1被废弃
宽松连续性:{a1,b}和{a1 a2 b} - c被忽略
非确定性宽松连续性:{a1 b}, {a2 b}, 和 {a1 a2 b}
对于循环模式(例如oneOrMore()和times()),默认是宽松的连续性。 如果你想要严格的连续性,你必须使用consecutive()显式指定它, 如果你想要非确定性的松弛连续性,你可以使用allowCombinations()方法。
组合模式
简介
已经了解了单个模式的样子,现在是时候看看如何将它们组合成一个完整的模式序列。
模式序列必须以初始模式开始,如下所示:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
接下来,您可以通过指定它们之间所需的连续条件,为模式序列添加更多模式。 在上一节中,我们描述了Flink支持的不同邻接模式,即严格,宽松和非确定性宽松,以及如何在循环模式中应用它们。 要在连续模式之间应用它们,可以使用:
next() 对应严格, followedBy() 对应宽松连续性 followedByAny() 对应非确定性宽松连续性
亦或
notNext() 如果不希望一个事件类型紧接着另一个类型出现。 notFollowedBy() 不希望两个事件之间任何地方出现该事件。 注意 模式序列不能以notFollowedBy()结束。 注意 NOT模式前面不能有可选模式。
// strict contiguity Pattern<Event, ?> strict = start.next("middle").where(...); // relaxed contiguity Pattern<Event, ?> relaxed = start.followedBy("middle").where(...); // non-deterministic relaxed contiguity Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...); // NOT pattern with strict contiguity Pattern<Event, ?> strictNot = start.notNext("not").where(...); // NOT pattern with relaxed contiguity Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...); |
宽松连续性指的是仅第一个成功匹配的事件会被匹配到,然而非确定性宽松连续性,相同的开始会有多个匹配结果发出。距离,如果一个模式是"a b",给定输入序列是"a c b1 b2"。对于不同连续性会有不同输出。
a和b之间严格连续性,将会返回{},也即是没有匹配。因为c的出现导致a,抛弃了。
a和b之间宽松连续性,返回的是{a,b1},因为宽松连续性将会抛弃为匹配成功的元素,直至匹配到下一个要匹配的事件。
a和b之间非确定性宽松连续性,返回的是{a,b1},{a,b2}。
也可以为模式定义时间约束。 例如,可以通过pattern.within()方法定义模式应在10秒内发生。 时间模式支持处理时间和事件时间。 注意模式序列只能有一个时间约束。 如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。
next.within(Time.seconds(10));
可以为begin,followBy,followByAny和next定义一个模式序列作为条件。模式序列将被逻辑地视为匹配条件,而且将返回GroupPattern并且 可对GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。