2017-06-21 4 views
0

私はキー付きデータストリームの "ローリング"リダクションを理解しようとしています。「キー付きデータストリームのローリング削減」の意味は何ですか? DataStream reduce function

私は、一定の時間内に到着するメッセージを無視したいシナリオがあります。例えば。最初のイベント - >イベントの間 - >最後のイベント、最初と最後のイベントをキャプチャし、ストリームからイベントの間をスキップ/削除したいだけです。そして重要なのは、それはあるキーに基づいて起こるはずです。私のケースでは、私がキーを適用しているときにそのハッピングをしていません。

答えて

0

ローリング・リダクションは、おそらくあなたが探しているものではありません。状態として要素を保持します。新しい要素を受け取るたびに、格納された要素と新しい要素にReduceFunctionが適用されます。関数の結果が放出され、状態が更新されます。この演算子は終了要素の概念を持たず、常に状態を持ち、次の要素が処理されるのを待ちます。

私はステートフルFlatMapFunctionまたはProcessFunctionがあなたのユースケースに適していると思います。この関数は、start要素をstateとして格納し、end要素が到着するのを待ちます。受信されると、開始要素と終了要素が出力され、状態がクリーンアップされます。

全体的なプログラムは次のようになります。

val stream[Event] = ... 
val startEnd[(Event, Event)] = stream 
    .keyBy(yourKey) 
    .flatMap(yourStatefulFunction) 
+0

は直接の最初と最後のイベントを区別し、私は間の時間ラップを使用して計算する必要がない(と時間がダイナミックな要因である)があるキーを持ついくつかのメッセージxはt時間を有し、キーYはt2時間遅延を有する何らかのメッセージである。 提案したように、私は@ https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.htmlを探しています –