2017-12-13 10 views
0

カフカのトピックからメッセージのリストを読み込み、RESTサービスに送信するバネカフカバッチリスナーを実装しています。 RESTサービスが停止した場合のオフセット管理について理解したいと思います。バッチのオフセットはコミットしないでください。メッセージは次のポーリングのために処理する必要があります。私は春のカフカのドキュメントを読んだが、リスナーエラーハンドラと現在のコンテナエラーハンドラのシークをバッチで理解するのは混乱している。私はspring-boot-2.0.0.M7バージョンを使用しています。以下は私のコードです。バグkafkaバッチリスナー - バッチで手動でオフセットをコミットする

Listener Config: 

@Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 

     factory.setConcurrency(Integer.parseInt(env.getProperty("spring.kafka.listener.concurrency"))); 
     // factory.getContainerProperties().setPollTimeout(3000); 
     factory.getContainerProperties().setBatchErrorHandler(kafkaErrorHandler()); 

     factory.getContainerProperties().setAckMode(AckMode.BATCH); 
     factory.setBatchListener(true); 
     return factory; 
    } 
@Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> propsMap = new HashMap<>(); 
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers")); 
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
       env.getProperty("spring.kafka.consumer.enable-auto-commit")); 
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 
       env.getProperty("spring.kafka.consumer.auto-commit-interval")); 
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("spring.kafka.session.timeout")); 
     propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id")); 
     return propsMap; 
    } 

Listener Class: 

@KafkaListener(topics = "${spring.kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory") 
    public void listen(List<String> payloadList) throws Exception { 
     if (payloadList.size() > 0) 
      //Post to the service 
    } 

Kafka Error Handler: 

public class KafkaErrorHandler implements BatchErrorHandler { 

    private static Logger LOGGER = LoggerFactory.getLogger(KafkaErrorHandler.class); 

    @Override 
    public void handle(Exception thrownException, ConsumerRecords<?, ?> data) { 
     LOGGER.info("Exception occured while processing::" + thrownException.getMessage()); 

      } 

} 

レコードのバッチ処理中に何かが発生した場合、データが失われないように、カフカリスナーを処理する方法。

答えて

0

Apache Kafkaでは、決してデータを失いません。パーティションログには、任意の位置にシークするためのオフセットがあります。

一方、パーティションからレコードを消費する場合、オフセットをコミットする必要はありません。現在のコンシューマはメモリ内の状態を保持しています。同じグループ内の他の新しい消費者に対してのみ、現在のものが死んでいる場合にコミットする必要があります。エラーとは無関係に、現在のコンシューマは常に現在のインメモリオフセットの後ろで新しいデータをポーリングするように移動します。

同じ消費者で同じデータを再処理するには、消費者を希望の位置に戻すには、必ずseek操作を使用する必要があります。それは春カフカがSeekToCurrentErrorHandlerを紹介する方法は次のとおりです。

これは、現在のレコード(および残りの他は)次のポーリングによって取得されるように実装は、すべての未処理のトピック/パーティションを追求することができます。 SeekToCurrentErrorHandlerはこれを正確に行います。

https://docs.spring.io/spring-kafka/reference/htmlsingle/#_seek_to_current_container_error_handlers

+0

もSeekToCurrentErrorHandlerハンドルバッチ操作をしていますか?なぜならSeekToCurrentBatchErrorHandlerクラスはorg.springframework.kafka.listenerパッケージで利用できないからです。また、エラーハンドラをseektocurrenterrorhandlerに設定すると、どのようにロギングが発生するのか教えてください。 – user8363477

+0

'SeekToCurrentBatchErrorHandler'が1つありますが、' 2.1'以降は既にhttps://docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/_reference.html#_seek_to_current_container_error_handlersです。コードを自分のクラスにアップグレードまたはコピー/貼り付けできます –

関連する問題