毎秒1データポイントからなるストリーム "ストリーム-1"があり、ホッピングウィンドウを使用して合計を含む派生ストリーム "ストリーム-5"を計算したいとします10秒のホッピング窓を使用して合計を含む「ストリーム-5」に基づいた別のストリーム「ストリーム-10」とを含む。集約は各キーごとに別々に行う必要があり、私は別のプロセスで各ステップを実行できるようにしたいと考えています。最後の値が正しい限り、stream-5とstream-10に同じキー/タイムスタンプの更新が含まれているので(つまり、必ずしもHow to send final kafka-streams aggregation result of a time windowed KTable?は必要ありません)、それ自体は問題ありません。カフカストリームDSLを用いた2ステップウインドウ付き集計
高水準のカフカストリームDSLを使用してこれを解決する方法はありますか?これまでは、集約のためにstream-5で生成された中間更新を処理するためのエレガントな方法を見ていませんでした。
cache.max.bytes.buffering
とcommit.interval.ms
の設定で中間更新が何とか制御できることは知っていますが、どのような設定でも中間値が生成されないことを保証できる設定はないと思います。また、 "ストリーム5"をキーのタイムスタンプ部分で読み込んだKTableに変換しようとしましたが、KTableのようなウィンドウ操作をサポートしていないようです。
これは私がこれまで持っているもので、ストリーム-1はintputs(キーがちょうどKEYである)
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}
が含まれている場合に起因するストリーム-5
今
Reducer<DataPoint> sum = new Reducer<DataPoint>() {
@Override
public DataPoint apply(DataPoint x, DataPoint y) {
return new DataPoint(x.timestamp, x.value + y.value);
}
};
KeyValueMapper<Windowed<String>, DataPoint, String> strip = new
KeyValueMapper<Windowed<String>, DataPoint, String>() {
@Override
public String apply(Windowed<String> wKey, DataPoint arg1) {
return wKey.key();
}
};
KStream<String, DataPoint> s1 = builder.stream("stream-1");
s1.groupByKey()
.reduce(sum, TimeWindows.of(5000).advanceBy(5000))
.toStream()
.selectKey(strip)
.to("stream-5");
KStream<String, DataPoint> s5 = builder.stream("stream-5");
s5.groupByKey()
.reduce(sum, TimeWindows.of(10000).advanceBy(10000))
.toStream()
.selectKey(strip)
.to("stream-10");
上の中間集計値に失敗しました
ストリーム5が正しい(最終)値が含ま:
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}
が、ストリーム10は、間違った(最終的な値であります
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}
リンクありがとうございます。そのアプローチで州の店舗サイズが無限に拡大するのを防ぐ方法の更新を楽しみにしています。 – veryltdbeard