与えられた(ホッピング)時間ウィンドウの長さがT
の場合に、しきい値がN
よりも頻繁に表示されるメッセージをフィルタリングしようとしています。Kafka Streams - 時間ウィンドウに頻繁に表示されるメッセージのフィルタリング
例えば、以下の流れで上記簡略化のみになり、可能でない場合
#time, key
0, A
1, B
2, A
3, C
4, D
5, A
6, B
7, C
8, C
9, D
10, A
11, D
12, D
13, D
14, D
15, D
とN=2
とT=3
、結果は、代替的
0, A
2, A
7, C
8, C
9, D
11, D
12, D
13, D
14, D
15, D
であるべきではしきい値が満たされた後のメッセージのフィルタリング:
#time, key
2, A
8, C
11, D
12, D
13, D
14, D
15, D
カフカストリームでこれが可能ですか?
これまでストリームのwindowed count
(インスタンスはKTable
)を作成し、元のストリームに戻してみました。 windowed count
のキーをKTable#toStream((k,v) -> k.key())
を使用してdummy aggregationのインスタンスをKTable
に戻して元のキーに戻します。これは、leftJoin
にスレッシュホールドを超過した後に非常に近いメッセージを見逃す遅延を引き起こすようです。
source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
wcount
に各UPSERT
がイベントをトリガーするので、これは、重複出力をもたらす:KStream
(ダミー集合を除外)適切なウィンドウと結合 - 私もKStream
を実行しようとした
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> wcount = source.groupByKey()
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts")
.toStream((k,v) -> k.key());
// perform dummy aggregation to get KTable
KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde)
.reduce((aggValue, newValue) -> newValue,
"dummy-aggregation-store");
// left join and filter with threshold N=1
source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
。
おかげで、必要に応じて、私は(ほとんど)は、この作業を得ました。 [タンブリングウィンドウ](https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_dsl_windowing)では、新しいタイムウィンドウの開始時のイベントを除いて、これは機能します(関連するイベントが前のウィンドウ)。ホッピング・ウィンドウの場合、複数のウィンドウにイベントの単一トリガー・シーケンスを入れることができるため、出力ストリームに重複があります。そこで、私は、プロセッサAPIとウィンドウ付き永続ストアを使用してソリューションを実装することを選択しました。 –