2016-08-10 10 views
1

I持っているクラスタずにローカルで実行し、次のコード:FLINK CEPは確定的ではありません

1,1 
1,2 
1,3 
2,1 
2,2 
2,3 
... 

I:

val count = new AtomicInteger() 
val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setParallelism(1) 
val text: DataStream[String] = env.readTextFile("file:///flink/data2") 
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1))) 
val pattern: ... 
CEP.pattern(mapped, pattern).select(eventMap => { 
    println("Found: " + (patternName, eventMap)) 
    count.incrementAndGet() 
}) 

env.execute() 
println(count) 

私のデータは次の形式でCSVファイル(ユーザー、valが)でありますパターンのイベントを検出しようとしています(event(val=1) -> event(val=2) -> event(val=3))。私が知っている一定数のイベントがストリームに存在する大きな入力ストリームでこれを実行すると、検出されるイベントの数が不一致になります。ほとんどの場合、システム内のイベントの数よりも少なくなります。 env.setParallelism(1)(コードの3行目と同様)、すべてのイベントが検出されます。

並列性が> 1のときに複数のスレッドがストリームからイベントを処理しているという問題があるとします。つまり、あるスレッドがevent(val=1) -> event(val=2)である間に別のスレッドにevent(val=3)が送信され、検出された。

ここに何か不足していますか?ストリームのパターンを失うことはできませんが、並列性を1に設定すると、Flinkのようなシステムでイベントを検出するという目的を破るようです。

更新:これはお互いに干渉異なるユーザのイベントを防ぎますが

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user")) 

は、私が使用してストリームをキーイング試してみました

1,1 
2,2 
1,3 

これからFLINKを防ぐことはできません。イベントをノードに順番に送信する。これは、非決定論が依然として存在することを意味する。

答えて

0

ストリームをユーザーID(最初の値)で入力することについて考えましたか? Flinkは、あるキーのすべてのイベントが同じ処理ノードに到達することを保証します。 もちろん、ユーザーごとにval = 1-> val = 2-> val = 3のパターンを検出する場合にのみ役立ちます。

+0

はい、試しました。残念ながらそれはどちらも役に立たない。 – Sriram

+0

ストリームのキー入力後に問題が発生しましたか? 結果として何が得られますか?あなたはどんな結果を期待していましたか? – Claudi

+0

まだ決定的ではありません。それは次のようにイベントを防止するものの:検出されてから 1,1 2,2- 1,3、キーイングは、依然として複数のスレッドで順不同処理ノードに行くからイベントを停止しません。 – Sriram

0

おそらく問題は、map演算子の後にkeyBy演算子を適用することにあります。だから、

、代わりに:

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user")) 

があるはずです:

val mapped: KeyedStream[Map[String, Any]] = text.keyBy((m) => m.get("user")).map(...) 

私は、これは古い質問ですけど、多分それは誰かを助けます。

関連する問題