2017-09-25 12 views
0

私はカフカプロデューサとコンシューマをテストするための簡単なJava APIを作成しようとしています。 Macマシン上の別の端末でプロデューサとコンシューマを実行しているときにうまく動作します。しかし、私はこのエラーを取得し、Java APIのコードを使用してカフカサーバーに接続しようとしたとき:スレッドでApache KafkaプロデューサコンシューマAPIの問題

例外kafkatest2.ProducerTest.mainで「メイン」のjava.lang.NullPointerException (ProducerTest.java:34)

プロデューサーコード:

package kafkatest2; 
import java.util.Properties; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
public class ProducerTest { 
    public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 

    Producer<String, String> producer = null; 
    try { 
      producer = new KafkaProducer<>(props); 
     for (int i = 0; i < 10; i++) { 
     String msg = "Message " + i; 
     producer.send(new ProducerRecord<String, String>("tested", msg)); 
     System.out.println("Sent:" + msg); 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 

    } finally { 
    System.out.println("last"); 
     producer.close(); 
    } 

    } 

} 

コンシューマコード:

package kafkatest2; 

import java.util.Arrays; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 

public class ConsumerTest { 

    public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", "127.0.0.1:2181"); 
    props.put("group.id", "test-consumer-group"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("auto.offset.reset", "earliest"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> kafkaConsumer1 = new KafkaConsumer<>(props); 
    kafkaConsumer1.subscribe(Arrays.asList("tested")); 
    while (true) { 
     ConsumerRecords<String, String> records = kafkaConsumer1.poll(10); 
     for (ConsumerRecord<String, String> record : records) { 
     System.out.println("Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId()); 
     } 
    } 

    } 

} 

私に何が足りないのか教えてください。設定値に問題はありますか?

おかげで、 Vipul

答えて

0

は、スタックトレースは、あなたの生産者コード内のNPEを示しています

Exception in thread "main" java.lang.NullPointerException at 
kafkatest2.ProducerTest.main(ProducerTest.java:34) 

producer = new KafkaProducer<>(props);が例外をスローした場合、この現象が発生することができます。その場合、finallyブロックを入力すると、ローカル変数producerは定義されないので、producer.close()を呼び出すとNPEがスローされます。 if (producer != null)ブロックでコールを閉じるだけです。

+0

マイケルにコメントしていただきありがとうございます。しかし、問題はなぜプロデューサー=新しいKafkaProducer <>(小道具)ですか?エラーを投げていますか?私はそれを解決することができません。 –

+0

上記の貼り付けたプロデューサコードが私のために機能します(Kafka 0.11.0.1でテスト済み)。それでも問題が解決しない場合は、log4jログを有効にすることをお勧めします。 Consumerの場合、 'zookeeper.connect'を削除し、Producerの場合と同様に' bootstrap.servers'に置き換える必要があります。 –

+0

ええ、そのうまくいっています。ありがとう –

関連する問題