2017-01-04 6 views
3

を「失う」ことなく、@ KafkaListener-メソッド内でアクノリッジ:基本的に我々は現在、次の簡略化のメカニズムと基本的にメッセージを確認メッセージ

@KafkaListener(topics = "someTopic") 
    public void listen(final String message, final Acknowledgment ack) { 
    try { 
     processMessage(message); 
     ack.acknowledge(); 
    } catch (final IOException e) { 
     // do not acknowledge here since we can temporarily not process the message 
    } 

、我々は我々が欲しい(IOExceptionsの例で)一時的にメッセージを処理することができない時はいつでも後でそれを受け取ることができます。

しかし、これは、同じパーティション内のすべての以前のメッセージが正常に処理されたと認識したため、これは機能しません。 IOExceptionの場合、失敗したメッセージはスキップされますが、同じパーティション上のより高いインデックスを持つ別のメッセージによって確認される可能性があります。

これを修正する方法はいくつかありますが、これはKafkaListenerメソッド内で確認応答を直接呼び出さないようにするための厄介な回避策です。私たちのユースケースは非常に特殊なものか、それともkafkaユーザーが想定する「デフォルト」の動作に似ていないのでしょうか?

この種の問題にはスプリングカフカのソリューションがありますか?あるいは、これを「正しく」解決するアイデアはありますか?

答えて

2

これはちょうどカフカが動作する方法です。次のメッセージに移動する前に、IOExceptionを超えて再試行できるようにすることができます。

エラーハンドリングを構成して、後で再生するために別のトピックに失敗したメッセージを公開することができます。または、エラーハンドラは、新しい配送を防ぐためにコンテナを停止することができます。

コンテナを再起動すると、メッセージが再生されます。

+0

クイックアンサーに感謝します。私は、次のメッセージに移る前に再試行を使うことが、私たちのユースケースにとって最善の解決策であると考えています。 –

関連する問題