2017-11-15 4 views
0

私はカフカのメッセージを消費するためにSpringベースのプロジェクトで自然な選択と思われたので、プロジェクトでSpring Kafkaを使用しています。メッセージを消費するには、MessageListenerインターフェイスを使用できます。 Spring Kafkaは内部的に、新しいメッセージごとに私のonMessageメソッドを呼び出すように気を付けます。Spring Kafka: `onMessage`を使用して通知するのではなく、新しいメッセージをポーリングする

私の設定では、新しいメッセージを明示的にポーリングして、順番に(数秒かかる)作業することを好みます。回避策として、onMessageの実装をブロックするか、メッセージを内部的にバッファすることができます。しかし、これはSpring Kafkaのコアアイデアに反しているようです。

Kafkaは、消費者が自分の要件に合致する新しいメッセージをポーリングするように設計されています。 Spring Kafkaでこの「自然な」ワークフローを利用する方法はありますか?

このケースでは、Spring Kafkaを使用しないでください。

KafkaConsumer documentation状態:メッセージの処理時間が予測できない変化ユースケースについて

、 これらのオプションのいずれも十分であり得ます。 がメッセージ処理を別のスレッド に移動することで、プロセッサ がまだ動作している間にコンシューマがポーリングを継続できるようにすることをお勧めします。コミットされた のオフセットが実際の位置よりも先に進まないように注意する必要があります。一般的に、 のレコードの処理が完了した後にのみ、自動コミットを無効にし、処理済みオフセットを手動でコミットする必要があります( 必要な配信のセマンティクスに応じて)。 は、スレッドが以前に返されたものを処理し終えるまで、ポーリング から新しいレコードを受け取らないようにパーティションを一時停止する必要があることにも注意してください。

関連問題:https://github.com/spring-projects/spring-kafka/issues/195

+0

私は 'AcknowledgingConsumerAwareMessageListener'を使用して、あまりにも多くのメッセージが積み重ねられたときにコンシューマーを一時停止させるかもしれません。 –

答えて

1

消費者は今(KIP-62によって0.10.1.xに)解決されているので、それはあなたドン限り、問題は(これ以上はないポーリングを維持することの問題既定値は5分ですが、増加させることができます)。あなた自身をポーリングしたい場合

しかし、あなたはまだ使用することができ、スプリング・カフカ(例えば、あなたがブートを使用している場合は、春のブート自動構成良さを得るために)、しかし、あなたはDefaultKafkaConsumerFactoryからConsumerと直接poll()それを得ることができます。

+0

これは有効な回避策であり、upvoteを獲得しました。しかし、「プッシュ」モデルが「プル」技術のために強制されるため、(消費者として)Spring Kafkaをどのように使用すべきかという問題がまだあります。私はこれが設計によるものだと思うけど、私の場合、これはSpring Kafkaを奇妙な/使用不可能にする。 –

+1

私が言ったように、プッシュモデルは__を "強制"していないので、単に 'consumer.poll()'を直接使うことができます。フレームワークから期待していることがあれば、GitHubの問題を開いてください。 –

関連する問題