2017-03-28 6 views
0

私はApache Flink 1.2を使用しています。ここに私の質問があります。 私はデータストリームを持っており、1日のウィンドウでメトリックを計算したいと思います。したがって、私は次のようなものを書くでしょう:Apache Flinkの別のウィンドウでメトリックを計算する

DataStream<Tuple6<Timestamp, String, Double, Double, Double, Integer>> myStream0 = 
      env.readTextFile("Myfile.csv") 
      .map(new MyMapper())    // Parse the input 
      .assignTimestampsAndWatermarks(new MyExtractor()) //Assign the timestamp of the event 
      .timeWindowAll(Time.days(1))  
      .apply(new average()); // compute average, max, sum 

私は1時間のウィンドウで同じメトリクスを計算したいと思います。

私はこれまでと同じように書くことができ、Time.hours(1)を指定することができますが、私の懸念は、この方法では、入力ファイルの2倍の読み込みと2回の作業です。すべてのtogheter(つまり、同じストリームを使用して)を行う方法があるのだろうかと思います。

答えて

0

時間単位の集計とそれらの集計から計算できます。だから、

DataStream<Double> vals = ... // source + timestamp extractor 

DataStream<Tuple2<Double, Long>> valCnt = vals // (sum, cnt) 
    .map(new CntAppender())  // Double -> Tuple2<Double, Long(1)> 

DataStream<Tuple3<Double, Long, Long>> hourlySumCnt = valCnt // (sum, cnt, endTime) 
    .timeWindowAll(Time.hours(1)) 
    // SumCounter ReduceFunction sums the Double and Long field (Long is Count) 
    // WindowEndAppender WindowFunction adds the window end timestamp (3rd field) 
    .reduce(new SumCounter(), new WindowEndAppender()) 

DataStream<Tuple2<Double, Long>> hourlyAvg = hourlySumCnt // (avg, endTime) 
    .map(new SumDivCnt()) // MapFunction divides Sum by Cnt for average 

DataStream<Tuple3<Double, Long, Long>> dailySumCnt = hourlySumCnt // (sum, cnt, endTime) 
    .map(new StripeOffTime()) // removes unnecessary time field -> Tuple2<Double, Long> 
    .timeWindowAll(Time.days(1)) 
    .reduce(new SumCounter(), new WindowEndAppender()) // same as above 

DataStream<Tuple2<Double, Long>> dailyAvg = dailySumCnt // (avg, endTime) 
    .map(new SumDivCnt()) // same as above 

、あなたは基本的にあなたが

  1. は毎日毎時間平均
  2. 計算を計算し、その結果に基づいて合計を計算し、各時間カウントし、次のようにこれは単純なDataStream<Double>を探します合計カウントと私はReduceFunction代わりのためのWindowFunctionを使用していますことを、毎日平均

注意、和と数の計算は、ReduceFunctionが熱心に適用されるため、つまり、ウィンドウのすべてのレコードは収集されずにすぐに集計されるためです。したがって、維持する必要がある状態は単一のレコードです。

関連する問題