私は順序が乱れたストリームを持っています。それらを順序付けし、次のフレームの同じフィールドでフィールド値を合計する必要があります。私のコード:Flinkタイムスタンプ、計算されたフィールドをストリームに追加する
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Message> messageswithTS = messages.assignTimestampsAndWatermarks(new TimeLagWaterMarkGenerator());
DataStream<Message> SumNumber = messageswithTS
.keyBy("deviceId")
.map(new Sumalo())
Sumalo()
は、追加する機能です。タイムスタンプを抽出するコード:
public class TimeLagWaterMarkGenerator implements AssignerWithPeriodicWatermarks<Message> {
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
@Override
public long extractTimestamp(Message element, long previousElementTimestamp) {
long timestamp = element.getDate();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp);
}
}
結果:
1 TRUE 0 21 1473861657491 6af7ecfb-5122-48b6-ada1-0ea39d1d4740
1 FALSE 3 3 1473861657496 c8b4617d-534b-4c5e-825c-a8c5556fcd87
1 TRUE 1 29 1473861657497 f5b72056-ec3d-4c97-b86d-73ed728757c3
1 FALSE 0 29 1473861657501 363d061d-ce02-4709-9683-b3bb233861f3
....
正しい結果:
1 TRUE 0 0 1473861657491 6af7ecfb-5122-48b6-ada1-0ea39d1d4740
1 FALSE 3 3 1473861657496 c8b4617d-534b-4c5e-825c-a8c5556fcd87
1 TRUE 1 4 1473861657497 f5b72056-ec3d-4c97-b86d-73ed728757c3
1 FALSE 0 4 1473861657501 363d061d-ce02-4709-9683-b3bb233861f3
....
は、任意のヘルプは理解されます。
Fabianさん、ありがとうございました。私はテーブルAPIとSQLの機能でそれを試しましたが、ストリームテーブルの集約は現在サポートされていません... – jag
いいえ残念ながら、それは現時点でそれを実装する唯一の方法です。この機能は将来、テーブルAPIに追加されます。 –