2017-12-15 12 views
0

レッツは、私は2つのストリームを持っていると言いますTimeWindowで指定された時間範囲内にある数値の合計を含むTimeWindowオブジェクトが出力に含まれるようにストリームを結合しますか?
具体的には、持続時間ストアのどこにXXXを設定しますか?win.getDuration()ここで、が勝利はValueJoinerで参照されるものです。JoinWindows期間がストリームの1つのオブジェクトに格納されている2つのKafka KStreamを結合することは可能ですか?</li> </ol> <p>は、ユーザのDSL APIのいずれかまたはProcess APIへのそれは可能です(開始時間と終了時間)(タイムスタンプ付き)</li> <li>番号を</p> <ol> <li>TimeWindow:

timeWindow.join( 
    numbers, 
    (ValueJoiner<TimeWindow, Number, TimeWindow>) (win, num) -> win.addToTotal(num), 
    new JoinWindows(XXX, 0) 
).to("output_Topic"); 

TimeWindowのタイムスタンプはendtimeなので、JoinWindows afterは0です。 XXX期間はTimeWindows終了時刻として計算する必要があります。開始時間はミリ秒単位です。

何か助けてくれてありがとう!

+0

DSLを使用することはできませんが、プロセッサAPIを使用すると何かを構築できます:)しかし、SOが提供できる範囲を超えてどのようにアドバイスしますか。それはかなり深い質問です。カスタムストアを使用し、結合を計算する独自の 'Processor'を書く必要があります... –

答えて

0

Matthiasのおかげで、TimestampExtractorsの実装とMemory State Store(デフォルトではRockDBを使用)の実装でProcessor APIを使用してこの機能を実装しました。