2017-07-13 6 views
0

毎秒1データポイントからなるストリーム "ストリーム-1"があり、ホッピングウィンドウを使用して合計を含む派生ストリーム "ストリーム-5"を計算したいとします10秒のホッピング窓を使用して合計を含む「ストリーム-5」に基づいた別のストリーム「ストリーム-10」とを含む。集約は各キーごとに別々に行う必要があり、私は別のプロセスで各ステップを実行できるようにしたいと考えています。最後の値が正しい限り、stream-5とstream-10に同じキー/タイムスタンプの更新が含まれているので(つまり、必ずしもHow to send final kafka-streams aggregation result of a time windowed KTable?は必要ありません)、それ自体は問題ありません。カフカストリームDSLを用いた2ステップウインドウ付き集計

高水準のカフカストリームDSLを使用してこれを解決する方法はありますか?これまでは、集約のためにstream-5で生成された中間更新を処理するためのエレガントな方法を見ていませんでした。

cache.max.bytes.bufferingcommit.interval.msの設定で中間更新が何とか制御できることは知っていますが、どのような設定でも中間値が生成されないことを保証できる設定はないと思います。また、 "ストリーム5"をキーのタイムスタンプ部分で読み込んだKTableに変換しようとしましたが、KTableのようなウィンドウ操作をサポートしていないようです。

これは私がこれまで持っているもので、ストリーム-1はintputs(キーがちょうどKEYである)

KEY {"timestamp":0,"value":1.0} 
KEY {"timestamp":1000,"value":1.0} 
KEY {"timestamp":2000,"value":1.0} 
KEY {"timestamp":3000,"value":1.0} 
KEY {"timestamp":4000,"value":1.0} 
KEY {"timestamp":5000,"value":1.0} 
KEY {"timestamp":6000,"value":1.0} 
KEY {"timestamp":7000,"value":1.0} 
KEY {"timestamp":8000,"value":1.0} 
KEY {"timestamp":9000,"value":1.0} 
が含まれている場合に起因するストリーム-5

Reducer<DataPoint> sum = new Reducer<DataPoint>() {                   
    @Override                             
    public DataPoint apply(DataPoint x, DataPoint y) {                   
     return new DataPoint(x.timestamp, x.value + y.value);                 
    }                               
};                               

KeyValueMapper<Windowed<String>, DataPoint, String> strip = new 
      KeyValueMapper<Windowed<String>, DataPoint, String>() {  
     @Override                            
     public String apply(Windowed<String> wKey, DataPoint arg1) {                
      return wKey.key();                         
     }                              
};                               

KStream<String, DataPoint> s1 = builder.stream("stream-1");                  

s1.groupByKey()                            
     .reduce(sum, TimeWindows.of(5000).advanceBy(5000))                  
     .toStream()                            
     .selectKey(strip)                          
     .to("stream-5");                           

KStream<String, DataPoint> s5 = builder.stream("stream-5");                  

s5.groupByKey()                            
     .reduce(sum, TimeWindows.of(10000).advanceBy(10000))                 
     .toStream()                            
     .selectKey(strip)                          
     .to("stream-10");  

上の中間集計値に失敗しました

ストリーム5が正しい(最終)値が含ま:

KEY {"timestamp":0,"value":1.0} 
KEY {"timestamp":0,"value":2.0} 
KEY {"timestamp":0,"value":3.0} 
KEY {"timestamp":0,"value":4.0} 
KEY {"timestamp":0,"value":5.0} 
KEY {"timestamp":5000,"value":1.0} 
KEY {"timestamp":5000,"value":2.0} 
KEY {"timestamp":5000,"value":3.0} 
KEY {"timestamp":5000,"value":4.0} 
KEY {"timestamp":5000,"value":5.0} 

が、ストリーム10は、間違った(最終的な値であります

KEY {"timestamp":0,"value":1.0} 
KEY {"timestamp":0,"value":3.0} 
KEY {"timestamp":0,"value":6.0} 
KEY {"timestamp":0,"value":10.0} 
KEY {"timestamp":0,"value":15.0} 
KEY {"timestamp":0,"value":21.0} 
KEY {"timestamp":0,"value":28.0} 
KEY {"timestamp":0,"value":36.0} 
KEY {"timestamp":0,"value":45.0} 
KEY {"timestamp":0,"value":55.0} 

答えて

0

問題は、すべての集計の結果は、それらの出力トピックを生成レコードがchanglogを表す意味し、KTablesであることである:それはまた、ストリーム5のアカウントに中間値をとるため)10.0であるべきです。しかし、後でそれらをストリームとしてロードすると、ダウンストリーム集約は二重にカウントされます。

代わりに、中間トピックをストリームではなくテーブルとしてロードする必要があります。ただし、ストリームでしか使用できないため、ウィンドウ化された集計を使用することはできません。

あなたは、ストリームの代わりに、テーブルの上にウィンドウ凝集を達成するために、次のパターンを使用することができます。

https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows

あなたはそれを適応させることができ、別のプロセスの各ステップを実行したい場合は、単にロードすることを忘れないでください中間テーブルはbuilder.table()を使用し、builder.stream()は使用しません。

+0

リンクありがとうございます。そのアプローチで州の店舗サイズが無限に拡大するのを防ぐ方法の更新を楽しみにしています。 – veryltdbeard

関連する問題