2017-08-14 9 views
0

カフカストリーム処理は、トランザクション処理のために当社のシステムで実装されています。解決策は以下のように実装されます。ストリーム処理後のカフカトピックからのメッセージの削除

カフカプロデューサはイベントをカフカトピックにパブリッシュし、ストリームプロセッサは入力イベントを処理して集約操作を実行します。ストリーム処理の後、イベントは別のトピックに公開されます。最初のトピックでコンシューマーが実装されていないため、最初のトピックから処理されたメッセージを削除するにはどうすればよいですか。

答えて

2

ストリーム処理チェーンが最初のトピックのコンシューマーであることを考慮してください。元のデータを何らかの理由で再処理する必要がある場合(たとえば、ストリーム処理ロジックにバグがあることがわかっている場合など)は、元のメッセージを処理した後でも最初のトピックで利用できるようにすることができます。

メッセージを削除する必要はありません。必要に応じて、そのトピックの保存ポリシーを設定する必要があります。トレードオフは、通常、データが使用可能な時間と必要なストレージの量です。

1

kafkaからメッセージを手動で削除する方法はありません(ディスク上のデータをハッキングしない、AFAIK)。

  • 使用時間ベースの保持ポリシー(たとえばカフカは自動的に1時間より古いすべてのメッセージを削除してみましょう)

  • 使用のストレージベースの保持ポリシー(カフカがに話題のサイズを維持しましょう:あなただけの3つのオプションを持っていますいくつかの事前定義された値)

  • トピックコンパクションポリシーを使用する - kafkaに最新のバージョンのキーを保存させます。古いバージョンのキーはすべて削除(圧縮)されます。

既にLuciano Afranllieが記述しているように、手動でメッセージを削除する必要はありません。あなたはメッセージを処理して、カフカにあなたのポリシーに従ってトピックを管理させることができます。

0

このユースケースにこの機能を正確に追加するKafka改善提案(KIP)があります。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient

現時点ではメッセージの削除を行うには、すべてのScalaのコードは、Java AdminClientのAPIでこの機能をただし加えて0.11カフカにあり、

https://github.com/apache/kafka/pull/2476

を動作するようにテストされていますドキュメントはまだ完成していません。

関連する問題