2017-12-15 2 views
0

現在、私はKafka Streamsを使ってメッセージを処理するプロジェクトに取り組んでいます。私たちのメッセージは、2つの識別子から成ります.1つはuserの識別子で、もう1つはユーザが属するメッセージリストの識別子です。カフカトポロジの消費者オフセットを変更するにはどうすればよいですか?

アプリケーションの話は、クライアントが自分のアプリケーションを介してユーザーの一覧にプッシュメッセージを送信したいということです。したがって、クライアントはプッシュメッセージをユーザーに送信する前に外部ソースからユーザーリストをロードした後、送信を開始します。しかし、クライアントは、いつでも、いつでも送信を停止したいと考えています。

この時点で私の問題が発生しました。カフカからのすべてのメッセージをストリームAPI経由で消費することなく、プッシュメッセージの送信を停止したい。私は最新のものとしてストリームのソースのオフセットの変更を求めたが、それを達成するための適切な方法を見つけることはできない。

どうすればできますか?おかげさまで

+0

達成したいことが理解できません。なぜあなたはオフセットを変更する必要がありますか?あなたは精緻化できますか? –

+0

メッセージを処理せずに、消費されないメッセージはスキップします。私が思う唯一の方法は、ストリームの消費者のオフセットを変更することです。しかし、私はそれを変更するための適切な方法を見つけることができませんでした。私は[seekToEnd](https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd(java.util.Collection))のような方法を探していますカフカストリーム – AlexiWilius

答えて

1

カフカストリームはこれをサポートしていません。実際にこれを行う必要がある場合は、Kafka Streamsを停止し、bin/kafka-consumer-groups.shを使用して、Kafka Streamsアプリケーション(注:application.id == group.id)を手動で検索することができます。その後、カフカストリームを再開できます。

Btw:今後の1.1リリースでは、bin/kafka-streams-application-reset.shもオフセットを操作できます。また、内部で使用されているクラスStreamsResetterを公開APIにすることもできれば幸いです。これにより、コマンドラインにフォールバックする必要がなく、アプリケーション内でこれをプログラムで行うことができます。

関連する問題