2017-07-17 4 views
0

私は、分秒のタイムスタンプでキー入力された60秒のデータを最大30秒の遅延で集計しようとしています。Apache Flink。ウォーターマークを使用したウィンドウ処理

​​

私はデータを受け取りました。透かしとタイムスタンプが設定されています。集計されたデータはohlcStreamAggregatedに送信されないため、ログに記録されません。

public TimestampExtractor(Time maxDelayInterval) { 
     if (maxDelayInterval.toMilliseconds() < 0) { 
      throw new RuntimeException("This parameter must be positive or 0.); 
     } 
     this.maxDelayInterval = maxDelayInterval.toMilliseconds()/1000; 
     this.currentMaxTimestamp = Long.MIN_VALUE + this.maxDelayInterval; 
    } 

@Override 
public final Watermark getCurrentWatermark() { 
     // set maximum delay 30 seconds 
     long potentialWM = currentMaxTimestamp - maxDelayInterval; 
     if (potentialWM > lastEmittedWM) { 
      lastEmittedWM = potentialWM; 
     } 
     return new Watermark(lastEmittedWM); 
    } 
@Override 
public final long extractTimestamp(StockTrade stockTrade, long previousElementTimestamp) { 
     BigDecimal bd = new BigDecimal(stockTrade.getTime()); 
     long timestamp = bd.longValue(); 
     //set the maximum seen timestamp so far 
     if (timestamp > currentMaxTimestamp) { 
      currentMaxTimestamp = timestamp; 
     } 
     return timestamp; 
    } 

私はテンプレートとしてthis exampleを使用。

答えて

0

(多分要旨に)あなたが全部を共有することができれば、あなたのアプリケーションを診断するために容易になるだろう、しかし、あなたをした:

  • は、イベントの時間(docs)に特徴的な時間を設定しますか?
  • ストリーム実行環境でcall execute?

また、タイムスタンプエクストラクタはかなり簡単です。これ以上のもの:

public static class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<StockTrade> { 
    public TimestampExtractor() { 
     super(Time.seconds(30)); 
    } 

    @Override 
    public long extractTimestamp(StockTrade trade) { 
     return trade.getTime(); 
    } 
} 
+0

はい、env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);を設定します。 env.execute( "Kafkaトピックからの取引");最初の収集ストリームが正常に動作し、ログを記録するなど、問題はウィンドウ処理にあると私は考えています。私にあなたの電子メールを送ることができるならば、それについてさらに議論することができます。とても感謝しております。 –

関連する問題