私はKafkaConsumer
の実装をjavaに持っていますが、現在は.poll
メソッドを終了することはありません。デバッグモードでソースコードにドリルダウンすると、コーディネーターが見つからないため、whileループで滞っていることがわかりましたAbstractCoordinator.ensureCoordinatorKnown()
にあります。KafkaConsumerは終了しません.pollメソッド - GroupCoordinatorNotAvailableException
ループでsendGroupMetadataRequest()
から返された未来は、初めてorg.apache.kafka.clients.consumer.internals.SendFailedException
で失敗し、以後毎回org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
で失敗します。誰がなぜこれが起こるのか知っていますか?
コンソールプロデューサ/コンシューマを使用しても、正常にメッセージを送受信できるようになるのは、私がKafkaConsumerの実装を使用する場合のみです。さらに、コンシューマーは2つのサーバーで作業するため、コンシューマーの実装ではありません。
は、ここに私の消費者が使用して作成されたプロパティです。
Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
編集:
トピックは間違いなく消費者の開始前に作成されます。
編集2: クラスタ内のすべてのブローカーを削除して再作成しましたが、別の時点で失敗しています。 AbstractCoordinator.ensureActiveGroup()
に再参加しようとしているうちに、performGroupJoin()
から返された未来は、org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.
で繰り返し失敗します。まだ何が起こっているのか分かりません。
編集3: 私はブローカーを削除し、異なるIDでそれらを再作成し、今.poll()
方法が戻っていると、それが成功したメッセージを消費しています。私はまだそれが最初に失敗した理由を知りたいのですが、私はそれが再び起こらないことを確かめることができます。
カフカのどのバージョンですか?ブローカーと消費者のための同じバージョン?カフカは本当にmyserver:9000(9092はデフォルト)を聞いていますか?コンシューママシンからTelnet経由でKafkaに接続できますか? –
私はKafka 0.9.1を使用しています。はいkafkaは本当にそこを聞いて、私はJavaの1つとして作成したコンソールの消費者のために同じブローカーを使用します。グーグルのtelnetだけで、サーバー上で実行されているかどうかわかりません。私がコンソールの消費者から消費することができることを既に知っているとき、それに接続すると何が証明されますか? – annedroiid