2017-11-23 14 views

答えて

3

はい、可能です。

イベント時間を使用している場合は、時間間隔を増やしてウィンドウをカスケードするだけで済みます。だから、あなたは:

DataStream<String> data = ... 
// append a Long 1 to each record to count it. 
DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne); 

DataStream<Tuple2<String, Long>> 1minCnts = withOnes 
    // key by String field 
    .keyBy(0) 
    // define time window 
    .timeWindow(Time.of(1, MINUTES)) 
    // sum ones of the Long field 
    // in practice you want to use an incrementally aggregating ReduceFunction and 
    // a WindowFunction to extract the start/end timestamp of the window 
    .sum(1); 

// emit 1-min counts to wherever you need it 
1minCnts.addSink(new YourSink()); 

// compute 5-min counts based on 1-min counts 
DataStream<Tuple2<String, Long>> 5minCnts = 1minCnts 
    // key by String field 
    .keyBy(0) 
    // define time window of 5 minutes 
    .timeWindow(Time.of(5, MINUTES)) 
    // sum the 1-minute counts in the Long field 
    .sum(1); 

// emit 5-min counts to wherever you need it 
5minCnts.addSink(new YourSink()); 

// continue with 1 day window and 1 week window 

注意これが可能であることを、ので:

  1. 合計は、連想機能(あなたは部分和を合計して合計を計算することができます)です。
  2. タンブリングウィンドウはうまく整列しており、重なり合っていません。インクリメンタル集約ReduceFunctionにコメントについて

通常、あなたはウィンドウ操作(の出力に窓の開始および/または終了タイムスタンプを持つようにしたい、同じキーのためのそれ以外のすべての結果同じように見える)。ウィンドウの開始時間と終了時間は、apply()メソッドのwindowパラメータWindowFunctionからアクセスできます。ただし、WindowFunctionは、レコードをインクリメンタルに集計するのではなく、それらを収集してウィンドウの最後にレコードを集約します。したがって、インクリメンタルアグリゲーションにはReduceFunctionを使用し、結果にウィンドウの開始時間および/または終了時間を追加する方が効率的です。 documentationでは詳細が説明されています。

処理時間を使用してこれを計算する場合は、ウィンドウをカスケードすることはできませんが、入力データストリームから4つのウィンドウ関数にファンアウトする必要があります。

+0

優秀 - once i code assignTimestampsAndWatermarks et al私は常に計画だったEventTimeを使用できます。あなたは 'ベストプラクティスのreduceFunction' comment 'にちょっとした拡張を施せますか?面白いと思った... –

+0

確かに、私は私の答えを広げた –

関連する問題