FlinkCEP - Complex event processing for Flink
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
The goal of FlinkCEP is to match a pattern sequence.
A pattern sequence is made up of multiple patterns.
    DataStream<Event> input = ...

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) {
                    return event.getId() == 42;
                }
            }
        ).next("middle").subtype(SubEvent.class).where(
            new SimpleCondition<SubEvent>() {
                @Override
                public boolean filter(SubEvent subEvent) {
                    return subEvent.getVolume() >= 10.0;
                }
            }
        ).followedBy("end").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) {
                    return event.getName().equals("end");
                }
            }
        );

    PatternStream<Event> patternStream = CEP.pattern(input, pattern);

    DataStream<Alert> result = patternStream.select(
        new PatternSelectFunction<Event, Alert>() {
            @Override
            public Alert select(Map<String, List<Event>> pattern) throws Exception {
                return createAlertFrom(pattern);
            }
        });

In this example, the pattern sequence consists of three patterns, created by begin, next, and followedBy.
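The first pattern of every pattern sequence is created with begin.
Each pattern must have a unique name, such as start, middle, and end here.
Each pattern can also have conditions attached, for example with where.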
Pattern
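Patterns come in two kinds: individual patterns and complex patterns.
Individual patterns are further divided into singleton patterns and looping patterns.
Put simply, a singleton pattern accepts a single event, while a looping pattern can accept several. In the finite automaton, matching the same pattern repeatedly forms a loop.
For example, in a b+ c? d, b+ is a looping pattern, while a, c? and d are singletons.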
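A loose analogy may help here. Treat each event as a single character for its type. The same notation can then be tried with java.util.regex, which is not what FlinkCEP uses internally. b+ is the part that can repeat, while a, c? and d each consume at most one event. QuantifierAnalogy is a hypothetical name.

```java
import java.util.regex.Pattern;

// Analogy only: each character is the type of one event.
// FlinkCEP does not match patterns with java.util.regex.
final class QuantifierAnalogy {
    private static final Pattern SEQUENCE = Pattern.compile("ab+c?d");

    static boolean matches(String eventTypes) {
        return SEQUENCE.matcher(eventTypes).matches();
    }
}
```

For instance, "abbbcd" matches because b+ absorbs all three b's. "ad" does not match because b+ needs at least one b, and "abccd" does not match because c? allows at most one c.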
Adding a quantifier to a singleton pattern turns it into a looping pattern:

    // expecting 4 occurrences
    start.times(4);

    // expecting 0 or 4 occurrences
    start.times(4).optional();

    // expecting 1 or more occurrences
    start.oneOrMore();

    // expecting 0 or more occurrences
    start.oneOrMore().optional();

For the repeated matches of a single looping pattern, you can also define contiguity.
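The quantifiers above can be read as constraints on how many events a pattern accepts. The sketch below models each one as a predicate on that count. QuantifierCounts and its methods are hypothetical names chosen for illustration, not Flink API.

```java
import java.util.function.IntPredicate;

// Hypothetical helper: each quantifier becomes a predicate on the number of
// events the pattern accepted. This is not part of the Flink API.
final class QuantifierCounts {
    static IntPredicate times(int n) {
        return count -> count == n;
    }

    static IntPredicate oneOrMore() {
        return count -> count >= 1;
    }

    // optional() additionally allows the pattern to match zero events.
    static IntPredicate optional(IntPredicate quantifier) {
        return count -> count == 0 || quantifier.test(count);
    }
}
```

Read this way, times(4).optional() accepts a count of 0 or 4 but not 2, matching the "0 or 4 occurrences" comment.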
To illustrate the above with an example, a pattern sequence "a+ b" (one or more "a"s followed by a "b") with input "a1", "c", "a2", "b" will have the following results:
Strict Contiguity: {a2 b} (the "c" after "a1" causes "a1" to be discarded).
Relaxed Contiguity: {a1 b} and {a1 a2 b} ("c" is simply ignored).
Non-Deterministic Relaxed Contiguity: {a1 b}, {a2 b}, and {a1 a2 b}.
For looping patterns (e.g. oneOrMore() and times()) the default is relaxed contiguity. If you want strict contiguity, you have to specify it explicitly with the consecutive() call. If you want non-deterministic relaxed contiguity, you can use the allowCombinations() call.
An example of consecutive():

    Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    })
    .followedBy("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("a");
        }
    }).oneOrMore().consecutive()
    .followedBy("end1").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("b");
        }
    });

This will generate the following matches for the input sequence C D A1 A2 A3 D A4 B:
with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}
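To make the difference concrete, here is a small plain-Java simulation that reproduces the two result sets above for the input C D A1 A2 A3 D A4 B. It is a hand-written sketch, not FlinkCEP's NFA. It makes three assumptions:

- there is a single start event;
- the three patterns are joined with relaxed contiguity (followedBy);
- event types are compared case-insensitively, so that "A1" satisfies the condition for "a".

LoopContiguity is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (not the Flink NFA) of
//   begin "c" -> followedBy "a" oneOrMore() [consecutive()] -> followedBy "b"
// An event's type is the lower-cased first letter of its name, e.g. "A3" -> 'a'.
final class LoopContiguity {
    static char type(String event) {
        return Character.toLowerCase(event.charAt(0));
    }

    // Index of the first event of the wanted type at or after 'from', or -1.
    // Skipping non-matching events like this is what relaxed contiguity means.
    static int next(List<String> events, char wanted, int from) {
        for (int i = from; i < events.size(); i++) {
            if (type(events.get(i)) == wanted) {
                return i;
            }
        }
        return -1;
    }

    static List<String> matches(List<String> events, boolean consecutive) {
        List<String> result = new ArrayList<>();
        int start = next(events, 'c', 0);
        int first = start < 0 ? -1 : next(events, 'a', start + 1);
        if (first < 0) {
            return result;
        }

        // Indices of the events the looping "a" pattern can take, in order.
        List<Integer> loop = new ArrayList<>(List.of(first));
        for (int i = first + 1; i < events.size(); i++) {
            if (type(events.get(i)) == 'a') {
                loop.add(i);
            } else if (consecutive) {
                break; // strict: any other event ends the loop
            }
            // relaxed: other events are simply skipped
        }

        // Every prefix of the loop can be closed by the first following "b".
        for (int k = 1; k <= loop.size(); k++) {
            int end = next(events, 'b', loop.get(k - 1) + 1);
            if (end < 0) {
                continue;
            }
            List<String> match = new ArrayList<>();
            match.add(events.get(start));
            for (int j = 0; j < k; j++) {
                match.add(events.get(loop.get(j)));
            }
            match.add(events.get(end));
            result.add(String.join(" ", match));
        }
        return result;
    }
}
```

The only difference between the two modes is the break inside the loop. With consecutive(), the D after A3 ends the run of A events, so A4 can never join a match.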
This is contiguity within a single pattern. Contiguity between patterns can be defined as well, as covered below.
Of course, the key part of a pattern is its conditions, i.e. what counts as a match.
There are several kinds of conditions:
Simple Conditions
    start.where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return value.getName().startsWith("foo");
        }
    });

This is easy to understand: the decision is based purely on the current event.
Iterative Conditions
This is how you can specify a condition that accepts subsequent events based on properties of the previously accepted events or some statistic over a subset of them.
In other words, deciding whether the condition holds requires looking at the events the pattern has already accepted, hence the name iterative.
Below is the code for an iterative condition that accepts the next event for a pattern named "middle" if two things hold:

- its name starts with "foo";
- the sum of the prices of the previously accepted events for that pattern, plus the price of the current event, is less than 5.0.

Iterative conditions can be very powerful, especially in combination with looping patterns, e.g. oneOrMore().
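    middle.oneOrMore().where(new IterativeCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
            if (!value.getName().startsWith("foo")) {
                return false;
            }

            double sum = value.getPrice();
            for (Event event : ctx.getEventsForPattern("middle")) {
                sum += event.getPrice();
            }
            return Double.compare(sum, 5.0) < 0;
        }
    });

This is a oneOrMore pattern, so it can match one or more events. For each candidate event, the condition first checks that its name starts with "foo". It then checks that the event's price plus the prices of the events already matched is less than 5.0.
The key call here is ctx.getEventsForPattern, which returns all events currently matched by the pattern with the given name.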
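The sketch below replays that condition outside Flink to show how the running sum gates each new event. A plain list stands in for ctx.getEventsForPattern("middle"). It tracks a single partial match, whereas the real runtime keeps a separate context per partial match. IterativeConditionSketch and its nested SubEvent class are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one partial match of the looping "middle" pattern.
// acceptedMiddle plays the role of ctx.getEventsForPattern("middle").
final class IterativeConditionSketch {
    static final class SubEvent {
        final String name;
        final double price;

        SubEvent(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    private final List<SubEvent> acceptedMiddle = new ArrayList<>();

    // Same logic as the IterativeCondition above.
    boolean filter(SubEvent value) {
        if (!value.name.startsWith("foo")) {
            return false;
        }
        double sum = value.price;
        for (SubEvent event : acceptedMiddle) {
            sum += event.price;
        }
        return Double.compare(sum, 5.0) < 0;
    }

    // Feeds events in order and returns the names of those the condition accepted.
    List<String> run(List<SubEvent> input) {
        List<String> names = new ArrayList<>();
        for (SubEvent event : input) {
            if (filter(event)) {
                acceptedMiddle.add(event);
                names.add(event.name);
            }
        }
        return names;
    }
}
```

Consider the events foo1 (2.0), bar (1.0), foo2 (2.5) and foo3 (1.0):

- foo1 is accepted, with a running sum of 2.0.
- bar fails the name check.
- foo2 is accepted, since 2.0 + 2.5 = 4.5.
- foo3 is rejected, because it would push the sum to 5.5.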
Combining Conditions
    pattern.where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return ... // some condition
        }
    }).or(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return ... // or condition
        }
    });

Multiple conditions can be combined: where has "and" semantics, while or has "or" semantics.
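One way to picture how chained calls combine is to fold them left to right with java.util.function.Predicate, so that where(a).where(b).or(c) reads as (a AND b) OR c. ConditionChain is a hypothetical class for illustration only. The left-to-right folding, and accepting everything when no condition is set, are assumptions of this sketch rather than statements about Flink's Pattern class.

```java
import java.util.function.Predicate;

// Hypothetical builder mimicking how chained where()/or() calls combine,
// assuming a left-to-right fold. It is not the Flink Pattern class.
final class ConditionChain<T> {
    private Predicate<T> condition; // null until the first where()/or()

    ConditionChain<T> where(Predicate<T> next) {
        condition = (condition == null) ? next : condition.and(next);
        return this;
    }

    ConditionChain<T> or(Predicate<T> next) {
        condition = (condition == null) ? next : condition.or(next);
        return this;
    }

    // Assumption of this sketch: with no condition set, every value is accepted.
    boolean test(T value) {
        return condition == null || condition.test(value);
    }
}
```

With where(startsWith "foo").where(length > 4).or(equals "bar"), the value "foo12" and the value "bar" pass. The value "foo" fails both branches.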
Pattern Sequence
A sequence is made up of multiple patterns, so how are those patterns related to each other?
A pattern sequence has to start with an initial pattern, as shown below:
    Pattern<Event, ?> start = Pattern.<Event>begin("start");

Every sequence must have a starting pattern, created with begin.
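Next, you can append more patterns to your pattern sequence by specifying the desired contiguity conditions between them.
After begin you can append further patterns. The contiguity between consecutive patterns takes the same three forms described above: strict via next(), relaxed via followedBy(), and non-deterministic relaxed via followedByAny().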
As an example, a pattern a b, given the event sequence "a", "c", "b1", "b2", will give the following results:
Strict Contiguity between a and b: {} (no match), because the "c" after "a" causes "a" to be discarded.
Relaxed Contiguity between a and b: {a b1}, since relaxed contiguity is viewed as "skip non-matching events till the next matching one".
Non-Deterministic Relaxed Contiguity between a and b: {a b1}, {a b2}, as this is the most general form.
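The three results can be reproduced with a short plain-Java simulation of the pattern a b over the events a, c, b1, b2. It is a sketch rather than FlinkCEP's implementation. Each event's type is taken from the first character of its name, and a match may only start at an "a" event. SequenceContiguity and its Mode enum are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the two-pattern sequence "a b" under the three
// contiguity modes. An event's type is the first character of its name.
final class SequenceContiguity {
    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC }

    static List<String> matches(List<String> events, Mode mode) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (events.get(i).charAt(0) != 'a') {
                continue; // only an "a" event can start a match
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).charAt(0) == 'b') {
                    result.add(events.get(i) + " " + events.get(j));
                    if (mode != Mode.NON_DETERMINISTIC) {
                        break; // strict and relaxed keep only the first b
                    }
                } else if (mode == Mode.STRICT) {
                    break; // an unrelated event in between breaks strict contiguity
                }
                // the relaxed modes simply skip unrelated events
            }
        }
        return result;
    }
}
```

The three modes differ only in what happens at an unrelated event (stop or skip) and after the first b (stop or keep going).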
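Temporal Constraint
A sequence can also be given a time constraint, which is supported for both processing time and event time. within defines the maximum time interval for an event sequence to match the pattern. A partial match that exceeds it is discarded:

    next.within(Time.seconds(10));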
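Here is a minimal sketch of the bookkeeping behind within(). A partial match remembers the timestamp of its first event and is dropped once the window has elapsed. WindowedPartialMatch is a hypothetical class. Treating the window as expired when now - startTimestamp >= windowMillis is an assumption of this sketch, not a statement about where Flink draws the boundary.

```java
// Hypothetical model of a partial match under within(): it remembers when its
// first event happened and expires once the window has elapsed.
// Assumption: expired means now - startTimestamp >= windowMillis.
final class WindowedPartialMatch {
    private final long startTimestamp;
    private final long windowMillis;

    WindowedPartialMatch(long startTimestamp, long windowMillis) {
        this.startTimestamp = startTimestamp;
        this.windowMillis = windowMillis;
    }

    boolean isExpired(long now) {
        return now - startTimestamp >= windowMillis;
    }
}
```

For example, with a 10-second window, a partial match started at t = 1,000 ms is still alive at t = 5,000 ms and has expired by t = 20,000 ms.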
Detecting Patterns
Once the pattern sequence is defined, we need to actually run the detection:

    DataStream<Event> input = ...
    Pattern<Event, ?> pattern = ...

    PatternStream<Event> patternStream = CEP.pattern(input, pattern);

This produces a PatternStream.
The input stream can be keyed or non-keyed depending on your use case.
Applying your pattern on a non-keyed stream will result in a job with parallelism equal to 1.
For a keyed stream, each key detects patterns independently, so detection can run in parallel.
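Why a non-keyed stream ends up with parallelism 1 can be sketched with simple key-based routing. Records with equal keys always go to the same parallel instance. If every record gets the same constant key, as the NullByteKeySelector in the source code later does, only one instance ever receives data. The hashCode-modulo routing and the KeyRouting class are simplifications for illustration; Flink's actual assignment goes through key groups.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical key routing: equal keys always map to the same
// parallel instance. Flink's real assignment goes through key groups.
final class KeyRouting {
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Number of parallel instances that receive at least one record.
    static int busySubtasks(List<?> keys, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (Object key : keys) {
            used.add(subtaskFor(key, parallelism));
        }
        return used.size();
    }
}
```

With parallelism 4, records all keyed by the constant byte 0 keep a single instance busy. Records keyed 0, 1, 2, 3 spread over all four.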
Once you have obtained a PatternStream you can select from detected event sequences via the select or flatSelect methods.
On a PatternStream you can use either a PatternSelectFunction or a PatternFlatSelectFunction.
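    // Event and Alert are user-defined types; Alert is assumed to have an (Event, Event) constructor.
    class MyPatternSelectFunction implements PatternSelectFunction<Event, Alert> {
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            Event startEvent = pattern.get("start").get(0);
            Event endEvent = pattern.get("end").get(0);
            return new Alert(startEvent, endEvent);
        }
    }

A PatternSelectFunction must implement the select method. Its parameter Map<String, List<Event>> pattern is one successfully matched pattern sequence. The key is the pattern name, and the value is a list because a looping pattern may have matched several events.
A PatternFlatSelectFunction differs only in that its method, flatSelect, also receives a Collector, so it can emit any number of results:

    class MyPatternFlatSelectFunction implements PatternFlatSelectFunction<Event, Alert> {
        @Override
        public void flatSelect(Map<String, List<Event>> pattern, Collector<Alert> collector) {
            Event startEvent = pattern.get("start").get(0);
            Event endEvent = pattern.get("end").get(0);

            // emit one Alert per unit of startEvent's value (Event is assumed to have getValue())
            for (int i = 0; i < startEvent.getValue(); i++) {
                collector.collect(new Alert(startEvent, endEvent));
            }
        }
    }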
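The difference between the two callback shapes can be shown in plain Java. One function returns exactly one result per matched sequence. The other pushes any number of results into a collector. SelectShapes is a hypothetical class, and a java.util.function.Consumer stands in for Flink's Collector. The map keys start, middle and end follow the naming used in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical plain-Java stand-ins for the two callback shapes.
// They are not the Flink interfaces.
final class SelectShapes {
    // select shape: exactly one output per matched sequence.
    static String selectOne(Map<String, List<String>> match) {
        return match.get("start").get(0) + "->" + match.get("end").get(0);
    }

    // flatSelect shape: the Consumer plays the role of the Collector, so zero,
    // one or many outputs are possible; here one per event of the looping "middle".
    static void selectMany(Map<String, List<String>> match, Consumer<String> out) {
        for (String middle : match.getOrDefault("middle", List.of())) {
            out.accept(match.get("start").get(0) + "->" + middle);
        }
    }

    static List<String> collect(Map<String, List<String>> match) {
        List<String> results = new ArrayList<>();
        selectMany(match, results::add);
        return results;
    }
}
```

With a match where "middle" holds two events, selectOne yields one result while the flat version yields two.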
Source Code
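First comes the pattern definition. Although the pattern API is fairly rich, its implementation is relatively simple.
In the end, org.apache.flink.cep.nfa.compiler.NFACompiler compiles the pattern sequence into an NFA (non-deterministic finite automaton). Most of the sequence-matching logic is implemented by this NFA, which is not described in detail here.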
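To give a feel for what compiling a pattern into an automaton means, here is a toy set-of-states automaton for a b+ c? d under strict contiguity. It only illustrates states and transitions. ToyNfa and its transition table are hypothetical and are not what NFACompiler actually produces; Flink's NFA also deals with relaxed contiguity, conditions and time windows.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy automaton for "a b+ c? d" with strict contiguity, run by tracking the set
// of active states. Hypothetical illustration only, not NFACompiler output.
final class ToyNfa {
    private static final int START = 0;
    private static final int FINAL = 4;

    // state -> (event type -> next states)
    private static final Map<Integer, Map<Character, Set<Integer>>> TRANSITIONS = Map.of(
        0, Map.of('a', Set.of(1)),
        1, Map.of('b', Set.of(2)),
        2, Map.of('b', Set.of(2), 'c', Set.of(3), 'd', Set.of(4)), // b+ loops; c? can be skipped
        3, Map.of('d', Set.of(4)));

    static boolean accepts(String eventTypes) {
        Set<Integer> current = Set.of(START);
        for (char event : eventTypes.toCharArray()) {
            Set<Integer> next = new HashSet<>();
            for (int state : current) {
                next.addAll(TRANSITIONS.getOrDefault(state, Map.of()).getOrDefault(event, Set.of()));
            }
            current = next; // an empty set means no partial match survives
        }
        return current.contains(FINAL);
    }
}
```

The looping pattern b+ shows up as the self-loop on state 2. The optional c? shows up as the two ways of leaving state 2: via c to state 3, or directly via d.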
Eventually patternStream.select is called, which produces the result stream:

    public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
        SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
            CEPOperatorUtils.createPatternStream(inputStream, pattern);

        return patternStream.map(
            new PatternSelectMapper<>(
                patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
            .returns(outTypeInfo);
    }

The pattern operator emits each match as a Map<String, List<T>>, and a map step then applies the user's select function.
CEPOperatorUtils.createPatternStream
    if (inputStream instanceof KeyedStream) {
        // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
        KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;

        TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());

        patternStream = keyedStream.transform(
            "KeyedCEPPatternOperator",
            (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
            new KeyedCEPPatternOperator<>(
                inputSerializer,
                isProcessingTime,
                keySerializer,
                nfaFactory,
                true));
    } else {
        KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
        TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;

        patternStream = inputStream.keyBy(keySelector).transform(
            "CEPPatternOperator",
            (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
            new KeyedCEPPatternOperator<>(
                inputSerializer,
                isProcessingTime,
                keySerializer,
                nfaFactory,
                false)).forceNonParallel();
    }

The key point is that both branches create a KeyedCEPPatternOperator. A non-keyed stream is keyed with a NullByteKeySelector, which gives every record the same key, and forceNonParallel() makes it run with parallelism 1.
    public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, List<IN>>>
AbstractKeyedCEPPatternOperator
The most important part is how an incoming StreamRecord is handled:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        if (isProcessingTime) {
            // there can be no out of order elements in processing time
            NFA<IN> nfa = getNFA();
            processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
            updateNFA(nfa);

        } else {
            long timestamp = element.getTimestamp();
            IN value = element.getValue();

            // In event-time processing we assume correctness of the watermark.
            // Events with timestamp smaller than the last seen watermark are considered late.
            // Late events are put in a dedicated side output, if the user has specified one.

            if (timestamp >= lastWatermark) { // only non-late records are buffered

                // we have an event with a valid timestamp, so
                // we buffer it until we receive the proper watermark.

                saveRegisterWatermarkTimer();

                List<IN> elementsForTimestamp = elementQueueState.get(timestamp);
                if (elementsForTimestamp == null) {
                    elementsForTimestamp = new ArrayList<>();
                }

                if (getExecutionConfig().isObjectReuseEnabled()) {
                    // copy the StreamRecord so that it cannot be changed
                    elementsForTimestamp.add(inputSerializer.copy(value));
                } else {
                    elementsForTimestamp.add(element.getValue());
                }
                elementQueueState.put(timestamp, elementsForTimestamp);
            }
        }
    }

As you can see, with processing time (isProcessingTime) it is very simple: the event is handed straight to the NFA.
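With event time, however, things get more complicated. Events may arrive out of order, so they cannot be handed to the NFA directly.
They have to be buffered first, which is what elementQueueState is for:

    private transient MapState<Long, List<IN>> elementQueueState;

    // initialized when the operator is opened:
    elementQueueState = getRuntimeContext().getMapState(
        new MapStateDescriptor<>(
            EVENT_QUEUE_STATE_NAME,
            LongSerializer.INSTANCE,
            new ListSerializer<>(inputSerializer)));

elementQueueState maps each timestamp to the list of records carrying that timestamp.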
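The buffering step can be sketched with an ordinary map from timestamp to the events carrying it. EventTimeBuffer is a hypothetical stand-in for elementQueueState. A HashMap replaces the MapState, so no ordering of timestamps is assumed. Late events are simply rejected rather than sent to a side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for elementQueueState: timestamp -> events.
// It mirrors the event-time branch of processElement: events whose timestamp is
// below the last seen watermark are late and are not buffered.
final class EventTimeBuffer<E> {
    final Map<Long, List<E>> elementQueue = new HashMap<>();
    long lastWatermark = Long.MIN_VALUE;

    /** Returns true if the event was buffered, false if it was late. */
    boolean add(long timestamp, E event) {
        if (timestamp < lastWatermark) {
            return false; // late event: dropped in this sketch
        }
        elementQueue.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
        return true;
    }
}
```

Two events with the same timestamp end up in the same list, just as processElement appends to elementsForTimestamp.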
Processing of the data buffered in elementQueueState is triggered in onEventTime:

    @Override
    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {

        // 1) get the queue of pending elements for the key and the corresponding NFA,
        // 2) process the pending elements in event time order by feeding them in the NFA
        // 3) advance the time to the current watermark, so that expired patterns are discarded.
        // 4) update the stored state for the key, by only storing the new NFA and priority queue iff they
        //    have state to be used later.
        // 5) update the last seen watermark.

        // STEP 1
        // sort the keys of elementQueueState by time; a PriorityQueue is a binary heap
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        NFA<IN> nfa = getNFA();

        // STEP 2
        // peek returns the smallest timestamp; it is processed if it is <= the current watermark
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
            long timestamp = sortedTimestamps.poll();
            for (IN element: elementQueueState.get(timestamp)) { // process the record list for this timestamp
                processEvent(nfa, element, timestamp);
            }
            elementQueueState.remove(timestamp);
        }

        // STEP 3
        advanceTime(nfa, timerService.currentWatermark());

        // STEP 4
        if (sortedTimestamps.isEmpty()) {
            elementQueueState.clear();
        }
        updateNFA(nfa);

        if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
            saveRegisterWatermarkTimer();
        }

        // STEP 5
        updateLastSeenWatermark(timerService.currentWatermark()); // updates lastWatermark
    }
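Steps 1 and 2 can be sketched on their own. Put the buffered timestamps into a heap, then feed every event whose timestamp is at or below the watermark to the matcher in timestamp order, leaving later events buffered. WatermarkDrain is a hypothetical helper and a BiConsumer stands in for processEvent. The NFA, advanceTime and the state updates are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

// Hypothetical sketch of STEP 1 and STEP 2 of onEventTime: sort the buffered
// timestamps with a heap and hand every event whose timestamp is <= watermark
// to the matcher in timestamp order. Later events stay in the buffer.
final class WatermarkDrain {
    static <E> void drain(Map<Long, List<E>> buffer, long watermark, BiConsumer<Long, E> processEvent) {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>(buffer.keySet());
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= watermark) {
            long timestamp = sortedTimestamps.poll();
            for (E element : buffer.get(timestamp)) {
                processEvent.accept(timestamp, element);
            }
            buffer.remove(timestamp);
        }
    }
}
```

Suppose events are buffered at timestamps 30, 10 and 20 and a watermark of 20 arrives. The events at 10 and 20 are processed in that order, and the event at 30 waits for a later watermark.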
When is onEventTime called?
AbstractStreamOperator has an InternalTimeServiceManager timeServiceManager that manages all time services. When AbstractKeyedCEPPatternOperator is opened, it creates its time service and passes itself in as the triggerTarget:

    timerService = getInternalTimerService(
        "watermark-callbacks",
        VoidNamespaceSerializer.INSTANCE,
        this);
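processElement calls saveRegisterWatermarkTimer():

    long currentWatermark = timerService.currentWatermark();
    // protect against overflow
    if (currentWatermark + 1 > currentWatermark) {
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
    }

This logic looks tricky, but all it does is register an event-time timer at currentWatermark + 1 with the time service. A timer fires once the watermark reaches its timestamp, so this one fires on the next watermark that moves past the current one. The overflow check fails only when currentWatermark is Long.MAX_VALUE, where currentWatermark + 1 would wrap around to a negative value.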
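The overflow guard is easier to see in isolation. NextWatermarkTimer is a hypothetical helper that returns the timestamp the timer would be registered at. It returns nothing when currentWatermark + 1 would overflow.

```java
import java.util.OptionalLong;

// Hypothetical helper mirroring the guard in saveRegisterWatermarkTimer():
// the timer goes at watermark + 1 unless that addition would overflow.
final class NextWatermarkTimer {
    static OptionalLong timestampFor(long currentWatermark) {
        if (currentWatermark + 1 > currentWatermark) { // false only for Long.MAX_VALUE
            return OptionalLong.of(currentWatermark + 1);
        }
        return OptionalLong.empty();
    }
}
```

For example, a watermark of 41 yields a timer at 42, while Long.MAX_VALUE yields no timer at all.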
In AbstractStreamOperator, when a watermark arrives:

    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

timeServiceManager.advanceWatermark simply calls advanceWatermark on each of its time services.
At the time of writing, the only time service implementation is HeapInternalTimerService.
HeapInternalTimerService.advanceWatermark

    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time; // update currentWatermark

        InternalTimer<K, N> timer;

        // take timers from eventTimeTimersQueue while their timestamp is <= the new watermark;
        // recall that we registered a timer at (previous watermark + 1)
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
            timerSet.remove(timer);
            eventTimeTimersQueue.remove();

            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer); // this is where onEventTime gets called
        }
    }
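A simplified timer service shows the same firing rule. Timers sit in a min-heap by timestamp, and advancing the watermark fires every timer whose timestamp is <= the new watermark, in order. SimpleTimerService is hypothetical, and a LongConsumer stands in for triggerTarget.onEventTime. Keys, namespaces and the set-based deduplication of the real HeapInternalTimerService are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical, simplified event-time timer service: timers are kept in a
// min-heap, and advancing the watermark fires every timer with
// timestamp <= watermark, smallest first.
final class SimpleTimerService {
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    void advanceWatermark(long time, LongConsumer onEventTime) {
        currentWatermark = time;
        Long timer;
        while ((timer = eventTimeTimers.peek()) != null && timer <= time) {
            eventTimeTimers.poll();
            onEventTime.accept(timer);
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

Suppose timers are registered at 11, 5 and 20 and the watermark advances to 12. The timers at 5 and 11 fire in that order, while the timer at 20 stays queued.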
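One more point worth noting: for a KeyedStream, how is it guaranteed that each key detects pattern sequences independently?
Keyed state such as elementQueueState is already scoped per key, so the independence comes naturally. In addition, advanceWatermark sets the current key to the timer's key (keyContext.setCurrentKey(timer.getKey())) before calling onEventTime. The state that onEventTime reads therefore belongs to that key.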
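Keyed state can be pictured as a map indexed by the current key, which the runtime sets before invoking the operator. KeyedBufferState is a hypothetical model for illustration, not Flink's state backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of keyed state: every access goes through the current key,
// which the runtime sets before calling into the operator (compare
// keyContext.setCurrentKey(timer.getKey()) above). Each key sees only its own events.
final class KeyedBufferState<K, E> {
    private final Map<K, List<E>> perKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        currentKey = key;
    }

    void add(E event) {
        perKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(event);
    }

    List<E> get() {
        return perKey.getOrDefault(currentKey, List.of());
    }
}
```

Events added under key u1 are invisible while the current key is u2, which is why each key can run its own pattern detection.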
Reposted from: https://www.cnblogs.com/fxjwind/p/7307535.html