私はカフカのメッセージを消費するためにSpringベースのプロジェクトで自然な選択と思われたので、プロジェクトでSpring Kafkaを使用しています。メッセージを消費するには、MessageListener
インターフェイスを使用できます。 Spring Kafkaは内部的に、新しいメッセージごとに私のonMessage
メソッドを呼び出すように気を付けます。Spring Kafka: `onMessage`を使用して通知するのではなく、新しいメッセージをポーリングする
私の設定では、新しいメッセージを明示的にポーリングして、順番に(数秒かかる)作業することを好みます。回避策として、onMessage
の実装をブロックするか、メッセージを内部的にバッファすることができます。しかし、これはSpring Kafkaのコアアイデアに反しているようです。
Kafkaは、消費者が自分の要件に合致する新しいメッセージをポーリングするように設計されています。 Spring Kafkaでこの「自然な」ワークフローを利用する方法はありますか?
このケースでは、Spring Kafkaを使用しないでください。
KafkaConsumer
documentation状態:メッセージの処理時間が予測できない変化ユースケースについて
、 これらのオプションのいずれも十分であり得ます。 がメッセージ処理を別のスレッド に移動することで、プロセッサ がまだ動作している間にコンシューマがポーリングを継続できるようにすることをお勧めします。コミットされた のオフセットが実際の位置よりも先に進まないように注意する必要があります。一般的に、 のレコードの処理が完了した後にのみ、自動コミットを無効にし、処理済みオフセットを手動でコミットする必要があります( 必要な配信のセマンティクスに応じて)。 は、スレッドが以前に返されたものを処理し終えるまで、ポーリング から新しいレコードを受け取らないようにパーティションを一時停止する必要があることにも注意してください。
関連問題:https://github.com/spring-projects/spring-kafka/issues/195
私は 'AcknowledgingConsumerAwareMessageListener'を使用して、あまりにも多くのメッセージが積み重ねられたときにコンシューマーを一時停止させるかもしれません。 –