私たちは、プロジェクトのためにカフカを使い始めました。私たちはkafka_2.11-0.9.0.0を使用しています。私はKafkaConsumerに関連するいくつかのクエリを持っています。カフカコンシューマーによるポーリングと再接続
1)ZookeeperとKafkaサーバーを起動する前にKafka Consumerを開始しましたが、引き続きKafkaConsumerクライアントが接続できました。私は、コード
Consumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(getConsumerRegisteredTopics());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records){
processRecord (record)
}
}
2)私は、飼育係は、ポーリング(ロングタイムアウト)メソッド呼び出しを使用することにより、アクティブ消費者のトラックを保持し、読みの行を次のようしています。私がLong.MAX_VALUEを使ってpoll()でタイムアウトした場合、zookeeperは私の消費者をどのように追跡しますか? KafkaConsumerの世論調査通話の動作を理解するのを手伝ってください。
ありがとうございます。
ご回答いただきありがとうございます。私の消費者が投票方法でLong.MAX_VALUEを待っている場合、それはどのようにハートビートを送信し、Kafkaサーバ/ Zookeeperは私の消費者がまだ生きているかを知るでしょう。 – user1874156
ハートビートはアプリケーションコードが生きていることを確認するためのものです。あなたのプログラムの制御の流れがポーリング方法の内側にある限り、心拍を気にする必要はありません。 –