2017-05-04 14 views
2

私はSpring Boot AppでKafkaListenerインターフェイスを使用していますが、これは正常に動作します。オフセットはカフカ自身によって保存されます。@ KafkaListenerアノテーションを使用する際にオフセットをリセットする標準的な方法はありますか?

トピックのコンシューマーの1人が新しいバージョンをデプロイし、2時間分のメッセージをスクリューアップするとします。その後、彼らはアプリを修正し、2時間前からのオフセットで新しいバージョンを開始したい。

私は以前のconsumer.offsetsForTimes()呼び出しでconsumer.seek()を使用できましたが、Spring KafkaListenerを使用しているときではなく、ポーリングメカニズムを使用しているときにはそれが困難です。

私はConsumerRebalanceListenerを使用してみましたが、私は消費者がConsumerRebalanceListenerをインスタンス化する際に、私は持っていないもの、アクティブにする必要があるオフセットを設定する...

私は、ポーリングに切り替える必要がありますか? 私のKafkaListenerが最初に呼び出される前に、コンシューマにパーティションが割り当てられた後で、コンシューマに連絡することはできますか?

答えて

1

同時にではありません。今後の2.0 releaseにはConsumerAwareMessageListenerが導入されており、を引数として@KafkaListenerメソッドに渡すことをサポートしています。

+0

このような迅速な対応をありがとうございます。 しかし、説明されているシナリオを処理するために現在提案されている方法は何ですか?多分再生ログプロデューサーですか? (https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-ReplayLogProducer) - しかし、範囲パラメータなどのオフセットを渡すことができないので、アプリケーションを持つことができます一時的に別の話題を処理する。 – smlgbl

+0

「ConsumerSeekAware」がどうにか役立つかどうか疑問に思う:http://docs.spring.io/spring-kafka/reference/html/_reference.html#seek。そして、 'ConsumerSeekAware.onPartitionsAssigned()'で使用するために処理されたメッセージに 'offset'を格納する必要があるかもしれません。 –

+0

' ConsumerSeekAware'を実装して任意のシークを可能にしますが、コンテナアイドルイベントが発行されたときに、メッセージ配信中またはアプリケーションリスナ内で呼び出されます。しかし、 'offsetsForTimes'メソッドにアクセスする方法はありませんので、シークポイントを決定する他の方法が必要です。 –

関連する問題