I持っているクラスタずにローカルで実行し、次のコード:FLINK CEPは確定的ではありません
1,1
1,2
1,3
2,1
2,2
2,3
...
I:
val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1)))
val pattern: ...
CEP.pattern(mapped, pattern).select(eventMap => {
println("Found: " + (patternName, eventMap))
count.incrementAndGet()
})
env.execute()
println(count)
私のデータは次の形式でCSVファイル(ユーザー、valが)でありますパターンのイベントを検出しようとしています(event(val=1) -> event(val=2) -> event(val=3)
)。私が知っている一定数のイベントがストリームに存在する大きな入力ストリームでこれを実行すると、検出されるイベントの数が不一致になります。ほとんどの場合、システム内のイベントの数よりも少なくなります。 env.setParallelism(1)
(コードの3行目と同様)、すべてのイベントが検出されます。
並列性が> 1のときに複数のスレッドがストリームからイベントを処理しているという問題があるとします。つまり、あるスレッドがevent(val=1) -> event(val=2)
である間に別のスレッドにevent(val=3)
が送信され、検出された。
ここに何か不足していますか?ストリームのパターンを失うことはできませんが、並列性を1に設定すると、Flinkのようなシステムでイベントを検出するという目的を破るようです。
更新:これはお互いに干渉異なるユーザのイベントを防ぎますが
val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))
:
は、私が使用してストリームをキーイング試してみました
1,1
2,2
1,3
これからFLINKを防ぐことはできません。イベントをノードに順番に送信する。これは、非決定論が依然として存在することを意味する。
はい、試しました。残念ながらそれはどちらも役に立たない。 – Sriram
ストリームのキー入力後に問題が発生しましたか? 結果として何が得られますか?あなたはどんな結果を期待していましたか? – Claudi
まだ決定的ではありません。それは次のようにイベントを防止するものの:検出されてから 1,1 2,2- 1,3、キーイングは、依然として複数のスレッドで順不同処理ノードに行くからイベントを停止しません。 – Sriram