2016-11-02 6 views
4

私はkafkaを初めて使用しています。開始するにはドキュメントを読んでいますが、今は組み込みカフカモードを使用しています。同じサンプルプログラムを試しました。NoSuchElementException kafka

public static void main(String args[]) throws InterruptedException, IOException { 

    // setup Zookeeper 
    EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); 
    String zkConnect = ZKHOST + ":" + zkServer.port(); 
    ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); 
    ZkUtils zkUtils = ZkUtils.apply(zkClient, false); 

    // setup Broker 
    Properties brokerProps = new Properties(); 
    brokerProps.setProperty("zookeeper.connect", zkConnect); 
    brokerProps.setProperty("broker.id", "0"); 
    brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); 
    brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST +":" + BROKERPORT); 
    KafkaConfig config = new KafkaConfig(brokerProps); 
    Time mock = new MockTime(); 
    KafkaServer kafkaServer = TestUtils.createServer(config, mock); 

    // create topic 
    AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); 

    // setup producer 
    Properties producerProps = new Properties(); 
    producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); 
    producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); 
    producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps); 
    List<PartitionInfo> partitionInfo = producer.partitionsFor("test"); 
    System.out.println(partitionInfo); 
    // setup consumer 
    Properties consumerProps = new Properties(); 
    consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); 
    consumerProps.setProperty("group.id", "group0"); 
    consumerProps.setProperty("client.id", "consumer0"); 
    consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); 
    consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
    consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic 
    KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); 
    consumer.subscribe(Arrays.asList(TOPIC)); 

    // send message 
    ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8)); 
    producer.send(data); 
    producer.close(); 

    // starting consumer 
    ConsumerRecords<Integer, byte[]> records = consumer.poll(1000); 

    Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); 
    ConsumerRecord<Integer, byte[]> record = recordIterator.next(); 
    System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 


    kafkaServer.shutdown(); 
    zkClient.close(); 
    zkServer.shutdown(); 
    } 

} 

が、Iamはできない誰も私を導くことができるプログラムの開発

java.util.NoSuchElementException 
at  org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52) 
at com.nuwaza.evlauation.embedded.kafka.EmbeddedKafka.main(EmbeddedKafka.java:105) 

を実行しながらtopics.Iamは、次の例外を取得するためのデータをフェッチ?

UPDATED-

WARN [main] (Logging.scala#warn:83) - No meta.properties file under dir C:\Users\bhavanak\AppData\Local\Temp\kafka-1238324273778000675\meta.properties 
WARN [main] (Logging.scala#warn:83) - No meta.properties file under dir C:\Users\bhavanak\AppData\Local\Temp\kafka-1238324273778000675\meta.properties 
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} 
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} 
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} 
    [Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = [0,]] 
ERROR [main] (NIOServerCnxnFactory.java#uncaughtException:44) - Thread Thread[main,5,main] died 
java.util.NoSuchElementException 
at  org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52) 
at com.nuwaza.evlauation.embedded.kafka.EmbeddedKafka.main(EmbeddedKafka.java:105) 
+0

完全なスタックトレースを投稿 – rafid059

+0

私は質問を更新しました。それはWARN [kafka-producer-network-thread | (エンベデッド・カフカ)を作成しましたか?(NetworkClient.java#handleResponse:600) - 相関IDが0のメタデータを取得中にエラーが発生しました:{test = LEADER_NOT_AVAILABLE} – bhavanak

+0

'EmbeddedKafka.java'を記述しましたか? 105行目を見てください。何か間違ったことをしています。そのブロックを投稿する – rafid059

答えて

1

生成されたメッセージが実際にディスクに永続化されることを確認するためのメッセージを読む前に)(producer.flushを呼び出すようにしてください。

0

このエラーは、消費者がkafkaログに保存される前でもメッセージを読み取ろうとしていることを示しています。理想的には、プロデューサとコンシューマを別のプロセスとして実行する必要があります私は同じ問題に直面していたが、それは他の理由によるものだった。iterator.next()が間違って2回呼び出された。他の誰かが同じ問題に直面している場合に備えて。

関連する問題