私は、キー入力され、異なる期間(1分、5分、1日、1週間)の転倒のためのカウントを計算する必要があるデータストリームを持っています。Apache Flink:複数のカウントウィンドウ関数を適用するには?
1つのアプリケーションで4つのウィンドウ数をすべて計算することはできますか?
私は、キー入力され、異なる期間(1分、5分、1日、1週間)の転倒のためのカウントを計算する必要があるデータストリームを持っています。Apache Flink:複数のカウントウィンドウ関数を適用するには?
1つのアプリケーションで4つのウィンドウ数をすべて計算することはできますか?
はい、可能です。
イベント時間を使用している場合は、時間間隔を増やしてウィンドウをカスケードするだけで済みます。だから、あなたは:
DataStream<String> data = ...
// append a Long 1 to each record to count it.
DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne);
DataStream<Tuple2<String, Long>> 1minCnts = withOnes
// key by String field
.keyBy(0)
// define time window
.timeWindow(Time.of(1, MINUTES))
// sum ones of the Long field
// in practice you want to use an incrementally aggregating ReduceFunction and
// a WindowFunction to extract the start/end timestamp of the window
.sum(1);
// emit 1-min counts to wherever you need it
1minCnts.addSink(new YourSink());
// compute 5-min counts based on 1-min counts
DataStream<Tuple2<String, Long>> 5minCnts = 1minCnts
// key by String field
.keyBy(0)
// define time window of 5 minutes
.timeWindow(Time.of(5, MINUTES))
// sum the 1-minute counts in the Long field
.sum(1);
// emit 5-min counts to wherever you need it
5minCnts.addSink(new YourSink());
// continue with 1 day window and 1 week window
注意これが可能であることを、ので:
ReduceFunction
にコメントについて:
通常、あなたはウィンドウ操作(の出力に窓の開始および/または終了タイムスタンプを持つようにしたい、同じキーのためのそれ以外のすべての結果同じように見える)。ウィンドウの開始時間と終了時間は、apply()
メソッドのwindow
パラメータWindowFunction
からアクセスできます。ただし、WindowFunction
は、レコードをインクリメンタルに集計するのではなく、それらを収集してウィンドウの最後にレコードを集約します。したがって、インクリメンタルアグリゲーションにはReduceFunction
を使用し、結果にウィンドウの開始時間および/または終了時間を追加する方が効率的です。 documentationでは詳細が説明されています。
処理時間を使用してこれを計算する場合は、ウィンドウをカスケードすることはできませんが、入力データストリームから4つのウィンドウ関数にファンアウトする必要があります。
優秀 - once i code assignTimestampsAndWatermarks et al私は常に計画だったEventTimeを使用できます。あなたは 'ベストプラクティスのreduceFunction' comment 'にちょっとした拡張を施せますか?面白いと思った... –
確かに、私は私の答えを広げた –