2016-04-19 14 views
1

私はFlinkのWindowedStream上でいくつかの操作を実行したいと思います。 しかし、サムのように事前に定義可能な非常に限られた操作がある、最小値、最大値などFlinkでWindowedStreamのカスタム操作を実行するにはどうすればよいですか?

val windowedStream = valueStream 
          .keyBy(0) 
          .timeWindow(Time.minutes(5)) 
          .sum(2) //Change this to average? 

は、私はそれをどのように行うことができ、私は平均を見つけたいと仮定?

答えて

3

FlinkにはWindowStreamの平均を計算するための組み込み関数がありません。このためにカスタムWindowFunctionを実装する必要があります。

最も効率的な方法は、あなたが平均化したい値とReduceFunctionの結果を取り、平均値を算出し、その後のWindowFunctionの数と合計を計算ReduceFunctionを実装することです。 Flinkは入力値に直接適用するため、ReduceFunctionを使用する方が効率的です。したがって、その場で値を集約し、ウィンドウ内で値を収集しません。これにより、ウィンドウのメモリフットプリントが大幅に削減されます。

ReduceFunctionの出力は入力と同じタイプなので、ReduceFunctionを適用する前にカウントのフィールドを追加する必要があります。

val valueStream: DataStream[(String, Double)] = ??? 

val r: DataStream[(String, Double)] = valueStream 
    // append a 1L for counting 
    .map(x => (x._1, x._2, 1l)) 
    // key and window stream 
    .keyBy(0).timeWindow(Time.minutes(5)) 
    .apply(
    // ReduceFunction (compute sum and count) 
    (x: (String, Double, Long), y: (String, Double, Long)) => 
     (x._1, x._2 + y._2, x._3 + y._3), 
    // WindowFunction 
    (key, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => { 
     // get first (and only) value 
     val x: (String, Double, Long) = input.toIterator.next 
     // compute average as sum/count 
     out.collect(x._1, x._2/x._3) 
    } 
) 
+0

FLINKでこのようなタスクを行うために利用可能な任意の例やブログの投稿はありません、私は知らない私は 'ReduceFunction'と' WindowFunction'を実装する方法:次のような

何かがトリックを行う必要があります。親切にも、サンプルコードを共有できますか? –

+0

OK、私は例を追加しました。 –

+0

ありがとう、ちょっと微調整して作業しています。しかし、私はまだ飛び跳ねることが非常に新しく、これを自分で書くことは決して考えられませんでした。あなたは私がFlinkで手を差し伸べるところからの参考文献を教えてもらえますか? –

関連する問題