2017-12-27 10 views
0

私は頻繁に刻み目をつける(秒を考える)ティッカーKStreamを持っています.24時間のウィンドウでさまざまな統計を計算したいと思います。たとえば、24時間の変更、特定のポイントとその前の24時間の価格の差。私の所望の入力のための与えられたウィンドウのストリームに対する統計を計算する

私の出力は、次のとおりです。

t1 -> t1c1 
t2 -> t1c2 
t3 -> t1c3 
t1が入力ティッカーです

、そしてt1c1は、それに先立つ24時間ウィンドウのために計算された追加の統計で入力ティッカーです。

これはうまくいきませんでした。これはうまくいきませんでした。 * 1時間ごとにホッとしたサイズで24時間表示します。

builder.stream(rawPriceTickerTopic, ...) 
      .groupByKey() 
      .windowedBy(
        TimeWindows.of(TimeUnit.DAYS.toMillis(1)) 
        .advanceBy(TimeUnit.SECONDS.toMillis(1)) 
      .reduce((value1, value2) -> 
        value1.tickerWithStatsFrom(value2), ...)     
      .toStream(); 

各入力ティッカーは、それがメンバーである各ウィンドウの出力ティッカーを生成しかし、これは、出力点の膨大な数を生成します。

  • 、最新の時系列店のいくつかの種類をキープ店舗から24時間前の値を取得し、それから、私の統計ティッカーを計算するが、これは、ストリームのポイントに逆行しているようです。
+0

'しかし、これは生成膨大な数の出力点があります。各入力ティッカーは、そのメンバーである各ウィンドウに対して出力テロップを生成します.'明らかに、あなたはTimeWindowsを使用しています。 「スライディングウィンドウ」を使用するためにインデントし、単一の「最新/最新」のウィンドウのみを維持する場合は、遅れて到着したレコードを処理できないため、すぐにサポートされません。スライディングウインドウを模倣するために非ウィンドウ集約を使用するあなたのアプローチは、良い方法であるように思われますが、順序外れレコードを考慮に入れません。 –

+0

以下の私の解決策では、アウトオブオーダーのレコードがストリームに送られると、アグリゲーターが順不同でヒットし、タイムスタンプでソートされます。これは、それに続くレコードがその集計にそれを含めますが、これまで持っていたレコードではないことを意味します。私は、カフカがウインドウでアウトオブオーダーのレコードを処理する方法を理解できませんでした。ストリームプロセッサが集計を再計算し、別の出力レコードを生成しますか? – jaker

+0

Kafka Streamsはすべてのウィンドウを(期限が切れるまで)並列に維持し、遅れたレコードが到着した場合に結果を再計算します。したがって、それはまたあなたがしない "古い"ウィンドウを更新します - あなたはただ1つのウィンドウを維持するので)最新のウィンドウを更新し、ウィンドウの終了時間よりも古いレコードを削除します。 Kafka Streamsは設定された保存期間(デフォルト1日)のウィンドウサイズとは独立してウィンドウを管理します。したがって、ウィンドウサイズがわずか1時間の場合、レコードは数時間後に到着する可能性があり、依然として結果に含まれます。 –

答えて

0

私の最終的な解決策は、ウィンドウマネージャを放棄し、単にアグリゲータの自分の24時間のウィンドウを維持しながら、私のティッカーに集約することでした。これはまだ最良の方法のようには感じられず、カフカのウィンドウイングコンセプトでそれを解決できたと感じるような気がします。

streamBuilder.stream(tickerTopic, Consumed.with(...) 
       .groupByKey() 
       .aggregate(MyAggregator::new, 
         (key, value, aggregate) -> aggregate.addTicker(value), 
         Materialized.with(...) 
       .toStream() 

結果は、元のティッカー・ストリーム内のすべてのレコードに対して、私は私の出力ストリームに集約された値を取得することである:

としては、私は私のアグリゲータと単純集合を使用し、上記述べました。私のアグリゲーターロジックは簡単です:

  • 注文したコレクションに新しいティッカーを追加してください。
  • 24時間以上経過したティッカーは、この最新のティッカーよりも捨ててください。
  • 新しい24時間の変更を計算します。

(この技術は、例えば、所与のウィンドウにわたって計算任意の種類の移動平均を使用することができる)

アグリゲータのサンプルコード:

public class MyAggregator { 

    private BigDecimal change; 

    private TreeSet<Ticker> orderedTickers = new TreeSet<>(MyAggregator::tickerTimeComparator); 

    public MyAggregator() { 
     this.windowMilis = 86400000; 
    } 

    public MyAggregator addTicker(Ticker ticker) { 
     orderedTickers.add(ticker); 
     cleanOldTickers(); 
     change = getLatest().getAsk().subtract(getEarliest().getAsk()); 
     return this; 
    } 

    public BigDecimal getChange() { 
     return change; 
    } 

    public Ticker getEarliest() { 
     return orderedTickers.first(); 
    } 

    public Ticker getLatest() { 
     return orderedTickers.last(); 
    } 

    private void cleanOldTickers() { 
     Date endOfWindow = latestWindow(); 

     Iterator<Ticker> iterator = orderedTickers.iterator(); 
     while(iterator.hasNext()) { 
      Ticker next = iterator.next(); 
      if (next.getTimestamp().before(endOfWindow)) { 
       iterator.remove(); 
      } 
      // The collection is sorted by time so if we get here we can break. 
      break; 
     } 
    } 

    private Date latestWindow() { 
     return new Date(getLatest().getTimestamp().getTime() - windowMilis); 
    } 

    private static int tickerTimeComparator(Ticker t1, Ticker t2) { 
     return t1.getTimestamp().compareTo(t2.getTimestamp()); 
    } 

} 
関連する問題