flink cep源码分析

关于Flink中cep实现原理的分析

最近一直在搞动态cep的事情,有点焦头烂额很久没有更新博客了。其实有时候也在思考写博客的意义,因为写博客也是时间成本很高的一件事,如果得不到相应的收益其实是划不来的。那么写博客的收益到底是什么呢?两点:笔记记录和传播知识的作用,整理记录的功能一个web博客不会强于一个终端笔记例如:为知笔记。所以博客的真正意义在于传播知识观点。有时候你遇到一个百思不得其解的问题的时候,google一下找到一篇博客,竟然能够解答心中所惑的时候你是不是心中会很感谢博主呢,我认为这样的一篇文章就是有价值的文章,所以我希望我也能做好这样一件有价值的事情。

引言

好了,进入本文的主题flink cep原理的深入理解,很多人可能还不知道flink cep是什么,flink cep其实实现自一篇论文,具体论文细节见我之前的一篇文章的分享flink-cep-paper. flink cep的全称是Complex Event Processing,在我看来它主要能做的是在一个连续不断的事件中提取出用户所关心的事件序列,他和flink的filter算子的区别在于filter只能去实现单个元素的过滤,而cep是能完成先后顺序事件的过滤。下面让我们来走进他的源码实现原理吧。以下代码基于社区1.4.2分支分析。

我们的文章以一系列问题来展开:

  1. 用户定义的Pattern最后会以什么形式工作
  2. 当CEP Operator获取到上游一个算子的时候会做什么事情?
  3. 在ProcessingTime和Eventtime的语义下处理逻辑有什么不同点?
  4. 匹配成功的元素如何存储,状态机转化流程是怎么样的?
  5. 超时未匹配成功的元素会做什么?

问题一

用户在定义Pattern和condition之后,会通过NFAcompiler将Pattern翻译成一个一个相关联的State,表明了这一组规则的状态机的走向流程。

State包括Start、Normal、Final、Stop,start表示一个起始状态例如begin('A').followedBy('B') 这里面A就是一个Start状态。Final状态表示整个序列已经匹配完成可以向下游发送了,Stop状态是用来处理否定类型的规则,一旦到达Stop状态即意味着整个匹配过程失败。各个状态之间通过StateTransition来连接,连接方式有:

1
2
3
ignore: 忽略此次匹配的元素
proceed: 相当于forward的意思,到达下一个状态,不存储元素,继续做下一个状态的condition判断
take: 存储本次的元素

这是一段创建中间状态的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private State<T> createMiddleStates(final State<T> sinkState) {
State<T> lastSink = sinkState;
// 不断往上遍历pattern进行state的生成
while (currentPattern.getPrevious() != null) {

if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
//skip notFollow patterns, they are converted into edge conditions
} else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) {
final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal);
final IterativeCondition<T> notCondition = getTakeCondition(currentPattern);
// 否定类型的pattern需要创建一个stop state
final State<T> stopState = createStopState(notCondition, currentPattern.getName());

if (lastSink.isFinal()) {
//so that the proceed to final is not fired
结尾状态不用proceed过去做下一次计算了,可以直接ignore到Final,然后输出结果
notNext.addIgnore(lastSink, new NotCondition<>(notCondition));
} else {
notNext.addProceed(lastSink, new NotCondition<>(notCondition));
}
// 在满足Not_NEXT的条件的时候就转化成stop状态即匹配失败
notNext.addProceed(stopState, notCondition);
lastSink = notNext;
} else {
// 非否定类型的状态的处理逻辑都在这个方法中
lastSink = convertPattern(lastSink);
}

// we traverse the pattern graph backwards
followingPattern = currentPattern;
currentPattern = currentPattern.getPrevious();

final Time currentWindowTime = currentPattern.getWindowTime();
if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
// the window time is the global minimum of all window times of each state
windowTime = currentWindowTime.toMilliseconds();
}
}
return lastSink;
}

生成这样的state列表之后,最终会创建一个NFA,一个NFA中包含了两个重要组件:
一个是SharedBuffer用于存储中间匹配命中的数据,这是一个基于论文实现的带版本的内存共享,主要解决的事情是在同一个元素触发多个分支的时候避免存储多次。
另一个是ComputationState队列表示的是一系列当前匹配到的计算状态,每一个状态在拿到下一个元素的时候都会根据condition判断自己是能够继续往下匹配生成下一个computation state还是匹配失败。

问题二、三

问题二和问题三一起解释,在消费到上游一个元素之后会判断时间语义,这里主要是为了处理乱序问题,如果是processingtime的话就会直接经由nfa#process进行处理,因为processing time不需要考虑事件是否乱序,他给每个事件都打上了当前的时间戳。而event语义下,会先将该数据buffer到rocksdb中,并且注册一个比当前时间戳大1的eventimer,用以触发真正的计算,也就是说,eventtime其实是每毫秒获取过去存储的数据做一次匹配计算。

1
2
3
4
5
6
7
protected void saveRegisterWatermarkTimer() {
long currentWatermark = timerService.currentWatermark();
// protect against overflow
if (currentWatermark + 1 > currentWatermark) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
}
}

问题四、五

nfa#process做了什么?取出前面说到的nfa中所有的当前computationState去做计算,当然计算之前会先判断时间和computation的starttime比较匹配是否超出时间,即within算子所做的时间,如果设置了超时处理的方式,就会将超时未匹配完成,已匹配到的部分元素向下游发送,并做sharebuffer的清理工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if (!computationState.isStartState() &&
windowTime > 0L &&
timestamp - computationState.getStartTimestamp() >= windowTime) {

if (handleTimeout) {
// extract the timed out event pattern
Map<String, List<T>> timedOutPattern = extractCurrentMatches(computationState);
timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
}

eventSharedBuffer.release(
NFAStateNameHandler.getOriginalNameFromInternal(computationState.getPreviousState().getName()),
computationState.getEvent(),
computationState.getTimestamp(),
computationState.getCounter());

newComputationStates = Collections.emptyList();
nfaChanged = true;
} else if (event != null) {
// 在computeNextState的时候判断成功的take条件会将元素put到eventSharedBuffer中
newComputationStates = computeNextStates(computationState, event, timestamp);

if (newComputationStates.size() != 1) {
nfaChanged = true;
} else if (!newComputationStates.iterator().next().equals(computationState)) {
nfaChanged = true;
}
} else {
newComputationStates = Collections.singleton(computationState);
}

在完成匹配之后达到final状态将数据提取出来向下游发送完成匹配。

以上便是cep的大致原理,说白了其实这个就是基于flink runntime开发出来的一个衍生lib,flink runtime其实是一个分布式的阻塞队列,通过这个概念可以在上面开发出很多有意思的产品,cep就是其中一个。 分析结束,欢迎拍砖~