2017-03-24 3 views
3

私はkafkaトピックからの消費にKafkaMessageListenerContainerを使用していますが、他のマイクロサービスにも依存する各レコードを処理するアプリケーションロジックがあります。私は今、各レコードが処理された後、オフセットを手動でコミットしています。Springカフカコンシューマ、実行時にオフセットをシークしますか?

私はアプリケーションロジックが失敗した場合、失敗したオフセットを探し、成功するまで処理を続ける必要があります。そのためには、最後のオフセットの実行時手動検索を行う必要があります。

これはKafkaMessageListenerContainerでまだ可能ですか?

+0

アプリケーションロジックの後にオフセットをコミットしています。したがって、アプリケーション・ロジックに障害が発生した場合にオフセットをコミットしないと、オフセットは先に進まず、同じメッセージを再度処理します。これはあなたの問題を解決しますか? – yaswanth

+1

@yaswanthいいえ、それは私が推測するように動作しません。私はそれをテストするまであなたと同様の前提を持っていました。私はENABLE_AUTO_COMMIT_CONFIGを有効にしました - 偽 とAcknowledgingMessageListenerを持つAbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATEのコンテナプロパティ。 レコード1を送信し、次にレコード2をトピックに追加しました。レコード1 - アプリケーションロジックが失敗しました(私は返信しません)。この時点で、次の消費者アンケートはレコード1を再度取得しますが、レコード2を取得します。 configで固定されたものですparams私に教えてください! – sash

+0

あなたは正しいです!私は今まで偽りの仮定の下にいます。 – yaswanth

答えて

2

Seeking to a Specific Offsetを参照してください。追求するために

、あなたのリスナーは、次の方法がありますConsumerSeekAwareを実装する必要があります。

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

コンテナが開始されたときに最初に呼び出されます;このコールバックは、初期化後の任意の時間にシークするときに使用する必要があります。コールバックへの参照を保存する必要があります。複数のコンテナ(またはConcurrentMessageListenerContainer)で同じリスナーを使用している場合は、コールバックをThreadLocalまたはリスナースレッドによってキーされた他の構造体に格納する必要があります。

+0

それはそれを指摘していただきありがとうございます:) – sash

関連する問題