2016-07-22 1 views
0

私はKafka 0.9.0.1を使用しています。カフカ:トピックからの最初のメッセージを消費するときの断続的な遅さ

それは私が(別のconfigsで)異なるカフカブローカーを使用しました

トピックから「最新」のメッセージを取得するために、20〜30秒かかり、私は自分のアプリケーションを起動する初めて

まだ私はまだ、この動作を参照してください。通常、後続のメッセージには遅さはありません。

これが期待どおりの動作ですか?あなたは明らかに、このサンプルアプリケーションを実行し、独自の設定にブローカー/トピック名を変更することにより、以下、これを見ることができます

あなたが指定したコンシューマ・グループに新しい消費者を起動したときので、最初のメッセージは残りの部分よりも長く取る必要があります
public class KafkaProducerConsumerTest { 

    public static final String KAFKA_BROKERS = "..."; 
    public static final String TOPIC = "..."; 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 
     new KafkaProducerConsumerTest().run(); 
    } 

    public void run() throws ExecutionException, InterruptedException { 
     Properties consumerProperties = new Properties(); 
     consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS); 
     consumerProperties.setProperty("group.id", "Test"); 
     consumerProperties.setProperty("auto.offset.reset", "latest"); 
     consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC); 
     Executors.newFixedThreadPool(1).submit(() -> kafkaConsumer.consume()); 

     Properties producerProperties = new Properties(); 
     producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS); 
     producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC); 
     kafkaProducer.publish("Test Message"); 
    } 
} 


class MyKafkaConsumer { 
    private final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class); 
    private KafkaConsumer<String, Object> kafkaConsumer; 

    public MyKafkaConsumer(Properties properties, String topic) { 
     kafkaConsumer = new KafkaConsumer<String, Object>(properties); 
     kafkaConsumer.subscribe(Lists.newArrayList(topic)); 
    } 

    public void consume() { 
     while (true) { 
      logger.info("Started listening..."); 
      ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE); 
      logger.info("Received records {}", consumerRecords.iterator().next().value()); 
     } 
    } 
} 

class MyKafkaProducer { 
    private KafkaProducer<String, Object> kafkaProducer; 
    private String topic; 

    public MyKafkaProducer(Properties properties, String topic) { 
     this.kafkaProducer = new KafkaProducer<String, Object>(properties); 
     this.topic = topic; 
    } 

    public void publish(Object object) throws ExecutionException, InterruptedException { 
     ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object); 
     Future<RecordMetadata> response = kafkaProducer.send(producerRecord); 
     response.get(); 
    } 

} 

答えて

1

ステートメントconsumerProperties.setProperty("group.id", "Test");によって、Kakfkaは各パーティションが1つのコンシューマによって消費され、トピックのパーティションを複数のコンシューマプロセスに分散するようにパーティションのバランスをとります。

また、Kafka 0.9では、消費者グループの各消費者のオフセットを管理するためにKafkaが使用する個別の__consumer_offsetsトピックがあります。最初にコンシューマーを始めたときには、このトピックを見て最新のオフセットを取得している可能性があります(消費者がこのトピックから先に消滅していた可能性があります。正しいオフセット)。

これらの2つの要素は、最初のメッセージセットの消費に大きな遅延を発生させます。私は20-30秒の正確な待ち時間についてコメントすることはできませんが、これはデフォルトの動作でなければならないと思います。

PS正確な数は、ブローカー&を実行しているかどうか(ネットワーク待ち時間がない場合)か、TCPを使用して通信する別のもの。

0

最小ロギングの追加でコードを試してみました。典型的なログ出力は次のとおりです。

2016-07-24 15:12:51,417 Start polling...|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,604 producer has send message|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,619 producer got response, exiting|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,679 Received records [Test Message]|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,679 Start polling...|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:54,680 returning on empty poll result|INFO|KafkaProducerConsumerTest 

イベントシーケンスは、予期されたとおりに、タイムリーに行われます。消費者はポーリングを開始し、プロデューサはメッセージを送信して結果を受信し、消費者はメッセージを受信し、すべてこれを300msで受信します。その後、消費者は再びポーリングを開始し、それぞれ3秒後にポーリングタイムアウトを変更すると投げ捨てられます。

私はブローカーとクライアントライブラリ用にKafka 0.9.0.1を使用しています。接続はlocalhost上にあり、負荷のないテスト環境です。

完全性のために、上記の交換によってトリガーされたサーバーのログ形式を以下に示します。

[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 0 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Stabilized group Test generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:12:51,617] INFO [GroupCoordinator 0]: Assignment received from leader for group Test for generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:13:24,635] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:13:24,637] INFO [GroupCoordinator 0]: Group Test generation 1 is dead and removed (kafka.coordinator.GroupCoordinator) 

同じExchangeのサーバーログと比較したい場合があります。 this linkによると

+0

試していただきありがとうございます、私は断続的にその瞬間的な動作を見ましたが、あなたが数回それを試してくださいしていない場合は、遅延が表示されます。また、私はあなたの理論に感謝しますが、私はまた、 "リスニングを始める"の後に2回目のメッセージを公開しました。それでも20秒ほどかかります – DJ180

0

(あなたの消費者にgroup_id=Noneを設定してみてください、または(consumer.closeを呼び出す) スクリプトを終了する前、または割り当てを使用するには)(サブスクライブしません)。それ以外の場合は、知られているが応答の遅いメンバーが存在する既存のグループに が再び参加します。グループコーディネーターの はのメンバーがcheckin/leave/timeoutまで待機するです。 消費者はもはや存在しません(以前のスクリプトが実行されています)ので、 がタイムアウトします。 また、consumer.poll()はグループの再調整中にブロックされます。

あなたが応答しないメンバーとグループに参加している場合は正しく動作します(多分アプリケーションを正常に終了させるかもしれません)。

アプリケーションを終了する前に「consumer.close()」を呼び出すことを確認してください。

関連する問題