私は頻繁に刻み目をつける(秒を考える)ティッカーKStreamを持っています.24時間のウィンドウでさまざまな統計を計算したいと思います。たとえば、24時間の変更、特定のポイントとその前の24時間の価格の差。私の所望の入力のための与えられたウィンドウのストリームに対する統計を計算する
私の出力は、次のとおりです。
t1 -> t1c1
t2 -> t1c2
t3 -> t1c3
t1
が入力ティッカーです
、そしてt1c1
は、それに先立つ24時間ウィンドウのために計算された追加の統計で入力ティッカーです。
これはうまくいきませんでした。これはうまくいきませんでした。 * 1時間ごとにホッとしたサイズで24時間表示します。
builder.stream(rawPriceTickerTopic, ...)
.groupByKey()
.windowedBy(
TimeWindows.of(TimeUnit.DAYS.toMillis(1))
.advanceBy(TimeUnit.SECONDS.toMillis(1))
.reduce((value1, value2) ->
value1.tickerWithStatsFrom(value2), ...)
.toStream();
各入力ティッカーは、それがメンバーである各ウィンドウの出力ティッカーを生成しかし、これは、出力点の膨大な数を生成します。
- 、最新の時系列店のいくつかの種類をキープ店舗から24時間前の値を取得し、それから、私の統計ティッカーを計算するが、これは、ストリームのポイントに逆行しているようです。
'しかし、これは生成膨大な数の出力点があります。各入力ティッカーは、そのメンバーである各ウィンドウに対して出力テロップを生成します.'明らかに、あなたはTimeWindowsを使用しています。 「スライディングウィンドウ」を使用するためにインデントし、単一の「最新/最新」のウィンドウのみを維持する場合は、遅れて到着したレコードを処理できないため、すぐにサポートされません。スライディングウインドウを模倣するために非ウィンドウ集約を使用するあなたのアプローチは、良い方法であるように思われますが、順序外れレコードを考慮に入れません。 –
以下の私の解決策では、アウトオブオーダーのレコードがストリームに送られると、アグリゲーターが順不同でヒットし、タイムスタンプでソートされます。これは、それに続くレコードがその集計にそれを含めますが、これまで持っていたレコードではないことを意味します。私は、カフカがウインドウでアウトオブオーダーのレコードを処理する方法を理解できませんでした。ストリームプロセッサが集計を再計算し、別の出力レコードを生成しますか? – jaker
Kafka Streamsはすべてのウィンドウを(期限が切れるまで)並列に維持し、遅れたレコードが到着した場合に結果を再計算します。したがって、それはまたあなたがしない "古い"ウィンドウを更新します - あなたはただ1つのウィンドウを維持するので)最新のウィンドウを更新し、ウィンドウの終了時間よりも古いレコードを削除します。 Kafka Streamsは設定された保存期間(デフォルト1日)のウィンドウサイズとは独立してウィンドウを管理します。したがって、ウィンドウサイズがわずか1時間の場合、レコードは数時間後に到着する可能性があり、依然として結果に含まれます。 –