0
Flink 1.3.2を使用して2段階の集約を実行しようとしていますが、結果が期待どおりに機能していないようです。当初、私は期待される結果を得る。2層アグリゲーションを実行する最適な方法は何ですか?
ここで取り組んでいるアプローチに根本的な問題はありますか?
このタイプの連鎖操作を実行している他の人たちの良い例は見つかりませんでした。
val myStream = sourceStream
.keyBy(0)
.timeWindow(Time.minutes(30))
.reduce((r1: myRow, r2: myRow) => { r1 + r2 },
(key: Any, window: TimeWindow, iterable: Iterable[myRow], out: Collector[myRow]) => { out.collect(iterable.iterator.next.setWindowStart(window.getStart)) } )
.map(tier2Row.fromMyRow(_))
.keyBy(0)
.timeWindow(Time.minutes(10))
.reduce(_ + _)
.addSink(new MyTier2RowSink)
Fabianがこの投稿の手がかりをくれました:https://stackoverflow.com/questions/46498772/how-can-i-inspect-the-internal-flink-timestamp-for-an-item-in-a-ストリームを使用して –