2017-11-16 14 views
0

私はカフカのトピックとデータベースのオフセットを管理したいので、ある時点後にキューで再処理したいと思っています。これをどうやって進めることができますか?前もって感謝します。Kafkaのオフライン管理とローカルデータベース

+0

例を詳しく説明できますか? – shakeel

+0

トピックに既に保存されている場合、なぜそれをデータベースに保存する必要がありますか?次に、同じ 'groupId'でコンシューマを起動すると、Kafkaは適切にそのコンシューマにポーリングを開始するオフセットを割り当てます。 –

答えて

0

PartitionInfoを指定すると、そのパーティションのオフセットに対してコンシューマにseekToBeginningまたはseekと伝えることができます。

ConsumerRecordは、トピック、パーティションおよびオフセットを認識しています。これらの事実をデータベースに記録することができます。

しかし、あなたのトピックが分割されている場合は、ここでキャッチします。あなたのデータは、そのカテゴリの時系列になります。したがって、本質的に姓で2つのパーティションとパーティションがある場合、アルファベットの最初の半分の名前の変更は順次で、後半は順次ですが、名前の変更を1つの時間順に表示する方法は分かりませんシステム全体にわたって

ただし、データベースの特定の変更にパーティションとオフセットを記録した場合は、そのパーティションを探して、そのポイントからストリームをオフセットして再処理することができます。

(あなたはパーティションを1つだけ持っている場合、これは無関係になり、それはあなたのトピックまたはストリーミング・アーキテクチャは、複数のパーティション/を必要とする場合際に考えるべきものだ)理論への実際の質問からバックステッピング

を、私はしませんよなぜあなたがこれをしたいのか本当に確信しています。消費者グループがあなたのコミットされたオフセットをKafka自身に記録するので、ストリーム処理アプリケーションがクラッシュすると心配することなく中断したところからピックアップすることができます。このメッセージは、enable.auto.commit propertyを設定した場合に自動的に発生するか、コンシューマでを呼び出すと手動で制御できます。あるいは、不変のデータストア(Kafka)を変更可能なストアとして使用しようとしていますが、それは、あなたが望むことをやりたい理由を本当に説明していないという事実に基づいて、ちょっとした純粋な推測ですする。

関連する問題