2017-07-09 11 views
0

私はApache kafka文書に従ってkafkaを学んでいます。私はそれをデフォルト設定で開始しました。カフカコンシューマーAPIコンシューマー.poll()は間違っていて例外もブロックもしません。

bin/zookeeper-server-start.sh config/zookeeper.properties 
bin/kafka-server-start.sh config/server.properties & 

私が生産するkafka-console-producer.shとkafka-console-consumer.shを実行し、メッセージが成功した消費。プロデューサAPIを使用してJavaコードを記述してメッセージを生成するのはOKです。これはkafka-console-consumer.shによって検証されます。コードはApache kafka guideと同じです。

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("acks", "all"); 
props.put("retries", 0); 
props.put("batch.size", 16384); 
props.put("linger.ms", 1); 
props.put("buffer.memory", 33554432); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

Producer<String, String> producer = new KafkaProducer<>(props); 
for(int i = 0; i < 100; i++) 
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); 

producer.close(); 

プロデューサコードは動作しますが、コンシューマコードは動作しません。それは何のExcption、ノー間違ってませんが、ちょうどconsumer.poll(100)でブロックするも、Apacheのカフカの文書から【選択コード:

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test"); 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList("my-topic")); 
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) 
     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
} 

ところで、Apacheのカフカでkafka-console-consumer.shの例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 

しかし、私は飼育係を接続しますが、直接カフカブローカーを接続していない場合:文書が成功したが、プロデューサーで話題の「テスト」に生成メッセージを消費しています。それは間違いなく例外もなく機能しませんが、ただブロックします。彼らはメッセージを消費することはできませんなぜ

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning 

カフカのバージョンとAPIのバージョンは0.11.0.0

のですか?

+0

こんにちは私は同じポイントで立ち往生していますか?あなたは何とかそれを働かせることができましたか? –

答えて

0

--zookeeperパラメータは、古いコンシューマを使用することを意味し、Zookeeperサーバー(localhost:2181)を指定しているため、正常に動作します。 Kafkaブローカを指定する場合(新しいコンシューマを使用する場合)、 - zookeeperを使用しているのに有効なKafkaブローカアドレス(localhost:9092)を渡している--bootstrap-serverオプションを使用する必要があります。コンソールコンシューマアプリケーションの場合、 - bootstrap-server localhost:9092の代わりに--zookeeper localhost:9092の設定が必要です。 コードに関しては、ポーリング方法がブロックされていることを確認してください。レコードがなくブロックされていない場合は、100ミリ秒後に終了する必要があります(指定したタイムアウト)。それからあなたのコードから、プロデューサが「my-topic」に、消費者が「foo」と「bar」にサブスクライブしていることがわかります。最後に、コンソールコンシューマは「テスト」から読み込みます。すべての異なるトピック!

+0

私の質問の説明には申し訳ありません。最初は私が消費者の話題 "私の話"でしたが、間違って書いています。もう1つは、--bootstrap-server localhost:9092です。しかし、それはあまりにも機能しません。 – Robinhood

+0

コンソールコンシューマは、別のコンソールプロデューサがトピック「テスト」にメッセージを生成するため、「テスト」から読み取ります。 – Robinhood

0

props.put( "auto.offset.reset"、 "smallest"); このプロパティを追加すると動作します

+0

それは動作しません。また、auto.offset.resetの値は最新の、最も早い、noneのいずれかです – Robinhood

関連する問題