2016-07-26 4 views
2

私たちは、プロジェクトのためにカフカを使い始めました。私たちはkafka_2.11-0.9.0.0を使用しています。私はKafkaConsumerに関連するいくつかのクエリを持っています。カフカコンシューマーによるポーリングと再接続

1)ZookeeperとKafkaサーバーを起動する前にKafka Consumerを開始しましたが、引き続きKafkaConsumerクライアントが接続できました。私は、コード

Consumer<String, String> consumer = new KafkaConsumer<String,String>(props); 
    consumer.subscribe(getConsumerRegisteredTopics()); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
     for (ConsumerRecord<String, String> record : records){ 
      processRecord (record) 
     } 
    } 

2)私は、飼育係は、ポーリング(ロングタイムアウト)メソッド呼び出しを使用することにより、アクティブ消費者のトラックを保持し、読みの行を次のようしています。私がLong.MAX_VALUEを使ってpoll()でタイムアウトした場合、zookeeperは私の消費者をどのように追跡しますか? KafkaConsumerの世論調査通話の動作を理解するのを手伝ってください。

ありがとうございます。

答えて

1

1)あなたが消費者を始める前に動物園とカフカを開始しなかった場合、それは接続できませんが、カフカからメタデータを読み込もうとします。私の経験では、KafkaConsumerの「投票」コールは、メタデータに接続して読み取ることができるまで、不定期にブロックされます。言い換えればあなたの消費者は実際にはつながっていませんが、カフカクラスターが現れるのを待っています。

2)ポーリングタイムアウトは、データを返すまでの待機時間を消費者に通知します。ポールリターンの後に、あなたの消費者がアクティブな状態を維持できるように、すぐにポールを再度呼び出すようにする必要があります。ポールコールに与えられたタイムアウトは、KafkaConsumerのキープアライブメカニズムとは関係ありません(これは、コンシューマのコンシューマプロパティのsession.timeout.msのプロパティによって制御されます)。

+0

ご回答いただきありがとうございます。私の消費者が投票方法でLong.MAX_VALUEを待っている場合、それはどのようにハートビートを送信し、Kafkaサーバ/ Zookeeperは私の消費者がまだ生きているかを知るでしょう。 – user1874156

+0

ハートビートはアプリケーションコードが生きていることを確認するためのものです。あなたのプログラムの制御の流れがポーリング方法の内側にある限り、心拍を気にする必要はありません。 –

関連する問題