2017-11-02 8 views
0

DBに処理された後、各レコードを承認するSpring KafkaListenerを使用しています。 DBへの書き込みに問題がある場合、私たちはレコードを承認しないので、オフセットはコンシューマーにコミットされません。これはうまく動作します。次に、次のポーリングで失敗したメッセージを取得して再試行します。 errorhandlerをリスナーに追加し、ConsumerAwareListenerErrorHandlerを呼び出し、失敗したメッセージオフセットに対してconsumer.seek()を実行しようとしました。期待は次の世論調査の間です、我々は失敗したメッセージを受け取るべきです。これは起こっていない。次のポーリングでは、新しいメッセージのみがフェッチされ、失敗したメッセージはフェッチされません。スプリングカフカ:カフカリスナー - 消費者.seek号

@Service 
public class KafkaConsumer { 
     @KafkaListener(topics = ("${kafka.input.stream.topic}"), containerFactory = "kafkaManualAckListenerContainerFactory", errorHandler = "listen3ErrorHandler") 
     public void onMessage(ConsumerRecord<Integer, String> record, 
      Acknowledgment acknowledgment) throws Exception { 

    try { 

     msg = JaxbUtil.convertJsonStringToMsg(record.value()); 
     onHandList = DCMUtil.convertMsgToOnHandDTO(msg); 

     TeradataDAO.updateData(onHandList); 
     acknowledgment.acknowledge(); 
     recordSuccess = true; 

     LOGGER.info("Message Saved in Teradata DB"); 

    } catch (Exception e) { 
     LOGGER.error("Error Processing On Hand Data ", e); 
     recordSuccess = false; 
    } 

} 

    @Bean 
    public ConsumerAwareListenerErrorHandler listen3ErrorHandler() throws InterruptedException { 
     return (message, exception, consumer) -> { 
      this.listen3Exception = exception; 
      MessageHeaders headers = message.getHeaders(); 
      consumer.seek(new org.apache.kafka.common.TopicPartition(
          headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class), 
          headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), 
          headers.get(KafkaHeaders.OFFSET, Long.class)); 
      return null; 
     }; 
    } 
} 

    Container Class 

    @Bean 
public Map<Object,Object> consumerConfigs() { 
    Map<Object,Object> props = new HashMap<Object,Object>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      localhost:9092); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class); 
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-1"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
    return props; 
} 

@SuppressWarnings({ "rawtypes", "unchecked" }) 
@Bean 
public ConsumerFactory consumerFactory() { 
    return new DefaultKafkaConsumerFactory(consumerConfigs()); 
} 

@SuppressWarnings("unchecked") 
@Bean 
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> 
kafkaManualAckListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
      new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
    return factory; 
} 

答えて

0

次のように動作するようになっています:

エラーハンドラは、前の世論調査から追加のレコードを破棄したい場合に例外をスローする必要があります。

エラーを処理しているため、コンテナは何も知らず、引き続きポーリングの残りのレコードでリスナーを呼び出します。

つまり、コンテナはエラーハンドラによってスローされた例外も無視しています(エラーハンドラが例外ではなくErrorをスローすると破棄されます)。私はこれについての問題を開くでしょう。

もう1つの回避策は、リスナーメソッドのシグネチャにConsumerを追加し、そこでシークして例外をスローすることです。エラーハンドラがない場合、残りのバッチは破棄されます。コンテナは何ErrorHandlerを持っていない場合は

修正

は、ListenerErrorHandlerによってスローされたThrowableは残りのレコードが破棄されることになります。

+0

私は私のリスナーへの追加の消費者を試してみましたが、例外@KafkaListener(トピック=( "$ {kafka.input.stream.topic}")、containerFactory = "kafkaManualAckListenerContainerFactory") \t \tます。public voidのonMessage(ConsumerRecordに求めました<整数、文字列>レコード、 \t \t \t \t肯定応答確認、消費者の消費者)は例外{ }キャッチ(例外e){ \t \t \t consumer.seek(新しいorg.apache.kafka.common.TopicPartition(レコードをスロー.topic()、 record.partition())、record.offset()); \t \t \t throw e; nullの代わりにLoggingErrorHandler()というエラーハンドラが表示される – SKumar

+0

コメントにコードを入れないでください。読むのは難しいです。代わりに質問を編集してください。うん。私はhttps://github.com/spring-projects/spring-kafka/issues/470で作業を始めました。次の日に修正される予定です。 –

+0

私は今のところ、ConsumerAwareErrorHandler(コンテナ内で、リスナーエラーハンドラではない)を使用し、 'Error'をスローするしかないと思います。 –

関連する問題