2016-06-15 17 views
2

Iセットアップ単一ノードカフカの仕事とそのような単純なパブリッシュ/サブスクライブパターンしようとしない:私はコードでいくつかのメッセージを生成し、私のラップトップからハイレベルカフカのコンシューマAPIは

を:

Properties props = new Properties(); 
    props.put("bootstrap.servers", "192.168.23.152:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

    Producer<String, String> producer = new KafkaProducer<>(props); 
    for (int i = 0; i < 10; i++) 
     producer.send(new ProducerRecord<String, String>("tp3", Integer.toString(i), "hello " + Integer.toString(i))); 

    producer.close(); 

とIまた、単純な消費者書か:

Properties props = new Properties(); 
    props.put("bootstrap.servers", "192.168.23.152:9092"); 
    props.put("group.id", "g1"); 
    props.put("client.id","client1"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("auto.offset.reset", "latest"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("tp3")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) 
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
     TimeUnit.SECONDS.sleep(1000); 
    } 

をしかし、消費者は何も取得しませんでした

誰でも何が起こったのか教えてください。私はメッセージを取得するには、コンソールコマンドを使用し、それは完全に働いたので、私は」 は、プロデューサーの仕事も確信している(ここで私は実績のあるアタッチ画像)どれが高く評価され役立ちます enter image description here

:(:(:(

+0

バージョンについて不明な点がある場合は、2.11-0.10.0.0です(スクリーンショット参照)。私は同じことで苦労している。プロデューサは正常に動作しますが、コンシューマAPIは自分が何をしても動作したくありません。 localhostと "enable.auto.commit"をfalseに使用していることを除いて、私はあなたとほとんど同じコードを実行しています。 –

+0

私は設定パラメータで少しテストしています。私はポーリング時間を1000と5000に増やしました。消費者を動かすと、実際には動作します - 時には、ここでは私がまだ特定していないいくつかの問題があります。 –

+0

私はVpn(私たちのVPNポリシーは多くのポートを禁止しています)を介してそれらを接続しました。私は9092ポートにアクセスする権限がありませんでしたので、私はそれが大きな理由だと思います。私は多くの他のケースでソックスプロキシを使用していました:((しかし、Kafkaで失敗しました –

答えて

0

よりますカフカFAQへ:

なぜ消費者がどのようなデータを取得することはありませんん

デフォルトでは、消費者はスタートのとき?トピックのすべての既存データを無視し、コンシューマーの開始後に新しいデータのみを消費します。このような場合は、コンシューマーの起動後にさらにデータを送信してみてください。あるいは、0.9で新しい消費者に対してauto.offset.resetを "最も早く"設定し、古い消費者に対して "最小"に設定して、コンシューマを設定することもできます。

関連する問題