同様に、この質問と同じように少し異なります:KStream batch process windows、私はKStream
からのメッセージを消費者にプッシュダウンする前にバッチしたいと思います。KStreamを固定サイズのリストに集めるには?
ただし、このプッシュダウンは、固定された時間枠でスケジュールするのではなく、キーごとに固定メッセージ数のしきい値に設定する必要があります。まず第2質問について
が頭に浮かぶ:
1)これは、処理されるべき方法カスタムAbstractProcessor
ですか?線に沿って何か:
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, Message[]> it = messageStore.all();
while (it.hasNext())
KeyValue<String, Message[]> entry = it.next();
if (entry.value.length > 10) {
this.context.forward(entry.key, entry.value);
entry.value = new Message[10]();
}
}
}
2)StateStore
が潜在的に「ごみコレクト」は、このための最良の方法は何か、(場合にエントリ値を転送するために、しきい値に達することはありません)爆発するので、 ?私はタイムベースのスケジュールを実行し、古すぎるキーを削除することができます...しかし、それは非常にDIYとエラーが発生しやすいようです。
これらの貴重なご意見ありがとうございます。より具体的な質問は実装に続くだろう、間違いない。 – Raf