私は、Kafkaトピックから消費されたデータを集約するためにApache Kafkaストリーミングを使用しています。集約は、それ自体が消費され、結果がDBに格納された別のトピックにシリアル化されます。私が推測するようなかなり古典的な使用例。Apache KafkaストリーミングKTableの変更ログ
集計コールの結果は、Kafkaの変更履歴「トピック」によってバックアップされたKTableを作成しています。
これは実際にはそれよりも複雑ですが、(平均を計算する)のは、それが与えられたキーのイベントの数との和を記憶しているとしましょう:
KTable<String, Record> countAndSum = groupedByKeyStream.aggregate(...)
「トピック」をchangelogのように見えていません保持期間が設定されています(私のグローバルな保存設定ごとに他のトピックとは反対に「期限切れ」と表示されません)。
これは、将来のイベントに同じキーが含まれている場合に集約状態が失われないようにするために、実際には必要です。
しかし、長期的に言えば、これは、この変更ログは(より多くのキーが入ってくるので)永遠に成長することを意味しますか?私は潜在的に多くのキーを持っています(そして、私の集計はカウント/サムのように小さくありません)。
私は、特定のキーのイベントを取得しないことを知る手段があります(いくつかのイベントは「最終」とマークされています)。これらの特定のキーの集約状態を取り除く方法はありますか?私がもうそれらを必要としないように、それが永遠に成長するのを避けるための変更ログは、場合によっては「ちょっと」遅れている可能性があります。
この「問題」を回避するために、カフカストリーミングとまったく異なる方法がありますか?
私はtombstoneメッセージについて読んだばかりですが、keyはそれらを削除できるnullメッセージです。まだテストする必要があります。とにかく正しいパターンであることにはまだ興味があります。 – Christophe
はい:変更ログトピックはログ圧縮で構成され、保持時間では構成されません。 「最終」レコードを受け取った場合、集約は集約結果として単に「null」を返すことができます。これにより、ローカルのRocksDBストアおよび基になるchangelogのトピックから削除されます。 –
Matthiasさん、ありがとうございます。私はテストして、すべてが "最終"レコードに到達するとnullを返すように期待通りに進むことを確認しました。 – Christophe