2016-03-28 16 views
1

私のサンプルプログラムでは、ファイルを公開してすぐに消費しようとします。しかし、私の消費者イテレータはnullを返します。 私が間違っていることは何ですか?その上でKafka消費者が空のイテレータを返す

テスト

**main(){** 

KafkaMessageProducer producer = new KafkaMessageProducer(topic, file); 
     producer.generateMessgaes(); 

     MessageListener listener = new MessageListener(topic); 
     listener.start(); 
} 

のMessageListener

public void start() { 



     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE)); 

     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector 
       .createMessageStreams(topicCountMap); 

     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
     executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE); 

     for (KafkaStream<byte[], byte[]> stream : streams) { 
      System.out.println("The stream is --"+ stream.iterator().makeNext().topic()); 
      executor.submit(new ListenerThread(stream));  
     } 
     try { // without this wait the subsequent shutdown happens immediately before any messages are delivered 
      Thread.sleep(10000); 
     } catch (InterruptedException ie) { 

     } 
     if (consumerConnector != null) { 
      consumerConnector.shutdown(); 
     } 
     if (executor != null) { 
      executor.shutdown(); 
     } 
    } 

ListenerThread

public class ListenerThread implements Runnable { 
     private KafkaStream<byte[], byte[]> stream; 

     public ListenerThread(KafkaStream<byte[], byte[]> msgStream) { 
      this.stream = msgStream; 
      System.out.println("----------" + stream.iterator().makeNext().topic()); 
     } 

public void run() { 
     try { 

      ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
      while (it.hasNext()) { 
       // MessageAndMetadata<byte[], byte[]> messageAndMetadata = 
       // it.makeNext(); 
       // String topic = messageAndMetadata.topic(); 
       // byte[] message = messageAndMetadata.message(); 
       System.out.println("111111111111111111111111111"); 
       FileProcessor processor = new FileProcessor(); 
       processor.processFile("LOB_TOPIC", it.next().message()); 
      } 

iteratorはnullなので、whileループの中には入っていません。しかし、私は同じ話題に単一のメッセージを公開しており、消費者はその話題を聞いていると確信しています。

すべてのヘルプは、私は昨日、この同じ問題を抱えていた

+0

私は同じ問題を抱えています。メッセージがコンシューマ端末に届いているのを見ることができますが、私のアプリは「空のイテレータ」を表示しています。それは以前から仕事をしていて、突然起き始めました。理由を知っている人なら、私は感謝しています。 – Mhoque

答えて

1

をいただければ幸いです。それをしばらく使ってみると、私は現在の話題から読むことができませんでした。だから私は次のステップを踏んだ。

a。私の消費者は、

bを停止しました。プロデューサーを停止しました

c。カフカサーバーを停止しました bin/zookeeper-server-stop.sh config/zookeeper.properties

d。飼い猫を止めました bin/zookeeper-server-stop.sh config/zookeeper.properties

その後、私は自分の話題を削除しました。 bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

「マルチブローカークラスタの設定」に従って作成されたファイルも削除しましたが、それが問題を作り出したと思う。

a。 Zookeeperを開始しました b。カフカ開始 c。プロデューサーを始めて、いくつかのメッセージをKafkaに送ってください。

それは再び働き始めました。これがあなたを助けるかどうかはわかりません。しかし、何とか私のプロデューサーが消費者から切り離されているに違いないようです。お役に立てれば。

+0

私はこの問題をどのように克服したのかわかりません。しかし、今私はkafkaユーザーのメーリングリストに投稿した同様の問題に直面しています。 Subject "私のプロデューサーが生産していて、なぜ消費者が消費できなかったのですか?それは@ poll()をスタックしました。 – Ratha

関連する問題