2017-09-29 16 views
0

Flink 1.3.2を使用して2段階の集約を実行しようとしていますが、結果が期待どおりに機能していないようです。当初、私は期待される結果を得る。2層アグリゲーションを実行する最適な方法は何ですか?

ここで取り組んでいるアプローチに根本的な問題はありますか?

このタイプの連鎖操作を実行している他の人たちの良い例は見つかりませんでした。

val myStream = sourceStream 
    .keyBy(0)  
    .timeWindow(Time.minutes(30)) 
    .reduce((r1: myRow, r2: myRow) => { r1 + r2 }, 
      (key: Any, window: TimeWindow, iterable: Iterable[myRow], out: Collector[myRow]) => { out.collect(iterable.iterator.next.setWindowStart(window.getStart)) } ) 
    .map(tier2Row.fromMyRow(_)) 
    .keyBy(0)   
    .timeWindow(Time.minutes(10)) 
    .reduce(_ + _) 
    .addSink(new MyTier2RowSink) 

答えて

0

元は時間ベースのデータ用に処理時間モードを使用していて、奇妙な結果を得ていました。私は摂取時間モードに切り替えました。階層化された集約は、モードを設定する以外の方法で私のコードに変更を加えることなく、予想通りに処理されます。

+0

Fabianがこの投稿の手がかりをくれました:https://stackoverflow.com/questions/46498772/how-can-i-inspect-the-internal-flink-timestamp-for-an-item-in-a-ストリームを使用して –

関連する問題