0
私は、分秒のタイムスタンプでキー入力された60秒のデータを最大30秒の遅延で集計しようとしています。Apache Flink。ウォーターマークを使用したウィンドウ処理
私はデータを受け取りました。透かしとタイムスタンプが設定されています。集計されたデータはohlcStreamAggregatedに送信されないため、ログに記録されません。
public TimestampExtractor(Time maxDelayInterval) {
if (maxDelayInterval.toMilliseconds() < 0) {
throw new RuntimeException("This parameter must be positive or 0.);
}
this.maxDelayInterval = maxDelayInterval.toMilliseconds()/1000;
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxDelayInterval;
}
@Override
public final Watermark getCurrentWatermark() {
// set maximum delay 30 seconds
long potentialWM = currentMaxTimestamp - maxDelayInterval;
if (potentialWM > lastEmittedWM) {
lastEmittedWM = potentialWM;
}
return new Watermark(lastEmittedWM);
}
@Override
public final long extractTimestamp(StockTrade stockTrade, long previousElementTimestamp) {
BigDecimal bd = new BigDecimal(stockTrade.getTime());
long timestamp = bd.longValue();
//set the maximum seen timestamp so far
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
私はテンプレートとしてthis exampleを使用。
はい、env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);を設定します。 env.execute( "Kafkaトピックからの取引");最初の収集ストリームが正常に動作し、ログを記録するなど、問題はウィンドウ処理にあると私は考えています。私にあなたの電子メールを送ることができるならば、それについてさらに議論することができます。とても感謝しております。 –