2016-03-24 15 views
1

私は消費者を書くためにkafka 2.11バージョンを使用しています。私は継続的にタイムアウト例外を取得しています。私はここで正しいAPIを使用しているかどうかわからないkafka.consumer.ConsumerTimeoutExceptionを克服する方法

誰でも助けてくれますか?

キュータ

import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

     public class MessageListener { 
      private Properties properties; 

      private ConsumerConnector consumerConnector; 
      private String topic; 
      private ExecutorService executor; 

      public MessageListener(String topic) { 
       this.topic = topic; 

       KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader(); 
       try { 
        properties = confLoader.loadConsumerConfig(); 
        ConsumerConfig consumerConfig = new ConsumerConfig(properties); 
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); 
       } catch (FileNotFoundException e) { 
        e.printStackTrace(); 
       } 
      } 

      public void start(File file) { 

       Map<String, Integer> topicCountMap = new HashMap<>(); 
       topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE)); 

       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector 
         .createMessageStreams(topicCountMap); 
       List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
       executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE); 

       for (KafkaStream<byte[], byte[]> stream : streams) { 
        executor.submit(new ListenerThread(stream)); 

       } 
      } 


     } 

スレッド

import kafka.consumer.ConsumerIterator; 
import kafka.consumer.ConsumerTimeoutException; 
import kafka.consumer.KafkaStream; 
import kafka.message.MessageAndMetadata; 
public class ListenerThread implements Runnable { 
    private KafkaStream<byte[], byte[]> stream;; 

    public ListenerThread(KafkaStream<byte[], byte[]> msgStream) { 
     this.stream = msgStream; 

    } 

    @Override 
    public void run() { 
     try { 

      ConsumerIterator<byte[], byte[]> it = stream.iterator(); 

      while (it.hasNext()) { 
       MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.makeNext(); 
       String topic = messageAndMetadata.topic(); 
       byte[] message = messageAndMetadata.message(); 
       System.out.println("111111111111111111111111111"); 
       FileProcessor processor = new FileProcessor(); 
       processor.processFile(topic, message); 
      } 
} catch (ConsumerTimeoutException cte) { 
      System.out.println("Consumer timed out"); 
     } 

     catch (Exception ex) { 
      ex.printStackTrace(); 
     } 

    } 
} 
+1

2.11は実際にはそのカフカと互換性のあるスカラーバージョンです。おそらくあなたはkafka 0.9.xを使用していますか? – Nautilus

+0

@nautilus oh..im 2.11 with java....これは間違っていますか? Javaの最新バージョンは何ですか? – Ratha

+0

ここでカフカのバージョンを確認することができます。http://kafka.apache.org/documentation.html – Nautilus

答えて

1

あなたは、この例外がスローされたくない場合は、consumer.timeout.ms=-1を設定することができます。

+0

これは質問に対する答えを提供しません。批評をしたり、著者の説明を求めるには、投稿の下にコメントを残してください。 - [レビューから](レビュー/低品質の投稿/ 11759353) – Rob

+0

質問はどのようにkafka.consumer.ConsumerTimeoutExceptionを克服するのですか?そのプロパティを-1に設定することはそれを克服する方法です。 – Nautilus

+0

@nautilus "consumer.timeout.ms"プロパティを提供しない場合、それはどういう意味ですか? – Ratha

関連する問題