2017-12-09 6 views
0

私は、Kafkaトピックから消費されたデータを集約するためにApache Kafkaストリーミングを使用しています。集約は、それ自体が消費され、結果がDBに格納された別のトピックにシリアル化されます。私が推測するようなかなり古典的な使用例。Apache KafkaストリーミングKTableの変更ログ

集計コールの結果は、Kafkaの変更履歴「トピック」によってバックアップされたKTableを作成しています。

これは実際にはそれよりも複雑ですが、(平均を計算する)のは、それが与えられたキーのイベントの数との和を記憶しているとしましょう:

KTable<String, Record> countAndSum = groupedByKeyStream.aggregate(...) 

「トピック」をchangelogのように見えていません保持期間が設定されています(私のグローバルな保存設定ごとに他のトピックとは反対に「期限切れ」と表示されません)。

これは、将来のイベントに同じキーが含まれている場合に集約状態が失われないようにするために、実際には必要です。

しかし、長期的に言えば、これは、この変更ログは(より多くのキーが入ってくるので)永遠に成長することを意味しますか?私は潜在的に多くのキーを持っています(そして、私の集計はカウント/サムのように小さくありません)。

私は、特定のキーのイベントを取得しないことを知る手段があります(いくつかのイベントは「最終」とマークされています)。これらの特定のキーの集約状態を取り除く方法はありますか?私がもうそれらを必要としないように、それが永遠に成長するのを避けるための変更ログは、場合によっては「ちょっと」遅れている可能性があります。

この「問題」を回避するために、カフカストリーミングとまったく異なる方法がありますか?

+0

私はtombstoneメッセージについて読んだばかりですが、keyはそれらを削除できるnullメッセージです。まだテストする必要があります。とにかく正しいパターンであることにはまだ興味があります。 – Christophe

+0

はい:変更ログトピックはログ圧縮で構成され、保持時間では構成されません。 「最終」レコードを受け取った場合、集約は集約結果として単に「null」を返すことができます。これにより、ローカルのRocksDBストアおよび基になるchangelogのトピックから削除されます。 –

+1

Matthiasさん、ありがとうございます。私はテストして、すべてが "最終"レコードに到達するとnullを返すように期待通りに進むことを確認しました。 – Christophe

答えて

1

はい:変更ログのトピックは、ログ圧縮で構成され、保持時間では構成されません。 「最終」レコードを受け取った場合、集計は集計結果としてnullを返します。これにより、ローカルのRocksDBストアおよび基になるchangelogのトピックから削除されます。

関連する問題