2016-05-17 13 views
1

spring-cloud-stream 1.0.0.RELEASEのドキュメントを見てきましたが、エラー処理に関するドキュメントが見つからないようです。spring-cloud-stream kafkaエラー処理

私の消費者がRuntimeExceptionをスローすると、kafka 0.9での観測に基づいて、3回の再試行があります。 3回の再試行の後、私はログでこれを参照してください。

2016-05-17 09:35:59.216 ERROR 8983 --- [ kafka-binder-] o.s.i.k.listener.LoggingErrorHandler  : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]] 

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message 

この時点で、消費者は1で遅れを相殺し、私は消費者を再起動した場合、メッセージは再び3回再試行されます。しかし、コンシューマが例外をスローしないように別のメッセージを同じパーティションに送信すると、コンシューマオフセットが更新され、例外がスローされた元のメッセージは再起動後もう一度リトライされません。

これは私が見つけられなかったどこかで文書化されていますか?エラー処理はバインダー固有であるか、バインダー間で一貫していることを抽象化していますか?私は、カフカバインダーでコンシューマーオフセットがどのように更新されるかは、計画外の結果であると考えています。私はenableDlq kafkaコンシューマープロパティーが追加されていることを確認していますが、そのことをテストしようとしていますが、カフカの死んだ文字をどのように扱うことができないのか分かりません。私はrabbitmqのデッドレターキューに精通していますが、rabbitmqでは、一時的なサービス停止による障害をカバーするために、rabbitmq shovelプラグインを使用してdlqメッセージを再発行して再試行することができます。私はkafkaで利用可能な同様の機能を認識していません。

UPDATE:enableDlq kafkaコンシューマプロパティを有効にしてテストすると、エラー処理で同じコンシューマオフセットの問題が発生します。コンシューマがRuntimeExceptionをスローすると、3回の再試行が行われた後、エラーメッセージが記録されず、error.<destination>.<group>に記載されたメッセージが表示されますが、コンシューマオフセットは更新されず、1だけ遅れます。元のトピックパーティションから同じ失敗したメッセージを再度処理し、3回再試行して同じメッセージをerror.<destination>.<group>トピックに再度入れます(dlqメッセージを複製します)。コンシューマがRuntimeExceptionをスローしない同じトピックパーティションに別のメッセージを公開すると、オフセットが更新され、元の失敗したメッセージは再起動時に再試行されなくなります。

enableDlqがtrueであるかどうかにかかわらず、コンシューマが消費者がエラーをスローすると、消費者はkafkaのコンシューマオフセットを更新する必要があります。そうすれば、すべての再試行に失敗したメッセージは破棄されるか(enableDlqがfalseの場合)、またはdlqに公開され、再試行されない(enableDlqが真の場合)。

答えて

1

私のバグのように見えます - リスナーコンテナはバインダーによって公開されていない(または設定されていない)autoCommitOnError(デフォルトではfalse)というプロパティを持っています。 booleanがtrueの場合、エラーハンドラ(エラーをパブリッシュする)を呼び出した後、オフセットがコミットされます。

githubで問題として報告してください。

+0

ご確認いただきありがとうございます。 https://github.com/spring-cloud/spring-cloud-stream/issues/542 – gadams00