2017-10-19 12 views
0

与えられた(ホッピング)時間ウィンドウの長さがTの場合に、しきい値がNよりも頻繁に表示されるメッセージをフィルタリングしようとしています。Kafka Streams - 時間ウィンドウに頻繁に表示されるメッセージのフィルタリング

例えば、以下の流れで上記簡略化のみになり、可能でない場合

#time, key 
0, A 
1, B 
2, A 
3, C 
4, D 
5, A 
6, B 
7, C 
8, C 
9, D 
10, A 
11, D 
12, D 
13, D 
14, D 
15, D 

N=2T=3、結果は、代替的

0, A 
2, A 
7, C 
8, C 
9, D 
11, D 
12, D 
13, D 
14, D 
15, D 

であるべきではしきい値が満たされた後のメッセージのフィルタリング:

#time, key 
2, A 
8, C 
11, D 
12, D 
13, D 
14, D 
15, D 

カフカストリームでこれが可能ですか?

これまでストリームのwindowed count(インスタンスはKTable)を作成し、元のストリームに戻してみました。 windowed countのキーをKTable#toStream((k,v) -> k.key())を使用してdummy aggregationのインスタンスをKTableに戻して元のキーに戻します。これは、leftJoinにスレッシュホールドを超過した後に非常に近いメッセージを見逃す遅延を引き起こすようです。

source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde) 
      .filter((k,v) -> v!=null) 
      .filter((k,v) -> v>1) 
      .print("output"); 

wcountに各UPSERTがイベントをトリガーするので、これは、重複出力をもたらす:KStream(ダミー集合を除外)適切なウィンドウと結合 - 私もKStreamを実行しようとした

final Serde<String> stringSerde = Serdes.String(); 
    final Serde<Long> longSerde = Serdes.Long(); 

    KStream<String, Long> wcount = source.groupByKey() 
      .count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts") 
      .toStream((k,v) -> k.key()); 

    // perform dummy aggregation to get KTable 
    KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde) 
       .reduce((aggValue, newValue) -> newValue, 
       "dummy-aggregation-store"); 

    // left join and filter with threshold N=1 
    source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde) 
      .filter((k,v) -> v!=null) 
      .filter((k,v) -> v>1) 
      .print("output"); 

答えて

1

。リスト内のすべての生データを収集するウィンドウ集約を適用することができます(つまり、ウィンドウを手動で実現します)。その後、ウィンドウを評価するflatMapを適用します。しきい値がまだ満たされていない場合は、何も出さない。しきい値が初めて満たされた場合は、すべてバッファされたデータを送信します。フラットマップの値がしきい値より大きい場合は、リスト内の最新のものだけを出力します(以前はflatMapの呼び出しではなく、新しく追加されたものだけを出力していました)。

注:KTableキャッシュを無効にする必要があり、すなわち、それ以外の場合は、アルゴリズムが正しく動作しませんconfigパラメータ「cache.max.bytes.buffering」= 0を設定します。

このような何か:

KStream<Windowed<K>, List<V>> windows = stream.groupByKey() 
               .aggregate(
               /*init with empty list*/, 
               /*add value to list in agg*/, 
               TimeWindows.of()...), 
               ...) 
               .toStream(); 
KStream<K,V> thresholdMetStream = windows.flatMap(
              /* if List#size < threshold 
               then return empty-list, ie, nothing 
               elseif List#size == threshold 
               then return whole list 
               else [List#size > threshold] 
               then return last element from list 
              */); 
+1

おかげで、必要に応じて、私は(ほとんど)は、この作業を得ました。 [タンブリングウィンドウ](https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_dsl_windowing)では、新しいタイムウィンドウの開始時のイベントを除いて、これは機能します(関連するイベントが前のウィンドウ)。ホッピング・ウィンドウの場合、複数のウィンドウにイベントの単一トリガー・シーケンスを入れることができるため、出力ストリームに重複があります。そこで、私は、プロセッサAPIとウィンドウ付き永続ストアを使用してソリューションを実装することを選択しました。 –

関連する問題