2016-06-29 6 views
0

Apache Kafkaにキューイングシステムを構築しました。アプリケーションは特定のKafka topicへのメッセージを生成し、消費者側ではそのトピックに生成されたすべてのレコードを消費する必要があります。
新しいJava Consumer APIを使用してコンシューマを作成しました。 コードはここkafkaコンシューマ(新しいコンシューマAPI)を永久に実行しています

Properties props = new Properties(); 
        props.put("bootstrap.servers", kafkaBrokerIp+":9092"); 
        props.put("group.id",groupId); 
        props.put("enable.auto.commit", "true"); 
        props.put("session.timeout.ms", "30000"); 
        props.put("auto.offset.reset", "earliest"); 
     props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer(props); 
        consumer.subscribe(Arrays.asList("consumertest")); 
        while (true) { 
         ConsumerRecords<String, String> records = consumer.poll(100); 
         for (ConsumerRecord<String, String> record : records){ 
          System.out.println("Data recieved : "+record.value()); 
          } 
        } 

のように見える私は、任意のレコードが即座に消費され、処理すべきプロデューサーカフカのトピックの中に押し込まれるように永遠に消費者を実行する必要があります。
私の混乱は、サンプルコードのような無限のwhileループを使用してデータを消費する正しい方法ですか?

答えて

0

これは私にとってはうまくいくものですが、例外をスローする場合に備えて、内部ループをtry/catchブロックに入れたい場合があります。切断すると、定期的な再接続タスクも考慮してください。

0

はい、無限ループを使用できます。実際、それはビジーなループではありません。各ポーリング中にデータが利用できない場合、コールは所定の時間待機します。

long millisToWait = 100; 
consumer.poll(millisToWait); 

新しいコンシューマは、ネットワーク通信の問題を自動的に処理します。シャットダウン時に消費者が正常に閉じることを確認してください。

関連する問題