FlinkにはWindowStream
の平均を計算するための組み込み関数がありません。このためにカスタムWindowFunction
を実装する必要があります。
最も効率的な方法は、あなたが平均化したい値とReduceFunction
の結果を取り、平均値を算出し、その後のWindowFunction
の数と合計を計算ReduceFunction
を実装することです。 Flinkは入力値に直接適用するため、ReduceFunction
を使用する方が効率的です。したがって、その場で値を集約し、ウィンドウ内で値を収集しません。これにより、ウィンドウのメモリフットプリントが大幅に削減されます。
ReduceFunction
の出力は入力と同じタイプなので、ReduceFunction
を適用する前にカウントのフィールドを追加する必要があります。
が
val valueStream: DataStream[(String, Double)] = ???
val r: DataStream[(String, Double)] = valueStream
// append a 1L for counting
.map(x => (x._1, x._2, 1l))
// key and window stream
.keyBy(0).timeWindow(Time.minutes(5))
.apply(
// ReduceFunction (compute sum and count)
(x: (String, Double, Long), y: (String, Double, Long)) =>
(x._1, x._2 + y._2, x._3 + y._3),
// WindowFunction
(key, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => {
// get first (and only) value
val x: (String, Double, Long) = input.toIterator.next
// compute average as sum/count
out.collect(x._1, x._2/x._3)
}
)
FLINKでこのようなタスクを行うために利用可能な任意の例やブログの投稿はありません、私は知らない私は 'ReduceFunction'と' WindowFunction'を実装する方法:次のような
何かがトリックを行う必要があります。親切にも、サンプルコードを共有できますか? –
OK、私は例を追加しました。 –
ありがとう、ちょっと微調整して作業しています。しかし、私はまだ飛び跳ねることが非常に新しく、これを自分で書くことは決して考えられませんでした。あなたは私がFlinkで手を差し伸べるところからの参考文献を教えてもらえますか? –