2017-08-16 5 views
0

私のカフカの消費者コードは以下の通りですが、消費者は1人だけです!リバランスのためコミットが完了できません

Properties consumerConfig = new Properties(); 
     consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "sandbox.hortonworks.com:6667"); 
     consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); 
     consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.StringDeserializer"); 
     @SuppressWarnings("resource") 
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(consumerConfig); 
     TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener(); 
     consumer.subscribe(Collections.singletonList("test1"), rebalanceListener); 
     HDFSAppendTrial example = new HDFSAppendTrial(); 
     String coreSite = "/usr/hdp/2.6.0.3-8/hadoop/etc/hadoop/core-site.xml"; 
     String hdfsSite = "/usr/hdp/2.6.0.3-8/hadoop/etc/hadoop/hdfs-site.xml"; 
     String hdfsFilePath = "/appendTo/Trial.csv"; 

     while (true) { 
      ConsumerRecords<String, String> records = consumer.poll(999999999); 
      for (ConsumerRecord<String, String> record : records) { 
       FileSystem fileSystem = example.configureFileSystem(coreSite, hdfsSite); 
       String res = example.appendToFile(fileSystem, record.value(), hdfsFilePath); 
       System.out.printf("%s\n", record.value()); 
       if (res.equalsIgnoreCase("success")) { 
        System.out.println("Successfully appended to file"); 
       } 
       else 
        System.out.println("couldn't append to file"); 
       example.closeFileSystem(fileSystem); 

      } 
      consumer.commitSync(); 


     } 

は、レコードなどを克服する方法を、上記の問題を解決してくれ、問題の原因を説明するための方法

enter image description here

のいくつかの数をストリーミングした後、次のエラーを取得しています!誰かが、私が持っているいくつかのより多くの要件を消費者にコードするのを助けることができますか?私を助け、また、少なくとも試し

答えて

0

consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"True"); 
consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"99998"); 
consumerConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"10000"); 
consumerConfig.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,"99999"); 

を与えるものに、私は上記のプロパティを追加することによってこの問題を解決することができた可能性が1のため、事前に

感謝プロパティオブジェクトのconsumerConfig

関連する問題