2016-03-24 19 views
0

カフカブローカーとローカルホスト上で動作するカフカプロデューサーを使用して、本当に簡単なアプリケーションを作成しようと数日間苦労しました。 Googleの同様の問題への答えはまだそれを動作させることができませんでした。 kafka.producer.async.DefaultEventHandler - トピックのリクエストを送信できませんでした。

この

は私が取得エラーです:

INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(clicks) 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092 
[ProducerSendThread-] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic clicks -> 
No partition metadata for topic clicks due to kafka.common.LeaderNotAvailableException}] for topic [clicks]: class kafka.common.LeaderNotAvailableException 
[ProducerSendThread-] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s) Set(clicks) 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092 
[ProducerSendThread-] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic clicks -> 
No partition metadata for topic clicks due to kafka.common.LeaderNotAvailableException}] for topic [clicks]: class kafka.common.LeaderNotAvailableException 
[ProducerSendThread-] ERROR kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: clicks 
[ProducerSendThread-] INFO kafka.producer.async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 3 

そして、これは私のコードです:私はすでに飼育係のインスタンスが正常にローカルホスト上で実行されている

Properties properties= new Properties(); 
properties.put("broker.id", "1"); 
properties.put("advertised.host.name", "localhost"); 
properties.put("advertised.port", "9092"); 
properties.put("host.name", "localhost"); 
properties.put("auto.create.topics.enable","true"); 
properties.put("zookeeper.connect", zookeeperConnectString); 
properties.put("port","9092"); 
properties.setProperty("num.partitions", "1"); 
properties.setProperty("log.dirs", newPath(KAFKA_LOG_DIR).toString()); 

KafkaConfig kafkaConfig = new KafkaConfig(properties); 
KafkaServerStartable kafkaServer = new KafkaServerStartable(kafkaConfig); 
kafkaServer.startup(); 

String topic = clicks; 

ZkClient zookeeper = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$); 
if (!AdminUtils.topicExists(zookeeper, topic)) { 
    AdminUtils.createTopic(new ZkClient(zookeeperConnectString), topic, 1, 1, new Properties()); 
    } 
zookeeper.close(); 

Properties producerProps = new Properties(); 
producerProps.put("serializer.class", "kafka.serializer.StringEncoder"); 
producerProps.put("key.serializer.class", "kafka.serializer.StringEncoder"); 
producerProps.setProperty("producer.type", "async"); 
producerProps.put("metadata.broker.list", "localhost:9092"); 
producerProps.put("request.required.acks","0"); 

Producer producer = new Producer(new ProducerConfig(producerProps)); 

String click = "exampleMessage"; 
producer.send(ImmutableList.of(new KeyedMessage(topic, click))); 
producer.close(); 

:2181。

私は、次のカフカのバージョンと飼育係使用しています:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.2.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.8.2.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.zookeeper</groupId> 
     <artifactId>zookeeper</artifactId> 
     <version>3.4.6</version> 
    </dependency> 

おかげで任意のヘルプやコメント:)ケース誰かに

+0

_describe_コマンドを使用して、コマンドラインでトピックのメタ情報を取得しようとしましたか? 'bin/kafka-topics.sh --describe - zookeeper localhost:2181 --topic clicks' – avr

+0

私はkafkaコマンドラインを使用できません。しかし私がキュレーターと動物園にアクセスすると、トピックのノードが作成されていることがわかります:/ brokers/topic/cliks。代わりに、ノード/ブローカー/ IDは空です。 – nicola

+0

、つまり、アクティブなカフカブローカーは存在しません。それで、少なくとも1人のカフカブローカーが立ち上げて稼動していることを確認してください。 – avr

答えて

0

は、同じ問題が発生しましたが、私はそれが仕事Thread.sleep(500);を追加作らトピックの作成後理由が分かりません。ローカルホストで実行中のZookeeperのインスタンスが初期化に時間がかかることが原因である可能性があります。

関連する問題