2017-02-27 10 views
2

AWSにKafka on DC/OS(Mesos)クラスタをインストールしました。 3つのブローカーを有効にし、 "topic1"というトピックを作成しました。Java用クライアントAPIを使用してDC/OS上のKafkaからのメッセージを消費している間にコンシューマがハングするのはなぜですか?

dcos kafka topic create topic1 --partitions 3 --replication 3 

次に、メッセージを送信するProducerクラスと受信するConsumerクラスを作成しました。

public class Producer { 
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException { 
     Map<String, Object> producerConfig = new HashMap<>(); 
     System.out.println("setting Producerconfig."); 
     producerConfig.put("bootstrap.servers", 
       "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636"); 

     ByteArraySerializer serializer = new ByteArraySerializer(); 
     System.out.println("Creating KafkaProcuder"); 
     KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer); 
     for (int i = 0; i < 100; i++) { 
      String msgstr = msg + i; 
      byte[] message = msgstr.getBytes(); 
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message); 
      System.out.println("Sent:" + msgstr); 
      kafkaProducer.send(record); 
     } 
     kafkaProducer.close(); 
    } 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     sendMessage("Kafka test message 2/27 3:32"); 
    } 

} 

public class Consumer { 
    public static String getMessage() { 
     Map<String, Object> consumerConfig = new HashMap<>(); 
     consumerConfig.put("bootstrap.servers", 
       "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636"); 
     consumerConfig.put("group.id", "dj-group"); 
     consumerConfig.put("enable.auto.commit", "true"); 
     consumerConfig.put("auto.offset.reset", "earliest"); 
     ByteArrayDeserializer deserializer = new ByteArrayDeserializer(); 
     KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer); 

     kafkaConsumer.subscribe(Arrays.asList("topic1")); 
     while (true) { 
      ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100); 
      System.out.println(records.count() + " of records received."); 
      for (ConsumerRecord<byte[], byte[]> record : records) { 
       System.out.println(Arrays.toString(record.value())); 
      } 
     } 
    } 

    public static void main(String[] args) { 
     getMessage(); 
    } 
} 

まず私はtopic1にメッセージを送信するために、クラスタ上でProducerを走りました。しかし、私がConsumerを実行したとき、それは何も受信できませんでした。ただハングします。私は

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning 

をインストールカフカに付属のシェルスクリプトを実行して、すべてのメッセージを取得することができたので、

Producerは機能していますが、なぜ私はConsumerで受け取ることができませんか?このpostは、古いオフセットを持つgroup.idが原因である可能性があることを示しています。プロデューサーではなく消費者のgroup.idのみを作成します。このグループのオフセットを設定するにはどうすればよいですか?

+0

group.idに問題がないことを確認するには、 'kafkaConsumer.seekToBeginning()'を使用してください。 –

+0

@ MatthiasJ.Sax 'consumerConfig.put(" auto.offset.reset "、" earliest ")を削除する必要があります。それで?購読したら、この行をどこに追加すればよいですか?これを追加しても何も得られません。 – ddd

+0

pollを呼び出すときにより長いタイムアウトを使用し、サーバー/クライアント側のログに例外がスローされますか? – amethystic

答えて

1

このように、kafkaConsumer.subscribe(Arrays.asList("topic1"));は、poll()をハングさせています。 Kafka Consumer does not receive messages によれば、トピックに接続するには、assignsubscribeという2つの方法があります。私がsubscribeを以下の行に置き換えた後、動作し始めました。

TopicPartition tp = new TopicPartition("topic1", 0); 
    List<TopicPartition> tps = Arrays.asList(tp); 
    kafkaConsumer.assign(tps); 

しかし、出力には期待されない数字の配列が表示されます(プロデューサが送信した文字列)。しかし、これは別の問題だと思います。

+1

いわゆる「別問題」は、あなたがバイトを受け取っていることです(カフカはフードの下でバイトを処理するので)。デシリアライザを使用する必要があります。キーは 'key.deserializer = org.apache.kafka.common.serialization.StringDeserializer'、値は別々です。 http://kafka.apache.org/documentation/を参照してください(ただし、SerDeの正確なページは見つかりません)。 –

+1

@JacekLaskowski説明をありがとう。別の投稿を保存しました – ddd

関連する問題