2017-05-15 11 views
1

私たちはメッセージブローカーソリューションとしてKafkaを実装しようとしています。私たちはSpring BootマイクロサービスをIBM BLuemixに導入しています。内部メッセージブローカーの実装はKafkaバージョン0.10です。私の経験はJMS、ActiveMQの方が多いので、Javaのコンシューマのシステムレベルのエラーを処理するにはどうすればよいでしょうか?ここでApache Kafkaシステムエラー処理

は、我々が現在

消費者の特性

enable.auto.commit=false 
auto.offset.reset=latest 

我々は

消費者

max.partition.fetch.bytes 
session.timeout.ms 

カフカのデフォルトのプロパティを使用している、それを実装している方法です

トピックごとに3つのスレッド、つまりすべてが同じgroupId(スレッドごとに1つのKafkaConsumerインスタンス)を持つようにスピンアップしています。現在のところ、パーティションは1つしかありません。消費者のコードは、スレッドクラス

kafkaConsumer = new KafkaConsumer<String, String>(properties); 

    final List<String> topicList = new ArrayList<String>(); 
    topicList.add(properties.getTopic()); 

    kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() { 

     @Override 
     public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { 
     } 

     @Override 
     public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { 
      try { 
       logger.info("Partitions assigned, consumer seeking to end."); 

       for (final TopicPartition partition : partitions) { 
        final long position = kafkaConsumer.position(partition); 
        logger.info("current Position: " + position); 

        logger.info("Seeking to end..."); 
        kafkaConsumer.seekToEnd(Arrays.asList(partition)); 
        logger.info("Seek from the current position: " + kafkaConsumer.position(partition)); 
        kafkaConsumer.seek(partition, position); 
       } 
       logger.info("Consumer can now begin consuming messages."); 
      } catch (final Exception e) { 
       logger.error("Consumer can now begin consuming messages."); 
      } 

     } 
    }); 

実際の読み取りを処理するサービスクラスがあるあなたはEventProcessorに気づくでしょう

try { 
      // Poll on the Kafka consumer every second. 
      final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); 


      // Iterate through all the messages received and print their 
      // content. 
      for (final TopicPartition partition : records.partitions()) { 

       final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
       logger.info("consumer is alive and is processing "+ partitionRecords.size() +" records"); 
       for (final ConsumerRecord<String, String> record : partitionRecords) { 
        logger.info("processing topic "+ record.topic()+" for key "+record.key()+" on offset "+ record.offset()); 

        final Class<? extends Event> resourceClass = eventProcessors.getResourceClass(); 
        final Object obj = converter.convertToObject(record.value(), resourceClass); 
        if (obj != null) { 
         logger.info("Event: " + obj + " acquired by " + Thread.currentThread().getName()); 
         final CommsEvent event = resourceClass.cast(converter.convertToObject(record.value(), resourceClass)); 
         final MessageResults results = eventProcessors.processEvent(event 
           ); 
         if ("Success".equals(results.getStatus())) { 
          // commit the processed message which changes 
          // the offset 
          kafkaConsumer.commitSync(); 
          logger.info("Message processed sucessfully"); 
         } else { 
          kafkaConsumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset()); 
          logger.error("Error processing message : {} with error : {},resetting offset to {} ", obj,results.getError().getMessage(),record.offset()); 
          break; 
         } 

        } 
       } 

      } 
      // TODO add return 

     } catch (final Exception e) { 
      logger.error("Consumer has failed with exception: " + e, e); 
      shutdown(); 
     } 

スレッドのrunメソッドで起こるのコンストラクタで、このようになります。ほとんどの場合、各レコードはデータベース内のレコードをコミットします。プロセッサがエラー(System ExceptionまたはValidationException)をスローした場合、コミットせずにシークを設定して、後続のポーリングがそのグループIDのオフセットから返るようにします。

今疑いがあるのは、正しいアプローチですか?エラーが発生し、オフセットを設定すると、それが修正されるまで他のメッセージは処理されません。これは、DBに接続できないようなシステムエラーのために働くかもしれませんが、問題がそのイベントのみであり、他のレコードはこのレコードを処理できない場合、他のレコードは処理できません。 ErrorTopicという概念を考えました。エラーが発生すると、消費者はErrorTopicにそのイベントを公開し、その間に他の後続イベントを処理し続けます。しかし、私たちは以前の経験から、JMSの設計コンセプトをkafkaに取り込もうとしているようで、カフカのエラー処理を解決する良い方法があるかもしれません。また、エラートピックから再処理することで、いくつかのシナリオでは望ましくないメッセージの順序が変更される可能性があります。

Kafka基準に従ってプロジェクトでこのシナリオをどのように処理したか教えてください。

-Tatha

+0

あなたはメッセージハブを使用しているか、Bluemixでカフカブローカーを自分で実装したのですか? – ValerieLampkin

+0

@ValerieLampkinメッセージハブを使用しています – Tatha

答えて

1

問題は、我々は他のどの正しいのレコード

と使用するように提案を処理することができる文句を言わない、この一つのレコードを処理するだけで他の人そのイベントにしていない場合エラーのトピックが考えられる可能性があります。

onPartitionsAssignedの扱いでは、消費者がコミットしたオフセットを本質的に使用していないことに気付きました。

あなたはオフセット犯し、最後の正常から再起動する場合は、あなたが同じで3人の消費者を持つ、ということを知っているように見えるけれども最後に、私は、指摘したいと思いますseek

を行うべきではありません1つのパーティションにサブスクライブされているグループ - 3つのうち2つがアイドル状態になることを意味します。

HTH 江戸

+0

ありがとうございます。はい、デバッグ中です。私たちが終了しようとしていることに気付きました。その結果、そのアプリケーションがダウンしていて、その間に新しいイベントが送信された場合、それを処理した消費者はいませんでした。私たちは終わりを求める努力を取り除いた。また、同じコンシューマ・グループの1つのパーティションを指している複数のスレッドが動作しないことに言及したように、私たちはそれを1つのスレッドに変更しました。 – Tatha

関連する問題