2016-04-12 13 views
4

私は、2番目のイベントがx秒以内に最初のイベントに従わなかった場合に状態を変更する必要があるシナリオを持っています。例えば、ユーザーが100分でログアウトしなかった場合、彼は無効な状態にあると考えます。どのようにこれは現在のパターン操作を使用して設計できますか?Apache Flink CEPパターン操作がNOT followBy

答えて

2

これはできません。解決策は、定義された時間ウィンドウから外れるのでイベントシーケンスが破棄されるたびにトリガされるタイムアウトハンドラを持つことです。すでにtimeout handler実装を追跡するJIRAの問題があります。

1

これは既に実装されているので、私はここで回答を探している人たちにこの質問に答えようと考えました。あなたのCEPパターンは、このようなものであれば

FLINK 1.0.0のように、これは、例えば、TIMEDOUTパターンを処理して行うことができます。

例の一部Flink Websiteからから(1.2の間でいくつかの主要な変更はありおよび1.3に応じてコードを調整してください、この答えは1.3に焦点を当てて)

パターンの説明: - 10秒以内に「重要な」タイプの2つ目のイベントイベントに続いて、タイプ「エラー」の最初のイベントを取得します

Pattern<Event, ?> pattern = Pattern.<Event>begin("start") 
.next("middle").where(new SimpleCondition<Event>() { 
    @Override 
    public boolean filter(Event value) throws Exception { 
     return value.getName().equals("error"); 
    } 
}).followedBy("end").where(new SimpleCondition<Event>() { 
    @Override 
    public boolean filter(Event value) throws Exception { 
     return value.getName().equals("critical"); 
    } 
}).within(Time.seconds(10)); 

PatternStream<BAMEvent> patternStream = CEP.pattern(inputStream, pattern) 

DataStream<Either<String, String>> result = patternStream.select(new PatternTimeoutFunction<Event, String>() { 
    @Override 
    public String timeout(Map<String, List<Event>> map, long l) throws Exception { 
    return map.toString() +" @ "+ l; 
    } 
}, new PatternSelectFunction<Event, String>() { 

    @Override 
    public String select(Map<String, List<Event>> map) throws Exception { 
    return map.toString(); 
    } 
}); 

この場合、ユーザーが100分後にログアウトしないと、対応するイベントが到着しないため、パターンがタイムアウトになり、部分イベント(開始イベント)が発生します。 PatternTimeoutFunctionに取り込まれます。

関連する問題