1

カフカコンシューマを構築しています。私は以下のような回復コールバックを設定しました。手動コミットを有効にしました。回復コールバックメソッドのメッセージを確認して、遅延がないようにするにはどうすればよいですか。Spring Kafkaコンシューマ - リカバリコールバックメカニズム付き手動コミット

@Bean 
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConcurrency(conncurrency); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setRetryTemplate(retryTemplate()); 
       factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      return null; 
     } 
    }); 
     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

答えて

2

あなたの質問は広すぎます。より具体的にする必要があります。

消費エラー中に再試行する場合に実行できることは、フレームワークには何も想定されていません。

私はあなたがそのすべてでRecoveryCallbackとどのようにそれが動作であるかを理解するためにSpring Retryプロジェクトから開始すべきだと思う:

テンプレートを中止することを決定する前に、ビジネス・ロジックが成功しない場合は、クライアントでありますリカバリコールバックを使用していくつかの代替処理を実行する機会が与えられました。

RetryContextがあります

/** 
* Accessor for the exception object that caused the current retry. 
* 
* @return the last exception that caused a retry, or possibly null. It will be null 
* if this is the first attempt, but also if the enclosing policy decides not to 
* provide it (e.g. because of concerns about memory usage). 
*/ 
Throwable getLastThrowable(); 

はまた春カフカはRecoveryCallbackに対処することRetryContextに追加の属性を取り込みますhttps://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#_retrying_deliveries

RecoveryCallback意志に渡さRetryContextの内容リスナーのタイプによって異なります。コンテキストには、常にエラーが発生したレコードの属性recordがあります。リスナーが認識しているか、消費者が認識している場合は、追加の属性acknowledgmentおよび/またはconsumerが利用可能になります。便宜上、RetryingAcknowledgingMessageListenerAdapterはこれらのキーに静的定数を提供します。詳細については、javadocsを参照してください。

+0

ありがとうございましたArtem。最後のステートメントに基づいて、retrycontextから確認応答属性を取得できると仮定します。どのようにこれを取得することができますか? –

+0

'(謝辞)retryContext.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)' –

+0

ありがとう! –

関連する問題